make archived room part of sync lifecycle (draft)

This commit is contained in:
Bruno Windels 2021-05-10 18:42:30 +02:00
parent 79d97737bc
commit 99d5467ad1
3 changed files with 194 additions and 86 deletions

View File

@ -16,6 +16,7 @@ limitations under the License.
*/ */
import {Room} from "./room/Room.js"; import {Room} from "./room/Room.js";
import {ArchivedRoom} from "./room/ArchivedRoom.js";
import {RoomStatus} from "./room/RoomStatus.js"; import {RoomStatus} from "./room/RoomStatus.js";
import {Invite} from "./room/Invite.js"; import {Invite} from "./room/Invite.js";
import {Pusher} from "./push/Pusher.js"; import {Pusher} from "./push/Pusher.js";
@ -399,10 +400,18 @@ export class Session {
} }
/** @internal */ /** @internal */
addRoomAfterSync(room) { createArchivedRoom(roomId) {
this._rooms.add(room.id, room); return new ArchivedRoom({
const statusObservable = this._observedRoomStatus.get(room.id); roomId,
statusObservable?.set(RoomStatus.joined); getSyncToken: this._getSyncToken,
storage: this._storage,
emitCollectionChange: () => {},
hsApi: this._hsApi,
mediaRepository: this._mediaRepository,
user: this._user,
createRoomEncryption: this._createRoomEncryption,
platform: this._platform
});
} }
get invites() { get invites() {
@ -421,34 +430,6 @@ export class Session {
}); });
} }
/** @internal */
addInviteAfterSync(invite) {
this._invites.add(invite.id, invite);
const statusObservable = this._observedRoomStatus.get(invite.id);
if (statusObservable) {
statusObservable.set(statusObservable.get().withInvited());
}
}
/** @internal */
removeInviteAfterSync(invite) {
this._invites.remove(invite.id);
const statusObservable = this._observedRoomStatus.get(invite.id);
if (statusObservable) {
statusObservable.set(statusObservable.get().withoutInvited());
}
}
/** @internal */
archiveRoomAfterSync(room) {
this._rooms.remove(room.id);
const statusObservable = this._observedRoomStatus.get(room.id);
statusObservable?.set(RoomStatus.archived);
if (this._archivedRooms) {
this._archivedRooms.add(room.id, room);
}
}
async obtainSyncLock(syncResponse) { async obtainSyncLock(syncResponse) {
const toDeviceEvents = syncResponse.to_device?.events; const toDeviceEvents = syncResponse.to_device?.events;
if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) { if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) {
@ -528,6 +509,58 @@ export class Session {
} }
} }
applyRoomCollectionChangesAfterSync(inviteStates, roomStates, archivedRoomStates) {
// update the collections after sync
if (this._archivedRooms) {
for (const ars of archivedRoomStates) {
if (ars.shouldAdd) {
this._archivedRooms.add(ars.id, ars.archivedRoom);
} else if (ars.shouldRemove) {
this._archivedRooms.remove(ars.id);
}
}
}
for (const rs of roomStates) {
if (rs.shouldAdd) {
this._rooms.add(rs.id, rs.room);
} else if (rs.shouldRemove) {
this._rooms.remove(rs.id);
}
}
for (const is of inviteStates) {
if (is.shouldAdd) {
this._invites.add(is.id, is.invite);
} else if (is.shouldRemove) {
this._invites.remove(is.id);
}
}
// now all the collections are updated, update the room status
// so any listeners to the status will find the collections
// completely up to date
if (this._observedRoomStatus.size !== 0) {
for (const ars of archivedRoomStates) {
if (ars.shouldAdd) {
this._observedRoomStatus.get(ars.id)?.set(RoomStatus.archived);
}
}
for (const rs of roomStates) {
if (rs.shouldAdd) {
this._observedRoomStatus.get(rs.id)?.set(RoomStatus.joined);
}
}
for (const is of inviteStates) {
const statusObservable = this._observedRoomStatus.get(is.id);
if (statusObservable) {
if (is.shouldAdd) {
statusObservable.set(statusObservable.get().withInvited());
} else if (is.shouldRemove) {
statusObservable.set(statusObservable.get().withoutInvited());
}
}
}
}
}
/** @internal */ /** @internal */
get syncToken() { get syncToken() {
return this._syncInfo?.token; return this._syncInfo?.token;
@ -658,10 +691,7 @@ export class Session {
]); ]);
const summary = await txn.archivedRoomSummary.get(roomId); const summary = await txn.archivedRoomSummary.get(roomId);
if (summary) { if (summary) {
// TODO: should we really be using a Room here? const room = this.createArchivedRoom(roomId);
// Or rather an ArchivedRoom that shares a common base class with Room?
// That will make the Room code harder to read though ...
const room = this.createRoom(roomId);
await room.load(summary, txn, log); await room.load(summary, txn, log);
return room; return room;
} }

