mirror of
https://github.com/vector-im/hydrogen-web.git
synced 2024-12-23 03:25:12 +01:00
allow members to be persisted as part of a larger txn when fetched
This commit is contained in:
parent
2b6530b459
commit
93f200673f
@ -22,7 +22,7 @@ import {Timeline} from "./timeline/Timeline.js";
|
|||||||
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
||||||
import {SendQueue} from "./sending/SendQueue.js";
|
import {SendQueue} from "./sending/SendQueue.js";
|
||||||
import {WrappedError} from "../error.js"
|
import {WrappedError} from "../error.js"
|
||||||
import {fetchOrLoadMembers} from "./members/load.js";
|
import {loadMembers, fetchMemberSnapshot} from "./members/load.js";
|
||||||
import {MemberList} from "./members/MemberList.js";
|
import {MemberList} from "./members/MemberList.js";
|
||||||
import {Heroes} from "./members/Heroes.js";
|
import {Heroes} from "./members/Heroes.js";
|
||||||
|
|
||||||
@ -141,20 +141,60 @@ export class Room extends EventEmitter {
|
|||||||
return this._sendQueue.enqueueEvent(eventType, content);
|
return this._sendQueue.enqueueEvent(eventType, content);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @package
|
||||||
|
* @return {MemberSnapshot} the member snapshot, be sure to call dispose
|
||||||
|
*/
|
||||||
|
async fetchMemberSnapshot() {
|
||||||
|
if (!this.hasFetchedMembers) {
|
||||||
|
if (this._changedMembersDuringSync) {
|
||||||
|
throw new Error("already snapshot being written");
|
||||||
|
}
|
||||||
|
return await fetchMemberSnapshot({
|
||||||
|
summary: this._summary,
|
||||||
|
roomId: this._roomId,
|
||||||
|
hsApi: this._hsApi,
|
||||||
|
// to handle race between /members and /sync
|
||||||
|
setChangedMembersMap: map => this._changedMembersDuringSync = map,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async _fetchAndWriteMembers() {
|
||||||
|
const snapshot = this.fetchMemberSnapshot();
|
||||||
|
try {
|
||||||
|
const txn = await this._storage.readWriteTxn([
|
||||||
|
this._storage.storeNames.roomSummary,
|
||||||
|
this._storage.storeNames.roomMembers,
|
||||||
|
]);
|
||||||
|
let changes;
|
||||||
|
try {
|
||||||
|
changes = snapshot.write(txn);
|
||||||
|
} catch (err) {
|
||||||
|
txn.abort();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
await txn.complete();
|
||||||
|
snapshot.applyWrite(changes);
|
||||||
|
return snapshot.members;
|
||||||
|
} finally {
|
||||||
|
snapshot.dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
||||||
async loadMemberList() {
|
async loadMemberList() {
|
||||||
if (this._memberList) {
|
if (this._memberList) {
|
||||||
this._memberList.retain();
|
this._memberList.retain();
|
||||||
return this._memberList;
|
return this._memberList;
|
||||||
} else {
|
} else {
|
||||||
const members = await fetchOrLoadMembers({
|
let members;
|
||||||
summary: this._summary,
|
if (this.hasFetchedMembers) {
|
||||||
roomId: this._roomId,
|
const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
|
||||||
hsApi: this._hsApi,
|
members = await loadMembers(this._roomId, txn);
|
||||||
storage: this._storage,
|
} else {
|
||||||
// to handle race between /members and /sync
|
members = await this._fetchAndWriteMembers();
|
||||||
setChangedMembersMap: map => this._changedMembersDuringSync = map,
|
}
|
||||||
});
|
|
||||||
this._memberList = new MemberList({
|
this._memberList = new MemberList({
|
||||||
members,
|
members,
|
||||||
closeCallback: () => { this._memberList = null; }
|
closeCallback: () => { this._memberList = null; }
|
||||||
@ -256,6 +296,10 @@ export class Room extends EventEmitter {
|
|||||||
return !!(tags && tags['m.lowpriority']);
|
return !!(tags && tags['m.lowpriority']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get hasFetchedMembers() {
|
||||||
|
return this._summary.hasFetchedMembers;
|
||||||
|
}
|
||||||
|
|
||||||
async _getLastEventId() {
|
async _getLastEventId() {
|
||||||
const lastKey = this._syncWriter.lastMessageKey;
|
const lastKey = this._syncWriter.lastMessageKey;
|
||||||
if (lastKey) {
|
if (lastKey) {
|
||||||
|
@ -17,74 +17,80 @@ limitations under the License.
|
|||||||
|
|
||||||
import {RoomMember} from "./RoomMember.js";
|
import {RoomMember} from "./RoomMember.js";
|
||||||
|
|
||||||
async function loadMembers({roomId, storage}) {
|
export async function loadMembers(roomId, txn) {
|
||||||
const txn = await storage.readTxn([
|
|
||||||
storage.storeNames.roomMembers,
|
|
||||||
]);
|
|
||||||
const memberDatas = await txn.roomMembers.getAll(roomId);
|
const memberDatas = await txn.roomMembers.getAll(roomId);
|
||||||
return memberDatas.map(d => new RoomMember(d));
|
return memberDatas.map(d => new RoomMember(d));
|
||||||
}
|
}
|
||||||
|
|
||||||
async function fetchMembers({summary, roomId, hsApi, storage, setChangedMembersMap}) {
|
export async function fetchMemberSnapshot({summary, roomId, hsApi, setChangedMembersMap}) {
|
||||||
// if any members are changed by sync while we're fetching members,
|
// if any members are changed by sync while we're fetching members,
|
||||||
// they will end up here, so we check not to override them
|
// they will end up here, so we check not to override them
|
||||||
const changedMembersDuringSync = new Map();
|
const changedMembersDuringSync = new Map();
|
||||||
setChangedMembersMap(changedMembersDuringSync);
|
setChangedMembersMap(changedMembersDuringSync);
|
||||||
|
|
||||||
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response;
|
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response();
|
||||||
|
if (!Array.isArray(memberResponse?.chunk)) {
|
||||||
const txn = await storage.readWriteTxn([
|
|
||||||
storage.storeNames.roomSummary,
|
|
||||||
storage.storeNames.roomMembers,
|
|
||||||
]);
|
|
||||||
|
|
||||||
let summaryChanges;
|
|
||||||
let members;
|
|
||||||
|
|
||||||
try {
|
|
||||||
summaryChanges = summary.writeHasFetchedMembers(true, txn);
|
|
||||||
const {roomMembers} = txn;
|
|
||||||
const memberEvents = memberResponse.chunk;
|
|
||||||
if (!Array.isArray(memberEvents)) {
|
|
||||||
throw new Error("malformed");
|
throw new Error("malformed");
|
||||||
}
|
}
|
||||||
members = await Promise.all(memberEvents.map(async memberEvent => {
|
return new MemberSnapshot({memberEvents: memberResponse.chunk,
|
||||||
|
setChangedMembersMap, changedMembersDuringSync, summary, roomId});
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Container for fetching /members while handling race with /sync. Can be persisted as part of a wider transaction */
|
||||||
|
class MemberSnapshot {
|
||||||
|
constructor({memberEvents, setChangedMembersMap, changedMembersDuringSync, summary, roomId}) {
|
||||||
|
this._memberEvents = memberEvents;
|
||||||
|
this._setChangedMembersMap = setChangedMembersMap;
|
||||||
|
this._changedMembersDuringSync = changedMembersDuringSync;
|
||||||
|
this._summary = summary;
|
||||||
|
this._roomId = roomId;
|
||||||
|
this._members = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
write(txn) {
|
||||||
|
let summaryChanges;
|
||||||
|
// this needs to happen after the txn is opened to prevent a race
|
||||||
|
// between awaiting the opening of the txn and the sync
|
||||||
|
this._members = this._memberEvents.map(memberEvent => {
|
||||||
const userId = memberEvent?.state_key;
|
const userId = memberEvent?.state_key;
|
||||||
if (!userId) {
|
if (!userId) {
|
||||||
throw new Error("malformed");
|
throw new Error("malformed");
|
||||||
}
|
}
|
||||||
// this member was changed during a sync that happened while calling /members
|
// this member was changed during a sync that happened while calling /members
|
||||||
// and thus is more recent, so don't overwrite
|
// and thus is more recent, so don't overwrite
|
||||||
const changedMember = changedMembersDuringSync.get(userId);
|
const changedMember = this._changedMembersDuringSync.get(userId);
|
||||||
if (changedMember) {
|
if (changedMember) {
|
||||||
return changedMember;
|
return changedMember;
|
||||||
} else {
|
} else {
|
||||||
const member = RoomMember.fromMemberEvent(roomId, memberEvent);
|
return RoomMember.fromMemberEvent(this._roomId, memberEvent);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// store members
|
||||||
|
const {roomMembers} = txn;
|
||||||
|
for (const member of this._members) {
|
||||||
if (member) {
|
if (member) {
|
||||||
roomMembers.set(member.serialize());
|
roomMembers.set(member.serialize());
|
||||||
}
|
}
|
||||||
return member;
|
|
||||||
}
|
}
|
||||||
}));
|
// store flag
|
||||||
} catch (err) {
|
summaryChanges = this._summary.writeHasFetchedMembers(true, txn);
|
||||||
// abort txn on any error
|
return summaryChanges;
|
||||||
txn.abort();
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
// important this gets cleared
|
|
||||||
// or otherwise Room remains in "fetching-members" mode
|
|
||||||
setChangedMembersMap(null);
|
|
||||||
}
|
|
||||||
await txn.complete();
|
|
||||||
summary.applyChanges(summaryChanges);
|
|
||||||
return members;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function fetchOrLoadMembers(options) {
|
applyWrite(summaryChanges) {
|
||||||
const {summary} = options;
|
this._summary.applyChanges(summaryChanges);
|
||||||
if (!summary.hasFetchedMembers) {
|
}
|
||||||
return fetchMembers(options);
|
|
||||||
} else {
|
get members() {
|
||||||
return loadMembers(options);
|
if (!this._members) {
|
||||||
|
throw new Error("call write first");
|
||||||
|
}
|
||||||
|
return this._members;
|
||||||
|
}
|
||||||
|
|
||||||
|
dispose() {
|
||||||
|
// important this gets cleared
|
||||||
|
// or otherwise Room remains in "fetching-members" mode
|
||||||
|
this._setChangedMembersMap(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user