From cadeae98bc69dc35ba4f966e298fc34b5f7e7fdd Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:13:49 +0100 Subject: [PATCH] prevent ignored signaling messages from blocking the queue signaling messages get ignored when they are not for the currently active call id. In that case we currently don't advance the lastProcessedSeqNr counter, as we had a problem before where the counter would be brought out of sync with seq numbers for other call ids. However when we've previously processed a signalling message (e.g. the counter is not undefined) and the first message in the queue is to be ignored, it will prevent the subsequent messages from being dequeued as their seq number is more than 1 away from the last processed seq. This adds an additional counter for ignored seq numbers that is also used to see if the next message is only 1 away from the next seq value. I am adding logging as well here to have a better overview in the future --- src/matrix/calls/group/Member.ts | 51 +++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index ab99c6b8..f82c5ddd 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -59,6 +59,9 @@ class MemberConnection { public retryCount: number = 0; public peerCall?: PeerCall; public lastProcessedSeqNr: number | undefined; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + public lastIgnoredSeqNr: number | undefined; public queuedSignallingMessages: SignallingMessage[] = []; public outboundSeqCounter: number = 0; @@ -73,16 +76,24 @@ class MemberConnection { if (this.queuedSignallingMessages.length === 0) { return false; } + const first = this.queuedSignallingMessages[0]; + const firstSeq = first.content.seq; + // prevent not being able to jump over seq values of ignored messages for other call ids + // as they don't increase lastProcessedSeqNr. + if (this.lastIgnoredSeqNr !== undefined && firstSeq === this.lastIgnoredSeqNr + 1) { + return true; + } if (this.lastProcessedSeqNr === undefined) { return true; } - const first = this.queuedSignallingMessages[0]; // allow messages with both a seq we've just seen and // the next one to be dequeued as it can happen // that messages for other callIds (which could repeat seq) // are present in the queue - return first.content.seq === this.lastProcessedSeqNr || - first.content.seq === this.lastProcessedSeqNr + 1; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + return firstSeq === this.lastProcessedSeqNr || + firstSeq === this.lastProcessedSeqNr + 1; } dispose() { @@ -382,17 +393,29 @@ export class Member { private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage, syncLog: ILogItem): boolean { let hasNewMessageBeenDequeued = false; while (connection.canDequeueNextSignallingMessage) { - const message = connection.queuedSignallingMessages.shift()!; - if (message === newMessage) { - hasNewMessageBeenDequeued = true; - } - // ignore items in the queue that should not be handled and prevent - // the lastProcessedSeqNr being corrupted with the `seq` for other call ids - if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) { - const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); - syncLog.refDetached(item); - connection.lastProcessedSeqNr = message.content.seq; - } + syncLog.wrap("dequeue message", log => { + const message = connection.queuedSignallingMessages.shift()!; + if (message === newMessage) { + log.set("isNewMsg", true); + hasNewMessageBeenDequeued = true; + } + const seq = message.content?.seq; + log.set("seq", seq); + // ignore items in the queue that should not be handled and prevent + // the lastProcessedSeqNr being corrupted with the `seq` for other call ids + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + const action = peerCall.getMessageAction(message); + if (action === IncomingMessageAction.Handle) { + const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); + log.refDetached(item); + connection.lastProcessedSeqNr = seq; + } else { + log.set("type", message.type); + log.set("ignored", true); + connection.lastIgnoredSeqNr = seq; + } + }); } return hasNewMessageBeenDequeued; }