2019-07-26 22:03:57 +02:00
|
|
|
import SortedArray from "../../../observable/list/SortedArray.js";
|
|
|
|
import {NetworkError} from "../../error.js";
|
|
|
|
import PendingEvent from "./PendingEvent.js";
|
2019-07-01 10:00:29 +02:00
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
function makeTxnId() {
|
|
|
|
const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
|
|
|
|
const str = n.toString(16);
|
|
|
|
return "t" + "0".repeat(14 - str.length) + str;
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
export default class SendQueue {
|
2019-07-26 22:33:33 +02:00
|
|
|
constructor({roomId, storage, sendScheduler, pendingEvents}) {
|
|
|
|
pendingEvents = pendingEvents || [];
|
2019-07-26 22:03:57 +02:00
|
|
|
this._roomId = roomId;
|
|
|
|
this._storage = storage;
|
2019-07-26 22:33:33 +02:00
|
|
|
this._sendScheduler = sendScheduler;
|
2019-07-26 22:03:57 +02:00
|
|
|
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
|
2019-07-27 10:40:56 +02:00
|
|
|
if (pendingEvents.length) {
|
|
|
|
console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
|
|
|
|
}
|
2019-07-29 09:54:34 +02:00
|
|
|
this._pendingEvents.setManyUnsorted(pendingEvents.map(data => new PendingEvent(data)));
|
2019-07-26 22:03:57 +02:00
|
|
|
this._isSending = false;
|
2019-06-28 00:52:54 +02:00
|
|
|
this._offline = false;
|
2019-07-26 22:03:57 +02:00
|
|
|
this._amountSent = 0;
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
|
|
|
|
2019-07-01 10:00:29 +02:00
|
|
|
async _sendLoop() {
|
2019-07-26 22:03:57 +02:00
|
|
|
this._isSending = true;
|
|
|
|
try {
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("start sending", this._amountSent, "<", this._pendingEvents.length);
|
2019-07-26 22:03:57 +02:00
|
|
|
while (this._amountSent < this._pendingEvents.length) {
|
|
|
|
const pendingEvent = this._pendingEvents.get(this._amountSent);
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("trying to send", pendingEvent.content.body);
|
2019-07-26 22:03:57 +02:00
|
|
|
this._amountSent += 1;
|
|
|
|
if (pendingEvent.remoteId) {
|
|
|
|
continue;
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("really sending now");
|
2019-07-26 22:33:33 +02:00
|
|
|
const response = await this._sendScheduler.request(hsApi => {
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("got sendScheduler slot");
|
2019-07-26 22:03:57 +02:00
|
|
|
return hsApi.send(
|
|
|
|
pendingEvent.roomId,
|
|
|
|
pendingEvent.eventType,
|
|
|
|
pendingEvent.txnId,
|
|
|
|
pendingEvent.content
|
|
|
|
);
|
|
|
|
});
|
|
|
|
pendingEvent.remoteId = response.event_id;
|
2019-07-27 10:40:56 +02:00
|
|
|
//
|
|
|
|
console.log("writing remoteId now");
|
2019-07-26 22:03:57 +02:00
|
|
|
await this._tryUpdateEvent(pendingEvent);
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("keep sending?", this._amountSent, "<", this._pendingEvents.length);
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
2019-07-26 22:03:57 +02:00
|
|
|
} catch(err) {
|
|
|
|
if (err instanceof NetworkError) {
|
|
|
|
this._offline = true;
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
2019-07-26 22:03:57 +02:00
|
|
|
} finally {
|
|
|
|
this._isSending = false;
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-26 22:33:33 +02:00
|
|
|
removeRemoteEchos(events, txn) {
|
|
|
|
const removed = [];
|
|
|
|
for (const event of events) {
|
|
|
|
const txnId = event.unsigned && event.unsigned.transaction_id;
|
|
|
|
if (txnId) {
|
|
|
|
const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
|
|
|
|
if (idx !== -1) {
|
|
|
|
const pendingEvent = this._pendingEvents.get(idx);
|
|
|
|
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
|
|
|
|
removed.push(pendingEvent);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return removed;
|
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
|
2019-07-26 22:33:33 +02:00
|
|
|
emitRemovals(pendingEvents) {
|
|
|
|
for (const pendingEvent of pendingEvents) {
|
|
|
|
const idx = this._pendingEvents.array.indexOf(pendingEvent);
|
|
|
|
if (idx !== -1) {
|
|
|
|
this._amountSent -= 1;
|
|
|
|
this._pendingEvents.remove(idx);
|
|
|
|
}
|
2019-07-26 22:03:57 +02:00
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
resumeSending() {
|
|
|
|
this._offline = false;
|
|
|
|
if (!this._isSending) {
|
|
|
|
this._sendLoop();
|
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
async enqueueEvent(eventType, content) {
|
|
|
|
const pendingEvent = await this._createAndStoreEvent(eventType, content);
|
|
|
|
this._pendingEvents.set(pendingEvent);
|
2019-09-15 12:25:14 +02:00
|
|
|
console.log("added to _pendingEvents set", this._pendingEvents.length);
|
2019-07-26 22:03:57 +02:00
|
|
|
if (!this._isSending && !this._offline) {
|
|
|
|
this._sendLoop();
|
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
get pendingEvents() {
|
|
|
|
return this._pendingEvents;
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
async _tryUpdateEvent(pendingEvent) {
|
|
|
|
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("_tryUpdateEvent: got txn");
|
2019-07-26 22:03:57 +02:00
|
|
|
try {
|
|
|
|
// pendingEvent might have been removed already here
|
|
|
|
// by a racing remote echo, so check first so we don't recreate it
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("_tryUpdateEvent: before exists");
|
2019-07-26 22:03:57 +02:00
|
|
|
if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("_tryUpdateEvent: inside if exists");
|
2019-07-26 22:03:57 +02:00
|
|
|
txn.pendingEvents.update(pendingEvent.data);
|
|
|
|
}
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("_tryUpdateEvent: after exists");
|
2019-07-26 22:03:57 +02:00
|
|
|
} catch (err) {
|
|
|
|
txn.abort();
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("_tryUpdateEvent: error", err);
|
2019-07-26 22:03:57 +02:00
|
|
|
throw err;
|
|
|
|
}
|
2019-07-27 10:40:56 +02:00
|
|
|
console.log("_tryUpdateEvent: try complete");
|
2019-07-26 22:03:57 +02:00
|
|
|
await txn.complete();
|
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
|
2019-07-26 22:03:57 +02:00
|
|
|
async _createAndStoreEvent(eventType, content) {
|
2019-09-15 12:25:14 +02:00
|
|
|
console.log("_createAndStoreEvent");
|
2019-07-01 10:00:29 +02:00
|
|
|
const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
|
2019-07-26 22:03:57 +02:00
|
|
|
let pendingEvent;
|
|
|
|
try {
|
|
|
|
const pendingEventsStore = txn.pendingEvents;
|
2019-09-15 12:25:14 +02:00
|
|
|
console.log("_createAndStoreEvent getting maxQueueIndex");
|
2019-07-26 22:03:57 +02:00
|
|
|
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
|
2019-09-15 12:25:14 +02:00
|
|
|
console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex);
|
2019-07-26 22:03:57 +02:00
|
|
|
const queueIndex = maxQueueIndex + 1;
|
|
|
|
pendingEvent = new PendingEvent({
|
|
|
|
roomId: this._roomId,
|
|
|
|
queueIndex,
|
|
|
|
eventType,
|
|
|
|
content,
|
|
|
|
txnId: makeTxnId()
|
|
|
|
});
|
2019-09-15 12:25:14 +02:00
|
|
|
console.log("_createAndStoreEvent: adding to pendingEventsStore");
|
2019-07-26 22:03:57 +02:00
|
|
|
pendingEventsStore.add(pendingEvent.data);
|
|
|
|
} catch (err) {
|
|
|
|
txn.abort();
|
|
|
|
throw err;
|
|
|
|
}
|
2019-07-01 10:00:29 +02:00
|
|
|
await txn.complete();
|
2019-07-26 22:03:57 +02:00
|
|
|
return pendingEvent;
|
2019-07-01 10:00:29 +02:00
|
|
|
}
|
2019-06-28 00:52:54 +02:00
|
|
|
}
|