diff --git a/src/matrix/calls/PeerCall.ts b/src/matrix/calls/PeerCall.ts index 36136b97..2b28fbef 100644 --- a/src/matrix/calls/PeerCall.ts +++ b/src/matrix/calls/PeerCall.ts @@ -303,7 +303,7 @@ export class PeerCall implements IDisposable { } handleIncomingSignallingMessage(message: SignallingMessage, partyId: PartyId, log: ILogItem): ILogItem { - // return logItem item immediately so it can be references in sync manner + // return logItem item immediately so it can be referenced by the sync log let logItem; log.wrap({ l: "receive signalling message", diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index ab99c6b8..daada4af 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -21,13 +21,13 @@ import {formatToDeviceMessagesPayload} from "../../common"; import {sortedIndex} from "../../../utils/sortedIndex"; import { ErrorBoundary } from "../../../utils/ErrorBoundary"; -import type {MuteSettings} from "../common"; +import {MuteSettings} from "../common"; import type {Options as PeerCallOptions, RemoteMedia} from "../PeerCall"; import type {LocalMedia} from "../LocalMedia"; import type {HomeServerApi} from "../../net/HomeServerApi"; import type {MCallBase, MGroupCallBase, SignallingMessage, CallDeviceMembership} from "../callEventTypes"; import type {GroupCall} from "./GroupCall"; -import type {RoomMember} from "../../room/members/RoomMember"; +import {RoomMember} from "../../room/members/RoomMember"; import type {EncryptedMessage} from "../../e2ee/olm/Encryption"; import type {ILogItem} from "../../../logging/types"; import type {BaseObservableValue} from "../../../observable/value"; @@ -59,6 +59,9 @@ class MemberConnection { public retryCount: number = 0; public peerCall?: PeerCall; public lastProcessedSeqNr: number | undefined; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + public lastIgnoredSeqNr: number | undefined; public queuedSignallingMessages: SignallingMessage[] = []; public outboundSeqCounter: number = 0; @@ -73,16 +76,23 @@ class MemberConnection { if (this.queuedSignallingMessages.length === 0) { return false; } - if (this.lastProcessedSeqNr === undefined) { + const first = this.queuedSignallingMessages[0]; + const firstSeq = first.content.seq; + // prevent not being able to jump over seq values of ignored messages for other call ids + // as they don't increase lastProcessedSeqNr. + if (this.lastIgnoredSeqNr !== undefined && firstSeq === this.lastIgnoredSeqNr + 1) { return true; } - const first = this.queuedSignallingMessages[0]; + if (this.lastProcessedSeqNr === undefined) { + return firstSeq === 0; + } // allow messages with both a seq we've just seen and // the next one to be dequeued as it can happen // that messages for other callIds (which could repeat seq) // are present in the queue - return first.content.seq === this.lastProcessedSeqNr || - first.content.seq === this.lastProcessedSeqNr + 1; + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + return firstSeq <= (this.lastProcessedSeqNr + 1); } dispose() { @@ -333,49 +343,52 @@ export class Member { /** @internal */ handleDeviceMessage(message: SignallingMessage, syncLog: ILogItem): void { this.errorBoundary.try(() => { - const {connection} = this; - if (connection) { - const destSessionId = message.content.dest_session_id; - if (destSessionId !== this.options.sessionId) { - const logItem = connection.logItem.log({l: "ignoring to_device event with wrong session_id", destSessionId, type: message.type}); - syncLog.refDetached(logItem); - return; - } - // if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it - let action = IncomingMessageAction.Handle; - if (connection.peerCall) { - action = connection.peerCall.getMessageAction(message); - // deal with glare and replacing the call before creating new calls - if (action === IncomingMessageAction.InviteGlare) { - const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem); - if (log) { - syncLog.refDetached(log); - } - if (shouldReplace) { - connection.peerCall = undefined; - action = IncomingMessageAction.Handle; + syncLog.wrap({l: "Member.handleDeviceMessage", type: message.type, seq: message.content?.seq}, log => { + const {connection} = this; + if (connection) { + const destSessionId = message.content.dest_session_id; + if (destSessionId !== this.options.sessionId) { + const logItem = connection.logItem.log({l: "ignoring to_device event with wrong session_id", destSessionId, type: message.type}); + log.refDetached(logItem); + return; + } + // if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it + if (connection.peerCall) { + const action = connection.peerCall.getMessageAction(message); + // deal with glare and replacing the call before creating new calls + if (action === IncomingMessageAction.InviteGlare) { + const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem); + if (log) { + log.refDetached(log); + } + if (shouldReplace) { + connection.peerCall.dispose(); + connection.peerCall = undefined; + } } } - } - if (message.type === EventType.Invite && !connection.peerCall) { - connection.peerCall = this._createPeerCall(message.content.call_id); - } - if (action === IncomingMessageAction.Handle) { + // create call on invite + if (message.type === EventType.Invite && !connection.peerCall) { + connection.peerCall = this._createPeerCall(message.content.call_id); + } + // enqueue const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); connection.queuedSignallingMessages.splice(idx, 0, message); + // dequeue as much as we can + let hasNewMessageBeenDequeued = false; if (connection.peerCall) { - const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog); - if (!hasNewMessageBeenDequeued) { - syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq})); - } + hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, log); } - } else if (action === IncomingMessageAction.Ignore && connection.peerCall) { - const logItem = connection.logItem.log({l: "ignoring to_device event with wrong call_id", callId: message.content.call_id, type: message.type}); - syncLog.refDetached(logItem); + if (!hasNewMessageBeenDequeued) { + log.refDetached(connection.logItem.log({l: "queued message", type: message.type, seq: message.content.seq, idx})); + } + } else { + // TODO: the right thing to do here would be to at least enqueue the message rather than drop it, + // and if it's up to the other end to send the invite and the type is an invite to actually + // call connect and assume the m.call.member state update is on its way? + syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId}); } - } else { - syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId}); - } + }); }); } @@ -383,16 +396,26 @@ export class Member { let hasNewMessageBeenDequeued = false; while (connection.canDequeueNextSignallingMessage) { const message = connection.queuedSignallingMessages.shift()!; - if (message === newMessage) { - hasNewMessageBeenDequeued = true; - } - // ignore items in the queue that should not be handled and prevent - // the lastProcessedSeqNr being corrupted with the `seq` for other call ids - if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) { - const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); - syncLog.refDetached(item); - connection.lastProcessedSeqNr = message.content.seq; - } + const isNewMsg = message === newMessage; + hasNewMessageBeenDequeued = hasNewMessageBeenDequeued || isNewMsg; + syncLog.wrap(isNewMsg ? "process message" : "dequeue message", log => { + const seq = message.content?.seq; + log.set("seq", seq); + log.set("type", message.type); + // ignore items in the queue that should not be handled and prevent + // the lastProcessedSeqNr being corrupted with the `seq` for other call ids + // XXX: Not needed anymore when seq is scoped to call_id + // see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166 + const action = peerCall.getMessageAction(message); + if (action === IncomingMessageAction.Handle) { + const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); + log.refDetached(item); + connection.lastProcessedSeqNr = seq; + } else { + log.set("ignored", true); + connection.lastIgnoredSeqNr = seq; + } + }); } return hasNewMessageBeenDequeued; } @@ -448,3 +471,113 @@ export function isMemberExpired(callDeviceMembership: CallDeviceMembership, now: const expiresAt = memberExpiresAt(callDeviceMembership); return typeof expiresAt === "number" ? ((expiresAt + margin) <= now) : true; } + +import {ObservableValue} from "../../../observable/value"; +import {Clock as MockClock} from "../../../mocks/Clock"; +import {Instance as NullLoggerInstance} from "../../../logging/NullLogger"; + +export function tests() { + + class MockMedia { + clone(): MockMedia { return this; } + } + + class MockPeerConn { + addEventListener() {} + removeEventListener() {} + setConfiguration() {} + setRemoteDescription() {} + getReceivers() { return [{}]; } // non-empty array + getSenders() { return []; } + addTrack() { return {}; } + removeTrack() {} + close() {} + } + return { + "test queue doesn't get blocked by enqueued, then ignored device message": assert => { + // XXX we might want to refactor the queue code a bit so it's easier to test + // without having to provide so many mocks + const clock = new MockClock(); + // setup logging + const logger = NullLoggerInstance; + // const logger = new Logger({platform: {clock, random: Math.random}}); + // logger.addReporter(new ConsoleReporter()); + + // create member + const callDeviceMembership = { + ["device_id"]: "BVPIHSKXFC", + ["session_id"]: "s1d5863f41ec5a5", + ["expires_ts"]: 123, + feeds: [{purpose: "m.usermedia"}] + }; + const roomMember = RoomMember.fromUserId("!abc", "@bruno4:matrix.org", "join"); + const turnServer = new ObservableValue({}); + const options = { + confId: "conf", + ownUserId: "@foobaraccount2:matrix.org", + ownDeviceId: "CMLEZSARRT", + sessionId: "s1cece7088b9d35", + clock, + emitUpdate: () => {}, + webRTC: { + prepareSenderForPurpose: () => {}, + createPeerConnection() { + return new MockPeerConn(); + } + } + } as Options; + const member = new Member(roomMember, callDeviceMembership, options, logger.child("member")); + member.connect(new MockMedia() as LocalMedia, new MuteSettings(), turnServer, logger.child("connect")); + // pretend we've already received 3 messages + member.connection.lastProcessedSeqNr = 2; + // send hangup with seq=3, this will enqueue the message because there is no peerCall + // as it's up to @bruno4:matrix.org to send the invite + const hangup = { + type: EventType.Hangup, + content: { + "call_id": "c0ac1b0e37afe73", + "version": 1, + "reason": "invite_timeout", + "seq": 3, + "conf_id": "conf-16a120796440a6", + "device_id": "BVPIHSKXFC", + "party_id": "BVPIHSKXFC", + "sender_session_id": "s1d5863f41ec5a5", + "dest_session_id": "s1cece7088b9d35" + } + }; + member.handleDeviceMessage(hangup, logger.child("handle hangup")); + // Send an invite with seq=4, this will create a new peer call with a the call id + // when dequeueing the hangup from before, it'll get ignored because it is + // for the previous call id. + const invite = { + type: EventType.Invite, + content: { + "call_id": "c1175b12d559fb1", + "offer": { + "type": "offer", + "sdp": "..." + }, + "org.matrix.msc3077.sdp_stream_metadata": { + "60087b60-487e-4fa0-8229-b232c18e1332": { + "purpose": "m.usermedia", + "audio_muted": false, + "video_muted": false + } + }, + "version": 1, + "lifetime": 60000, + "seq": 4, + "conf_id": "conf-16a120796440a6", + "device_id": "BVPIHSKXFC", + "party_id": "BVPIHSKXFC", + "sender_session_id": "s1d5863f41ec5a5", + "dest_session_id": "s1cece7088b9d35" + } + }; + member.handleDeviceMessage(invite, logger.child("handle invite")); + assert.equal(member.connection.queuedSignallingMessages.length, 0); + // logger.reporters[0].printOpenItems(); + } + }; +}