/** * Base error used to trigger an unsubscribe action from a subscriber. * @extends Error */ export class UnsubscribeError extends Error {} /** * Thrown when a closed observable is pushed to. * @extends Error */ export class CompletedObservableError extends Error { constructor() { super('This observable can no longer be pushed to, as it has been completed.') } } /** * Type of a basic subscriber function. */ export type SubscriberFunction = (val: T) => any /** * Type of a basic subscriber function that handles errors. */ export type SubscriberErrorFunction = (error: Error) => any /** * Type of a basic subscriber function that handles completed events. */ export type SubscriberCompleteFunction = (val?: T) => any /** * Subscribers that define multiple handler methods. */ export type ComplexSubscriber = { next?: SubscriberFunction, error?: SubscriberErrorFunction, complete?: SubscriberCompleteFunction, } /** * Subscription to a behavior subject. */ export type Subscription = SubscriberFunction | ComplexSubscriber /** * Object providing helpers for unsubscribing from a subscription. */ export type Unsubscribe = { unsubscribe: () => void } /** * A stream-based state class. */ export class BehaviorSubject { /** * Subscribers to this subject. * @type Array */ protected subscribers: ComplexSubscriber[] = [] /** * True if this subject has been marked complete. * @type boolean */ protected _isComplete: boolean = false /** * The current value of this subject. */ protected _value?: T /** * True if any value has been pushed to this subject. * @type boolean */ protected _hasPush: boolean = false /** * Register a new subscription to this subject. * @param {Subscription} subscriber * @return Unsubscribe */ public subscribe(subscriber: Subscription): Unsubscribe { if ( typeof subscriber === 'function' ) { this.subscribers.push({ next: subscriber }) } else { this.subscribers.push(subscriber) } return { unsubscribe: () => { this.subscribers = this.subscribers.filter(x => x !== subscriber) } } } /** * Cast this subject to a promise, which resolves on the output of the next value. * @return Promise */ public toPromise(): Promise { return new Promise((resolve, reject) => { const { unsubscribe } = this.subscribe({ next: (val: T) => { resolve(val) unsubscribe() }, error: (error: Error) => { reject(error) unsubscribe() }, complete: (val?: T) => { if ( typeof val !== 'undefined' ) resolve(val) unsubscribe() } }) }) } /** * Push a new value to this subject. The promise resolves when all subscribers have been pushed to. * @param val * @return Promise */ public async next(val: T): Promise { if ( this._isComplete ) throw new CompletedObservableError() this._value = val this._hasPush = true for ( const subscriber of this.subscribers ) { if ( subscriber.next ) { try { await subscriber.next(val) } catch (e) { if ( e instanceof UnsubscribeError ) { this.subscribers = this.subscribers.filter(x => x !== subscriber) } else if (subscriber.error) { await subscriber.error(e) } else { throw e } } } } } /** * Push the given array of values to this subject in order. * The promise resolves when all subscribers have been pushed to for all values. * @param {Array} vals * @return Promise */ public async push(vals: T[]): Promise { if ( this._isComplete ) throw new CompletedObservableError() await Promise.all(vals.map(val => this.next(val))) } /** * Mark this subject as complete. * The promise resolves when all subscribers have been pushed to. * @param [final_val] - optionally, a final value to set * @return Promise */ public async complete(final_val?: T): Promise { if ( this._isComplete ) throw new CompletedObservableError() if ( typeof final_val === 'undefined' ) final_val = this.value() else this._value = final_val for ( const subscriber of this.subscribers ) { if ( subscriber.complete ) { try { await subscriber.complete(final_val) } catch (e) { if ( subscriber.error ) { await subscriber.error(e) } else { throw e } } } } this._isComplete = true } /** * Get the current value of this subject. */ public value(): T | undefined { return this._value } /** * True if this subject is marked as complete. * @return boolean */ public isComplete(): boolean { return this._isComplete } }