Clear timeout on shutdown #831

pull/856/head
Florent FAYOLLE 4 months ago committed by fflorent
parent 09aa4e714e
commit aaa8b3b5dc

@ -62,8 +62,8 @@ class DummyDocWorkerMap implements IDocWorkerMap {
this._available = available;
}
public onWorkerUnavailable(workerId: string, cb: () => void) {
// nothing to do
public onWorkerUnavailable(workerId: string, cb: () => void): () => void {
return () => {};
}
public async releaseAssignment(workerId: string, docId: string): Promise<void> {
@ -310,26 +310,33 @@ export class DocWorkerMap implements IDocWorkerMap {
}
}
public onWorkerUnavailable(workerId: string, cb: () => void, nbFailures = 0) {
setTimeout(async () => {
if (nbFailures >= 3) {
log.error(
'DocWorkerMap: Presence checker failed 3 times, considering the worker %d is not available anymore',
workerId
);
return cb();
}
try {
if (!await this._client.sismemberAsync('workers-available', workerId)) {
log.error("DocWorkerMap: Worker %d has been marked as unavailable in Redis", workerId);
public onWorkerUnavailable(workerId: string, cb: () => void, nbFailures = 0): () => void {
let timeout: NodeJS.Timeout;
(function recursiveTimeout(self) {
timeout = setTimeout(async () => {
if (nbFailures >= 3) {
log.error(
'DocWorkerMap: Presence checker failed 3 times, considering the worker %d is not available anymore',
workerId
);
return cb();
}
return this.onWorkerUnavailable(workerId, cb);
} catch (err) {
log.error('DocWorkerMap: Presence checker failed', err);
return this.onWorkerUnavailable(workerId, cb, nbFailures + 1);
}
}, 30_000);
try {
if (!await self._client.sismemberAsync('workers-available', workerId)) {
log.error("DocWorkerMap: Worker %d has been marked as unavailable in Redis", workerId);
return cb();
}
return recursiveTimeout(self);
} catch (err) {
log.error('DocWorkerMap: Presence checker failed for worker %d', workerId, err);
return recursiveTimeout(self);
}
}, 30_000);
})(this);
return () => {
clearTimeout(timeout);
};
}
public async releaseAssignment(workerId: string, docId: string): Promise<void> {

@ -59,7 +59,7 @@ export interface IDocWorkerMap extends IPermitStores, IElectionStore, IChecksumS
// Call cb when worker has been marked as unavailable in Redis.
// This is used to shutdown doc workers gracefully.
onWorkerUnavailable(workerId: string, cb: () => void): void;
onWorkerUnavailable(workerId: string, cb: () => void): () => void;
// Releases doc from worker, freeing it to be assigned elsewhere.
// Assignments should only be released for workers that are now unavailable.

@ -179,6 +179,7 @@ export class FlexServer implements GristServer {
// Set once ready() is called
private _isReady: boolean = false;
private _probes: BootProbes;
private _unsubscribeWorkerUnavailableListener?: () => void;
constructor(public port: number, public name: string = 'flexServer',
public readonly options: FlexServerOptions = {}) {
@ -1939,7 +1940,7 @@ export class FlexServer implements GristServer {
await workers.addWorker(this.worker);
await workers.setWorkerAvailability(this.worker.id, true);
if (!process.env.GRIST_MANAGED_WORKERS) {
workers.onWorkerUnavailable(this.worker.id, async () => {
this._unsubscribeWorkerUnavailableListener = workers.onWorkerUnavailable(this.worker.id, async () => {
await this._shutdown();
});
}
@ -1975,6 +1976,7 @@ export class FlexServer implements GristServer {
// We urgently want to disable any new assignments.
await workers.setWorkerAvailability(this.worker.id, false);
this._unsubscribeWorkerUnavailableListener?.();
// Enumerate the documents we are responsible for.
let assignments = await workers.getAssignments(this.worker.id);

Loading…
Cancel
Save