Shutdown Doc worker when it is not considered as available in Redis #831

pull/856/head
Florent FAYOLLE 4 months ago committed by fflorent
parent 3405ce902e
commit 09aa4e714e

@ -62,6 +62,10 @@ class DummyDocWorkerMap implements IDocWorkerMap {
this._available = available;
}
public onWorkerUnavailable(workerId: string, cb: () => void) {
// nothing to do
}
public async releaseAssignment(workerId: string, docId: string): Promise<void> {
// nothing to do
}
@ -306,6 +310,28 @@ export class DocWorkerMap implements IDocWorkerMap {
}
}
public onWorkerUnavailable(workerId: string, cb: () => void, nbFailures = 0) {
setTimeout(async () => {
if (nbFailures >= 3) {
log.error(
'DocWorkerMap: Presence checker failed 3 times, considering the worker %d is not available anymore',
workerId
);
return cb();
}
try {
if (!await this._client.sismemberAsync('workers-available', workerId)) {
log.error("DocWorkerMap: Worker %d has been marked as unavailable in Redis", workerId);
return cb();
}
return this.onWorkerUnavailable(workerId, cb);
} catch (err) {
log.error('DocWorkerMap: Presence checker failed', err);
return this.onWorkerUnavailable(workerId, cb, nbFailures + 1);
}
}, 30_000);
}
public async releaseAssignment(workerId: string, docId: string): Promise<void> {
const op = this._client.multi();
op.del(`doc-${docId}`);

@ -57,6 +57,10 @@ export interface IDocWorkerMap extends IPermitStores, IElectionStore, IChecksumS
// release existing assignments.
setWorkerAvailability(workerId: string, available: boolean): Promise<void>;
// Call cb when worker has been marked as unavailable in Redis.
// This is used to shutdown doc workers gracefully.
onWorkerUnavailable(workerId: string, cb: () => void): void;
// Releases doc from worker, freeing it to be assigned elsewhere.
// Assignments should only be released for workers that are now unavailable.
releaseAssignment(workerId: string, docId: string): Promise<void>;

@ -1938,6 +1938,12 @@ export class FlexServer implements GristServer {
}
await workers.addWorker(this.worker);
await workers.setWorkerAvailability(this.worker.id, true);
if (!process.env.GRIST_MANAGED_WORKERS) {
workers.onWorkerUnavailable(this.worker.id, async () => {
await this._shutdown();
});
}
} catch (err) {
this._healthy = false;
throw err;

Loading…
Cancel
Save