prevent ignored signaling messages from blocking the queue

signaling messages get ignored when they are not for the
currently active call id. In that case we currently don't
advance the lastProcessedSeqNr counter, as we had a problem
before where the counter would be brought out of sync with
seq numbers for other call ids.
However when we've previously processed a signalling message
(e.g. the counter is not undefined) and the first message in the queue
is to be ignored, it will prevent the subsequent messages from being
dequeued as their seq number is more than 1 away from the last
processed seq. This adds an additional counter for ignored seq numbers
that is also used to see if the next message is only 1 away from
the next seq value.

I am adding logging as well here to have a better overview in the future
This commit is contained in:
Bruno Windels 2023-02-07 14:13:49 +01:00
parent 072004a9c2
commit cadeae98bc

View File

@ -59,6 +59,9 @@ class MemberConnection {
public retryCount: number = 0;
public peerCall?: PeerCall;
public lastProcessedSeqNr: number | undefined;
// XXX: Not needed anymore when seq is scoped to call_id
// see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166
public lastIgnoredSeqNr: number | undefined;
public queuedSignallingMessages: SignallingMessage<MGroupCallBase>[] = [];
public outboundSeqCounter: number = 0;
@ -73,16 +76,24 @@ class MemberConnection {
if (this.queuedSignallingMessages.length === 0) {
return false;
}
const first = this.queuedSignallingMessages[0];
const firstSeq = first.content.seq;
// prevent not being able to jump over seq values of ignored messages for other call ids
// as they don't increase lastProcessedSeqNr.
if (this.lastIgnoredSeqNr !== undefined && firstSeq === this.lastIgnoredSeqNr + 1) {
return true;
}
if (this.lastProcessedSeqNr === undefined) {
return true;
}
const first = this.queuedSignallingMessages[0];
// allow messages with both a seq we've just seen and
// the next one to be dequeued as it can happen
// that messages for other callIds (which could repeat seq)
// are present in the queue
return first.content.seq === this.lastProcessedSeqNr ||
first.content.seq === this.lastProcessedSeqNr + 1;
// XXX: Not needed anymore when seq is scoped to call_id
// see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166
return firstSeq === this.lastProcessedSeqNr ||
firstSeq === this.lastProcessedSeqNr + 1;
}
dispose() {
@ -382,17 +393,29 @@ export class Member {
private dequeueSignallingMessages(connection: MemberConnection, peerCall: PeerCall, newMessage: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): boolean {
let hasNewMessageBeenDequeued = false;
while (connection.canDequeueNextSignallingMessage) {
const message = connection.queuedSignallingMessages.shift()!;
if (message === newMessage) {
hasNewMessageBeenDequeued = true;
}
// ignore items in the queue that should not be handled and prevent
// the lastProcessedSeqNr being corrupted with the `seq` for other call ids
if (peerCall.getMessageAction(message) === IncomingMessageAction.Handle) {
const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem);
syncLog.refDetached(item);
connection.lastProcessedSeqNr = message.content.seq;
}
syncLog.wrap("dequeue message", log => {
const message = connection.queuedSignallingMessages.shift()!;
if (message === newMessage) {
log.set("isNewMsg", true);
hasNewMessageBeenDequeued = true;
}
const seq = message.content?.seq;
log.set("seq", seq);
// ignore items in the queue that should not be handled and prevent
// the lastProcessedSeqNr being corrupted with the `seq` for other call ids
// XXX: Not needed anymore when seq is scoped to call_id
// see https://github.com/matrix-org/matrix-spec-proposals/pull/3401#discussion_r1097482166
const action = peerCall.getMessageAction(message);
if (action === IncomingMessageAction.Handle) {
const item = peerCall.handleIncomingSignallingMessage(message, this.deviceId, connection.logItem);
log.refDetached(item);
connection.lastProcessedSeqNr = seq;
} else {
log.set("type", message.type);
log.set("ignored", true);
connection.lastIgnoredSeqNr = seq;
}
});
}
return hasNewMessageBeenDequeued;
}