diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index ab99c6b8..f82c5ddd 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -59,6 +59,9 @@ class MemberConnection { public retryCount: number = 0; public peerCall?: PeerCall; public lastProcessedSeqNr: number | undefined; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + public lastIgnoredSeqNr: number | undefined; public queuedSignallingMessages: SignallingMessage[] = []; public outboundSeqCounter: number = 0; @@ -73,16 +76,24 @@ class MemberConnection { if (this.queuedSignallingMessages.length === 0) { return false; } + const first = this.queuedSignallingMessages[0]; + const firstSeq = first.content.seq; + // prevent not being able to jump over seq values of ignored messages for other call ids + // as they don't increase lastProcessedSeqNr. + if (this.lastIgnoredSeqNr !== undefined && firstSeq === this.lastIgnoredSeqNr + 1) { + return true; + } if (this.lastProcessedSeqNr === undefined) { return true; } - const first = this.queuedSignallingMessages[0]; // allow messages with both a seq we've just seen and // the next one to be dequeued as it can happen // that messages for other callIds (which could repeat seq) // are present in the queue - return first.content.seq === this.lastProcessedSeqNr || - first.content.seq === this.lastProcessedSeqNr + 1; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + return firstSeq === this.lastProcessedSeqNr || + firstSeq === this.lastProcessedSeqNr + 1; } dispose() { @@ -382,17 +393,29 @@ export class Member { private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage, syncLog: ILogItem): boolean { let hasNewMessageBeenDequeued = false; while (connection.canDequeueNextSignallingMessage) { - const message = connection.queuedSignallingMessages.shift()!; - if (message === newMessage) { - hasNewMessageBeenDequeued = true; - } - // ignore items in the queue that should not be handled and prevent - // the lastProcessedSeqNr being corrupted with the `seq` for other call ids - if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) { - const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); - syncLog.refDetached(item); - connection.lastProcessedSeqNr = message.content.seq; - } + syncLog.wrap("dequeue message", log => { + const message = connection.queuedSignallingMessages.shift()!; + if (message === newMessage) { + log.set("isNewMsg", true); + hasNewMessageBeenDequeued = true; + } + const seq = message.content?.seq; + log.set("seq", seq); + // ignore items in the queue that should not be handled and prevent + // the lastProcessedSeqNr being corrupted with the `seq` for other call ids + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + const action = peerCall.getMessageAction(message); + if (action === IncomingMessageAction.Handle) { + const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); + log.refDetached(item); + connection.lastProcessedSeqNr = seq; + } else { + log.set("type", message.type); + log.set("ignored", true); + connection.lastIgnoredSeqNr = seq; + } + }); } return hasNewMessageBeenDequeued; }