Merge pull request #88 from vector-im/bwindels/megolm-decryption-worker

Run megolm decryption in a worker
This commit is contained in:
Bruno Windels 2020-09-10 16:58:44 +00:00 committed by GitHub
commit 2f835238c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1425 additions and 382 deletions

View File

@ -19,9 +19,12 @@
<script id="main" type="module">
import {main} from "./src/main.js";
main(document.body, {
worker: "src/worker.js",
olm: {
wasm: "lib/olm/olm.wasm",
legacyBundle: "lib/olm/olm_legacy.js",
wasmBundle: "lib/olm/olm.js",
}
});
</script>
<script id="service-worker" type="disabled">

View File

@ -84,10 +84,11 @@ async function build() {
// also creates the directories where the theme css bundles are placed in,
// so do it first
const themeAssets = await copyThemeAssets(themes, legacy);
const jsBundlePath = await buildJs();
const jsLegacyBundlePath = await buildJsLegacy();
const jsBundlePath = await buildJs("src/main.js", `${PROJECT_ID}.js`);
const jsLegacyBundlePath = await buildJsLegacy("src/main.js", `${PROJECT_ID}-legacy.js`);
const jsWorkerPath = await buildWorkerJsLegacy("src/worker.js", `worker.js`);
const cssBundlePaths = await buildCssBundles(legacy ? buildCssLegacy : buildCss, themes, themeAssets);
const assetPaths = createAssetPaths(jsBundlePath, jsLegacyBundlePath, cssBundlePaths, themeAssets);
const assetPaths = createAssetPaths(jsBundlePath, jsLegacyBundlePath, jsWorkerPath, cssBundlePaths, themeAssets);
let manifestPath;
if (offline) {
@ -98,7 +99,7 @@ async function build() {
console.log(`built ${PROJECT_ID} ${version} successfully`);
}
function createAssetPaths(jsBundlePath, jsLegacyBundlePath, cssBundlePaths, themeAssets) {
function createAssetPaths(jsBundlePath, jsLegacyBundlePath, jsWorkerPath, cssBundlePaths, themeAssets) {
function trim(path) {
if (!path.startsWith(targetDir)) {
throw new Error("invalid target path: " + targetDir);
@ -108,6 +109,7 @@ function createAssetPaths(jsBundlePath, jsLegacyBundlePath, cssBundlePaths, them
return {
jsBundle: () => trim(jsBundlePath),
jsLegacyBundle: () => trim(jsLegacyBundlePath),
jsWorker: () => trim(jsWorkerPath),
cssMainBundle: () => trim(cssBundlePaths.main),
cssThemeBundle: themeName => trim(cssBundlePaths.themes[themeName]),
cssThemeBundles: () => Object.values(cssBundlePaths.themes).map(a => trim(a)),
@ -161,10 +163,14 @@ async function buildHtml(doc, version, assetPaths, manifestPath) {
findThemes(doc, (themeName, theme) => {
theme.attr("href", assetPaths.cssThemeBundle(themeName));
});
const pathsJSON = JSON.stringify({
worker: assetPaths.jsWorker(),
olm: olmFiles
});
doc("script#main").replaceWith(
`<script type="module">import {main} from "./${assetPaths.jsBundle()}"; main(document.body, ${JSON.stringify(olmFiles)});</script>` +
`<script type="module">import {main} from "./${assetPaths.jsBundle()}"; main(document.body, ${pathsJSON});</script>` +
`<script type="text/javascript" nomodule src="${assetPaths.jsLegacyBundle()}"></script>` +
`<script type="text/javascript" nomodule>${PROJECT_ID}Bundle.main(document.body, ${JSON.stringify(olmFiles)});</script>`);
`<script type="text/javascript" nomodule>${PROJECT_ID}Bundle.main(document.body, ${pathsJSON});</script>`);
removeOrEnableScript(doc("script#service-worker"), offline);
const versionScript = doc("script#version");
@ -180,23 +186,24 @@ async function buildHtml(doc, version, assetPaths, manifestPath) {
await fs.writeFile(path.join(targetDir, "index.html"), doc.html(), "utf8");
}
async function buildJs() {
async function buildJs(inputFile, outputName) {
// create js bundle
const bundle = await rollup({
input: 'src/main.js',
input: inputFile,
plugins: [removeJsComments({comments: "none"})]
});
const {output} = await bundle.generate({
format: 'es',
// TODO: can remove this?
name: `${PROJECT_ID}Bundle`
});
const code = output[0].code;
const bundlePath = resource(`${PROJECT_ID}.js`, code);
const bundlePath = resource(outputName, code);
await fs.writeFile(bundlePath, code, "utf8");
return bundlePath;
}
async function buildJsLegacy() {
async function buildJsLegacy(inputFile, outputName) {
// compile down to whatever IE 11 needs
const babelPlugin = babel.babel({
babelHelpers: 'bundled',
@ -214,7 +221,7 @@ async function buildJsLegacy() {
});
// create js bundle
const rollupConfig = {
input: ['src/legacy-polyfill.js', 'src/main.js'],
input: ['src/legacy-polyfill.js', inputFile],
plugins: [multi(), commonjs(), nodeResolve(), babelPlugin, removeJsComments({comments: "none"})]
};
const bundle = await rollup(rollupConfig);
@ -223,7 +230,39 @@ async function buildJsLegacy() {
name: `${PROJECT_ID}Bundle`
});
const code = output[0].code;
const bundlePath = resource(`${PROJECT_ID}-legacy.js`, code);
const bundlePath = resource(outputName, code);
await fs.writeFile(bundlePath, code, "utf8");
return bundlePath;
}
async function buildWorkerJsLegacy(inputFile, outputName) {
// compile down to whatever IE 11 needs
const babelPlugin = babel.babel({
babelHelpers: 'bundled',
exclude: 'node_modules/**',
presets: [
[
"@babel/preset-env",
{
useBuiltIns: "entry",
corejs: "3",
targets: "IE 11"
}
]
]
});
// create js bundle
const rollupConfig = {
input: ['src/worker-polyfill.js', inputFile],
plugins: [multi(), commonjs(), nodeResolve(), babelPlugin, removeJsComments({comments: "none"})]
};
const bundle = await rollup(rollupConfig);
const {output} = await bundle.generate({
format: 'iife',
name: `${PROJECT_ID}Bundle`
});
const code = output[0].code;
const bundlePath = resource(outputName, code);
await fs.writeFile(bundlePath, code, "utf8");
return bundlePath;
}

View File

@ -36,8 +36,7 @@ export class ViewModel extends EventEmitter {
if (!this.disposables) {
this.disposables = new Disposables();
}
this.disposables.track(disposable);
return disposable;
return this.disposables.track(disposable);
}
dispose() {

View File

@ -38,7 +38,8 @@ export class RoomViewModel extends ViewModel {
async load() {
this._room.on("change", this._onRoomChange);
try {
this._timeline = await this._room.openTimeline();
this._timeline = this.track(this._room.openTimeline());
await this._timeline.load();
this._timelineVM = new TimelineViewModel(this.childOptions({
room: this._room,
timeline: this._timeline,
@ -62,17 +63,15 @@ export class RoomViewModel extends ViewModel {
}
dispose() {
// this races with enable, on the await openTimeline()
if (this._timeline) {
// will stop the timeline from delivering updates on entries
this._timeline.close();
}
super.dispose();
if (this._clearUnreadTimout) {
this._clearUnreadTimout.abort();
this._clearUnreadTimout = null;
}
}
// called from view to close room
// parent vm will dispose this vm
close() {
this._closeCallback();
}

View File

@ -54,7 +54,7 @@ export class TimelineViewModel extends ViewModel {
if (firstTile.shape === "gap") {
return firstTile.fill();
} else {
await this._timeline.loadAtTop(50);
await this._timeline.loadAtTop(10);
return false;
}
}

View File

@ -25,6 +25,7 @@ import {BrawlViewModel} from "./domain/BrawlViewModel.js";
import {BrawlView} from "./ui/web/BrawlView.js";
import {Clock} from "./ui/web/dom/Clock.js";
import {OnlineStatus} from "./ui/web/dom/OnlineStatus.js";
import {WorkerPool} from "./utils/WorkerPool.js";
function addScript(src) {
return new Promise(function (resolve, reject) {
@ -55,10 +56,27 @@ async function loadOlm(olmPaths) {
return null;
}
// make path relative to basePath,
// assuming it and basePath are relative to document
function relPath(path, basePath) {
const idx = basePath.lastIndexOf("/");
const dir = idx === -1 ? "" : basePath.slice(0, idx);
const dirCount = dir.length ? dir.split("/").length : 0;
return "../".repeat(dirCount) + path;
}
async function loadWorker(paths) {
const workerPool = new WorkerPool(paths.worker, 4);
await workerPool.init();
const path = relPath(paths.olm.legacyBundle, paths.worker);
await workerPool.sendAll({type: "load_olm", path});
return workerPool;
}
// Don't use a default export here, as we use multiple entries during legacy build,
// which does not support default exports,
// see https://github.com/rollup/plugins/tree/master/packages/multi-entry
export async function main(container, olmPaths) {
export async function main(container, paths) {
try {
// to replay:
// const fetchLog = await (await fetch("/fetchlogs/constrainterror.json")).json();
@ -79,6 +97,13 @@ export async function main(container, olmPaths) {
const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1");
const storageFactory = new StorageFactory();
// if wasm is not supported, we'll want
// to run some olm operations in a worker (mainly for IE11)
let workerPromise;
if (!window.WebAssembly) {
workerPromise = loadWorker(paths);
}
const vm = new BrawlViewModel({
createSessionContainer: () => {
return new SessionContainer({
@ -88,7 +113,8 @@ export async function main(container, olmPaths) {
sessionInfoStorage,
request,
clock,
olmPromise: loadOlm(olmPaths),
olmPromise: loadOlm(paths.olm),
workerPromise,
});
},
sessionInfoStorage,

View File

@ -33,7 +33,7 @@ const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
// sessionInfo contains deviceId, userId and homeServer
constructor({clock, storage, hsApi, sessionInfo, olm}) {
constructor({clock, storage, hsApi, sessionInfo, olm, workerPool}) {
this._clock = clock;
this._storage = storage;
this._hsApi = hsApi;
@ -52,6 +52,7 @@ export class Session {
this._megolmEncryption = null;
this._megolmDecryption = null;
this._getSyncToken = () => this.syncToken;
this._workerPool = workerPool;
if (olm) {
this._olmUtil = new olm.Utility();
@ -100,6 +101,7 @@ export class Session {
this._megolmDecryption = new MegOlmDecryption({
pickleKey: PICKLE_KEY,
olm: this._olm,
workerPool: this._workerPool,
});
this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption});
}
@ -202,6 +204,7 @@ export class Session {
}
stop() {
this._workerPool?.dispose();
this._sendScheduler.stop();
}
@ -255,7 +258,7 @@ export class Session {
return room;
}
async writeSync(syncResponse, syncFilterId, roomChanges, txn) {
async writeSync(syncResponse, syncFilterId, txn) {
const changes = {};
const syncToken = syncResponse.next_batch;
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
@ -362,7 +365,7 @@ export function tests() {
}
}
};
const newSessionData = await session.writeSync({next_batch: "b"}, 6, {}, syncTxn);
const newSessionData = await session.writeSync({next_batch: "b"}, 6, syncTxn);
assert(syncSet);
assert.equal(session.syncToken, "a");
assert.equal(session.syncFilterId, 5);

View File

@ -42,7 +42,7 @@ export const LoginFailure = createEnum(
);
export class SessionContainer {
constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise}) {
constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise, workerPromise}) {
this._random = random;
this._clock = clock;
this._onlineStatus = onlineStatus;
@ -59,6 +59,7 @@ export class SessionContainer {
this._sessionId = null;
this._storage = null;
this._olmPromise = olmPromise;
this._workerPromise = workerPromise;
}
createNewSessionId() {
@ -152,8 +153,13 @@ export class SessionContainer {
homeServer: sessionInfo.homeServer,
};
const olm = await this._olmPromise;
let workerPool = null;
if (this._workerPromise) {
workerPool = await this._workerPromise;
}
this._session = new Session({storage: this._storage,
sessionInfo: filteredSessionInfo, hsApi, olm, clock: this._clock});
sessionInfo: filteredSessionInfo, hsApi, olm,
clock: this._clock, workerPool});
await this._session.load();
this._status.set(LoadStatus.SessionSetup);
await this._session.beforeFirstSync(isNewLogin);

View File

@ -29,21 +29,6 @@ export const SyncStatus = createEnum(
"Stopped"
);
function parseRooms(roomsSection, roomCallback) {
if (roomsSection) {
const allMemberships = ["join", "invite", "leave"];
for(const membership of allMemberships) {
const membershipSection = roomsSection[membership];
if (membershipSection) {
return Object.entries(membershipSection).map(([roomId, roomResponse]) => {
return roomCallback(roomId, roomResponse, membership);
});
}
}
}
return [];
}
function timelineIsEmpty(roomResponse) {
try {
const events = roomResponse?.timeline?.events;
@ -53,6 +38,26 @@ function timelineIsEmpty(roomResponse) {
}
}
/**
* Sync steps in js-pseudocode:
* ```js
* let preparation;
* if (room.needsPrepareSync) {
* // can only read some stores
* preparation = await room.prepareSync(roomResponse, prepareTxn);
* // can do async work that is not related to storage (such as decryption)
* preparation = await room.afterPrepareSync(preparation);
* }
* // writes and calculates changes
* const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn);
* // applies and emits changes once syncTxn is committed
* room.afterSync(changes);
* if (room.needsAfterSyncCompleted(changes)) {
* // can do network requests
* await room.afterSyncCompleted(changes);
* }
* ```
*/
export class Sync {
constructor({hsApi, session, storage}) {
this._hsApi = hsApi;
@ -90,13 +95,13 @@ export class Sync {
let afterSyncCompletedPromise = Promise.resolve();
// if syncToken is falsy, it will first do an initial sync ...
while(this._status.get() !== SyncStatus.Stopped) {
let roomChanges;
let roomStates;
try {
console.log(`starting sync request with since ${syncToken} ...`);
const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise);
syncToken = syncResult.syncToken;
roomChanges = syncResult.roomChanges;
roomStates = syncResult.roomStates;
this._status.set(SyncStatus.Syncing);
} catch (err) {
if (!(err instanceof AbortError)) {
@ -105,12 +110,12 @@ export class Sync {
}
}
if (!this._error) {
afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges);
afterSyncCompletedPromise = this._runAfterSyncCompleted(roomStates);
}
}
}
async _runAfterSyncCompleted(roomChanges) {
async _runAfterSyncCompleted(roomStates) {
const sessionPromise = (async () => {
try {
await this._session.afterSyncCompleted();
@ -118,23 +123,22 @@ export class Sync {
console.error("error during session afterSyncCompleted, continuing", err.stack);
}
})();
let allPromises = [sessionPromise];
const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => {
return rc.changes.needsAfterSyncCompleted;
const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => {
return rs.room.needsAfterSyncCompleted(rs.changes);
});
if (roomsNeedingAfterSyncCompleted.length) {
allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => {
const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => {
try {
await room.afterSyncCompleted(changes);
await rs.room.afterSyncCompleted(rs.changes);
} catch (err) {
console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack);
}
}));
console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack);
}
});
// run everything in parallel,
// we don't want to delay the next sync too much
await Promise.all(allPromises);
// Also, since all promises won't reject (as they have a try/catch)
// it's fine to use Promise.all
await Promise.all(roomsPromises.concat(sessionPromise));
}
async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) {
@ -152,16 +156,17 @@ export class Sync {
const isInitialSync = !syncToken;
syncToken = response.next_batch;
const syncTxn = await this._openSyncTxn();
let roomChanges = [];
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
await this._prepareRooms(roomStates);
let sessionChanges;
const syncTxn = await this._openSyncTxn();
try {
// to_device
// presence
if (response.rooms) {
roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn);
}
sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn);
await Promise.all(roomStates.map(async rs => {
console.log(` * applying sync response to room ${rs.room.id} ...`);
rs.changes = await rs.room.writeSync(
rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn);
}));
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
} catch(err) {
console.warn("aborting syncTxn because of error");
console.error(err);
@ -180,31 +185,31 @@ export class Sync {
}
this._session.afterSync(sessionChanges);
// emit room related events after txn has been closed
for(let {room, changes} of roomChanges) {
room.afterSync(changes);
for(let rs of roomStates) {
rs.room.afterSync(rs.changes);
}
return {syncToken, roomChanges};
return {syncToken, roomStates};
}
async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) {
const roomChanges = [];
const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => {
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if (isInitialSync && timelineIsEmpty(roomResponse)) {
return;
async _openPrepareSyncTxn() {
const storeNames = this._storage.storeNames;
return await this._storage.readTxn([
storeNames.inboundGroupSessions,
]);
}
let room = this._session.rooms.get(roomId);
if (!room) {
room = this._session.createRoom(roomId);
async _prepareRooms(roomStates) {
const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync);
if (prepareRoomStates.length) {
const prepareTxn = await this._openPrepareSyncTxn();
await Promise.all(prepareRoomStates.map(async rs => {
rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn);
}));
await Promise.all(prepareRoomStates.map(async rs => {
rs.preparation = await rs.room.afterPrepareSync(rs.preparation);
}));
}
console.log(` * applying sync response to room ${roomId} ...`);
const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
roomChanges.push({room, changes});
});
await Promise.all(promises);
return roomChanges;
}
async _openSyncTxn() {
@ -218,7 +223,6 @@ export class Sync {
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
storeNames.inboundGroupSessions,
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
// to discard outbound session when somebody leaves a room
@ -226,6 +230,33 @@ export class Sync {
]);
}
_parseRoomsResponse(roomsSection, isInitialSync) {
const roomStates = [];
if (roomsSection) {
// don't do "invite", "leave" for now
const allMemberships = ["join"];
for(const membership of allMemberships) {
const membershipSection = roomsSection[membership];
if (membershipSection) {
for (const [roomId, roomResponse] of Object.entries(membershipSection)) {
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if (isInitialSync && timelineIsEmpty(roomResponse)) {
return;
}
let room = this._session.rooms.get(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
roomStates.push(new RoomSyncProcessState(room, roomResponse, membership));
}
}
}
}
return roomStates;
}
stop() {
if (this._status.get() === SyncStatus.Stopped) {
return;
@ -237,3 +268,13 @@ export class Sync {
}
}
}
class RoomSyncProcessState {
constructor(room, roomResponse, membership) {
this.room = room;
this.roomResponse = roomResponse;
this.membership = membership;
this.preparation = null;
this.changes = null;
}
}

44
src/matrix/e2ee/README.md Normal file
View File

@ -0,0 +1,44 @@
## Integratation within the sync lifetime cycle
### prepareSync
The session can start its own read/write transactions here, rooms only read from a shared transaction
- session
- device handler
- txn
- write pending encrypted
- txn
- olm decryption read
- olm async decryption
- dispatch to worker
- txn
- olm decryption write / remove pending encrypted
- rooms (with shared read txn)
- megolm decryption read
### afterPrepareSync
- rooms
- megolm async decryption
- dispatch to worker
### writeSync
- rooms (with shared readwrite txn)
- megolm decryption write, yielding decrypted events
- use decrypted events to write room summary
### afterSync
- rooms
- emit changes
### afterSyncCompleted
- session
- e2ee account
- generate more otks if needed
- upload new otks if needed or device keys if not uploaded before
- rooms
- share new room keys if needed

View File

@ -14,8 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {MEGOLM_ALGORITHM} from "./common.js";
import {MEGOLM_ALGORITHM, DecryptionSource} from "./common.js";
import {groupBy} from "../../utils/groupBy.js";
import {mergeMap} from "../../utils/mergeMap.js";
import {makeTxnId} from "../common.js";
const ENCRYPTED_TYPE = "m.room.encrypted";
@ -55,23 +56,54 @@ export class RoomEncryption {
return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
}
async decrypt(event, isSync, isTimelineOpen, retryData, txn) {
// this happens before entries exists, as they are created by the syncwriter
// but we want to be able to map it back to something in the timeline easily
// when retrying decryption.
async prepareDecryptAll(events, source, isTimelineOpen, txn) {
const errors = [];
const validEvents = [];
for (const event of events) {
if (event.redacted_because || event.unsigned?.redacted_because) {
return;
continue;
}
if (event.content?.algorithm !== MEGOLM_ALGORITHM) {
throw new Error("Unsupported algorithm: " + event.content?.algorithm);
errors.set(event.event_id, new Error("Unsupported algorithm: " + event.content?.algorithm));
}
let sessionCache = isSync ? this._megolmSyncCache : this._megolmBackfillCache;
const result = await this._megolmDecryption.decrypt(
this._room.id, event, sessionCache, txn);
if (!result) {
this._addMissingSessionEvent(event, isSync, retryData);
validEvents.push(event);
}
if (result && isTimelineOpen) {
let customCache;
let sessionCache;
if (source === DecryptionSource.Sync) {
sessionCache = this._megolmSyncCache;
} else if (source === DecryptionSource.Timeline) {
sessionCache = this._megolmBackfillCache;
} else if (source === DecryptionSource.Retry) {
// when retrying, we could have mixed events from at the bottom of the timeline (sync)
// and somewhere else, so create a custom cache we use just for this operation.
customCache = this._megolmEncryption.createSessionCache();
sessionCache = customCache;
} else {
throw new Error("Unknown source: " + source);
}
const preparation = await this._megolmDecryption.prepareDecryptAll(
this._room.id, validEvents, sessionCache, txn);
if (customCache) {
customCache.dispose();
}
return new DecryptionPreparation(preparation, errors, {isTimelineOpen}, this);
}
async _processDecryptionResults(results, errors, flags, txn) {
for (const error of errors.values()) {
if (error.code === "MEGOLM_NO_SESSION") {
this._addMissingSessionEvent(error.event);
}
}
if (flags.isTimelineOpen) {
for (const result of results.values()) {
await this._verifyDecryptionResult(result, txn);
}
return result;
}
}
async _verifyDecryptionResult(result, txn) {
@ -87,30 +119,30 @@ export class RoomEncryption {
}
}
_addMissingSessionEvent(event, isSync, data) {
_addMissingSessionEvent(event) {
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
const key = `${senderKey}|${sessionId}`;
let eventIds = this._eventIdsByMissingSession.get(key);
if (!eventIds) {
eventIds = new Map();
eventIds = new Set();
this._eventIdsByMissingSession.set(key, eventIds);
}
eventIds.set(event.event_id, {data, isSync});
eventIds.add(event.event_id);
}
applyRoomKeys(roomKeys) {
// retry decryption with the new sessions
const retryEntries = [];
const retryEventIds = [];
for (const roomKey of roomKeys) {
const key = `${roomKey.senderKey}|${roomKey.sessionId}`;
const entriesForSession = this._eventIdsByMissingSession.get(key);
if (entriesForSession) {
this._eventIdsByMissingSession.delete(key);
retryEntries.push(...entriesForSession.values());
retryEventIds.push(...entriesForSession);
}
}
return retryEntries;
return retryEventIds;
}
async encrypt(type, content, hsApi) {
@ -214,3 +246,65 @@ export class RoomEncryption {
await hsApi.sendToDevice(type, payload, txnId).response();
}
}
/**
* wrappers around megolm decryption classes to be able to post-process
* the decryption results before turning them
*/
class DecryptionPreparation {
constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) {
this._megolmDecryptionPreparation = megolmDecryptionPreparation;
this._extraErrors = extraErrors;
this._flags = flags;
this._roomEncryption = roomEncryption;
}
async decrypt() {
return new DecryptionChanges(
await this._megolmDecryptionPreparation.decrypt(),
this._extraErrors,
this._flags,
this._roomEncryption);
}
dispose() {
this._megolmDecryptionPreparation.dispose();
}
}
class DecryptionChanges {
constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) {
this._megolmDecryptionChanges = megolmDecryptionChanges;
this._extraErrors = extraErrors;
this._flags = flags;
this._roomEncryption = roomEncryption;
}
async write(txn) {
const {results, errors} = await this._megolmDecryptionChanges.write(txn);
mergeMap(this._extraErrors, errors);
await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn);
return new BatchDecryptionResult(results, errors);
}
}
class BatchDecryptionResult {
constructor(results, errors) {
this.results = results;
this.errors = errors;
}
applyToEntries(entries) {
for (const entry of entries) {
const result = this.results.get(entry.id);
if (result) {
entry.setDecryptionResult(result);
} else {
const error = this.errors.get(entry.id);
if (error) {
entry.setDecryptionError(error);
}
}
}
}
}

View File

@ -15,6 +15,9 @@ limitations under the License.
*/
import anotherjson from "../../../lib/another-json/index.js";
import {createEnum} from "../../utils/enum.js";
export const DecryptionSource = createEnum(["Sync", "Timeline", "Retry"]);
// use common prefix so it's easy to clear properties that are not e2ee related during session clear
export const SESSION_KEY_PREFIX = "e2ee:";

View File

@ -15,102 +15,102 @@ limitations under the License.
*/
import {DecryptionError} from "../common.js";
import {DecryptionResult} from "../DecryptionResult.js";
import {groupBy} from "../../../utils/groupBy.js";
const CACHE_MAX_SIZE = 10;
import {SessionInfo} from "./decryption/SessionInfo.js";
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
import {SessionDecryption} from "./decryption/SessionDecryption.js";
import {SessionCache} from "./decryption/SessionCache.js";
import {DecryptionWorker} from "./decryption/DecryptionWorker.js";
export class Decryption {
constructor({pickleKey, olm}) {
this._pickleKey = pickleKey;
this._olm = olm;
function getSenderKey(event) {
return event.content?.["sender_key"];
}
createSessionCache() {
return new SessionCache();
function getSessionId(event) {
return event.content?.["session_id"];
}
function getCiphertext(event) {
return event.content?.ciphertext;
}
export class Decryption {
constructor({pickleKey, olm, workerPool}) {
this._pickleKey = pickleKey;
this._olm = olm;
this._decryptor = workerPool ? new DecryptionWorker(workerPool) : null;
}
createSessionCache(fallback) {
return new SessionCache(fallback);
}
/**
* [decrypt description]
* Reads all the state from storage to be able to decrypt the given events.
* Decryption can then happen outside of a storage transaction.
* @param {[type]} roomId [description]
* @param {[type]} event [description]
* @param {[type]} events [description]
* @param {[type]} sessionCache [description]
* @param {[type]} txn [description]
* @return {DecryptionResult?} the decrypted event result, or undefined if the session id is not known.
* @return {DecryptionPreparation}
*/
async decrypt(roomId, event, sessionCache, txn) {
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
const ciphertext = event.content?.ciphertext;
async prepareDecryptAll(roomId, events, sessionCache, txn) {
const errors = new Map();
const validEvents = [];
if (
typeof senderKey !== "string" ||
typeof sessionId !== "string" ||
typeof ciphertext !== "string"
) {
throw new DecryptionError("MEGOLM_INVALID_EVENT", event);
for (const event of events) {
const isValid = typeof getSenderKey(event) === "string" &&
typeof getSessionId(event) === "string" &&
typeof getCiphertext(event) === "string";
if (isValid) {
validEvents.push(event);
} else {
errors.set(event.event_id, new DecryptionError("MEGOLM_INVALID_EVENT", event))
}
}
let session;
let claimedKeys;
const cacheEntry = sessionCache.get(roomId, senderKey, sessionId);
if (cacheEntry) {
session = cacheEntry.session;
claimedKeys = cacheEntry.claimedKeys;
const eventsBySession = groupBy(validEvents, event => {
return `${getSenderKey(event)}|${getSessionId(event)}`;
});
const sessionDecryptions = [];
await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => {
const first = eventsForSession[0];
const senderKey = getSenderKey(first);
const sessionId = getSessionId(first);
const sessionInfo = await this._getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn);
if (!sessionInfo) {
for (const event of eventsForSession) {
errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event));
}
} else {
sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._decryptor));
}
}));
return new DecryptionPreparation(roomId, sessionDecryptions, errors);
}
async _getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn) {
let sessionInfo;
sessionInfo = sessionCache.get(roomId, senderKey, sessionId);
if (!sessionInfo) {
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
if (sessionEntry) {
session = new this._olm.InboundGroupSession();
let session = new this._olm.InboundGroupSession();
try {
session.unpickle(this._pickleKey, sessionEntry.session);
sessionInfo = new SessionInfo(roomId, senderKey, session, sessionEntry.claimedKeys);
} catch (err) {
session.free();
throw err;
}
claimedKeys = sessionEntry.claimedKeys;
sessionCache.add(roomId, senderKey, session, claimedKeys);
sessionCache.add(sessionInfo);
}
}
if (!session) {
return;
}
const {plaintext, message_index: messageIndex} = session.decrypt(ciphertext);
let payload;
try {
payload = JSON.parse(plaintext);
} catch (err) {
throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err});
}
if (payload.room_id !== roomId) {
throw new DecryptionError("MEGOLM_WRONG_ROOM", event,
{encryptedRoomId: payload.room_id, eventRoomId: roomId});
}
await this._handleReplayAttack(roomId, sessionId, messageIndex, event, txn);
return new DecryptionResult(payload, senderKey, claimedKeys);
}
async _handleReplayAttack(roomId, sessionId, messageIndex, event, txn) {
const eventId = event.event_id;
const timestamp = event.origin_server_ts;
const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex);
if (decryption && decryption.eventId !== eventId) {
// the one with the newest timestamp should be the attack
const decryptedEventIsBad = decryption.timestamp < timestamp;
const badEventId = decryptedEventIsBad ? eventId : decryption.eventId;
throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, {
messageIndex,
badEventId,
otherEventId: decryption.eventId
});
}
if (!decryption) {
txn.groupSessionDecryptions.set({
roomId,
sessionId,
messageIndex,
eventId,
timestamp
});
}
return sessionInfo;
}
/**
@ -165,55 +165,3 @@ export class Decryption {
}
}
class SessionCache {
constructor() {
this._sessions = [];
}
/**
* @type {CacheEntry}
* @property {InboundGroupSession} session the unpickled session
* @property {Object} claimedKeys an object with the claimed ed25519 key
*
*
* @param {string} roomId
* @param {string} senderKey
* @param {string} sessionId
* @return {CacheEntry?}
*/
get(roomId, senderKey, sessionId) {
const idx = this._sessions.findIndex(s => {
return s.roomId === roomId &&
s.senderKey === senderKey &&
sessionId === s.session.session_id();
});
if (idx !== -1) {
const entry = this._sessions[idx];
// move to top
if (idx > 0) {
this._sessions.splice(idx, 1);
this._sessions.unshift(entry);
}
return entry;
}
}
add(roomId, senderKey, session, claimedKeys) {
// add new at top
this._sessions.unshift({roomId, senderKey, session, claimedKeys});
if (this._sessions.length > CACHE_MAX_SIZE) {
// free sessions we're about to remove
for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) {
this._sessions[i].session.free();
}
this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE);
}
}
dispose() {
for (const entry of this._sessions) {
entry.session.free();
}
}
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {DecryptionError} from "../../common.js";
export class DecryptionChanges {
constructor(roomId, results, errors, replayEntries) {
this._roomId = roomId;
this._results = results;
this._errors = errors;
this._replayEntries = replayEntries;
}
/**
* @type MegolmBatchDecryptionResult
* @property {Map<string, DecryptionResult>} results a map of event id to decryption result
* @property {Map<string, Error>} errors event id -> errors
*
* Handle replay attack detection, and return result
* @param {[type]} txn [description]
* @return {MegolmBatchDecryptionResult}
*/
async write(txn) {
await Promise.all(this._replayEntries.map(async replayEntry => {
try {
this._handleReplayAttack(this._roomId, replayEntry, txn);
} catch (err) {
this._errors.set(replayEntry.eventId, err);
}
}));
return {
results: this._results,
errors: this._errors
};
}
async _handleReplayAttack(roomId, replayEntry, txn) {
const {messageIndex, sessionId, eventId, timestamp} = replayEntry;
const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex);
if (decryption && decryption.eventId !== eventId) {
// the one with the newest timestamp should be the attack
const decryptedEventIsBad = decryption.timestamp < timestamp;
const badEventId = decryptedEventIsBad ? eventId : decryption.eventId;
// discard result
this._results.delete(eventId);
throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, {
messageIndex,
badEventId,
otherEventId: decryption.eventId
});
}
if (!decryption) {
txn.groupSessionDecryptions.set({
roomId,
sessionId,
messageIndex,
eventId,
timestamp
});
}
}
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {DecryptionChanges} from "./DecryptionChanges.js";
import {mergeMap} from "../../../../utils/mergeMap.js";
/**
* Class that contains all the state loaded from storage to decrypt the given events
*/
export class DecryptionPreparation {
constructor(roomId, sessionDecryptions, errors) {
this._roomId = roomId;
this._sessionDecryptions = sessionDecryptions;
this._initialErrors = errors;
}
async decrypt() {
try {
const errors = this._initialErrors;
const results = new Map();
const replayEntries = [];
await Promise.all(this._sessionDecryptions.map(async sessionDecryption => {
const sessionResult = await sessionDecryption.decryptAll();
mergeMap(sessionResult.errors, errors);
mergeMap(sessionResult.results, results);
replayEntries.push(...sessionResult.replayEntries);
}));
return new DecryptionChanges(this._roomId, results, errors, replayEntries);
} finally {
this.dispose();
}
}
dispose() {
for (const sd of this._sessionDecryptions) {
sd.dispose();
}
}
}

View File

@ -0,0 +1,26 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export class DecryptionWorker {
constructor(workerPool) {
this._workerPool = workerPool;
}
decrypt(session, ciphertext) {
const sessionKey = session.export_session(session.first_known_index());
return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey});
}
}

View File

@ -0,0 +1,6 @@
Lots of classes here. The complexity comes from needing to offload decryption to a webworker, mainly for IE11. We can't keep a idb transaction open while waiting for the response from the worker, so need to batch decryption of multiple events and do decryption in multiple steps:
1. Read all used inbound sessions for the batch of events, requires a read txn. This happens in `Decryption`. Sessions are loaded into `SessionInfo` objects, which are also kept in a `SessionCache` to prevent having to read and unpickle them all the time.
2. Actually decrypt. No txn can stay open during this step, as it can be offloaded to a worker and is thus async. This happens in `DecryptionPreparation`, which delegates to `SessionDecryption` per session.
3. Read and write for the replay detection, requires a read/write txn. This happens in `DecryptionChanges`
4. Return the decrypted entries, and errors if any

View File

@ -0,0 +1,24 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export class ReplayDetectionEntry {
constructor(sessionId, messageIndex, event) {
this.sessionId = sessionId;
this.messageIndex = messageIndex;
this.eventId = event.event_id;
this.timestamp = event.origin_server_ts;
}
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
const CACHE_MAX_SIZE = 10;
/**
* Cache of unpickled inbound megolm session.
*/
export class SessionCache {
constructor() {
this._sessions = [];
}
/**
* @param {string} roomId
* @param {string} senderKey
* @param {string} sessionId
* @return {SessionInfo?}
*/
get(roomId, senderKey, sessionId) {
const idx = this._sessions.findIndex(s => {
return s.roomId === roomId &&
s.senderKey === senderKey &&
sessionId === s.session.session_id();
});
if (idx !== -1) {
const sessionInfo = this._sessions[idx];
// move to top
if (idx > 0) {
this._sessions.splice(idx, 1);
this._sessions.unshift(sessionInfo);
}
return sessionInfo;
}
}
add(sessionInfo) {
sessionInfo.retain();
// add new at top
this._sessions.unshift(sessionInfo);
if (this._sessions.length > CACHE_MAX_SIZE) {
// free sessions we're about to remove
for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) {
this._sessions[i].release();
}
this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE);
}
}
dispose() {
for (const sessionInfo of this._sessions) {
sessionInfo.release();
}
}
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {DecryptionResult} from "../../DecryptionResult.js";
import {DecryptionError} from "../../common.js";
import {ReplayDetectionEntry} from "./ReplayDetectionEntry.js";
/**
* Does the actual decryption of all events for a given megolm session in a batch
*/
export class SessionDecryption {
constructor(sessionInfo, events, decryptor) {
sessionInfo.retain();
this._sessionInfo = sessionInfo;
this._events = events;
this._decryptor = decryptor;
this._decryptionRequests = decryptor ? [] : null;
}
async decryptAll() {
const replayEntries = [];
const results = new Map();
let errors;
const roomId = this._sessionInfo.roomId;
await Promise.all(this._events.map(async event => {
try {
const {session} = this._sessionInfo;
const ciphertext = event.content.ciphertext;
let decryptionResult;
if (this._decryptor) {
const request = this._decryptor.decrypt(session, ciphertext);
this._decryptionRequests.push(request);
decryptionResult = await request.response();
} else {
decryptionResult = session.decrypt(ciphertext);
}
const plaintext = decryptionResult.plaintext;
const messageIndex = decryptionResult.message_index;
let payload;
try {
payload = JSON.parse(plaintext);
} catch (err) {
throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err});
}
if (payload.room_id !== roomId) {
throw new DecryptionError("MEGOLM_WRONG_ROOM", event,
{encryptedRoomId: payload.room_id, eventRoomId: roomId});
}
replayEntries.push(new ReplayDetectionEntry(session.session_id(), messageIndex, event));
const result = new DecryptionResult(payload, this._sessionInfo.senderKey, this._sessionInfo.claimedKeys);
results.set(event.event_id, result);
} catch (err) {
// ignore AbortError from cancelling decryption requests in dispose method
if (err.name === "AbortError") {
return;
}
if (!errors) {
errors = new Map();
}
errors.set(event.event_id, err);
}
}));
return {results, errors, replayEntries};
}
dispose() {
if (this._decryptionRequests) {
for (const r of this._decryptionRequests) {
r.abort();
}
}
// TODO: cancel decryptions here
this._sessionInfo.release();
}
}

View File

@ -0,0 +1,44 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* session loaded in memory with everything needed to create DecryptionResults
* and to store/retrieve it in the SessionCache
*/
export class SessionInfo {
constructor(roomId, senderKey, session, claimedKeys) {
this.roomId = roomId;
this.senderKey = senderKey;
this.session = session;
this.claimedKeys = claimedKeys;
this._refCounter = 0;
}
retain() {
this._refCounter += 1;
}
release() {
this._refCounter -= 1;
if (this._refCounter <= 0) {
this.dispose();
}
}
dispose() {
this.session.free();
}
}

View File

@ -26,6 +26,9 @@ import {fetchOrLoadMembers} from "./members/load.js";
import {MemberList} from "./members/MemberList.js";
import {Heroes} from "./members/Heroes.js";
import {EventEntry} from "./timeline/entries/EventEntry.js";
import {DecryptionSource} from "../e2ee/common.js";
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
export class Room extends EventEmitter {
constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption, getSyncToken}) {
@ -49,91 +52,124 @@ export class Room extends EventEmitter {
async notifyRoomKeys(roomKeys) {
if (this._roomEncryption) {
// array of {data, isSync}
let retryEntries = this._roomEncryption.applyRoomKeys(roomKeys);
let decryptedEntries = [];
if (retryEntries.length) {
// groupSessionDecryptions can be written, the other stores not
const txn = await this._storage.readWriteTxn([
let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys);
if (retryEventIds.length) {
const retryEntries = [];
const txn = await this._storage.readTxn([
this._storage.storeNames.timelineEvents,
this._storage.storeNames.inboundGroupSessions,
this._storage.storeNames.groupSessionDecryptions,
this._storage.storeNames.deviceIdentities,
]);
try {
for (const retryEntry of retryEntries) {
const {data: eventKey} = retryEntry;
let entry = this._timeline?.findEntry(eventKey);
if (!entry) {
const storageEntry = await txn.timelineEvents.get(this._roomId, eventKey);
for (const eventId of retryEventIds) {
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
if (storageEntry) {
entry = new EventEntry(storageEntry, this._fragmentIdComparer);
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
}
}
if (entry) {
entry = await this._decryptEntry(entry, txn, retryEntry.isSync);
decryptedEntries.push(entry);
}
}
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
}
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
await decryptRequest.complete();
if (this._timeline) {
// only adds if already present
this._timeline.replaceEntries(decryptedEntries);
this._timeline.replaceEntries(retryEntries);
}
// pass decryptedEntries to roomSummary
}
}
}
_enableEncryption(encryptionParams) {
this._roomEncryption = this._createRoomEncryption(this, encryptionParams);
if (this._roomEncryption) {
this._sendQueue.enableEncryption(this._roomEncryption);
if (this._timeline) {
this._timeline.enableEncryption(this._decryptEntries.bind(this));
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
}
}
}
async _decryptEntry(entry, txn, isSync) {
if (entry.eventType === "m.room.encrypted") {
/**
* Used for decrypting when loading/filling the timeline, and retrying decryption,
* not during sync, where it is split up during the multiple phases.
*/
_decryptEntries(source, entries, inboundSessionTxn = null) {
const request = new DecryptionRequest(async r => {
if (!inboundSessionTxn) {
inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
}
if (r.cancelled) return;
const events = entries.filter(entry => {
return entry.eventType === EVENT_ENCRYPTED_TYPE;
}).map(entry => entry.event);
const isTimelineOpen = this._isTimelineOpen;
r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn);
if (r.cancelled) return;
const changes = await r.preparation.decrypt();
r.preparation = null;
if (r.cancelled) return;
const stores = [this._storage.storeNames.groupSessionDecryptions];
if (isTimelineOpen) {
// read to fetch devices if timeline is open
stores.push(this._storage.storeNames.deviceIdentities);
}
const writeTxn = await this._storage.readWriteTxn(stores);
let decryption;
try {
const decryptionResult = await this._roomEncryption.decrypt(
entry.event, isSync, !!this._timeline, entry.asEventKey(), txn);
if (decryptionResult) {
entry.setDecryptionResult(decryptionResult);
}
decryption = await changes.write(writeTxn);
} catch (err) {
console.warn("event decryption error", err, entry.event);
entry.setDecryptionError(err);
writeTxn.abort();
throw err;
}
}
return entry;
await writeTxn.complete();
decryption.applyToEntries(entries);
});
return request;
}
async _decryptEntries(entries, txn, isSync = false) {
return await Promise.all(entries.map(async e => this._decryptEntry(e, txn, isSync)));
get needsPrepareSync() {
// only encrypted rooms need the prepare sync steps
return !!this._roomEncryption;
}
async prepareSync(roomResponse, txn) {
if (this._roomEncryption) {
const events = roomResponse?.timeline?.events;
if (Array.isArray(events)) {
const eventsToDecrypt = events.filter(event => {
return event?.type === EVENT_ENCRYPTED_TYPE;
});
const preparation = await this._roomEncryption.prepareDecryptAll(
eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
return preparation;
}
}
}
async afterPrepareSync(preparation) {
if (preparation) {
const decryptChanges = await preparation.decrypt();
return decryptChanges;
}
}
/** @package */
async writeSync(roomResponse, membership, isInitialSync, txn) {
const isTimelineOpen = !!this._timeline;
async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) {
let decryption;
if (this._roomEncryption && decryptChanges) {
decryption = await decryptChanges.write(txn);
}
const {entries, newLiveKey, memberChanges} =
await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
if (decryption) {
decryption.applyToEntries(entries);
}
// pass member changes to device tracker
if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
await this._roomEncryption.writeMemberChanges(memberChanges, txn);
}
const summaryChanges = this._summary.writeSync(
roomResponse,
membership,
isInitialSync, isTimelineOpen,
isInitialSync, this._isTimelineOpen,
txn);
const {entries: encryptedEntries, newLiveKey, memberChanges} =
await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
// decrypt if applicable
let entries = encryptedEntries;
if (this._roomEncryption) {
entries = await this._decryptEntries(encryptedEntries, txn, true);
}
// fetch new members while we have txn open,
// but don't make any in-memory changes yet
let heroChanges;
@ -144,10 +180,6 @@ export class Room extends EventEmitter {
}
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
}
// pass member changes to device tracker
if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
await this._roomEncryption.writeMemberChanges(memberChanges, txn);
}
let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
@ -159,7 +191,6 @@ export class Room extends EventEmitter {
removedPendingEvents,
memberChanges,
heroChanges,
needsAfterSyncCompleted: this._roomEncryption?.needsToShareKeys(memberChanges)
};
}
@ -210,6 +241,10 @@ export class Room extends EventEmitter {
}
}
needsAfterSyncCompleted({memberChanges}) {
return this._roomEncryption?.needsToShareKeys(memberChanges);
}
/**
* Only called if the result of writeSync had `needsAfterSyncCompleted` set.
* Can be used to do longer running operations that resulted from the last sync,
@ -299,19 +334,11 @@ export class Room extends EventEmitter {
}
}).response();
let stores = [
const txn = await this._storage.readWriteTxn([
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
];
if (this._roomEncryption) {
stores = stores.concat([
this._storage.storeNames.inboundGroupSessions,
this._storage.storeNames.groupSessionDecryptions,
this._storage.storeNames.deviceIdentities,
]);
}
const txn = await this._storage.readWriteTxn(stores);
let removedPendingEvents;
let gapResult;
try {
@ -324,14 +351,15 @@ export class Room extends EventEmitter {
fragmentIdComparer: this._fragmentIdComparer,
});
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
if (this._roomEncryption) {
gapResult.entries = await this._decryptEntries(gapResult.entries, txn, false);
}
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
if (this._roomEncryption) {
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
await decryptRequest.complete();
}
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
this._fragmentIdComparer.add(fragment);
@ -406,6 +434,10 @@ export class Room extends EventEmitter {
}
}
get _isTimelineOpen() {
return !!this._timeline;
}
async clearUnread() {
if (this.isUnread || this.notificationCount) {
const txn = await this._storage.readWriteTxn([
@ -438,7 +470,7 @@ export class Room extends EventEmitter {
}
/** @public */
async openTimeline() {
openTimeline() {
if (this._timeline) {
throw new Error("not dealing with load race here for now");
}
@ -458,9 +490,8 @@ export class Room extends EventEmitter {
user: this._user,
});
if (this._roomEncryption) {
this._timeline.enableEncryption(this._decryptEntries.bind(this));
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
}
await this._timeline.load();
return this._timeline;
}
@ -479,3 +510,25 @@ export class Room extends EventEmitter {
}
}
class DecryptionRequest {
constructor(decryptFn) {
this._cancelled = false;
this.preparation = null;
this._promise = decryptFn(this);
}
complete() {
return this._promise;
}
get cancelled() {
return this._cancelled;
}
dispose() {
this._cancelled = true;
if (this.preparation) {
this.preparation.dispose();
}
}
}

View File

@ -15,10 +15,10 @@ limitations under the License.
*/
import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js";
import {Disposables} from "../../../utils/Disposables.js";
import {Direction} from "./Direction.js";
import {TimelineReader} from "./persistence/TimelineReader.js";
import {PendingEventEntry} from "./entries/PendingEventEntry.js";
import {EventEntry} from "./entries/EventEntry.js";
export class Timeline {
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user}) {
@ -26,12 +26,14 @@ export class Timeline {
this._storage = storage;
this._closeCallback = closeCallback;
this._fragmentIdComparer = fragmentIdComparer;
this._disposables = new Disposables();
this._remoteEntries = new SortedArray((a, b) => a.compare(b));
this._timelineReader = new TimelineReader({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer
});
this._readerRequest = null;
const localEntries = new MappedList(pendingEvents, pe => {
return new PendingEventEntry({pendingEvent: pe, user});
}, (pee, params) => {
@ -42,22 +44,13 @@ export class Timeline {
/** @package */
async load() {
const entries = await this._timelineReader.readFromEnd(50);
this._remoteEntries.setManySorted(entries);
}
findEntry(eventKey) {
// a storage event entry has a fragmentId and eventIndex property, used for sorting,
// just like an EventKey, so this will work, but perhaps a bit brittle.
const entry = new EventEntry(eventKey, this._fragmentIdComparer);
// 30 seems to be a good amount to fill the entire screen
const readerRequest = this._disposables.track(this._timelineReader.readFromEnd(30));
try {
const idx = this._remoteEntries.indexOf(entry);
if (idx !== -1) {
return this._remoteEntries.get(idx);
}
} catch (err) {
// fragmentIdComparer threw, ignore
return;
const entries = await readerRequest.complete();
this._remoteEntries.setManySorted(entries);
} finally {
this._disposables.disposeTracked(readerRequest);
}
}
@ -86,12 +79,17 @@ export class Timeline {
if (!firstEventEntry) {
return;
}
const entries = await this._timelineReader.readFrom(
const readerRequest = this._disposables.track(this._timelineReader.readFrom(
firstEventEntry.asEventKey(),
Direction.Backward,
amount
);
));
try {
const entries = await readerRequest.complete();
this._remoteEntries.setManySorted(entries);
} finally {
this._disposables.disposeTracked(readerRequest);
}
}
/** @public */
@ -100,8 +98,9 @@ export class Timeline {
}
/** @public */
close() {
dispose() {
if (this._closeCallback) {
this._disposables.dispose();
this._closeCallback();
this._closeCallback = null;
}

View File

@ -140,7 +140,7 @@ export class SyncWriter {
async _writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn) {
const memberChanges = new Map();
if (timeline.events) {
if (Array.isArray(timeline.events)) {
const events = deduplicateEvents(timeline.events);
for(const event of events) {
// store event in timeline
@ -220,6 +220,7 @@ export class SyncWriter {
// important this happens before _writeTimeline so
// members are available in the transaction
const memberChanges = await this._writeStateEvents(roomResponse, trackNewlyJoined, txn);
// TODO: remove trackNewlyJoined and pass in memberChanges
const timelineResult = await this._writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn);
currentKey = timelineResult.currentKey;
// merge member changes from state and timeline, giving precedence to the latter

View File

@ -19,6 +19,24 @@ import {Direction} from "../Direction.js";
import {EventEntry} from "../entries/EventEntry.js";
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
class ReaderRequest {
constructor(fn) {
this.decryptRequest = null;
this._promise = fn(this);
}
complete() {
return this._promise;
}
dispose() {
if (this.decryptRequest) {
this.decryptRequest.dispose();
this.decryptRequest = null;
}
}
}
export class TimelineReader {
constructor({roomId, storage, fragmentIdComparer}) {
this._roomId = roomId;
@ -32,37 +50,43 @@ export class TimelineReader {
}
_openTxn() {
const stores = [
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
];
if (this._decryptEntries) {
return this._storage.readWriteTxn([
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
this._storage.storeNames.inboundGroupSessions,
this._storage.storeNames.groupSessionDecryptions,
this._storage.storeNames.deviceIdentities,
]);
} else {
return this._storage.readTxn([
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
]);
stores.push(this._storage.storeNames.inboundGroupSessions);
}
return this._storage.readTxn(stores);
}
async readFrom(eventKey, direction, amount) {
readFrom(eventKey, direction, amount) {
return new ReaderRequest(async r => {
const txn = await this._openTxn();
let entries;
try {
entries = await this._readFrom(eventKey, direction, amount, txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
return entries;
return await this._readFrom(eventKey, direction, amount, r, txn);
});
}
async _readFrom(eventKey, direction, amount, txn) {
readFromEnd(amount) {
return new ReaderRequest(async r => {
const txn = await this._openTxn();
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
let entries;
// room hasn't been synced yet
if (!liveFragment) {
entries = [];
} else {
this._fragmentIdComparer.add(liveFragment);
const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
const eventKey = liveFragmentEntry.asEventKey();
entries = await this._readFrom(eventKey, Direction.Backward, amount, r, txn);
entries.unshift(liveFragmentEntry);
}
return entries;
});
}
async _readFrom(eventKey, direction, amount, r, txn) {
let entries = [];
const timelineStore = txn.timelineEvents;
const fragmentStore = txn.timelineFragments;
@ -75,9 +99,6 @@ export class TimelineReader {
eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount);
}
let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer));
if (this._decryptEntries) {
eventEntries = await this._decryptEntries(eventEntries, txn);
}
entries = directionalConcat(entries, eventEntries, direction);
// prepend or append eventsWithinFragment to entries, and wrap them in EventEntry
@ -100,29 +121,14 @@ export class TimelineReader {
}
}
return entries;
}
async readFromEnd(amount) {
const txn = await this._openTxn();
let entries;
if (this._decryptEntries) {
r.decryptRequest = this._decryptEntries(entries, txn);
try {
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
// room hasn't been synced yet
if (!liveFragment) {
entries = [];
} else {
this._fragmentIdComparer.add(liveFragment);
const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
const eventKey = liveFragmentEntry.asEventKey();
entries = await this._readFrom(eventKey, Direction.Backward, amount, txn);
entries.unshift(liveFragmentEntry);
await r.decryptRequest.complete();
} finally {
r.decryptRequest = null;
}
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
return entries;
}
}

View File

@ -28,7 +28,11 @@ export class Disposables {
}
track(disposable) {
if (this.isDisposed) {
throw new Error("Already disposed, check isDisposed after await if needed");
}
this._disposables.push(disposable);
return disposable;
}
dispose() {
@ -40,8 +44,12 @@ export class Disposables {
}
}
get isDisposed() {
return this._disposables === null;
}
disposeTracked(value) {
if (value === undefined || value === null) {
if (value === undefined || value === null || this.isDisposed) {
return null;
}
const idx = this._disposables.indexOf(value);

211
src/utils/WorkerPool.js Normal file
View File

@ -0,0 +1,211 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {AbortError} from "./error.js";
class WorkerState {
constructor(worker) {
this.worker = worker;
this.busy = false;
}
attach(pool) {
this.worker.addEventListener("message", pool);
this.worker.addEventListener("error", pool);
}
detach(pool) {
this.worker.removeEventListener("message", pool);
this.worker.removeEventListener("error", pool);
}
}
class Request {
constructor(message, pool) {
this._promise = new Promise((_resolve, _reject) => {
this._resolve = _resolve;
this._reject = _reject;
});
this._message = message;
this._pool = pool;
this._worker = null;
}
abort() {
if (this._isNotDisposed) {
this._pool._abortRequest(this);
this._dispose();
}
}
response() {
return this._promise;
}
_dispose() {
this._reject = null;
this._resolve = null;
}
get _isNotDisposed() {
return this._resolve && this._reject;
}
}
export class WorkerPool {
// TODO: extract DOM specific bits and write unit tests
constructor(path, amount) {
this._workers = [];
for (let i = 0; i < amount ; ++i) {
const worker = new WorkerState(new Worker(path));
worker.attach(this);
this._workers[i] = worker;
}
this._requests = new Map();
this._counter = 0;
this._pendingFlag = false;
this._init = null;
}
init() {
const promise = new Promise((resolve, reject) => {
this._init = {resolve, reject};
});
this.sendAll({type: "ping"})
.then(this._init.resolve, this._init.reject)
.finally(() => {
this._init = null;
});
return promise;
}
handleEvent(e) {
if (e.type === "message") {
const message = e.data;
const request = this._requests.get(message.replyToId);
if (request) {
request._worker.busy = false;
if (request._isNotDisposed) {
if (message.type === "success") {
request._resolve(message.payload);
} else if (message.type === "error") {
request._reject(new Error(message.stack));
}
request._dispose();
}
this._requests.delete(message.replyToId);
}
this._sendPending();
} else if (e.type === "error") {
if (this._init) {
this._init.reject(new Error("worker error during init"));
}
console.error("worker error", e);
}
}
_getPendingRequest() {
for (const r of this._requests.values()) {
if (!r._worker) {
return r;
}
}
}
_getFreeWorker() {
for (const w of this._workers) {
if (!w.busy) {
return w;
}
}
}
_sendPending() {
this._pendingFlag = false;
let success;
do {
success = false;
const request = this._getPendingRequest();
if (request) {
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
success = true;
}
}
} while (success);
}
_sendWith(request, worker) {
request._worker = worker;
worker.busy = true;
worker.worker.postMessage(request._message);
}
_enqueueRequest(message) {
this._counter += 1;
message.id = this._counter;
const request = new Request(message, this);
this._requests.set(message.id, request);
return request;
}
send(message) {
const request = this._enqueueRequest(message);
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
}
return request;
}
// assumes all workers are free atm
sendAll(message) {
const promises = this._workers.map(worker => {
const request = this._enqueueRequest(Object.assign({}, message));
this._sendWith(request, worker);
return request.response();
});
return Promise.all(promises);
}
dispose() {
for (const w of this._workers) {
w.detach(this);
w.worker.terminate();
}
}
_trySendPendingInNextTick() {
if (!this._pendingFlag) {
this._pendingFlag = true;
Promise.resolve().then(() => {
this._sendPending();
});
}
}
_abortRequest(request) {
request._reject(new AbortError());
if (request._worker) {
request._worker.busy = false;
}
this._requests.delete(request._message.id);
// allow more requests to be aborted before trying to send other pending
this._trySendPendingInNextTick();
}
}

41
src/utils/mergeMap.js Normal file
View File

@ -0,0 +1,41 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export function mergeMap(src, dst) {
if (src) {
for (const [key, value] of src.entries()) {
dst.set(key, value);
}
}
}
export function tests() {
return {
"mergeMap with src": assert => {
const src = new Map();
src.set(1, "a");
const dst = new Map();
dst.set(2, "b");
mergeMap(src, dst);
assert.equal(dst.get(1), "a");
assert.equal(dst.get(2), "b");
assert.equal(src.get(2), null);
},
"mergeMap without src doesn't fail": () => {
mergeMap(undefined, new Map());
}
}
}

23
src/worker-polyfill.js Normal file
View File

@ -0,0 +1,23 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// polyfills needed for IE11
// just enough to run olm, have promises and async/await
import "regenerator-runtime/runtime";
import "core-js/modules/es.promise";
import "core-js/modules/es.math.imul";
import "core-js/modules/es.math.clz32";

108
src/worker.js Normal file
View File

@ -0,0 +1,108 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
function asErrorMessage(err) {
return {
type: "error",
message: err.message,
stack: err.stack
};
}
function asSuccessMessage(payload) {
return {
type: "success",
payload
};
}
class MessageHandler {
constructor() {
this._olm = null;
}
handleEvent(e) {
if (e.type === "message") {
this._handleMessage(e.data);
}
}
_sendReply(refMessage, reply) {
reply.replyToId = refMessage.id;
self.postMessage(reply);
}
_toMessage(fn) {
try {
let payload = fn();
if (payload instanceof Promise) {
return payload.then(
payload => asSuccessMessage(payload),
err => asErrorMessage(err)
);
} else {
return asSuccessMessage(payload);
}
} catch (err) {
return asErrorMessage(err);
}
}
_loadOlm(path) {
return this._toMessage(async () => {
// might have some problems here with window vs self as global object?
if (self.msCrypto && !self.crypto) {
self.crypto = self.msCrypto;
}
self.importScripts(path);
const olm = self.olm_exports;
// mangle the globals enough to make olm load believe it is running in a browser
self.window = self;
self.document = {};
await olm.init();
delete self.document;
delete self.window;
this._olm = olm;
});
}
_megolmDecrypt(sessionKey, ciphertext) {
return this._toMessage(() => {
let session;
try {
session = new this._olm.InboundGroupSession();
session.import_session(sessionKey);
// returns object with plaintext and message_index
return session.decrypt(ciphertext);
} finally {
session?.free();
}
});
}
async _handleMessage(message) {
const {type} = message;
if (type === "ping") {
this._sendReply(message, {type: "success"});
} else if (type === "load_olm") {
this._sendReply(message, await this._loadOlm(message.path));
} else if (type === "megolm_decrypt") {
this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext));
}
}
}
self.addEventListener("message", new MessageHandler());