lib/src/util/support/BehaviorSubject.ts

211 lines
5.7 KiB
TypeScript
Raw Normal View History

2021-06-02 01:59:40 +00:00
/**
* Base error used to trigger an unsubscribe action from a subscriber.
* @extends Error
*/
export class UnsubscribeError extends Error {}
/**
* Thrown when a closed observable is pushed to.
* @extends Error
*/
export class CompletedObservableError extends Error {
constructor() {
super('This observable can no longer be pushed to, as it has been completed.')
}
}
/**
* Type of a basic subscriber function.
*/
export type SubscriberFunction<T> = (val: T) => any
/**
* Type of a basic subscriber function that handles errors.
*/
export type SubscriberErrorFunction = (error: Error) => any
/**
* Type of a basic subscriber function that handles completed events.
*/
export type SubscriberCompleteFunction<T> = (val?: T) => any
/**
* Subscribers that define multiple handler methods.
*/
export type ComplexSubscriber<T> = {
next?: SubscriberFunction<T>,
error?: SubscriberErrorFunction,
complete?: SubscriberCompleteFunction<T>,
}
/**
* Subscription to a behavior subject.
*/
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>
/**
* Object providing helpers for unsubscribing from a subscription.
*/
export type Unsubscribe = { unsubscribe: () => void }
/**
* A stream-based state class.
*/
export class BehaviorSubject<T> {
/**
* Subscribers to this subject.
* @type Array<ComplexSubscriber>
*/
protected subscribers: ComplexSubscriber<T>[] = []
/**
* True if this subject has been marked complete.
* @type boolean
*/
2021-06-03 03:36:25 +00:00
protected subjectIsComplete = false
2021-06-02 01:59:40 +00:00
/**
* The current value of this subject.
*/
2021-06-03 03:36:25 +00:00
protected currentValue?: T
2021-06-02 01:59:40 +00:00
/**
* True if any value has been pushed to this subject.
* @type boolean
*/
2021-06-03 03:36:25 +00:00
protected hasPush = false
2021-06-02 01:59:40 +00:00
/**
* Register a new subscription to this subject.
* @param {Subscription} subscriber
* @return Unsubscribe
*/
public subscribe(subscriber: Subscription<T>): Unsubscribe {
if ( typeof subscriber === 'function' ) {
this.subscribers.push({ next: subscriber })
} else {
this.subscribers.push(subscriber)
}
return {
unsubscribe: () => {
this.subscribers = this.subscribers.filter(x => x !== subscriber)
2021-06-03 03:36:25 +00:00
},
2021-06-02 01:59:40 +00:00
}
}
/**
* Cast this subject to a promise, which resolves on the output of the next value.
* @return Promise
*/
public toPromise(): Promise<T> {
return new Promise((resolve, reject) => {
const { unsubscribe } = this.subscribe({
next: (val: T) => {
resolve(val)
unsubscribe()
},
error: (error: Error) => {
reject(error)
unsubscribe()
},
complete: (val?: T) => {
2021-06-03 03:36:25 +00:00
if ( typeof val !== 'undefined' ) {
resolve(val)
}
2021-06-02 01:59:40 +00:00
unsubscribe()
2021-06-03 03:36:25 +00:00
},
2021-06-02 01:59:40 +00:00
})
})
}
/**
* Push a new value to this subject. The promise resolves when all subscribers have been pushed to.
* @param val
* @return Promise<void>
*/
public async next(val: T): Promise<void> {
2021-06-03 03:36:25 +00:00
if ( this.subjectIsComplete ) {
throw new CompletedObservableError()
}
this.currentValue = val
this.hasPush = true
2021-06-02 01:59:40 +00:00
for ( const subscriber of this.subscribers ) {
if ( subscriber.next ) {
try {
await subscriber.next(val)
} catch (e) {
if ( e instanceof UnsubscribeError ) {
this.subscribers = this.subscribers.filter(x => x !== subscriber)
2021-10-18 18:03:28 +00:00
} else if (subscriber.error && e instanceof Error) {
2021-06-02 01:59:40 +00:00
await subscriber.error(e)
} else {
throw e
}
}
}
}
}
/**
* Push the given array of values to this subject in order.
* The promise resolves when all subscribers have been pushed to for all values.
* @param {Array} vals
* @return Promise<void>
*/
public async push(vals: T[]): Promise<void> {
2021-06-03 03:36:25 +00:00
if ( this.subjectIsComplete ) {
throw new CompletedObservableError()
}
2021-06-02 01:59:40 +00:00
await Promise.all(vals.map(val => this.next(val)))
}
/**
* Mark this subject as complete.
* The promise resolves when all subscribers have been pushed to.
2021-06-03 03:36:25 +00:00
* @param [finalValue] - optionally, a final value to set
2021-06-02 01:59:40 +00:00
* @return Promise<void>
*/
2021-06-03 03:36:25 +00:00
public async complete(finalValue?: T): Promise<void> {
if ( this.subjectIsComplete ) {
throw new CompletedObservableError()
}
if ( typeof finalValue === 'undefined' ) {
finalValue = this.value()
} else {
this.currentValue = finalValue
}
2021-06-02 01:59:40 +00:00
for ( const subscriber of this.subscribers ) {
if ( subscriber.complete ) {
try {
2021-06-03 03:36:25 +00:00
await subscriber.complete(finalValue)
2021-06-02 01:59:40 +00:00
} catch (e) {
2021-10-18 18:03:28 +00:00
if ( subscriber.error && e instanceof Error ) {
2021-06-02 01:59:40 +00:00
await subscriber.error(e)
} else {
throw e
}
}
}
}
2021-06-03 03:36:25 +00:00
this.subjectIsComplete = true
2021-06-02 01:59:40 +00:00
}
/**
* Get the current value of this subject.
*/
public value(): T | undefined {
2021-06-03 03:36:25 +00:00
return this.currentValue
2021-06-02 01:59:40 +00:00
}
/**
* True if this subject is marked as complete.
* @return boolean
*/
public isComplete(): boolean {
2021-06-03 03:36:25 +00:00
return this.subjectIsComplete
2021-06-02 01:59:40 +00:00
}
}