From 137264edcbd94f274bac7a74e2ebb1a426bdb274 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 30 Oct 2020 15:19:51 +0100 Subject: [PATCH] implement subscribing to a single event --- src/matrix/room/ObservedEventMap.js | 90 ++++++++++++++++++++++++++++ src/matrix/room/Room.js | 48 ++++++++++++++- src/matrix/room/timeline/Timeline.js | 9 +++ src/observable/BaseObservable.js | 4 ++ 4 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 src/matrix/room/ObservedEventMap.js diff --git a/src/matrix/room/ObservedEventMap.js b/src/matrix/room/ObservedEventMap.js new file mode 100644 index 00000000..1e21df63 --- /dev/null +++ b/src/matrix/room/ObservedEventMap.js @@ -0,0 +1,90 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {BaseObservableValue} from "../../observable/ObservableValue.js"; + +export class ObservedEventMap { + constructor(notifyEmpty) { + this._map = new Map(); + this._notifyEmpty = notifyEmpty; + } + + observe(eventId, eventEntry = null) { + let observable = this._map.get(eventId); + if (!observable) { + observable = new ObservedEvent(this, eventEntry); + this._map.set(eventId, observable); + } + return observable; + } + + updateEvents(eventEntries) { + for (let i = 0; i < eventEntries.length; i += 1) { + const entry = eventEntries[i]; + const observable = this._map.get(entry.id); + observable?.update(entry); + } + } + + _remove(observable) { + this._map.delete(observable.get().id); + if (this._map.size === 0) { + this._notifyEmpty(); + } + } +} + +class ObservedEvent extends BaseObservableValue { + constructor(eventMap, entry) { + super(); + this._eventMap = eventMap; + this._entry = entry; + // remove subscription in microtask after creating it + // otherwise ObservedEvents would easily never get + // removed if you never subscribe + Promise.resolve().then(() => { + if (!this.hasSubscriptions) { + this._eventMap.remove(this); + this._eventMap = null; + } + }); + } + + subscribe(handler) { + if (!this._eventMap) { + throw new Error("ObservedEvent expired, subscribe right after calling room.observeEvent()"); + } + return super.subscribe(handler); + } + + onUnsubscribeLast() { + this._eventMap._remove(this); + this._eventMap = null; + super.onUnsubscribeLast(); + } + + update(entry) { + // entries are mostly updated in-place, + // apart from when they are created, + // but doesn't hurt to reassign + this._entry = entry; + this.emit(this._entry); + } + + get() { + return this._entry; + } +} diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index f12da45b..b2d2b635 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -29,8 +29,8 @@ import {Heroes} from "./members/Heroes.js"; import {EventEntry} from "./timeline/entries/EventEntry.js"; import {EventKey} from "./timeline/EventKey.js"; import {Direction} from "./timeline/Direction.js"; +import {ObservedEventMap} from "./ObservedEventMap.js"; import {DecryptionSource} from "../e2ee/common.js"; - const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; export class Room extends EventEmitter { @@ -53,6 +53,7 @@ export class Room extends EventEmitter { this._roomEncryption = null; this._getSyncToken = getSyncToken; this._clock = clock; + this._observedEvents = null; } _readRetryDecryptCandidateEntries(sinceEventKey, txn) { @@ -165,6 +166,9 @@ export class Room extends EventEmitter { } await writeTxn.complete(); decryption.applyToEntries(entries); + if (this._observedEvents) { + this._observedEvents.updateEvents(entries); + } }); return request; } @@ -285,6 +289,9 @@ export class Room extends EventEmitter { if (this._timeline) { this._timeline.appendLiveEntries(newTimelineEntries); } + if (this._observedEvents) { + this._observedEvents.updateEvents(newTimelineEntries); + } if (removedPendingEvents) { this._sendQueue.emitRemovals(removedPendingEvents); } @@ -580,6 +587,45 @@ export class Room extends EventEmitter { this._summary.applyChanges(changes); } + observeEvent(eventId) { + if (!this._observedEvents) { + this._observedEvents = new ObservedEventMap(() => { + this._observedEvents = null; + }); + } + let entry = null; + if (this._timeline) { + entry = this._timeline.getByEventId(eventId); + } + const observable = this._observedEvents.observe(eventId, entry); + if (!entry) { + // update in the background + this._readEventById(eventId).then(entry => { + observable.update(entry); + }).catch(err => { + console.warn(`could not load event ${eventId} from storage`, err); + }); + } + return observable; + } + + async _readEventById(eventId) { + let stores = [this._storage.storeNames.timelineEvents]; + if (this.isEncrypted) { + stores.push(this._storage.storeNames.inboundGroupSessions); + } + const txn = this._storage.readTxn(stores); + const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); + if (storageEntry) { + const entry = new EventEntry(storageEntry, this._fragmentIdComparer); + if (entry.eventType === EVENT_ENCRYPTED_TYPE) { + const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn); + await request.complete(); + } + return entry; + } + } + dispose() { this._roomEncryption?.dispose(); this._timeline?.dispose(); diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 8cad17a1..1b7c8a18 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -95,6 +95,15 @@ export class Timeline { } } + getByEventId(eventId) { + for (let i = 0; i < this._remoteEntries.length; i += 1) { + const entry = this._remoteEntries.get(i); + if (entry.id === eventId) { + return entry; + } + } + } + /** @public */ get entries() { return this._allEntries; diff --git a/src/observable/BaseObservable.js b/src/observable/BaseObservable.js index 660f3200..29387020 100644 --- a/src/observable/BaseObservable.js +++ b/src/observable/BaseObservable.js @@ -48,6 +48,10 @@ export class BaseObservable { return null; } + get hasSubscriptions() { + return this._handlers.size !== 0; + } + // Add iterator over handlers here }