// vector-im-hydrogen-web/src/matrix/Sync.js (JavaScript, 151 lines, 5.1 KiB)

import {AbortError} from "./error.js";
import {ObservableValue} from "../observable/ObservableValue.js";
import {createEnum} from "../utils/enum.js";
2018-12-21 14:35:24 +01:00
2019-02-10 21:25:29 +01:00
// Timeout handed to hsApi.sync() for every request that is made with a sync
// token (i.e. every request except the initial sync, which passes undefined).
// NOTE(review): presumably milliseconds — confirm against hsApi.sync.
const INCREMENTAL_TIMEOUT = 30 * 1000;
// NOTE(review): not referenced by the code shown in this file; looks like an
// intended event limit for sync — confirm whether it is still needed.
const SYNC_EVENT_LIMIT = 10;
2018-12-21 14:35:24 +01:00
/**
 * States a `Sync` instance reports through its `status` observable:
 *  - InitialSync: `start()` found no sync token on the session.
 *  - CatchupSync: `start()` found a stored sync token on the session.
 *  - Syncing: a sync request has completed since `start()`.
 *  - Stopped: not syncing (initial state, after `stop()`, or after a
 *    non-abort error ended the loop).
 */
export const SyncStatus = createEnum("InitialSync", "CatchupSync", "Syncing", "Stopped");
2019-02-10 21:25:29 +01:00
/**
 * Calls `roomCallback` once for every room found in the `rooms` section of a
 * sync response, visiting the "join", "invite" and "leave" sub-sections in
 * that order.
 *
 * Every membership section is visited; an earlier version returned as soon as
 * the first non-empty section was found, which silently dropped the rooms of
 * the remaining sections.
 *
 * @param {Object} [roomsSection] the `rooms` object of the sync response; may be falsy
 * @param {function(string, Object, string): *} roomCallback invoked as
 *        `(roomId, roomResponse, membership)`
 * @returns {Array} the callback return values, in visiting order
 *          (empty when there is nothing to visit)
 */
function parseRooms(roomsSection, roomCallback) {
    const results = [];
    if (!roomsSection) {
        return results;
    }
    const allMemberships = ["join", "invite", "leave"];
    for (const membership of allMemberships) {
        const membershipSection = roomsSection[membership];
        if (!membershipSection) {
            continue;
        }
        for (const [roomId, roomResponse] of Object.entries(membershipSection)) {
            results.push(roomCallback(roomId, roomResponse, membership));
        }
    }
    return results;
}
2020-04-20 19:47:45 +02:00
/**
 * Runs the sync loop: repeatedly requests a sync from the homeserver API,
 * writes the response to storage inside one read-write transaction, and
 * notifies the session and the affected rooms once that transaction completed.
 *
 * The current state is exposed through the `status` observable (a
 * `SyncStatus` value); the error that ended the loop, if any, through `error`.
 */
export class Sync {
    /**
     * @param {Object} options
     * @param {Object} options.hsApi homeserver API; `sync()` and `createFilter()` are called on it
     * @param {Object} options.session provides `syncToken`, `syncFilterId`, `user`, `rooms`,
     *        `createRoom()`, `writeSync()` and `afterSync()`
     * @param {Object} options.storage provides `storeNames` and `readWriteTxn()`
     */
    constructor({hsApi, session, storage}) {
        this._hsApi = hsApi;
        this._session = session;
        this._storage = storage;
        this._currentRequest = null;
        this._status = new ObservableValue(SyncStatus.Stopped);
        this._error = null;
    }

    /** observable holding the current `SyncStatus` value */
    get status() {
        return this._status;
    }

    /** the error that made the sync stop */
    get error() {
        return this._error;
    }

    /**
     * Starts the sync loop unless it is already running.
     * The status becomes `CatchupSync` when the session already has a sync
     * token and `InitialSync` otherwise.
     */
    start() {
        // not already syncing?
        if (this._status.get() !== SyncStatus.Stopped) {
            return;
        }
        // an error from a previous run says nothing about the run starting now
        this._error = null;
        const syncToken = this._session.syncToken;
        if (syncToken) {
            this._status.set(SyncStatus.CatchupSync);
        } else {
            this._status.set(SyncStatus.InitialSync);
        }
        // NOTE(review): a loop from an earlier start() that is still finishing
        // its last request is not tracked here — confirm whether
        // stop()-then-start() in quick succession needs guarding.
        this._syncLoop(syncToken);
    }

