From f02641c808ac08752c021a80aae21c70f2f5ee0f Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sun, 22 Mar 2020 00:07:37 +0100 Subject: [PATCH] look for transaction_id in /messages response to delete pending events --- src/matrix/room/room.js | 35 +++++++++--- .../room/timeline/persistence/GapWriter.js | 54 ++++++++----------- 2 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index 530e42fd..7de0d92b 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -68,14 +68,35 @@ export default class Room extends EventEmitter { limit: amount, filter: {lazy_load_members: true} }).response(); - const gapWriter = new GapWriter({ - roomId: this._roomId, - storage: this._storage, - fragmentIdComparer: this._fragmentIdComparer - }); - const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response); + + const txn = await this._storage.readWriteTxn([ + this._storage.storeNames.pendingEvents, + this._storage.storeNames.timelineEvents, + this._storage.storeNames.timelineFragments, + ]); + let removedPendingEvents; + let newEntries; + try { + // detect remote echos of pending messages in the gap + removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn); + // write new events into gap + const gapWriter = new GapWriter({ + roomId: this._roomId, + storage: this._storage, + fragmentIdComparer: this._fragmentIdComparer + }); + newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + // once txn is committed, emit events + if (removedPendingEvents) { + this._sendQueue.emitRemovals(removedPendingEvents); + } if (this._timeline) { - this._timeline.addGapEntries(newEntries) + this._timeline.addGapEntries(newEntries); } } diff --git a/src/matrix/room/timeline/persistence/GapWriter.js b/src/matrix/room/timeline/persistence/GapWriter.js index 9941c712..5dba0664 100644 --- a/src/matrix/room/timeline/persistence/GapWriter.js +++ b/src/matrix/room/timeline/persistence/GapWriter.js @@ -76,7 +76,7 @@ export default class GapWriter { txn.timelineFragments.update(fragmentEntry.fragment); } - async writeFragmentFill(fragmentEntry, response) { + async writeFragmentFill(fragmentEntry, response, txn) { const {fragmentId, direction} = fragmentEntry; // chunk is in reverse-chronological order when backwards const {chunk, start, end} = response; @@ -89,40 +89,28 @@ export default class GapWriter { throw new Error("Invalid end token in response"); } - const txn = await this._storage.readWriteTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.timelineFragments, - ]); - - try { - // make sure we have the latest fragment from the store - const fragment = await txn.timelineFragments.get(this._roomId, fragmentId); - if (!fragment) { - throw new Error(`Unknown fragment: ${fragmentId}`); - } - fragmentEntry = fragmentEntry.withUpdatedFragment(fragment); - // check that the request was done with the token we are aware of (extra care to avoid timeline corruption) - if (fragmentEntry.token !== start) { - throw new Error("start is not equal to prev_batch or next_batch"); - } - // find last event in fragment so we get the eventIndex to begin creating keys at - let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn); - // find out if any event in chunk is already present using findFirstOrLastOccurringEventId - const { - nonOverlappingEvents, - neighbourFragmentEntry - } = await this._findOverlappingEvents(fragmentEntry, chunk, txn); - - // create entries for all events in chunk, add them to entries - entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn); - await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); - } catch (err) { - txn.abort(); - throw err; + // make sure we have the latest fragment from the store + const fragment = await txn.timelineFragments.get(this._roomId, fragmentId); + if (!fragment) { + throw new Error(`Unknown fragment: ${fragmentId}`); } + fragmentEntry = fragmentEntry.withUpdatedFragment(fragment); + // check that the request was done with the token we are aware of (extra care to avoid timeline corruption) + if (fragmentEntry.token !== start) { + throw new Error("start is not equal to prev_batch or next_batch"); + } + // find last event in fragment so we get the eventIndex to begin creating keys at + let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn); + // find out if any event in chunk is already present using findFirstOrLastOccurringEventId + const { + nonOverlappingEvents, + neighbourFragmentEntry + } = await this._findOverlappingEvents(fragmentEntry, chunk, txn); - await txn.complete(); - + // create entries for all events in chunk, add them to entries + entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn); + await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); + return entries; } }