From f321968ac3854280429d98965325d2a015b0ed21 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Wed, 17 Feb 2021 18:45:04 +0100 Subject: [PATCH] add more sync logging --- src/logging/LogItem.js | 1 + src/matrix/DeviceMessageHandler.js | 15 +- src/matrix/Session.js | 14 +- src/matrix/Sync.js | 197 +++++++++++++-------------- src/matrix/e2ee/Account.js | 8 +- src/matrix/e2ee/RoomEncryption.js | 2 +- src/matrix/e2ee/megolm/Decryption.js | 51 ++++--- src/matrix/room/Room.js | 23 ++-- 8 files changed, 164 insertions(+), 147 deletions(-) diff --git a/src/logging/LogItem.js b/src/logging/LogItem.js index dc713e89..4fbddfcc 100644 --- a/src/logging/LogItem.js +++ b/src/logging/LogItem.js @@ -89,6 +89,7 @@ export class LogItem { if (!filter.filter(this, children)) { return null; } + // in (v)alues, (l)abel and (t)ype are also reserved. const item = { // (s)tart s: this._start, diff --git a/src/matrix/DeviceMessageHandler.js b/src/matrix/DeviceMessageHandler.js index e0904133..a726ba90 100644 --- a/src/matrix/DeviceMessageHandler.js +++ b/src/matrix/DeviceMessageHandler.js @@ -34,7 +34,7 @@ export class DeviceMessageHandler { /** * @return {bool} whether messages are waiting to be decrypted and `decryptPending` should be called. */ - async writeSync(toDeviceEvents, txn) { + async writeSync(toDeviceEvents, txn, log) { const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted"); if (!encryptedEvents.length) { return false; @@ -53,14 +53,14 @@ export class DeviceMessageHandler { * @param {[type]} txn [description] * @return {[type]} [description] */ - async _writeDecryptedEvents(olmResults, txn) { + async _writeDecryptedEvents(olmResults, txn, log) { const megOlmRoomKeysResults = olmResults.filter(r => { return r.event?.type === "m.room_key" && r.event.content?.algorithm === MEGOLM_ALGORITHM; }); let roomKeys; + log.set("roomKeyCount", megOlmRoomKeysResults.length); if (megOlmRoomKeysResults.length) { - console.log("new room keys", megOlmRoomKeysResults); - roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn); + roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn, log); } return {roomKeys}; } @@ -76,12 +76,13 @@ export class DeviceMessageHandler { } // not safe to call multiple times without awaiting first call - async decryptPending(rooms) { + async decryptPending(rooms, log) { if (!this._olmDecryption) { return; } const readTxn = this._storage.readTxn([this._storage.storeNames.session]); const pendingEvents = await this._getPendingEvents(readTxn); + log.set("eventCount", pendingEvents.length); if (pendingEvents.length === 0) { return; } @@ -89,7 +90,7 @@ export class DeviceMessageHandler { const olmEvents = pendingEvents.filter(e => e.content?.algorithm === OLM_ALGORITHM); const decryptChanges = await this._olmDecryption.decryptAll(olmEvents); for (const err of decryptChanges.errors) { - console.warn("decryption failed for event", err, err.event); + log.child("decrypt_error").catch(err); } const txn = this._storage.readWriteTxn([ // both to remove the pending events and to modify the olm account @@ -99,7 +100,7 @@ export class DeviceMessageHandler { ]); let changes; try { - changes = await this._writeDecryptedEvents(decryptChanges.results, txn); + changes = await this._writeDecryptedEvents(decryptChanges.results, txn, log); decryptChanges.write(txn); txn.session.remove(PENDING_ENCRYPTED_EVENTS); } catch (err) { diff --git a/src/matrix/Session.js b/src/matrix/Session.js index 41d41e5c..5f306438 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -374,7 +374,7 @@ export class Session { } /** @internal */ - async writeSync(syncResponse, syncFilterId, txn) { + async writeSync(syncResponse, syncFilterId, txn, log) { const changes = { syncInfo: null, e2eeAccountChanges: null, @@ -390,20 +390,20 @@ export class Session { const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count; if (this._e2eeAccount && deviceOneTimeKeysCount) { - changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn); + changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn, log); } if (this._deviceTracker) { const deviceLists = syncResponse.device_lists; if (deviceLists) { - await this._deviceTracker.writeDeviceChanges(deviceLists, txn); + await log.wrap("deviceTracker", log => this._deviceTracker.writeDeviceChanges(deviceLists, txn, log)); } } const toDeviceEvents = syncResponse.to_device?.events; if (Array.isArray(toDeviceEvents)) { changes.deviceMessageDecryptionPending = - await this._deviceMessageHandler.writeSync(toDeviceEvents, txn); + await log.wrap("deviceMsgs", log => this._deviceMessageHandler.writeSync(toDeviceEvents, txn, log)); } // store account data @@ -430,10 +430,10 @@ export class Session { } /** @internal */ - async afterSyncCompleted(changes, isCatchupSync) { + async afterSyncCompleted(changes, isCatchupSync, log) { const promises = []; if (changes.deviceMessageDecryptionPending) { - promises.push(this._deviceMessageHandler.decryptPending(this.rooms)); + promises.push(log.wrap("decryptPending", log => this._deviceMessageHandler.decryptPending(this.rooms, log))); } // we don't start uploading one-time keys until we've caught up with // to-device messages, to help us avoid throwing away one-time-keys that we @@ -442,7 +442,7 @@ export class Session { if (!isCatchupSync) { const needsToUploadOTKs = await this._e2eeAccount.generateOTKsIfNeeded(this._storage); if (needsToUploadOTKs) { - promises.push(this._e2eeAccount.uploadKeys(this._storage)); + promises.push(log.wrap("uploadKeys", log => this._e2eeAccount.uploadKeys(this._storage, log))); } } if (promises.length) { diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 56aa4469..cf61d96c 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -90,79 +90,81 @@ export class Sync { this._syncLoop(syncToken); } - _createLogFilter(filter, log) { - if (log.duration >= 2000 || log.error || this._status.get() === SyncStatus.CatchupSync) { - return filter.minLevel(log.level.Detail); - } else { - return filter.minLevel(log.level.Info); - } - } async _syncLoop(syncToken) { // if syncToken is falsy, it will first do an initial sync ... while(this._status.get() !== SyncStatus.Stopped) { let roomStates; let sessionChanges; - try { - console.log(`starting sync request with since ${syncToken} ...`); - // unless we are happily syncing already, we want the server to return - // as quickly as possible, even if there are no events queued. This - // serves two purposes: - // - // * When the connection dies, we want to know asap when it comes back, - // so that we can hide the error from the user. (We don't want to - // have to wait for an event or a timeout). - // - // * We want to know if the server has any to_device messages queued up - // for us. We do that by calling it with a zero timeout until it - // doesn't give us any more to_device messages. - const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0; - const syncResult = await this._logger.run("sync", - log => this._syncRequest(syncToken, timeout, log), - this._logger.level.Info, - this._createLogFilter.bind(this) - ); - syncToken = syncResult.syncToken; - roomStates = syncResult.roomStates; - sessionChanges = syncResult.sessionChanges; - // initial sync or catchup sync - if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) { - this._status.set(SyncStatus.CatchupSync); + let wasCatchup = this._status.get() === SyncStatus.CatchupSync; + await this._logger.run("sync", async log => { + log.set("token", syncToken); + log.set("status", this._status.get()); + try { + // unless we are happily syncing already, we want the server to return + // as quickly as possible, even if there are no events queued. This + // serves two purposes: + // + // * When the connection dies, we want to know asap when it comes back, + // so that we can hide the error from the user. (We don't want to + // have to wait for an event or a timeout). + // + // * We want to know if the server has any to_device messages queued up + // for us. We do that by calling it with a zero timeout until it + // doesn't give us any more to_device messages. + const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0; + const syncResult = await this._syncRequest(syncToken, timeout, log); + syncToken = syncResult.syncToken; + roomStates = syncResult.roomStates; + sessionChanges = syncResult.sessionChanges; + // initial sync or catchup sync + if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) { + this._status.set(SyncStatus.CatchupSync); + } else { + this._status.set(SyncStatus.Syncing); + } + } catch (err) { + // retry same request on timeout + if (err.name === "ConnectionError" && err.isTimeout) { + // don't run afterSyncCompleted + return; + } + this._error = err; + if (err.name !== "AbortError") { + // sync wasn't asked to stop, but is stopping + // because of the error. + log.error = err; + log.logLevel = log.level.Fatal; + } + log.set("stopping", true); + this._status.set(SyncStatus.Stopped); + } + if (this._status.get() !== SyncStatus.Stopped) { + // TODO: if we're not going to run this phase in parallel with the next + // sync request (because this causes OTKs to be uploaded twice) + // should we move this inside _syncRequest? + // Alternatively, we can try to fix the OTK upload issue while still + // running in parallel. + await log.wrap("afterSyncCompleted", log => this._runAfterSyncCompleted(sessionChanges, roomStates, log)); + } + }, + this._logger.level.Info, + (filter, log) => { + if (log.duration >= 2000 || log.error || wasCatchup) { + return filter.minLevel(log.level.Detail); } else { - this._status.set(SyncStatus.Syncing); + return filter.minLevel(log.level.Info); } - } catch (err) { - // retry same request on timeout - if (err.name === "ConnectionError" && err.isTimeout) { - // don't run afterSyncCompleted - continue; - } - this._error = err; - if (err.name !== "AbortError") { - console.warn("stopping sync because of error"); - console.error(err); - } - this._status.set(SyncStatus.Stopped); - } - if (this._status.get() !== SyncStatus.Stopped) { - // TODO: if we're not going to run this phase in parallel with the next - // sync request (because this causes OTKs to be uploaded twice) - // should we move this inside _syncRequest? - // Alternatively, we can try to fix the OTK upload issue while still - // running in parallel. - await this._runAfterSyncCompleted(sessionChanges, roomStates); - } + }); } } - async _runAfterSyncCompleted(sessionChanges, roomStates) { + async _runAfterSyncCompleted(sessionChanges, roomStates, log) { const isCatchupSync = this._status.get() === SyncStatus.CatchupSync; const sessionPromise = (async () => { try { - await this._session.afterSyncCompleted(sessionChanges, isCatchupSync); - } catch (err) { - console.error("error during session afterSyncCompleted, continuing", err.stack); - } + await log.wrap("session", log => this._session.afterSyncCompleted(sessionChanges, isCatchupSync, log)); + } catch (err) {} // error is logged, but don't fail sessionPromise })(); const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => { @@ -170,10 +172,8 @@ export class Sync { }); const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => { try { - await rs.room.afterSyncCompleted(rs.changes); - } catch (err) { - console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack); - } + await log.wrap("room", log => rs.room.afterSyncCompleted(rs.changes, log)); + } catch (err) {} // error is logged, but don't fail roomsPromises }); // run everything in parallel, // we don't want to delay the next sync too much @@ -185,7 +185,7 @@ export class Sync { async _syncRequest(syncToken, timeout, log) { let {syncFilterId} = this._session; if (typeof syncFilterId !== "string") { - this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}}); + this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}}, {log}); syncFilterId = (await this._currentRequest.response()).filter_id; } const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests @@ -193,47 +193,46 @@ export class Sync { const response = await this._currentRequest.response(); const isInitialSync = !syncToken; - syncToken = response.next_batch; - log.set("syncToken", syncToken); - log.set("status", this._status.get()); - const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync); - await log.wrap("prepare rooms", log => this._prepareRooms(roomStates, log)); + log.set("roomCount", roomStates.length); + + await log.wrap("prepare", log => this._prepareRooms(roomStates, log)); + let sessionChanges; - const syncTxn = this._openSyncTxn(); - try { - sessionChanges = await log.wrap("session.writeSync", log => this._session.writeSync(response, syncFilterId, syncTxn, log)); - await Promise.all(roomStates.map(async rs => { - rs.changes = await log.wrap("room.writeSync", log => rs.room.writeSync( - rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log)); - })); - } catch(err) { - // avoid corrupting state by only - // storing the sync up till the point - // the exception occurred + + await log.wrap("write", async log => { + const syncTxn = this._openSyncTxn(); try { - syncTxn.abort(); - } catch (abortErr) { - console.error("Could not abort sync transaction, the sync response was probably only partially written and may have put storage in a inconsistent state.", abortErr); + sessionChanges = await log.wrap("session", log => this._session.writeSync(response, syncFilterId, syncTxn, log), log.level.Detail); + await Promise.all(roomStates.map(async rs => { + rs.changes = await log.wrap("room", log => rs.room.writeSync( + rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log), log.level.Detail); + })); + } catch(err) { + // avoid corrupting state by only + // storing the sync up till the point + // the exception occurred + try { + syncTxn.abort(); + } catch (abortErr) { + log.set("couldNotAbortTxn", true); + } + throw err; } - throw err; - } - try { await syncTxn.complete(); - console.info("syncTxn committed!!"); - } catch (err) { - console.error("unable to commit sync tranaction"); - throw err; - } - this._session.afterSync(sessionChanges); - // emit room related events after txn has been closed - for(let rs of roomStates) { - rs.room.afterSync(rs.changes); - } + }); + + log.wrap("after", log => { + log.wrap("session", log => this._session.afterSync(sessionChanges, log), log.level.Detail); + // emit room related events after txn has been closed + for(let rs of roomStates) { + log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail); + } + }); const toDeviceEvents = response.to_device?.events; return { - syncToken, + syncToken: response.next_batch, roomStates, sessionChanges, hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0, @@ -250,11 +249,11 @@ export class Sync { async _prepareRooms(roomStates, log) { const prepareTxn = this._openPrepareSyncTxn(); await Promise.all(roomStates.map(async rs => { - rs.preparation = await log.wrap("room.prepareSync", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log)); + rs.preparation = await log.wrap("room", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log), log.level.Detail); })); // This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md await prepareTxn.complete(); - await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation))); + await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation, log))); } _openSyncTxn() { diff --git a/src/matrix/e2ee/Account.js b/src/matrix/e2ee/Account.js index 7d5075ae..98ca46ad 100644 --- a/src/matrix/e2ee/Account.js +++ b/src/matrix/e2ee/Account.js @@ -80,7 +80,7 @@ export class Account { return this._identityKeys; } - async uploadKeys(storage) { + async uploadKeys(storage, log) { const oneTimeKeys = JSON.parse(this._account.one_time_keys()); // only one algorithm supported by olm atm, so hardcode its name const oneTimeKeysEntries = Object.entries(oneTimeKeys.curve25519); @@ -93,8 +93,9 @@ export class Account { if (oneTimeKeysEntries.length) { payload.one_time_keys = this._oneTimeKeysPayload(oneTimeKeysEntries); } - const response = await this._hsApi.uploadKeys(payload).response(); + const response = await this._hsApi.uploadKeys(payload, {log}).response(); this._serverOTKCount = response?.one_time_key_counts?.signed_curve25519; + log.set("serverOTKCount", this._serverOTKCount); // TODO: should we not modify this in the txn like we do elsewhere? // we'd have to pickle and unpickle the account to clone it though ... // and the upload has succeed at this point, so in-memory would be correct @@ -173,11 +174,12 @@ export class Account { txn.session.set(ACCOUNT_SESSION_KEY, this._account.pickle(this._pickleKey)); } - writeSync(deviceOneTimeKeysCount, txn) { + writeSync(deviceOneTimeKeysCount, txn, log) { // we only upload signed_curve25519 otks const otkCount = deviceOneTimeKeysCount.signed_curve25519 || 0; if (Number.isSafeInteger(otkCount) && otkCount !== this._serverOTKCount) { txn.session.set(SERVER_OTK_COUNT_SESSION_KEY, otkCount); + log.set("otkCount", otkCount); return otkCount; } } diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index 54042bbd..d5f12a51 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -337,7 +337,7 @@ export class RoomEncryption { return id; } - async flushPendingRoomKeyShares(hsApi, operations = null) { + async flushPendingRoomKeyShares(hsApi, operations, log) { // this has to be reentrant as it can be called from Room.start while still running if (this._isFlushingRoomKeyShares) { return; diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index b2221028..88a99cd7 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -122,33 +122,40 @@ export class Decryption { * @param {[type]} txn a storage transaction with read/write on inboundGroupSessions * @return {Promise>} an array with the newly added sessions */ - async addRoomKeys(decryptionResults, txn) { + async addRoomKeys(decryptionResults, txn, log) { const newSessions = []; for (const {senderCurve25519Key: senderKey, event, claimedEd25519Key} of decryptionResults) { - const roomId = event.content?.["room_id"]; - const sessionId = event.content?.["session_id"]; - const sessionKey = event.content?.["session_key"]; + await log.wrap("room_key", async log => { + const roomId = event.content?.["room_id"]; + const sessionId = event.content?.["session_id"]; + const sessionKey = event.content?.["session_key"]; - if ( - typeof roomId !== "string" || - typeof sessionId !== "string" || - typeof senderKey !== "string" || - typeof sessionKey !== "string" - ) { - return; - } + log.set("roomId", roomId); + log.set("sessionId", sessionId); - const session = new this._olm.InboundGroupSession(); - try { - session.create(sessionKey); - const sessionEntry = await this._writeInboundSession( - session, roomId, senderKey, claimedEd25519Key, sessionId, txn); - if (sessionEntry) { - newSessions.push(sessionEntry); + if ( + typeof roomId !== "string" || + typeof sessionId !== "string" || + typeof senderKey !== "string" || + typeof sessionKey !== "string" + ) { + log.logLevel = log.level.Warn; + log.set("invalid", true); + return; } - } finally { - session.free(); - } + + const session = new this._olm.InboundGroupSession(); + try { + session.create(sessionKey); + const sessionEntry = await this._writeInboundSession( + session, roomId, senderKey, claimedEd25519Key, sessionId, txn); + if (sessionEntry) { + newSessions.push(sessionEntry); + } + } finally { + session.free(); + } + }, log.level.Detail); } // this will be passed to the Room in notifyRoomKeys return newSessions; diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 1e017cf9..b9575cfb 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -176,11 +176,12 @@ export class Room extends EventEmitter { } async prepareSync(roomResponse, membership, txn, log) { - log.set("roomId", this.id); + log.set("id", this.id); const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership) let roomEncryption = this._roomEncryption; // encryption is enabled in this sync if (!roomEncryption && summaryChanges.encryption) { + log.set("enableEncryption", true); roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption); } @@ -204,16 +205,19 @@ export class Room extends EventEmitter { }; } - async afterPrepareSync(preparation) { + async afterPrepareSync(preparation, parentLog) { if (preparation.decryptPreparation) { - preparation.decryptChanges = await preparation.decryptPreparation.decrypt(); - preparation.decryptPreparation = null; + await parentLog.wrap("afterPrepareSync decrypt", async log => { + log.set("id", this.id); + preparation.decryptChanges = await preparation.decryptPreparation.decrypt(); + preparation.decryptPreparation = null; + }); } } /** @package */ async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn, log) { - log.set("roomId", this.id); + log.set("id", this.id); const {entries, newLiveKey, memberChanges} = await this._syncWriter.writeSync(roomResponse, txn); if (decryptChanges) { @@ -259,7 +263,8 @@ export class Room extends EventEmitter { * Called with the changes returned from `writeSync` to apply them and emit changes. * No storage or network operations should be done here. */ - afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}) { + afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) { + log.set("id", this.id); this._syncWriter.afterSync(newLiveKey); this._setEncryption(roomEncryption); if (memberChanges.size) { @@ -310,9 +315,11 @@ export class Room extends EventEmitter { * Can be used to do longer running operations that resulted from the last sync, * like network operations. */ - async afterSyncCompleted() { + async afterSyncCompleted(changes, log) { + log.set("id", this.id); if (this._roomEncryption) { - await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi); + // TODO: pass log to flushPendingRoomKeyShares once we also have a logger in `start` + await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, null); } }