This commit is contained in:
RMidhunSuresh 2023-02-07 19:15:36 +05:30
parent 7eae171ac9
commit 6d800ff359
No known key found for this signature in database
4 changed files with 76 additions and 55 deletions

View File

@ -44,6 +44,8 @@ import type {
SDPStreamMetadata,
SignallingMessage
} from "./callEventTypes";
import type { ErrorBoundary } from "../../utils/ErrorBoundary";
import { AbortError } from "../../utils/error";
export type Options = {
webRTC: WebRTC,
@ -51,6 +53,7 @@ export type Options = {
turnServer: BaseObservableValue<RTCIceServer>,
createTimeout: TimeoutCreator,
emitUpdate: (peerCall: PeerCall, params: any, log: ILogItem) => void;
errorBoundary: ErrorBoundary;
sendSignallingMessage: (message: SignallingMessage<MCallBase>, log: ILogItem) => Promise<void>;
};
@ -125,31 +128,34 @@ export class PeerCall implements IDisposable {
this.logItem.log({l: "updating turn server", turnServer})
this.peerConnection.setConfiguration({iceServers: [turnServer]});
}));
const listen = <K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => {
this.peerConnection.addEventListener(type, listener);
const listen = <K extends keyof PeerConnectionEventMap>(type: K, listener: (ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => {
const newListener = (e) => {
this.options.errorBoundary.try(() => listener(e));
};
this.peerConnection.addEventListener(type, newListener);
const dispose = () => {
this.peerConnection.removeEventListener(type, listener);
this.peerConnection.removeEventListener(type, newListener);
};
this.disposables.track(dispose);
};
listen("iceconnectionstatechange", () => {
listen("iceconnectionstatechange", async () => {
const state = this.peerConnection.iceConnectionState;
logItem.wrap({l: "onIceConnectionStateChange", status: state}, log => {
this.onIceConnectionStateChange(state, log);
await logItem.wrap({l: "onIceConnectionStateChange", status: state}, async log => {
await this.onIceConnectionStateChange(state, log);
});
});
listen("icecandidate", event => {
logItem.wrap("onLocalIceCandidate", log => {
listen("icecandidate", async (event) => {
await logItem.wrap("onLocalIceCandidate", async log => {
if (event.candidate) {
this.handleLocalIceCandidate(event.candidate, log);
await this.handleLocalIceCandidate(event.candidate, log);
}
});
});
listen("icegatheringstatechange", () => {
listen("icegatheringstatechange", async () => {
const state = this.peerConnection.iceGatheringState;
logItem.wrap({l: "onIceGatheringStateChange", status: state}, log => {
this.handleIceGatheringState(state, log);
await logItem.wrap({l: "onIceGatheringStateChange", status: state}, async log => {
await this.handleIceGatheringState(state, log);
});
});
listen("track", event => {
@ -422,14 +428,18 @@ export class PeerCall implements IDisposable {
log.refDetached(timeoutLog);
// don't await this, as it would block other negotationneeded events from being processed
// as they are processed serially
timeoutLog.run(async log => {
try { await this.delay(CALL_TIMEOUT_MS); }
catch (err) { return; }
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
if (this._state === CallState.InviteSent) {
this._hangup(CallErrorCode.InviteTimeout, log);
}
}).catch(err => {}); // prevent error from being unhandled, it will be logged already by run above
try {
await timeoutLog.run(async log => {
await this.delay(CALL_TIMEOUT_MS);
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
if (this._state === CallState.InviteSent) {
await this._hangup(CallErrorCode.InviteTimeout, log);
}
});
}
catch (e) {
// prevent error from being unhandled, it will be logged already by run above
}
}
};
@ -579,7 +589,7 @@ export class PeerCall implements IDisposable {
}
}
private handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) {
private async handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) {
if (state === 'complete' && !this.sentEndOfCandidates) {
// If we didn't get an empty-string candidate to signal the end of candidates,
// create one ourselves now gathering has finished.
@ -591,12 +601,12 @@ export class PeerCall implements IDisposable {
const c = {
candidate: '',
} as RTCIceCandidate;
this.queueCandidate(c, log);
await this.queueCandidate(c, log);
this.sentEndOfCandidates = true;
}
}
private handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) {
private async handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) {
log.set("sdpMid", candidate.sdpMid);
log.set("candidate", candidate.candidate);
@ -606,7 +616,7 @@ export class PeerCall implements IDisposable {
// As with the offer, note we need to make a copy of this object, not
// pass the original: that broke in Chrome ~m43.
if (candidate.candidate !== '' || !this.sentEndOfCandidates) {
this.queueCandidate(candidate, log);
await this.queueCandidate(candidate, log);
if (candidate.candidate === '') {
this.sentEndOfCandidates = true;
}
@ -841,13 +851,19 @@ export class PeerCall implements IDisposable {
logStats = true;
this.iceDisconnectedTimeout?.abort();
this.iceDisconnectedTimeout = undefined;
this._hangup(CallErrorCode.IceFailed, log);
await this._hangup(CallErrorCode.IceFailed, log);
} else if (state == 'disconnected') {
logStats = true;
this.iceDisconnectedTimeout = this.options.createTimeout(30 * 1000);
this.iceDisconnectedTimeout.elapsed().then(() => {
this._hangup(CallErrorCode.IceFailed, log);
}, () => { /* ignore AbortError */ });
try {
await this.iceDisconnectedTimeout.elapsed()
await this._hangup(CallErrorCode.IceFailed, log);
}
catch (e){
if (!(e instanceof AbortError)) {
throw e;
}
}
}
if (logStats) {
const stats = await this.peerConnection.getStats();

View File

@ -133,7 +133,8 @@ export class GroupCall extends EventEmitter<{change: never}> {
},
encryptDeviceMessage: (userId: string, deviceId: string, message: SignallingMessage<MGroupCallBase>, log) => {
return this.options.encryptDeviceMessage(this.roomId, userId, deviceId, message, log);
}
},
groupCallErrorBoundary: this.errorBoundary,
});
}
@ -392,8 +393,8 @@ export class GroupCall extends EventEmitter<{change: never}> {
/** @internal */
updateMembership(userId: string, roomMember: RoomMember, callMembership: CallMembership, syncLog: ILogItem) {
this.errorBoundary.try(() => {
syncLog.wrap({l: "update call membership", t: CALL_LOG_TYPE, id: this.id, userId}, log => {
this.errorBoundary.try(async () => {
await syncLog.wrap({l: "update call membership", t: CALL_LOG_TYPE, id: this.id, userId}, async log => {
const now = this.options.clock.now();
const devices = callMembership["m.devices"];
const previousDeviceIds = this.getDeviceIdsForUserId(userId);
@ -415,7 +416,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
}
});
} else {
log.wrap({l: "update device membership", id: memberKey, sessionId: device.session_id}, log => {
await log.wrap({l: "update device membership", id: memberKey, sessionId: device.session_id}, async log => {
if (isMemberExpired(device, now)) {
log.set("expired", true);
const member = this._members.get(memberKey);
@ -434,7 +435,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
} else {
if (member && sessionIdChanged) {
log.set("removedSessionId", member.sessionId);
const disconnectLogItem = member.disconnect(false);
const disconnectLogItem = await member.disconnect(false);
if (disconnectLogItem) {
log.refDetached(disconnectLogItem);
}
@ -528,11 +529,11 @@ export class GroupCall extends EventEmitter<{change: never}> {
}
/** @internal */
disconnect(log: ILogItem): boolean {
return this.errorBoundary.try(() => {
disconnect(log: ILogItem): Promise<void> | true {
return this.errorBoundary.try(async () => {
if (this.hasJoined) {
for (const member of this._members.values()) {
const disconnectLogItem = member.disconnect(true);
const disconnectLogItem = await member.disconnect(true);
if (disconnectLogItem) {
log.refDetached(disconnectLogItem);
}
@ -546,13 +547,13 @@ export class GroupCall extends EventEmitter<{change: never}> {
}
/** @internal */
private removeMemberDevice(userId: string, deviceId: string, log: ILogItem) {
private async removeMemberDevice(userId: string, deviceId: string, log: ILogItem) {
const memberKey = getMemberKey(userId, deviceId);
log.wrap({l: "remove device member", id: memberKey}, log => {
await log.wrap({l: "remove device member", id: memberKey}, async log => {
const member = this._members.get(memberKey);
if (member) {
log.set("leave", true);
const disconnectLogItem = member.disconnect(false);
const disconnectLogItem = await member.disconnect(false);
if (disconnectLogItem) {
log.refDetached(disconnectLogItem);
}
@ -634,15 +635,15 @@ export class GroupCall extends EventEmitter<{change: never}> {
return stateContent;
}
private connectToMember(member: Member, joinedData: JoinedData, log: ILogItem) {
private async connectToMember(member: Member, joinedData: JoinedData, log: ILogItem) {
const memberKey = getMemberKey(member.userId, member.deviceId);
const logItem = joinedData.membersLogItem.child({
l: "member",
id: memberKey,
sessionId: member.sessionId
});
log.wrap({l: "connect", id: memberKey}, log => {
const connectItem = member.connect(
await log.wrap({l: "connect", id: memberKey}, async log => {
const connectItem = await member.connect(
joinedData.localMedia,
joinedData.localMuteSettings,
joinedData.turnServer,

View File

@ -42,6 +42,7 @@ export type Options = Omit<PeerCallOptions, "emitUpdate" | "sendSignallingMessag
hsApi: HomeServerApi,
encryptDeviceMessage: (userId: string, deviceId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage | undefined>,
emitUpdate: (participant: Member, params?: any) => void,
groupCallErrorBoundary: ErrorBoundary,
clock: Clock
}
@ -183,8 +184,8 @@ export class Member {
}
/** @internal */
connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue<RTCIceServer>, memberLogItem: ILogItem): ILogItem | undefined {
return this.errorBoundary.try(() => {
connect(localMedia: LocalMedia, localMuteSettings: MuteSettings, turnServer: BaseObservableValue<RTCIceServer>, memberLogItem: ILogItem): Promise<ILogItem | undefined> | undefined {
return this.errorBoundary.try(async () => {
if (this.connection) {
return;
}
@ -197,7 +198,7 @@ export class Member {
);
this.connection = connection;
let connectLogItem: ILogItem | undefined;
connection.logItem.wrap("connect", async log => {
await connection.logItem.wrap("connect", async log => {
connectLogItem = log;
await this.callIfNeeded(log);
});
@ -230,15 +231,15 @@ export class Member {
}
/** @internal */
disconnect(hangup: boolean): ILogItem | undefined {
return this.errorBoundary.try(() => {
disconnect(hangup: boolean): Promise<ILogItem | undefined> | undefined {
return this.errorBoundary.try(async () => {
const {connection} = this;
if (!connection) {
return;
}
let disconnectLogItem;
let disconnectLogItem: ILogItem | undefined;
// if if not sending the hangup, still log disconnect
connection.logItem.wrap("disconnect", async log => {
await connection.logItem.wrap("disconnect", async log => {
disconnectLogItem = log;
if (hangup && connection.peerCall) {
await connection.peerCall.hangup(CallErrorCode.UserHangup, log);
@ -269,7 +270,7 @@ export class Member {
}
/** @internal */
emitUpdateFromPeerCall = (peerCall: PeerCall, params: any, log: ILogItem): void => {
emitUpdateFromPeerCall = async (peerCall: PeerCall, params: any, log: ILogItem): Promise<void> => {
const connection = this.connection!;
if (peerCall.state === CallState.Ringing) {
connection.logItem.wrap("ringing, answer peercall", answerLog => {
@ -284,12 +285,12 @@ export class Member {
if (hangupReason && !errorCodesWithoutRetry.includes(hangupReason)) {
connection.retryCount += 1;
const {retryCount} = connection;
connection.logItem.wrap({l: "retry connection", retryCount}, async retryLog => {
await connection.logItem.wrap({l: "retry connection", retryCount}, async retryLog => {
log.refDetached(retryLog);
if (retryCount <= 3) {
await this.callIfNeeded(retryLog);
} else {
const disconnectLogItem = this.disconnect(false);
const disconnectLogItem = await this.disconnect(false);
if (disconnectLogItem) {
retryLog.refDetached(disconnectLogItem);
}
@ -303,6 +304,10 @@ export class Member {
/** @internal */
sendSignallingMessage = async (message: SignallingMessage<MCallBase>, log: ILogItem): Promise<void> => {
const groupMessage = message as SignallingMessage<MGroupCallBase>;
if (this.connection?.peerCall?.state === CallState.CreateOffer) {
// @ts-ignore
this.connection!.foobar.barfpp;
}
groupMessage.content.seq = this.connection!.outboundSeqCounter++;
groupMessage.content.conf_id = this.options.confId;
groupMessage.content.device_id = this.options.ownDeviceId;
@ -421,6 +426,7 @@ export class Member {
private _createPeerCall(callId: string): PeerCall {
const connection = this.connection!;
return new PeerCall(callId, Object.assign({}, this.options, {
errorBoundary: this.options.groupCallErrorBoundary,
emitUpdate: this.emitUpdateFromPeerCall,
sendSignallingMessage: this.sendSignallingMessage,
turnServer: connection.turnServer

View File

@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
export const ErrorValue = Symbol("ErrorBoundary:Error");
export class ErrorBoundary {
private _error?: Error;
@ -23,7 +21,7 @@ export class ErrorBoundary {
/**
* Executes callback() and then runs errorCallback() on error.
* This will never throw but instead return `errorValue` if an error occured.
* This will never throw but instead return `errorValue` if an error occurred.
*/
try<T, E>(callback: () => T, errorValue?: E): T | typeof errorValue;
try<T, E>(callback: () => Promise<T>, errorValue?: E): Promise<T | typeof errorValue> | typeof errorValue {