diff --git a/src/matrix/DeviceMessageHandler.js b/src/matrix/DeviceMessageHandler.js index 4c7d0e75..8a50c66c 100644 --- a/src/matrix/DeviceMessageHandler.js +++ b/src/matrix/DeviceMessageHandler.js @@ -59,12 +59,12 @@ export class DeviceMessageHandler { return {roomKeys}; } - _applyDecryptChanges(rooms, {roomKeys}) { - if (roomKeys && roomKeys.length) { - const roomKeysByRoom = groupBy(roomKeys, s => s.roomId); - for (const [roomId, roomKeys] of roomKeysByRoom) { - const room = rooms.get(roomId); - room?.notifyRoomKeys(roomKeys); + async _applyDecryptChanges(rooms, {roomKeys}) { + if (Array.isArray(roomKeys)) { + for (const roomKey of roomKeys) { + const room = rooms.get(roomKey.roomId); + // TODO: this is less parallized than it could be (like sync) + await room?.notifyRoomKey(roomKey); } } } @@ -101,7 +101,7 @@ export class DeviceMessageHandler { throw err; } await txn.complete(); - this._applyDecryptChanges(rooms, changes); + await this._applyDecryptChanges(rooms, changes); } async _getPendingEvents(txn) { diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index f0446f98..7b45a60c 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -42,8 +42,12 @@ export class RoomEncryption { this._megolmBackfillCache = this._megolmDecryption.createSessionCache(); this._megolmSyncCache = this._megolmDecryption.createSessionCache(); - // not `event_id`, but an internal event id passed in to the decrypt methods - this._eventIdsByMissingSession = new Map(); + // session => event ids of messages we tried to decrypt and the session was missing + this._missingSessions = new SessionToEventIdsMap(); + // sessions that may or may not be missing, but that while + // looking for a particular session came up as a candidate and were + // added to the cache to prevent further lookups from storage + this._missingSessionCandidates = new SessionToEventIdsMap(); this._senderDeviceCache = new Map(); this._storage = storage; this._sessionBackup = sessionBackup; @@ -57,8 +61,7 @@ export class RoomEncryption { return; } this._sessionBackup = sessionBackup; - for(const key of this._eventIdsByMissingSession.keys()) { - const {senderKey, sessionId} = decodeMissingSessionKey(key); + for(const {senderKey, sessionId} of this._missingSessions.getSessions()) { await this._requestMissingSessionFromBackup(senderKey, sessionId, null); } } @@ -115,13 +118,17 @@ export class RoomEncryption { if (customCache) { customCache.dispose(); } - return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this); + return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this, events); } - async _processDecryptionResults(results, errors, flags, txn) { - for (const error of errors.values()) { - if (error.code === "MEGOLM_NO_SESSION") { - this._addMissingSessionEvent(error.event, flags.source); + async _processDecryptionResults(events, results, errors, flags, txn) { + for (const event of events) { + const error = errors.get(event.event_id); + if (error?.code === "MEGOLM_NO_SESSION") { + this._addMissingSessionEvent(event, flags.source); + } else { + this._missingSessions.removeEvent(event); + this._missingSessionCandidates.removeEvent(event); } } if (flags.isTimelineOpen) { @@ -145,17 +152,12 @@ export class RoomEncryption { } _addMissingSessionEvent(event, source) { - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - const key = encodeMissingSessionKey(senderKey, sessionId); - let eventIds = this._eventIdsByMissingSession.get(key); - // new missing session - if (!eventIds) { + const isNewSession = this._missingSessions.addEvent(event); + if (isNewSession) { + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; this._requestMissingSessionFromBackup(senderKey, sessionId, source); - eventIds = new Set(); - this._eventIdsByMissingSession.set(key, eventIds); } - eventIds.add(event.event_id); } async _requestMissingSessionFromBackup(senderKey, sessionId, source) { @@ -163,7 +165,7 @@ export class RoomEncryption { // and only after that proceed to request from backup if (source === DecryptionSource.Sync) { await this._clock.createTimeout(10000).elapsed(); - if (this._disposed || !this._eventIdsByMissingSession.has(encodeMissingSessionKey(senderKey, sessionId))) { + if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) { return; } } @@ -192,8 +194,8 @@ export class RoomEncryption { await txn.complete(); if (roomKey) { - // this will call into applyRoomKeys below - await this._room.notifyRoomKeys([roomKey]); + // this will reattempt decryption + await this._room.notifyRoomKey(roomKey); } } else if (session?.algorithm) { console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`); @@ -212,18 +214,17 @@ export class RoomEncryption { * @param {Array} roomKeys * @return {Array} the event ids that should be retried to decrypt */ - applyRoomKeys(roomKeys) { - // retry decryption with the new sessions - const retryEventIds = []; - for (const roomKey of roomKeys) { - const key = encodeMissingSessionKey(roomKey.senderKey, roomKey.sessionId); - const entriesForSession = this._eventIdsByMissingSession.get(key); - if (entriesForSession) { - this._eventIdsByMissingSession.delete(key); - retryEventIds.push(...entriesForSession); - } + getEventIdsForRoomKey(roomKey) { + let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId); + if (!eventIds) { + eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId); } - return retryEventIds; + return eventIds; + } + + findAndCacheEntriesForRoomKey(roomKey, candidateEntries) { + // add all to _missingSessionCandidates + // filter messages to roomKey } async encrypt(type, content, hsApi) { @@ -354,11 +355,12 @@ export class RoomEncryption { * the decryption results before turning them */ class DecryptionPreparation { - constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) { + constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption, events) { this._megolmDecryptionPreparation = megolmDecryptionPreparation; this._extraErrors = extraErrors; this._flags = flags; this._roomEncryption = roomEncryption; + this._events = events; } async decrypt() { @@ -366,7 +368,8 @@ class DecryptionPreparation { await this._megolmDecryptionPreparation.decrypt(), this._extraErrors, this._flags, - this._roomEncryption); + this._roomEncryption, + this._events); } dispose() { @@ -375,17 +378,18 @@ class DecryptionPreparation { } class DecryptionChanges { - constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) { + constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption, events) { this._megolmDecryptionChanges = megolmDecryptionChanges; this._extraErrors = extraErrors; this._flags = flags; this._roomEncryption = roomEncryption; + this._events = events; } async write(txn) { const {results, errors} = await this._megolmDecryptionChanges.write(txn); mergeMap(this._extraErrors, errors); - await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn); + await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._flags, txn); return new BatchDecryptionResult(results, errors); } } @@ -410,3 +414,58 @@ class BatchDecryptionResult { } } } + +class SessionToEventIdsMap { + constructor() { + this._eventIdsBySession = new Map(); + } + + addEvent(event) { + let isNewSession = false; + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; + const key = encodeMissingSessionKey(senderKey, sessionId); + let eventIds = this._eventIdsBySession.get(key); + // new missing session + if (!eventIds) { + eventIds = new Set(); + this._eventIdsBySession.set(key, eventIds); + isNewSession = true; + } + eventIds.add(event.event_id); + return isNewSession; + } + + getEventIds(senderKey, sessionId) { + const key = encodeMissingSessionKey(senderKey, sessionId); + const entriesForSession = this._eventIdsBySession.get(key); + if (entriesForSession) { + return [...entriesForSession]; + } + } + + getSessions() { + return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey); + } + + hasSession(senderKey, sessionId) { + return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId)); + } + + removeEvent(event) { + let hasRemovedSession = false; + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; + const key = encodeMissingSessionKey(senderKey, sessionId); + let eventIds = this._eventIdsBySession.get(key); + if (eventIds) { + if (eventIds.delete(event.event_id)) { + if (!eventIds.length) { + this._eventIdsBySession.delete(key); + hasRemovedSession = true; + } + } + } + return hasRemovedSession; + } +} diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 2688c6b9..c9a490cb 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -52,42 +52,63 @@ export class Room extends EventEmitter { this._clock = clock; } - _readEntriesToRetryDecryption(retryEventIds) { - const readFromEventKey = retryEventIds.length !== 0; - const stores = - const txn = await this._storage.readTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.inboundGroupSessions, - ]); + _readRetryDecryptCandidateEntries(sinceEventKey, txn) { + if (sinceEventKey) { + return readFromWithTxn(sinceEventKey, Direction.Forward, Number.MAX_SAFE_INTEGER, txn); + } else { + // all messages for room + return readFromWithTxn(this._syncWriter.lastMessageKey, Direction.Backward, Number.MAX_SAFE_INTEGER, txn); + } } - async notifyRoomKeys(roomKeys) { - if (this._roomEncryption) { - let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys); - if (retryEventIds.length) { - const retryEntries = []; - const txn = await this._storage.readTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.inboundGroupSessions, - ]); - for (const eventId of retryEventIds) { - const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); - if (storageEntry) { - retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); - } + async notifyRoomKey(roomKey) { + if (!this._roomEncryption) { + return; + } + const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey); + const stores = [ + this._storage.storeNames.timelineEvents, + this._storage.storeNames.inboundGroupSessions, + ]; + let txn; + let retryEntries; + if (retryEventIds) { + retryEntries = []; + txn = await this._storage.readTxn(stores); + for (const eventId of retryEventIds) { + const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); + if (storageEntry) { + retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); } - const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); - await decryptRequest.complete(); + } + } else { + // we only look for messages since lastDecryptedEventKey because + // the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids) + // and to update the summary we only care about events since lastDecryptedEventKey + const key = this._summary.data.lastDecryptedEventKey; + // key might be missing if we haven't decrypted any events in this room + const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex); + // check we have not already decrypted the most recent event in the room + // otherwise we know that the messages for this room key will not update the room summary + if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) { + txn = await this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments)); + const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn); + retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries); + } + } + if (retryEntries?.length) { + const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + // this will close txn while awaiting decryption + await decryptRequest.complete(); - this._timeline?.replaceEntries(retryEntries); - // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the - // _decryptEntries entries and could even know which events have been decrypted for the first - // time from DecryptionChanges.write and only pass those to the summary. As timeline changes - // are not essential to the room summary, it's fine to write this in a separate txn for now. - const changes = this._summary.data.applyTimelineEntries(retryEntries, false, this._isTimelineOpen); - if (await this._summary.writeAndApplyData(changes, this._storage)) { - this._emitUpdate(); - } + this._timeline?.replaceEntries(retryEntries); + // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the + // _decryptEntries entries and could even know which events have been decrypted for the first + // time from DecryptionChanges.write and only pass those to the summary. As timeline changes + // are not essential to the room summary, it's fine to write this in a separate txn for now. + const changes = this._summary.data.applyTimelineEntries(retryEntries, false, this._isTimelineOpen); + if (await this._summary.writeAndApplyData(changes, this._storage)) { + this._emitUpdate(); } } } @@ -203,7 +224,7 @@ export class Room extends EventEmitter { heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn); } let removedPendingEvents; - if (roomResponse.timeline && roomResponse.timeline.events) { + if (Array.isArray(roomResponse.timeline?.events)) { removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); } return { diff --git a/src/matrix/room/timeline/EventKey.js b/src/matrix/room/timeline/EventKey.js index 128f8805..e1771258 100644 --- a/src/matrix/room/timeline/EventKey.js +++ b/src/matrix/room/timeline/EventKey.js @@ -63,6 +63,10 @@ export class EventKey { toString() { return `[${this.fragmentId}/${this.eventIndex}]`; } + + equals(other) { + return this.fragmentId === other?.fragmentId && this.eventIndex === other?.eventIndex; + } } export function xtests() { diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js index 7e832b32..b2bb8fbf 100644 --- a/src/matrix/room/timeline/persistence/TimelineReader.js +++ b/src/matrix/room/timeline/persistence/TimelineReader.js @@ -46,6 +46,7 @@ export async function readFromWithTxn(eventKey, direction, amount, r, txn) { while (entries.length < amount && eventKey) { let eventsWithinFragment; if (direction.isForward) { + // TODO: should we pass amount - entries.length here? eventsWithinFragment = await timelineStore.eventsAfter(this._roomId, eventKey, amount); } else { eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount); @@ -56,6 +57,10 @@ export async function readFromWithTxn(eventKey, direction, amount, r, txn) { if (entries.length < amount) { const fragment = await fragmentStore.get(this._roomId, eventKey.fragmentId); + // TODO: why does the first fragment not need to be added? (the next *is* added below) + // it looks like this would be fine when loading in the sync island + // (as the live fragment should be added already) but not for permalinks when we support them + // // this._fragmentIdComparer.addFragment(fragment); let fragmentEntry = new FragmentBoundaryEntry(fragment, direction.isBackward, this._fragmentIdComparer); // append or prepend fragmentEntry, reuse func from GapWriter?