add prepareSync and afterPrepareSync steps to sync, run decryption in it

This commit is contained in:
Bruno Windels 2020-09-10 12:11:43 +02:00
parent 1c77c3b876
commit 94b0cfbd72
4 changed files with 190 additions and 84 deletions

View File

@ -255,7 +255,7 @@ export class Session {
return room;
}
async writeSync(syncResponse, syncFilterId, roomChanges, txn) {
async writeSync(syncResponse, syncFilterId, txn) {
const changes = {};
const syncToken = syncResponse.next_batch;
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
@ -362,7 +362,7 @@ export function tests() {
}
}
};
const newSessionData = await session.writeSync({next_batch: "b"}, 6, {}, syncTxn);
const newSessionData = await session.writeSync({next_batch: "b"}, 6, syncTxn);
assert(syncSet);
assert.equal(session.syncToken, "a");
assert.equal(session.syncFilterId, 5);

View File

@ -29,21 +29,6 @@ export const SyncStatus = createEnum(
"Stopped"
);
function parseRooms(roomsSection, roomCallback) {
if (roomsSection) {
const allMemberships = ["join", "invite", "leave"];
for(const membership of allMemberships) {
const membershipSection = roomsSection[membership];
if (membershipSection) {
return Object.entries(membershipSection).map(([roomId, roomResponse]) => {
return roomCallback(roomId, roomResponse, membership);
});
}
}
}
return [];
}
function timelineIsEmpty(roomResponse) {
try {
const events = roomResponse?.timeline?.events;
@ -53,6 +38,26 @@ function timelineIsEmpty(roomResponse) {
}
}
/**
* Sync steps in js-pseudocode:
* ```js
* let preparation;
* if (room.needsPrepareSync) {
* // can only read some stores
* preparation = await room.prepareSync(roomResponse, prepareTxn);
* // can do async work that is not related to storage (such as decryption)
* preparation = await room.afterPrepareSync(preparation);
* }
* // writes and calculates changes
* const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn);
* // applies and emits changes once syncTxn is committed
* room.afterSync(changes);
* if (room.needsAfterSyncCompleted(changes)) {
* // can do network requests
* await room.afterSyncCompleted(changes);
* }
* ```
*/
export class Sync {
constructor({hsApi, session, storage}) {
this._hsApi = hsApi;
@ -90,13 +95,13 @@ export class Sync {
let afterSyncCompletedPromise = Promise.resolve();
// if syncToken is falsy, it will first do an initial sync ...
while(this._status.get() !== SyncStatus.Stopped) {
let roomChanges;
let roomStates;
try {
console.log(`starting sync request with since ${syncToken} ...`);
const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise);
syncToken = syncResult.syncToken;
roomChanges = syncResult.roomChanges;
roomStates = syncResult.roomStates;
this._status.set(SyncStatus.Syncing);
} catch (err) {
if (!(err instanceof AbortError)) {
@ -105,12 +110,12 @@ export class Sync {
}
}
if (!this._error) {
afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges);
afterSyncCompletedPromise = this._runAfterSyncCompleted(roomStates);
}
}
}
async _runAfterSyncCompleted(roomChanges) {
async _runAfterSyncCompleted(roomStates) {
const sessionPromise = (async () => {
try {
await this._session.afterSyncCompleted();
@ -118,23 +123,22 @@ export class Sync {
console.error("error during session afterSyncCompleted, continuing", err.stack);
}
})();
let allPromises = [sessionPromise];
const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => {
return rc.changes.needsAfterSyncCompleted;
const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => {
return rs.room.needsAfterSyncCompleted(rs.changes);
});
const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => {
try {
await rs.room.afterSyncCompleted(rs.changes);
} catch (err) {
console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack);
}
});
if (roomsNeedingAfterSyncCompleted.length) {
allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => {
try {
await room.afterSyncCompleted(changes);
} catch (err) {
console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack);
}
}));
}
// run everything in parallel,
// we don't want to delay the next sync too much
await Promise.all(allPromises);
// Also, since all promises won't reject (as they have a try/catch)
// it's fine to use Promise.all
await Promise.all(roomsPromises.concat(sessionPromise));
}
async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) {
@ -152,16 +156,17 @@ export class Sync {
const isInitialSync = !syncToken;
syncToken = response.next_batch;
const syncTxn = await this._openSyncTxn();
let roomChanges = [];
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
await this._prepareRooms(roomStates);
let sessionChanges;
const syncTxn = await this._openSyncTxn();
try {
// to_device
// presence
if (response.rooms) {
roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn);
}
sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn);
await Promise.all(roomStates.map(async rs => {
console.log(` * applying sync response to room ${rs.room.id} ...`);
rs.changes = await rs.room.writeSync(
rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn);
}));
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
} catch(err) {
console.warn("aborting syncTxn because of error");
console.error(err);
@ -180,31 +185,31 @@ export class Sync {
}
this._session.afterSync(sessionChanges);
// emit room related events after txn has been closed
for(let {room, changes} of roomChanges) {
room.afterSync(changes);
for(let rs of roomStates) {
rs.room.afterSync(rs.changes);
}
return {syncToken, roomChanges};
return {syncToken, roomStates};
}
async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) {
const roomChanges = [];
const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => {
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if (isInitialSync && timelineIsEmpty(roomResponse)) {
return;
}
let room = this._session.rooms.get(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
console.log(` * applying sync response to room ${roomId} ...`);
const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
roomChanges.push({room, changes});
});
await Promise.all(promises);
return roomChanges;
async _openPrepareSyncTxn() {
const storeNames = this._storage.storeNames;
return await this._storage.readTxn([
storeNames.inboundGroupSessions,
]);
}
async _prepareRooms(roomStates) {
const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync);
if (prepareRoomStates.length) {
const prepareTxn = await this._openPrepareSyncTxn();
await Promise.all(prepareRoomStates.map(async rs => {
rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn);
}));
await Promise.all(prepareRoomStates.map(async rs => {
rs.preparation = await rs.room.afterPrepareSync(rs.preparation);
}));
}
}
async _openSyncTxn() {
@ -218,7 +223,6 @@ export class Sync {
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
storeNames.inboundGroupSessions,
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
// to discard outbound session when somebody leaves a room
@ -226,6 +230,33 @@ export class Sync {
]);
}
_parseRoomsResponse(roomsSection, isInitialSync) {
const roomStates = [];
if (roomsSection) {
// don't do "invite", "leave" for now
const allMemberships = ["join"];
for(const membership of allMemberships) {
const membershipSection = roomsSection[membership];
if (membershipSection) {
for (const [roomId, roomResponse] of Object.entries(membershipSection)) {
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if (isInitialSync && timelineIsEmpty(roomResponse)) {
return;
}
let room = this._session.rooms.get(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
roomStates.push(new RoomSyncProcessState(room, roomResponse, membership));
}
}
}
}
return roomStates;
}
stop() {
if (this._status.get() === SyncStatus.Stopped) {
return;
@ -237,3 +268,13 @@ export class Sync {
}
}
}
class RoomSyncProcessState {
constructor(room, roomResponse, membership) {
this.room = room;
this.roomResponse = roomResponse;
this.membership = membership;
this.preparation = null;
this.changes = null;
}
}

44
src/matrix/e2ee/README.md Normal file
View File

@ -0,0 +1,44 @@
## Integratation within the sync lifetime cycle
### prepareSync
The session can start its own read/write transactions here, rooms only read from a shared transaction
- session
- device handler
- txn
- write pending encrypted
- txn
- olm decryption read
- olm async decryption
- dispatch to worker
- txn
- olm decryption write / remove pending encrypted
- rooms (with shared read txn)
- megolm decryption read
### afterPrepareSync
- rooms
- megolm async decryption
- dispatch to worker
### writeSync
- rooms (with shared readwrite txn)
- megolm decryption write, yielding decrypted events
- use decrypted events to write room summary
### afterSync
- rooms
- emit changes
### afterSyncCompleted
- session
- e2ee account
- generate more otks if needed
- upload new otks if needed or device keys if not uploaded before
- rooms
- share new room keys if needed

View File

@ -116,30 +116,52 @@ export class Room extends EventEmitter {
decryption.applyToEntries(entries);
}
}
}
return entry;
get needsPrepareSync() {
// only encrypted rooms need the prepare sync steps
return !!this._roomEncryption;
}
async _decryptEntries(entries, txn, isSync = false) {
return await Promise.all(entries.map(async e => this._decryptEntry(e, txn, isSync)));
async prepareSync(roomResponse, txn) {
if (this._roomEncryption) {
const events = roomResponse?.timeline?.events;
if (Array.isArray(events)) {
const eventsToDecrypt = events.filter(event => {
return event?.type === EVENT_ENCRYPTED_TYPE;
});
const preparation = await this._roomEncryption.prepareDecryptAll(
eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
return preparation;
}
}
}
async afterPrepareSync(preparation) {
if (preparation) {
const decryptChanges = await preparation.decrypt();
return decryptChanges;
}
}
/** @package */
async writeSync(roomResponse, membership, isInitialSync, txn) {
const isTimelineOpen = !!this._timeline;
async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) {
let decryption;
if (this._roomEncryption && decryptChanges) {
decryption = await decryptChanges.write(txn);
}
const {entries, newLiveKey, memberChanges} =
await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
if (decryption) {
decryption.applyToEntries(entries);
}
// pass member changes to device tracker
if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
await this._roomEncryption.writeMemberChanges(memberChanges, txn);
}
const summaryChanges = this._summary.writeSync(
roomResponse,
membership,
isInitialSync, isTimelineOpen,
isInitialSync, this._isTimelineOpen,
txn);
const {entries: encryptedEntries, newLiveKey, memberChanges} =
await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
// decrypt if applicable
let entries = encryptedEntries;
if (this._roomEncryption) {
entries = await this._decryptEntries(encryptedEntries, txn, true);
}
// fetch new members while we have txn open,
// but don't make any in-memory changes yet
let heroChanges;
@ -150,10 +172,6 @@ export class Room extends EventEmitter {
}
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
}
// pass member changes to device tracker
if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
await this._roomEncryption.writeMemberChanges(memberChanges, txn);
}
let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
@ -165,7 +183,6 @@ export class Room extends EventEmitter {
removedPendingEvents,
memberChanges,
heroChanges,
needsAfterSyncCompleted: this._roomEncryption?.needsToShareKeys(memberChanges)
};
}
@ -216,6 +233,10 @@ export class Room extends EventEmitter {
}
}
needsAfterSyncCompleted({memberChanges}) {
return this._roomEncryption?.needsToShareKeys(memberChanges);
}
/**
* Only called if the result of writeSync had `needsAfterSyncCompleted` set.
* Can be used to do longer running operations that resulted from the last sync,