Add request container lifecycle handling

This commit is contained in:
2022-03-30 23:04:00 -05:00
parent 514a578260
commit 78cb26fcb2
8 changed files with 103 additions and 16 deletions

View File

@@ -1,4 +1,4 @@
import {Inject, Singleton, StaticInstantiable} from '../../di'
import {AwareOfContainerLifecycle, Inject, Singleton, StaticInstantiable} from '../../di'
import {
BusConnectorConfig,
BusSubscriber,
@@ -25,7 +25,9 @@ export interface BusInternalSubscription {
* Propagating event bus implementation.
*/
@Singleton()
export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<TEvent> {
export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<TEvent>, AwareOfContainerLifecycle {
awareOfContainerLifecycle: true = true
@Inject()
protected readonly logging!: Logging
@@ -228,4 +230,8 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
this.isUp = false
}
onContainerDestroy(): Awaitable<void> {
this.down()
}
}

View File

@@ -1,4 +1,4 @@
import {Inject, Injectable, StaticInstantiable} from '../../di'
import {AwareOfContainerLifecycle, Inject, Injectable, StaticInstantiable} from '../../di'
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {Awaitable, Collection, ifDebugging, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
@@ -10,7 +10,9 @@ import {CanonicalItemClass} from '../CanonicalReceiver'
* Non-connectable event bus implementation. Can forward events to the main Bus instance.
*/
@Injectable()
export class LocalBus<TEvent extends Event = Event> extends CanonicalItemClass implements EventBus<TEvent> {
export class LocalBus<TEvent extends Event = Event> extends CanonicalItemClass implements EventBus<TEvent>, AwareOfContainerLifecycle {
awareOfContainerLifecycle: true = true
@Inject()
protected readonly logging!: Logging
@@ -135,4 +137,8 @@ export class LocalBus<TEvent extends Event = Event> extends CanonicalItemClass i
this.isUp = false
}
onContainerRelease(): Awaitable<void> {
this.down()
}
}

View File

@@ -1,5 +1,5 @@
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {Container, Inject, Injectable, StaticInstantiable} from '../../di'
import {AwareOfContainerLifecycle, Container, Inject, Injectable, StaticInstantiable} from '../../di'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Redis} from '../redis/Redis'
import {Serialization} from './serial/Serialization'
@@ -11,7 +11,9 @@ import {getEventName} from './getEventName'
* Event bus implementation that does pub/sub over a Redis connection.
*/
@Injectable()
export class RedisBus implements EventBus {
export class RedisBus implements EventBus, AwareOfContainerLifecycle {
awareOfContainerLifecycle: true = true
@Inject()
protected readonly redis!: Redis
@@ -125,8 +127,11 @@ export class RedisBus implements EventBus {
}
down(): Awaitable<void> {
// The Redis service will clean up the connections when the framework exits,
// so we don't need to do anything here.
return undefined
this.subscriberConnection?.disconnect()
this.publisherConnection?.disconnect()
}
onContainerRelease(): Awaitable<void> {
this.down()
}
}