support running afterSyncCompleted step on rooms as well

and make it in parallel with next sync request
This commit is contained in:
Bruno Windels 2020-09-08 14:37:24 +02:00
parent 52c3c7c03d
commit c158e3da77

View File

@ -87,12 +87,16 @@ export class Sync {
} }
async _syncLoop(syncToken) { async _syncLoop(syncToken) {
let afterSyncCompletedPromise = Promise.resolve();
// if syncToken is falsy, it will first do an initial sync ... // if syncToken is falsy, it will first do an initial sync ...
while(this._status.get() !== SyncStatus.Stopped) { while(this._status.get() !== SyncStatus.Stopped) {
let roomChanges;
try { try {
console.log(`starting sync request with since ${syncToken} ...`); console.log(`starting sync request with since ${syncToken} ...`);
const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined; const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
syncToken = await this._syncRequest(syncToken, timeout); const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise);
syncToken = syncResult.syncToken;
roomChanges = syncResult.roomChanges;
this._status.set(SyncStatus.Syncing); this._status.set(SyncStatus.Syncing);
} catch (err) { } catch (err) {
if (!(err instanceof AbortError)) { if (!(err instanceof AbortError)) {
@ -101,18 +105,39 @@ export class Sync {
} }
} }
if (!this._error) { if (!this._error) {
try { afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges);
// TODO: run this in parallel with the next sync request
await this._session.afterSyncCompleted();
} catch (err) {
console.error("error during after sync completed, continuing to sync.", err.stack);
// swallowing error here apart from logging
}
} }
} }
} }
async _syncRequest(syncToken, timeout) { async _runAfterSyncCompleted(roomChanges) {
const sessionPromise = (async () => {
try {
await this._session.afterSyncCompleted();
} catch (err) {
console.error("error during session afterSyncCompleted, continuing", err.stack);
}
})();
let allPromises = [sessionPromise];
const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => {
return rc.changes.needsAfterSyncCompleted;
});
if (roomsNeedingAfterSyncCompleted.length) {
allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => {
try {
await room.afterSyncCompleted(changes);
} catch (err) {
console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack);
}
}));
}
// run everything in parallel,
// we don't want to delay the next sync too much
await Promise.all(allPromises);
}
async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) {
let {syncFilterId} = this._session; let {syncFilterId} = this._session;
if (typeof syncFilterId !== "string") { if (typeof syncFilterId !== "string") {
this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}}); this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
@ -121,43 +146,20 @@ export class Sync {
const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests
this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout}); this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout});
const response = await this._currentRequest.response(); const response = await this._currentRequest.response();
// wait here for the afterSyncCompleted step of the previous sync to complete
// before we continue processing this sync response
await prevAfterSyncCompletedPromise;
const isInitialSync = !syncToken; const isInitialSync = !syncToken;
syncToken = response.next_batch; syncToken = response.next_batch;
const storeNames = this._storage.storeNames; const syncTxn = await this._openSyncTxn();
const syncTxn = await this._storage.readWriteTxn([ let roomChanges = [];
storeNames.session,
storeNames.roomSummary,
storeNames.roomState,
storeNames.roomMembers,
storeNames.timelineEvents,
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
storeNames.inboundGroupSessions,
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
]);
const roomChanges = [];
let sessionChanges; let sessionChanges;
try { try {
// to_device // to_device
// presence // presence
if (response.rooms) { if (response.rooms) {
const promises = parseRooms(response.rooms, async (roomId, roomResponse, membership) => { roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn);
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if (isInitialSync && timelineIsEmpty(roomResponse)) {
return;
}
let room = this._session.rooms.get(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
console.log(` * applying sync response to room ${roomId} ...`);
const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
roomChanges.push({room, changes});
});
await Promise.all(promises);
} }
sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn); sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn);
} catch(err) { } catch(err) {
@ -182,7 +184,44 @@ export class Sync {
room.afterSync(changes); room.afterSync(changes);
} }
return syncToken; return {syncToken, roomChanges};
}
async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) {
const roomChanges = [];
const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => {
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if (isInitialSync && timelineIsEmpty(roomResponse)) {
return;
}
let room = this._session.rooms.get(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
console.log(` * applying sync response to room ${roomId} ...`);
const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
roomChanges.push({room, changes});
});
await Promise.all(promises);
return roomChanges;
}
async _openSyncTxn() {
const storeNames = this._storage.storeNames;
return await this._storage.readWriteTxn([
storeNames.session,
storeNames.roomSummary,
storeNames.roomState,
storeNames.roomMembers,
storeNames.timelineEvents,
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
storeNames.inboundGroupSessions,
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
]);
} }
stop() { stop() {