mirror of
https://github.com/vector-im/hydrogen-web.git
synced 2025-01-22 10:11:39 +01:00
Merge pull request #87 from vector-im/bwindels/megolm-member-changes
Rotate and share room key on membership changes
This commit is contained in:
commit
1cd93df81c
@ -125,6 +125,7 @@ export class Session {
|
||||
olmEncryption: this._olmEncryption,
|
||||
megolmEncryption: this._megolmEncryption,
|
||||
megolmDecryption: this._megolmDecryption,
|
||||
storage: this._storage,
|
||||
encryptionParams
|
||||
});
|
||||
}
|
||||
@ -216,7 +217,7 @@ export class Session {
|
||||
|
||||
this._sendScheduler.start();
|
||||
for (const [, room] of this._rooms) {
|
||||
room.resumeSending();
|
||||
room.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,12 +87,16 @@ export class Sync {
|
||||
}
|
||||
|
||||
async _syncLoop(syncToken) {
|
||||
let afterSyncCompletedPromise = Promise.resolve();
|
||||
// if syncToken is falsy, it will first do an initial sync ...
|
||||
while(this._status.get() !== SyncStatus.Stopped) {
|
||||
let roomChanges;
|
||||
try {
|
||||
console.log(`starting sync request with since ${syncToken} ...`);
|
||||
const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
|
||||
syncToken = await this._syncRequest(syncToken, timeout);
|
||||
const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise);
|
||||
syncToken = syncResult.syncToken;
|
||||
roomChanges = syncResult.roomChanges;
|
||||
this._status.set(SyncStatus.Syncing);
|
||||
} catch (err) {
|
||||
if (!(err instanceof AbortError)) {
|
||||
@ -101,18 +105,39 @@ export class Sync {
|
||||
}
|
||||
}
|
||||
if (!this._error) {
|
||||
try {
|
||||
// TODO: run this in parallel with the next sync request
|
||||
await this._session.afterSyncCompleted();
|
||||
} catch (err) {
|
||||
console.error("error during after sync completed, continuing to sync.", err.stack);
|
||||
// swallowing error here apart from logging
|
||||
}
|
||||
afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _syncRequest(syncToken, timeout) {
|
||||
async _runAfterSyncCompleted(roomChanges) {
|
||||
const sessionPromise = (async () => {
|
||||
try {
|
||||
await this._session.afterSyncCompleted();
|
||||
} catch (err) {
|
||||
console.error("error during session afterSyncCompleted, continuing", err.stack);
|
||||
}
|
||||
})();
|
||||
let allPromises = [sessionPromise];
|
||||
|
||||
const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => {
|
||||
return rc.changes.needsAfterSyncCompleted;
|
||||
});
|
||||
if (roomsNeedingAfterSyncCompleted.length) {
|
||||
allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => {
|
||||
try {
|
||||
await room.afterSyncCompleted(changes);
|
||||
} catch (err) {
|
||||
console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack);
|
||||
}
|
||||
}));
|
||||
}
|
||||
// run everything in parallel,
|
||||
// we don't want to delay the next sync too much
|
||||
await Promise.all(allPromises);
|
||||
}
|
||||
|
||||
async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) {
|
||||
let {syncFilterId} = this._session;
|
||||
if (typeof syncFilterId !== "string") {
|
||||
this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
|
||||
@ -121,43 +146,20 @@ export class Sync {
|
||||
const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests
|
||||
this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout});
|
||||
const response = await this._currentRequest.response();
|
||||
// wait here for the afterSyncCompleted step of the previous sync to complete
|
||||
// before we continue processing this sync response
|
||||
await prevAfterSyncCompletedPromise;
|
||||
|
||||
const isInitialSync = !syncToken;
|
||||
syncToken = response.next_batch;
|
||||
const storeNames = this._storage.storeNames;
|
||||
const syncTxn = await this._storage.readWriteTxn([
|
||||
storeNames.session,
|
||||
storeNames.roomSummary,
|
||||
storeNames.roomState,
|
||||
storeNames.roomMembers,
|
||||
storeNames.timelineEvents,
|
||||
storeNames.timelineFragments,
|
||||
storeNames.pendingEvents,
|
||||
storeNames.userIdentities,
|
||||
storeNames.inboundGroupSessions,
|
||||
storeNames.groupSessionDecryptions,
|
||||
storeNames.deviceIdentities,
|
||||
]);
|
||||
const roomChanges = [];
|
||||
const syncTxn = await this._openSyncTxn();
|
||||
let roomChanges = [];
|
||||
let sessionChanges;
|
||||
try {
|
||||
// to_device
|
||||
// presence
|
||||
if (response.rooms) {
|
||||
const promises = parseRooms(response.rooms, async (roomId, roomResponse, membership) => {
|
||||
// ignore rooms with empty timelines during initial sync,
|
||||
// see https://github.com/vector-im/hydrogen-web/issues/15
|
||||
if (isInitialSync && timelineIsEmpty(roomResponse)) {
|
||||
return;
|
||||
}
|
||||
let room = this._session.rooms.get(roomId);
|
||||
if (!room) {
|
||||
room = this._session.createRoom(roomId);
|
||||
}
|
||||
console.log(` * applying sync response to room ${roomId} ...`);
|
||||
const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
|
||||
roomChanges.push({room, changes});
|
||||
});
|
||||
await Promise.all(promises);
|
||||
roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn);
|
||||
}
|
||||
sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn);
|
||||
} catch(err) {
|
||||
@ -182,7 +184,46 @@ export class Sync {
|
||||
room.afterSync(changes);
|
||||
}
|
||||
|
||||
return syncToken;
|
||||
return {syncToken, roomChanges};
|
||||
}
|
||||
|
||||
async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) {
|
||||
const roomChanges = [];
|
||||
const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => {
|
||||
// ignore rooms with empty timelines during initial sync,
|
||||
// see https://github.com/vector-im/hydrogen-web/issues/15
|
||||
if (isInitialSync && timelineIsEmpty(roomResponse)) {
|
||||
return;
|
||||
}
|
||||
let room = this._session.rooms.get(roomId);
|
||||
if (!room) {
|
||||
room = this._session.createRoom(roomId);
|
||||
}
|
||||
console.log(` * applying sync response to room ${roomId} ...`);
|
||||
const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
|
||||
roomChanges.push({room, changes});
|
||||
});
|
||||
await Promise.all(promises);
|
||||
return roomChanges;
|
||||
}
|
||||
|
||||
async _openSyncTxn() {
|
||||
const storeNames = this._storage.storeNames;
|
||||
return await this._storage.readWriteTxn([
|
||||
storeNames.session,
|
||||
storeNames.roomSummary,
|
||||
storeNames.roomState,
|
||||
storeNames.roomMembers,
|
||||
storeNames.timelineEvents,
|
||||
storeNames.timelineFragments,
|
||||
storeNames.pendingEvents,
|
||||
storeNames.userIdentities,
|
||||
storeNames.inboundGroupSessions,
|
||||
storeNames.groupSessionDecryptions,
|
||||
storeNames.deviceIdentities,
|
||||
// to discard outbound session when somebody leaves a room
|
||||
storeNames.outboundGroupSessions
|
||||
]);
|
||||
}
|
||||
|
||||
stop() {
|
||||
|
@ -65,7 +65,7 @@ export class DeviceTracker {
|
||||
}
|
||||
|
||||
async trackRoom(room) {
|
||||
if (room.isTrackingMembers) {
|
||||
if (room.isTrackingMembers || !room.isEncrypted) {
|
||||
return;
|
||||
}
|
||||
const memberList = await room.loadMemberList();
|
||||
@ -230,8 +230,7 @@ export class DeviceTracker {
|
||||
* @param {String} roomId [description]
|
||||
* @return {[type]} [description]
|
||||
*/
|
||||
async deviceIdentitiesForTrackedRoom(roomId, hsApi) {
|
||||
let identities;
|
||||
async devicesForTrackedRoom(roomId, hsApi) {
|
||||
const txn = await this._storage.readTxn([
|
||||
this._storage.storeNames.roomMembers,
|
||||
this._storage.storeNames.userIdentities,
|
||||
@ -243,8 +242,27 @@ export class DeviceTracker {
|
||||
|
||||
// So, this will also contain non-joined memberships
|
||||
const userIds = await txn.roomMembers.getAllUserIds(roomId);
|
||||
const allMemberIdentities = await Promise.all(userIds.map(userId => txn.userIdentities.get(userId)));
|
||||
identities = allMemberIdentities.filter(identity => {
|
||||
|
||||
return await this._devicesForUserIds(roomId, userIds, txn, hsApi);
|
||||
}
|
||||
|
||||
async devicesForRoomMembers(roomId, userIds, hsApi) {
|
||||
const txn = await this._storage.readTxn([
|
||||
this._storage.storeNames.userIdentities,
|
||||
]);
|
||||
return await this._devicesForUserIds(roomId, userIds, txn, hsApi);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} roomId [description]
|
||||
* @param {Array<string>} userIds a set of user ids to try and find the identity for. Will be check to belong to roomId.
|
||||
* @param {Transaction} userIdentityTxn to read the user identities
|
||||
* @param {HomeServerApi} hsApi
|
||||
* @return {Array<DeviceIdentity>}
|
||||
*/
|
||||
async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi) {
|
||||
const allMemberIdentities = await Promise.all(userIds.map(userId => userIdentityTxn.userIdentities.get(userId)));
|
||||
const identities = allMemberIdentities.filter(identity => {
|
||||
// identity will be missing for any userIds that don't have
|
||||
// membership join in any of your encrypted rooms
|
||||
return identity && identity.roomIds.includes(roomId);
|
||||
|
@ -21,7 +21,7 @@ import {makeTxnId} from "../common.js";
|
||||
const ENCRYPTED_TYPE = "m.room.encrypted";
|
||||
|
||||
export class RoomEncryption {
|
||||
constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams}) {
|
||||
constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams, storage}) {
|
||||
this._room = room;
|
||||
this._deviceTracker = deviceTracker;
|
||||
this._olmEncryption = olmEncryption;
|
||||
@ -35,6 +35,7 @@ export class RoomEncryption {
|
||||
// not `event_id`, but an internal event id passed in to the decrypt methods
|
||||
this._eventIdsByMissingSession = new Map();
|
||||
this._senderDeviceCache = new Map();
|
||||
this._storage = storage;
|
||||
}
|
||||
|
||||
notifyTimelineClosed() {
|
||||
@ -45,6 +46,12 @@ export class RoomEncryption {
|
||||
}
|
||||
|
||||
async writeMemberChanges(memberChanges, txn) {
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.hasLeft) {
|
||||
this._megolmEncryption.discardOutboundSession(this._room.id, txn);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
|
||||
}
|
||||
|
||||
@ -108,10 +115,12 @@ export class RoomEncryption {
|
||||
// share the new megolm session if needed
|
||||
if (megolmResult.roomKeyMessage) {
|
||||
await this._deviceTracker.trackRoom(this._room);
|
||||
const devices = await this._deviceTracker.deviceIdentitiesForTrackedRoom(this._room.id, hsApi);
|
||||
const messages = await this._olmEncryption.encrypt(
|
||||
"m.room_key", megolmResult.roomKeyMessage, devices, hsApi);
|
||||
await this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi);
|
||||
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi);
|
||||
await this._sendRoomKey(megolmResult.roomKeyMessage, devices, hsApi);
|
||||
// if we happen to rotate the session before we have sent newly joined members the room key
|
||||
// then mark those members as not needing the key anymore
|
||||
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
await this._clearNeedsRoomKeyFlag(userIds);
|
||||
}
|
||||
return {
|
||||
type: ENCRYPTED_TYPE,
|
||||
@ -119,6 +128,74 @@ export class RoomEncryption {
|
||||
};
|
||||
}
|
||||
|
||||
needsToShareKeys(memberChanges) {
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.member.needsRoomKey) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async shareRoomKeyToPendingMembers(hsApi) {
|
||||
// sucks to call this for all encrypted rooms on startup?
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
|
||||
const pendingUserIds = await txn.roomMembers.getUserIdsNeedingRoomKey(this._room.id);
|
||||
return await this._shareRoomKey(pendingUserIds, hsApi);
|
||||
}
|
||||
|
||||
async shareRoomKeyForMemberChanges(memberChanges, hsApi) {
|
||||
const pendingUserIds = [];
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.member.needsRoomKey) {
|
||||
pendingUserIds.push(m.userId);
|
||||
}
|
||||
}
|
||||
return await this._shareRoomKey(pendingUserIds, hsApi);
|
||||
}
|
||||
|
||||
async _shareRoomKey(userIds, hsApi) {
|
||||
if (userIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
const readRoomKeyTxn = await this._storage.readTxn([this._storage.storeNames.outboundGroupSessions]);
|
||||
const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(this._room.id, readRoomKeyTxn);
|
||||
// no room key if we haven't created a session yet
|
||||
// (or we removed it and will create a new one on the next send)
|
||||
if (roomKeyMessage) {
|
||||
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, userIds, hsApi);
|
||||
await this._sendRoomKey(roomKeyMessage, devices, hsApi);
|
||||
const actuallySentUserIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
await this._clearNeedsRoomKeyFlag(actuallySentUserIds);
|
||||
} else {
|
||||
// we don't have a session yet, clear them all
|
||||
await this._clearNeedsRoomKeyFlag(userIds);
|
||||
}
|
||||
}
|
||||
|
||||
async _clearNeedsRoomKeyFlag(userIds) {
|
||||
const txn = await this._storage.readWriteTxn([this._storage.storeNames.roomMembers]);
|
||||
try {
|
||||
await Promise.all(userIds.map(async userId => {
|
||||
const memberData = await txn.roomMembers.get(this._room.id, userId);
|
||||
if (memberData.needsRoomKey) {
|
||||
memberData.needsRoomKey = false;
|
||||
txn.roomMembers.set(memberData);
|
||||
}
|
||||
}));
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
}
|
||||
|
||||
async _sendRoomKey(roomKeyMessage, devices, hsApi) {
|
||||
const messages = await this._olmEncryption.encrypt(
|
||||
"m.room_key", roomKeyMessage, devices, hsApi);
|
||||
await this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi);
|
||||
}
|
||||
|
||||
async _sendMessagesToDevices(type, messages, hsApi) {
|
||||
const messagesByUser = groupBy(messages, message => message.device.userId);
|
||||
const payload = {
|
||||
|
@ -26,6 +26,23 @@ export class Encryption {
|
||||
this._ownDeviceId = ownDeviceId;
|
||||
}
|
||||
|
||||
discardOutboundSession(roomId, txn) {
|
||||
txn.outboundGroupSessions.remove(roomId);
|
||||
}
|
||||
|
||||
async createRoomKeyMessage(roomId, txn) {
|
||||
let sessionEntry = await txn.outboundGroupSessions.get(roomId);
|
||||
if (sessionEntry) {
|
||||
const session = new this._olm.OutboundGroupSession();
|
||||
try {
|
||||
session.unpickle(this._pickleKey, sessionEntry.session);
|
||||
return this._createRoomKeyMessage(session, roomId);
|
||||
} finally {
|
||||
session.free();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Encrypts a message with megolm
|
||||
* @param {string} roomId
|
||||
@ -123,12 +140,9 @@ export class Encryption {
|
||||
session_id: session.session_id(),
|
||||
session_key: session.session_key(),
|
||||
algorithm: MEGOLM_ALGORITHM,
|
||||
// if we need to do this, do we need to create
|
||||
// the room key message after or before having encrypted
|
||||
// with the new session? I guess before as we do now
|
||||
// because the chain_index is where you should start decrypting?
|
||||
//
|
||||
// chain_index: session.message_index()
|
||||
// chain_index is ignored by element-web if not all clients
|
||||
// but let's send it anyway, as element-web does so
|
||||
chain_index: session.message_index()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,7 +127,7 @@ export class Room extends EventEmitter {
|
||||
isInitialSync, isTimelineOpen,
|
||||
txn);
|
||||
const {entries: encryptedEntries, newLiveKey, memberChanges} =
|
||||
await this._syncWriter.writeSync(roomResponse, txn);
|
||||
await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
|
||||
// decrypt if applicable
|
||||
let entries = encryptedEntries;
|
||||
if (this._roomEncryption) {
|
||||
@ -157,11 +157,16 @@ export class Room extends EventEmitter {
|
||||
newLiveKey,
|
||||
removedPendingEvents,
|
||||
memberChanges,
|
||||
heroChanges
|
||||
heroChanges,
|
||||
needsAfterSyncCompleted: this._roomEncryption?.needsToShareKeys(memberChanges)
|
||||
};
|
||||
}
|
||||
|
||||
/** @package */
|
||||
/**
|
||||
* @package
|
||||
* Called with the changes returned from `writeSync` to apply them and emit changes.
|
||||
* No storage or network operations should be done here.
|
||||
*/
|
||||
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges}) {
|
||||
this._syncWriter.afterSync(newLiveKey);
|
||||
if (!this._summary.encryption && summaryChanges.encryption && !this._roomEncryption) {
|
||||
@ -204,8 +209,28 @@ export class Room extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Only called if the result of writeSync had `needsAfterSyncCompleted` set.
|
||||
* Can be used to do longer running operations that resulted from the last sync,
|
||||
* like network operations.
|
||||
*/
|
||||
async afterSyncCompleted({memberChanges}) {
|
||||
if (this._roomEncryption) {
|
||||
await this._roomEncryption.shareRoomKeyForMemberChanges(memberChanges, this._hsApi);
|
||||
}
|
||||
}
|
||||
|
||||
/** @package */
|
||||
resumeSending() {
|
||||
async start() {
|
||||
if (this._roomEncryption) {
|
||||
try {
|
||||
// if we got interrupted last time sending keys to newly joined members
|
||||
await this._roomEncryption.shareRoomKeyToPendingMembers(this._hsApi);
|
||||
} catch (err) {
|
||||
// we should not throw here
|
||||
console.error(`could not send out pending room keys for room ${this.id}`, err.stack);
|
||||
}
|
||||
}
|
||||
this._sendQueue.resumeSending();
|
||||
}
|
||||
|
||||
|
@ -67,6 +67,14 @@ export class RoomMember {
|
||||
});
|
||||
}
|
||||
|
||||
get needsRoomKey() {
|
||||
return this._data.needsRoomKey;
|
||||
}
|
||||
|
||||
set needsRoomKey(value) {
|
||||
this._data.needsRoomKey = !!value;
|
||||
}
|
||||
|
||||
get membership() {
|
||||
return this._data.membership;
|
||||
}
|
||||
@ -134,4 +142,12 @@ export class MemberChange {
|
||||
get membership() {
|
||||
return this._memberEvent.content?.membership;
|
||||
}
|
||||
|
||||
get hasLeft() {
|
||||
return this.previousMembership === "join" && this.membership !== "join";
|
||||
}
|
||||
|
||||
get hasJoined() {
|
||||
return this.previousMembership !== "join" && this.membership === "join";
|
||||
}
|
||||
}
|
||||
|
@ -98,39 +98,47 @@ export class SyncWriter {
|
||||
return {oldFragment, newFragment};
|
||||
}
|
||||
|
||||
_writeStateEvent(event, txn) {
|
||||
if (event.type === MEMBER_EVENT_TYPE) {
|
||||
const userId = event.state_key;
|
||||
if (userId) {
|
||||
const memberChange = new MemberChange(this._roomId, event);
|
||||
if (memberChange.member) {
|
||||
// as this is sync, we can just replace the member
|
||||
// if it is there already
|
||||
txn.roomMembers.set(memberChange.member.serialize());
|
||||
return memberChange;
|
||||
async _writeMember(event, trackNewlyJoined, txn) {
|
||||
const userId = event.state_key;
|
||||
if (userId) {
|
||||
const memberChange = new MemberChange(this._roomId, event);
|
||||
const {member} = memberChange;
|
||||
if (member) {
|
||||
if (trackNewlyJoined) {
|
||||
const existingMemberData = await txn.roomMembers.get(this._roomId, userId);
|
||||
// mark new members so we know who needs our the room key for our outbound megolm session
|
||||
member.needsRoomKey = existingMemberData?.needsRoomKey || memberChange.hasJoined;
|
||||
}
|
||||
txn.roomMembers.set(member.serialize());
|
||||
return memberChange;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _writeStateEvent(event, trackNewlyJoined, txn) {
|
||||
if (event.type === MEMBER_EVENT_TYPE) {
|
||||
return await this._writeMember(event, trackNewlyJoined, txn);
|
||||
} else {
|
||||
txn.roomState.set(this._roomId, event);
|
||||
}
|
||||
}
|
||||
|
||||
_writeStateEvents(roomResponse, txn) {
|
||||
async _writeStateEvents(roomResponse, trackNewlyJoined, txn) {
|
||||
const memberChanges = new Map();
|
||||
// persist state
|
||||
const {state} = roomResponse;
|
||||
if (Array.isArray(state?.events)) {
|
||||
for (const event of state.events) {
|
||||
const memberChange = this._writeStateEvent(event, txn);
|
||||
await Promise.all(state.events.map(async event => {
|
||||
const memberChange = await this._writeStateEvent(event, trackNewlyJoined, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
return memberChanges;
|
||||
}
|
||||
|
||||
async _writeTimeline(entries, timeline, currentKey, txn) {
|
||||
async _writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn) {
|
||||
const memberChanges = new Map();
|
||||
if (timeline.events) {
|
||||
const events = deduplicateEvents(timeline.events);
|
||||
@ -145,15 +153,17 @@ export class SyncWriter {
|
||||
}
|
||||
txn.timelineEvents.insert(entry);
|
||||
entries.push(new EventEntry(entry, this._fragmentIdComparer));
|
||||
|
||||
// process live state events first, so new member info is available
|
||||
if (typeof event.state_key === "string") {
|
||||
const memberChange = this._writeStateEvent(event, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}
|
||||
}
|
||||
// process live state events first, so new member info is available
|
||||
// also run async state event writing in parallel
|
||||
await Promise.all(events.filter(event => {
|
||||
return typeof event.state_key === "string";
|
||||
}).map(async stateEvent => {
|
||||
const memberChange = await this._writeStateEvent(stateEvent, trackNewlyJoined, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return {currentKey, memberChanges};
|
||||
}
|
||||
@ -176,7 +186,18 @@ export class SyncWriter {
|
||||
}
|
||||
}
|
||||
|
||||
async writeSync(roomResponse, txn) {
|
||||
/**
|
||||
* @type {SyncWriterResult}
|
||||
* @property {Array<BaseEntry>} entries new timeline entries written
|
||||
* @property {EventKey} newLiveKey the advanced key to write events at
|
||||
* @property {Map<string, MemberChange>} memberChanges member changes in the processed sync ny user id
|
||||
*
|
||||
* @param {Object} roomResponse [description]
|
||||
* @param {Boolean} trackNewlyJoined needed to know if we need to keep track whether a user needs keys when they join an encrypted room
|
||||
* @param {Transaction} txn
|
||||
* @return {SyncWriterResult}
|
||||
*/
|
||||
async writeSync(roomResponse, trackNewlyJoined, txn) {
|
||||
const entries = [];
|
||||
const {timeline} = roomResponse;
|
||||
let currentKey = this._lastLiveKey;
|
||||
@ -198,8 +219,8 @@ export class SyncWriter {
|
||||
}
|
||||
// important this happens before _writeTimeline so
|
||||
// members are available in the transaction
|
||||
const memberChanges = this._writeStateEvents(roomResponse, txn);
|
||||
const timelineResult = await this._writeTimeline(entries, timeline, currentKey, txn);
|
||||
const memberChanges = await this._writeStateEvents(roomResponse, trackNewlyJoined, txn);
|
||||
const timelineResult = await this._writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn);
|
||||
currentKey = timelineResult.currentKey;
|
||||
// merge member changes from state and timeline, giving precedence to the latter
|
||||
for (const [userId, memberChange] of timelineResult.memberChanges.entries()) {
|
||||
|
@ -187,6 +187,14 @@ export class QueryTarget {
|
||||
return results;
|
||||
}
|
||||
|
||||
async iterateWhile(range, predicate) {
|
||||
const cursor = this._openCursor(range, "next");
|
||||
await iterateCursor(cursor, (value) => {
|
||||
const passesPredicate = predicate(value);
|
||||
return {done: !passesPredicate};
|
||||
});
|
||||
}
|
||||
|
||||
async _find(range, predicate, direction) {
|
||||
const cursor = this._openCursor(range, direction);
|
||||
let result;
|
||||
|
@ -19,6 +19,10 @@ export class OutboundGroupSessionStore {
|
||||
this._store = store;
|
||||
}
|
||||
|
||||
remove(roomId) {
|
||||
this._store.delete(roomId);
|
||||
}
|
||||
|
||||
get(roomId) {
|
||||
return this._store.get(roomId);
|
||||
}
|
||||
|
@ -60,4 +60,19 @@ export class RoomMemberStore {
|
||||
});
|
||||
return userIds;
|
||||
}
|
||||
|
||||
async getUserIdsNeedingRoomKey(roomId) {
|
||||
const userIds = [];
|
||||
const range = IDBKeyRange.lowerBound(encodeKey(roomId, ""));
|
||||
await this._roomMembersStore.iterateWhile(range, member => {
|
||||
if (member.roomId !== roomId) {
|
||||
return false;
|
||||
}
|
||||
if (member.needsRoomKey) {
|
||||
userIds.push(member.userId);
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return userIds;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user