From cadeae98bc69dc35ba4f966e298fc34b5f7e7fdd Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:13:49 +0100 Subject: [PATCH 1/9] prevent ignored signaling messages from blocking the queue signaling messages get ignored when they are not for the currently active call id. In that case we currently don't advance the lastProcessedSeqNr counter, as we had a problem before where the counter would be brought out of sync with seq numbers for other call ids. However when we've previously processed a signalling message (e.g. the counter is not undefined) and the first message in the queue is to be ignored, it will prevent the subsequent messages from being dequeued as their seq number is more than 1 away from the last processed seq. This adds an additional counter for ignored seq numbers that is also used to see if the next message is only 1 away from the next seq value. I am adding logging as well here to have a better overview in the future --- src/matrix/calls/group/Member.ts | 51 +++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index ab99c6b8..f82c5ddd 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -59,6 +59,9 @@ class MemberConnection { public retryCount: number = 0; public peerCall?: PeerCall; public lastProcessedSeqNr: number | undefined; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + public lastIgnoredSeqNr: number | undefined; public queuedSignallingMessages: SignallingMessage[] = []; public outboundSeqCounter: number = 0; @@ -73,16 +76,24 @@ class MemberConnection { if (this.queuedSignallingMessages.length === 0) { return false; } + const first = this.queuedSignallingMessages[0]; + const firstSeq = first.content.seq; + // prevent not being able to jump over seq values of ignored messages for other call ids + // as they don't increase lastProcessedSeqNr. + if (this.lastIgnoredSeqNr !== undefined && firstSeq === this.lastIgnoredSeqNr + 1) { + return true; + } if (this.lastProcessedSeqNr === undefined) { return true; } - const first = this.queuedSignallingMessages[0]; // allow messages with both a seq we've just seen and // the next one to be dequeued as it can happen // that messages for other callIds (which could repeat seq) // are present in the queue - return first.content.seq === this.lastProcessedSeqNr || - first.content.seq === this.lastProcessedSeqNr + 1; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + return firstSeq === this.lastProcessedSeqNr || + firstSeq === this.lastProcessedSeqNr + 1; } dispose() { @@ -382,17 +393,29 @@ export class Member { private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage, syncLog: ILogItem): boolean { let hasNewMessageBeenDequeued = false; while (connection.canDequeueNextSignallingMessage) { - const message = connection.queuedSignallingMessages.shift()!; - if (message === newMessage) { - hasNewMessageBeenDequeued = true; - } - // ignore items in the queue that should not be handled and prevent - // the lastProcessedSeqNr being corrupted with the `seq` for other call ids - if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) { - const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); - syncLog.refDetached(item); - connection.lastProcessedSeqNr = message.content.seq; - } + syncLog.wrap("dequeue message", log => { + const message = connection.queuedSignallingMessages.shift()!; + if (message === newMessage) { + log.set("isNewMsg", true); + hasNewMessageBeenDequeued = true; + } + const seq = message.content?.seq; + log.set("seq", seq); + // ignore items in the queue that should not be handled and prevent + // the lastProcessedSeqNr being corrupted with the `seq` for other call ids + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + const action = peerCall.getMessageAction(message); + if (action === IncomingMessageAction.Handle) { + const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); + log.refDetached(item); + connection.lastProcessedSeqNr = seq; + } else { + log.set("type", message.type); + log.set("ignored", true); + connection.lastIgnoredSeqNr = seq; + } + }); } return hasNewMessageBeenDequeued; } From 39e9a43a1b365227b5a5bb04e86cfff4763de212 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:42:44 +0100 Subject: [PATCH 2/9] be strict about the first seq being 0 otherwise if first 2 messages are delivered in reverse order, the queue gets blocked --- src/matrix/calls/group/Member.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index f82c5ddd..fe6e40a4 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -84,7 +84,7 @@ class MemberConnection { return true; } if (this.lastProcessedSeqNr === undefined) { - return true; + return firstSeq === 0; } // allow messages with both a seq we've just seen and // the next one to be dequeued as it can happen From 5f4ad30d03efaa3f0a1203e9964bac0bdf7f4ef0 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:43:28 +0100 Subject: [PATCH 3/9] don't block if it does happen that we have processed a message too early allow dequeueing if the first seq in the queue is actually lower than what we already processed. Normally should not happen, but the bug fixed in the previous commit was aggravated by this behavior, so be more lenient here. --- src/matrix/calls/group/Member.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index fe6e40a4..81b1dc7d 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -92,8 +92,7 @@ class MemberConnection { // are present in the queue // XXX: Not needed anymore when seq is scoped to call_id // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 - return firstSeq === this.lastProcessedSeqNr || - firstSeq === this.lastProcessedSeqNr + 1; + return firstSeq <= (this.lastProcessedSeqNr + 1); } dispose() { From e39dd176a4b1edb5a583acb22e9ca0ff6e0aa326 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:58:32 +0100 Subject: [PATCH 4/9] remove debug logging --- src/matrix/calls/group/Member.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index 81b1dc7d..23b35bd8 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -372,6 +372,7 @@ export class Member { } if (action === IncomingMessageAction.Handle) { const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); + console.log(`splice ${message.type} at ${idx}`); connection.queuedSignallingMessages.splice(idx, 0, message); if (connection.peerCall) { const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog); From f67fb7add6ef0d06c44e9023ceb32254ee368bb1 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:58:57 +0100 Subject: [PATCH 5/9] add unit test for this particular error case --- src/matrix/calls/group/Member.ts | 114 ++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index 23b35bd8..d1463b51 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -21,13 +21,13 @@ import {formatToDeviceMessagesPayload} from "../../common"; import {sortedIndex} from "../../../utils/sortedIndex"; import { ErrorBoundary } from "../../../utils/ErrorBoundary"; -import type {MuteSettings} from "../common"; +import {MuteSettings} from "../common"; import type {Options as PeerCallOptions, RemoteMedia} from "../PeerCall"; import type {LocalMedia} from "../LocalMedia"; import type {HomeServerApi} from "../../net/HomeServerApi"; import type {MCallBase, MGroupCallBase, SignallingMessage, CallDeviceMembership} from "../callEventTypes"; import type {GroupCall} from "./GroupCall"; -import type {RoomMember} from "../../room/members/RoomMember"; +import {RoomMember} from "../../room/members/RoomMember"; import type {EncryptedMessage} from "../../e2ee/olm/Encryption"; import type {ILogItem} from "../../../logging/types"; import type {BaseObservableValue} from "../../../observable/value"; @@ -471,3 +471,113 @@ export function isMemberExpired(callDeviceMembership: CallDeviceMembership, now: const expiresAt = memberExpiresAt(callDeviceMembership); return typeof expiresAt === "number" ? ((expiresAt + margin) <= now) : true; } + +import {ObservableValue} from "../../../observable/value"; +import {Clock as MockClock} from "../../../mocks/Clock"; +import {Instance as NullLoggerInstance} from "../../../logging/NullLogger"; + +export function tests() { + + class MockMedia { + clone(): MockMedia { return this; } + } + + class MockPeerConn { + addEventListener() {} + removeEventListener() {} + setConfiguration() {} + setRemoteDescription() {} + getReceivers() { return [{}]; } // non-empty array + getSenders() { return []; } + addTrack() { return {}; } + removeTrack() {} + close() {} + } + return { + "test queue doesn't get blocked by enqueued, then ignored device message": assert => { + // XXX we might want to refactor the queue code a bit so it's easier to test + // without having to provide so many mocks + const clock = new MockClock(); + // setup logging + const logger = NullLoggerInstance; + // const logger = new Logger({platform: {clock, random: Math.random}}); + // logger.addReporter(new ConsoleReporter()); + + // create member + const callDeviceMembership = { + ["device_id"]: "BVPIHSKXFC", + ["session_id"]: "s1d5863f41ec5a5", + ["expires_ts"]: 123, + feeds: [{purpose: "m.usermedia"}] + }; + const roomMember = RoomMember.fromUserId("!abc", "@bruno4:matrix.org", "join"); + const turnServer = new ObservableValue({}); + const options = { + confId: "conf", + ownUserId: "@foobaraccount2:matrix.org", + ownDeviceId: "CMLEZSARRT", + sessionId: "s1cece7088b9d35", + clock, + emitUpdate: () => {}, + webRTC: { + prepareSenderForPurpose: () => {}, + createPeerConnection() { + return new MockPeerConn(); + } + } + } as Options; + const member = new Member(roomMember, callDeviceMembership, options, logger.child("member")); + member.connect(new MockMedia() as LocalMedia, new MuteSettings(), turnServer, logger.child("connect")); + // pretend we've already received 3 messages + member.connection.lastProcessedSeqNr = 2; + // send hangup with seq=3, this will enqueue the message because there is no peerCall + // as it's up to @bruno4:matrix.org to send the invite + const hangup = { + type: EventType.Hangup, + content: { + "call_id": "c0ac1b0e37afe73", + "version": 1, + "reason": "invite_timeout", + "seq": 3, + "conf_id": "conf-16a120796440a6", + "device_id": "BVPIHSKXFC", + "party_id": "BVPIHSKXFC", + "sender_session_id": "s1d5863f41ec5a5", + "dest_session_id": "s1cece7088b9d35" + } + }; + member.handleDeviceMessage(hangup, logger.child("handle hangup")); + // Send an invite with seq=4, this will create a new peer call with a the call id + // when dequeueing the hangup from before, it'll get ignored because it is + // for the previous call id. + const invite = { + type: EventType.Invite, + content: { + "call_id": "c1175b12d559fb1", + "offer": { + "type": "offer", + "sdp": "..." + }, + "org.matrix.msc3077.sdp_stream_metadata": { + "60087b60-487e-4fa0-8229-b232c18e1332": { + "purpose": "m.usermedia", + "audio_muted": false, + "video_muted": false + } + }, + "version": 1, + "lifetime": 60000, + "seq": 4, + "conf_id": "conf-16a120796440a6", + "device_id": "BVPIHSKXFC", + "party_id": "BVPIHSKXFC", + "sender_session_id": "s1d5863f41ec5a5", + "dest_session_id": "s1cece7088b9d35" + } + }; + member.handleDeviceMessage(invite, logger.child("handle invite")); + assert.equal(member.connection.queuedSignallingMessages.length, 0); + // logger.reporters[0].printOpenItems(); + } + }; +} From 02108c69dc782c2737e4ddde9725c731238a8011 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 15:00:22 +0100 Subject: [PATCH 6/9] remove debug logging --- src/matrix/calls/group/Member.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index d1463b51..e0c9c6ca 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -372,7 +372,6 @@ export class Member { } if (action === IncomingMessageAction.Handle) { const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); - console.log(`splice ${message.type} at ${idx}`); connection.queuedSignallingMessages.splice(idx, 0, message); if (connection.peerCall) { const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog); From ddb5865ccb0defe5c80238e4565c3e6f45bdd9d6 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 15:00:56 +0100 Subject: [PATCH 7/9] actually forgot to dispose peerCall here when replacing --- src/matrix/calls/group/Member.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index e0c9c6ca..d88a216f 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -362,6 +362,7 @@ export class Member { syncLog.refDetached(log); } if (shouldReplace) { + connection.peerCall.dispose(); connection.peerCall = undefined; action = IncomingMessageAction.Handle; } From 7f9d64c9725c607eaac81a8744c7f95a161a28b5 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 19:42:20 +0100 Subject: [PATCH 8/9] improve logging of arrival of to_device call signalling messages --- src/matrix/calls/group/Member.ts | 88 ++++++++++++++++---------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index d88a216f..daada4af 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -343,64 +343,65 @@ export class Member { /** @internal */ handleDeviceMessage(message: SignallingMessage, syncLog: ILogItem): void { this.errorBoundary.try(() => { - const {connection} = this; - if (connection) { - const destSessionId = message.content.dest_session_id; - if (destSessionId !== this.options.sessionId) { - const logItem = connection.logItem.log({l: "ignoring to_device event with wrong session_id", destSessionId, type: message.type}); - syncLog.refDetached(logItem); - return; - } - // if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it - let action = IncomingMessageAction.Handle; - if (connection.peerCall) { - action = connection.peerCall.getMessageAction(message); - // deal with glare and replacing the call before creating new calls - if (action === IncomingMessageAction.InviteGlare) { - const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem); - if (log) { - syncLog.refDetached(log); - } - if (shouldReplace) { - connection.peerCall.dispose(); - connection.peerCall = undefined; - action = IncomingMessageAction.Handle; + syncLog.wrap({l: "Member.handleDeviceMessage", type: message.type, seq: message.content?.seq}, log => { + const {connection} = this; + if (connection) { + const destSessionId = message.content.dest_session_id; + if (destSessionId !== this.options.sessionId) { + const logItem = connection.logItem.log({l: "ignoring to_device event with wrong session_id", destSessionId, type: message.type}); + log.refDetached(logItem); + return; + } + // if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it + if (connection.peerCall) { + const action = connection.peerCall.getMessageAction(message); + // deal with glare and replacing the call before creating new calls + if (action === IncomingMessageAction.InviteGlare) { + const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem); + if (log) { + log.refDetached(log); + } + if (shouldReplace) { + connection.peerCall.dispose(); + connection.peerCall = undefined; + } } } - } - if (message.type === EventType.Invite && !connection.peerCall) { - connection.peerCall = this._createPeerCall(message.content.call_id); - } - if (action === IncomingMessageAction.Handle) { + // create call on invite + if (message.type === EventType.Invite && !connection.peerCall) { + connection.peerCall = this._createPeerCall(message.content.call_id); + } + // enqueue const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); connection.queuedSignallingMessages.splice(idx, 0, message); + // dequeue as much as we can + let hasNewMessageBeenDequeued = false; if (connection.peerCall) { - const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog); - if (!hasNewMessageBeenDequeued) { - syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq})); - } + hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, log); } - } else if (action === IncomingMessageAction.Ignore && connection.peerCall) { - const logItem = connection.logItem.log({l: "ignoring to_device event with wrong call_id", callId: message.content.call_id, type: message.type}); - syncLog.refDetached(logItem); + if (!hasNewMessageBeenDequeued) { + log.refDetached(connection.logItem.log({l: "queued message", type: message.type, seq: message.content.seq, idx})); + } + } else { + // TODO: the right thing to do here would be to at least enqueue the message rather than drop it, + // and if it's up to the other end to send the invite and the type is an invite to actually + // call connect and assume the m.call.member state update is on its way? + syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId}); } - } else { - syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId}); - } + }); }); } private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage, syncLog: ILogItem): boolean { let hasNewMessageBeenDequeued = false; while (connection.canDequeueNextSignallingMessage) { - syncLog.wrap("dequeue message", log => { - const message = connection.queuedSignallingMessages.shift()!; - if (message === newMessage) { - log.set("isNewMsg", true); - hasNewMessageBeenDequeued = true; - } + const message = connection.queuedSignallingMessages.shift()!; + const isNewMsg = message === newMessage; + hasNewMessageBeenDequeued = hasNewMessageBeenDequeued || isNewMsg; + syncLog.wrap(isNewMsg ? "process message" : "dequeue message", log => { const seq = message.content?.seq; log.set("seq", seq); + log.set("type", message.type); // ignore items in the queue that should not be handled and prevent // the lastProcessedSeqNr being corrupted with the `seq` for other call ids // XXX: Not needed anymore when seq is scoped to call_id @@ -411,7 +412,6 @@ export class Member { log.refDetached(item); connection.lastProcessedSeqNr = seq; } else { - log.set("type", message.type); log.set("ignored", true); connection.lastIgnoredSeqNr = seq; } From 0fa9d193d970e5da96018bc56ca691f9bc14d30a Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 7 Feb 2023 19:42:43 +0100 Subject: [PATCH 9/9] fix comment typo --- src/matrix/calls/PeerCall.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/matrix/calls/PeerCall.ts b/src/matrix/calls/PeerCall.ts index 36136b97..2b28fbef 100644 --- a/src/matrix/calls/PeerCall.ts +++ b/src/matrix/calls/PeerCall.ts @@ -303,7 +303,7 @@ export class PeerCall implements IDisposable { } handleIncomingSignallingMessage(message: SignallingMessage, partyId: PartyId, log: ILogItem): ILogItem { - // return logItem item immediately so it can be references in sync manner + // return logItem item immediately so it can be referenced by the sync log let logItem; log.wrap({ l: "receive signalling message",