From 94b0cfbd72c46416e42a0c54007b635b28091eab Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 12:11:43 +0200 Subject: [PATCH] add prepareSync and afterPrepareSync steps to sync, run decryption in it --- src/matrix/Session.js | 4 +- src/matrix/Sync.js | 165 ++++++++++++++++++++++++-------------- src/matrix/e2ee/README.md | 44 ++++++++++ src/matrix/room/Room.js | 61 +++++++++----- 4 files changed, 190 insertions(+), 84 deletions(-) create mode 100644 src/matrix/e2ee/README.md diff --git a/src/matrix/Session.js b/src/matrix/Session.js index 19725b58..c5c0c94f 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -255,7 +255,7 @@ export class Session { return room; } - async writeSync(syncResponse, syncFilterId, roomChanges, txn) { + async writeSync(syncResponse, syncFilterId, txn) { const changes = {}; const syncToken = syncResponse.next_batch; const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count; @@ -362,7 +362,7 @@ export function tests() { } } }; - const newSessionData = await session.writeSync({next_batch: "b"}, 6, {}, syncTxn); + const newSessionData = await session.writeSync({next_batch: "b"}, 6, syncTxn); assert(syncSet); assert.equal(session.syncToken, "a"); assert.equal(session.syncFilterId, 5); diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 598b9169..c81acee0 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -29,21 +29,6 @@ export const SyncStatus = createEnum( "Stopped" ); -function parseRooms(roomsSection, roomCallback) { - if (roomsSection) { - const allMemberships = ["join", "invite", "leave"]; - for(const membership of allMemberships) { - const membershipSection = roomsSection[membership]; - if (membershipSection) { - return Object.entries(membershipSection).map(([roomId, roomResponse]) => { - return roomCallback(roomId, roomResponse, membership); - }); - } - } - } - return []; -} - function timelineIsEmpty(roomResponse) { try { const events = roomResponse?.timeline?.events; @@ -53,6 +38,26 @@ function timelineIsEmpty(roomResponse) { } } +/** + * Sync steps in js-pseudocode: + * ```js + * let preparation; + * if (room.needsPrepareSync) { + * // can only read some stores + * preparation = await room.prepareSync(roomResponse, prepareTxn); + * // can do async work that is not related to storage (such as decryption) + * preparation = await room.afterPrepareSync(preparation); + * } + * // writes and calculates changes + * const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn); + * // applies and emits changes once syncTxn is committed + * room.afterSync(changes); + * if (room.needsAfterSyncCompleted(changes)) { + * // can do network requests + * await room.afterSyncCompleted(changes); + * } + * ``` + */ export class Sync { constructor({hsApi, session, storage}) { this._hsApi = hsApi; @@ -90,13 +95,13 @@ export class Sync { let afterSyncCompletedPromise = Promise.resolve(); // if syncToken is falsy, it will first do an initial sync ... while(this._status.get() !== SyncStatus.Stopped) { - let roomChanges; + let roomStates; try { console.log(`starting sync request with since ${syncToken} ...`); const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined; const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise); syncToken = syncResult.syncToken; - roomChanges = syncResult.roomChanges; + roomStates = syncResult.roomStates; this._status.set(SyncStatus.Syncing); } catch (err) { if (!(err instanceof AbortError)) { @@ -105,12 +110,12 @@ export class Sync { } } if (!this._error) { - afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges); + afterSyncCompletedPromise = this._runAfterSyncCompleted(roomStates); } } } - async _runAfterSyncCompleted(roomChanges) { + async _runAfterSyncCompleted(roomStates) { const sessionPromise = (async () => { try { await this._session.afterSyncCompleted(); @@ -118,23 +123,22 @@ export class Sync { console.error("error during session afterSyncCompleted, continuing", err.stack); } })(); - let allPromises = [sessionPromise]; - const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => { - return rc.changes.needsAfterSyncCompleted; + const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => { + return rs.room.needsAfterSyncCompleted(rs.changes); + }); + const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => { + try { + await rs.room.afterSyncCompleted(rs.changes); + } catch (err) { + console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack); + } }); - if (roomsNeedingAfterSyncCompleted.length) { - allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => { - try { - await room.afterSyncCompleted(changes); - } catch (err) { - console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack); - } - })); - } // run everything in parallel, // we don't want to delay the next sync too much - await Promise.all(allPromises); + // Also, since all promises won't reject (as they have a try/catch) + // it's fine to use Promise.all + await Promise.all(roomsPromises.concat(sessionPromise)); } async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) { @@ -152,16 +156,17 @@ export class Sync { const isInitialSync = !syncToken; syncToken = response.next_batch; - const syncTxn = await this._openSyncTxn(); - let roomChanges = []; + const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync); + await this._prepareRooms(roomStates); let sessionChanges; + const syncTxn = await this._openSyncTxn(); try { - // to_device - // presence - if (response.rooms) { - roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn); - } - sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn); + await Promise.all(roomStates.map(async rs => { + console.log(` * applying sync response to room ${rs.room.id} ...`); + rs.changes = await rs.room.writeSync( + rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn); + })); + sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn); } catch(err) { console.warn("aborting syncTxn because of error"); console.error(err); @@ -180,31 +185,31 @@ export class Sync { } this._session.afterSync(sessionChanges); // emit room related events after txn has been closed - for(let {room, changes} of roomChanges) { - room.afterSync(changes); + for(let rs of roomStates) { + rs.room.afterSync(rs.changes); } - return {syncToken, roomChanges}; + return {syncToken, roomStates}; } - async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) { - const roomChanges = []; - const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => { - // ignore rooms with empty timelines during initial sync, - // see https://github.com/vector-im/hydrogen-web/issues/15 - if (isInitialSync && timelineIsEmpty(roomResponse)) { - return; - } - let room = this._session.rooms.get(roomId); - if (!room) { - room = this._session.createRoom(roomId); - } - console.log(` * applying sync response to room ${roomId} ...`); - const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn); - roomChanges.push({room, changes}); - }); - await Promise.all(promises); - return roomChanges; + async _openPrepareSyncTxn() { + const storeNames = this._storage.storeNames; + return await this._storage.readTxn([ + storeNames.inboundGroupSessions, + ]); + } + + async _prepareRooms(roomStates) { + const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync); + if (prepareRoomStates.length) { + const prepareTxn = await this._openPrepareSyncTxn(); + await Promise.all(prepareRoomStates.map(async rs => { + rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn); + })); + await Promise.all(prepareRoomStates.map(async rs => { + rs.preparation = await rs.room.afterPrepareSync(rs.preparation); + })); + } } async _openSyncTxn() { @@ -218,13 +223,39 @@ export class Sync { storeNames.timelineFragments, storeNames.pendingEvents, storeNames.userIdentities, - storeNames.inboundGroupSessions, storeNames.groupSessionDecryptions, storeNames.deviceIdentities, // to discard outbound session when somebody leaves a room storeNames.outboundGroupSessions ]); } + + _parseRoomsResponse(roomsSection, isInitialSync) { + const roomStates = []; + if (roomsSection) { + // don't do "invite", "leave" for now + const allMemberships = ["join"]; + for(const membership of allMemberships) { + const membershipSection = roomsSection[membership]; + if (membershipSection) { + for (const [roomId, roomResponse] of Object.entries(membershipSection)) { + // ignore rooms with empty timelines during initial sync, + // see https://github.com/vector-im/hydrogen-web/issues/15 + if (isInitialSync && timelineIsEmpty(roomResponse)) { + return; + } + let room = this._session.rooms.get(roomId); + if (!room) { + room = this._session.createRoom(roomId); + } + roomStates.push(new RoomSyncProcessState(room, roomResponse, membership)); + } + } + } + } + return roomStates; + } + stop() { if (this._status.get() === SyncStatus.Stopped) { @@ -237,3 +268,13 @@ export class Sync { } } } + +class RoomSyncProcessState { + constructor(room, roomResponse, membership) { + this.room = room; + this.roomResponse = roomResponse; + this.membership = membership; + this.preparation = null; + this.changes = null; + } +} diff --git a/src/matrix/e2ee/README.md b/src/matrix/e2ee/README.md new file mode 100644 index 00000000..46f4e95f --- /dev/null +++ b/src/matrix/e2ee/README.md @@ -0,0 +1,44 @@ +## Integratation within the sync lifetime cycle + +### prepareSync + + The session can start its own read/write transactions here, rooms only read from a shared transaction + + - session + - device handler + - txn + - write pending encrypted + - txn + - olm decryption read + - olm async decryption + - dispatch to worker + - txn + - olm decryption write / remove pending encrypted + - rooms (with shared read txn) + - megolm decryption read + +### afterPrepareSync + + - rooms + - megolm async decryption + - dispatch to worker + +### writeSync + + - rooms (with shared readwrite txn) + - megolm decryption write, yielding decrypted events + - use decrypted events to write room summary + +### afterSync + + - rooms + - emit changes + +### afterSyncCompleted + + - session + - e2ee account + - generate more otks if needed + - upload new otks if needed or device keys if not uploaded before + - rooms + - share new room keys if needed diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 6a18f34f..f281f166 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -116,30 +116,52 @@ export class Room extends EventEmitter { decryption.applyToEntries(entries); } - } - } - return entry; + get needsPrepareSync() { + // only encrypted rooms need the prepare sync steps + return !!this._roomEncryption; } - async _decryptEntries(entries, txn, isSync = false) { - return await Promise.all(entries.map(async e => this._decryptEntry(e, txn, isSync))); + async prepareSync(roomResponse, txn) { + if (this._roomEncryption) { + const events = roomResponse?.timeline?.events; + if (Array.isArray(events)) { + const eventsToDecrypt = events.filter(event => { + return event?.type === EVENT_ENCRYPTED_TYPE; + }); + const preparation = await this._roomEncryption.prepareDecryptAll( + eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn); + return preparation; + } + } + } + + async afterPrepareSync(preparation) { + if (preparation) { + const decryptChanges = await preparation.decrypt(); + return decryptChanges; + } } /** @package */ - async writeSync(roomResponse, membership, isInitialSync, txn) { - const isTimelineOpen = !!this._timeline; + async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) { + let decryption; + if (this._roomEncryption && decryptChanges) { + decryption = await decryptChanges.write(txn); + } + const {entries, newLiveKey, memberChanges} = + await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn); + if (decryption) { + decryption.applyToEntries(entries); + } + // pass member changes to device tracker + if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) { + await this._roomEncryption.writeMemberChanges(memberChanges, txn); + } const summaryChanges = this._summary.writeSync( roomResponse, membership, - isInitialSync, isTimelineOpen, + isInitialSync, this._isTimelineOpen, txn); - const {entries: encryptedEntries, newLiveKey, memberChanges} = - await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn); - // decrypt if applicable - let entries = encryptedEntries; - if (this._roomEncryption) { - entries = await this._decryptEntries(encryptedEntries, txn, true); - } // fetch new members while we have txn open, // but don't make any in-memory changes yet let heroChanges; @@ -150,10 +172,6 @@ export class Room extends EventEmitter { } heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn); } - // pass member changes to device tracker - if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) { - await this._roomEncryption.writeMemberChanges(memberChanges, txn); - } let removedPendingEvents; if (roomResponse.timeline && roomResponse.timeline.events) { removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); @@ -165,7 +183,6 @@ export class Room extends EventEmitter { removedPendingEvents, memberChanges, heroChanges, - needsAfterSyncCompleted: this._roomEncryption?.needsToShareKeys(memberChanges) }; } @@ -216,6 +233,10 @@ export class Room extends EventEmitter { } } + needsAfterSyncCompleted({memberChanges}) { + return this._roomEncryption?.needsToShareKeys(memberChanges); + } + /** * Only called if the result of writeSync had `needsAfterSyncCompleted` set. * Can be used to do longer running operations that resulted from the last sync,