From 99d5467ad1f7674b0768a76daadc0cb759693041 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 10 May 2021 18:42:30 +0200 Subject: [PATCH] make archived room part of sync lifecycle (draft) --- src/matrix/Session.js | 102 +++++++++++++++--------- src/matrix/Sync.js | 169 +++++++++++++++++++++++++++++----------- src/matrix/room/Room.js | 9 +-- 3 files changed, 194 insertions(+), 86 deletions(-) diff --git a/src/matrix/Session.js b/src/matrix/Session.js index c2b85215..961adf11 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -16,6 +16,7 @@ limitations under the License. */ import {Room} from "./room/Room.js"; +import {ArchivedRoom} from "./room/ArchivedRoom.js"; import {RoomStatus} from "./room/RoomStatus.js"; import {Invite} from "./room/Invite.js"; import {Pusher} from "./push/Pusher.js"; @@ -399,10 +400,18 @@ export class Session { } /** @internal */ - addRoomAfterSync(room) { - this._rooms.add(room.id, room); - const statusObservable = this._observedRoomStatus.get(room.id); - statusObservable?.set(RoomStatus.joined); + createArchivedRoom(roomId) { + return new ArchivedRoom({ + roomId, + getSyncToken: this._getSyncToken, + storage: this._storage, + emitCollectionChange: () => {}, + hsApi: this._hsApi, + mediaRepository: this._mediaRepository, + user: this._user, + createRoomEncryption: this._createRoomEncryption, + platform: this._platform + }); } get invites() { @@ -421,34 +430,6 @@ export class Session { }); } - /** @internal */ - addInviteAfterSync(invite) { - this._invites.add(invite.id, invite); - const statusObservable = this._observedRoomStatus.get(invite.id); - if (statusObservable) { - statusObservable.set(statusObservable.get().withInvited()); - } - } - - /** @internal */ - removeInviteAfterSync(invite) { - this._invites.remove(invite.id); - const statusObservable = this._observedRoomStatus.get(invite.id); - if (statusObservable) { - statusObservable.set(statusObservable.get().withoutInvited()); - } - } - - /** @internal */ - archiveRoomAfterSync(room) { - this._rooms.remove(room.id); - const statusObservable = this._observedRoomStatus.get(room.id); - statusObservable?.set(RoomStatus.archived); - if (this._archivedRooms) { - this._archivedRooms.add(room.id, room); - } - } - async obtainSyncLock(syncResponse) { const toDeviceEvents = syncResponse.to_device?.events; if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) { @@ -528,6 +509,58 @@ export class Session { } } + applyRoomCollectionChangesAfterSync(inviteStates, roomStates, archivedRoomStates) { + // update the collections after sync + if (this._archivedRooms) { + for (const ars of archivedRoomStates) { + if (ars.shouldAdd) { + this._archivedRooms.add(ars.id, ars.archivedRoom); + } else if (ars.shouldRemove) { + this._archivedRooms.remove(ars.id); + } + } + } + for (const rs of roomStates) { + if (rs.shouldAdd) { + this._rooms.add(rs.id, rs.room); + } else if (rs.shouldRemove) { + this._rooms.remove(rs.id); + } + } + for (const is of inviteStates) { + if (is.shouldAdd) { + this._invites.add(is.id, is.invite); + } else if (is.shouldRemove) { + this._invites.remove(is.id); + } + } + // now all the collections are updated, update the room status + // so any listeners to the status will find the collections + // completely up to date + if (this._observedRoomStatus.size !== 0) { + for (const ars of archivedRoomStates) { + if (ars.shouldAdd) { + this._observedRoomStatus.get(ars.id)?.set(RoomStatus.archived); + } + } + for (const rs of roomStates) { + if (rs.shouldAdd) { + this._observedRoomStatus.get(rs.id)?.set(RoomStatus.joined); + } + } + for (const is of inviteStates) { + const statusObservable = this._observedRoomStatus.get(is.id); + if (statusObservable) { + if (is.shouldAdd) { + statusObservable.set(statusObservable.get().withInvited()); + } else if (is.shouldRemove) { + statusObservable.set(statusObservable.get().withoutInvited()); + } + } + } + } + } + /** @internal */ get syncToken() { return this._syncInfo?.token; @@ -658,10 +691,7 @@ export class Session { ]); const summary = await txn.archivedRoomSummary.get(roomId); if (summary) { - // TODO: should we really be using a Room here? - // Or rather an ArchivedRoom that shares a common base class with Room? - // That will make the Room code harder to read though ... - const room = this.createRoom(roomId); + const room = this.createArchivedRoom(roomId); await room.load(summary, txn, log); return room; } diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 35703589..dc613d00 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -192,7 +192,8 @@ export class Sync { const isInitialSync = !syncToken; const sessionState = new SessionSyncProcessState(); const inviteStates = this._parseInvites(response.rooms); - const roomStates = this._parseRoomsResponse(response.rooms, inviteStates, isInitialSync); + const {roomStates, archivedRoomStates} = await this._parseRoomsResponse( + response.rooms, inviteStates, isInitialSync); try { // take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing @@ -202,12 +203,14 @@ export class Sync { return rs.room.afterPrepareSync(rs.preparation, log); }))); await log.wrap("write", async log => this._writeSync( - sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log)); + sessionState, inviteStates, roomStates, archivedRoomStates, + response, syncFilterId, isInitialSync, log)); } finally { sessionState.dispose(); } // sync txn comitted, emit updates and apply changes to in-memory state - log.wrap("after", log => this._afterSync(sessionState, inviteStates, roomStates, log)); + log.wrap("after", log => this._afterSync( + sessionState, inviteStates, roomStates, archivedRoomStates, log)); const toDeviceEvents = response.to_device?.events; return { @@ -255,6 +258,8 @@ export class Sync { await Promise.all(roomStates.map(async rs => { const newKeys = newKeysByRoom?.get(rs.room.id); rs.preparation = await log.wrap("room", async log => { + // if previously joined and we still have the timeline for it, + // this loads the syncWriter at the correct position to continue writing the timeline if (rs.isNewRoom) { await rs.room.load(null, prepareTxn, log); } @@ -267,7 +272,7 @@ export class Sync { await prepareTxn.complete(); } - async _writeSync(sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log) { + async _writeSync(sessionState, inviteStates, roomStates, archivedRoomStates, response, syncFilterId, isInitialSync, log) { const syncTxn = await this._openSyncTxn(); try { sessionState.changes = await log.wrap("session", log => this._session.writeSync( @@ -280,6 +285,13 @@ export class Sync { rs.changes = await log.wrap("room", log => rs.room.writeSync( rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log)); })); + // important to do this after roomStates, + // as we're referring to the roomState to get the summaryChanges + await Promise.all(archivedRoomStates.map(async ars => { + const summaryChanges = ars.roomState?.summaryChanges; + ars.changes = await log.wrap("archivedRoom", log => ars.archivedRoom.writeSync( + summaryChanges, ars.roomResponse, ars.membership, syncTxn, log)); + })); } catch(err) { // avoid corrupting state by only // storing the sync up till the point @@ -294,35 +306,18 @@ export class Sync { await syncTxn.complete(); } - _afterSync(sessionState, inviteStates, roomStates, log) { + _afterSync(sessionState, inviteStates, roomStates, archivedRoomStates, log) { log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail); - // emit room related events after txn has been closed + for(let ars of archivedRoomStates) { + log.wrap("archivedRoom", () => ars.archivedRoom.afterSync(ars.changes), log.level.Detail); + } for(let rs of roomStates) { log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail); - if (rs.isNewRoom) { - // important to add the room before removing the invite, - // so the room will be found if looking for it when the invite - // is removed - this._session.addRoomAfterSync(rs.room); - } else if (rs.membership === "leave") { - this._session.archiveRoomAfterSync(rs.room); - } } - // emit invite related events after txn has been closed for(let is of inviteStates) { - log.wrap("invite", () => { - // important to remove before emitting change in afterSync - // so code checking session.invites.get(id) won't - // find the invite anymore on update - if (is.membership !== "invite") { - this._session.removeInviteAfterSync(is.invite); - } - is.invite.afterSync(is.changes); - }, log.level.Detail); - if (is.isNewInvite) { - this._session.addInviteAfterSync(is.invite); - } + log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail); } + this._session.applyRoomCollectionChangesAfterSync(inviteStates, roomStates, archivedRoomStates); } _openSyncTxn() { @@ -351,8 +346,9 @@ export class Sync { ]); } - _parseRoomsResponse(roomsSection, inviteStates, isInitialSync) { + async _parseRoomsResponse(roomsSection, inviteStates, isInitialSync) { const roomStates = []; + const archivedRoomStates = []; if (roomsSection) { const allMemberships = ["join", "leave"]; for(const membership of allMemberships) { @@ -364,28 +360,64 @@ export class Sync { if (isInitialSync && timelineIsEmpty(roomResponse)) { continue; } - let isNewRoom = false; - let room = this._session.rooms.get(roomId); - // don't create a room for a rejected invite - if (!room && membership === "join") { - room = this._session.createRoom(roomId); - isNewRoom = true; - } const invite = this._session.invites.get(roomId); // if there is an existing invite, add a process state for it // so its writeSync and afterSync will run and remove the invite if (invite) { - inviteStates.push(new InviteSyncProcessState(invite, false, null, membership, null)); + inviteStates.push(new InviteSyncProcessState(invite, false, null, membership)); } - if (room) { - roomStates.push(new RoomSyncProcessState( - room, isNewRoom, invite, roomResponse, membership)); + const roomState = this._createRoomSyncState(roomId, invite, roomResponse, membership); + if (roomState) { + roomStates.push(roomState); + } + const ars = await this._createArchivedRoomSyncState(roomId, roomState, roomResponse, membership); + if (ars) { + archivedRoomStates.push(ars); } } } } } - return roomStates; + return {roomStates, archivedRoomStates}; + } + + _createRoomSyncState(roomId, invite, roomResponse, membership) { + let isNewRoom = false; + let room = this._session.rooms.get(roomId); + // create room only on new join, + // don't create a room for a rejected invite + if (!room && membership === "join") { + room = this._session.createRoom(roomId); + isNewRoom = true; + } + if (room) { + return new RoomSyncProcessState( + room, isNewRoom, invite, roomResponse, membership); + } + } + + async _createArchivedRoomSyncState(roomId, roomState, roomResponse, membership) { + let archivedRoom; + if (membership === "join") { + // when joining, always create the archived room to write the removal + archivedRoom = this._session.createArchivedRoom(roomId); + } else if (membership === "leave") { + if (roomState) { + // we still have a roomState, so we just left it + // in this case, create a new archivedRoom + archivedRoom = this._session.createArchivedRoom(roomId); + } else { + // this is an update of an already left room, restore + // it from storage first, so we can increment it. + // this happens for example when our membership changes + // after leaving (e.g. being (un)banned, possibly after being kicked), etc + archivedRoom = await this._session.loadArchivedRoom(roomId); + } + } + if (archivedRoom) { + return new ArchivedRoomSyncProcessState( + archivedRoom, roomState, roomResponse, membership); + } } _parseInvites(roomsSection) { @@ -398,8 +430,7 @@ export class Sync { invite = this._session.createInvite(roomId); isNewInvite = true; } - const room = this._session.rooms.get(roomId); - inviteStates.push(new InviteSyncProcessState(invite, isNewInvite, room, "invite", roomResponse)); + inviteStates.push(new InviteSyncProcessState(invite, isNewInvite, roomResponse, "invite")); } } return inviteStates; @@ -440,15 +471,65 @@ class RoomSyncProcessState { this.preparation = null; this.changes = null; } + + get id() { + return this.room.id; + } + + get shouldAdd() { + return this.isNewRoom; + } + + get shouldRemove() { + return this.membership !== "join"; + } + + get summaryChanges() { + return this.changes?.summaryChanges; + } +} + + +class ArchivedRoomSyncProcessState { + constructor(archivedRoom, roomState, roomResponse, membership) { + this.archivedRoom = archivedRoom; + this.roomState = roomState; + this.roomResponse = roomResponse; + this.membership = membership; + this.changes = null; + } + + get id() { + return this.archivedRoom.id; + } + + get shouldAdd() { + return this.roomState && this.membership === "leave"; + } + + get shouldRemove() { + return this.membership === "join"; + } } class InviteSyncProcessState { - constructor(invite, isNewInvite, room, membership, roomResponse) { + constructor(invite, isNewInvite, roomResponse, membership) { this.invite = invite; this.isNewInvite = isNewInvite; - this.room = room; this.membership = membership; this.roomResponse = roomResponse; this.changes = null; } + + get id() { + return this.invite.id; + } + + get shouldAdd() { + return this.isNewInvite; + } + + get shouldRemove() { + return this.membership !== "invite"; + } } diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 7e45a8da..ff13c0f4 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -45,7 +45,7 @@ export class Room extends BaseRoom { if (newKeys) { log.set("newKeys", newKeys.length); } - let summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership, this._user.id); + let summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership); if (membership === "join" && invite) { summaryChanges = summaryChanges.applyInvite(invite); } @@ -105,8 +105,6 @@ export class Room extends BaseRoom { // so no old state sticks around txn.roomState.removeAllForRoom(this.id); txn.roomMembers.removeAllForRoom(this.id); - // TODO: this should be done in ArchivedRoom - txn.archivedRoomSummary.remove(this.id); } const {entries: newEntries, newLiveKey, memberChanges} = await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, isRejoin, txn, log), log.level.Detail); @@ -135,10 +133,9 @@ export class Room extends BaseRoom { summaryChanges = summaryChanges.applyTimelineEntries( allEntries, isInitialSync, !this._isTimelineOpen, this._user.id); - // only archive a room if we had previously joined it - if (summaryChanges.membership === "leave" && this.membership === "join") { + // if we've have left the room, remove the summary + if (summaryChanges.membership !== "join") { txn.roomSummary.remove(this.id); - summaryChanges = this._summary.writeArchivedData(summaryChanges, txn); } else { // write summary changes, and unset if nothing was actually changed summaryChanges = this._summary.writeData(summaryChanges, txn);