diff --git a/src/matrix/calls/PeerCall.ts b/src/matrix/calls/PeerCall.ts index fcb71db1..905756d7 100644 --- a/src/matrix/calls/PeerCall.ts +++ b/src/matrix/calls/PeerCall.ts @@ -44,6 +44,8 @@ import type { SDPStreamMetadata, SignallingMessage } from "./callEventTypes"; +import type { ErrorBoundary } from "../../utils/ErrorBoundary"; +import { AbortError } from "../../utils/error"; export type Options = { webRTC: WebRTC, @@ -51,6 +53,7 @@ export type Options = { turnServer: BaseObservableValue, createTimeout: TimeoutCreator, emitUpdate: (peerCall: PeerCall, params: any, log: ILogItem) => void; + errorBoundary: ErrorBoundary; sendSignallingMessage: (message: SignallingMessage, log: ILogItem) => Promise; }; @@ -125,31 +128,34 @@ export class PeerCall implements IDisposable { this.logItem.log({l: "updating turn server", turnServer}) this.peerConnection.setConfiguration({iceServers: [turnServer]}); })); - const listen = (type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => { - this.peerConnection.addEventListener(type, listener); + const listen = (type: K, listener: (ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => { + const newListener = (e) => { + this.options.errorBoundary.try(() => listener(e)); + }; + this.peerConnection.addEventListener(type, newListener); const dispose = () => { - this.peerConnection.removeEventListener(type, listener); + this.peerConnection.removeEventListener(type, newListener); }; this.disposables.track(dispose); }; - listen("iceconnectionstatechange", () => { + listen("iceconnectionstatechange", async () => { const state = this.peerConnection.iceConnectionState; - logItem.wrap({l: "onIceConnectionStateChange", status: state}, log => { - this.onIceConnectionStateChange(state, log); + await logItem.wrap({l: "onIceConnectionStateChange", status: state}, async log => { + await this.onIceConnectionStateChange(state, log); }); }); - listen("icecandidate", event => { - logItem.wrap("onLocalIceCandidate", log => { + listen("icecandidate", async (event) => { + await logItem.wrap("onLocalIceCandidate", async log => { if (event.candidate) { - this.handleLocalIceCandidate(event.candidate, log); + await this.handleLocalIceCandidate(event.candidate, log); } }); }); - listen("icegatheringstatechange", () => { + listen("icegatheringstatechange", async () => { const state = this.peerConnection.iceGatheringState; - logItem.wrap({l: "onIceGatheringStateChange", status: state}, log => { - this.handleIceGatheringState(state, log); + await logItem.wrap({l: "onIceGatheringStateChange", status: state}, async log => { + await this.handleIceGatheringState(state, log); }); }); listen("track", event => { @@ -422,14 +428,18 @@ export class PeerCall implements IDisposable { log.refDetached(timeoutLog); // don't await this, as it would block other negotationneeded events from being processed // as they are processed serially - timeoutLog.run(async log => { - try { await this.delay(CALL_TIMEOUT_MS); } - catch (err) { return; } - // @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between - if (this._state === CallState.InviteSent) { - this._hangup(CallErrorCode.InviteTimeout, log); - } - }).catch(err => {}); // prevent error from being unhandled, it will be logged already by run above + try { + await timeoutLog.run(async log => { + await this.delay(CALL_TIMEOUT_MS); + // @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between + if (this._state === CallState.InviteSent) { + await this._hangup(CallErrorCode.InviteTimeout, log); + } + }); + } + catch (e) { + // prevent error from being unhandled, it will be logged already by run above + } } }; @@ -579,7 +589,7 @@ export class PeerCall implements IDisposable { } } - private handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) { + private async handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) { if (state === 'complete' && !this.sentEndOfCandidates) { // If we didn't get an empty-string candidate to signal the end of candidates, // create one ourselves now gathering has finished. @@ -591,12 +601,12 @@ export class PeerCall implements IDisposable { const c = { candidate: '', } as RTCIceCandidate; - this.queueCandidate(c, log); + await this.queueCandidate(c, log); this.sentEndOfCandidates = true; } } - private handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) { + private async handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) { log.set("sdpMid", candidate.sdpMid); log.set("candidate", candidate.candidate); @@ -606,7 +616,7 @@ export class PeerCall implements IDisposable { // As with the offer, note we need to make a copy of this object, not // pass the original: that broke in Chrome ~m43. if (candidate.candidate !== '' || !this.sentEndOfCandidates) { - this.queueCandidate(candidate, log); + await this.queueCandidate(candidate, log); if (candidate.candidate === '') { this.sentEndOfCandidates = true; } @@ -841,13 +851,19 @@ export class PeerCall implements IDisposable { logStats = true; this.iceDisconnectedTimeout?.abort(); this.iceDisconnectedTimeout = undefined; - this._hangup(CallErrorCode.IceFailed, log); + await this._hangup(CallErrorCode.IceFailed, log); } else if (state == 'disconnected') { logStats = true; this.iceDisconnectedTimeout = this.options.createTimeout(30 * 1000); - this.iceDisconnectedTimeout.elapsed().then(() => { - this._hangup(CallErrorCode.IceFailed, log); - }, () => { /* ignore AbortError */ }); + try { + await this.iceDisconnectedTimeout.elapsed() + await this._hangup(CallErrorCode.IceFailed, log); + } + catch (e){ + if (!(e instanceof AbortError)) { + throw e; + } + } } if (logStats) { const stats = await this.peerConnection.getStats(); diff --git a/src/matrix/calls/group/GroupCall.ts b/src/matrix/calls/group/GroupCall.ts index 0460c212..d540d2e0 100644 --- a/src/matrix/calls/group/GroupCall.ts +++ b/src/matrix/calls/group/GroupCall.ts @@ -133,7 +133,8 @@ export class GroupCall extends EventEmitter<{change: never}> { }, encryptDeviceMessage: (userId: string, deviceId: string, message: SignallingMessage, log) => { return this.options.encryptDeviceMessage(this.roomId, userId, deviceId, message, log); - } + }, + groupCallErrorBoundary: this.errorBoundary, }); } @@ -392,8 +393,8 @@ export class GroupCall extends EventEmitter<{change: never}> { /** @internal */ updateMembership(userId: string, roomMember: RoomMember, callMembership: CallMembership, syncLog: ILogItem) { - this.errorBoundary.try(() => { - syncLog.wrap({l: "update call membership", t: CALL_LOG_TYPE, id: this.id, userId}, log => { + this.errorBoundary.try(async () => { + await syncLog.wrap({l: "update call membership", t: CALL_LOG_TYPE, id: this.id, userId}, async log => { const now = this.options.clock.now(); const devices = callMembership["m.devices"]; const previousDeviceIds = this.getDeviceIdsForUserId(userId); @@ -415,7 +416,7 @@ export class GroupCall extends EventEmitter<{change: never}> { } }); } else { - log.wrap({l: "update device membership", id: memberKey, sessionId: device.session_id}, log => { + await log.wrap({l: "update device membership", id: memberKey, sessionId: device.session_id}, async log => { if (isMemberExpired(device, now)) { log.set("expired", true); const member = this._members.get(memberKey); @@ -434,7 +435,7 @@ export class GroupCall extends EventEmitter<{change: never}> { } else { if (member && sessionIdChanged) { log.set("removedSessionId", member.sessionId); - const disconnectLogItem = member.disconnect(false); + const disconnectLogItem = await member.disconnect(false); if (disconnectLogItem) { log.refDetached(disconnectLogItem); } @@ -528,11 +529,11 @@ export class GroupCall extends EventEmitter<{change: never}> { } /** @internal */ - disconnect(log: ILogItem): boolean { - return this.errorBoundary.try(() => { + disconnect(log: ILogItem): Promise | true { + return this.errorBoundary.try(async () => { if (this.hasJoined) { for (const member of this._members.values()) { - const disconnectLogItem = member.disconnect(true); + const disconnectLogItem = await member.disconnect(true); if (disconnectLogItem) { log.refDetached(disconnectLogItem); } @@ -546,13 +547,13 @@ export class GroupCall extends EventEmitter<{change: never}> { } /** @internal */ - private removeMemberDevice(userId: string, deviceId: string, log: ILogItem) { + private async removeMemberDevice(userId: string, deviceId: string, log: ILogItem) { const memberKey = getMemberKey(userId, deviceId); - log.wrap({l: "remove device member", id: memberKey}, log => { + await log.wrap({l: "remove device member", id: memberKey}, async log => { const member = this._members.get(memberKey); if (member) { log.set("leave", true); - const disconnectLogItem = member.disconnect(false); + const disconnectLogItem = await member.disconnect(false); if (disconnectLogItem) { log.refDetached(disconnectLogItem); } @@ -634,15 +635,15 @@ export class GroupCall extends EventEmitter<{change: never}> { return stateContent; } - private connectToMember(member: Member, joinedData: JoinedData, log: ILogItem) { + private async connectToMember(member: Member, joinedData: JoinedData, log: ILogItem) { const memberKey = getMemberKey(member.userId, member.deviceId); const logItem = joinedData.membersLogItem.child({ l: "member", id: memberKey, sessionId: member.sessionId }); - log.wrap({l: "connect", id: memberKey}, log => { - const connectItem = member.connect( + await log.wrap({l: "connect", id: memberKey}, async log => { + const connectItem = await member.connect( joinedData.localMedia, joinedData.localMuteSettings, joinedData.turnServer, diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index ab99c6b8..88e0369b 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -42,6 +42,7 @@ export type Options = Omit, log: ILogItem) => Promise, emitUpdate: (participant: Member, params?: any) => void, + groupCallErrorBoundary: ErrorBoundary, clock: Clock } @@ -183,8 +184,8 @@ export class Member { } /** @internal */ - connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue, memberLogItem: ILogItem): ILogItem | undefined { - return this.errorBoundary.try(() => { + connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue, memberLogItem: ILogItem): Promise | undefined { + return this.errorBoundary.try(async () => { if (this.connection) { return; } @@ -197,7 +198,7 @@ export class Member { ); this.connection = connection; let connectLogItem: ILogItem | undefined; - connection.logItem.wrap("connect", async log => { + await connection.logItem.wrap("connect", async log => { connectLogItem = log; await this.callIfNeeded(log); }); @@ -230,15 +231,15 @@ export class Member { } /** @internal */ - disconnect(hangup: boolean): ILogItem | undefined { - return this.errorBoundary.try(() => { + disconnect(hangup: boolean): Promise | undefined { + return this.errorBoundary.try(async () => { const {connection} = this; if (!connection) { return; } - let disconnectLogItem; + let disconnectLogItem: ILogItem | undefined; // if if not sending the hangup, still log disconnect - connection.logItem.wrap("disconnect", async log => { + await connection.logItem.wrap("disconnect", async log => { disconnectLogItem = log; if (hangup && connection.peerCall) { await connection.peerCall.hangup(CallErrorCode.UserHangup, log); @@ -269,7 +270,7 @@ export class Member { } /** @internal */ - emitUpdateFromPeerCall = (peerCall: PeerCall, params: any, log: ILogItem): void => { + emitUpdateFromPeerCall = async (peerCall: PeerCall, params: any, log: ILogItem): Promise => { const connection = this.connection!; if (peerCall.state === CallState.Ringing) { connection.logItem.wrap("ringing, answer peercall", answerLog => { @@ -284,12 +285,12 @@ export class Member { if (hangupReason && !errorCodesWithoutRetry.includes(hangupReason)) { connection.retryCount += 1; const {retryCount} = connection; - connection.logItem.wrap({l: "retry connection", retryCount}, async retryLog => { + await connection.logItem.wrap({l: "retry connection", retryCount}, async retryLog => { log.refDetached(retryLog); if (retryCount <= 3) { await this.callIfNeeded(retryLog); } else { - const disconnectLogItem = this.disconnect(false); + const disconnectLogItem = await this.disconnect(false); if (disconnectLogItem) { retryLog.refDetached(disconnectLogItem); } @@ -303,6 +304,10 @@ export class Member { /** @internal */ sendSignallingMessage = async (message: SignallingMessage, log: ILogItem): Promise => { const groupMessage = message as SignallingMessage; + if (this.connection?.peerCall?.state === CallState.CreateOffer) { + // @ts-ignore + this.connection!.foobar.barfpp; + } groupMessage.content.seq = this.connection!.outboundSeqCounter++; groupMessage.content.conf_id = this.options.confId; groupMessage.content.device_id = this.options.ownDeviceId; @@ -421,6 +426,7 @@ export class Member { private _createPeerCall(callId: string): PeerCall { const connection = this.connection!; return new PeerCall(callId, Object.assign({}, this.options, { + errorBoundary: this.options.groupCallErrorBoundary, emitUpdate: this.emitUpdateFromPeerCall, sendSignallingMessage: this.sendSignallingMessage, turnServer: connection.turnServer diff --git a/src/utils/ErrorBoundary.ts b/src/utils/ErrorBoundary.ts index a74da479..a0f6fca7 100644 --- a/src/utils/ErrorBoundary.ts +++ b/src/utils/ErrorBoundary.ts @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -export const ErrorValue = Symbol("ErrorBoundary:Error"); - export class ErrorBoundary { private _error?: Error; @@ -23,7 +21,7 @@ export class ErrorBoundary { /** * Executes callback() and then runs errorCallback() on error. - * This will never throw but instead return `errorValue` if an error occured. + * This will never throw but instead return `errorValue` if an error occurred. */ try(callback: () => T, errorValue?: E): T | typeof errorValue; try(callback: () => Promise, errorValue?: E): Promise | typeof errorValue {