mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
(core) support GRIST_WORKER_GROUP to place worker into an exclusive group
Summary: In an emergency, we may want to serve certain documents with "old" workers as we fix problems. This diff adds some support for that. * Creates duplicate task definitions and services for staging and production doc workers (called grist-docs-staging2 and grist-docs-prod2), pulling from distinct docker tags (staging2 and prod2). The services are set to have zero workers until we need them. * These new workers are started with a new env variable `GRIST_WORKER_GROUP` set to `secondary`. * The `GRIST_WORKER_GROUP` variable, if set, makes the worker available to documents in the named group, and only that group. * An unauthenticated `/assign` endpoint is added to documents which, when POSTed to, checks that the doc is served by a worker in the desired group for that doc (as set manually in redis), and if not frees the doc up for reassignment. This makes it possible to move individual docs between workers without redeployments. The bash scripts added are a record of how the task definitions + services were created. The services could just have been copied manually, but the task definitions will need to be updated whenever the definitions for the main doc workers are updated, so it is worth scripting that. For example, if a certain document were to fail on a new deployment of Grist, but rolling back the full deployment wasn't practical: * Set prod2 tag in docker to desired codebase for that document * Set desired_count for grist-docs-prod2 service to non-zero * Set doc-<docid>-group for that doc in redis to secondary * Hit /api/docs/<docid>/assign to move the doc to grist-docs-prod2 (If the document needs to be reverted to a previous snapshot, that currently would need doing manually - could be made simpler, but not in scope of this diff). Test Plan: added tests Reviewers: dsagal Reviewed By: dsagal Differential Revision: https://phab.getgrist.com/D2649
This commit is contained in:
parent
275a35d03a
commit
d6ff1361cb
@ -32,9 +32,10 @@ export class DocApiForwarder {
|
||||
// Middleware to forward a request about an existing document that user has access to.
|
||||
// We do not check whether the document has been soft-deleted; that will be checked by
|
||||
// the worker if needed.
|
||||
const withDoc = expressWrap(this._forwardToDocWorker.bind(this, true));
|
||||
const withDoc = expressWrap(this._forwardToDocWorker.bind(this, true, 'viewers'));
|
||||
// Middleware to forward a request without a pre-existing document (for imports/uploads).
|
||||
const withoutDoc = expressWrap(this._forwardToDocWorker.bind(this, false));
|
||||
const withoutDoc = expressWrap(this._forwardToDocWorker.bind(this, false, null));
|
||||
const withDocWithoutAuth = expressWrap(this._forwardToDocWorker.bind(this, true, null));
|
||||
app.use('/api/docs/:docId/tables', withDoc);
|
||||
app.use('/api/docs/:docId/force-reload', withDoc);
|
||||
app.use('/api/docs/:docId/remove', withDoc);
|
||||
@ -47,14 +48,17 @@ export class DocApiForwarder {
|
||||
app.use('/api/docs/:docId/flush', withDoc);
|
||||
app.use('/api/docs/:docId/states', withDoc);
|
||||
app.use('/api/docs/:docId/compare', withDoc);
|
||||
app.use('/api/docs/:docId/assign', withDocWithoutAuth);
|
||||
app.use('^/api/docs$', withoutDoc);
|
||||
}
|
||||
|
||||
private async _forwardToDocWorker(withDocId: boolean, req: express.Request, res: express.Response): Promise<void> {
|
||||
private async _forwardToDocWorker(withDocId: boolean, role: 'viewers'|null, req: express.Request, res: express.Response): Promise<void> {
|
||||
let docId: string|null = null;
|
||||
if (withDocId) {
|
||||
const docAuth = await getOrSetDocAuth(req as RequestWithLogin, this._dbManager, req.params.docId);
|
||||
assertAccess('viewers', docAuth, {allowRemoved: true});
|
||||
if (role) {
|
||||
assertAccess(role, docAuth, {allowRemoved: true});
|
||||
}
|
||||
docId = docAuth.docId;
|
||||
}
|
||||
// Use the docId for worker assignment, rather than req.params.docId, which could be a urlId.
|
||||
|
@ -110,6 +110,14 @@ class DummyDocWorkerMap implements IDocWorkerMap {
|
||||
public async getChecksum(family: string, key: string) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public async getWorkerGroup(workerId: string): Promise<string|null> {
|
||||
return null;
|
||||
}
|
||||
|
||||
public async getDocGroup(docId: string): Promise<string|null> {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -140,6 +148,7 @@ class DummyDocWorkerMap implements IDocWorkerMap {
|
||||
*/
|
||||
export class DocWorkerMap implements IDocWorkerMap {
|
||||
private _client: RedisClient;
|
||||
private _clients: RedisClient[];
|
||||
private _redlock: Redlock;
|
||||
|
||||
// Optional deploymentKey argument supplies a key unique to the deployment (this is important
|
||||
@ -148,19 +157,31 @@ export class DocWorkerMap implements IDocWorkerMap {
|
||||
permitMsec?: number
|
||||
}) {
|
||||
this._deploymentKey = this._deploymentKey || version.version;
|
||||
_clients = _clients || [createClient(process.env.REDIS_URL)];
|
||||
this._redlock = new Redlock(_clients);
|
||||
this._client = _clients[0]!;
|
||||
this._clients = _clients || [createClient(process.env.REDIS_URL)];
|
||||
this._redlock = new Redlock(this._clients);
|
||||
this._client = this._clients[0]!;
|
||||
}
|
||||
|
||||
public async addWorker(info: DocWorkerInfo): Promise<void> {
|
||||
log.info(`DocWorkerMap.addWorker ${info.id}`);
|
||||
const lock = await this._redlock.lock('workers-lock', LOCK_TIMEOUT);
|
||||
try {
|
||||
// Make a worker-{workerId} key with contact info, then add this worker to available set.
|
||||
// Make a worker-{workerId} key with contact info.
|
||||
await this._client.hmsetAsync(`worker-${info.id}`, info);
|
||||
// Add this worker to set of workers (but don't make it available for work yet).
|
||||
await this._client.saddAsync('workers', info.id);
|
||||
// Figure out if worker should belong to a group
|
||||
|
||||
if (info.group) {
|
||||
// Accept work only for a specific group.
|
||||
// Do not accept work not associated with the specified group.
|
||||
await this._client.setAsync(`worker-${info.id}-group`, info.group);
|
||||
} else {
|
||||
// Figure out if worker should belong to a group via elections.
|
||||
// Be careful: elections happen within a single deployment, so are somewhat
|
||||
// unintuitive in behavior. For example, if a document is assigned to a group
|
||||
// but there is no worker available for that group, it may open on any worker.
|
||||
// And if a worker is assigned to a group, it may still end up assigned work
|
||||
// not associated with that group if it is the only worker available.
|
||||
const groups = await this._client.hgetallAsync('groups');
|
||||
if (groups) {
|
||||
const elections = await this._client.hgetallAsync(`elections-${this._deploymentKey}`) || {};
|
||||
@ -175,6 +196,7 @@ export class DocWorkerMap implements IDocWorkerMap {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await lock.unlock();
|
||||
}
|
||||
@ -237,8 +259,14 @@ export class DocWorkerMap implements IDocWorkerMap {
|
||||
log.info(`DocWorkerMap.setWorkerAvailability ${workerId} ${available}`);
|
||||
const group = await this._client.getAsync(`worker-${workerId}-group`) || 'default';
|
||||
if (available) {
|
||||
const docWorker = await this._client.hgetallAsync(`worker-${workerId}`) as DocWorkerInfo|null;
|
||||
if (!docWorker) { throw new Error('no doc worker contact info available'); }
|
||||
await this._client.saddAsync(`workers-available-${group}`, workerId);
|
||||
// If we're not assigned exclusively to a group, add this worker also to the general
|
||||
// pool of workers.
|
||||
if (!docWorker.group) {
|
||||
await this._client.saddAsync('workers-available', workerId);
|
||||
}
|
||||
} else {
|
||||
await this._client.sremAsync('workers-available', workerId);
|
||||
await this._client.sremAsync(`workers-available-${group}`, workerId);
|
||||
@ -408,7 +436,9 @@ export class DocWorkerMap implements IDocWorkerMap {
|
||||
}
|
||||
|
||||
public async close(): Promise<void> {
|
||||
// nothing to do
|
||||
for (const cli of this._clients || []) {
|
||||
await cli.quitAsync();
|
||||
}
|
||||
}
|
||||
|
||||
public async getElection(name: string, durationInMs: number): Promise<string|null> {
|
||||
@ -442,6 +472,14 @@ export class DocWorkerMap implements IDocWorkerMap {
|
||||
await lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public async getWorkerGroup(workerId: string): Promise<string|null> {
|
||||
return this._client.getAsync(`worker-${workerId}-group`);
|
||||
}
|
||||
|
||||
public async getDocGroup(docId: string): Promise<string|null> {
|
||||
return this._client.getAsync(`doc-${docId}-group`);
|
||||
}
|
||||
}
|
||||
|
||||
// If we don't have redis available and use a DummyDocWorker, it should be a singleton.
|
||||
|
@ -13,6 +13,7 @@ import { assertAccess, getOrSetDocAuth, getTransitiveHeaders, getUserId, isAnony
|
||||
import { DocManager } from "app/server/lib/DocManager";
|
||||
import { docSessionFromRequest, makeExceptionalDocSession, OptDocSession } from "app/server/lib/DocSession";
|
||||
import { DocWorker } from "app/server/lib/DocWorker";
|
||||
import { IDocWorkerMap } from "app/server/lib/DocWorkerMap";
|
||||
import { expressWrap } from 'app/server/lib/expressWrap';
|
||||
import { GristServer } from 'app/server/lib/GristServer';
|
||||
import { HashUtil } from 'app/server/lib/HashUtil';
|
||||
@ -69,7 +70,8 @@ function apiThrottle(usage: Map<string, number>,
|
||||
}
|
||||
|
||||
export class DocWorkerApi {
|
||||
constructor(private _app: Application, private _docWorker: DocWorker, private _docManager: DocManager,
|
||||
constructor(private _app: Application, private _docWorker: DocWorker,
|
||||
private _docWorkerMap: IDocWorkerMap, private _docManager: DocManager,
|
||||
private _dbManager: HomeDBManager, private _grist: GristServer) {}
|
||||
|
||||
/**
|
||||
@ -86,6 +88,8 @@ export class DocWorkerApi {
|
||||
const canEdit = expressWrap(this._assertAccess.bind(this, 'editors', false));
|
||||
// check user can edit document, with soft-deleted documents being acceptable
|
||||
const canEditMaybeRemoved = expressWrap(this._assertAccess.bind(this, 'editors', true));
|
||||
// check document exists, don't check user access
|
||||
const docExists = expressWrap(this._assertAccess.bind(this, null, false));
|
||||
|
||||
// Middleware to limit number of outstanding requests per document. Will also
|
||||
// handle errors like expressWrap would.
|
||||
@ -254,6 +258,29 @@ export class DocWorkerApi {
|
||||
res.json(true);
|
||||
}));
|
||||
|
||||
// Administrative endpoint, that checks if a document is in the expected group,
|
||||
// and frees it for reassignment if not. Has no effect if document is in the
|
||||
// expected group. Does not require specific rights. Returns true if the document
|
||||
// is freed up for reassignment, otherwise false.
|
||||
this._app.post('/api/docs/:docId/assign', docExists, throttled(async (req, res) => {
|
||||
const docId = getDocId(req);
|
||||
const status = await this._docWorkerMap.getDocWorker(docId);
|
||||
if (!status) { res.json(false); return; }
|
||||
const workerGroup = await this._docWorkerMap.getWorkerGroup(status.docWorker.id);
|
||||
const docGroup = await this._docWorkerMap.getDocGroup(docId);
|
||||
if (docGroup === workerGroup) { res.json(false); return; }
|
||||
const activeDoc = await this._getActiveDoc(req);
|
||||
await activeDoc.flushDoc();
|
||||
// flushDoc terminates once there's no pending operation on the document.
|
||||
// There could still be async operations in progess. We mute their effect,
|
||||
// as if they never happened.
|
||||
activeDoc.docClients.interruptAllClients();
|
||||
activeDoc.setMuted();
|
||||
await activeDoc.shutdown();
|
||||
await this._docWorkerMap.releaseAssignment(status.docWorker.id, docId);
|
||||
res.json(true);
|
||||
}));
|
||||
|
||||
// This endpoint cannot use withDoc since it is expected behavior for the ActiveDoc it
|
||||
// starts with to become muted.
|
||||
this._app.post('/api/docs/:docId/replace', canEdit, throttled(async (req, res) => {
|
||||
@ -416,12 +443,12 @@ export class DocWorkerApi {
|
||||
return this._docManager.getActiveDoc(getDocId(req));
|
||||
}
|
||||
|
||||
private async _assertAccess(role: 'viewers'|'editors', allowRemoved: boolean,
|
||||
private async _assertAccess(role: 'viewers'|'editors'|null, allowRemoved: boolean,
|
||||
req: Request, res: Response, next: NextFunction) {
|
||||
const scope = getDocScope(req);
|
||||
allowRemoved = scope.showAll || scope.showRemoved || allowRemoved;
|
||||
const docAuth = await getOrSetDocAuth(req as RequestWithLogin, this._dbManager, scope.urlId);
|
||||
assertAccess(role, docAuth, {allowRemoved});
|
||||
if (role) { assertAccess(role, docAuth, {allowRemoved}); }
|
||||
next();
|
||||
}
|
||||
|
||||
@ -575,10 +602,10 @@ export class DocWorkerApi {
|
||||
}
|
||||
|
||||
export function addDocApiRoutes(
|
||||
app: Application, docWorker: DocWorker, docManager: DocManager, dbManager: HomeDBManager,
|
||||
app: Application, docWorker: DocWorker, docWorkerMap: IDocWorkerMap, docManager: DocManager, dbManager: HomeDBManager,
|
||||
grist: GristServer
|
||||
) {
|
||||
const api = new DocWorkerApi(app, docWorker, docManager, dbManager, grist);
|
||||
const api = new DocWorkerApi(app, docWorker, docWorkerMap, docManager, dbManager, grist);
|
||||
api.addEndpoints();
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,9 @@ export interface DocWorkerInfo {
|
||||
|
||||
// The internal base URL for the docWorker.
|
||||
internalUrl: string;
|
||||
|
||||
// If set, worker should accept work only for this named group.
|
||||
group?: string;
|
||||
}
|
||||
|
||||
export interface DocStatus {
|
||||
@ -60,4 +63,7 @@ export interface IDocWorkerMap extends IPermitStore, IElectionStore, IChecksumSt
|
||||
// Get all assignments for a worker. Should only be queried for a worker that
|
||||
// is currently unavailable.
|
||||
getAssignments(workerId: string): Promise<string[]>;
|
||||
|
||||
getWorkerGroup(workerId: string): Promise<string|null>;
|
||||
getDocGroup(docId: string): Promise<string|null>;
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ import {DocWorkerInfo, IDocWorkerMap} from 'app/server/lib/DocWorkerMap';
|
||||
import {expressWrap, jsonErrorHandler} from 'app/server/lib/expressWrap';
|
||||
import {Hosts, RequestWithOrg} from 'app/server/lib/extractOrg';
|
||||
import {GristLoginMiddleware, GristServer} from 'app/server/lib/GristServer';
|
||||
import {initGristSessions} from 'app/server/lib/gristSessions';
|
||||
import {initGristSessions, SessionStore} from 'app/server/lib/gristSessions';
|
||||
import {HostedStorageManager} from 'app/server/lib/HostedStorageManager';
|
||||
import {IBilling} from 'app/server/lib/IBilling';
|
||||
import {IDocStorageManager} from 'app/server/lib/IDocStorageManager';
|
||||
@ -111,6 +111,7 @@ export class FlexServer implements GristServer {
|
||||
private _docWorker: DocWorker;
|
||||
private _hosts: Hosts;
|
||||
private _pluginManager: PluginManager;
|
||||
private _sessionStore: SessionStore;
|
||||
private _storageManager: IDocStorageManager;
|
||||
private _docWorkerMap: IDocWorkerMap;
|
||||
private _disabled: boolean = false;
|
||||
@ -525,8 +526,9 @@ export class FlexServer implements GristServer {
|
||||
if (this.usage) { this.usage.close(); }
|
||||
if (this.housekeeper) { await this.housekeeper.stop(); }
|
||||
await this._shutdown();
|
||||
// Do this last, DocWorkerMap is used during shutdown.
|
||||
// Do this after _shutdown, since DocWorkerMap is used during shutdown.
|
||||
if (this._docWorkerMap) { await this._docWorkerMap.close(); }
|
||||
if (this._sessionStore) { await this._sessionStore.close(); }
|
||||
}
|
||||
|
||||
public addDocApiForwarder() {
|
||||
@ -546,7 +548,7 @@ export class FlexServer implements GristServer {
|
||||
this.addOrg();
|
||||
|
||||
// Create the sessionStore and related objects.
|
||||
const {sessions, sessionMiddleware} = initGristSessions(this.instanceRoot, this);
|
||||
const {sessions, sessionMiddleware, sessionStore} = initGristSessions(this.instanceRoot, this);
|
||||
this.app.use(sessionMiddleware);
|
||||
|
||||
// Create an endpoint for making cookies during testing.
|
||||
@ -556,6 +558,7 @@ export class FlexServer implements GristServer {
|
||||
});
|
||||
|
||||
this.sessions = sessions;
|
||||
this._sessionStore = sessionStore;
|
||||
}
|
||||
|
||||
// Close connections and stop accepting new connections. Remove server from any lists
|
||||
@ -892,7 +895,7 @@ export class FlexServer implements GristServer {
|
||||
this.addSupportPaths(docAccessMiddleware);
|
||||
|
||||
if (!isSingleUserMode()) {
|
||||
addDocApiRoutes(this.app, docWorker, docManager, this.dbManager, this);
|
||||
addDocApiRoutes(this.app, docWorker, this._docWorkerMap, docManager, this.dbManager, this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1200,6 +1203,9 @@ export class FlexServer implements GristServer {
|
||||
}
|
||||
this.info.push(['docWorkerId', this.worker.id]);
|
||||
|
||||
if (process.env.GRIST_WORKER_GROUP) {
|
||||
this.worker.group = process.env.GRIST_WORKER_GROUP;
|
||||
}
|
||||
} else {
|
||||
if (process.env.GRIST_ROUTER_URL) {
|
||||
await this.createWorkerUrl();
|
||||
|
@ -5,6 +5,7 @@ import {GristServer} from 'app/server/lib/GristServer';
|
||||
import {Sessions} from 'app/server/lib/Sessions';
|
||||
import {promisifyAll} from 'bluebird';
|
||||
import * as express from 'express';
|
||||
import assignIn = require('lodash/assignIn');
|
||||
import * as path from 'path';
|
||||
import * as shortUUID from "short-uuid";
|
||||
|
||||
@ -17,6 +18,7 @@ export const COOKIE_MAX_AGE = 90 * 24 * 60 * 60 * 1000; // 90 days in milliseco
|
||||
export interface SessionStore {
|
||||
getAsync(sid: string): Promise<any>;
|
||||
setAsync(sid: string, session: any): Promise<void>;
|
||||
close(): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -50,17 +52,27 @@ function createSessionStoreFactory(sessionsDB: string): () => SessionStore {
|
||||
// Note that ./build excludes this module from the electron build.
|
||||
const RedisStore = require('connect-redis')(session);
|
||||
promisifyAll(RedisStore.prototype);
|
||||
return () => new RedisStore({
|
||||
return () => {
|
||||
const store = new RedisStore({
|
||||
url: process.env.REDIS_URL,
|
||||
});
|
||||
return assignIn(store, {
|
||||
async close() {
|
||||
// Doesn't actually close, just unrefs stream so node becomes close-able.
|
||||
store.client.unref();
|
||||
}});
|
||||
}
|
||||
} else {
|
||||
const SQLiteStore = require('@gristlabs/connect-sqlite3')(session);
|
||||
promisifyAll(SQLiteStore.prototype);
|
||||
return () => new SQLiteStore({
|
||||
return () => {
|
||||
const store = new SQLiteStore({
|
||||
dir: path.dirname(sessionsDB),
|
||||
db: path.basename(sessionsDB), // SQLiteStore no longer appends a .db suffix.
|
||||
table: 'sessions'
|
||||
});
|
||||
return assignIn(store, { async close() {}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user