don't queue messages for different callIds so last seq doesn't corrupt

this includes handling invite glares differently
This commit is contained in:
Bruno Windels 2022-09-20 17:27:39 +02:00
parent 3346f68d25
commit bb2e63b05b
2 changed files with 131 additions and 77 deletions

View File

@ -53,6 +53,12 @@ export type Options = {
sendSignallingMessage: (message: SignallingMessage<MCallBase>, log: ILogItem) => Promise<void>; sendSignallingMessage: (message: SignallingMessage<MCallBase>, log: ILogItem) => Promise<void>;
}; };
export enum IncomingMessageAction {
InviteGlare,
Handle,
Ignore
};
export class RemoteMedia { export class RemoteMedia {
constructor(public userMedia?: Stream | undefined, public screenShare?: Stream | undefined) {} constructor(public userMedia?: Stream | undefined, public screenShare?: Stream | undefined) {}
} }
@ -299,6 +305,17 @@ export class PeerCall implements IDisposable {
await this.sendHangupWithCallId(this.callId, errorCode, log); await this.sendHangupWithCallId(this.callId, errorCode, log);
} }
getMessageAction<B extends MCallBase>(message: SignallingMessage<B>): IncomingMessageAction {
const callIdMatches = this.callId === message.content.call_id;
if (message.type === EventType.Invite && !callIdMatches) {
return IncomingMessageAction.InviteGlare;
} if (callIdMatches) {
return IncomingMessageAction.Handle;
} else {
return IncomingMessageAction.Ignore;
}
}
handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): ILogItem { handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): ILogItem {
// return logItem item immediately so it can be references in sync manner // return logItem item immediately so it can be references in sync manner
let logItem; let logItem;
@ -309,12 +326,10 @@ export class PeerCall implements IDisposable {
payload: message.content payload: message.content
}, async log => { }, async log => {
logItem = log; logItem = log;
if (this.getMessageAction(message) !== IncomingMessageAction.Handle) {
const callIdMatches = this.callId === message.content.call_id; log.set("wrongCallId", true);
return;
if (message.type === EventType.Invite && !callIdMatches) { }
await this.handleInviteGlare(message.content, partyId, log);
} else if (callIdMatches) {
switch (message.type) { switch (message.type) {
case EventType.Invite: case EventType.Invite:
await this.handleFirstInvite(message.content, partyId, log); await this.handleFirstInvite(message.content, partyId, log);
@ -341,9 +356,6 @@ export class PeerCall implements IDisposable {
log.log(`Unknown event type for call: ${message.type}`); log.log(`Unknown event type for call: ${message.type}`);
break; break;
} }
} else if (!callIdMatches) {
log.set("wrongCallId", true);
}
}); });
return logItem; return logItem;
} }
@ -434,23 +446,33 @@ export class PeerCall implements IDisposable {
} }
}; };
private async handleInviteGlare(content: MCallInvite<MCallBase>, partyId: PartyId, log: ILogItem): Promise<void> { /**
// this is only called when the ids are different * @returns {boolean} whether or not this call should be replaced
* */
handleInviteGlare<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): {shouldReplace: boolean, log?: ILogItem} {
if (message.type !== EventType.Invite) {
return {shouldReplace: false};
}
const {content} = message;
const newCallId = content.call_id; const newCallId = content.call_id;
if (this.callId! > newCallId) { const shouldReplace = this.callId! > newCallId;
let logItem;
log.wrap("handling call glare", async log => {
logItem = log;
if (shouldReplace) {
log.log( log.log(
"Glare detected: answering incoming call " + newCallId + "Glare detected: answering incoming call " + newCallId +
" and canceling outgoing call " " and canceling outgoing call "
); );
// How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer? // TODO: How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer?
if (this._state === CallState.Fledgling || this._state === CallState.CreateOffer) { if (this._state !== CallState.Fledgling && this._state !== CallState.CreateOffer) {
// TODO: don't send invite!
} else {
await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced, log); await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced, log);
} }
await this.handleInvite(content, partyId, log); // since this method isn't awaited, we dispose ourselves once we hung up
// TODO: need to skip state check this.close(CallErrorCode.Replaced, log);
await this.answer(this.localMedia!, this.localMuteSettings!, log); this.dispose();
} else { } else {
log.log( log.log(
"Glare detected: rejecting incoming call " + newCallId + "Glare detected: rejecting incoming call " + newCallId +
@ -458,6 +480,9 @@ export class PeerCall implements IDisposable {
); );
await this.sendHangupWithCallId(newCallId, CallErrorCode.Replaced, log); await this.sendHangupWithCallId(newCallId, CallErrorCode.Replaced, log);
} }
});
return {shouldReplace, log: logItem};
} }
private handleHangupReceived(content: MCallHangupReject<MCallBase>, log: ILogItem) { private handleHangupReceived(content: MCallHangupReject<MCallBase>, log: ILogItem) {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
import {PeerCall, CallState} from "../PeerCall"; import {PeerCall, CallState, IncomingMessageAction} from "../PeerCall";
import {makeTxnId, makeId} from "../../common"; import {makeTxnId, makeId} from "../../common";
import {EventType, CallErrorCode} from "../callEventTypes"; import {EventType, CallErrorCode} from "../callEventTypes";
import {formatToDeviceMessagesPayload} from "../../common"; import {formatToDeviceMessagesPayload} from "../../common";
@ -251,7 +251,7 @@ export class Member {
} }
/** @internal */ /** @internal */
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): void{ handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): void {
const {connection} = this; const {connection} = this;
if (connection) { if (connection) {
const destSessionId = message.content.dest_session_id; const destSessionId = message.content.dest_session_id;
@ -260,34 +260,63 @@ export class Member {
syncLog.refDetached(logItem); syncLog.refDetached(logItem);
return; return;
} }
// if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it
let action = IncomingMessageAction.Handle;
if (connection.peerCall) {
action = connection.peerCall.getMessageAction(message);
// deal with glare and replacing the call before creating new calls
if (action === IncomingMessageAction.InviteGlare) {
const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem);
if (log) {
syncLog.refDetached(log);
}
if (shouldReplace) {
connection.peerCall = undefined;
}
}
}
if (message.type === EventType.Invite && !connection.peerCall) { if (message.type === EventType.Invite && !connection.peerCall) {
connection.peerCall = this._createPeerCall(message.content.call_id); connection.peerCall = this._createPeerCall(message.content.call_id);
} }
if (action === IncomingMessageAction.Handle) {
const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq); const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq);
connection.queuedSignallingMessages.splice(idx, 0, message); connection.queuedSignallingMessages.splice(idx, 0, message);
let hasBeenDequeued = false;
if (connection.peerCall) { if (connection.peerCall) {
const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog);
if (!hasNewMessageBeenDequeued) {
syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq}));
}
}
} else if (action === IncomingMessageAction.Ignore && connection.peerCall) {
const logItem = connection.logItem.log({l: "ignoring to_device event with wrong call_id", callId: message.content.call_id, type: message.type});
syncLog.refDetached(logItem);
}
} else {
syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId});
}
}
private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): boolean {
let hasNewMessageBeenDequeued = false;
while ( while (
connection.queuedSignallingMessages.length && ( connection.queuedSignallingMessages.length && (
connection.lastProcessedSeqNr === undefined || connection.lastProcessedSeqNr === undefined ||
connection.queuedSignallingMessages[0].content.seq === connection.lastProcessedSeqNr + 1 connection.queuedSignallingMessages[0].content.seq === connection.lastProcessedSeqNr + 1
) )
) { ) {
const dequeuedMessage = connection.queuedSignallingMessages.shift()!; const message = connection.queuedSignallingMessages.shift()!;
if (dequeuedMessage === message) { if (message === newMessage) {
hasBeenDequeued = true; hasNewMessageBeenDequeued = true;
} }
const item = connection.peerCall!.handleIncomingSignallingMessage(dequeuedMessage, this.deviceId, connection.logItem); // ignore items in the queue that should not be handled and prevent
// the lastProcessedSeqNr being corrupted with the `seq` for other call ids
if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) {
const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem);
syncLog.refDetached(item); syncLog.refDetached(item);
connection.lastProcessedSeqNr = dequeuedMessage.content.seq; connection.lastProcessedSeqNr = message.content.seq;
} }
} }
if (!hasBeenDequeued) { return hasNewMessageBeenDequeued;
syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq}));
}
} else {
syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId});
}
} }
/** @internal */ /** @internal */