log event id when sending and when receiving remote echo on sync

This commit is contained in:
Bruno Windels 2021-02-23 19:58:01 +01:00
parent 7b7907add0
commit 0cbf6008a2
3 changed files with 56 additions and 55 deletions

View File

@ -246,7 +246,7 @@ export class Room extends EventEmitter {
}
let removedPendingEvents;
if (Array.isArray(roomResponse.timeline?.events)) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log);
}
return {
summaryChanges,
@ -405,57 +405,59 @@ export class Room extends EventEmitter {
}
/** @public */
async fillGap(fragmentEntry, amount) {
fillGap(fragmentEntry, amount, log = null) {
// TODO move some/all of this out of Room
if (fragmentEntry.edgeReached) {
return;
}
const response = await this._hsApi.messages(this._roomId, {
from: fragmentEntry.token,
dir: fragmentEntry.direction.asApiString(),
limit: amount,
filter: {
lazy_load_members: true,
include_redundant_members: true,
return this._platform.logger.wrapOrRun(log, "fillGap", async log => {
if (fragmentEntry.edgeReached) {
return;
}
}).response();
const response = await this._hsApi.messages(this._roomId, {
from: fragmentEntry.token,
dir: fragmentEntry.direction.asApiString(),
limit: amount,
filter: {
lazy_load_members: true,
include_redundant_members: true,
}
}, {log}).response();
const txn = this._storage.readWriteTxn([
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
]);
let removedPendingEvents;
let gapResult;
try {
// detect remote echos of pending messages in the gap
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
// write new events into gap
const gapWriter = new GapWriter({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer,
});
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
if (this._roomEncryption) {
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
await decryptRequest.complete();
}
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
this._fragmentIdComparer.add(fragment);
}
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
}
if (this._timeline) {
this._timeline.addGapEntries(gapResult.entries);
}
const txn = this._storage.readWriteTxn([
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
]);
let removedPendingEvents;
let gapResult;
try {
// detect remote echos of pending messages in the gap
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn, log);
// write new events into gap
const gapWriter = new GapWriter({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer,
});
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
if (this._roomEncryption) {
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
await decryptRequest.complete();
}
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
this._fragmentIdComparer.add(fragment);
}
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
}
if (this._timeline) {
this._timeline.addGapEntries(gapResult.entries);
}
});
}
/** @public */

View File

@ -30,10 +30,7 @@ export class PendingEvent {
constructor({data, remove, emitUpdate, attachments}) {
this._data = data;
this._attachments = attachments;
this._emitUpdate = () => {
console.log("PendingEvent status", this.status, this._attachments && Object.entries(this._attachments).map(([key, a]) => `${key}: ${a.sentBytes}/${a.size}`));
emitUpdate();
};
this._emitUpdate = emitUpdate;
this._removeFromQueueCallback = remove;
this._aborted = false;
this._status = SendStatus.Waiting;
@ -169,6 +166,7 @@ export class PendingEvent {
const response = await this._sendRequest.response();
this._sendRequest = null;
this._data.remoteId = response.event_id;
log.set("id", this._data.remoteId);
this._status = SendStatus.Sent;
this._emitUpdate("status");
}

View File

@ -53,7 +53,7 @@ export class SendQueue {
for (let i = 0; i < this._pendingEvents.length; i += 1) {
await log.wrap("send event", async log => {
const pendingEvent = this._pendingEvents.get(i);
log.set("id", pendingEvent.queueIndex);
log.set("queueIndex", pendingEvent.queueIndex);
try {
await this._sendEvent(pendingEvent, log);
} catch(err) {
@ -93,7 +93,7 @@ export class SendQueue {
}
}
removeRemoteEchos(events, txn) {
removeRemoteEchos(events, txn, parentLog) {
const removed = [];
for (const event of events) {
const txnId = event.unsigned && event.unsigned.transaction_id;
@ -105,6 +105,7 @@ export class SendQueue {
}
if (idx !== -1) {
const pendingEvent = this._pendingEvents.get(idx);
parentLog.log({l: "removeRemoteEcho", id: pendingEvent.remoteId});
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
removed.push(pendingEvent);
}