View File

@ -192,7 +192,8 @@ export class Sync {
const isInitialSync = !syncToken; const isInitialSync = !syncToken;
const sessionState = new SessionSyncProcessState(); const sessionState = new SessionSyncProcessState();
const inviteStates = this._parseInvites(response.rooms); const inviteStates = this._parseInvites(response.rooms);
const roomStates = this._parseRoomsResponse(response.rooms, inviteStates, isInitialSync); const {roomStates, archivedRoomStates} = await this._parseRoomsResponse(
response.rooms, inviteStates, isInitialSync);
try { try {
// take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing // take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing
@ -202,12 +203,14 @@ export class Sync {
return rs.room.afterPrepareSync(rs.preparation, log); return rs.room.afterPrepareSync(rs.preparation, log);
}))); })));
await log.wrap("write", async log => this._writeSync( await log.wrap("write", async log => this._writeSync(
sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log)); sessionState, inviteStates, roomStates, archivedRoomStates,
response, syncFilterId, isInitialSync, log));
} finally { } finally {
sessionState.dispose(); sessionState.dispose();
} }
// sync txn comitted, emit updates and apply changes to in-memory state // sync txn comitted, emit updates and apply changes to in-memory state
log.wrap("after", log => this._afterSync(sessionState, inviteStates, roomStates, log)); log.wrap("after", log => this._afterSync(
sessionState, inviteStates, roomStates, archivedRoomStates, log));
const toDeviceEvents = response.to_device?.events; const toDeviceEvents = response.to_device?.events;
return { return {
@ -255,6 +258,8 @@ export class Sync {
await Promise.all(roomStates.map(async rs => { await Promise.all(roomStates.map(async rs => {
const newKeys = newKeysByRoom?.get(rs.room.id); const newKeys = newKeysByRoom?.get(rs.room.id);
rs.preparation = await log.wrap("room", async log => { rs.preparation = await log.wrap("room", async log => {
// if previously joined and we still have the timeline for it,
// this loads the syncWriter at the correct position to continue writing the timeline
if (rs.isNewRoom) { if (rs.isNewRoom) {
await rs.room.load(null, prepareTxn, log); await rs.room.load(null, prepareTxn, log);
} }
@ -267,7 +272,7 @@ export class Sync {
await prepareTxn.complete(); await prepareTxn.complete();
} }
async _writeSync(sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log) { async _writeSync(sessionState, inviteStates, roomStates, archivedRoomStates, response, syncFilterId, isInitialSync, log) {
const syncTxn = await this._openSyncTxn(); const syncTxn = await this._openSyncTxn();
try { try {
sessionState.changes = await log.wrap("session", log => this._session.writeSync( sessionState.changes = await log.wrap("session", log => this._session.writeSync(
@ -280,6 +285,13 @@ export class Sync {
rs.changes = await log.wrap("room", log => rs.room.writeSync( rs.changes = await log.wrap("room", log => rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log)); rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
})); }));
// important to do this after roomStates,
// as we're referring to the roomState to get the summaryChanges
await Promise.all(archivedRoomStates.map(async ars => {
const summaryChanges = ars.roomState?.summaryChanges;
ars.changes = await log.wrap("archivedRoom", log => ars.archivedRoom.writeSync(
summaryChanges, ars.roomResponse, ars.membership, syncTxn, log));
}));
} catch(err) { } catch(err) {
// avoid corrupting state by only // avoid corrupting state by only
// storing the sync up till the point // storing the sync up till the point
@ -294,35 +306,18 @@ export class Sync {
await syncTxn.complete(); await syncTxn.complete();
} }
_afterSync(sessionState, inviteStates, roomStates, log) { _afterSync(sessionState, inviteStates, roomStates, archivedRoomStates, log) {
log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail); log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail);
// emit room related events after txn has been closed for(let ars of archivedRoomStates) {
log.wrap("archivedRoom", () => ars.archivedRoom.afterSync(ars.changes), log.level.Detail);
}
for(let rs of roomStates) { for(let rs of roomStates) {
log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail); log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
if (rs.isNewRoom) {
// important to add the room before removing the invite,
// so the room will be found if looking for it when the invite
// is removed
this._session.addRoomAfterSync(rs.room);
} else if (rs.membership === "leave") {
this._session.archiveRoomAfterSync(rs.room);
}
} }
// emit invite related events after txn has been closed
for(let is of inviteStates) { for(let is of inviteStates) {
log.wrap("invite", () => { log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail);
// important to remove before emitting change in afterSync
// so code checking session.invites.get(id) won't
// find the invite anymore on update
if (is.membership !== "invite") {
this._session.removeInviteAfterSync(is.invite);
}
is.invite.afterSync(is.changes);
}, log.level.Detail);
if (is.isNewInvite) {
this._session.addInviteAfterSync(is.invite);
}
} }
this._session.applyRoomCollectionChangesAfterSync(inviteStates, roomStates, archivedRoomStates);
} }
_openSyncTxn() { _openSyncTxn() {
@ -351,8 +346,9 @@ export class Sync {
]); ]);
} }
_parseRoomsResponse(roomsSection, inviteStates, isInitialSync) { async _parseRoomsResponse(roomsSection, inviteStates, isInitialSync) {
const roomStates = []; const roomStates = [];
const archivedRoomStates = [];
if (roomsSection) { if (roomsSection) {
const allMemberships = ["join", "leave"]; const allMemberships = ["join", "leave"];
for(const membership of allMemberships) { for(const membership of allMemberships) {
@ -364,28 +360,64 @@ export class Sync {
if (isInitialSync && timelineIsEmpty(roomResponse)) { if (isInitialSync && timelineIsEmpty(roomResponse)) {
continue; continue;
} }
let isNewRoom = false;
let room = this._session.rooms.get(roomId);
// don't create a room for a rejected invite
if (!room && membership === "join") {
room = this._session.createRoom(roomId);
isNewRoom = true;
}
const invite = this._session.invites.get(roomId); const invite = this._session.invites.get(roomId);
// if there is an existing invite, add a process state for it // if there is an existing invite, add a process state for it
// so its writeSync and afterSync will run and remove the invite // so its writeSync and afterSync will run and remove the invite
if (invite) { if (invite) {
inviteStates.push(new InviteSyncProcessState(invite, false, null, membership, null)); inviteStates.push(new InviteSyncProcessState(invite, false, null, membership));
} }
if (room) { const roomState = this._createRoomSyncState(roomId, invite, roomResponse, membership);
roomStates.push(new RoomSyncProcessState( if (roomState) {
room, isNewRoom, invite, roomResponse, membership)); roomStates.push(roomState);
}
const ars = await this._createArchivedRoomSyncState(roomId, roomState, roomResponse, membership);
if (ars) {
archivedRoomStates.push(ars);
} }
} }
} }
} }
} }
return roomStates; return {roomStates, archivedRoomStates};
}
_createRoomSyncState(roomId, invite, roomResponse, membership) {
let isNewRoom = false;
let room = this._session.rooms.get(roomId);
// create room only on new join,
// don't create a room for a rejected invite
if (!room && membership === "join") {
room = this._session.createRoom(roomId);
isNewRoom = true;
}
if (room) {
return new RoomSyncProcessState(
room, isNewRoom, invite, roomResponse, membership);
}
}
async _createArchivedRoomSyncState(roomId, roomState, roomResponse, membership) {
let archivedRoom;
if (membership === "join") {
// when joining, always create the archived room to write the removal
archivedRoom = this._session.createArchivedRoom(roomId);
} else if (membership === "leave") {
if (roomState) {
// we still have a roomState, so we just left it
// in this case, create a new archivedRoom
archivedRoom = this._session.createArchivedRoom(roomId);
} else {
// this is an update of an already left room, restore
// it from storage first, so we can increment it.
// this happens for example when our membership changes
// after leaving (e.g. being (un)banned, possibly after being kicked), etc
archivedRoom = await this._session.loadArchivedRoom(roomId);
}
}
if (archivedRoom) {
return new ArchivedRoomSyncProcessState(
archivedRoom, roomState, roomResponse, membership);
}
} }
_parseInvites(roomsSection) { _parseInvites(roomsSection) {
@ -398,8 +430,7 @@ export class Sync {
invite = this._session.createInvite(roomId); invite = this._session.createInvite(roomId);
isNewInvite = true; isNewInvite = true;
} }
const room = this._session.rooms.get(roomId); inviteStates.push(new InviteSyncProcessState(invite, isNewInvite, roomResponse, "invite"));
inviteStates.push(new InviteSyncProcessState(invite, isNewInvite, room, "invite", roomResponse));
} }
} }
return inviteStates; return inviteStates;
@ -440,15 +471,65 @@ class RoomSyncProcessState {
this.preparation = null; this.preparation = null;
this.changes = null; this.changes = null;
} }
get id() {
return this.room.id;
}
get shouldAdd() {
return this.isNewRoom;
}
get shouldRemove() {
return this.membership !== "join";
}
get summaryChanges() {
return this.changes?.summaryChanges;
}
}
class ArchivedRoomSyncProcessState {
constructor(archivedRoom, roomState, roomResponse, membership) {
this.archivedRoom = archivedRoom;
this.roomState = roomState;
this.roomResponse = roomResponse;
this.membership = membership;
this.changes = null;
}
get id() {
return this.archivedRoom.id;
}
get shouldAdd() {
return this.roomState && this.membership === "leave";
}
get shouldRemove() {
return this.membership === "join";
}
} }
class InviteSyncProcessState { class InviteSyncProcessState {
constructor(invite, isNewInvite, room, membership, roomResponse) { constructor(invite, isNewInvite, roomResponse, membership) {
this.invite = invite; this.invite = invite;
this.isNewInvite = isNewInvite; this.isNewInvite = isNewInvite;
this.room = room;
this.membership = membership; this.membership = membership;
this.roomResponse = roomResponse; this.roomResponse = roomResponse;
this.changes = null; this.changes = null;
} }
get id() {
return this.invite.id;
}
get shouldAdd() {
return this.isNewInvite;
}
get shouldRemove() {
return this.membership !== "invite";
}
} }

