Merge pull request #107 from vector-im/bwindels/rate-limiting-for-all

Rate-limiting for all events
This commit is contained in:
Bruno Windels 2020-09-22 14:46:18 +00:00 committed by GitHub
commit fe8c63b5f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 258 additions and 245 deletions

View File

@ -81,7 +81,7 @@ async function loadOlmWorker(paths) {
// see https://github.com/rollup/plugins/tree/master/packages/multi-entry
export async function main(container, paths, legacyExtras) {
try {
// TODO: add .legacy to body in (legacy)platform.createAndMountRootView; and use body:not(.legacy) if needed for modern stuff
// TODO: add .legacy to .hydrogen (container) in (legacy)platform.createAndMountRootView; and use .hydrogen:not(.legacy) if needed for modern stuff
const isIE11 = !!window.MSInputMethodContext && !!document.documentMode;
if (isIE11) {
document.body.className += " ie11";
@ -104,7 +104,7 @@ export async function main(container, paths, legacyExtras) {
} else {
request = xhrRequest;
}
const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1");
const sessionInfoStorage = new SessionInfoStorage("hydrogen_sessions_v1");
const storageFactory = new StorageFactory();
const olmPromise = loadOlm(paths.olm);

View File

@ -1,149 +0,0 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {Platform} from "../Platform.js";
import {HomeServerError, ConnectionError} from "./error.js";
export class RateLimitingBackoff {
constructor() {
this._remainingRateLimitedRequest = 0;
}
async waitAfterLimitExceeded(retryAfterMs) {
// this._remainingRateLimitedRequest = 5;
// if (typeof retryAfterMs !== "number") {
// } else {
// }
if (!retryAfterMs) {
retryAfterMs = 5000;
}
await Platform.delay(retryAfterMs);
}
// do we have to know about succeeding requests?
// we can just
async waitForNextSend() {
// this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1);
}
}
/*
this represents a slot to do one rate limited api call.
because rate-limiting is handled here, it should only
try to do one call, so the SendScheduler can safely
retry if the call ends up being rate limited.
This is also why we have this abstraction it hsApi is not
passed straight to SendQueue when it is its turn to send.
e.g. we wouldn't want to repeat the callback in SendQueue that could
have other side-effects before the call to hsApi that we wouldn't want
repeated (setting up progress handlers for file uploads,
... a UI update to say it started sending?
... updating storage would probably only happen once the call succeeded
... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried)
) maybe it is a bit overengineering, but lets stick with it for now.
At least the above is a clear definition why we have this class
*/
//class SendSlot -- obsolete
export class SendScheduler {
constructor({hsApi, backoff}) {
this._hsApi = hsApi;
this._sendRequests = [];
this._sendScheduled = false;
this._stopped = false;
this._waitTime = 0;
this._backoff = backoff;
/*
we should have some sort of flag here that we enable
after all the rooms have been notified that they can resume
sending, so that from session, we can say scheduler.enable();
this way, when we have better scheduling, it won't be first come,
first serve, when there are a lot of events in different rooms to send,
but we can apply some priorization of who should go first
*/
// this._enabled;
}
stop() {
// TODO: abort current requests and set offline
}
start() {
this._stopped = false;
}
get isStarted() {
return !this._stopped;
}
// this should really be per roomId to avoid head-of-line blocking
//
// takes a callback instead of returning a promise with the slot
// to make sure the scheduler doesn't get blocked by a slot that is not consumed
request(sendCallback) {
let request;
const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback});
this._sendRequests.push(request);
if (!this._sendScheduled && !this._stopped) {
this._sendLoop();
}
return promise;
}
async _sendLoop() {
while (this._sendRequests.length) {
const request = this._sendRequests.shift();
let result;
try {
// this can throw!
result = await this._doSend(request.sendCallback);
} catch (err) {
if (err instanceof ConnectionError) {
// we're offline, everybody will have
// to re-request slots when we come back online
this._stopped = true;
for (const r of this._sendRequests) {
r.reject(err);
}
this._sendRequests = [];
}
console.error("error for request", err);
request.reject(err);
break;
}
request.resolve(result);
}
// do next here instead of in _doSend
}
async _doSend(sendCallback) {
this._sendScheduled = false;
await this._backoff.waitForNextSend();
// loop is left by return or throw
while (true) { // eslint-disable-line no-constant-condition
try {
return await sendCallback(this._hsApi);
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
await this._backoff.waitAfterLimitExceeded(err.retry_after_ms);
} else {
throw err;
}
}
}
}
}

View File

@ -16,7 +16,6 @@ limitations under the License.
import {Room} from "./room/Room.js";
import { ObservableMap } from "../observable/index.js";
import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js";
import {User} from "./User.js";
import {DeviceMessageHandler} from "./DeviceMessageHandler.js";
import {Account as E2EEAccount} from "./e2ee/Account.js";
@ -42,14 +41,14 @@ const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
// sessionInfo contains deviceId, userId and homeServer
constructor({clock, storage, hsApi, sessionInfo, olm, olmWorker, cryptoDriver}) {
constructor({clock, storage, hsApi, sessionInfo, olm, olmWorker, cryptoDriver, mediaRepository}) {
this._clock = clock;
this._storage = storage;
this._hsApi = hsApi;
this._mediaRepository = mediaRepository;
this._syncInfo = null;
this._sessionInfo = sessionInfo;
this._rooms = new ObservableMap();
this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()});
this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params);
this._user = new User(sessionInfo.userId);
this._deviceMessageHandler = new DeviceMessageHandler({storage});
@ -266,13 +265,8 @@ export class Session {
}));
}
get isStarted() {
return this._sendScheduler.isStarted;
}
dispose() {
this._olmWorker?.dispose();
this._sendScheduler.stop();
this._sessionBackup?.dispose();
for (const room of this._rooms.values()) {
room.dispose();
@ -296,7 +290,6 @@ export class Session {
const operations = await opsTxn.operations.getAll();
const operationsByScope = groupBy(operations, o => o.scope);
this._sendScheduler.start();
for (const [, room] of this._rooms) {
let roomOperationsByType;
const roomOperations = operationsByScope.get(room.id);
@ -331,7 +324,7 @@ export class Session {
storage: this._storage,
emitCollectionChange: this._roomUpdateCallback,
hsApi: this._hsApi,
sendScheduler: this._sendScheduler,
mediaRepository: this._mediaRepository,
pendingEvents,
user: this._user,
createRoomEncryption: this._createRoomEncryption,

View File

@ -19,6 +19,8 @@ import {ObservableValue} from "../observable/ObservableValue.js";
import {HomeServerApi} from "./net/HomeServerApi.js";
import {Reconnector, ConnectionStatus} from "./net/Reconnector.js";
import {ExponentialRetryDelay} from "./net/ExponentialRetryDelay.js";
import {MediaRepository} from "./net/MediaRepository.js";
import {RequestScheduler} from "./net/RequestScheduler.js";
import {HomeServerError, ConnectionError, AbortError} from "./error.js";
import {Sync, SyncStatus} from "./Sync.js";
import {Session} from "./Session.js";
@ -49,7 +51,7 @@ export class SessionContainer {
this._request = request;
this._storageFactory = storageFactory;
this._sessionInfoStorage = sessionInfoStorage;
this._sessionStartedByReconnector = false;
this._status = new ObservableValue(LoadStatus.NotLoading);
this._error = null;
this._loginFailure = null;
@ -58,6 +60,7 @@ export class SessionContainer {
this._sync = null;
this._sessionId = null;
this._storage = null;
this._requestScheduler = null;
this._olmPromise = olmPromise;
this._workerPromise = workerPromise;
this._cryptoDriver = cryptoDriver;
@ -132,6 +135,7 @@ export class SessionContainer {
}
async _loadSessionInfo(sessionInfo, isNewLogin) {
this._sessionStartedByReconnector = false;
this._status.set(LoadStatus.Loading);
this._reconnector = new Reconnector({
onlineStatus: this._onlineStatus,
@ -158,18 +162,30 @@ export class SessionContainer {
if (this._workerPromise) {
olmWorker = await this._workerPromise;
}
this._session = new Session({storage: this._storage,
sessionInfo: filteredSessionInfo, hsApi, olm,
clock: this._clock, olmWorker, cryptoDriver: this._cryptoDriver});
this._requestScheduler = new RequestScheduler({hsApi, clock: this._clock});
this._requestScheduler.start();
this._session = new Session({
storage: this._storage,
sessionInfo: filteredSessionInfo,
hsApi: this._requestScheduler.hsApi,
olm,
clock: this._clock,
olmWorker,
cryptoDriver: this._cryptoDriver,
mediaRepository: new MediaRepository(sessionInfo.homeServer)
});
await this._session.load();
this._status.set(LoadStatus.SessionSetup);
await this._session.beforeFirstSync(isNewLogin);
this._sync = new Sync({hsApi, storage: this._storage, session: this._session});
this._sync = new Sync({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session});
// notify sync and session when back online
this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
if (state === ConnectionStatus.Online) {
// needs to happen before sync and session or it would abort all requests
this._requestScheduler.start();
this._sync.start();
this._sessionStartedByReconnector = true;
this._session.start(this._reconnector.lastVersionsResponse);
}
});
@ -181,11 +197,7 @@ export class SessionContainer {
// restored the connection, it would have already
// started to session, so check first
// to prevent an extra /versions request
// TODO: this doesn't look logical, but works. Why?
// I think because isStarted is true by default. That's probably not what we intend.
// I think there is a bug here, in that even if the reconnector already started the session, we'd still do this.
if (this._session.isStarted) {
if (!this._sessionStartedByReconnector) {
const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response();
this._session.start(lastVersionsResponse);
}
@ -251,6 +263,9 @@ export class SessionContainer {
this._reconnectSubscription();
this._reconnectSubscription = null;
}
if (this._requestScheduler) {
this._requestScheduler.stop();
}
if (this._sync) {
this._sync.stop();
}

View File

@ -120,11 +120,11 @@ export class Sync {
this._status.set(SyncStatus.Syncing);
}
} catch (err) {
this._status.set(SyncStatus.Stopped);
if (!(err instanceof AbortError)) {
console.warn("stopping sync because of error");
console.error(err);
this._error = err;
this._status.set(SyncStatus.Stopped);
}
}
if (this._status.get() !== SyncStatus.Stopped) {

View File

@ -159,10 +159,6 @@ export class RoomEncryption {
}
async _requestMissingSessionFromBackup(senderKey, sessionId, source) {
if (!this._sessionBackup) {
this._notifyMissingMegolmSession();
return;
}
// if the message came from sync, wait 10s to see if the room key arrives,
// and only after that proceed to request from backup
if (source === DecryptionSource.Sync) {
@ -171,6 +167,11 @@ export class RoomEncryption {
return;
}
}
// show prompt to enable secret storage
if (!this._sessionBackup) {
this._notifyMissingMegolmSession();
return;
}
try {
const session = await this._sessionBackup.getSession(this._room.id, sessionId);

View File

@ -1,5 +1,6 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,11 +15,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {
HomeServerError,
ConnectionError,
AbortError
} from "../error.js";
import {HomeServerError} from "../error.js";
import {encodeQueryParams} from "./common.js";
class RequestWrapper {
constructor(method, url, requestResult) {
@ -45,18 +43,6 @@ class RequestWrapper {
}
}
function encodeQueryParams(queryParams) {
return Object.entries(queryParams || {})
.filter(([, value]) => value !== undefined)
.map(([name, value]) => {
if (typeof value === "object") {
value = JSON.stringify(value);
}
return `${encodeURIComponent(name)}=${encodeURIComponent(value)}`;
})
.join("&");
}
export class HomeServerApi {
constructor({homeServer, accessToken, request, createTimeout, reconnector}) {
// store these both in a closure somehow so it's harder to get at in case of XSS?
@ -66,7 +52,6 @@ export class HomeServerApi {
this._requestFn = request;
this._createTimeout = createTimeout;
this._reconnector = reconnector;
this._mediaRepository = new MediaRepository(homeServer);
}
_url(csPath) {
@ -196,45 +181,6 @@ export class HomeServerApi {
roomKeyForRoomAndSession(version, roomId, sessionId, options = null) {
return this._get(`/room_keys/keys/${encodeURIComponent(roomId)}/${encodeURIComponent(sessionId)}`, {version}, null, options);
}
get mediaRepository() {
return this._mediaRepository;
}
}
class MediaRepository {
constructor(homeserver) {
this._homeserver = homeserver;
}
mxcUrlThumbnail(url, width, height, method) {
const parts = this._parseMxcUrl(url);
if (parts) {
const [serverName, mediaId] = parts;
const httpUrl = `${this._homeserver}/_matrix/media/r0/thumbnail/${encodeURIComponent(serverName)}/${encodeURIComponent(mediaId)}`;
return httpUrl + "?" + encodeQueryParams({width, height, method});
}
return null;
}
mxcUrl(url) {
const parts = this._parseMxcUrl(url);
if (parts) {
const [serverName, mediaId] = parts;
return `${this._homeserver}/_matrix/media/r0/download/${encodeURIComponent(serverName)}/${encodeURIComponent(mediaId)}`;
} else {
return null;
}
}
_parseMxcUrl(url) {
const prefix = "mxc://";
if (url.startsWith(prefix)) {
return url.substr(prefix.length).split("/", 2);
} else {
return null;
}
}
}
export function tests() {

View File

@ -0,0 +1,52 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {encodeQueryParams} from "./common.js";
export class MediaRepository {
constructor(homeserver) {
this._homeserver = homeserver;
}
mxcUrlThumbnail(url, width, height, method) {
const parts = this._parseMxcUrl(url);
if (parts) {
const [serverName, mediaId] = parts;
const httpUrl = `${this._homeserver}/_matrix/media/r0/thumbnail/${encodeURIComponent(serverName)}/${encodeURIComponent(mediaId)}`;
return httpUrl + "?" + encodeQueryParams({width, height, method});
}
return null;
}
mxcUrl(url) {
const parts = this._parseMxcUrl(url);
if (parts) {
const [serverName, mediaId] = parts;
return `${this._homeserver}/_matrix/media/r0/download/${encodeURIComponent(serverName)}/${encodeURIComponent(mediaId)}`;
} else {
return null;
}
}
_parseMxcUrl(url) {
const prefix = "mxc://";
if (url.startsWith(prefix)) {
return url.substr(prefix.length).split("/", 2);
} else {
return null;
}
}
}

View File

@ -0,0 +1,130 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {AbortError} from "../../utils/error.js";
import {HomeServerError} from "../error.js";
import {HomeServerApi} from "./HomeServerApi.js";
import {ExponentialRetryDelay} from "./ExponentialRetryDelay.js";
class Request {
constructor(methodName, args) {
this._methodName = methodName;
this._args = args;
this._responsePromise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
this._requestResult = null;
}
abort() {
if (this._requestResult) {
this._requestResult.abort();
} else {
this._reject(new AbortError());
}
}
response() {
return this._responsePromise;
}
}
class HomeServerApiWrapper {
constructor(scheduler) {
this._scheduler = scheduler;
}
}
// add request-wrapping methods to prototype
for (const methodName of Object.getOwnPropertyNames(HomeServerApi.prototype)) {
if (methodName !== "constructor" && !methodName.startsWith("_")) {
HomeServerApiWrapper.prototype[methodName] = function(...args) {
return this._scheduler._hsApiRequest(methodName, args);
};
}
}
export class RequestScheduler {
constructor({hsApi, clock}) {
this._hsApi = hsApi;
this._clock = clock;
this._requests = new Set();
this._isRateLimited = false;
this._isDrainingRateLimit = false;
this._stopped = true;
this._wrapper = new HomeServerApiWrapper(this);
}
get hsApi() {
return this._wrapper;
}
stop() {
this._stopped = true;
for (const request of this._requests) {
request.abort();
}
this._requests.clear();
}
start() {
this._stopped = false;
}
_hsApiRequest(name, args) {
const request = new Request(name, args);
this._doSend(request);
return request;
}
async _doSend(request) {
this._requests.add(request);
try {
let retryDelay;
while (!this._stopped) {
try {
const requestResult = this._hsApi[request._methodName].apply(this._hsApi, request._args);
// so the request can be aborted
request._requestResult = requestResult;
const response = await requestResult.response();
request._resolve(response);
return;
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
if (Number.isSafeInteger(err.retry_after_ms)) {
await this._clock.createTimeout(err.retry_after_ms).elapsed();
} else {
if (!retryDelay) {
retryDelay = new ExponentialRetryDelay(this._clock.createTimeout);
}
await retryDelay.waitForRetry();
}
} else {
request._reject(err);
return;
}
}
}
if (this._stopped) {
request.abort();
}
} finally {
this._requests.delete(request);
}
}
}

28
src/matrix/net/common.js Normal file
View File

@ -0,0 +1,28 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export function encodeQueryParams(queryParams) {
return Object.entries(queryParams || {})
.filter(([, value]) => value !== undefined)
.map(([name, value]) => {
if (typeof value === "object") {
value = JSON.stringify(value);
}
return `${encodeURIComponent(name)}=${encodeURIComponent(value)}`;
})
.join("&");
}

View File

@ -31,16 +31,17 @@ import {DecryptionSource} from "../e2ee/common.js";
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
export class Room extends EventEmitter {
constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) {
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) {
super();
this._roomId = roomId;
this._storage = storage;
this._hsApi = hsApi;
this._mediaRepository = mediaRepository;
this._summary = new RoomSummary(roomId, user.id);
this._fragmentIdComparer = new FragmentIdComparer([]);
this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
this._emitCollectionChange = emitCollectionChange;
this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents});
this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents});
this._timeline = null;
this._user = user;
this._changedMembersDuringSync = null;
@ -517,7 +518,7 @@ export class Room extends EventEmitter {
}
get mediaRepository() {
return this._hsApi.mediaRepository;
return this._mediaRepository;
}
/** @package */

View File

@ -20,11 +20,11 @@ import {PendingEvent} from "./PendingEvent.js";
import {makeTxnId} from "../../common.js";
export class SendQueue {
constructor({roomId, storage, sendScheduler, pendingEvents}) {
constructor({roomId, storage, hsApi, pendingEvents}) {
pendingEvents = pendingEvents || [];
this._roomId = roomId;
this._storage = storage;
this._sendScheduler = sendScheduler;
this._hsApi = hsApi;
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
if (pendingEvents.length) {
console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
@ -51,22 +51,18 @@ export class SendQueue {
continue;
}
if (pendingEvent.needsEncryption) {
const {type, content} = await this._sendScheduler.request(async hsApi => {
return await this._roomEncryption.encrypt(pendingEvent.eventType, pendingEvent.content, hsApi);
});
const {type, content} = await this._roomEncryption.encrypt(
pendingEvent.eventType, pendingEvent.content, this._hsApi);
pendingEvent.setEncrypted(type, content);
await this._tryUpdateEvent(pendingEvent);
}
console.log("really sending now");
const response = await this._sendScheduler.request(hsApi => {
console.log("got sendScheduler slot");
return hsApi.send(
const response = await this._hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
pendingEvent.txnId,
pendingEvent.content
).response();
});
pendingEvent.remoteId = response.event_id;
//
console.log("writing remoteId now");