diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 4198618a..1295e7db 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -87,12 +87,16 @@ export class Sync { } async _syncLoop(syncToken) { + let afterSyncCompletedPromise = Promise.resolve(); // if syncToken is falsy, it will first do an initial sync ... while(this._status.get() !== SyncStatus.Stopped) { + let roomChanges; try { console.log(`starting sync request with since ${syncToken} ...`); const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined; - syncToken = await this._syncRequest(syncToken, timeout); + const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise); + syncToken = syncResult.syncToken; + roomChanges = syncResult.roomChanges; this._status.set(SyncStatus.Syncing); } catch (err) { if (!(err instanceof AbortError)) { @@ -101,18 +105,39 @@ export class Sync { } } if (!this._error) { - try { - // TODO: run this in parallel with the next sync request - await this._session.afterSyncCompleted(); - } catch (err) { - console.error("error during after sync completed, continuing to sync.", err.stack); - // swallowing error here apart from logging - } + afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges); } } } - async _syncRequest(syncToken, timeout) { + async _runAfterSyncCompleted(roomChanges) { + const sessionPromise = (async () => { + try { + await this._session.afterSyncCompleted(); + } catch (err) { + console.error("error during session afterSyncCompleted, continuing", err.stack); + } + })(); + let allPromises = [sessionPromise]; + + const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => { + return rc.changes.needsAfterSyncCompleted; + }); + if (roomsNeedingAfterSyncCompleted.length) { + allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => { + try { + await room.afterSyncCompleted(changes); + } catch (err) { + console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack); + } + })); + } + // run everything in parallel, + // we don't want to delay the next sync too much + await Promise.all(allPromises); + } + + async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) { let {syncFilterId} = this._session; if (typeof syncFilterId !== "string") { this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}}); @@ -121,43 +146,20 @@ export class Sync { const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout}); const response = await this._currentRequest.response(); + // wait here for the afterSyncCompleted step of the previous sync to complete + // before we continue processing this sync response + await prevAfterSyncCompletedPromise; + const isInitialSync = !syncToken; syncToken = response.next_batch; - const storeNames = this._storage.storeNames; - const syncTxn = await this._storage.readWriteTxn([ - storeNames.session, - storeNames.roomSummary, - storeNames.roomState, - storeNames.roomMembers, - storeNames.timelineEvents, - storeNames.timelineFragments, - storeNames.pendingEvents, - storeNames.userIdentities, - storeNames.inboundGroupSessions, - storeNames.groupSessionDecryptions, - storeNames.deviceIdentities, - ]); - const roomChanges = []; + const syncTxn = await this._openSyncTxn(); + let roomChanges = []; let sessionChanges; try { // to_device // presence if (response.rooms) { - const promises = parseRooms(response.rooms, async (roomId, roomResponse, membership) => { - // ignore rooms with empty timelines during initial sync, - // see https://github.com/vector-im/hydrogen-web/issues/15 - if (isInitialSync && timelineIsEmpty(roomResponse)) { - return; - } - let room = this._session.rooms.get(roomId); - if (!room) { - room = this._session.createRoom(roomId); - } - console.log(` * applying sync response to room ${roomId} ...`); - const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn); - roomChanges.push({room, changes}); - }); - await Promise.all(promises); + roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn); } sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn); } catch(err) { @@ -182,7 +184,44 @@ export class Sync { room.afterSync(changes); } - return syncToken; + return {syncToken, roomChanges}; + } + + async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) { + const roomChanges = []; + const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => { + // ignore rooms with empty timelines during initial sync, + // see https://github.com/vector-im/hydrogen-web/issues/15 + if (isInitialSync && timelineIsEmpty(roomResponse)) { + return; + } + let room = this._session.rooms.get(roomId); + if (!room) { + room = this._session.createRoom(roomId); + } + console.log(` * applying sync response to room ${roomId} ...`); + const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn); + roomChanges.push({room, changes}); + }); + await Promise.all(promises); + return roomChanges; + } + + async _openSyncTxn() { + const storeNames = this._storage.storeNames; + return await this._storage.readWriteTxn([ + storeNames.session, + storeNames.roomSummary, + storeNames.roomState, + storeNames.roomMembers, + storeNames.timelineEvents, + storeNames.timelineFragments, + storeNames.pendingEvents, + storeNames.userIdentities, + storeNames.inboundGroupSessions, + storeNames.groupSessionDecryptions, + storeNames.deviceIdentities, + ]); } stop() {