mirror of
https://github.com/vector-im/hydrogen-web.git
synced 2024-12-23 11:35:04 +01:00
Merge pull request #1017 from vector-im/bwindels/fix-1015
Improve `seq` handling to prevent queue blocking for call signalling messages
This commit is contained in:
commit
a49c9c17c0
@ -303,7 +303,7 @@ export class PeerCall implements IDisposable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): ILogItem {
|
handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): ILogItem {
|
||||||
// return logItem item immediately so it can be references in sync manner
|
// return logItem item immediately so it can be referenced by the sync log
|
||||||
let logItem;
|
let logItem;
|
||||||
log.wrap({
|
log.wrap({
|
||||||
l: "receive signalling message",
|
l: "receive signalling message",
|
||||||
|
@ -21,13 +21,13 @@ import {formatToDeviceMessagesPayload} from "../../common";
|
|||||||
import {sortedIndex} from "../../../utils/sortedIndex";
|
import {sortedIndex} from "../../../utils/sortedIndex";
|
||||||
import { ErrorBoundary } from "../../../utils/ErrorBoundary";
|
import { ErrorBoundary } from "../../../utils/ErrorBoundary";
|
||||||
|
|
||||||
import type {MuteSettings} from "../common";
|
import {MuteSettings} from "../common";
|
||||||
import type {Options as PeerCallOptions, RemoteMedia} from "../PeerCall";
|
import type {Options as PeerCallOptions, RemoteMedia} from "../PeerCall";
|
||||||
import type {LocalMedia} from "../LocalMedia";
|
import type {LocalMedia} from "../LocalMedia";
|
||||||
import type {HomeServerApi} from "../../net/HomeServerApi";
|
import type {HomeServerApi} from "../../net/HomeServerApi";
|
||||||
import type {MCallBase, MGroupCallBase, SignallingMessage, CallDeviceMembership} from "../callEventTypes";
|
import type {MCallBase, MGroupCallBase, SignallingMessage, CallDeviceMembership} from "../callEventTypes";
|
||||||
import type {GroupCall} from "./GroupCall";
|
import type {GroupCall} from "./GroupCall";
|
||||||
import type {RoomMember} from "../../room/members/RoomMember";
|
import {RoomMember} from "../../room/members/RoomMember";
|
||||||
import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
|
import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
|
||||||
import type {ILogItem} from "../../../logging/types";
|
import type {ILogItem} from "../../../logging/types";
|
||||||
import type {BaseObservableValue} from "../../../observable/value";
|
import type {BaseObservableValue} from "../../../observable/value";
|
||||||
@ -59,6 +59,9 @@ class MemberConnection {
|
|||||||
public retryCount: number = 0;
|
public retryCount: number = 0;
|
||||||
public peerCall?: PeerCall;
|
public peerCall?: PeerCall;
|
||||||
public lastProcessedSeqNr: number | undefined;
|
public lastProcessedSeqNr: number | undefined;
|
||||||
|
// XXX: Not needed anymore when seq is scoped to call_id
|
||||||
|
// see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166
|
||||||
|
public lastIgnoredSeqNr: number | undefined;
|
||||||
public queuedSignallingMessages: SignallingMessage<MGroupCallBase>[] = [];
|
public queuedSignallingMessages: SignallingMessage<MGroupCallBase>[] = [];
|
||||||
public outboundSeqCounter: number = 0;
|
public outboundSeqCounter: number = 0;
|
||||||
|
|
||||||
@ -73,16 +76,23 @@ class MemberConnection {
|
|||||||
if (this.queuedSignallingMessages.length === 0) {
|
if (this.queuedSignallingMessages.length === 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (this.lastProcessedSeqNr === undefined) {
|
const first = this.queuedSignallingMessages[0];
|
||||||
|
const firstSeq = first.content.seq;
|
||||||
|
// prevent not being able to jump over seq values of ignored messages for other call ids
|
||||||
|
// as they don't increase lastProcessedSeqNr.
|
||||||
|
if (this.lastIgnoredSeqNr !== undefined && firstSeq === this.lastIgnoredSeqNr + 1) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
const first = this.queuedSignallingMessages[0];
|
if (this.lastProcessedSeqNr === undefined) {
|
||||||
|
return firstSeq === 0;
|
||||||
|
}
|
||||||
// allow messages with both a seq we've just seen and
|
// allow messages with both a seq we've just seen and
|
||||||
// the next one to be dequeued as it can happen
|
// the next one to be dequeued as it can happen
|
||||||
// that messages for other callIds (which could repeat seq)
|
// that messages for other callIds (which could repeat seq)
|
||||||
// are present in the queue
|
// are present in the queue
|
||||||
return first.content.seq === this.lastProcessedSeqNr ||
|
// XXX: Not needed anymore when seq is scoped to call_id
|
||||||
first.content.seq === this.lastProcessedSeqNr + 1;
|
// see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166
|
||||||
|
return firstSeq <= (this.lastProcessedSeqNr + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
dispose() {
|
dispose() {
|
||||||
@ -333,66 +343,79 @@ export class Member {
|
|||||||
/** @internal */
|
/** @internal */
|
||||||
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): void {
|
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): void {
|
||||||
this.errorBoundary.try(() => {
|
this.errorBoundary.try(() => {
|
||||||
|
syncLog.wrap({l: "Member.handleDeviceMessage", type: message.type, seq: message.content?.seq}, log => {
|
||||||
const {connection} = this;
|
const {connection} = this;
|
||||||
if (connection) {
|
if (connection) {
|
||||||
const destSessionId = message.content.dest_session_id;
|
const destSessionId = message.content.dest_session_id;
|
||||||
if (destSessionId !== this.options.sessionId) {
|
if (destSessionId !== this.options.sessionId) {
|
||||||
const logItem = connection.logItem.log({l: "ignoring to_device event with wrong session_id", destSessionId, type: message.type});
|
const logItem = connection.logItem.log({l: "ignoring to_device event with wrong session_id", destSessionId, type: message.type});
|
||||||
syncLog.refDetached(logItem);
|
log.refDetached(logItem);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it
|
// if there is no peerCall, we either create it with an invite and Handle is implied or we'll ignore it
|
||||||
let action = IncomingMessageAction.Handle;
|
|
||||||
if (connection.peerCall) {
|
if (connection.peerCall) {
|
||||||
action = connection.peerCall.getMessageAction(message);
|
const action = connection.peerCall.getMessageAction(message);
|
||||||
// deal with glare and replacing the call before creating new calls
|
// deal with glare and replacing the call before creating new calls
|
||||||
if (action === IncomingMessageAction.InviteGlare) {
|
if (action === IncomingMessageAction.InviteGlare) {
|
||||||
const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem);
|
const {shouldReplace, log} = connection.peerCall.handleInviteGlare(message, this.deviceId, connection.logItem);
|
||||||
if (log) {
|
if (log) {
|
||||||
syncLog.refDetached(log);
|
log.refDetached(log);
|
||||||
}
|
}
|
||||||
if (shouldReplace) {
|
if (shouldReplace) {
|
||||||
|
connection.peerCall.dispose();
|
||||||
connection.peerCall = undefined;
|
connection.peerCall = undefined;
|
||||||
action = IncomingMessageAction.Handle;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// create call on invite
|
||||||
if (message.type === EventType.Invite && !connection.peerCall) {
|
if (message.type === EventType.Invite && !connection.peerCall) {
|
||||||
connection.peerCall = this._createPeerCall(message.content.call_id);
|
connection.peerCall = this._createPeerCall(message.content.call_id);
|
||||||
}
|
}
|
||||||
if (action === IncomingMessageAction.Handle) {
|
// enqueue
|
||||||
const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq);
|
const idx = sortedIndex(connection.queuedSignallingMessages, message, (a, b) => a.content.seq - b.content.seq);
|
||||||
connection.queuedSignallingMessages.splice(idx, 0, message);
|
connection.queuedSignallingMessages.splice(idx, 0, message);
|
||||||
|
// dequeue as much as we can
|
||||||
|
let hasNewMessageBeenDequeued = false;
|
||||||
if (connection.peerCall) {
|
if (connection.peerCall) {
|
||||||
const hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, syncLog);
|
hasNewMessageBeenDequeued = this.dequeueSignallingMessages(connection, connection.peerCall, message, log);
|
||||||
|
}
|
||||||
if (!hasNewMessageBeenDequeued) {
|
if (!hasNewMessageBeenDequeued) {
|
||||||
syncLog.refDetached(connection.logItem.log({l: "queued signalling message", type: message.type, seq: message.content.seq}));
|
log.refDetached(connection.logItem.log({l: "queued message", type: message.type, seq: message.content.seq, idx}));
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (action === IncomingMessageAction.Ignore && connection.peerCall) {
|
|
||||||
const logItem = connection.logItem.log({l: "ignoring to_device event with wrong call_id", callId: message.content.call_id, type: message.type});
|
|
||||||
syncLog.refDetached(logItem);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// TODO: the right thing to do here would be to at least enqueue the message rather than drop it,
|
||||||
|
// and if it's up to the other end to send the invite and the type is an invite to actually
|
||||||
|
// call connect and assume the m.call.member state update is on its way?
|
||||||
syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId});
|
syncLog.log({l: "member not connected", userId: this.userId, deviceId: this.deviceId});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): boolean {
|
private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): boolean {
|
||||||
let hasNewMessageBeenDequeued = false;
|
let hasNewMessageBeenDequeued = false;
|
||||||
while (connection.canDequeueNextSignallingMessage) {
|
while (connection.canDequeueNextSignallingMessage) {
|
||||||
const message = connection.queuedSignallingMessages.shift()!;
|
const message = connection.queuedSignallingMessages.shift()!;
|
||||||
if (message === newMessage) {
|
const isNewMsg = message === newMessage;
|
||||||
hasNewMessageBeenDequeued = true;
|
hasNewMessageBeenDequeued = hasNewMessageBeenDequeued || isNewMsg;
|
||||||
}
|
syncLog.wrap(isNewMsg ? "process message" : "dequeue message", log => {
|
||||||
|
const seq = message.content?.seq;
|
||||||
|
log.set("seq", seq);
|
||||||
|
log.set("type", message.type);
|
||||||
// ignore items in the queue that should not be handled and prevent
|
// ignore items in the queue that should not be handled and prevent
|
||||||
// the lastProcessedSeqNr being corrupted with the `seq` for other call ids
|
// the lastProcessedSeqNr being corrupted with the `seq` for other call ids
|
||||||
if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) {
|
// XXX: Not needed anymore when seq is scoped to call_id
|
||||||
|
// see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166
|
||||||
|
const action = peerCall.getMessageAction(message);
|
||||||
|
if (action === IncomingMessageAction.Handle) {
|
||||||
const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem);
|
const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem);
|
||||||
syncLog.refDetached(item);
|
log.refDetached(item);
|
||||||
connection.lastProcessedSeqNr = message.content.seq;
|
connection.lastProcessedSeqNr = seq;
|
||||||
|
} else {
|
||||||
|
log.set("ignored", true);
|
||||||
|
connection.lastIgnoredSeqNr = seq;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
return hasNewMessageBeenDequeued;
|
return hasNewMessageBeenDequeued;
|
||||||
}
|
}
|
||||||
@ -448,3 +471,113 @@ export function isMemberExpired(callDeviceMembership: CallDeviceMembership, now:
|
|||||||
const expiresAt = memberExpiresAt(callDeviceMembership);
|
const expiresAt = memberExpiresAt(callDeviceMembership);
|
||||||
return typeof expiresAt === "number" ? ((expiresAt + margin) <= now) : true;
|
return typeof expiresAt === "number" ? ((expiresAt + margin) <= now) : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
import {ObservableValue} from "../../../observable/value";
|
||||||
|
import {Clock as MockClock} from "../../../mocks/Clock";
|
||||||
|
import {Instance as NullLoggerInstance} from "../../../logging/NullLogger";
|
||||||
|
|
||||||
|
export function tests() {
|
||||||
|
|
||||||
|
class MockMedia {
|
||||||
|
clone(): MockMedia { return this; }
|
||||||
|
}
|
||||||
|
|
||||||
|
class MockPeerConn {
|
||||||
|
addEventListener() {}
|
||||||
|
removeEventListener() {}
|
||||||
|
setConfiguration() {}
|
||||||
|
setRemoteDescription() {}
|
||||||
|
getReceivers() { return [{}]; } // non-empty array
|
||||||
|
getSenders() { return []; }
|
||||||
|
addTrack() { return {}; }
|
||||||
|
removeTrack() {}
|
||||||
|
close() {}
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"test queue doesn't get blocked by enqueued, then ignored device message": assert => {
|
||||||
|
// XXX we might want to refactor the queue code a bit so it's easier to test
|
||||||
|
// without having to provide so many mocks
|
||||||
|
const clock = new MockClock();
|
||||||
|
// setup logging
|
||||||
|
const logger = NullLoggerInstance;
|
||||||
|
// const logger = new Logger({platform: {clock, random: Math.random}});
|
||||||
|
// logger.addReporter(new ConsoleReporter());
|
||||||
|
|
||||||
|
// create member
|
||||||
|
const callDeviceMembership = {
|
||||||
|
["device_id"]: "BVPIHSKXFC",
|
||||||
|
["session_id"]: "s1d5863f41ec5a5",
|
||||||
|
["expires_ts"]: 123,
|
||||||
|
feeds: [{purpose: "m.usermedia"}]
|
||||||
|
};
|
||||||
|
const roomMember = RoomMember.fromUserId("!abc", "@bruno4:matrix.org", "join");
|
||||||
|
const turnServer = new ObservableValue({});
|
||||||
|
const options = {
|
||||||
|
confId: "conf",
|
||||||
|
ownUserId: "@foobaraccount2:matrix.org",
|
||||||
|
ownDeviceId: "CMLEZSARRT",
|
||||||
|
sessionId: "s1cece7088b9d35",
|
||||||
|
clock,
|
||||||
|
emitUpdate: () => {},
|
||||||
|
webRTC: {
|
||||||
|
prepareSenderForPurpose: () => {},
|
||||||
|
createPeerConnection() {
|
||||||
|
return new MockPeerConn();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} as Options;
|
||||||
|
const member = new Member(roomMember, callDeviceMembership, options, logger.child("member"));
|
||||||
|
member.connect(new MockMedia() as LocalMedia, new MuteSettings(), turnServer, logger.child("connect"));
|
||||||
|
// pretend we've already received 3 messages
|
||||||
|
member.connection.lastProcessedSeqNr = 2;
|
||||||
|
// send hangup with seq=3, this will enqueue the message because there is no peerCall
|
||||||
|
// as it's up to @bruno4:matrix.org to send the invite
|
||||||
|
const hangup = {
|
||||||
|
type: EventType.Hangup,
|
||||||
|
content: {
|
||||||
|
"call_id": "c0ac1b0e37afe73",
|
||||||
|
"version": 1,
|
||||||
|
"reason": "invite_timeout",
|
||||||
|
"seq": 3,
|
||||||
|
"conf_id": "conf-16a120796440a6",
|
||||||
|
"device_id": "BVPIHSKXFC",
|
||||||
|
"party_id": "BVPIHSKXFC",
|
||||||
|
"sender_session_id": "s1d5863f41ec5a5",
|
||||||
|
"dest_session_id": "s1cece7088b9d35"
|
||||||
|
}
|
||||||
|
};
|
||||||
|
member.handleDeviceMessage(hangup, logger.child("handle hangup"));
|
||||||
|
// Send an invite with seq=4, this will create a new peer call with a the call id
|
||||||
|
// when dequeueing the hangup from before, it'll get ignored because it is
|
||||||
|
// for the previous call id.
|
||||||
|
const invite = {
|
||||||
|
type: EventType.Invite,
|
||||||
|
content: {
|
||||||
|
"call_id": "c1175b12d559fb1",
|
||||||
|
"offer": {
|
||||||
|
"type": "offer",
|
||||||
|
"sdp": "..."
|
||||||
|
},
|
||||||
|
"org.matrix.msc3077.sdp_stream_metadata": {
|
||||||
|
"60087b60-487e-4fa0-8229-b232c18e1332": {
|
||||||
|
"purpose": "m.usermedia",
|
||||||
|
"audio_muted": false,
|
||||||
|
"video_muted": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"version": 1,
|
||||||
|
"lifetime": 60000,
|
||||||
|
"seq": 4,
|
||||||
|
"conf_id": "conf-16a120796440a6",
|
||||||
|
"device_id": "BVPIHSKXFC",
|
||||||
|
"party_id": "BVPIHSKXFC",
|
||||||
|
"sender_session_id": "s1d5863f41ec5a5",
|
||||||
|
"dest_session_id": "s1cece7088b9d35"
|
||||||
|
}
|
||||||
|
};
|
||||||
|
member.handleDeviceMessage(invite, logger.child("handle invite"));
|
||||||
|
assert.equal(member.connection.queuedSignallingMessages.length, 0);
|
||||||
|
// logger.reporters[0].printOpenItems();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user