From 93f200673f2008bf3a611638998adb276968fe78 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 28 Aug 2020 16:31:14 +0200 Subject: [PATCH] allow members to be persisted as part of a larger txn when fetched --- src/matrix/room/Room.js | 62 ++++++++++++++++++--- src/matrix/room/members/load.js | 98 +++++++++++++++++---------------- 2 files changed, 105 insertions(+), 55 deletions(-) diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 27031203..3b3dc51e 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -22,7 +22,7 @@ import {Timeline} from "./timeline/Timeline.js"; import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js"; import {SendQueue} from "./sending/SendQueue.js"; import {WrappedError} from "../error.js" -import {fetchOrLoadMembers} from "./members/load.js"; +import {loadMembers, fetchMemberSnapshot} from "./members/load.js"; import {MemberList} from "./members/MemberList.js"; import {Heroes} from "./members/Heroes.js"; @@ -141,20 +141,60 @@ export class Room extends EventEmitter { return this._sendQueue.enqueueEvent(eventType, content); } + /** + * @package + * @return {MemberSnapshot} the member snapshot, be sure to call dispose + */ + async fetchMemberSnapshot() { + if (!this.hasFetchedMembers) { + if (this._changedMembersDuringSync) { + throw new Error("already snapshot being written"); + } + return await fetchMemberSnapshot({ + summary: this._summary, + roomId: this._roomId, + hsApi: this._hsApi, + // to handle race between /members and /sync + setChangedMembersMap: map => this._changedMembersDuringSync = map, + }); + } + } + + async _fetchAndWriteMembers() { + const snapshot = this.fetchMemberSnapshot(); + try { + const txn = await this._storage.readWriteTxn([ + this._storage.storeNames.roomSummary, + this._storage.storeNames.roomMembers, + ]); + let changes; + try { + changes = snapshot.write(txn); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + snapshot.applyWrite(changes); + return snapshot.members; + } finally { + snapshot.dispose(); + } + } + /** @public */ async loadMemberList() { if (this._memberList) { this._memberList.retain(); return this._memberList; } else { - const members = await fetchOrLoadMembers({ - summary: this._summary, - roomId: this._roomId, - hsApi: this._hsApi, - storage: this._storage, - // to handle race between /members and /sync - setChangedMembersMap: map => this._changedMembersDuringSync = map, - }); + let members; + if (this.hasFetchedMembers) { + const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]); + members = await loadMembers(this._roomId, txn); + } else { + members = await this._fetchAndWriteMembers(); + } this._memberList = new MemberList({ members, closeCallback: () => { this._memberList = null; } @@ -256,6 +296,10 @@ export class Room extends EventEmitter { return !!(tags && tags['m.lowpriority']); } + get hasFetchedMembers() { + return this._summary.hasFetchedMembers; + } + async _getLastEventId() { const lastKey = this._syncWriter.lastMessageKey; if (lastKey) { diff --git a/src/matrix/room/members/load.js b/src/matrix/room/members/load.js index 54d7c3dc..9797cd0b 100644 --- a/src/matrix/room/members/load.js +++ b/src/matrix/room/members/load.js @@ -17,74 +17,80 @@ limitations under the License. import {RoomMember} from "./RoomMember.js"; -async function loadMembers({roomId, storage}) { - const txn = await storage.readTxn([ - storage.storeNames.roomMembers, - ]); +export async function loadMembers(roomId, txn) { const memberDatas = await txn.roomMembers.getAll(roomId); return memberDatas.map(d => new RoomMember(d)); } -async function fetchMembers({summary, roomId, hsApi, storage, setChangedMembersMap}) { +export async function fetchMemberSnapshot({summary, roomId, hsApi, setChangedMembersMap}) { // if any members are changed by sync while we're fetching members, // they will end up here, so we check not to override them const changedMembersDuringSync = new Map(); setChangedMembersMap(changedMembersDuringSync); - const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response; + const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response(); + if (!Array.isArray(memberResponse?.chunk)) { + throw new Error("malformed"); + } + return new MemberSnapshot({memberEvents: memberResponse.chunk, + setChangedMembersMap, changedMembersDuringSync, summary, roomId}); +} - const txn = await storage.readWriteTxn([ - storage.storeNames.roomSummary, - storage.storeNames.roomMembers, - ]); +/** Container for fetching /members while handling race with /sync. Can be persisted as part of a wider transaction */ +class MemberSnapshot { + constructor({memberEvents, setChangedMembersMap, changedMembersDuringSync, summary, roomId}) { + this._memberEvents = memberEvents; + this._setChangedMembersMap = setChangedMembersMap; + this._changedMembersDuringSync = changedMembersDuringSync; + this._summary = summary; + this._roomId = roomId; + this._members = null; + } - let summaryChanges; - let members; - - try { - summaryChanges = summary.writeHasFetchedMembers(true, txn); - const {roomMembers} = txn; - const memberEvents = memberResponse.chunk; - if (!Array.isArray(memberEvents)) { - throw new Error("malformed"); - } - members = await Promise.all(memberEvents.map(async memberEvent => { + write(txn) { + let summaryChanges; + // this needs to happen after the txn is opened to prevent a race + // between awaiting the opening of the txn and the sync + this._members = this._memberEvents.map(memberEvent => { const userId = memberEvent?.state_key; if (!userId) { throw new Error("malformed"); } // this member was changed during a sync that happened while calling /members // and thus is more recent, so don't overwrite - const changedMember = changedMembersDuringSync.get(userId); + const changedMember = this._changedMembersDuringSync.get(userId); if (changedMember) { return changedMember; } else { - const member = RoomMember.fromMemberEvent(roomId, memberEvent); - if (member) { - roomMembers.set(member.serialize()); - } - return member; + return RoomMember.fromMemberEvent(this._roomId, memberEvent); } - })); - } catch (err) { - // abort txn on any error - txn.abort(); - throw err; - } finally { + }); + // store members + const {roomMembers} = txn; + for (const member of this._members) { + if (member) { + roomMembers.set(member.serialize()); + } + } + // store flag + summaryChanges = this._summary.writeHasFetchedMembers(true, txn); + return summaryChanges; + } + + applyWrite(summaryChanges) { + this._summary.applyChanges(summaryChanges); + } + + get members() { + if (!this._members) { + throw new Error("call write first"); + } + return this._members; + } + + dispose() { // important this gets cleared // or otherwise Room remains in "fetching-members" mode - setChangedMembersMap(null); - } - await txn.complete(); - summary.applyChanges(summaryChanges); - return members; -} - -export async function fetchOrLoadMembers(options) { - const {summary} = options; - if (!summary.hasFetchedMembers) { - return fetchMembers(options); - } else { - return loadMembers(options); + this._setChangedMembersMap(null); } }