also resolve related event ids when removing remote echo during sync

as /sync races with /send, and remote echo may happen first.
It's important for local echo that the pending redaction/relation
will also get attached to the remote echo before /send returns,
otherwise the remote echo would be "unannotated" until /send returns
This commit is contained in:
Bruno Windels 2021-05-21 10:47:48 +02:00
parent c3fb35848b
commit c934049523
4 changed files with 48 additions and 39 deletions

View File

@ -263,7 +263,7 @@ export class BaseRoom extends EventEmitter {
let gapResult;
try {
// detect remote echos of pending messages in the gap
extraGapFillChanges = this._writeGapFill(response.chunk, txn, log);
extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log);
// write new events into gap
const gapWriter = new GapWriter({
roomId: this._roomId,
@ -300,7 +300,7 @@ export class BaseRoom extends EventEmitter {
JoinedRoom uses this update remote echos.
*/
// eslint-disable-next-line no-unused-vars
_writeGapFill(chunk, txn, log) {}
async _writeGapFill(chunk, txn, log) {}
_applyGapFill() {}
/** @public */

View File

@ -159,7 +159,7 @@ export class Room extends BaseRoom {
}
let removedPendingEvents;
if (Array.isArray(roomResponse.timeline?.events)) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log);
removedPendingEvents = await this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log);
}
return {
summaryChanges,
@ -280,8 +280,8 @@ export class Room extends BaseRoom {
}
}
_writeGapFill(gapChunk, txn, log) {
const removedPendingEvents = this._sendQueue.removeRemoteEchos(gapChunk, txn, log);
async _writeGapFill(gapChunk, txn, log) {
const removedPendingEvents = await this._sendQueue.removeRemoteEchos(gapChunk, txn, log);
return removedPendingEvents;
}

View File

@ -16,7 +16,6 @@ limitations under the License.
import {createEnum} from "../../../utils/enum.js";
import {AbortError} from "../../../utils/error.js";
import {REDACTION_TYPE} from "../common.js";
import {isTxnId} from "../../common.js";
export const SendStatus = createEnum(
"Waiting",
@ -49,6 +48,13 @@ export class PendingEvent {
get txnId() { return this._data.txnId; }
get remoteId() { return this._data.remoteId; }
get content() { return this._data.content; }
get relatedTxnId() { return this._data.relatedTxnId; }
get relatedEventId() { return this._data.relatedEventId; }
setRelatedEventId(eventId) {
this._data.relatedEventId = eventId;
}
get data() { return this._data; }
getAttachment(key) {
@ -164,10 +170,9 @@ export class PendingEvent {
const eventType = this._data.encryptedEventType || this._data.eventType;
const content = this._data.encryptedContent || this._data.content;
if (eventType === REDACTION_TYPE) {
// TODO: should we double check here that this._data.redacts is not a txnId here anymore?
this._sendRequest = hsApi.redact(
this.roomId,
this._data.redacts,
this._data.relatedEventId,
this.txnId,
content,
{log}
@ -197,17 +202,4 @@ export class PendingEvent {
}
}
}
get relatedTxnId() {
if (isTxnId(this._data.redacts)) {
return this._data.redacts;
}
return null;
}
setRelatedEventId(eventId) {
if (this._data.redacts) {
this._data.redacts = eventId;
}
}
}

View File

@ -104,16 +104,11 @@ export class SendQueue {
// the relatedTxnId to a related event id, they need to do so now.
// We ensure this by writing the new remote id for the pending event and all related events
// with unresolved relatedTxnId in the queue in one transaction.
const relatedEvents = this._pendingEvents.array.filter(pe => pe.relatedTxnId === pendingEvent.txnId);
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
try {
await this._tryUpdateEventWithTxn(pendingEvent, txn);
for (const relatedPE of relatedEvents) {
relatedPE.setRelatedEventId(pendingEvent.remoteId);
await this._tryUpdateEventWithTxn(relatedPE, txn);
// emit that we now have a related remote id
this._pendingEvents.update(relatedPE)
}
await this._resolveRemoteIdInPendingRelations(
pendingEvent.txnId, pendingEvent.remoteId, txn);
} catch (err) {
txn.abort();
throw err;
@ -122,7 +117,20 @@ export class SendQueue {
}
}
removeRemoteEchos(events, txn, parentLog) {
async _resolveRemoteIdInPendingRelations(txnId, remoteId, txn) {
const relatedEventWithoutRemoteId = this._pendingEvents.array.filter(pe => {
return pe.relatedTxnId === txnId && pe.relatedEventId !== remoteId;
});
for (const relatedPE of relatedEventWithoutRemoteId) {
relatedPE.setRelatedEventId(remoteId);
await this._tryUpdateEventWithTxn(relatedPE, txn);
// emit that we now have a related remote id
// this._pendingEvents.update(relatedPE);
}
return relatedEventWithoutRemoteId;
}
async removeRemoteEchos(events, txn, parentLog) {
const removed = [];
for (const event of events) {
const txnId = event.unsigned && event.unsigned.transaction_id;
@ -134,9 +142,11 @@ export class SendQueue {
}
if (idx !== -1) {
const pendingEvent = this._pendingEvents.get(idx);
parentLog.log({l: "removeRemoteEcho", queueIndex: pendingEvent.queueIndex, remoteId: event.event_id, txnId});
const remoteId = event.event_id;
parentLog.log({l: "removeRemoteEcho", queueIndex: pendingEvent.queueIndex, remoteId, txnId});
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
removed.push(pendingEvent);
await this._resolveRemoteIdInPendingRelations(txnId, remoteId, txn);
}
}
return removed;
@ -184,12 +194,12 @@ export class SendQueue {
}
async enqueueEvent(eventType, content, attachments, log) {
await this._enqueueEvent(eventType, content, attachments, null, log);
await this._enqueueEvent(eventType, content, attachments, null, null, log);
}
async _enqueueEvent(eventType, content, attachments, redacts, log) {
const pendingEvent = await this._createAndStoreEvent(eventType, content, redacts, attachments);
async _enqueueEvent(eventType, content, attachments, relatedTxnId, relatedEventId, log) {
const pendingEvent = await this._createAndStoreEvent(eventType, content, relatedTxnId, relatedEventId, attachments);
this._pendingEvents.set(pendingEvent);
log.set("queueIndex", pendingEvent.queueIndex);
log.set("pendingEvents", this._pendingEvents.length);
@ -202,8 +212,11 @@ export class SendQueue {
}
async enqueueRedaction(eventIdOrTxnId, reason, log) {
let relatedTxnId;
let relatedEventId;
if (isTxnId(eventIdOrTxnId)) {
log.set("txnIdToRedact", eventIdOrTxnId);
relatedTxnId = eventIdOrTxnId;
log.set("relatedTxnId", eventIdOrTxnId);
const txnId = eventIdOrTxnId;
const pe = this._pendingEvents.array.find(pe => pe.txnId === txnId);
if (pe && !pe.remoteId && pe.status !== SendStatus.Sending) {
@ -211,7 +224,9 @@ export class SendQueue {
// just remove it from the queue
await pe.abort();
return;
} else if (!pe) {
} else if (pe) {
relatedEventId = pe.remoteId;
} else {
// we don't have the pending event anymore,
// the remote echo must have arrived in the meantime.
// we could look for it in the timeline, but for now
@ -220,9 +235,10 @@ export class SendQueue {
return;
}
} else {
log.set("eventIdToRedact", eventIdOrTxnId);
relatedEventId = eventIdOrTxnId;
log.set("relatedEventId", relatedEventId);
}
await this._enqueueEvent(REDACTION_TYPE, {reason}, null, eventIdOrTxnId, log);
await this._enqueueEvent(REDACTION_TYPE, {reason}, null, relatedTxnId, relatedEventId, log);
}
get pendingEvents() {
@ -248,7 +264,7 @@ export class SendQueue {
}
}
async _createAndStoreEvent(eventType, content, redacts, attachments) {
async _createAndStoreEvent(eventType, content, relatedTxnId, relatedEventId, attachments) {
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
let pendingEvent;
try {
@ -261,7 +277,8 @@ export class SendQueue {
queueIndex,
eventType,
content,
redacts,
relatedTxnId,
relatedEventId,
txnId: makeTxnId(),
needsEncryption,
needsUpload: !!attachments