diff --git a/src/matrix/calls/PeerCall.ts b/src/matrix/calls/PeerCall.ts index dc85084a..d94c9c35 100644 --- a/src/matrix/calls/PeerCall.ts +++ b/src/matrix/calls/PeerCall.ts @@ -44,6 +44,8 @@ import type { SDPStreamMetadata, SignallingMessage } from "./callEventTypes"; +import type { ErrorBoundary } from "../../utils/ErrorBoundary"; +import { AbortError } from "../../utils/error"; export type Options = { webRTC: WebRTC, @@ -51,6 +53,7 @@ export type Options = { turnServer: BaseObservableValue, createTimeout: TimeoutCreator, emitUpdate: (peerCall: PeerCall, params: any, log: ILogItem) => void; + errorBoundary: ErrorBoundary; sendSignallingMessage: (message: SignallingMessage, log: ILogItem) => Promise; }; @@ -125,31 +128,34 @@ export class PeerCall implements IDisposable { this.logItem.log({l: "updating turn server", turnServer}) this.peerConnection.setConfiguration({iceServers: [turnServer]}); })); - const listen = (type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => { - this.peerConnection.addEventListener(type, listener); + const listen = (type: K, listener: (ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => { + const newListener = (e) => { + this.options.errorBoundary.try(() => listener(e)); + }; + this.peerConnection.addEventListener(type, newListener); const dispose = () => { - this.peerConnection.removeEventListener(type, listener); + this.peerConnection.removeEventListener(type, newListener); }; this.disposables.track(dispose); }; - listen("iceconnectionstatechange", () => { + listen("iceconnectionstatechange", async () => { const state = this.peerConnection.iceConnectionState; - logItem.wrap({l: "onIceConnectionStateChange", status: state}, log => { - this.onIceConnectionStateChange(state, log); + await logItem.wrap({l: "onIceConnectionStateChange", status: state}, async log => { + await this.onIceConnectionStateChange(state, log); }); }); - listen("icecandidate", event => { - logItem.wrap("onLocalIceCandidate", log => { + listen("icecandidate", async (event) => { + await logItem.wrap("onLocalIceCandidate", async log => { if (event.candidate) { - this.handleLocalIceCandidate(event.candidate, log); + await this.handleLocalIceCandidate(event.candidate, log); } }); }); - listen("icegatheringstatechange", () => { + listen("icegatheringstatechange", async () => { const state = this.peerConnection.iceGatheringState; - logItem.wrap({l: "onIceGatheringStateChange", status: state}, log => { - this.handleIceGatheringState(state, log); + await logItem.wrap({l: "onIceGatheringStateChange", status: state}, async log => { + await this.handleIceGatheringState(state, log); }); }); listen("track", event => { @@ -171,6 +177,9 @@ export class PeerCall implements IDisposable { }); }; this.responsePromiseChain = this.responsePromiseChain?.then(promiseCreator) ?? promiseCreator(); + this.responsePromiseChain.catch((e) => + this.options.errorBoundary.reportError(e) + ); }); } @@ -285,11 +294,12 @@ export class PeerCall implements IDisposable { } private async _hangup(errorCode: CallErrorCode, log: ILogItem): Promise { - if (this._state === CallState.Ended) { + if (this._state === CallState.Ended || this._state === CallState.Ending) { return; } - this.terminate(CallParty.Local, errorCode, log); + this.setState(CallState.Ending, log); await this.sendHangupWithCallId(this.callId, errorCode, log); + this.terminate(CallParty.Local, errorCode, log); } getMessageAction(message: SignallingMessage): IncomingMessageAction { @@ -422,14 +432,14 @@ export class PeerCall implements IDisposable { log.refDetached(timeoutLog); // don't await this, as it would block other negotationneeded events from being processed // as they are processed serially - timeoutLog.run(async log => { + await timeoutLog.run(async log => { try { await this.delay(CALL_TIMEOUT_MS); } - catch (err) { return; } + catch (err) { return; } // return when delay is cancelled by throwing an AbortError // @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between if (this._state === CallState.InviteSent) { - this._hangup(CallErrorCode.InviteTimeout, log); + await this._hangup(CallErrorCode.InviteTimeout, log); } - }).catch(err => {}); // prevent error from being unhandled, it will be logged already by run above + }); } }; @@ -579,7 +589,7 @@ export class PeerCall implements IDisposable { } } - private handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) { + private async handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) { if (state === 'complete' && !this.sentEndOfCandidates) { // If we didn't get an empty-string candidate to signal the end of candidates, // create one ourselves now gathering has finished. @@ -591,12 +601,12 @@ export class PeerCall implements IDisposable { const c = { candidate: '', } as RTCIceCandidate; - this.queueCandidate(c, log); + await this.queueCandidate(c, log); this.sentEndOfCandidates = true; } } - private handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) { + private async handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) { log.set("sdpMid", candidate.sdpMid); log.set("candidate", candidate.candidate); @@ -606,7 +616,7 @@ export class PeerCall implements IDisposable { // As with the offer, note we need to make a copy of this object, not // pass the original: that broke in Chrome ~m43. if (candidate.candidate !== '' || !this.sentEndOfCandidates) { - this.queueCandidate(candidate, log); + await this.queueCandidate(candidate, log); if (candidate.candidate === '') { this.sentEndOfCandidates = true; } @@ -841,13 +851,19 @@ export class PeerCall implements IDisposable { logStats = true; this.iceDisconnectedTimeout?.abort(); this.iceDisconnectedTimeout = undefined; - this._hangup(CallErrorCode.IceFailed, log); + await this._hangup(CallErrorCode.IceFailed, log); } else if (state == 'disconnected') { logStats = true; this.iceDisconnectedTimeout = this.options.createTimeout(30 * 1000); - this.iceDisconnectedTimeout.elapsed().then(() => { - this._hangup(CallErrorCode.IceFailed, log); - }, () => { /* ignore AbortError */ }); + try { + await this.iceDisconnectedTimeout.elapsed() + await this._hangup(CallErrorCode.IceFailed, log); + } + catch (e){ + if (!(e instanceof AbortError)) { + throw e; + } + } } if (logStats) { const stats = await this.peerConnection.getStats(); @@ -1131,6 +1147,7 @@ export enum CallState { Connecting = 'connecting', Connected = 'connected', Ringing = 'ringing', + Ending = 'ending', Ended = 'ended', } diff --git a/src/matrix/calls/group/GroupCall.ts b/src/matrix/calls/group/GroupCall.ts index 0460c212..b82b3276 100644 --- a/src/matrix/calls/group/GroupCall.ts +++ b/src/matrix/calls/group/GroupCall.ts @@ -99,6 +99,7 @@ export class GroupCall extends EventEmitter<{change: never}> { // in case the error happens in code that does not log, // log it here to make sure it isn't swallowed this.joinedData.logItem.log("error at boundary").catch(err); + console.error(err); } }); @@ -392,8 +393,8 @@ export class GroupCall extends EventEmitter<{change: never}> { /** @internal */ updateMembership(userId: string, roomMember: RoomMember, callMembership: CallMembership, syncLog: ILogItem) { - this.errorBoundary.try(() => { - syncLog.wrap({l: "update call membership", t: CALL_LOG_TYPE, id: this.id, userId}, log => { + this.errorBoundary.try(async () => { + await syncLog.wrap({l: "update call membership", t: CALL_LOG_TYPE, id: this.id, userId}, async log => { const now = this.options.clock.now(); const devices = callMembership["m.devices"]; const previousDeviceIds = this.getDeviceIdsForUserId(userId); @@ -415,7 +416,7 @@ export class GroupCall extends EventEmitter<{change: never}> { } }); } else { - log.wrap({l: "update device membership", id: memberKey, sessionId: device.session_id}, log => { + await log.wrap({l: "update device membership", id: memberKey, sessionId: device.session_id}, async log => { if (isMemberExpired(device, now)) { log.set("expired", true); const member = this._members.get(memberKey); @@ -434,7 +435,7 @@ export class GroupCall extends EventEmitter<{change: never}> { } else { if (member && sessionIdChanged) { log.set("removedSessionId", member.sessionId); - const disconnectLogItem = member.disconnect(false); + const disconnectLogItem = await member.disconnect(false); if (disconnectLogItem) { log.refDetached(disconnectLogItem); } @@ -528,11 +529,11 @@ export class GroupCall extends EventEmitter<{change: never}> { } /** @internal */ - disconnect(log: ILogItem): boolean { - return this.errorBoundary.try(() => { + disconnect(log: ILogItem): Promise | true { + return this.errorBoundary.try(async () => { if (this.hasJoined) { for (const member of this._members.values()) { - const disconnectLogItem = member.disconnect(true); + const disconnectLogItem = await member.disconnect(true); if (disconnectLogItem) { log.refDetached(disconnectLogItem); } @@ -546,13 +547,13 @@ export class GroupCall extends EventEmitter<{change: never}> { } /** @internal */ - private removeMemberDevice(userId: string, deviceId: string, log: ILogItem) { + private async removeMemberDevice(userId: string, deviceId: string, log: ILogItem) { const memberKey = getMemberKey(userId, deviceId); - log.wrap({l: "remove device member", id: memberKey}, log => { + await log.wrap({l: "remove device member", id: memberKey}, async log => { const member = this._members.get(memberKey); if (member) { log.set("leave", true); - const disconnectLogItem = member.disconnect(false); + const disconnectLogItem = await member.disconnect(false); if (disconnectLogItem) { log.refDetached(disconnectLogItem); } @@ -634,15 +635,15 @@ export class GroupCall extends EventEmitter<{change: never}> { return stateContent; } - private connectToMember(member: Member, joinedData: JoinedData, log: ILogItem) { + private async connectToMember(member: Member, joinedData: JoinedData, log: ILogItem) { const memberKey = getMemberKey(member.userId, member.deviceId); const logItem = joinedData.membersLogItem.child({ l: "member", id: memberKey, sessionId: member.sessionId }); - log.wrap({l: "connect", id: memberKey}, log => { - const connectItem = member.connect( + await log.wrap({l: "connect", id: memberKey}, async log => { + const connectItem = await member.connect( joinedData.localMedia, joinedData.localMuteSettings, joinedData.turnServer, diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index 0a0e697c..db5482ee 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -193,8 +193,8 @@ export class Member { } /** @internal */ - connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue, memberLogItem: ILogItem): ILogItem | undefined { - return this.errorBoundary.try(() => { + connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue, memberLogItem: ILogItem): Promise | undefined { + return this.errorBoundary.try(async () => { if (this.connection) { return; } @@ -207,7 +207,7 @@ export class Member { ); this.connection = connection; let connectLogItem: ILogItem | undefined; - connection.logItem.wrap("connect", async log => { + await connection.logItem.wrap("connect", async log => { connectLogItem = log; await this.callIfNeeded(log); }); @@ -240,15 +240,15 @@ export class Member { } /** @internal */ - disconnect(hangup: boolean): ILogItem | undefined { - return this.errorBoundary.try(() => { + disconnect(hangup: boolean): Promise | undefined { + return this.errorBoundary.try(async () => { const {connection} = this; if (!connection) { return; } - let disconnectLogItem; + let disconnectLogItem: ILogItem | undefined; // if if not sending the hangup, still log disconnect - connection.logItem.wrap("disconnect", async log => { + await connection.logItem.wrap("disconnect", async log => { disconnectLogItem = log; if (hangup && connection.peerCall) { await connection.peerCall.hangup(CallErrorCode.UserHangup, log); @@ -279,7 +279,7 @@ export class Member { } /** @internal */ - emitUpdateFromPeerCall = (peerCall: PeerCall, params: any, log: ILogItem): void => { + emitUpdateFromPeerCall = async (peerCall: PeerCall, params: any, log: ILogItem): Promise => { const connection = this.connection!; if (peerCall.state === CallState.Ringing) { connection.logItem.wrap("ringing, answer peercall", answerLog => { @@ -294,12 +294,12 @@ export class Member { if (hangupReason && !errorCodesWithoutRetry.includes(hangupReason)) { connection.retryCount += 1; const {retryCount} = connection; - connection.logItem.wrap({l: "retry connection", retryCount}, async retryLog => { + await connection.logItem.wrap({l: "retry connection", retryCount}, async retryLog => { log.refDetached(retryLog); if (retryCount <= 3) { await this.callIfNeeded(retryLog); } else { - const disconnectLogItem = this.disconnect(false); + const disconnectLogItem = await this.disconnect(false); if (disconnectLogItem) { retryLog.refDetached(disconnectLogItem); } @@ -444,6 +444,7 @@ export class Member { private _createPeerCall(callId: string): PeerCall { const connection = this.connection!; return new PeerCall(callId, Object.assign({}, this.options, { + errorBoundary: this.errorBoundary, emitUpdate: this.emitUpdateFromPeerCall, sendSignallingMessage: this.sendSignallingMessage, turnServer: connection.turnServer diff --git a/src/utils/ErrorBoundary.ts b/src/utils/ErrorBoundary.ts index a74da479..750385c7 100644 --- a/src/utils/ErrorBoundary.ts +++ b/src/utils/ErrorBoundary.ts @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -export const ErrorValue = Symbol("ErrorBoundary:Error"); - export class ErrorBoundary { private _error?: Error; @@ -23,7 +21,7 @@ export class ErrorBoundary { /** * Executes callback() and then runs errorCallback() on error. - * This will never throw but instead return `errorValue` if an error occured. + * This will never throw but instead return `errorValue` if an error occurred. */ try(callback: () => T, errorValue?: E): T | typeof errorValue; try(callback: () => Promise, errorValue?: E): Promise | typeof errorValue { @@ -44,7 +42,7 @@ export class ErrorBoundary { } } - private reportError(err: Error) { + reportError(err: Error) { try { this.errorCallback(err); } catch (err) {