diff --git a/src/matrix/calls/PeerCall.ts b/src/matrix/calls/PeerCall.ts index fc5b48a6..d004dedc 100644 --- a/src/matrix/calls/PeerCall.ts +++ b/src/matrix/calls/PeerCall.ts @@ -53,6 +53,12 @@ export type Options = { sendSignallingMessage: (message: SignallingMessage, log: ILogItem) => Promise; }; +export enum IncomingMessageAction { + InviteGlare, + Handle, + Ignore +}; + export class RemoteMedia { constructor(public userMedia?: Stream | undefined, public screenShare?: Stream | undefined) {} } @@ -299,6 +305,17 @@ export class PeerCall implements IDisposable { await this.sendHangupWithCallId(this.callId, errorCode, log); } + getMessageAction(message: SignallingMessage): IncomingMessageAction { + const callIdMatches = this.callId === message.content.call_id; + if (message.type === EventType.Invite && !callIdMatches) { + return IncomingMessageAction.InviteGlare; + } if (callIdMatches) { + return IncomingMessageAction.Handle; + } else { + return IncomingMessageAction.Ignore; + } + } + handleIncomingSignallingMessage(message: SignallingMessage, partyId: PartyId, log: ILogItem): ILogItem { // return logItem item immediately so it can be references in sync manner let logItem; @@ -309,40 +326,35 @@ export class PeerCall implements IDisposable { payload: message.content }, async log => { logItem = log; - - const callIdMatches = this.callId === message.content.call_id; - - if (message.type === EventType.Invite && !callIdMatches) { - await this.handleInviteGlare(message.content, partyId, log); - } else if (callIdMatches) { - switch (message.type) { - case EventType.Invite: - await this.handleFirstInvite(message.content, partyId, log); - break; - case EventType.Answer: - await this.handleAnswer(message.content, partyId, log); - break; - case EventType.Negotiate: - await this.onNegotiateReceived(message.content, log); - break; - case EventType.Candidates: - await this.handleRemoteIceCandidates(message.content, partyId, log); - break; - case EventType.SDPStreamMetadataChanged: - case EventType.SDPStreamMetadataChangedPrefix: - this.updateRemoteSDPStreamMetadata(message.content[SDPStreamMetadataKey], log); - break; - case EventType.Hangup: - // TODO: this is a bit hacky, double check its what we need - log.set("reason", message.content.reason); - this.terminate(CallParty.Remote, message.content.reason ?? CallErrorCode.UserHangup, log); - break; - default: - log.log(`Unknown event type for call: ${message.type}`); - break; - } - } else if (!callIdMatches) { + if (this.getMessageAction(message) !== IncomingMessageAction.Handle) { log.set("wrongCallId", true); + return; + } + switch (message.type) { + case EventType.Invite: + await this.handleFirstInvite(message.content, partyId, log); + break; + case EventType.Answer: + await this.handleAnswer(message.content, partyId, log); + break; + case EventType.Negotiate: + await this.onNegotiateReceived(message.content, log); + break; + case EventType.Candidates: + await this.handleRemoteIceCandidates(message.content, partyId, log); + break; + case EventType.SDPStreamMetadataChanged: + case EventType.SDPStreamMetadataChangedPrefix: + this.updateRemoteSDPStreamMetadata(message.content[SDPStreamMetadataKey], log); + break; + case EventType.Hangup: + // TODO: this is a bit hacky, double check its what we need + log.set("reason", message.content.reason); + this.terminate(CallParty.Remote, message.content.reason ?? CallErrorCode.UserHangup, log); + break; + default: + log.log(`Unknown event type for call: ${message.type}`); + break; } }); return logItem; @@ -434,30 +446,43 @@ export class PeerCall implements IDisposable { } }; - private async handleInviteGlare(content: MCallInvite, partyId: PartyId, log: ILogItem): Promise { - // this is only called when the ids are different - const newCallId = content.call_id; - if (this.callId! > newCallId) { - log.log( - "Glare detected: answering incoming call " + newCallId + - " and canceling outgoing call " - ); - // How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer? - if (this._state === CallState.Fledgling || this._state === CallState.CreateOffer) { - // TODO: don't send invite! - } else { - await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced, log); - } - await this.handleInvite(content, partyId, log); - // TODO: need to skip state check - await this.answer(this.localMedia!, this.localMuteSettings!, log); - } else { - log.log( - "Glare detected: rejecting incoming call " + newCallId + - " and keeping outgoing call " - ); - await this.sendHangupWithCallId(newCallId, CallErrorCode.Replaced, log); + /** + * @returns {boolean} whether or not this call should be replaced + * */ + handleInviteGlare(message: SignallingMessage, partyId: PartyId, log: ILogItem): {shouldReplace: boolean, log?: ILogItem} { + if (message.type !== EventType.Invite) { + return {shouldReplace: false}; } + + const {content} = message; + const newCallId = content.call_id; + const shouldReplace = this.callId! > newCallId; + + let logItem; + log.wrap("handling call glare", async log => { + logItem = log; + if (shouldReplace) { + log.log( + "Glare detected: answering incoming call " + newCallId + + " and canceling outgoing call " + ); + // TODO: How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer? + if (this._state !== CallState.Fledgling && this._state !== CallState.CreateOffer) { + await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced, log); + } + // since this method isn't awaited, we dispose ourselves once we hung up + this.close(CallErrorCode.Replaced, log); + this.dispose(); + } else { + log.log( + "Glare detected: rejecting incoming call " + newCallId + + " and keeping outgoing call " + ); + await this.sendHangupWithCallId(newCallId, CallErrorCode.Replaced, log); + } + }); + + return {shouldReplace, log: logItem}; } private handleHangupReceived(content: MCallHangupReject, log: ILogItem) { diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index 9e87fa22..98e2aa4d 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {PeerCall, CallState} from "../PeerCall"; +import {PeerCall, CallState, IncomingMessageAction} from "../PeerCall"; import {makeTxnId, makeId} from "../../common"; import {EventType, CallErrorCode} from "../callEventTypes"; import {formatToDeviceMessagesPayload} from "../../common"; @@ -251,7 +251,7 @@ export class Member { } /** @internal */ - handleDeviceMessage(message: SignallingMessage, syncLog: ILogItem): void{ + handleDeviceMessage(message: SignallingMessage, syncLog: ILogItem): void { const {connection} = this; if (connection) { const destSessionId = message.content.dest_session_id; @@ -260,36 +260,65 @@ export class Member { syncLog.refDetached(logItem); return; } + // if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it + let action = IncomingMessageAction.Handle; + if (connection.peerCall) { + action = connection.peerCall.getMessageAction(message); + // deal with glare and replacing the call before creating new calls + if (action === IncomingMessageAction.InviteGlare) { + const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem); + if (log) { + syncLog.refDetached(log); + } + if (shouldReplace) { + connection.peerCall = undefined; + } + } + } if (message.type === EventType.Invite && !connection.peerCall) { connection.peerCall = this._createPeerCall(message.content.call_id); } - const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); - connection.queuedSignallingMessages.splice(idx, 0, message); - let hasBeenDequeued = false; - if (connection.peerCall) { - while ( - connection.queuedSignallingMessages.length && ( - connection.lastProcessedSeqNr === undefined || - connection.queuedSignallingMessages[0].content.seq === connection.lastProcessedSeqNr + 1 - ) - ) { - const dequeuedMessage = connection.queuedSignallingMessages.shift()!; - if (dequeuedMessage === message) { - hasBeenDequeued = true; + if (action === IncomingMessageAction.Handle) { + const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); + connection.queuedSignallingMessages.splice(idx, 0, message); + if (connection.peerCall) { + const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog); + if (!hasNewMessageBeenDequeued) { + syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq})); } - const item = connection.peerCall!.handleIncomingSignallingMessage(dequeuedMessage, this.deviceId, connection.logItem); - syncLog.refDetached(item); - connection.lastProcessedSeqNr = dequeuedMessage.content.seq; } - } - if (!hasBeenDequeued) { - syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq})); + } else if (action === IncomingMessageAction.Ignore && connection.peerCall) { + const logItem = connection.logItem.log({l: "ignoring to_device event with wrong call_id", callId: message.content.call_id, type: message.type}); + syncLog.refDetached(logItem); } } else { syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId}); } } + private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage, syncLog: ILogItem): boolean { + let hasNewMessageBeenDequeued = false; + while ( + connection.queuedSignallingMessages.length && ( + connection.lastProcessedSeqNr === undefined || + connection.queuedSignallingMessages[0].content.seq === connection.lastProcessedSeqNr + 1 + ) + ) { + const message = connection.queuedSignallingMessages.shift()!; + if (message === newMessage) { + hasNewMessageBeenDequeued = true; + } + // ignore items in the queue that should not be handled and prevent + // the lastProcessedSeqNr being corrupted with the `seq` for other call ids + if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) { + const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem); + syncLog.refDetached(item); + connection.lastProcessedSeqNr = message.content.seq; + } + } + return hasNewMessageBeenDequeued; + } + /** @internal */ async setMedia(localMedia: LocalMedia, previousMedia: LocalMedia): Promise { const {connection} = this;