Several fixes #831

pull/856/head
Florent FAYOLLE 4 months ago committed by fflorent
parent aaa8b3b5dc
commit b9b15db092

@ -62,7 +62,7 @@ class DummyDocWorkerMap implements IDocWorkerMap {
this._available = available;
}
public onWorkerUnavailable(workerId: string, cb: () => void): () => void {
public onWorkerUnavailable(workerInfo: DocWorkerInfo, cb: () => void): () => void {
return () => {};
}
@ -310,31 +310,34 @@ export class DocWorkerMap implements IDocWorkerMap {
}
}
public onWorkerUnavailable(workerId: string, cb: () => void, nbFailures = 0): () => void {
public onWorkerUnavailable(workerInfo: DocWorkerInfo, cb: () => void): () => void {
let timeout: NodeJS.Timeout;
const group = workerInfo.group || 'default';
const workerId = workerInfo.id;
(function recursiveTimeout(self) {
(function recursiveTimeout(self, nbFailures = 0) {
timeout = setTimeout(async () => {
if (nbFailures >= 3) {
log.error(
'DocWorkerMap: Presence checker failed 3 times, considering the worker %d is not available anymore',
'DocWorkerMap: Presence checker failed 3 times, considering the worker "%s" is not available anymore',
workerId
);
return cb();
}
try {
if (!await self._client.sismemberAsync('workers-available', workerId)) {
log.error("DocWorkerMap: Worker %d has been marked as unavailable in Redis", workerId);
if (!await self._client.sismemberAsync(`workers-available-${group}`, workerId)) {
log.error('DocWorkerMap: Worker "%s" has been marked as unavailable in Redis', workerId);
return cb();
}
return recursiveTimeout(self);
} catch (err) {
log.error('DocWorkerMap: Presence checker failed for worker %d', workerId, err);
return recursiveTimeout(self);
log.error('DocWorkerMap: Presence checker failed for worker "%s"', workerId, err);
return recursiveTimeout(self, nbFailures + 1);
}
}, 30_000);
})(this);
return () => {
log.info('DocWorkerMap: Clearing presence checker for worker "%s"', workerId);
clearTimeout(timeout);
};
}

@ -6,7 +6,7 @@
import { IChecksumStore } from 'app/server/lib/IChecksumStore';
import { IElectionStore } from 'app/server/lib/IElectionStore';
import { IPermitStores } from 'app/server/lib/Permit';
import {RedisClient} from 'redis';
import { RedisClient } from 'redis';
export interface DocWorkerInfo {
id: string;
@ -59,7 +59,7 @@ export interface IDocWorkerMap extends IPermitStores, IElectionStore, IChecksumS
// Call cb when worker has been marked as unavailable in Redis.
// This is used to shutdown doc workers gracefully.
onWorkerUnavailable(workerId: string, cb: () => void): () => void;
onWorkerUnavailable(workerInfo: DocWorkerInfo, cb: () => void): () => void;
// Releases doc from worker, freeing it to be assigned elsewhere.
// Assignments should only be released for workers that are now unavailable.

@ -1940,7 +1940,8 @@ export class FlexServer implements GristServer {
await workers.addWorker(this.worker);
await workers.setWorkerAvailability(this.worker.id, true);
if (!process.env.GRIST_MANAGED_WORKERS) {
this._unsubscribeWorkerUnavailableListener = workers.onWorkerUnavailable(this.worker.id, async () => {
this._unsubscribeWorkerUnavailableListener = workers.onWorkerUnavailable(this.worker, async () => {
log.info("DocWorker became unavailable, shutting down");
await this._shutdown();
});
}

Loading…
Cancel
Save