use hsApi wrapper that handles rate-limiting instead of send scheduler

This commit is contained in:
Bruno Windels 2020-09-22 13:43:18 +02:00
parent d7c25e3106
commit 0a00d4c865
4 changed files with 95 additions and 117 deletions

View File

@ -14,72 +14,70 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {Platform} from "../Platform.js";
import {HomeServerError, ConnectionError} from "./error.js";
import {AbortError} from "../utils/error.js";
import {HomeServerError} from "./error.js";
import {HomeServerApi} from "./net/HomeServerApi.js";
import {ExponentialRetryDelay} from "./net/ExponentialRetryDelay.js";
export class RateLimitingBackoff {
constructor() {
this._remainingRateLimitedRequest = 0;
class Request {
constructor(methodName, args) {
this._methodName = methodName;
this._args = args;
this._responsePromise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
this._requestResult = null;
}
async waitAfterLimitExceeded(retryAfterMs) {
// this._remainingRateLimitedRequest = 5;
// if (typeof retryAfterMs !== "number") {
// } else {
// }
if (!retryAfterMs) {
retryAfterMs = 5000;
abort() {
if (this._requestResult) {
this._requestResult.abort();
} else {
this._reject(new AbortError());
}
await Platform.delay(retryAfterMs);
}
// do we have to know about succeeding requests?
// we can just
async waitForNextSend() {
// this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1);
response() {
return this._responsePromise;
}
}
/*
this represents a slot to do one rate limited api call.
because rate-limiting is handled here, it should only
try to do one call, so the SendScheduler can safely
retry if the call ends up being rate limited.
This is also why we have this abstraction it hsApi is not
passed straight to SendQueue when it is its turn to send.
e.g. we wouldn't want to repeat the callback in SendQueue that could
have other side-effects before the call to hsApi that we wouldn't want
repeated (setting up progress handlers for file uploads,
... a UI update to say it started sending?
... updating storage would probably only happen once the call succeeded
... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried)
) maybe it is a bit overengineering, but lets stick with it for now.
At least the above is a clear definition why we have this class
*/
//class SendSlot -- obsolete
class HomeServerApiWrapper {
constructor(scheduler) {
this._scheduler = scheduler;
}
}
// add request-wrapping methods to prototype
for (const methodName of Object.getOwnPropertyNames(HomeServerApi.prototype)) {
if (methodName !== "constructor" && !methodName.startsWith("_")) {
HomeServerApiWrapper.prototype[methodName] = function(...args) {
return this._scheduler._hsApiRequest(methodName, args);
};
}
}
export class SendScheduler {
constructor({hsApi, backoff}) {
constructor({hsApi, clock}) {
this._hsApi = hsApi;
this._sendRequests = [];
this._sendScheduled = false;
this._clock = clock;
this._requests = new Set();
this._isRateLimited = false;
this._isDrainingRateLimit = false;
this._stopped = false;
this._waitTime = 0;
this._backoff = backoff;
/*
we should have some sort of flag here that we enable
after all the rooms have been notified that they can resume
sending, so that from session, we can say scheduler.enable();
this way, when we have better scheduling, it won't be first come,
first serve, when there are a lot of events in different rooms to send,
but we can apply some priorization of who should go first
*/
// this._enabled;
}
createHomeServerApiWrapper() {
return new HomeServerApiWrapper(this);
}
stop() {
// TODO: abort current requests and set offline
this._stopped = true;
for (const request of this._requests) {
request.abort();
}
this._requests.clear();
}
start() {
@ -90,60 +88,45 @@ export class SendScheduler {
return !this._stopped;
}
// this should really be per roomId to avoid head-of-line blocking
//
// takes a callback instead of returning a promise with the slot
// to make sure the scheduler doesn't get blocked by a slot that is not consumed
request(sendCallback) {
let request;
const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback});
this._sendRequests.push(request);
if (!this._sendScheduled && !this._stopped) {
this._sendLoop();
}
return promise;
_hsApiRequest(name, args) {
const request = new Request(name, args);
this._doSend(request);
return request;
}
async _sendLoop() {
while (this._sendRequests.length) {
const request = this._sendRequests.shift();
let result;
try {
// this can throw!
result = await this._doSend(request.sendCallback);
} catch (err) {
if (err instanceof ConnectionError) {
// we're offline, everybody will have
// to re-request slots when we come back online
this._stopped = true;
for (const r of this._sendRequests) {
r.reject(err);
async _doSend(request) {
this._requests.add(request);
try {
let retryDelay;
while (!this._stopped) {
try {
const requestResult = this._hsApi[request._methodName].apply(this._hsApi, request._args);
// so the request can be aborted
request._requestResult = requestResult;
const response = await requestResult.response();
request._resolve(response);
return;
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
if (Number.isSafeInteger(err.retry_after_ms)) {
await this._clock.createTimeout(err.retry_after_ms).elapsed();
} else {
if (!retryDelay) {
retryDelay = new ExponentialRetryDelay(this._clock.createTimeout);
}
await retryDelay.waitForRetry();
}
} else {
request._reject(err);
return;
}
this._sendRequests = [];
}
console.error("error for request", err);
request.reject(err);
break;
}
request.resolve(result);
}
// do next here instead of in _doSend
}
async _doSend(sendCallback) {
this._sendScheduled = false;
await this._backoff.waitForNextSend();
// loop is left by return or throw
while (true) { // eslint-disable-line no-constant-condition
try {
return await sendCallback(this._hsApi);
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
await this._backoff.waitAfterLimitExceeded(err.retry_after_ms);
} else {
throw err;
}
}
if (this._stopped) {
request.abort();
}
} finally {
this._requests.delete(request);
}
}
}

View File

@ -16,7 +16,7 @@ limitations under the License.
import {Room} from "./room/Room.js";
import { ObservableMap } from "../observable/index.js";
import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js";
import {SendScheduler} from "./SendScheduler.js";
import {User} from "./User.js";
import {DeviceMessageHandler} from "./DeviceMessageHandler.js";
import {Account as E2EEAccount} from "./e2ee/Account.js";
@ -42,15 +42,15 @@ const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
// sessionInfo contains deviceId, userId and homeServer
constructor({clock, storage, hsApi, sessionInfo, olm, olmWorker, cryptoDriver, mediaRepository}) {
constructor({clock, storage, unwrappedHsApi, sessionInfo, olm, olmWorker, cryptoDriver, mediaRepository}) {
this._clock = clock;
this._storage = storage;
this._hsApi = hsApi;
this._sendScheduler = new SendScheduler({hsApi: unwrappedHsApi, clock});
this._hsApi = this._sendScheduler.createHomeServerApiWrapper();
this._mediaRepository = mediaRepository;
this._syncInfo = null;
this._sessionInfo = sessionInfo;
this._rooms = new ObservableMap();
this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()});
this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params);
this._user = new User(sessionInfo.userId);
this._deviceMessageHandler = new DeviceMessageHandler({storage});
@ -332,9 +332,8 @@ export class Session {
storage: this._storage,
emitCollectionChange: this._roomUpdateCallback,
hsApi: this._hsApi,
sendScheduler: this._sendScheduler,
._hsApi,
mediaRepository: this._mediaRep pendingEvents,
mediaRepository: this._mediaRepository,
pendingEvents,
user: this._user,
createRoomEncryption: this._createRoomEncryption,
clock: this._clock

View File

@ -31,7 +31,7 @@ import {DecryptionSource} from "../e2ee/common.js";
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
export class Room extends EventEmitter {
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) {
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) {
super();
this._roomId = roomId;
this._storage = storage;
@ -41,7 +41,7 @@ export class Room extends EventEmitter {
this._fragmentIdComparer = new FragmentIdComparer([]);
this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
this._emitCollectionChange = emitCollectionChange;
this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents});
this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents});
this._timeline = null;
this._user = user;
this._changedMembersDuringSync = null;

View File

@ -20,11 +20,11 @@ import {PendingEvent} from "./PendingEvent.js";
import {makeTxnId} from "../../common.js";
export class SendQueue {
constructor({roomId, storage, sendScheduler, pendingEvents}) {
constructor({roomId, storage, hsApi, pendingEvents}) {
pendingEvents = pendingEvents || [];
this._roomId = roomId;
this._storage = storage;
this._sendScheduler = sendScheduler;
this._hsApi = hsApi;
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
if (pendingEvents.length) {
console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
@ -51,22 +51,18 @@ export class SendQueue {
continue;
}
if (pendingEvent.needsEncryption) {
const {type, content} = await this._sendScheduler.request(async hsApi => {
return await this._roomEncryption.encrypt(pendingEvent.eventType, pendingEvent.content, hsApi);
});
const {type, content} = await this._roomEncryption.encrypt(
pendingEvent.eventType, pendingEvent.content, this._hsApi);
pendingEvent.setEncrypted(type, content);
await this._tryUpdateEvent(pendingEvent);
}
console.log("really sending now");
const response = await this._sendScheduler.request(hsApi => {
console.log("got sendScheduler slot");
return hsApi.send(
const response = await this._hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
pendingEvent.txnId,
pendingEvent.content
).response();
});
pendingEvent.remoteId = response.event_id;
//
console.log("writing remoteId now");