Merge pull request #881 from vector-im/bwindels/calls

TURN and End-of-Candidates fixes for thirdroom branch
This commit is contained in:
Robert Long 2022-09-26 10:28:42 -07:00 committed by GitHub
commit a5ad9247f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 289 additions and 20 deletions

View File

@ -101,9 +101,6 @@ export class Session {
ownDeviceId: sessionInfo.deviceId, ownDeviceId: sessionInfo.deviceId,
ownUserId: sessionInfo.userId, ownUserId: sessionInfo.userId,
logger: this._platform.logger, logger: this._platform.logger,
turnServers: [{
urls: ["stun:turn.matrix.org"],
}],
forceTURN: false, forceTURN: false,
}); });
this._roomStateHandler = new RoomStateHandlerSet(); this._roomStateHandler = new RoomStateHandlerSet();
@ -499,6 +496,8 @@ export class Session {
this._megolmDecryption = undefined; this._megolmDecryption = undefined;
this._e2eeAccount?.dispose(); this._e2eeAccount?.dispose();
this._e2eeAccount = undefined; this._e2eeAccount = undefined;
this._callHandler?.dispose();
this._callHandler = undefined;
for (const room of this._rooms.values()) { for (const room of this._rooms.values()) {
room.dispose(); room.dispose();
} }

View File

@ -23,6 +23,7 @@ import {GroupCall} from "./group/GroupCall";
import {makeId} from "../common"; import {makeId} from "../common";
import {CALL_LOG_TYPE} from "./common"; import {CALL_LOG_TYPE} from "./common";
import {EVENT_TYPE as MEMBER_EVENT_TYPE, RoomMember} from "../room/members/RoomMember"; import {EVENT_TYPE as MEMBER_EVENT_TYPE, RoomMember} from "../room/members/RoomMember";
import {TurnServerSource} from "./TurnServerSource";
import type {LocalMedia} from "./LocalMedia"; import type {LocalMedia} from "./LocalMedia";
import type {Room} from "../room/Room"; import type {Room} from "../room/Room";
@ -39,7 +40,7 @@ import type {Clock} from "../../platform/web/dom/Clock";
import type {RoomStateHandler} from "../room/state/types"; import type {RoomStateHandler} from "../room/state/types";
import type {MemberSync} from "../room/timeline/persistence/MemberWriter"; import type {MemberSync} from "../room/timeline/persistence/MemberWriter";
export type Options = Omit<GroupCallOptions, "emitUpdate" | "createTimeout"> & { export type Options = Omit<GroupCallOptions, "emitUpdate" | "createTimeout" | "turnServerSource"> & {
clock: Clock clock: Clock
}; };
@ -54,9 +55,12 @@ export class CallHandler implements RoomStateHandler {
private roomMemberToCallIds: Map<string, Set<string>> = new Map(); private roomMemberToCallIds: Map<string, Set<string>> = new Map();
private groupCallOptions: GroupCallOptions; private groupCallOptions: GroupCallOptions;
private sessionId = makeId("s"); private sessionId = makeId("s");
private turnServerSource: TurnServerSource;
constructor(private readonly options: Options) { constructor(private readonly options: Options) {
this.turnServerSource = new TurnServerSource(this.options.hsApi, this.options.clock);
this.groupCallOptions = Object.assign({}, this.options, { this.groupCallOptions = Object.assign({}, this.options, {
turnServerSource: this.turnServerSource,
emitUpdate: (groupCall, params) => this._calls.update(groupCall.id, params), emitUpdate: (groupCall, params) => this._calls.update(groupCall.id, params),
createTimeout: this.options.clock.createTimeout, createTimeout: this.options.clock.createTimeout,
sessionId: this.sessionId sessionId: this.sessionId
@ -242,5 +246,11 @@ export class CallHandler implements RoomStateHandler {
this.roomMemberToCallIds.set(roomMemberKey, newCallIdsMemberOf); this.roomMemberToCallIds.set(roomMemberKey, newCallIdsMemberOf);
} }
} }
dispose() {
this.turnServerSource.dispose();
const joinedCalls = Array.from(this._calls.values()).filter(c => c.hasJoined);
Promise.all(joinedCalls.map(c => c.leave())).then(() => {}, () => {});
}
} }

View File

@ -15,6 +15,7 @@ limitations under the License.
*/ */
import {ObservableMap} from "../../observable/map/ObservableMap"; import {ObservableMap} from "../../observable/map/ObservableMap";
import {BaseObservableValue} from "../../observable/value/BaseObservableValue";
import {recursivelyAssign} from "../../utils/recursivelyAssign"; import {recursivelyAssign} from "../../utils/recursivelyAssign";
import {Disposables, Disposable, IDisposable} from "../../utils/Disposables"; import {Disposables, Disposable, IDisposable} from "../../utils/Disposables";
import {WebRTC, PeerConnection, Transceiver, TransceiverDirection, Sender, Receiver, PeerConnectionEventMap} from "../../platform/types/WebRTC"; import {WebRTC, PeerConnection, Transceiver, TransceiverDirection, Sender, Receiver, PeerConnectionEventMap} from "../../platform/types/WebRTC";
@ -47,7 +48,7 @@ import type {
export type Options = { export type Options = {
webRTC: WebRTC, webRTC: WebRTC,
forceTURN: boolean, forceTURN: boolean,
turnServers: RTCIceServer[], turnServer: BaseObservableValue<RTCIceServer>,
createTimeout: TimeoutCreator, createTimeout: TimeoutCreator,
emitUpdate: (peerCall: PeerCall, params: any, log: ILogItem) => void; emitUpdate: (peerCall: PeerCall, params: any, log: ILogItem) => void;
sendSignallingMessage: (message: SignallingMessage<MCallBase>, log: ILogItem) => Promise<void>; sendSignallingMessage: (message: SignallingMessage<MCallBase>, log: ILogItem) => Promise<void>;
@ -114,8 +115,16 @@ export class PeerCall implements IDisposable {
) { ) {
logItem.log({l: "create PeerCall", id: callId}); logItem.log({l: "create PeerCall", id: callId});
this._remoteMedia = new RemoteMedia(); this._remoteMedia = new RemoteMedia();
this.peerConnection = options.webRTC.createPeerConnection(this.options.forceTURN, this.options.turnServers, 0); this.peerConnection = options.webRTC.createPeerConnection(
this.options.forceTURN,
[this.options.turnServer.get()],
0
);
// update turn servers when they change (see TurnServerSource)
this.disposables.track(this.options.turnServer.subscribe(turnServer => {
this.logItem.log({l: "updating turn server", turnServer})
this.peerConnection.setConfiguration({iceServers: [turnServer]});
}));
const listen = <K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => { const listen = <K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => {
this.peerConnection.addEventListener(type, listener); this.peerConnection.addEventListener(type, listener);
const dispose = () => { const dispose = () => {

View File

@ -0,0 +1,223 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {RetainedObservableValue} from "../../observable/value/RetainedObservableValue";
import type {HomeServerApi} from "../net/HomeServerApi";
import type {IHomeServerRequest} from "../net/HomeServerRequest";
import type {BaseObservableValue} from "../../observable/value/BaseObservableValue";
import type {ObservableValue} from "../../observable/value/ObservableValue";
import type {Clock, Timeout} from "../../platform/web/dom/Clock";
import type {ILogItem} from "../../logging/types";
type TurnServerSettings = {
uris: string[],
username: string,
password: string,
ttl: number
};
const DEFAULT_TTL = 5 * 60; // 5min
const DEFAULT_SETTINGS: RTCIceServer = {
urls: ["stun:turn.matrix.org"],
username: "",
credential: "",
};
export class TurnServerSource {
private currentObservable?: ObservableValue<RTCIceServer>;
private pollTimeout?: Timeout;
private pollRequest?: IHomeServerRequest;
private isPolling = false;
constructor(
private hsApi: HomeServerApi,
private clock: Clock,
private defaultSettings: RTCIceServer = DEFAULT_SETTINGS
) {}
getSettings(log: ILogItem): Promise<BaseObservableValue<RTCIceServer>> {
return log.wrap("get turn server", async log => {
if (!this.isPolling) {
const settings = await this.doRequest(log);
const iceServer = settings ? toIceServer(settings) : this.defaultSettings;
log.set("iceServer", iceServer);
if (this.currentObservable) {
this.currentObservable.set(iceServer);
} else {
this.currentObservable = new RetainedObservableValue(iceServer,
() => {
this.stopPollLoop();
},
() => {
// start loop on first subscribe
this.runLoop(settings?.ttl ?? DEFAULT_TTL);
});
}
}
return this.currentObservable!;
});
}
private async runLoop(initialTtl: number): Promise<void> {
let ttl = initialTtl;
this.isPolling = true;
while(this.isPolling) {
try {
this.pollTimeout = this.clock.createTimeout(ttl * 1000);
await this.pollTimeout.elapsed();
this.pollTimeout = undefined;
const settings = await this.doRequest(undefined);
if (settings) {
const iceServer = toIceServer(settings);
if (shouldUpdate(this.currentObservable!, iceServer)) {
this.currentObservable!.set(iceServer);
}
if (settings.ttl > 0) {
ttl = settings.ttl;
} else {
// stop polling is settings are good indefinitely
this.stopPollLoop();
}
} else {
ttl = DEFAULT_TTL;
}
} catch (err) {
if (err.name === "AbortError") {
/* ignore, the loop will exit because isPolling is false */
} else {
// TODO: log error
}
}
}
}
private async doRequest(log: ILogItem | undefined): Promise<TurnServerSettings | undefined> {
try {
this.pollRequest = this.hsApi.getTurnServer({log});
const settings = await this.pollRequest.response();
return settings;
} catch (err) {
if (err.name === "HomeServerError") {
return undefined;
}
throw err;
} finally {
this.pollRequest = undefined;
}
}
private stopPollLoop() {
this.isPolling = false;
this.currentObservable = undefined;
this.pollTimeout?.dispose();
this.pollTimeout = undefined;
this.pollRequest?.abort();
this.pollRequest = undefined;
}
dispose() {
this.stopPollLoop();
}
}
function shouldUpdate(observable: BaseObservableValue<RTCIceServer | undefined>, settings: RTCIceServer): boolean {
const currentSettings = observable.get();
if (!currentSettings) {
return true;
}
// same length and new settings doesn't contain any uri the old settings don't contain
const currentUrls = Array.isArray(currentSettings.urls) ? currentSettings.urls : [currentSettings.urls];
const newUrls = Array.isArray(settings.urls) ? settings.urls : [settings.urls];
const arraysEqual = currentUrls.length === newUrls.length &&
!newUrls.some(uri => !currentUrls.includes(uri));
return !arraysEqual || settings.username !== currentSettings.username ||
settings.credential !== currentSettings.credential;
}
function toIceServer(settings: TurnServerSettings): RTCIceServer {
return {
urls: settings.uris,
username: settings.username,
credential: settings.password,
credentialType: "password"
}
}
export function tests() {
return {
"shouldUpdate returns false for same object": assert => {
const observable = {get() {
return {
urls: ["a", "b"],
username: "alice",
credential: "f00",
};
}};
const same = {
urls: ["a", "b"],
username: "alice",
credential: "f00",
};
assert.equal(false, shouldUpdate(observable as any as BaseObservableValue<RTCIceServer>, same));
},
"shouldUpdate returns true for 1 different uri": assert => {
const observable = {get() {
return {
urls: ["a", "c"],
username: "alice",
credential: "f00",
};
}};
const same = {
urls: ["a", "b"],
username: "alice",
credential: "f00",
};
assert.equal(true, shouldUpdate(observable as any as BaseObservableValue<RTCIceServer>, same));
},
"shouldUpdate returns true for different user": assert => {
const observable = {get() {
return {
urls: ["a", "b"],
username: "alice",
credential: "f00",
};
}};
const same = {
urls: ["a", "b"],
username: "bob",
credential: "f00",
};
assert.equal(true, shouldUpdate(observable as any as BaseObservableValue<RTCIceServer>, same));
},
"shouldUpdate returns true for different password": assert => {
const observable = {get() {
return {
urls: ["a", "b"],
username: "alice",
credential: "f00",
};
}};
const same = {
urls: ["a", "b"],
username: "alice",
credential: "b4r",
};
assert.equal(true, shouldUpdate(observable as any as BaseObservableValue<RTCIceServer>, same));
}
}
}

View File

@ -23,6 +23,7 @@ import {EventEmitter} from "../../../utils/EventEmitter";
import {EventType, CallIntent} from "../callEventTypes"; import {EventType, CallIntent} from "../callEventTypes";
import type {Options as MemberOptions} from "./Member"; import type {Options as MemberOptions} from "./Member";
import type {TurnServerSource} from "../TurnServerSource";
import type {BaseObservableMap} from "../../../observable/map/BaseObservableMap"; import type {BaseObservableMap} from "../../../observable/map/BaseObservableMap";
import type {Track} from "../../../platform/types/MediaDevices"; import type {Track} from "../../../platform/types/MediaDevices";
import type {SignallingMessage, MGroupCallBase, CallMembership} from "../callEventTypes"; import type {SignallingMessage, MGroupCallBase, CallMembership} from "../callEventTypes";
@ -32,6 +33,7 @@ import type {Platform} from "../../../platform/web/Platform";
import type {EncryptedMessage} from "../../e2ee/olm/Encryption"; import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
import type {ILogItem, ILogger} from "../../../logging/types"; import type {ILogItem, ILogger} from "../../../logging/types";
import type {Storage} from "../../storage/idb/Storage"; import type {Storage} from "../../storage/idb/Storage";
import type {BaseObservableValue} from "../../../observable/value/BaseObservableValue";
export enum GroupCallState { export enum GroupCallState {
Fledgling = "fledgling", Fledgling = "fledgling",
@ -53,11 +55,12 @@ function getDeviceFromMemberKey(key: string): string {
return JSON.parse(`[${key}]`)[1]; return JSON.parse(`[${key}]`)[1];
} }
export type Options = Omit<MemberOptions, "emitUpdate" | "confId" | "encryptDeviceMessage"> & { export type Options = Omit<MemberOptions, "emitUpdate" | "confId" | "encryptDeviceMessage" | "turnServer"> & {
emitUpdate: (call: GroupCall, params?: any) => void; emitUpdate: (call: GroupCall, params?: any) => void;
encryptDeviceMessage: (roomId: string, userId: string, deviceId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage | undefined>, encryptDeviceMessage: (roomId: string, userId: string, deviceId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage | undefined>,
storage: Storage, storage: Storage,
logger: ILogger, logger: ILogger,
turnServerSource: TurnServerSource
}; };
class JoinedData { class JoinedData {
@ -65,7 +68,8 @@ class JoinedData {
public readonly logItem: ILogItem, public readonly logItem: ILogItem,
public readonly membersLogItem: ILogItem, public readonly membersLogItem: ILogItem,
public localMedia: LocalMedia, public localMedia: LocalMedia,
public localMuteSettings: MuteSettings public localMuteSettings: MuteSettings,
public turnServer: BaseObservableValue<RTCIceServer>
) {} ) {}
dispose() { dispose() {
@ -147,6 +151,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
id: this.id, id: this.id,
ownSessionId: this.options.sessionId ownSessionId: this.options.sessionId
}); });
const turnServer = await this.options.turnServerSource.getSettings(logItem);
const membersLogItem = logItem.child("member connections"); const membersLogItem = logItem.child("member connections");
const localMuteSettings = new MuteSettings(); const localMuteSettings = new MuteSettings();
localMuteSettings.updateTrackInfo(localMedia.userMedia); localMuteSettings.updateTrackInfo(localMedia.userMedia);
@ -154,7 +159,8 @@ export class GroupCall extends EventEmitter<{change: never}> {
logItem, logItem,
membersLogItem, membersLogItem,
localMedia, localMedia,
localMuteSettings localMuteSettings,
turnServer
); );
this.joinedData = joinedData; this.joinedData = joinedData;
await joinedData.logItem.wrap("join", async log => { await joinedData.logItem.wrap("join", async log => {
@ -529,7 +535,12 @@ export class GroupCall extends EventEmitter<{change: never}> {
const logItem = joinedData.membersLogItem.child({l: "member", id: memberKey}); const logItem = joinedData.membersLogItem.child({l: "member", id: memberKey});
logItem.set("sessionId", member.sessionId); logItem.set("sessionId", member.sessionId);
log.wrap({l: "connect", id: memberKey}, log => { log.wrap({l: "connect", id: memberKey}, log => {
const connectItem = member.connect(joinedData.localMedia, joinedData.localMuteSettings, logItem); const connectItem = member.connect(
joinedData.localMedia,
joinedData.localMuteSettings,
joinedData.turnServer,
logItem
);
if (connectItem) { if (connectItem) {
log.refDetached(connectItem); log.refDetached(connectItem);
} }

View File

@ -19,6 +19,7 @@ import {makeTxnId, makeId} from "../../common";
import {EventType, CallErrorCode} from "../callEventTypes"; import {EventType, CallErrorCode} from "../callEventTypes";
import {formatToDeviceMessagesPayload} from "../../common"; import {formatToDeviceMessagesPayload} from "../../common";
import {sortedIndex} from "../../../utils/sortedIndex"; import {sortedIndex} from "../../../utils/sortedIndex";
import type {MuteSettings} from "../common"; import type {MuteSettings} from "../common";
import type {Options as PeerCallOptions, RemoteMedia} from "../PeerCall"; import type {Options as PeerCallOptions, RemoteMedia} from "../PeerCall";
import type {LocalMedia} from "../LocalMedia"; import type {LocalMedia} from "../LocalMedia";
@ -28,8 +29,9 @@ import type {GroupCall} from "./GroupCall";
import type {RoomMember} from "../../room/members/RoomMember"; import type {RoomMember} from "../../room/members/RoomMember";
import type {EncryptedMessage} from "../../e2ee/olm/Encryption"; import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
import type {ILogItem} from "../../../logging/types"; import type {ILogItem} from "../../../logging/types";
import type {BaseObservableValue} from "../../../observable/value/BaseObservableValue";
export type Options = Omit<PeerCallOptions, "emitUpdate" | "sendSignallingMessage"> & { export type Options = Omit<PeerCallOptions, "emitUpdate" | "sendSignallingMessage" | "turnServer"> & {
confId: string, confId: string,
ownUserId: string, ownUserId: string,
ownDeviceId: string, ownDeviceId: string,
@ -60,6 +62,7 @@ class MemberConnection {
constructor( constructor(
public localMedia: LocalMedia, public localMedia: LocalMedia,
public localMuteSettings: MuteSettings, public localMuteSettings: MuteSettings,
public turnServer: BaseObservableValue<RTCIceServer>,
public readonly logItem: ILogItem public readonly logItem: ILogItem
) {} ) {}
} }
@ -122,12 +125,17 @@ export class Member {
} }
/** @internal */ /** @internal */
connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, memberLogItem: ILogItem): ILogItem | undefined { connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue<RTCIceServer>, memberLogItem: ILogItem): ILogItem | undefined {
if (this.connection) { if (this.connection) {
return; return;
} }
// Safari can't send a MediaStream to multiple sources, so clone it // Safari can't send a MediaStream to multiple sources, so clone it
const connection = new MemberConnection(localMedia.clone(), localMuteSettings, memberLogItem); const connection = new MemberConnection(
localMedia.clone(),
localMuteSettings,
turnServer,
memberLogItem
);
this.connection = connection; this.connection = connection;
let connectLogItem; let connectLogItem;
connection.logItem.wrap("connect", async log => { connection.logItem.wrap("connect", async log => {
@ -355,9 +363,11 @@ export class Member {
} }
private _createPeerCall(callId: string): PeerCall { private _createPeerCall(callId: string): PeerCall {
const connection = this.connection!;
return new PeerCall(callId, Object.assign({}, this.options, { return new PeerCall(callId, Object.assign({}, this.options, {
emitUpdate: this.emitUpdateFromPeerCall, emitUpdate: this.emitUpdateFromPeerCall,
sendSignallingMessage: this.sendSignallingMessage sendSignallingMessage: this.sendSignallingMessage,
}), this.connection!.logItem); turnServer: connection.turnServer
}), connection.logItem);
} }
} }

View File

@ -373,6 +373,10 @@ export class HomeServerApi {
setAccountData(ownUserId: string, type: string, content: Record<string, any>, options?: BaseRequestOptions): IHomeServerRequest { setAccountData(ownUserId: string, type: string, content: Record<string, any>, options?: BaseRequestOptions): IHomeServerRequest {
return this._put(`/user/${encodeURIComponent(ownUserId)}/account_data/${encodeURIComponent(type)}`, {}, content, options); return this._put(`/user/${encodeURIComponent(ownUserId)}/account_data/${encodeURIComponent(type)}`, {}, content, options);
} }
getTurnServer(options?: BaseRequestOptions): IHomeServerRequest {
return this._get(`/voip/turnServer`, undefined, undefined, options);
}
} }
import {Request as MockRequest} from "../../mocks/Request.js"; import {Request as MockRequest} from "../../mocks/Request.js";

View File

@ -17,15 +17,17 @@ limitations under the License.
import {ObservableValue} from "./ObservableValue"; import {ObservableValue} from "./ObservableValue";
export class RetainedObservableValue<T> extends ObservableValue<T> { export class RetainedObservableValue<T> extends ObservableValue<T> {
private _freeCallback: () => void;
constructor(initialValue: T, freeCallback: () => void) { constructor(initialValue: T, private freeCallback: () => void, private startCallback: () => void = () => {}) {
super(initialValue); super(initialValue);
this._freeCallback = freeCallback; }
onSubscribeFirst() {
this.startCallback();
} }
onUnsubscribeLast() { onUnsubscribeLast() {
super.onUnsubscribeLast(); super.onUnsubscribeLast();
this._freeCallback(); this.freeCallback();
} }
} }

View File

@ -148,6 +148,7 @@ export interface PeerConnection {
addEventListener<K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void; addEventListener<K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void;
removeEventListener<K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void; removeEventListener<K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void;
getStats(selector?: Track | null): Promise<StatsReport>; getStats(selector?: Track | null): Promise<StatsReport>;
setConfiguration(configuration?: RTCConfiguration): void;
} }