/**
 * Base error used to trigger an unsubscribe action from a subscriber.
 *
 * When a subscriber's `next` handler throws an instance of this class,
 * `BehaviorSubject.next` removes that subscriber instead of reporting an error.
 */
export class UnsubscribeError extends Error {}

/**
 * Thrown when a completed subject is pushed to (or completed a second time).
 */
export class CompletedObservableError extends Error {
    constructor() {
        super('This observable can no longer be pushed to, as it has been completed.')
    }
}

/**
 * Type of a basic subscriber function. The return value is awaited by the
 * subject, so the handler may be synchronous or asynchronous.
 */
export type SubscriberFunction<T> = (val: T) => any

/**
 * Type of a basic subscriber function that handles errors.
 */
export type SubscriberErrorFunction = (error: Error) => any

/**
 * Type of a basic subscriber function that handles completed events.
 */
export type SubscriberCompleteFunction<T> = (val?: T) => any

/**
 * Subscribers that define multiple handler methods. Every handler is optional.
 */
export type ComplexSubscriber<T> = {
    next?: SubscriberFunction<T>,
    error?: SubscriberErrorFunction,
    complete?: SubscriberCompleteFunction<T>,
}

/**
 * Subscription to a behavior subject: either a bare `next` handler or an
 * object with any of the `next` / `error` / `complete` handlers.
 */
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>

/**
 * Object providing helpers for unsubscribing from a subscription.
 */
export type Unsubscribe = { unsubscribe: () => void }

/**
 * A stream-based state class.
 *
 * Holds the most recently pushed value and notifies subscribers one after
 * another (each handler is awaited before the next one is called).
 */
export class BehaviorSubject<T> {
    /**
     * Subscribers to this subject. Function subscribers are stored wrapped
     * as `{ next: fn }`.
     */
    protected subscribers: ComplexSubscriber<T>[] = []

    /**
     * True if this subject has been marked complete.
     */
    protected subjectIsComplete = false

    /**
     * The current value of this subject. Undefined until a value is pushed
     * or a final value is supplied to `complete`.
     */
    protected currentValue?: T

    /**
     * True if any value has been pushed to this subject via `next`.
     * Not read anywhere else in this class.
     */
    protected hasPush = false

    /**
     * Register a new subscription to this subject.
     *
     * @param subscriber - a `next` handler, or an object of handlers
     * @returns a handle whose `unsubscribe()` removes this subscription
     */
    public subscribe(subscriber: Subscription<T>): Unsubscribe {
        // Keep a reference to the exact object stored in the list. A function
        // subscriber is wrapped, so comparing against the raw function (as was
        // done previously) would never match and unsubscribe would be a no-op.
        const entry: ComplexSubscriber<T> = typeof subscriber === 'function'
            ? { next: subscriber }
            : subscriber

        this.subscribers.push(entry)

        return {
            unsubscribe: () => {
                this.subscribers = this.subscribers.filter(x => x !== entry)
            },
        }
    }

    /**
     * Cast this subject to a promise, which resolves on the output of the next value.
     *
     * The promise resolves with the next pushed value, rejects if the temporary
     * subscriber's error handler is invoked, and resolves with the final value
     * if the subject completes with a defined value. The temporary subscription
     * is removed after any of those events.
     *
     * NOTE: if the subject completes while its value is still `undefined`,
     * the returned promise is never settled.
     *
     * @returns a promise for the next value of this subject
     */
    public toPromise(): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const subscription = this.subscribe({
                next: (val: T) => {
                    resolve(val)
                    subscription.unsubscribe()
                },
                error: (error: Error) => {
                    reject(error)
                    subscription.unsubscribe()
                },
                complete: (val?: T) => {
                    if ( typeof val !== 'undefined' ) {
                        resolve(val)
                    }
                    subscription.unsubscribe()
                },
            })
        })
    }

    /**
     * Push a new value to this subject. The promise resolves when all
     * subscribers have been pushed to.
     *
     * Handlers are awaited sequentially. If a handler throws:
     *  - an `UnsubscribeError` removes that subscriber;
     *  - any other `Error` is passed to the subscriber's own `error` handler, if it has one;
     *  - otherwise the error is rethrown and the remaining subscribers are not notified.
     *
     * @param val - the value to store and broadcast
     * @throws CompletedObservableError if the subject has already been completed
     */
    public async next(val: T): Promise<void> {
        if ( this.subjectIsComplete ) {
            throw new CompletedObservableError()
        }

        this.currentValue = val
        this.hasPush = true

        // Removal below replaces `this.subscribers` with a new array, so the
        // array being iterated here is never mutated mid-loop.
        for ( const subscriber of this.subscribers ) {
            if ( !subscriber.next ) {
                continue
            }

            try {
                await subscriber.next(val)
            } catch (e: unknown) {
                if ( e instanceof UnsubscribeError ) {
                    this.subscribers = this.subscribers.filter(x => x !== subscriber)
                } else if ( subscriber.error && e instanceof Error ) {
                    await subscriber.error(e)
                } else {
                    throw e
                }
            }
        }
    }

    /**
     * Push the given array of values to this subject in order.
     * The promise resolves when all subscribers have been pushed to for all values.
     *
     * Each value is fully delivered (via `next`) before the following one is
     * started, so subscribers observe the values in array order even when
     * their handlers are asynchronous. If pushing a value rejects, the
     * remaining values are not pushed.
     *
     * @param vals - the values to push, first to last
     * @throws CompletedObservableError if the subject has already been completed
     */
    public async push(vals: T[]): Promise<void> {
        if ( this.subjectIsComplete ) {
            throw new CompletedObservableError()
        }

        for ( const val of vals ) {
            await this.next(val)
        }
    }

    /**
     * Mark this subject as complete.
     * The promise resolves when all subscribers have been pushed to.
     *
     * `complete` handlers are awaited sequentially. An `Error` thrown by a
     * handler is passed to that subscriber's `error` handler when it has one;
     * otherwise it is rethrown. The subject is flagged complete only after
     * every handler has finished, so a rethrown error leaves it incomplete.
     *
     * @param finalValue - optionally, a final value to set; when omitted, the
     *                     current value is passed to the handlers instead
     * @throws CompletedObservableError if the subject has already been completed
     */
    public async complete(finalValue?: T): Promise<void> {
        if ( this.subjectIsComplete ) {
            throw new CompletedObservableError()
        }

        if ( typeof finalValue === 'undefined' ) {
            finalValue = this.value()
        } else {
            this.currentValue = finalValue
        }

        for ( const subscriber of this.subscribers ) {
            if ( !subscriber.complete ) {
                continue
            }

            try {
                await subscriber.complete(finalValue)
            } catch (e: unknown) {
                if ( subscriber.error && e instanceof Error ) {
                    await subscriber.error(e)
                } else {
                    throw e
                }
            }
        }

        this.subjectIsComplete = true
    }

    /**
     * Get the current value of this subject.
     *
     * @returns the last stored value, or `undefined` if none has been stored
     */
    public value(): T | undefined {
        return this.currentValue
    }

    /**
     * True if this subject is marked as complete.
     */
    public isComplete(): boolean {
        return this.subjectIsComplete
    }
}