This commit is contained in:
Bruno Windels 2019-06-28 00:52:54 +02:00
parent fc873fbfa5
commit c5b2d0c8b2
5 changed files with 151 additions and 6 deletions

View File

@ -76,7 +76,35 @@ steps of sending
// sender is the thing that is shared across rooms to handle rate limiting.
const sendQueue = new SendQueue({roomId, hsApi, sender, storage});
await sendQueue.load(); //loads the queue?
//might need to load members for e2e rooms
//might need to load members for e2e rooms ...
//events should be encrypted before storing them though ...
// terminology ...?
// task: to let us wait for it to be our turn
// given rate limiting
class Sender {
acquireSlot() {
return new SendSlot();
}
}
// terminology ...?
// task: after waiting for it to be our turn given rate-limiting,
// send the actual thing we want to send.
// this should be used for all rate-limited apis... ?
class SendSlot {
sendContent(content) {
}
sendRedaction() {
}
uploadMedia() {
}
}
class SendQueue {
// when trying to send
@ -93,11 +121,13 @@ steps of sending
while (let pendingEvent = await findNextPendingEvent()) {
pendingEvent.status = QUEUED;
try {
await this.sender.sendEvent(() => {
// callback gets called
pendingEvent.status = SENDING;
return pendingEvent;
});
const mediaSlot = await this.sender.acquireSlot();
const mxcUrl = await mediaSlot.uploadMedia(pendingEvent.blob);
pendingEvent.content.url = mxcUrl;
const contentSlot = await this.sender.acquireSlot();
contentSlot.sendContent(pendingEvent.content);
pendingEvent.status = SENDING;
await slot.sendContent(...);
} catch (err) {
//offline
}

View File

@ -2,6 +2,7 @@ export class HomeServerError extends Error {
constructor(method, url, body) {
super(`${body.error} on ${method} ${url}`);
this.errcode = body.errcode;
this.retry_after_ms = body.retry_after_ms;
}
}

View File

@ -0,0 +1,13 @@
export default class PendingEvent {
static fromRedaction(eventId) {
}
static fromContent(content) {
}
static fromStateKey(eventType, stateKey, content) {
}
}

View File

@ -0,0 +1,97 @@
class Sender {
constructor({hsApi}) {
this._hsApi = hsApi;
this._slotRequests = [];
this._sendScheduled = false;
this._offline = false;
this._waitTime = 0;
}
// this should really be per roomId to avoid head-of-line blocking
acquireSlot() {
let request;
const promise = new Promise((resolve) => request = {resolve});
this._slotRequests.push(request);
if (!this._sendScheduled) {
this._startNextSlot();
}
return promise;
}
async _startNextSlot() {
if (this._waitTime !== 0) {
await Platform.delay(this._waitTime);
}
const request = this._slotRequests.unshift();
this._currentSlot = new SenderSlot(this);
request.resolve(this._currentSlot);
}
_discardSlot(slot) {
if (slot === this._currentSlot) {
this._currentSlot = null;
this._sendScheduled = true;
Promise.resolve().then(() => this._startNextSlot());
}
}
async _doSend(slot, callback) {
this._sendScheduled = false;
if (slot !== this._currentSlot) {
throw new Error("slot is not active");
}
try {
// loop is left by return or throw
while(true) {
try {
return await callback(this._hsApi);
} catch (err) {
if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") {
await Platform.delay(err.retry_after_ms);
} else {
throw err;
}
}
}
} catch (err) {
if (err instanceof NetworkError) {
this._offline = true;
// went offline, probably want to notify SendQueues somehow
}
throw err;
} finally {
this._currentSlot = null;
if (!this._offline && this._slotRequests.length) {
this._sendScheduled = true;
Promise.resolve().then(() => this._startNextSlot());
}
}
}
}
class SenderSlot {
constructor(sender) {
this._sender = sender;
}
sendEvent(pendingEvent) {
return this._sender._doSend(this, async hsApi => {
const request = hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
pendingEvent.txnId,
pendingEvent.content
);
const response = await request.response();
return response.event_id;
});
}
discard() {
this._sender._discardSlot(this);
}
}
export default class SendQueue {
constructor({sender})
}

View File

@ -13,4 +13,8 @@ export default {
// for indexeddb, we use unsigned 32 bit integers as keys
return 0xFFFFFFFF;
},
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}