2020-08-05 18:38:55 +02:00
|
|
|
/*
|
|
|
|
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
2020-04-20 21:26:39 +02:00
|
|
|
import {SortedArray} from "../../../observable/list/SortedArray.js";
|
2020-04-19 19:05:12 +02:00
|
|
|
import {ConnectionError} from "../../error.js";
|
2021-05-20 11:43:09 +02:00
|
|
|
import {PendingEvent, SendStatus} from "./PendingEvent.js";
|
2021-05-19 16:41:07 +02:00
|
|
|
import {makeTxnId, isTxnId} from "../../common.js";
|
2021-05-20 10:01:30 +02:00
|
|
|
import {REDACTION_TYPE} from "../common.js";
|
2019-07-01 10:00:29 +02:00
|
|
|
|
2020-04-20 21:26:39 +02:00
|
|
|
export class SendQueue {
|
2020-09-22 13:43:18 +02:00
|
|
|
constructor({roomId, storage, hsApi, pendingEvents}) {
|
2019-07-26 22:33:33 +02:00
|
|
|
pendingEvents = pendingEvents || [];
|
2019-07-26 22:03:57 +02:00
|
|
|
this._roomId = roomId;
|
|
|
|
this._storage = storage;
|
2020-09-22 13:43:18 +02:00
|
|
|
this._hsApi = hsApi;
|
2019-07-26 22:03:57 +02:00
|
|
|
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
|
2020-11-18 13:02:38 +01:00
|
|
|
this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data)));
|
2019-07-26 22:03:57 +02:00
|
|
|
this._isSending = false;
|
2019-06-28 00:52:54 +02:00
|
|
|
this._offline = false;
|
2020-09-03 15:36:48 +02:00
|
|
|
this._roomEncryption = null;
|
|
|
|
}
|
|
|
|
|
2020-11-18 13:02:38 +01:00
|
|
|
_createPendingEvent(data, attachments = null) {
|
|
|
|
const pendingEvent = new PendingEvent({
|
|
|
|
data,
|
|
|
|
remove: () => this._removeEvent(pendingEvent),
|
2021-04-08 18:36:26 +02:00
|
|
|
emitUpdate: () => this._pendingEvents.update(pendingEvent),
|
2020-11-18 13:02:38 +01:00
|
|
|
attachments
|
|
|
|
});
|
|
|
|
return pendingEvent;
|
|
|
|
}
|
|
|
|
|
2020-09-03 15:36:48 +02:00
|
|
|
enableEncryption(roomEncryption) {
|
|
|
|
this._roomEncryption = roomEncryption;
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
|
|
|
|
2021-02-23 19:22:59 +01:00
|
|
|
_sendLoop(log) {
|
2019-07-26 22:03:57 +02:00
|
|
|
this._isSending = true;
|
2021-02-23 19:22:59 +01:00
|
|
|
this._sendLoopLogItem = log.runDetached("send queue flush", async log => {
|
|
|
|
try {
|
2021-05-20 14:51:04 +02:00
|
|
|
for (const pendingEvent of this._pendingEvents) {
|
2021-02-23 19:22:59 +01:00
|
|
|
await log.wrap("send event", async log => {
|
2021-02-23 19:58:01 +01:00
|
|
|
log.set("queueIndex", pendingEvent.queueIndex);
|
2021-02-23 19:22:59 +01:00
|
|
|
try {
|
|
|
|
await this._sendEvent(pendingEvent, log);
|
|
|
|
} catch(err) {
|
|
|
|
if (err instanceof ConnectionError) {
|
|
|
|
this._offline = true;
|
|
|
|
log.set("offline", true);
|
2021-05-20 14:49:54 +02:00
|
|
|
pendingEvent.setWaiting();
|
2021-02-23 19:22:59 +01:00
|
|
|
} else {
|
|
|
|
log.catch(err);
|
2021-05-20 14:52:30 +02:00
|
|
|
const isPermanentError = err.name === "HomeServerError" && (
|
|
|
|
err.statusCode === 400 || // bad request, must be a bug on our end
|
|
|
|
err.statusCode === 403 || // forbidden
|
|
|
|
err.statusCode === 404 // not found
|
|
|
|
);
|
|
|
|
if (isPermanentError) {
|
|
|
|
log.set("remove", true);
|
|
|
|
await pendingEvent.abort();
|
|
|
|
} else {
|
|
|
|
pendingEvent.setError(err);
|
|
|
|
}
|
2021-02-23 19:22:59 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
this._isSending = false;
|
|
|
|
this._sendLoopLogItem = null;
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
2021-02-23 19:22:59 +01:00
|
|
|
});
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
|
|
|
|
2021-02-23 19:22:59 +01:00
|
|
|
async _sendEvent(pendingEvent, log) {
|
2020-11-18 13:02:38 +01:00
|
|
|
if (pendingEvent.needsUpload) {
|
2021-02-23 19:22:59 +01:00
|
|
|
await log.wrap("upload attachments", log => pendingEvent.uploadAttachments(this._hsApi, log));
|
2020-11-18 13:02:38 +01:00
|
|
|
await this._tryUpdateEvent(pendingEvent);
|
|
|
|
}
|
|
|
|
if (pendingEvent.needsEncryption) {
|
|
|
|
pendingEvent.setEncrypting();
|
2021-02-23 19:22:59 +01:00
|
|
|
const {type, content} = await log.wrap("encrypt", log => this._roomEncryption.encrypt(
|
|
|
|
pendingEvent.eventType, pendingEvent.content, this._hsApi, log));
|
2020-11-18 13:02:38 +01:00
|
|
|
pendingEvent.setEncrypted(type, content);
|
|
|
|
await this._tryUpdateEvent(pendingEvent);
|
|
|
|
}
|
|
|
|
if (pendingEvent.needsSending) {
|
2021-02-23 19:22:59 +01:00
|
|
|
await pendingEvent.send(this._hsApi, log);
|
2021-05-19 16:41:07 +02:00
|
|
|
// we now have a remoteId, but this pending event may be removed at any point in the future
|
2021-05-31 15:55:31 +02:00
|
|
|
// (or past, so can't assume it still exists) once the remote echo comes in.
|
|
|
|
// So if we have any related events that need to resolve the relatedTxnId to a related event id,
|
|
|
|
// they need to do so now.
|
2021-05-19 16:41:07 +02:00
|
|
|
// We ensure this by writing the new remote id for the pending event and all related events
|
|
|
|
// with unresolved relatedTxnId in the queue in one transaction.
|
|
|
|
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
|
|
|
try {
|
|
|
|
await this._tryUpdateEventWithTxn(pendingEvent, txn);
|
2021-05-21 10:47:48 +02:00
|
|
|
await this._resolveRemoteIdInPendingRelations(
|
|
|
|
pendingEvent.txnId, pendingEvent.remoteId, txn);
|
2021-05-19 16:41:07 +02:00
|
|
|
} catch (err) {
|
|
|
|
txn.abort();
|
|
|
|
throw err;
|
|
|
|
}
|
|
|
|
await txn.complete();
|
2020-11-18 13:02:38 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-21 10:47:48 +02:00
|
|
|
async _resolveRemoteIdInPendingRelations(txnId, remoteId, txn) {
|
|
|
|
const relatedEventWithoutRemoteId = this._pendingEvents.array.filter(pe => {
|
|
|
|
return pe.relatedTxnId === txnId && pe.relatedEventId !== remoteId;
|
|
|
|
});
|
|
|
|
for (const relatedPE of relatedEventWithoutRemoteId) {
|
|
|
|
relatedPE.setRelatedEventId(remoteId);
|
|
|
|
await this._tryUpdateEventWithTxn(relatedPE, txn);
|
|
|
|
}
|
|
|
|
return relatedEventWithoutRemoteId;
|
|
|
|
}
|
|
|
|
|
|
|
|
async removeRemoteEchos(events, txn, parentLog) {
|
2019-07-26 22:33:33 +02:00
|
|
|
const removed = [];
|
|
|
|
for (const event of events) {
|
|
|
|
const txnId = event.unsigned && event.unsigned.transaction_id;
|
2020-03-23 23:00:33 +01:00
|
|
|
let idx;
|
2019-07-26 22:33:33 +02:00
|
|
|
if (txnId) {
|
2020-03-23 23:00:33 +01:00
|
|
|
idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
|
|
|
|
} else {
|
|
|
|
idx = this._pendingEvents.array.findIndex(pe => pe.remoteId === event.event_id);
|
|
|
|
}
|
|
|
|
if (idx !== -1) {
|
|
|
|
const pendingEvent = this._pendingEvents.get(idx);
|
2021-05-21 10:47:48 +02:00
|
|
|
const remoteId = event.event_id;
|
|
|
|
parentLog.log({l: "removeRemoteEcho", queueIndex: pendingEvent.queueIndex, remoteId, txnId});
|
2020-03-23 23:00:33 +01:00
|
|
|
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
|
|
|
|
removed.push(pendingEvent);
|
2021-05-21 10:47:48 +02:00
|
|
|
await this._resolveRemoteIdInPendingRelations(txnId, remoteId, txn);
|
2019-07-26 22:33:33 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return removed;
|
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
|
2020-11-18 13:02:38 +01:00
|
|
|
async _removeEvent(pendingEvent) {
|
|
|
|
const idx = this._pendingEvents.array.indexOf(pendingEvent);
|
|
|
|
if (idx !== -1) {
|
2021-03-04 19:47:02 +01:00
|
|
|
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
2020-11-18 13:02:38 +01:00
|
|
|
try {
|
|
|
|
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
|
|
|
|
} catch (err) {
|
|
|
|
txn.abort();
|
|
|
|
}
|
|
|
|
await txn.complete();
|
|
|
|
this._pendingEvents.remove(idx);
|
|
|
|
}
|
2020-11-18 20:08:42 +01:00
|
|
|
pendingEvent.dispose();
|
2020-11-18 13:02:38 +01:00
|
|
|
}
|
|
|
|
|
2019-07-26 22:33:33 +02:00
|
|
|
emitRemovals(pendingEvents) {
|
|
|
|
for (const pendingEvent of pendingEvents) {
|
|
|
|
const idx = this._pendingEvents.array.indexOf(pendingEvent);
|
|
|
|
if (idx !== -1) {
|
|
|
|
this._pendingEvents.remove(idx);
|
|
|
|
}
|
2020-11-18 20:08:42 +01:00
|
|
|
pendingEvent.dispose();
|
2019-07-26 22:03:57 +02:00
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
|
|
|
|
2021-02-23 19:22:59 +01:00
|
|
|
resumeSending(parentLog) {
|
2019-07-26 22:03:57 +02:00
|
|
|
this._offline = false;
|
2021-02-23 19:22:59 +01:00
|
|
|
if (this._pendingEvents.length) {
|
|
|
|
parentLog.wrap("resumeSending", log => {
|
|
|
|
log.set("id", this._roomId);
|
|
|
|
log.set("pendingEvents", this._pendingEvents.length);
|
|
|
|
if (!this._isSending) {
|
|
|
|
this._sendLoop(log);
|
|
|
|
}
|
|
|
|
if (this._sendLoopLogItem) {
|
|
|
|
log.refDetached(this._sendLoopLogItem);
|
|
|
|
}
|
|
|
|
});
|
2019-07-26 22:03:57 +02:00
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
|
2021-02-23 19:22:59 +01:00
|
|
|
async enqueueEvent(eventType, content, attachments, log) {
|
2021-05-21 10:47:48 +02:00
|
|
|
await this._enqueueEvent(eventType, content, attachments, null, null, log);
|
2021-05-19 16:41:07 +02:00
|
|
|
}
|
|
|
|
|
2021-05-21 10:47:48 +02:00
|
|
|
async _enqueueEvent(eventType, content, attachments, relatedTxnId, relatedEventId, log) {
|
|
|
|
const pendingEvent = await this._createAndStoreEvent(eventType, content, relatedTxnId, relatedEventId, attachments);
|
2019-07-26 22:03:57 +02:00
|
|
|
this._pendingEvents.set(pendingEvent);
|
2021-02-23 19:22:59 +01:00
|
|
|
log.set("queueIndex", pendingEvent.queueIndex);
|
|
|
|
log.set("pendingEvents", this._pendingEvents.length);
|
2019-07-26 22:03:57 +02:00
|
|
|
if (!this._isSending && !this._offline) {
|
2021-02-23 19:22:59 +01:00
|
|
|
this._sendLoop(log);
|
|
|
|
}
|
|
|
|
if (this._sendLoopLogItem) {
|
|
|
|
log.refDetached(this._sendLoopLogItem);
|
2019-07-26 22:03:57 +02:00
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
|
2021-05-19 16:41:07 +02:00
|
|
|
async enqueueRedaction(eventIdOrTxnId, reason, log) {
|
2021-05-21 10:47:48 +02:00
|
|
|
let relatedTxnId;
|
|
|
|
let relatedEventId;
|
2021-05-19 16:41:07 +02:00
|
|
|
if (isTxnId(eventIdOrTxnId)) {
|
2021-05-21 10:47:48 +02:00
|
|
|
relatedTxnId = eventIdOrTxnId;
|
2021-05-19 16:41:07 +02:00
|
|
|
const txnId = eventIdOrTxnId;
|
|
|
|
const pe = this._pendingEvents.array.find(pe => pe.txnId === txnId);
|
|
|
|
if (pe && !pe.remoteId && pe.status !== SendStatus.Sending) {
|
|
|
|
// haven't started sending this event yet,
|
|
|
|
// just remove it from the queue
|
2021-05-21 16:59:29 +02:00
|
|
|
log.set("remove", relatedTxnId);
|
2021-05-19 16:41:07 +02:00
|
|
|
await pe.abort();
|
|
|
|
return;
|
2021-05-21 10:47:48 +02:00
|
|
|
} else if (pe) {
|
|
|
|
relatedEventId = pe.remoteId;
|
|
|
|
} else {
|
2021-05-19 16:41:07 +02:00
|
|
|
// we don't have the pending event anymore,
|
|
|
|
// the remote echo must have arrived in the meantime.
|
|
|
|
// we could look for it in the timeline, but for now
|
|
|
|
// we don't do anything as this race is quite unlikely
|
|
|
|
// and a bit complicated to fix.
|
|
|
|
return;
|
|
|
|
}
|
2021-05-20 14:53:17 +02:00
|
|
|
} else {
|
2021-05-21 10:47:48 +02:00
|
|
|
relatedEventId = eventIdOrTxnId;
|
2021-05-21 16:59:29 +02:00
|
|
|
const pe = this._pendingEvents.array.find(pe => pe.remoteId === relatedEventId);
|
|
|
|
if (pe) {
|
|
|
|
// also set the txn id just in case that an event id was passed
|
|
|
|
// for relating to a pending event that is still waiting for the remote echo
|
|
|
|
relatedTxnId = pe.txnId;
|
|
|
|
}
|
2021-05-19 16:41:07 +02:00
|
|
|
}
|
2021-05-21 16:59:29 +02:00
|
|
|
log.set("relatedTxnId", eventIdOrTxnId);
|
|
|
|
log.set("relatedEventId", relatedEventId);
|
2021-05-21 10:47:48 +02:00
|
|
|
await this._enqueueEvent(REDACTION_TYPE, {reason}, null, relatedTxnId, relatedEventId, log);
|
2021-05-19 16:41:07 +02:00
|
|
|
}
|
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
get pendingEvents() {
|
|
|
|
return this._pendingEvents;
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
async _tryUpdateEvent(pendingEvent) {
|
2021-03-04 19:47:02 +01:00
|
|
|
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
2019-07-26 22:03:57 +02:00
|
|
|
try {
|
2021-05-19 16:41:07 +02:00
|
|
|
this._tryUpdateEventWithTxn(pendingEvent, txn);
|
2019-07-26 22:03:57 +02:00
|
|
|
} catch (err) {
|
|
|
|
txn.abort();
|
|
|
|
throw err;
|
|
|
|
}
|
|
|
|
await txn.complete();
|
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
|
2021-05-19 16:41:07 +02:00
|
|
|
async _tryUpdateEventWithTxn(pendingEvent, txn) {
|
|
|
|
// pendingEvent might have been removed already here
|
|
|
|
// by a racing remote echo, so check first so we don't recreate it
|
|
|
|
if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
|
|
|
|
txn.pendingEvents.update(pendingEvent.data);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-21 10:47:48 +02:00
|
|
|
async _createAndStoreEvent(eventType, content, relatedTxnId, relatedEventId, attachments) {
|
2021-03-04 19:47:02 +01:00
|
|
|
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
2019-07-26 22:03:57 +02:00
|
|
|
let pendingEvent;
|
|
|
|
try {
|
|
|
|
const pendingEventsStore = txn.pendingEvents;
|
|
|
|
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
|
|
|
|
const queueIndex = maxQueueIndex + 1;
|
2021-05-19 16:41:07 +02:00
|
|
|
const needsEncryption = eventType !== REDACTION_TYPE && !!this._roomEncryption;
|
2020-11-18 13:02:38 +01:00
|
|
|
pendingEvent = this._createPendingEvent({
|
2019-07-26 22:03:57 +02:00
|
|
|
roomId: this._roomId,
|
|
|
|
queueIndex,
|
|
|
|
eventType,
|
|
|
|
content,
|
2021-05-21 10:47:48 +02:00
|
|
|
relatedTxnId,
|
|
|
|
relatedEventId,
|
2020-09-03 15:36:48 +02:00
|
|
|
txnId: makeTxnId(),
|
2021-05-19 16:41:07 +02:00
|
|
|
needsEncryption,
|
2020-11-18 13:02:38 +01:00
|
|
|
needsUpload: !!attachments
|
2020-11-13 17:19:19 +01:00
|
|
|
}, attachments);
|
2019-07-26 22:03:57 +02:00
|
|
|
pendingEventsStore.add(pendingEvent.data);
|
|
|
|
} catch (err) {
|
|
|
|
txn.abort();
|
|
|
|
throw err;
|
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
await txn.complete();
|
2019-07-26 22:03:57 +02:00
|
|
|
return pendingEvent;
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
2020-11-18 20:08:42 +01:00
|
|
|
|
|
|
|
dispose() {
|
2021-02-23 19:04:25 +01:00
|
|
|
for (const pe of this._pendingEvents) {
|
2020-11-18 20:08:42 +01:00
|
|
|
pe.dispose();
|
|
|
|
}
|
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
2021-06-02 12:33:15 +02:00
|
|
|
|
|
|
|
import {HomeServer as MockHomeServer} from "../../../mocks/HomeServer.js";
|
|
|
|
import {createMockStorage} from "../../../mocks/Storage.js";
|
|
|
|
import {NullLogger} from "../../../logging/NullLogger.js";
|
|
|
|
import {event, withContent, withTextBody, withTxnId} from "../../../mocks/event.js";
|
|
|
|
import {poll} from "../../../mocks/poll.js";
|
|
|
|
|
|
|
|
export function tests() {
|
|
|
|
const logger = new NullLogger();
|
|
|
|
return {
|
|
|
|
"enqueue second message when remote echo of first arrives before /send returns": async assert => {
|
|
|
|
const storage = await createMockStorage();
|
|
|
|
const hs = new MockHomeServer();
|
|
|
|
// 1. enqueue and start send event 1
|
|
|
|
const queue = new SendQueue({roomId: "!abc", storage, hsApi: hs.api});
|
|
|
|
const event1 = withTextBody(event("m.room.message", "$123"), "message 1");
|
|
|
|
await logger.run("event1", log => queue.enqueueEvent(event1.type, event1.content, null, log));
|
|
|
|
assert.equal(queue.pendingEvents.length, 1);
|
|
|
|
const sendRequest1 = hs.requests.send[0];
|
|
|
|
// 2. receive remote echo, before /send has returned
|
|
|
|
const remoteEcho = withTxnId(event1, sendRequest1.arguments[2]);
|
|
|
|
const txn = await storage.readWriteTxn([storage.storeNames.pendingEvents]);
|
|
|
|
const removal = await logger.run("remote echo", log => queue.removeRemoteEchos([remoteEcho], txn, log));
|
|
|
|
await txn.complete();
|
|
|
|
assert.equal(removal.length, 1);
|
|
|
|
queue.emitRemovals(removal);
|
|
|
|
assert.equal(queue.pendingEvents.length, 0);
|
|
|
|
// 3. now enqueue event 2
|
|
|
|
const event2 = withTextBody(event("m.room.message", "$456"), "message 2");
|
|
|
|
await logger.run("event2", log => queue.enqueueEvent(event2.type, event2.content, null, log));
|
|
|
|
// even though the first pending event has been removed by the remote echo,
|
|
|
|
// the second should get the next index, as the send loop is still blocking on the first one
|
|
|
|
assert.equal(Array.from(queue.pendingEvents)[0].queueIndex, 2);
|
|
|
|
// 4. send for event 1 comes back
|
|
|
|
sendRequest1.respond({event_id: event1.event_id});
|
|
|
|
// 5. now expect second send request for event 2
|
|
|
|
const sendRequest2 = await poll(() => hs.requests.send[1]);
|
|
|
|
sendRequest2.respond({event_id: event2.event_id});
|
|
|
|
await poll(() => !queue._isSending);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|