add flatMap operator on observable value

This commit is contained in:
Bruno Windels 2022-01-28 16:35:49 +01:00
parent c340746a87
commit e0df003aba

View File

@ -16,6 +16,7 @@ limitations under the License.
import {AbortError} from "../utils/error"; import {AbortError} from "../utils/error";
import {BaseObservable} from "./BaseObservable"; import {BaseObservable} from "./BaseObservable";
import type {SubscriptionHandle} from "./BaseObservable";
// like an EventEmitter, but doesn't have an event type // like an EventEmitter, but doesn't have an event type
export abstract class BaseObservableValue<T> extends BaseObservable<(value: T) => void> { export abstract class BaseObservableValue<T> extends BaseObservable<(value: T) => void> {
@ -34,6 +35,10 @@ export abstract class BaseObservableValue<T> extends BaseObservable<(value: T) =
return new WaitForHandle(this, predicate); return new WaitForHandle(this, predicate);
} }
} }
flatMap<C>(mapper: (value: T) => (BaseObservableValue<C> | undefined)): BaseObservableValue<C | undefined> {
return new FlatMapObservableValue<T, C>(this, mapper);
}
} }
interface IWaitHandle<T> { interface IWaitHandle<T> {
@ -114,6 +119,61 @@ export class RetainedObservableValue<T> extends ObservableValue<T> {
} }
} }
export class FlatMapObservableValue<P, C> extends BaseObservableValue<C | undefined> {
private sourceSubscription?: SubscriptionHandle;
private targetSubscription?: SubscriptionHandle;
constructor(
private readonly source: BaseObservableValue<P>,
private readonly mapper: (value: P) => (BaseObservableValue<C> | undefined)
) {
super();
}
onUnsubscribeLast() {
super.onUnsubscribeLast();
this.sourceSubscription = this.sourceSubscription!();
if (this.targetSubscription) {
this.targetSubscription = this.targetSubscription();
}
}
onSubscribeFirst() {
super.onSubscribeFirst();
this.sourceSubscription = this.source.subscribe(() => {
this.updateTargetSubscription();
this.emit(this.get());
});
this.updateTargetSubscription();
}
private updateTargetSubscription() {
const sourceValue = this.source.get();
if (sourceValue) {
const target = this.mapper(sourceValue);
if (target) {
if (!this.targetSubscription) {
this.targetSubscription = target.subscribe(() => this.emit(this.get()));
}
return;
}
}
// if no sourceValue or target
if (this.targetSubscription) {
this.targetSubscription = this.targetSubscription();
}
}
get(): C | undefined {
const sourceValue = this.source.get();
if (!sourceValue) {
return undefined;
}
const mapped = this.mapper(sourceValue);
return mapped?.get();
}
}
export function tests() { export function tests() {
return { return {
"set emits an update": assert => { "set emits an update": assert => {
@ -155,5 +215,34 @@ export function tests() {
}); });
await assert.rejects(handle.promise, AbortError); await assert.rejects(handle.promise, AbortError);
}, },
"flatMap.get": assert => {
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
const countProxy = a.flatMap(a => a.count);
assert.strictEqual(countProxy.get(), undefined);
const count = new ObservableValue<number>(0);
a.set({count});
assert.strictEqual(countProxy.get(), 0);
},
"flatMap update from source": assert => {
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
const updates: (number | undefined)[] = [];
a.flatMap(a => a.count).subscribe(count => {
updates.push(count);
});
const count = new ObservableValue<number>(0);
a.set({count});
assert.deepEqual(updates, [0]);
},
"flatMap update from target": assert => {
const a = new ObservableValue<undefined | {count: ObservableValue<number>}>(undefined);
const updates: (number | undefined)[] = [];
a.flatMap(a => a.count).subscribe(count => {
updates.push(count);
});
const count = new ObservableValue<number>(0);
a.set({count});
count.set(5);
assert.deepEqual(updates, [0, 5]);
}
} }
} }