diff --git a/src/matrix/common.js b/src/matrix/common.js index 3c893234..67a95205 100644 --- a/src/matrix/common.js +++ b/src/matrix/common.js @@ -19,4 +19,19 @@ export function makeTxnId() { const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); const str = n.toString(16); return "t" + "0".repeat(14 - str.length) + str; +} + +export function isTxnId(txnId) { + return txnId.startsWith("t") && txnId.length === 15; +} + +export function tests() { + return { + "isTxnId succeeds on result of makeTxnId": assert => { + assert(isTxnId(makeTxnId())); + }, + "isTxnId fails on event id": assert => { + assert(!isTxnId("$yS_n5n3cIO2aTtek0_2ZSlv-7g4YYR2zKrk2mFCW_rm")); + }, + } } \ No newline at end of file diff --git a/src/matrix/net/HomeServerApi.js b/src/matrix/net/HomeServerApi.js index f834b94b..02d4dc2d 100644 --- a/src/matrix/net/HomeServerApi.js +++ b/src/matrix/net/HomeServerApi.js @@ -121,6 +121,10 @@ export class HomeServerApi { return this._put(`/rooms/${encodeURIComponent(roomId)}/send/${encodeURIComponent(eventType)}/${encodeURIComponent(txnId)}`, {}, content, options); } + redact(roomId, eventId, txnId, content, options = null) { + return this._put(`/rooms/${encodeURIComponent(roomId)}/redact/${encodeURIComponent(eventId)}/${encodeURIComponent(txnId)}`, {}, content, options); + } + receipt(roomId, receiptType, eventId, options = null) { return this._post(`/rooms/${encodeURIComponent(roomId)}/receipt/${encodeURIComponent(receiptType)}/${encodeURIComponent(eventId)}`, {}, {}, options); diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index e6d518ca..f62cbc64 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -15,6 +15,8 @@ limitations under the License. */ import {createEnum} from "../../../utils/enum.js"; import {AbortError} from "../../../utils/error.js"; +import {isTxnId} from "../../common.js"; +import {REDACTION_TYPE} from "./SendQueue.js"; export const SendStatus = createEnum( "Waiting", @@ -134,7 +136,7 @@ export class PendingEvent { this._data.needsUpload = false; } - abort() { + async abort() { if (!this._aborted) { this._aborted = true; if (this._attachments) { @@ -143,7 +145,7 @@ export class PendingEvent { } } this._sendRequest?.abort(); - this._removeFromQueueCallback(); + await this._removeFromQueueCallback(); } } @@ -156,15 +158,27 @@ export class PendingEvent { this._emitUpdate("status"); const eventType = this._data.encryptedEventType || this._data.eventType; const content = this._data.encryptedContent || this._data.content; - this._sendRequest = hsApi.send( - this.roomId, - eventType, - this.txnId, - content, - {log} - ); + if (eventType === REDACTION_TYPE) { + // TODO: should we double check here that this._data.redacts is not a txnId here anymore? + this._sendRequest = hsApi.redact( + this.roomId, + this._data.redacts, + this.txnId, + content, + {log} + ); + } else { + this._sendRequest = hsApi.send( + this.roomId, + eventType, + this.txnId, + content, + {log} + ); + } const response = await this._sendRequest.response(); this._sendRequest = null; + // both /send and /redact have the same response format this._data.remoteId = response.event_id; log.set("id", this._data.remoteId); this._status = SendStatus.Sent; @@ -178,4 +192,16 @@ export class PendingEvent { } } } + + get relatedTxnId() { + if (isTxnId(this._data.redacts)) { + return this._data.redacts; + } + } + + setRelatedEventId(eventId) { + if (this._data.redacts) { + this._data.redacts = eventId; + } + } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 4ad5e527..1420b56e 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -17,7 +17,9 @@ limitations under the License. import {SortedArray} from "../../../observable/list/SortedArray.js"; import {ConnectionError} from "../../error.js"; import {PendingEvent} from "./PendingEvent.js"; -import {makeTxnId} from "../../common.js"; +import {makeTxnId, isTxnId} from "../../common.js"; + +export const REDACTION_TYPE = "m.room.redaction"; export class SendQueue { constructor({roomId, storage, hsApi, pendingEvents}) { @@ -101,8 +103,24 @@ export class SendQueue { } if (pendingEvent.needsSending) { await pendingEvent.send(this._hsApi, log); - - await this._tryUpdateEvent(pendingEvent); + // we now have a remoteId, but this pending event may be removed at any point in the future + // once the remote echo comes in. So if we have any related events that need to resolve + // the relatedTxnId to a related event id, they need to do so now. + // We ensure this by writing the new remote id for the pending event and all related events + // with unresolved relatedTxnId in the queue in one transaction. + const relatedEvents = this._pendingEvents.array.find(pe => pe.relatedTxnId === pendingEvent.txnId); + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + try { + await this._tryUpdateEventWithTxn(pendingEvent, txn); + for (const relatedPE of relatedEvents) { + relatedPE.setRelatedEventId(pendingEvent.remoteId); + await this._tryUpdateEventWithTxn(relatedPE, txn); + } + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); } } @@ -168,7 +186,12 @@ export class SendQueue { } async enqueueEvent(eventType, content, attachments, log) { - const pendingEvent = await this._createAndStoreEvent(eventType, content, attachments); + await this._enqueueEvent(eventType, content, attachments, null, log); + } + + + async _enqueueEvent(eventType, content, attachments, redacts, log) { + const pendingEvent = await this._createAndStoreEvent(eventType, content, redacts, attachments); this._pendingEvents.set(pendingEvent); log.set("queueIndex", pendingEvent.queueIndex); log.set("pendingEvents", this._pendingEvents.length); @@ -180,6 +203,27 @@ export class SendQueue { } } + async enqueueRedaction(eventIdOrTxnId, reason, log) { + if (isTxnId(eventIdOrTxnId)) { + const txnId = eventIdOrTxnId; + const pe = this._pendingEvents.array.find(pe => pe.txnId === txnId); + if (pe && !pe.remoteId && pe.status !== SendStatus.Sending) { + // haven't started sending this event yet, + // just remove it from the queue + await pe.abort(); + return; + } else if (!pe) { + // we don't have the pending event anymore, + // the remote echo must have arrived in the meantime. + // we could look for it in the timeline, but for now + // we don't do anything as this race is quite unlikely + // and a bit complicated to fix. + return; + } + } + await this._enqueueEvent(REDACTION_TYPE, {reason}, null, eventIdOrTxnId, log); + } + get pendingEvents() { return this._pendingEvents; } @@ -187,11 +231,7 @@ export class SendQueue { async _tryUpdateEvent(pendingEvent) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); try { - // pendingEvent might have been removed already here - // by a racing remote echo, so check first so we don't recreate it - if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { - txn.pendingEvents.update(pendingEvent.data); - } + this._tryUpdateEventWithTxn(pendingEvent, txn); } catch (err) { txn.abort(); throw err; @@ -199,20 +239,30 @@ export class SendQueue { await txn.complete(); } - async _createAndStoreEvent(eventType, content, attachments) { + async _tryUpdateEventWithTxn(pendingEvent, txn) { + // pendingEvent might have been removed already here + // by a racing remote echo, so check first so we don't recreate it + if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { + txn.pendingEvents.update(pendingEvent.data); + } + } + + async _createAndStoreEvent(eventType, content, redacts, attachments) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); let pendingEvent; try { const pendingEventsStore = txn.pendingEvents; const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; const queueIndex = maxQueueIndex + 1; + const needsEncryption = eventType !== REDACTION_TYPE && !!this._roomEncryption; pendingEvent = this._createPendingEvent({ roomId: this._roomId, queueIndex, eventType, content, + redacts, txnId: makeTxnId(), - needsEncryption: !!this._roomEncryption, + needsEncryption, needsUpload: !!attachments }, attachments); pendingEventsStore.add(pendingEvent.data);