|
|
|
/**
 * Base error used to trigger an unsubscribe action from a subscriber.
 *
 * @extends Error
 */
export class UnsubscribeError extends Error {
    /**
     * @param message - optional human-readable description of why the subscriber is unsubscribing
     */
    constructor(message?: string) {
        super(message)

        // When TypeScript compiles to ES5, calling `super()` on a built-in such as
        // Error returns a plain Error and drops the subclass prototype. Restoring it
        // keeps `instanceof UnsubscribeError` working for this class and its subclasses.
        Object.setPrototypeOf(this, new.target.prototype)

        // Report the specific class in stack traces and logs rather than the generic "Error".
        this.name = 'UnsubscribeError'
    }
}
|
|
|
|
|
|
|
|
/**
 * Thrown when a closed observable is pushed to.
 *
 * @extends Error
 */
export class CompletedObservableError extends Error {
    constructor() {
        super('This observable can no longer be pushed to, as it has been completed.')

        // When TypeScript compiles to ES5, calling `super()` on a built-in such as
        // Error returns a plain Error and drops the subclass prototype. Restoring it
        // keeps `instanceof CompletedObservableError` working for callers that catch it.
        Object.setPrototypeOf(this, new.target.prototype)

        // Report the specific class in stack traces and logs rather than the generic "Error".
        this.name = 'CompletedObservableError'
    }
}
|
|
|
|
|
|
|
|
/**
 * Callback invoked with each value pushed to a subject.
 */
export type SubscriberFunction<T> = (val: T) => any

/**
 * Callback invoked with an error raised while notifying a subscriber.
 */
export type SubscriberErrorFunction = (error: Error) => any

/**
 * Callback invoked when a subject is completed, optionally receiving a final value.
 */
export type SubscriberCompleteFunction<T> = (val?: T) => any

/**
 * Subscriber expressed as an object of optional handlers, one per kind of event.
 */
export type ComplexSubscriber<T> = {
    next?: SubscriberFunction<T>;
    error?: SubscriberErrorFunction;
    complete?: SubscriberCompleteFunction<T>;
}

/**
 * Anything that can subscribe to a behavior subject: either a bare value
 * callback or an object of handlers.
 */
export type Subscription<T> =
    | SubscriberFunction<T>
    | ComplexSubscriber<T>

/**
 * Handle exposing the helper used to cancel a subscription.
 */
export type Unsubscribe = {
    unsubscribe: () => void;
}
|
|
|
|
|
|
|
|
/**
 * A stream-based state class.
 *
 * Remembers the most recent value pushed to it and notifies every registered
 * subscriber when a new value is pushed or when the subject is completed.
 */
export class BehaviorSubject<T> {
    /**
     * Subscribers to this subject. Bare function subscribers are stored
     * wrapped as `{ next: fn }`, so every entry has the object form.
     * @type Array<ComplexSubscriber>
     */
    protected subscribers: ComplexSubscriber<T>[] = []

    /**
     * True if this subject has been marked complete.
     * @type boolean
     */
    protected subjectIsComplete = false

    /**
     * The current value of this subject. Undefined until a value has been
     * pushed via `next` or supplied as the final value to `complete`.
     */
    protected currentValue?: T

    /**
     * True if any value has been pushed to this subject via `next`.
     * @type boolean
     */
    protected hasPush = false

    /**
     * Register a new subscription to this subject.
     *
     * @param {Subscription} subscriber - a value callback, or an object with
     *        any of the `next`, `error` and `complete` handlers
     * @return Unsubscribe - handle whose `unsubscribe()` removes this subscription
     */
    public subscribe(subscriber: Subscription<T>): Unsubscribe {
        // Normalize to the object form, and remember exactly which object was
        // stored. For a bare function that object is a fresh wrapper, so the
        // removal below must compare against the wrapper, not the function.
        const registered: ComplexSubscriber<T> = typeof subscriber === 'function'
            ? { next: subscriber }
            : subscriber

        this.subscribers.push(registered)

        return {
            unsubscribe: () => {
                this.subscribers = this.subscribers.filter(x => x !== registered)
            },
        }
    }