    /**
     * Keeps issuing sync requests until the status becomes `Stopped`.
     * An `AbortError` is ignored (the loop condition decides whether to
     * continue); any other error is stored in `_error` and stops the loop.
     *
     * @param {string} [syncToken] token to sync from; when falsy the first
     *        request is made without a timeout
     */
    async _syncLoop(syncToken) {
        // if syncToken is falsy, it will first do an initial sync ...
        while (this._status.get() !== SyncStatus.Stopped) {
            try {
                console.log(`starting sync request with since ${syncToken} ...`);
                const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
                syncToken = await this._syncRequest(syncToken, timeout);
                // stop() may have been called while the response was being
                // stored; aborting the already finished request has no effect
                // then, so make sure we don't flip the status back to Syncing.
                if (this._status.get() === SyncStatus.Stopped) {
                    break;
                }
                this._status.set(SyncStatus.Syncing);
            } catch (err) {
                if (!(err instanceof AbortError)) {
                    this._error = err;
                    this._status.set(SyncStatus.Stopped);
                }
            }
        }
    }

    /**
     * Performs one sync request and stores its result.
     *
     * When the session has no string `syncFilterId` yet, a filter enabling
     * `lazy_load_members` for room state is created first. The response is
     * written by `session.writeSync()` and each room's `writeSync()` within a
     * single read-write transaction; if any of these throws, the transaction
     * is aborted and the error rethrown. `afterSync()` is called on the
     * session and the rooms only after the transaction completed.
     *
     * @param {string} [syncToken] token to sync from
     * @param {number} [timeout] timeout passed on to `hsApi.sync()`
     * @returns {Promise<string>} the `next_batch` token of the response
     */
    async _syncRequest(syncToken, timeout) {
        let {syncFilterId} = this._session;
        if (typeof syncFilterId !== "string") {
            // TODO: this should be interruptable by stop, we can reuse _currentRequest
            const filterRequest = this._hsApi.createFilter(
                this._session.user.id,
                {room: {state: {lazy_load_members: true}}}
            );
            syncFilterId = (await filterRequest.response()).filter_id;
        }
        this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout);
        const response = await this._currentRequest.response();
        const nextSyncToken = response.next_batch;
        const storeNames = this._storage.storeNames;
        const syncTxn = await this._storage.readWriteTxn([
            storeNames.session,
            storeNames.roomSummary,
            storeNames.roomState,
            storeNames.timelineEvents,
            storeNames.timelineFragments,
            storeNames.pendingEvents,
        ]);
        const roomChanges = [];
        let sessionChanges;
        try {
            sessionChanges = this._session.writeSync(nextSyncToken, syncFilterId, response.account_data, syncTxn);
            // to_device
            // presence
            if (response.rooms) {
                const promises = parseRooms(response.rooms, async (roomId, roomResponse, membership) => {
                    let room = this._session.rooms.get(roomId);
                    if (!room) {
                        room = this._session.createRoom(roomId);
                    }
                    console.log(` * applying sync response to room ${roomId} ...`);
                    const changes = await room.writeSync(roomResponse, membership, syncTxn);
                    roomChanges.push({room, changes});
                });
                await Promise.all(promises);
            }
        } catch (err) {
            console.warn("aborting syncTxn because of error");
            // avoid corrupting state by only
            // storing the sync up till the point
            // the exception occurred
            syncTxn.abort();
            throw err;
        }
        try {
            await syncTxn.complete();
            console.info("syncTxn committed!!");
        } catch (err) {
            console.error("unable to commit sync tranaction");
            throw err;
        }
        this._session.afterSync(sessionChanges);
        // emit room related events after txn has been closed
        for (const {room, changes} of roomChanges) {
            room.afterSync(changes);
        }
        return nextSyncToken;
    }

    /**
     * Stops the sync loop: sets the status to `Stopped` and aborts the
     * request tracked in `_currentRequest`, if any. Does nothing when already
     * stopped.
     */
    stop() {
        if (this._status.get() === SyncStatus.Stopped) {
            return;
        }
        this._status.set(SyncStatus.Stopped);
        if (this._currentRequest) {
            this._currentRequest.abort();
            this._currentRequest = null;
        }
    }
}