Import other modules into monorepo
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
199
src/util/support/BehaviorSubject.ts
Normal file
199
src/util/support/BehaviorSubject.ts
Normal file
@@ -0,0 +1,199 @@
|
||||
/**
|
||||
* Base error used to trigger an unsubscribe action from a subscriber.
|
||||
* @extends Error
|
||||
*/
|
||||
export class UnsubscribeError extends Error {}
|
||||
|
||||
/**
|
||||
* Thrown when a closed observable is pushed to.
|
||||
* @extends Error
|
||||
*/
|
||||
export class CompletedObservableError extends Error {
|
||||
constructor() {
|
||||
super('This observable can no longer be pushed to, as it has been completed.')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function.
|
||||
*/
|
||||
export type SubscriberFunction<T> = (val: T) => any
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function that handles errors.
|
||||
*/
|
||||
export type SubscriberErrorFunction = (error: Error) => any
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function that handles completed events.
|
||||
*/
|
||||
export type SubscriberCompleteFunction<T> = (val?: T) => any
|
||||
|
||||
/**
|
||||
* Subscribers that define multiple handler methods.
|
||||
*/
|
||||
export type ComplexSubscriber<T> = {
|
||||
next?: SubscriberFunction<T>,
|
||||
error?: SubscriberErrorFunction,
|
||||
complete?: SubscriberCompleteFunction<T>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscription to a behavior subject.
|
||||
*/
|
||||
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>
|
||||
|
||||
/**
|
||||
* Object providing helpers for unsubscribing from a subscription.
|
||||
*/
|
||||
export type Unsubscribe = { unsubscribe: () => void }
|
||||
|
||||
/**
|
||||
* A stream-based state class.
|
||||
*/
|
||||
export class BehaviorSubject<T> {
|
||||
/**
|
||||
* Subscribers to this subject.
|
||||
* @type Array<ComplexSubscriber>
|
||||
*/
|
||||
protected subscribers: ComplexSubscriber<T>[] = []
|
||||
|
||||
/**
|
||||
* True if this subject has been marked complete.
|
||||
* @type boolean
|
||||
*/
|
||||
protected _isComplete: boolean = false
|
||||
|
||||
/**
|
||||
* The current value of this subject.
|
||||
*/
|
||||
protected _value?: T
|
||||
|
||||
/**
|
||||
* True if any value has been pushed to this subject.
|
||||
* @type boolean
|
||||
*/
|
||||
protected _hasPush: boolean = false
|
||||
|
||||
/**
|
||||
* Register a new subscription to this subject.
|
||||
* @param {Subscription} subscriber
|
||||
* @return Unsubscribe
|
||||
*/
|
||||
public subscribe(subscriber: Subscription<T>): Unsubscribe {
|
||||
if ( typeof subscriber === 'function' ) {
|
||||
this.subscribers.push({ next: subscriber })
|
||||
} else {
|
||||
this.subscribers.push(subscriber)
|
||||
}
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
this.subscribers = this.subscribers.filter(x => x !== subscriber)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cast this subject to a promise, which resolves on the output of the next value.
|
||||
* @return Promise
|
||||
*/
|
||||
public toPromise(): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const { unsubscribe } = this.subscribe({
|
||||
next: (val: T) => {
|
||||
resolve(val)
|
||||
unsubscribe()
|
||||
},
|
||||
error: (error: Error) => {
|
||||
reject(error)
|
||||
unsubscribe()
|
||||
},
|
||||
complete: (val?: T) => {
|
||||
if ( typeof val !== 'undefined' ) resolve(val)
|
||||
unsubscribe()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a new value to this subject. The promise resolves when all subscribers have been pushed to.
|
||||
* @param val
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async next(val: T): Promise<void> {
|
||||
if ( this._isComplete ) throw new CompletedObservableError()
|
||||
this._value = val
|
||||
this._hasPush = true
|
||||
for ( const subscriber of this.subscribers ) {
|
||||
if ( subscriber.next ) {
|
||||
try {
|
||||
await subscriber.next(val)
|
||||
} catch (e) {
|
||||
if ( e instanceof UnsubscribeError ) {
|
||||
this.subscribers = this.subscribers.filter(x => x !== subscriber)
|
||||
} else if (subscriber.error) {
|
||||
await subscriber.error(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Push the given array of values to this subject in order.
|
||||
* The promise resolves when all subscribers have been pushed to for all values.
|
||||
* @param {Array} vals
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async push(vals: T[]): Promise<void> {
|
||||
if ( this._isComplete ) throw new CompletedObservableError()
|
||||
await Promise.all(vals.map(val => this.next(val)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this subject as complete.
|
||||
* The promise resolves when all subscribers have been pushed to.
|
||||
* @param [final_val] - optionally, a final value to set
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async complete(final_val?: T): Promise<void> {
|
||||
if ( this._isComplete ) throw new CompletedObservableError()
|
||||
if ( typeof final_val === 'undefined' ) final_val = this.value()
|
||||
else this._value = final_val
|
||||
|
||||
for ( const subscriber of this.subscribers ) {
|
||||
if ( subscriber.complete ) {
|
||||
try {
|
||||
await subscriber.complete(final_val)
|
||||
} catch (e) {
|
||||
if ( subscriber.error ) {
|
||||
await subscriber.error(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this._isComplete = true
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current value of this subject.
|
||||
*/
|
||||
public value(): T | undefined {
|
||||
return this._value
|
||||
}
|
||||
|
||||
/**
|
||||
* True if this subject is marked as complete.
|
||||
* @return boolean
|
||||
*/
|
||||
public isComplete(): boolean {
|
||||
return this._isComplete
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user