    /**
     * Cast this subject to a promise, which resolves on the output of the next value.
     *
     * The promise rejects if the temporary subscriber's error handler is
     * invoked first, and resolves with the final value if the subject is
     * completed with a defined value first. If the subject is completed
     * without any defined value, the subscription is dropped and the promise
     * is left pending.
     *
     * @return Promise
     */
    public toPromise(): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            // Every handler unsubscribes after running, so this temporary
            // subscriber reacts to at most one event.
            const { unsubscribe } = this.subscribe({
                next: (val: T) => {
                    resolve(val)
                    unsubscribe()
                },
                error: (error: Error) => {
                    reject(error)
                    unsubscribe()
                },
                complete: (val?: T) => {
                    if ( typeof val !== 'undefined' ) {
                        resolve(val)
                    }
                    unsubscribe()
                },
            })
        })
    }

    /**
     * Push a new value to this subject. The promise resolves when all subscribers have been pushed to.
     *
     * Subscribers are notified one at a time, each awaited before the next.
     * If a subscriber's `next` handler throws:
     *  - an `UnsubscribeError` removes that subscriber;
     *  - any other `Error` is passed to that subscriber's `error` handler, if it has one;
     *  - anything else is rethrown and rejects the returned promise.
     *
     * @param val - the value to store and deliver
     * @return Promise<void>
     * @throws CompletedObservableError if this subject has already been completed
     */
    public async next(val: T): Promise<void> {
        if ( this.subjectIsComplete ) {
            throw new CompletedObservableError()
        }

        this.currentValue = val
        this.hasPush = true

        for ( const subscriber of this.subscribers ) {
            if ( subscriber.next ) {
                try {
                    await subscriber.next(val)
                } catch (e) {
                    if ( e instanceof UnsubscribeError ) {
                        // Removal replaces the array instead of mutating it, so
                        // the iteration in progress is not disturbed.
                        this.subscribers = this.subscribers.filter(x => x !== subscriber)
                    } else if ( subscriber.error && e instanceof Error ) {
                        await subscriber.error(e)
                    } else {
                        throw e
                    }
                }
            }
        }
    }

    /**
     * Push the given array of values to this subject in order.
     * The promise resolves when all subscribers have been pushed to for all values.
     *
     * The `next` calls are started in array order but are not awaited one by
     * one; they are awaited together, so asynchronous subscribers may still
     * be handling one value when the following one is delivered.
     *
     * @param {Array} vals - the values to push
     * @return Promise<void>
     * @throws CompletedObservableError if this subject has already been completed
     */
    public async push(vals: T[]): Promise<void> {
        if ( this.subjectIsComplete ) {
            throw new CompletedObservableError()
        }

        await Promise.all(vals.map(val => this.next(val)))
    }

    /**
     * Mark this subject as complete.
     * The promise resolves when all subscribers have been pushed to.
     *
     * Subscribers' `complete` handlers are called one at a time with the
     * final value. If a handler throws an `Error` and the subscriber has an
     * `error` handler, the error is passed to it; otherwise it is rethrown.
     * The subject is only flagged as complete after every subscriber has
     * been notified.
     *
     * @param [finalValue] - optionally, a final value to set; when omitted,
     *        the current value is handed to subscribers instead
     * @return Promise<void>
     * @throws CompletedObservableError if this subject has already been completed
     */
    public async complete(finalValue?: T): Promise<void> {
        if ( this.subjectIsComplete ) {
            throw new CompletedObservableError()
        }

        if ( typeof finalValue === 'undefined' ) {
            finalValue = this.value()
        } else {
            this.currentValue = finalValue
        }

        for ( const subscriber of this.subscribers ) {
            if ( subscriber.complete ) {
                try {
                    await subscriber.complete(finalValue)
                } catch (e) {
                    if ( subscriber.error && e instanceof Error ) {
                        await subscriber.error(e)
                    } else {
                        throw e
                    }
                }
            }
        }

        this.subjectIsComplete = true
    }

    /**
     * Get the current value of this subject.
     *
     * @return the most recently stored value, or undefined if none has been stored
     */
    public value(): T | undefined {
        return this.currentValue
    }

    /**
     * True if this subject is marked as complete.
     * @return boolean
     */
    public isComplete(): boolean {
        return this.subjectIsComplete
    }
}
|