View File

@ -45,7 +45,7 @@ export class Room extends BaseRoom {
if (newKeys) { if (newKeys) {
log.set("newKeys", newKeys.length); log.set("newKeys", newKeys.length);
} }
let summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership, this._user.id); let summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership);
if (membership === "join" && invite) { if (membership === "join" && invite) {
summaryChanges = summaryChanges.applyInvite(invite); summaryChanges = summaryChanges.applyInvite(invite);
} }
@ -105,8 +105,6 @@ export class Room extends BaseRoom {
// so no old state sticks around // so no old state sticks around
txn.roomState.removeAllForRoom(this.id); txn.roomState.removeAllForRoom(this.id);
txn.roomMembers.removeAllForRoom(this.id); txn.roomMembers.removeAllForRoom(this.id);
// TODO: this should be done in ArchivedRoom
txn.archivedRoomSummary.remove(this.id);
} }
const {entries: newEntries, newLiveKey, memberChanges} = const {entries: newEntries, newLiveKey, memberChanges} =
await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, isRejoin, txn, log), log.level.Detail); await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, isRejoin, txn, log), log.level.Detail);
@ -135,10 +133,9 @@ export class Room extends BaseRoom {
summaryChanges = summaryChanges.applyTimelineEntries( summaryChanges = summaryChanges.applyTimelineEntries(
allEntries, isInitialSync, !this._isTimelineOpen, this._user.id); allEntries, isInitialSync, !this._isTimelineOpen, this._user.id);
// only archive a room if we had previously joined it // if we've have left the room, remove the summary
if (summaryChanges.membership === "leave" && this.membership === "join") { if (summaryChanges.membership !== "join") {
txn.roomSummary.remove(this.id); txn.roomSummary.remove(this.id);
summaryChanges = this._summary.writeArchivedData(summaryChanges, txn);
} else { } else {
// write summary changes, and unset if nothing was actually changed // write summary changes, and unset if nothing was actually changed
summaryChanges = this._summary.writeData(summaryChanges, txn); summaryChanges = this._summary.writeData(summaryChanges, txn);