You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
3.5 KiB

/**
 * Marker error a subscriber's `next` handler can throw to ask for its own
 * removal: `BehaviorSubject.next` drops any subscriber whose handler throws
 * an instance of this class instead of treating the throw as a failure.
 */
export class UnsubscribeError extends Error {
}
/**
 * Raised when a value is pushed to, or completion is requested on, an
 * observable that has already been completed.
 */
export class CompletedObservableError extends Error {
    constructor() {
        // The message is fixed; callers cannot customise it.
        const message = 'This observable can no longer be pushed to, as it has been completed.'
        super(message)
    }
}
/** Handler invoked with each value pushed to a subject; its return value is awaited by the subject. */
export type SubscriberFunction<T> = (val: T) => any
/** Handler invoked with the error thrown by a sibling `next` or `complete` handler of the same subscriber. */
export type SubscriberErrorFunction = (error: Error) => any
/** Handler invoked when the subject completes; receives the final value, which may be undefined. */
export type SubscriberCompleteFunction<T> = (val?: T) => any
/** Object-style subscriber; every handler is optional and missing handlers are simply skipped. */
export type ComplexSubscriber<T> = {
next?: SubscriberFunction<T>,
error?: SubscriberErrorFunction,
complete?: SubscriberCompleteFunction<T>,
}
/** What `subscribe` accepts: a bare function (used as the `next` handler) or an object-style subscriber. */
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>
/** Handle returned by `subscribe`; calling `unsubscribe` removes the subscriber from the subject. */
export type Unsubscribe = { unsubscribe: () => void }
/**
 * A subject that remembers the most recent value pushed to it and forwards
 * values, handler errors and completion to its subscribers.
 *
 * Subscribing does not replay the current value; a new subscriber only sees
 * values pushed after it was registered. Use `value()` to read the latest one.
 */
export class BehaviorSubject<T> {
    /** Registered subscribers, in subscription order. */
    protected subscribers: ComplexSubscriber<T>[] = []

    /** True once `complete()` has finished notifying every subscriber. */
    protected _is_complete: boolean = false

    /** Latest value passed to `next()`, or the final value passed to `complete()`. */
    protected _value?: T

    /** True once at least one value has been passed to `next()`. */
    protected _has_push: boolean = false

    /**
     * Register a subscriber.
     *
     * @param subscriber Either a function (used as the `next` handler) or an
     *   object with optional `next`, `error` and `complete` handlers.
     * @returns A handle whose `unsubscribe()` removes this subscriber.
     */
    public subscribe(subscriber: Subscription<T>): Unsubscribe {
        // A bare function is wrapped before being stored, so removal must be
        // keyed on the stored entry. Comparing against the original argument
        // would never match the wrapper and the subscriber could not be removed.
        const entry: ComplexSubscriber<T> =
            typeof subscriber === 'function' ? { next: subscriber } : subscriber

        this.subscribers.push(entry)

        return {
            unsubscribe: () => {
                this.subscribers = this.subscribers.filter(x => x !== entry)
            },
        }
    }

    /**
     * Get a promise for the next event on this subject.
     *
     * @returns A promise that resolves with the next pushed value, resolves
     *   with the final value if the subject completes first, or rejects if the
     *   internal subscriber's error handler is invoked. The internal subscriber
     *   is removed after the first of these events.
     */
    public to_promise(): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const { unsubscribe } = this.subscribe({
                next: (val: T) => {
                    resolve(val)
                    unsubscribe()
                },
                error: (error: Error) => {
                    reject(error)
                    unsubscribe()
                },
                complete: (val?: T) => {
                    // The final value may legitimately be undefined when the
                    // subject completes without ever holding a value.
                    resolve(val as T)
                    unsubscribe()
                },
            })
        })
    }

    /**
     * Push a value to every subscriber, one subscriber at a time.
     *
     * A `next` handler that throws `UnsubscribeError` is removed. Any other
     * thrown value is passed to that subscriber's `error` handler when it has
     * one, and is rethrown otherwise (which stops delivery to the remaining
     * subscribers).
     *
     * @param val The value to store and deliver.
     * @throws CompletedObservableError If the subject has already completed.
     */
    public async next(val: T): Promise<void> {
        if ( this._is_complete ) throw new CompletedObservableError()

        this._value = val
        this._has_push = true

        // Removal reassigns `this.subscribers` to a new array, so this loop
        // keeps iterating the list as it was when delivery started.
        for ( const subscriber of this.subscribers ) {
            if ( !subscriber.next ) continue

            try {
                await subscriber.next(val)
            } catch (e) {
                if ( e instanceof UnsubscribeError ) {
                    this.subscribers = this.subscribers.filter(x => x !== subscriber)
                } else if ( subscriber.error ) {
                    // The thrown value is forwarded as-is, whatever its type.
                    await subscriber.error(e as Error)
                } else {
                    throw e
                }
            }
        }
    }

    /**
     * Push several values.
     *
     * All `next()` calls are started up front and awaited together, so
     * deliveries of different values may overlap when handlers are asynchronous.
     *
     * @param vals The values to push, in order.
     * @throws CompletedObservableError If the subject has already completed.
     */
    public async push(vals: T[]): Promise<void> {
        if ( this._is_complete ) throw new CompletedObservableError()
        await Promise.all(vals.map(val => this.next(val)))
    }

    /**
     * Complete the subject and notify every subscriber's `complete` handler.
     *
     * A throwing `complete` handler has the thrown value passed to that
     * subscriber's `error` handler when it has one; otherwise the value is
     * rethrown. The subject is only marked complete after all handlers ran.
     *
     * @param final_val Optional final value. When omitted, the current value
     *   is delivered; when given, it also becomes the stored value.
     * @throws CompletedObservableError If the subject has already completed.
     */
    public async complete(final_val?: T): Promise<void> {
        if ( this._is_complete ) throw new CompletedObservableError()

        if ( typeof final_val === 'undefined' ) final_val = this.value()
        else this._value = final_val

        for ( const subscriber of this.subscribers ) {
            if ( !subscriber.complete ) continue

            try {
                await subscriber.complete(final_val)
            } catch (e) {
                if ( subscriber.error ) {
                    await subscriber.error(e as Error)
                } else {
                    throw e
                }
            }
        }

        this._is_complete = true
    }

    /** @returns The stored value, or undefined if none has been set. */
    public value(): T | undefined {
        return this._value
    }

    /** @returns True once `complete()` has finished. */
    public is_complete(): boolean {
        return this._is_complete
    }
}