You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lib/src/util/support/BehaviorSubject.ts

211 lines
5.7 KiB

/**
* Base error used to trigger an unsubscribe action from a subscriber.
* @extends Error
*/
export class UnsubscribeError extends Error {}
/**
* Thrown when a closed observable is pushed to.
* @extends Error
*/
export class CompletedObservableError extends Error {
constructor() {
super('This observable can no longer be pushed to, as it has been completed.')
}
}
/**
* Type of a basic subscriber function.
*/
export type SubscriberFunction<T> = (val: T) => any
/**
* Type of a basic subscriber function that handles errors.
*/
export type SubscriberErrorFunction = (error: Error) => any
/**
* Type of a basic subscriber function that handles completed events.
*/
export type SubscriberCompleteFunction<T> = (val?: T) => any
/**
* Subscribers that define multiple handler methods.
*/
export type ComplexSubscriber<T> = {
next?: SubscriberFunction<T>,
error?: SubscriberErrorFunction,
complete?: SubscriberCompleteFunction<T>,
}
/**
* Subscription to a behavior subject.
*/
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>
/**
* Object providing helpers for unsubscribing from a subscription.
*/
export type Unsubscribe = { unsubscribe: () => void }
/**
* A stream-based state class.
*/
export class BehaviorSubject<T> {
/**
* Subscribers to this subject.
* @type Array<ComplexSubscriber>
*/
protected subscribers: ComplexSubscriber<T>[] = []
/**
* True if this subject has been marked complete.
* @type boolean
*/
protected subjectIsComplete = false
/**
* The current value of this subject.
*/
protected currentValue?: T
/**
* True if any value has been pushed to this subject.
* @type boolean
*/
protected hasPush = false
/**
* Register a new subscription to this subject.
* @param {Subscription} subscriber
* @return Unsubscribe
*/
public subscribe(subscriber: Subscription<T>): Unsubscribe {
if ( typeof subscriber === 'function' ) {
this.subscribers.push({ next: subscriber })
} else {
this.subscribers.push(subscriber)
}
return {
unsubscribe: () => {
this.subscribers = this.subscribers.filter(x => x !== subscriber)
},
}
}
/**
* Cast this subject to a promise, which resolves on the output of the next value.
* @return Promise
*/
public toPromise(): Promise<T> {
return new Promise((resolve, reject) => {
const { unsubscribe } = this.subscribe({
next: (val: T) => {
resolve(val)
unsubscribe()
},
error: (error: Error) => {
reject(error)
unsubscribe()
},
complete: (val?: T) => {
if ( typeof val !== 'undefined' ) {
resolve(val)
}
unsubscribe()
},
})
})
}
/**
* Push a new value to this subject. The promise resolves when all subscribers have been pushed to.
* @param val
* @return Promise<void>
*/
public async next(val: T): Promise<void> {
if ( this.subjectIsComplete ) {
throw new CompletedObservableError()
}
this.currentValue = val
this.hasPush = true
for ( const subscriber of this.subscribers ) {
if ( subscriber.next ) {
try {
await subscriber.next(val)
} catch (e) {
if ( e instanceof UnsubscribeError ) {
this.subscribers = this.subscribers.filter(x => x !== subscriber)
3 years ago
} else if (subscriber.error && e instanceof Error) {
await subscriber.error(e)
} else {
throw e
}
}
}
}
}
/**
* Push the given array of values to this subject in order.
* The promise resolves when all subscribers have been pushed to for all values.
* @param {Array} vals
* @return Promise<void>
*/
public async push(vals: T[]): Promise<void> {
if ( this.subjectIsComplete ) {
throw new CompletedObservableError()
}
await Promise.all(vals.map(val => this.next(val)))
}
/**
* Mark this subject as complete.
* The promise resolves when all subscribers have been pushed to.
* @param [finalValue] - optionally, a final value to set
* @return Promise<void>
*/
public async complete(finalValue?: T): Promise<void> {
if ( this.subjectIsComplete ) {
throw new CompletedObservableError()
}
if ( typeof finalValue === 'undefined' ) {
finalValue = this.value()
} else {
this.currentValue = finalValue
}
for ( const subscriber of this.subscribers ) {
if ( subscriber.complete ) {
try {
await subscriber.complete(finalValue)
} catch (e) {
3 years ago
if ( subscriber.error && e instanceof Error ) {
await subscriber.error(e)
} else {
throw e
}
}
}
}
this.subjectIsComplete = true
}
/**
* Get the current value of this subject.
*/
public value(): T | undefined {
return this.currentValue
}
/**
* True if this subject is marked as complete.
* @return boolean
*/
public isComplete(): boolean {
return this.subjectIsComplete
}
}