This commit is contained in:
Bruno Windels 2020-04-19 19:02:10 +02:00
parent 1f15ca6498
commit 8c5411cb7d
20 changed files with 538 additions and 230 deletions

View File

@ -36,9 +36,9 @@ rooms should report how many messages they have queued up, and each time they se
# TODO
- finish (Base)ObservableValue
- DONE: finish (Base)ObservableValue
- put in own file
- add waitFor
- add waitFor (won't this leak if the promise never resolves?)
- decide whether we want to inherit (no?)
- cleanup Reconnector with recent changes, move generic code, make imports work
- add SyncStatus as ObservableValue of enum in Sync

View File

@ -24,7 +24,7 @@
"devDependencies": {
"cheerio": "^1.0.0-rc.3",
"finalhandler": "^1.1.1",
"impunity": "^0.0.10",
"impunity": "^0.0.11",
"postcss": "^7.0.18",
"postcss-import": "^12.0.1",
"rollup": "^1.15.6",

View File

@ -1,11 +1,13 @@
import HomeServerApi from "./matrix/hs-api.js";
import HomeServerApi from "./matrix/net/HomeServerApi.js";
// import {RecordRequester, ReplayRequester} from "./matrix/net/replay.js";
import fetchRequest from "./matrix/net/fetch.js";
import {Reconnector} from "./matrix/net/connection/Reconnector.js";
import StorageFactory from "./matrix/storage/idb/create.js";
import SessionsStore from "./matrix/sessions-store/localstorage/SessionsStore.js";
import BrawlViewModel from "./domain/BrawlViewModel.js";
import BrawlView from "./ui/web/BrawlView.js";
import DOMClock from "./utils/DOMClock.js";
import DOMClock from "./ui/web/dom/Clock.js";
import OnlineStatus from "./ui/web/dom/OnlineStatus.js";
export default async function main(container) {
try {
@ -22,12 +24,6 @@ export default async function main(container) {
const request = fetchRequest;
const clock = new DOMClock();
const sessionContainer = new SessionContainer({
clock,
request,
storageFactory: new StorageFactory(),
});
const vm = new BrawlViewModel({
storageFactory: new StorageFactory(),
createHsApi: (homeServer, accessToken, reconnector) => new HomeServerApi({homeServer, accessToken, request, reconnector}),

View File

@ -1,169 +0,0 @@
export class ExponentialRetryDelay {
constructor(start = 2000, createTimeout) {
this._start = start;
this._current = start;
this._createTimeout = createTimeout;
this._max = 60 * 5 * 1000; //5 min
this._timeout = null;
}
async waitForRetry() {
this._timeout = this._createTimeout(this._current);
try {
await this._timeout.elapsed();
// only increase delay if we didn't get interrupted
const seconds = this._current / 1000;
const powerOfTwo = (seconds * seconds) * 1000;
this._current = Math.max(this._max, powerOfTwo);
} catch(err) {
// swallow AbortError, means abort was called
if (!(err instanceof AbortError)) {
throw err;
}
} finally {
this._timeout = null;
}
}
abort() {
if (this._timeout) {
this._timeout.abort();
}
}
reset() {
this._current = this._start;
this.abort();
}
get nextValue() {
return this._current;
}
}
// we need a clock interface that gives us both timestamps and a timer that we can interrupt?
// state
// - offline
// - waiting to reconnect
// - reconnecting
// - online
//
//
function createEnum(...values) {
const obj = {};
for (const value of values) {
obj[value] = value;
}
return Object.freeze(obj);
}
export const ConnectionState = createEnum(
"Offline",
"Waiting",
"Reconnecting",
"Online"
);
export class Reconnector {
constructor({retryDelay, createTimeMeasure, isOnline}) {
this._isOnline = isOnline;
this._retryDelay = retryDelay;
this._createTimeMeasure = createTimeMeasure;
// assume online, and do our thing when something fails
this._state = new ObservableValue(ConnectionState.Online);
this._isReconnecting = false;
this._versionsResponse = null;
}
get lastVersionsResponse() {
return this._versionsResponse;
}
get connectionState() {
return this._state;
}
get retryIn() {
if (this._state.get() === ConnectionState.Waiting) {
return this._retryDelay.nextValue - this._stateSince.measure();
}
return 0;
}
async onRequestFailed(hsApi) {
if (!this._isReconnecting) {
this._setState(ConnectionState.Offline);
const isOnlineSubscription = this._isOnline && this._isOnline.subscribe(online => {
if (online) {
this.tryNow();
}
});
try {
await this._reconnectLoop(hsApi);
} finally {
if (isOnlineSubscription) {
// unsubscribe from this._isOnline
isOnlineSubscription();
}
}
}
}
tryNow() {
if (this._retryDelay) {
// this will interrupt this._retryDelay.waitForRetry() in _reconnectLoop
this._retryDelay.abort();
}
}
_setState(state) {
if (state !== this._state.get()) {
if (state === ConnectionState.Waiting) {
this._stateSince = this._createTimeMeasure();
} else {
this._stateSince = null;
}
this._state.set(state);
}
}
async _reconnectLoop(hsApi) {
this._isReconnecting = true;
this._versionsResponse = null;
this._retryDelay.reset();
try {
while (!this._versionsResponse) {
try {
this._setState(ConnectionState.Reconnecting);
// use 10s timeout, because we don't want to be waiting for
// a stale connection when we just came online again
const versionsRequest = hsApi.versions({timeout: 10000});
this._versionsResponse = await versionsRequest.response();
this._setState(ConnectionState.Online);
} catch (err) {
if (err instanceof NetworkError) {
this._setState(ConnectionState.Waiting);
try {
await this._retryDelay.waitForRetry();
} catch (err) {
if (!(err instanceof AbortError)) {
throw err;
}
}
} else {
throw err;
}
}
}
} catch (err) {
// nothing is catching the error above us,
// so just log here
console.err(err);
}
}
}

View File

@ -1,4 +1,4 @@
import HomeServerApi from "./hs-api.js";
import HomeServerApi from "./net/HomeServerApi.js";
export const LoadStatus = createEnum(
"NotLoading",
@ -131,7 +131,6 @@ export class SessionContainer {
}
this._sync = new Sync({hsApi, storage, session: this._session});
// notify sync and session when back online
this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
if (state === ConnectionStatus.Online) {
@ -139,7 +138,15 @@ export class SessionContainer {
this._session.start(this._reconnector.lastVersionsResponse);
}
});
await this._waitForFirstSync();
this._status.set(LoadStatus.Ready);
// if this fails, the reconnector will start polling versions to reconnect
const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response();
this._session.start(lastVersionsResponse);
}
async _waitForFirstSync() {
try {
await this._sync.start();
} catch (err) {
@ -151,12 +158,18 @@ export class SessionContainer {
}
}
// only transition into Ready once the first sync has succeeded
await this._sync.status.waitFor(s => s === SyncStatus.Syncing);
this._status.set(LoadStatus.Ready);
// if this fails, the reconnector will start polling versions to reconnect
const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response();
this._session.start(lastVersionsResponse);
this._waitForFirstSyncHandle = this._sync.status.waitFor(s => s === SyncStatus.Syncing);
try {
await this._waitForFirstSyncHandle.promise;
} catch (err) {
// if dispose is called from stop, bail out
if (err instanceof AbortError) {
return;
}
throw err;
} finally {
this._waitForFirstSyncHandle = null;
}
}
@ -183,6 +196,10 @@ export class SessionContainer {
this._reconnectSubscription = null;
this._sync.stop();
this._session.stop();
if (this._waitForFirstSyncHandle) {
this._waitForFirstSyncHandle.dispose();
this._waitForFirstSyncHandle = null;
}
}
}

View File

@ -0,0 +1,107 @@
import {AbortError} from "../../utils/error.js";
export default class ExponentialRetryDelay {
constructor(createTimeout, start = 2000) {
this._start = start;
this._current = start;
this._createTimeout = createTimeout;
this._max = 60 * 5 * 1000; //5 min
this._timeout = null;
}
async waitForRetry() {
this._timeout = this._createTimeout(this._current);
try {
await this._timeout.elapsed();
// only increase delay if we didn't get interrupted
const next = 2 * this._current;
this._current = Math.min(this._max, next);
} catch(err) {
// swallow AbortError, means abort was called
if (!(err instanceof AbortError)) {
throw err;
}
} finally {
this._timeout = null;
}
}
abort() {
if (this._timeout) {
this._timeout.abort();
}
}
reset() {
this._current = this._start;
this.abort();
}
get nextValue() {
return this._current;
}
}
import MockClock from "../../../mocks/Clock.js";
export function tests() {
return {
"test sequence": async assert => {
const clock = new MockClock();
const retryDelay = new ExponentialRetryDelay(clock.createTimeout, 2000);
let promise;
assert.strictEqual(retryDelay.nextValue, 2000);
promise = retryDelay.waitForRetry();
clock.elapse(2000);
await promise;
assert.strictEqual(retryDelay.nextValue, 4000);
promise = retryDelay.waitForRetry();
clock.elapse(4000);
await promise;
assert.strictEqual(retryDelay.nextValue, 8000);
promise = retryDelay.waitForRetry();
clock.elapse(8000);
await promise;
assert.strictEqual(retryDelay.nextValue, 16000);
promise = retryDelay.waitForRetry();
clock.elapse(16000);
await promise;
assert.strictEqual(retryDelay.nextValue, 32000);
promise = retryDelay.waitForRetry();
clock.elapse(32000);
await promise;
assert.strictEqual(retryDelay.nextValue, 64000);
promise = retryDelay.waitForRetry();
clock.elapse(64000);
await promise;
assert.strictEqual(retryDelay.nextValue, 128000);
promise = retryDelay.waitForRetry();
clock.elapse(128000);
await promise;
assert.strictEqual(retryDelay.nextValue, 256000);
promise = retryDelay.waitForRetry();
clock.elapse(256000);
await promise;
assert.strictEqual(retryDelay.nextValue, 300000);
promise = retryDelay.waitForRetry();
clock.elapse(300000);
await promise;
assert.strictEqual(retryDelay.nextValue, 300000);
promise = retryDelay.waitForRetry();
clock.elapse(300000);
await promise;
},
}
}

View File

@ -0,0 +1,176 @@
import createEnum from "../../utils/enum.js";
import {AbortError} from "../../utils/error.js";
import {NetworkError} from "../error.js"
import ObservableValue from "../../observable/ObservableValue.js";
export const ConnectionStatus = createEnum(
"Offline",
"Waiting",
"Reconnecting",
"Online"
);
export class Reconnector {
constructor({retryDelay, createMeasure, onlineStatus}) {
this._onlineStatus = onlineStatus;
this._retryDelay = retryDelay;
this._createTimeMeasure = createMeasure;
// assume online, and do our thing when something fails
this._state = new ObservableValue(ConnectionStatus.Online);
this._isReconnecting = false;
this._versionsResponse = null;
}
get lastVersionsResponse() {
return this._versionsResponse;
}
get connectionStatus() {
return this._state;
}
get retryIn() {
if (this._state.get() === ConnectionStatus.Waiting) {
return this._retryDelay.nextValue - this._stateSince.measure();
}
return 0;
}
async onRequestFailed(hsApi) {
if (!this._isReconnecting) {
this._setState(ConnectionStatus.Offline);
const onlineStatusSubscription = this._onlineStatus && this._onlineStatus.subscribe(online => {
if (online) {
this.tryNow();
}
});
try {
await this._reconnectLoop(hsApi);
} catch (err) {
// nothing is catching the error above us,
// so just log here
console.error(err);
} finally {
if (onlineStatusSubscription) {
// unsubscribe from this._onlineStatus
onlineStatusSubscription();
}
}
}
}
tryNow() {
if (this._retryDelay) {
// this will interrupt this._retryDelay.waitForRetry() in _reconnectLoop
this._retryDelay.abort();
}
}
_setState(state) {
if (state !== this._state.get()) {
if (state === ConnectionStatus.Waiting) {
this._stateSince = this._createTimeMeasure();
} else {
this._stateSince = null;
}
this._state.set(state);
}
}
async _reconnectLoop(hsApi) {
this._isReconnecting = true;
this._versionsResponse = null;
this._retryDelay.reset();
while (!this._versionsResponse) {
try {
this._setState(ConnectionStatus.Reconnecting);
// use 10s timeout, because we don't want to be waiting for
// a stale connection when we just came online again
const versionsRequest = hsApi.versions({timeout: 10000});
this._versionsResponse = await versionsRequest.response();
this._setState(ConnectionStatus.Online);
} catch (err) {
if (err instanceof NetworkError) {
this._setState(ConnectionStatus.Waiting);
try {
await this._retryDelay.waitForRetry();
} catch (err) {
if (!(err instanceof AbortError)) {
throw err;
}
}
} else {
throw err;
}
}
}
}
}
import MockClock from "../../../mocks/Clock.js";
import ExponentialRetryDelay from "./ExponentialRetryDelay.js";
export function tests() {
function createHsApiMock(remainingFailures) {
return {
versions() {
return {
response() {
if (remainingFailures) {
remainingFailures -= 1;
return Promise.reject(new NetworkError());
} else {
return Promise.resolve(42);
}
}
};
}
}
}
return {
"test reconnecting with 1 failure": async assert => {
const clock = new MockClock();
const {createMeasure} = clock;
const onlineStatus = new ObservableValue(false);
const retryDelay = new ExponentialRetryDelay(clock.createTimeout, 2000);
const reconnector = new Reconnector({retryDelay, onlineStatus, createMeasure});
const {connectionStatus} = reconnector;
const statuses = [];
const subscription = reconnector.connectionStatus.subscribe(s => {
statuses.push(s);
});
reconnector.onRequestFailed(createHsApiMock(1));
await connectionStatus.waitFor(s => s === ConnectionStatus.Waiting).promise;
clock.elapse(2000);
await connectionStatus.waitFor(s => s === ConnectionStatus.Online).promise;
assert.deepEqual(statuses, [
ConnectionStatus.Offline,
ConnectionStatus.Reconnecting,
ConnectionStatus.Waiting,
ConnectionStatus.Reconnecting,
ConnectionStatus.Online
]);
assert.strictEqual(reconnector.lastVersionsResponse, 42);
subscription();
},
"test reconnecting with onlineStatus": async assert => {
const clock = new MockClock();
const {createMeasure} = clock;
const onlineStatus = new ObservableValue(false);
const retryDelay = new ExponentialRetryDelay(clock.createTimeout, 2000);
const reconnector = new Reconnector({retryDelay, onlineStatus, createMeasure});
const {connectionStatus} = reconnector;
reconnector.onRequestFailed(createHsApiMock(1));
await connectionStatus.waitFor(s => s === ConnectionStatus.Waiting).promise;
onlineStatus.set(true); //skip waiting
await connectionStatus.waitFor(s => s === ConnectionStatus.Online).promise;
assert.equal(connectionStatus.get(), ConnectionStatus.Online);
assert.strictEqual(reconnector.lastVersionsResponse, 42);
},
}
}

77
src/mocks/Clock.js Normal file
View File

@ -0,0 +1,77 @@
import ObservableValue from "../observable/ObservableValue.js";
class Timeout {
constructor(elapsed, ms) {
this._reject = null;
this._handle = null;
const timeoutValue = elapsed.get() + ms;
this._waitHandle = elapsed.waitFor(t => t >= timeoutValue);
}
elapsed() {
return this._waitHandle.promise;
}
abort() {
// will reject with AbortError
this._waitHandle.dispose();
}
}
class TimeMeasure {
constructor(elapsed) {
this._elapsed = elapsed;
this._start = elapsed.get();
}
measure() {
return this._elapsed.get() - this._start;
}
}
export default class Clock {
constructor(baseTimestamp = 0) {
this._baseTimestamp = baseTimestamp;
this._elapsed = new ObservableValue(0);
// should be callable as a function as well as a method
this.createMeasure = this.createMeasure.bind(this);
this.createTimeout = this.createTimeout.bind(this);
this.now = this.now.bind(this);
}
createMeasure() {
return new TimeMeasure(this._elapsed);
}
createTimeout(ms) {
return new Timeout(this._elapsed, ms);
}
now() {
return this._baseTimestamp + this.elapsed;
}
elapse(ms) {
this._elapsed.set(this._elapsed.get() + Math.max(0, ms));
}
get elapsed() {
return this._elapsed.get();
}
}
export function tests() {
return {
"test timeout": async assert => {
const clock = new Clock();
Promise.resolve().then(() => {
clock.elapse(500);
clock.elapse(500);
}).catch(assert.fail);
const timeout = clock.createTimeout(1000);
const promise = timeout.elapsed();
assert(promise instanceof Promise);
await promise;
}
}
}

View File

@ -1,4 +1,4 @@
export default class BaseObservableCollection {
export default class BaseObservable {
constructor() {
this._handlers = new Set();
}
@ -31,33 +31,8 @@ export default class BaseObservableCollection {
// Add iterator over handlers here
}
// like an EventEmitter, but doesn't have an event type
export class BaseObservableValue extends BaseObservableCollection {
emit(argument) {
for (const h of this._handlers) {
h(argument);
}
}
}
export class ObservableValue extends BaseObservableValue {
constructor(initialValue) {
super();
this._value = initialValue;
}
get() {
return this._value;
}
set(value) {
this._value = value;
this.emit(this._value);
}
}
export function tests() {
class Collection extends BaseObservableCollection {
class Collection extends BaseObservable {
constructor() {
super();
this.firstSubscribeCalls = 0;

View File

@ -0,0 +1,120 @@
import {AbortError} from "../utils/error.js";
import BaseObservable from "./BaseObservable.js";
// like an EventEmitter, but doesn't have an event type
export class BaseObservableValue extends BaseObservable {
emit(argument) {
for (const h of this._handlers) {
h(argument);
}
}
}
class WaitForHandle {
constructor(observable, predicate) {
this._promise = new Promise((resolve, reject) => {
this._reject = reject;
this._subscription = observable.subscribe(v => {
if (predicate(v)) {
this._reject = null;
resolve(v);
this.dispose();
}
});
});
}
get promise() {
return this._promise;
}
dispose() {
if (this._subscription) {
this._subscription();
this._subscription = null;
}
if (this._reject) {
this._reject(new AbortError());
this._reject = null;
}
}
}
class ResolvedWaitForHandle {
constructor(promise) {
this.promise = promise;
}
dispose() {}
}
export default class ObservableValue extends BaseObservableValue {
constructor(initialValue) {
super();
this._value = initialValue;
}
get() {
return this._value;
}
set(value) {
if (value !== this._value) {
this._value = value;
this.emit(this._value);
}
}
waitFor(predicate) {
if (predicate(this.get())) {
return new ResolvedWaitForHandle(Promise.resolve(this.get()));
} else {
return new WaitForHandle(this, predicate);
}
}
}
export function tests() {
return {
"set emits an update": assert => {
const a = new ObservableValue();
let fired = false;
const subscription = a.subscribe(v => {
fired = true;
assert.strictEqual(v, 5);
});
a.set(5);
assert(fired);
subscription();
},
"set doesn't emit if value hasn't changed": assert => {
const a = new ObservableValue(5);
let fired = false;
const subscription = a.subscribe(() => {
fired = true;
});
a.set(5);
a.set(5);
assert(!fired);
subscription();
},
"waitFor promise resolves on matching update": async assert => {
const a = new ObservableValue(5);
const handle = a.waitFor(v => v === 6);
Promise.resolve().then(() => {
a.set(6);
});
await handle.promise;
assert.strictEqual(a.get(), 6);
},
"waitFor promise rejects when disposed": async assert => {
const a = new ObservableValue();
const handle = a.waitFor(() => false);
Promise.resolve().then(() => {
handle.dispose();
});
await assert.rejects(handle.promise, AbortError);
},
}
}

View File

@ -1,6 +1,6 @@
import BaseObservableCollection from "../BaseObservableCollection.js";
import BaseObservable from "../BaseObservable.js";
export default class BaseObservableList extends BaseObservableCollection {
export default class BaseObservableList extends BaseObservable {
emitReset() {
for(let h of this._handlers) {
h.onReset(this);

View File

@ -1,6 +1,6 @@
import BaseObservableCollection from "../BaseObservableCollection.js";
import BaseObservable from "../BaseObservable.js";
export default class BaseObservableMap extends BaseObservableCollection {
export default class BaseObservableMap extends BaseObservable {
emitReset() {
for(let h of this._handlers) {
h.onReset();

View File

@ -1,4 +1,4 @@
import {AbortError} from "../utils/error.js";
import {AbortError} from "../../../utils/error.js";
class Timeout {
constructor(ms) {
@ -37,7 +37,7 @@ class TimeMeasure {
}
}
export class Clock {
export default class Clock {
createMeasure() {
return new TimeMeasure();
}

View File

@ -1,4 +1,6 @@
export class OnlineStatus extends ObservableValue {
import {BaseObservableValue} from "../../../observable/ObservableValue.js";
export default class OnlineStatus extends BaseObservableValue {
constructor() {
super();
this._onOffline = this._onOffline.bind(this);

View File

@ -34,7 +34,7 @@ export default class SwitchView {
return this._childView;
}
}
/*
// SessionLoadView
// should this be the new switch view?
// and the other one be the BasicSwitchView?
@ -50,8 +50,8 @@ new BoundSwitchView(vm, vm => vm.isLoading, (loading, vm) => {
return new SessionView(vm.sessionViewModel);
}
});
class BoundSwitchView extends SwitchView {
*/
export class BoundSwitchView extends SwitchView {
constructor(value, mapper, viewCreator) {
super(viewCreator(mapper(value), value));
this._mapper = mapper;

7
src/utils/enum.js Normal file
View File

@ -0,0 +1,7 @@
export default function createEnum(...values) {
const obj = {};
for (const value of values) {
obj[value] = value;
}
return Object.freeze(obj);
}

View File

@ -234,10 +234,10 @@ http-errors@~1.7.2:
statuses ">= 1.5.0 < 2"
toidentifier "1.0.0"
impunity@^0.0.10:
version "0.0.10"
resolved "https://registry.yarnpkg.com/impunity/-/impunity-0.0.10.tgz#b4e47c85db53279ca7fcf2e07f7ffb111b050e49"
integrity sha512-orL7IaDV//74U6GDyw7j7wcLwxhhLpXStyZ+Pz4O1UEYx1zlCojfpBNuq26Mzbaw0HMEwrMMi4JnLQ9lz3HVFg==
impunity@^0.0.11:
version "0.0.11"
resolved "https://registry.yarnpkg.com/impunity/-/impunity-0.0.11.tgz#216da6860ad17dd360fdaa2b15d7006579b5dd8a"
integrity sha512-EZUlc/Qx7oaRXZY+PtewrPby63sWZQsEtjGFB05XfbL/20SBkR8ksFnBahkeOD2/ErNkO3vh8AV0oDbdSSS8jQ==
dependencies:
colors "^1.3.3"
commander "^2.19.0"