Implement in-room message channel

This commit is contained in:
RMidhunSuresh 2023-04-04 16:02:50 +05:30
parent d95badc6d0
commit 4f302b0470
9 changed files with 275 additions and 20 deletions

View File

@ -80,6 +80,8 @@ export class RoomViewModel extends ErrorReportViewModel {
this.logAndCatch("RoomViewModel.load", async log => { this.logAndCatch("RoomViewModel.load", async log => {
this._room.on("change", this._onRoomChange); this._room.on("change", this._onRoomChange);
const timeline = await this._room.openTimeline(log); const timeline = await this._room.openTimeline(log);
timeline.retain();
this.track(() => timeline.release());
this._tileOptions = this.childOptions({ this._tileOptions = this.childOptions({
session: this.getOption("session"), session: this.getOption("session"),
roomVM: this, roomVM: this,

View File

@ -545,7 +545,8 @@ export class BaseRoom extends EventEmitter {
return this._platform.logger.wrapOrRun(log, "open timeline", async log => { return this._platform.logger.wrapOrRun(log, "open timeline", async log => {
log.set("id", this.id); log.set("id", this.id);
if (this._timeline) { if (this._timeline) {
throw new Error("not dealing with load race here for now"); log.log({ l: "Returning existing timeline" });
return this._timeline;
} }
this._timeline = new Timeline({ this._timeline = new Timeline({
roomId: this.id, roomId: this.id,
@ -610,7 +611,6 @@ export class BaseRoom extends EventEmitter {
dispose() { dispose() {
this._roomEncryption?.dispose(); this._roomEncryption?.dispose();
this._timeline?.dispose();
} }
} }

View File

@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
import {EventEmitter} from "../../../utils/EventEmitter";
import {createEnum} from "../../../utils/enum"; import {createEnum} from "../../../utils/enum";
import {AbortError} from "../../../utils/error"; import {AbortError} from "../../../utils/error";
import {REDACTION_TYPE} from "../common"; import {REDACTION_TYPE} from "../common";
@ -30,8 +31,9 @@ export const SendStatus = createEnum(
const unencryptedContentFields = [ "m.relates_to" ]; const unencryptedContentFields = [ "m.relates_to" ];
export class PendingEvent { export class PendingEvent extends EventEmitter {
constructor({data, remove, emitUpdate, attachments}) { constructor({data, remove, emitUpdate, attachments}) {
super();
this._data = data; this._data = data;
this._attachments = attachments; this._attachments = attachments;
this._emitUpdate = emitUpdate; this._emitUpdate = emitUpdate;
@ -228,6 +230,7 @@ export class PendingEvent {
this._sendRequest = null; this._sendRequest = null;
// both /send and /redact have the same response format // both /send and /redact have the same response format
this._data.remoteId = response.event_id; this._data.remoteId = response.event_id;
this.emit("remote-id", this.remoteId);
log.set("id", this._data.remoteId); log.set("id", this._data.remoteId);
this._status = SendStatus.Sent; this._status = SendStatus.Sent;
this._emitUpdate("status"); this._emitUpdate("status");

View File

@ -225,7 +225,7 @@ export class SendQueue {
} }
} }
} }
await this._enqueueEvent(eventType, content, attachments, relatedTxnId, null, log); return await this._enqueueEvent(eventType, content, attachments, relatedTxnId, null, log);
} }
async _enqueueEvent(eventType, content, attachments, relatedTxnId, relatedEventId, log) { async _enqueueEvent(eventType, content, attachments, relatedTxnId, relatedEventId, log) {
@ -239,6 +239,7 @@ export class SendQueue {
if (this._sendLoopLogItem) { if (this._sendLoopLogItem) {
log.refDetached(this._sendLoopLogItem); log.refDetached(this._sendLoopLogItem);
} }
return pendingEvent;
} }
async enqueueRedaction(eventIdOrTxnId, reason, log) { async enqueueRedaction(eventIdOrTxnId, reason, log) {

View File

@ -25,9 +25,13 @@ import {getRelation, ANNOTATION_RELATION_TYPE} from "./relations.js";
import {REDACTION_TYPE} from "../common"; import {REDACTION_TYPE} from "../common";
import {NonPersistedEventEntry} from "./entries/NonPersistedEventEntry.js"; import {NonPersistedEventEntry} from "./entries/NonPersistedEventEntry.js";
import {EVENT_TYPE as MEMBER_EVENT_TYPE} from "../members/RoomMember.js"; import {EVENT_TYPE as MEMBER_EVENT_TYPE} from "../members/RoomMember.js";
import {RetainedValue} from "../../../utils/RetainedValue";
export class Timeline { export class Timeline extends RetainedValue {
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, clock, powerLevelsObservable, hsApi}) { constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, clock, powerLevelsObservable, hsApi}) {
super(() => {
this.dispose();
});
this._roomId = roomId; this._roomId = roomId;
this._storage = storage; this._storage = storage;
this._closeCallback = closeCallback; this._closeCallback = closeCallback;

View File

@ -18,6 +18,7 @@ import {REDACTION_TYPE} from "../common";
export const REACTION_TYPE = "m.reaction"; export const REACTION_TYPE = "m.reaction";
export const ANNOTATION_RELATION_TYPE = "m.annotation"; export const ANNOTATION_RELATION_TYPE = "m.annotation";
export const REFERENCE_RELATION_TYPE = "m.reference";
export function createAnnotation(targetId, key) { export function createAnnotation(targetId, key) {
return { return {
@ -29,6 +30,15 @@ export function createAnnotation(targetId, key) {
}; };
} }
export function createReference(targetId) {
return {
"m.relates_to": {
"event_id": targetId,
"rel_type": REFERENCE_RELATION_TYPE
}
};
}
export function getRelationTarget(relation) { export function getRelationTarget(relation) {
return relation.event_id || relation["m.in_reply_to"]?.event_id return relation.event_id || relation["m.in_reply_to"]?.event_id
} }

View File

@ -17,6 +17,20 @@ limitations under the License.
import type {ILogItem} from "../../../../logging/types"; import type {ILogItem} from "../../../../logging/types";
import {CancelReason, VerificationEventType} from "./types"; import {CancelReason, VerificationEventType} from "./types";
export const messageFromErrorType = {
[CancelReason.UserCancelled]: "User declined",
[CancelReason.InvalidMessage]: "Invalid Message.",
[CancelReason.KeyMismatch]: "Key Mismatch.",
[CancelReason.OtherDeviceAccepted]: "Another device has accepted this request.",
[CancelReason.TimedOut]: "Timed Out",
[CancelReason.UnexpectedMessage]: "Unexpected Message.",
[CancelReason.UnknownMethod]: "Unknown method.",
[CancelReason.UnknownTransaction]: "Unknown Transaction.",
[CancelReason.UserMismatch]: "User Mismatch",
[CancelReason.MismatchedCommitment]: "Hash commitment does not match.",
[CancelReason.MismatchedSAS]: "Emoji/decimal does not match.",
}
export interface IChannel { export interface IChannel {
send(eventType: VerificationEventType, content: any, log: ILogItem): Promise<void>; send(eventType: VerificationEventType, content: any, log: ILogItem): Promise<void>;
waitForEvent(eventType: VerificationEventType): Promise<any>; waitForEvent(eventType: VerificationEventType): Promise<any>;

View File

@ -0,0 +1,235 @@
/*
Copyright 2023 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import type {HomeServerApi} from "../../../net/HomeServerApi";
import type {ILogItem} from "../../../../logging/types";
import type {IChannel} from "./IChannel";
import type {Room} from "../../../room/Room.js";
import type {EventEntry} from "../../../room/timeline/entries/EventEntry.js";
import {messageFromErrorType} from "./IChannel";
import {CancelReason, VerificationEventType} from "./types";
import {Disposables} from "../../../../utils/Disposables";
import {VerificationCancelledError} from "../VerificationCancelledError";
import {Deferred} from "../../../../utils/Deferred";
import {getRelatedEventId, createReference} from "../../../room/timeline/relations.js";
type Options = {
hsApi: HomeServerApi;
otherUserId: string;
log: ILogItem;
ourUserDeviceId: string;
room: Room;
}
export class RoomChannel extends Disposables implements IChannel {
private readonly hsApi: HomeServerApi;
private ourDeviceId: string;
private readonly otherUserId: string;
private readonly sentMessages: Map<VerificationEventType, any> = new Map();
private readonly receivedMessages: Map<VerificationEventType, any> = new Map();
private readonly waitMap: Map<string, Deferred<any>> = new Map();
private readonly log: ILogItem;
private readonly room: Room;
public otherUserDeviceId: string;
public startMessage: any;
/**
* This is the event-id of the starting message (request/start)
*/
public id: string;
private _initiatedByUs: boolean;
private _cancellation?: { code: CancelReason, cancelledByUs: boolean };
/**
*
* @param startingMessage Create the channel with existing message in the receivedMessage buffer
*/
constructor(options: Options, startingMessage?: any) {
super();
this.hsApi = options.hsApi;
this.otherUserId = options.otherUserId;
this.ourDeviceId = options.ourUserDeviceId;
this.log = options.log;
this.room = options.room;
this.subscribeToTimeline();
this.track(() => {
this.waitMap.forEach((value) => {
value.reject(new VerificationCancelledError());
});
});
// Copy over request message
if (startingMessage) {
/**
* startingMessage may be the ready message or the start message.
*/
this.id = startingMessage.content.transaction_id;
this.receivedMessages.set(startingMessage.type, startingMessage);
this.otherUserDeviceId = startingMessage.content.from_device;
}
}
private async subscribeToTimeline() {
const timeline = await this.room.openTimeline();
timeline.retain();
this.track(() => timeline.release());
this.track(
timeline.entries.subscribe({
onAdd: async (_, entry: EventEntry) => {
this.handleRoomMessage(entry);
},
onRemove: () => { /** noop */ },
onUpdate: () => { /** noop */ },
})
);
}
get cancellation(): IChannel["cancellation"] {
return this._cancellation;
};
get isCancelled(): boolean {
return !!this._cancellation;
}
async send(eventType: VerificationEventType, content: any, log: ILogItem): Promise<void> {
await log.wrap("RoomChannel.send", async () => {
if (this.isCancelled) {
throw new VerificationCancelledError();
}
if (eventType === VerificationEventType.Request) {
// Handle this case specially
await this.handleRequestEventSpecially(eventType, content, log);
return;
}
Object.assign(content, createReference(this.id));
await this.room.sendEvent(eventType, content, undefined, log);
this.sentMessages.set(eventType, {content});
});
}
private async handleRequestEventSpecially(eventType: VerificationEventType, content: any, log: ILogItem) {
await log.wrap("RoomChannel.handleRequestEventSpecially", async () => {
Object.assign(content, {
body: `${this.otherUserId} is requesting to verify your key, but your client does not support in-chat key verification. You will need to use legacy key verification to verify keys.`,
msgtype: VerificationEventType.Request,
to: this.otherUserId,
});
const pendingEvent = await this.room.sendEvent("m.room.message", content, undefined, log);
this.track(pendingEvent.disposableOn("remote-id", (id: string) => { this.id = id; }));
this.sentMessages.set(eventType, {content});
});
}
getReceivedMessage(event: VerificationEventType) {
return this.receivedMessages.get(event);
}
getSentMessage(event: VerificationEventType) {
return this.sentMessages.get(event);
}
get acceptMessage(): any {
return this.receivedMessages.get(VerificationEventType.Accept) ??
this.sentMessages.get(VerificationEventType.Accept);
}
private async handleRoomMessage(entry: EventEntry) {
const type = entry.content.msgtype ?? entry.eventType;
if (!type.startsWith("m.key.verification")) {
return;
}
await this.log.wrap("RoomChannel.handleRoomMessage", async (log) => {
console.log("entry", entry);
log.log({ l: "entry", entry });
if (!this.id) {
throw new Error("Couldn't find event-id of request message!");
}
if (getRelatedEventId(entry) !== this.id) {
/**
* When a device receives an unknown transaction_id, it should send an appropriate
* m.key.verification.cancel message to the other device indicating as such.
* This does not apply for inbound m.key.verification.start or m.key.verification.cancel messages.
*/
console.log("Received entry with unknown transaction id: ", entry);
await this.cancelVerification(CancelReason.UnknownTransaction);
return;
}
this.resolveAnyWaits(entry);
this.receivedMessages.set(entry.eventType, entry);
if (entry.eventType === VerificationEventType.Ready) {
const fromDevice = entry.content.from_device;
this.otherUserDeviceId = fromDevice;
return;
}
if (entry.eventType === VerificationEventType.Cancel) {
this._cancellation = { code: entry.content.code, cancelledByUs: false };
this.dispose();
return;
}
});
}
async cancelVerification(cancellationType: CancelReason) {
await this.log.wrap("RoomChannel.cancelVerification", async log => {
if (this.isCancelled) {
throw new VerificationCancelledError();
}
const content = {
code: cancellationType,
reason: messageFromErrorType[cancellationType],
}
await this.send(VerificationEventType.Cancel, content, log);
this._cancellation = { code: cancellationType, cancelledByUs: true };
this.dispose();
});
}
private resolveAnyWaits(entry: EventEntry) {
const { eventType } = entry;
const wait = this.waitMap.get(eventType);
if (wait) {
wait.resolve(entry);
this.waitMap.delete(eventType);
}
}
waitForEvent(eventType: VerificationEventType): Promise<any> {
if (this.isCancelled) {
throw new VerificationCancelledError();
}
// Check if we already received the message
const receivedMessage = this.receivedMessages.get(eventType);
if (receivedMessage) {
return Promise.resolve(receivedMessage);
}
// Check if we're already waiting for this message
const existingWait = this.waitMap.get(eventType);
if (existingWait) {
return existingWait.promise;
}
const deferred = new Deferred();
this.waitMap.set(eventType, deferred);
return deferred.promise;
}
setStartMessage(event) {
this.startMessage = event;
this._initiatedByUs = event.content.from_device === this.ourDeviceId;
}
get initiatedByUs(): boolean {
return this._initiatedByUs;
};
}

View File

@ -20,27 +20,13 @@ import type {ILogItem} from "../../../../logging/types";
import type {Clock} from "../../../../platform/web/dom/Clock.js"; import type {Clock} from "../../../../platform/web/dom/Clock.js";
import type {DeviceMessageHandler} from "../../../DeviceMessageHandler.js"; import type {DeviceMessageHandler} from "../../../DeviceMessageHandler.js";
import type {IChannel} from "./IChannel"; import type {IChannel} from "./IChannel";
import {messageFromErrorType} from "./IChannel";
import {makeTxnId} from "../../../common.js"; import {makeTxnId} from "../../../common.js";
import {CancelReason, VerificationEventType} from "./types"; import {CancelReason, VerificationEventType} from "./types";
import {Disposables} from "../../../../utils/Disposables"; import {Disposables} from "../../../../utils/Disposables";
import {VerificationCancelledError} from "../VerificationCancelledError"; import {VerificationCancelledError} from "../VerificationCancelledError";
import {Deferred} from "../../../../utils/Deferred"; import {Deferred} from "../../../../utils/Deferred";
const messageFromErrorType = {
[CancelReason.UserCancelled]: "User declined",
[CancelReason.InvalidMessage]: "Invalid Message.",
[CancelReason.KeyMismatch]: "Key Mismatch.",
[CancelReason.OtherDeviceAccepted]: "Another device has accepted this request.",
[CancelReason.TimedOut]: "Timed Out",
[CancelReason.UnexpectedMessage]: "Unexpected Message.",
[CancelReason.UnknownMethod]: "Unknown method.",
[CancelReason.UnknownTransaction]: "Unknown Transaction.",
[CancelReason.UserMismatch]: "User Mismatch",
[CancelReason.MismatchedCommitment]: "Hash commitment does not match.",
[CancelReason.MismatchedSAS]: "Emoji/decimal does not match.",
}
type Options = { type Options = {
hsApi: HomeServerApi; hsApi: HomeServerApi;
deviceTracker: DeviceTracker; deviceTracker: DeviceTracker;