Start reimplementation in typescript
This commit is contained in:
19
src/util/lifecycle.ts
Normal file
19
src/util/lifecycle.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
import {Awaitable} from './types.js'
|
||||
|
||||
export type LifecycleCallback = () => Awaitable<unknown>
|
||||
|
||||
export type LifecycleAware = {
|
||||
adoptLifecycle(lifecycle: Lifecycle): void;
|
||||
}
|
||||
|
||||
export class Lifecycle {
|
||||
private onCloses: LifecycleCallback[] = []
|
||||
|
||||
onClose(closure: LifecycleCallback): void {
|
||||
this.onCloses.push(closure)
|
||||
}
|
||||
|
||||
close() {
|
||||
this.onCloses.map(x => x())
|
||||
}
|
||||
}
|
||||
122
src/util/log.ts
Normal file
122
src/util/log.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
import {Awaitable} from './types.js'
|
||||
|
||||
export enum LogLevel {
|
||||
VERBOSE = 0,
|
||||
DEBUG = 1,
|
||||
INFO = 2,
|
||||
WARN = 3,
|
||||
ERROR = 4,
|
||||
}
|
||||
|
||||
export const logLevelDisplay: Record<LogLevel, string> = {
|
||||
[LogLevel.VERBOSE]: 'verb',
|
||||
[LogLevel.DEBUG]: 'debug',
|
||||
[LogLevel.INFO]: 'info',
|
||||
[LogLevel.WARN]: 'warn',
|
||||
[LogLevel.ERROR]: 'error',
|
||||
}
|
||||
|
||||
export type LogLevels = {
|
||||
default: LogLevel,
|
||||
streams: Record<string, LogLevel>,
|
||||
}
|
||||
|
||||
export type LogMessage = {
|
||||
level: LogLevel,
|
||||
timestamp: Date,
|
||||
stream: string,
|
||||
data: unknown,
|
||||
}
|
||||
|
||||
export type StreamLogger = {
|
||||
verbose(data: unknown): void,
|
||||
debug(data: unknown): void,
|
||||
info(data: unknown): void,
|
||||
warn(data: unknown): void,
|
||||
error(data: unknown): void,
|
||||
}
|
||||
|
||||
export abstract class Logger {
|
||||
protected logLevels: LogLevels
|
||||
|
||||
constructor(
|
||||
defaultLevel: LogLevel,
|
||||
) {
|
||||
this.logLevels = {
|
||||
default: defaultLevel,
|
||||
streams: {},
|
||||
}
|
||||
}
|
||||
|
||||
setDefaultLevel(level: LogLevel) {
|
||||
this.logLevels.default = level
|
||||
}
|
||||
|
||||
setStreamLevel(stream: string, level: LogLevel) {
|
||||
this.logLevels.streams[stream] = level
|
||||
}
|
||||
|
||||
getStreamLogger(stream: string): StreamLogger {
|
||||
return {
|
||||
verbose: (data: unknown) => this.verbose(stream, data),
|
||||
debug: (data: unknown) => this.debug(stream, data),
|
||||
info: (data: unknown) => this.info(stream, data),
|
||||
warn: (data: unknown) => this.warn(stream, data),
|
||||
error: (data: unknown) => this.error(stream, data),
|
||||
}
|
||||
}
|
||||
|
||||
verbose(stream: string, data: unknown): Awaitable<void> {
|
||||
return this.logAtLevel(LogLevel.VERBOSE, stream, data)
|
||||
}
|
||||
|
||||
debug(stream: string, data: unknown): Awaitable<void> {
|
||||
return this.logAtLevel(LogLevel.DEBUG, stream, data)
|
||||
}
|
||||
|
||||
info(stream: string, data: unknown): Awaitable<void> {
|
||||
return this.logAtLevel(LogLevel.INFO, stream, data)
|
||||
}
|
||||
|
||||
warn(stream: string, data: unknown): Awaitable<void> {
|
||||
return this.logAtLevel(LogLevel.WARN, stream, data)
|
||||
}
|
||||
|
||||
error(stream: string, data: unknown): Awaitable<void> {
|
||||
return this.logAtLevel(LogLevel.ERROR, stream, data)
|
||||
}
|
||||
|
||||
logAtLevel(level: LogLevel, stream: string, data: unknown): Awaitable<void> {
|
||||
return this.log({
|
||||
timestamp: new Date,
|
||||
level,
|
||||
stream,
|
||||
data,
|
||||
})
|
||||
}
|
||||
|
||||
shouldLog(message: LogMessage): boolean {
|
||||
return message.level >= this.getLevelForStream(message.stream)
|
||||
}
|
||||
|
||||
log(message: LogMessage): Awaitable<void> {
|
||||
if ( this.shouldLog(message) ) {
|
||||
return this.write(message)
|
||||
}
|
||||
}
|
||||
|
||||
getLevelForStream(stream: string): LogLevel {
|
||||
if ( stream in this.logLevels.streams ) {
|
||||
return this.logLevels.streams[stream]
|
||||
}
|
||||
return this.logLevels.default
|
||||
}
|
||||
|
||||
protected abstract write(message: LogMessage): Awaitable<void>
|
||||
}
|
||||
|
||||
export class ConsoleLogger extends Logger {
|
||||
protected write(message: LogMessage): Awaitable<void> {
|
||||
console.log(`[${message.stream}] [${logLevelDisplay[message.level]}] [${message.timestamp.toISOString()}]`, message.data)
|
||||
}
|
||||
}
|
||||
225
src/util/subject.ts
Normal file
225
src/util/subject.ts
Normal file
@@ -0,0 +1,225 @@
|
||||
/**
|
||||
* Base error used to trigger an unsubscribe action from a subscriber.
|
||||
* @extends Error
|
||||
*/
|
||||
export class UnsubscribeError extends Error {}
|
||||
|
||||
/**
|
||||
* Thrown when a closed observable is pushed to.
|
||||
* @extends Error
|
||||
*/
|
||||
export class CompletedObservableError extends Error {
|
||||
constructor() {
|
||||
super('This observable can no longer be pushed to, as it has been completed.')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function.
|
||||
*/
|
||||
export type SubscriberFunction<T> = (val: T) => any
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function that handles errors.
|
||||
*/
|
||||
export type SubscriberErrorFunction = (error: Error) => any
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function that handles completed events.
|
||||
*/
|
||||
export type SubscriberCompleteFunction<T> = (val?: T) => any
|
||||
|
||||
/**
|
||||
* Subscribers that define multiple handler methods.
|
||||
*/
|
||||
export type ComplexSubscriber<T> = {
|
||||
next?: SubscriberFunction<T>,
|
||||
error?: SubscriberErrorFunction,
|
||||
complete?: SubscriberCompleteFunction<T>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscription to a behavior subject.
|
||||
*/
|
||||
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>
|
||||
|
||||
/**
|
||||
* Object providing helpers for unsubscribing from a subscription.
|
||||
*/
|
||||
export type Unsubscribe = { unsubscribe: () => void }
|
||||
|
||||
/**
|
||||
* A stream-based state class.
|
||||
*/
|
||||
export class BehaviorSubject<T> {
|
||||
/**
|
||||
* Subscribers to this subject.
|
||||
* @type Array<ComplexSubscriber>
|
||||
*/
|
||||
protected subscribers: ComplexSubscriber<T>[] = []
|
||||
|
||||
/**
|
||||
* True if this subject has been marked complete.
|
||||
* @type boolean
|
||||
*/
|
||||
protected subjectIsComplete = false
|
||||
|
||||
/**
|
||||
* The current value of this subject.
|
||||
*/
|
||||
protected currentValue?: T
|
||||
|
||||
/**
|
||||
* True if any value has been pushed to this subject.
|
||||
* @type boolean
|
||||
*/
|
||||
protected hasPush = false
|
||||
|
||||
/**
|
||||
* Register a new subscription to this subject.
|
||||
* @param {Subscription} subscriber
|
||||
* @return Unsubscribe
|
||||
*/
|
||||
public subscribe(subscriber: Subscription<T>): Unsubscribe {
|
||||
if ( typeof subscriber === 'function' ) {
|
||||
this.subscribers.push({ next: subscriber })
|
||||
} else {
|
||||
this.subscribers.push(subscriber)
|
||||
}
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
this.subscribers = this.subscribers.filter(x => x !== subscriber)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
public pipe<T2>(mapper: (val: T) => T2|Promise<T2>): BehaviorSubject<T2> {
|
||||
const sub = new BehaviorSubject<T2>()
|
||||
this.subscribe(async val => sub.next(await mapper(val)))
|
||||
return sub
|
||||
}
|
||||
|
||||
public pipeFlat<T2>(mapper: (val: T) => T2[]|Promise<T2[]>): BehaviorSubject<T2> {
|
||||
const sub = new BehaviorSubject<T2>()
|
||||
this.subscribe(async val => {
|
||||
const vals = await mapper(val)
|
||||
return Promise.all(vals.map(val => sub.next(val)))
|
||||
})
|
||||
return sub
|
||||
}
|
||||
|
||||
/**
|
||||
* Cast this subject to a promise, which resolves on the output of the next value.
|
||||
* @return Promise
|
||||
*/
|
||||
public toPromise(): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const { unsubscribe } = this.subscribe({
|
||||
next: (val: T) => {
|
||||
resolve(val)
|
||||
unsubscribe()
|
||||
},
|
||||
error: (error: Error) => {
|
||||
reject(error)
|
||||
unsubscribe()
|
||||
},
|
||||
complete: (val?: T) => {
|
||||
if ( typeof val !== 'undefined' ) {
|
||||
resolve(val)
|
||||
}
|
||||
unsubscribe()
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a new value to this subject. The promise resolves when all subscribers have been pushed to.
|
||||
* @param val
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async next(val: T): Promise<void> {
|
||||
if ( this.subjectIsComplete ) {
|
||||
throw new CompletedObservableError()
|
||||
}
|
||||
this.currentValue = val
|
||||
this.hasPush = true
|
||||
for ( const subscriber of this.subscribers ) {
|
||||
if ( subscriber.next ) {
|
||||
try {
|
||||
await subscriber.next(val)
|
||||
} catch (e) {
|
||||
if ( e instanceof UnsubscribeError ) {
|
||||
this.subscribers = this.subscribers.filter(x => x !== subscriber)
|
||||
} else if (subscriber.error && e instanceof Error) {
|
||||
await subscriber.error(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Push the given array of values to this subject in order.
|
||||
* The promise resolves when all subscribers have been pushed to for all values.
|
||||
* @param {Array} vals
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async push(vals: T[]): Promise<void> {
|
||||
if ( this.subjectIsComplete ) {
|
||||
throw new CompletedObservableError()
|
||||
}
|
||||
await Promise.all(vals.map(val => this.next(val)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this subject as complete.
|
||||
* The promise resolves when all subscribers have been pushed to.
|
||||
* @param [finalValue] - optionally, a final value to set
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async complete(finalValue?: T): Promise<void> {
|
||||
if ( this.subjectIsComplete ) {
|
||||
throw new CompletedObservableError()
|
||||
}
|
||||
if ( typeof finalValue === 'undefined' ) {
|
||||
finalValue = this.value()
|
||||
} else {
|
||||
this.currentValue = finalValue
|
||||
}
|
||||
|
||||
for ( const subscriber of this.subscribers ) {
|
||||
if ( subscriber.complete ) {
|
||||
try {
|
||||
await subscriber.complete(finalValue)
|
||||
} catch (e) {
|
||||
if ( subscriber.error && e instanceof Error ) {
|
||||
await subscriber.error(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.subjectIsComplete = true
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current value of this subject.
|
||||
*/
|
||||
public value(): T | undefined {
|
||||
return this.currentValue
|
||||
}
|
||||
|
||||
/**
|
||||
* True if this subject is marked as complete.
|
||||
* @return boolean
|
||||
*/
|
||||
public isComplete(): boolean {
|
||||
return this.subjectIsComplete
|
||||
}
|
||||
}
|
||||
9
src/util/types.ts
Normal file
9
src/util/types.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
export type Awaitable<T> = T | Promise<T>
|
||||
|
||||
export type JSONScalar = string | boolean | number | undefined
|
||||
export type JSONData = JSONScalar | Array<JSONScalar | JSONData> | { [key: string]: JSONScalar | JSONData }
|
||||
|
||||
/** A typescript-compatible version of Object.hasOwnProperty. */
|
||||
export function hasOwnProperty<X extends {}, Y extends PropertyKey>(obj: X, prop: Y): obj is X & Record<Y, unknown> { // eslint-disable-line @typescript-eslint/ban-types
|
||||
return Object.hasOwnProperty.call(obj, prop)
|
||||
}
|
||||
Reference in New Issue
Block a user