(core) tweak throttling to work for gvisor/runsc

Summary:
Grist has, up to now, used a throttling mechanism that allows a sandbox free rein until it starts using above some threshold percentage of a cpu for some time - at that point, we start sending STOP and CONT signals on a duty cycle, with longer and longer STOPped periods until cpu usage is at a threshold. The general idea is to do short jobs quickly, while throttling long jobs (thus unfortunately making them even longer) in order to continue doing other short jobs quickly.

The runsc sandbox is not a single process, there are in fact 5 per sandbox in our setup. Runsc can work with kvm or ptrace. Kvm is not available to us, so we use ptrace. With ptrace, there is one process that is the appropriate one to duty cycle, and another that needs to receive a signal in order to yield. This diff adds the necessary machinery.

This is a conservative change, where I stick with our existing throttling mechanism and adapt it to the new sandbox. It would be reasonable to consider switching throttling. There's a lot the OS allows. We can set a quota for how much cpu a process can use within a given period, for example. However the overall behavior with that would be quite different from what we have, so it feels like this would need more discussion.

The implementation contains use of a linux utility `pgrep` since portability is not important (runsc is only available on linux) and there's no node api for enumerating children of a process.

The diff contains some tweaks to `buildtools/contain.sh` to streamline experimenting with Grist and runsc on a mac. It is important for throttling that node and the sandbox processes are in the same process name space, if docker is in between them then some extra machinery is needed (a proxy throttler and a way to communicate with it) which I chose not to implement.

Test Plan: added test; a lot of manual testing

Reviewers: dsagal

Reviewed By: dsagal

Differential Revision: https://phab.getgrist.com/D3113
This commit is contained in:
Paul Fitzpatrick 2021-11-04 16:44:59 -04:00
parent 10a4cbb6bd
commit db34e1a6d9
3 changed files with 415 additions and 41 deletions

View File

@ -1,13 +1,13 @@
/**
* JS controller for the pypy sandbox.
*/
import * as pidusage from '@gristlabs/pidusage';
import * as marshal from 'app/common/marshal';
import {ISandbox, ISandboxCreationOptions, ISandboxCreator} from 'app/server/lib/ISandbox';
import * as log from 'app/server/lib/log';
import {DirectProcessControl, ISandboxControl, NoProcessControl, ProcessInfo,
SubprocessControl} from 'app/server/lib/SandboxControl';
import * as sandboxUtil from 'app/server/lib/sandboxUtil';
import * as shutdown from 'app/server/lib/shutdown';
import {Throttle} from 'app/server/lib/Throttle';
import {ChildProcess, spawn} from 'child_process';
import * as path from 'path';
import {Stream, Writable} from 'stream';
@ -58,6 +58,16 @@ interface ISandboxOptions {
// may want to pass arguments to python directly.
}
/**
* We interact with sandboxes as a separate child process. Data engine work is done
* across standard input and output streams from and to this process. We also monitor
* and control resource utilization via a distinct control interface.
*/
interface SandboxProcess {
child: ChildProcess;
control: ISandboxControl;
}
type ResolveRejectPair = [(value?: any) => void, (reason?: unknown) => void];
// Type for basic message identifiers, available as constants in sandboxUtil.
@ -70,6 +80,7 @@ const recordBuffersRoot = process.env.RECORD_SANDBOX_BUFFERS_DIR;
export class NSandbox implements ISandbox {
public readonly childProc: ChildProcess;
private _control: ISandboxControl;
private _logTimes: boolean;
private _exportedFunctions: {[name: string]: SandboxMethod};
private _marshaller = new marshal.Marshaller({stringToBuffer: false, version: 2});
@ -84,8 +95,6 @@ export class NSandbox implements ISandbox {
private _streamToSandbox: Writable;
private _streamFromSandbox: Stream;
private _throttle: Throttle | undefined;
// Create a unique subdirectory for each sandbox process so they can be replayed separately
private _recordBuffersDir = recordBuffersRoot ? path.resolve(recordBuffersRoot, new Date().toISOString()) : null;
@ -106,7 +115,9 @@ export class NSandbox implements ISandbox {
this._logTimes = Boolean(options.logTimes || options.logCalls);
this._exportedFunctions = options.exports || {};
this.childProc = spawner(options);
const sandboxProcess = spawner(options);
this._control = sandboxProcess.control;
this.childProc = sandboxProcess.child;
this._logMeta = {sandboxPid: this.childProc.pid, ...options.logMeta};
@ -141,13 +152,6 @@ export class NSandbox implements ISandbox {
// On shutdown, shutdown the child process cleanly, and wait for it to exit.
shutdown.addCleanupHandler(this, this.shutdown);
if (process.env.GRIST_THROTTLE_CPU) {
this._throttle = new Throttle({
pid: this.childProc.pid,
logMeta: this._logMeta,
});
}
if (this._recordBuffersDir) {
log.rawDebug(`Recording sandbox buffers in ${this._recordBuffersDir}`, this._logMeta);
fs.mkdirSync(this._recordBuffersDir, {recursive: true});
@ -165,9 +169,9 @@ export class NSandbox implements ISandbox {
// The signal ensures the sandbox process exits even if it's hanging in an infinite loop or
// long computation. It doesn't get a chance to clean up, but since it is sandboxed, there is
// nothing it needs to clean up anyway.
const timeoutID = setTimeout(() => {
const timeoutID = setTimeout(async () => {
log.rawWarn("Sandbox sending SIGKILL", this._logMeta);
this.childProc.kill('SIGKILL');
await this._control.kill();
}, 1000);
const result = await new Promise((resolve, reject) => {
@ -176,7 +180,7 @@ export class NSandbox implements ISandbox {
this.childProc.on('close', resolve);
this.childProc.on('exit', resolve);
this._close();
});
}).finally(() => this._control.close());
// In the normal case, the kill timer is pending when the process exits, and we can clear it. If
// the process got killed, the timer is invalid, and clearTimeout() does nothing.
@ -200,7 +204,7 @@ export class NSandbox implements ISandbox {
* Returns the RSS (resident set size) of the sandbox process, in bytes.
*/
public async reportMemoryUsage() {
const memory = (await pidusage(this.childProc.pid)).memory;
const {memory} = await this._control.getUsage();
log.rawDebug('Sandbox memory', {memory, ...this._logMeta});
}
@ -218,7 +222,7 @@ export class NSandbox implements ISandbox {
private _close() {
if (this._throttle) { this._throttle.stop(); }
this._control.prepareToClose();
if (!this._isWriteClosed) {
// Close the pipe to the sandbox, which should cause the sandbox to exit cleanly.
this._streamToSandbox.end();
@ -273,7 +277,7 @@ export class NSandbox implements ISandbox {
* Process the closing of the pipe by the sandboxed process.
*/
private _onSandboxClose() {
if (this._throttle) { this._throttle.stop(); }
this._control.prepareToClose();
this._isReadClosed = true;
// Clear out all reads pending on PipeFromSandbox, rejecting them with the given error.
const err = new sandboxUtil.SandboxError("PipeFromSandbox is closed");
@ -406,7 +410,7 @@ export class NSandboxCreator implements ISandboxCreator {
}
// A function that takes sandbox options and starts a sandbox process.
type SpawnFn = (options: ISandboxOptions) => ChildProcess;
type SpawnFn = (options: ISandboxOptions) => SandboxProcess;
/**
* Helper function to run a nacl sandbox. It takes care of most arguments, similarly to
@ -417,7 +421,7 @@ type SpawnFn = (options: ISandboxOptions) => ChildProcess;
* This is quite old code, with attention to Windows support that is no longer tested.
* I've done my best to avoid changing behavior by not touching it too much.
*/
function pynbox(options: ISandboxOptions): ChildProcess {
function pynbox(options: ISandboxOptions): SandboxProcess {
const {command, args: pythonArgs, unsilenceLog, importDir} = options;
if (command) {
throw new Error("NaCl can only run the specific python2.7 package built for it");
@ -449,7 +453,7 @@ function pynbox(options: ISandboxOptions): ChildProcess {
const noLog = unsilenceLog ? [] :
(process.env.OS === 'Windows_NT' ? ['-l', 'NUL'] : ['-l', '/dev/null']);
return spawn('sandbox/nacl/bin/sel_ldr', [
const child = spawn('sandbox/nacl/bin/sel_ldr', [
'-B', './sandbox/nacl/lib/irt_core.nexe', '-m', './sandbox/nacl/root:/:ro',
...noLog,
...wrapperArgs.get(),
@ -457,6 +461,7 @@ function pynbox(options: ISandboxOptions): ChildProcess {
'--library-path', '/slib', '/python/bin/python2.7.nexe',
...pythonArgs
], spawnOptions);
return {child, control: new DirectProcessControl(child, options.logMeta)};
}
/**
@ -466,7 +471,7 @@ function pynbox(options: ISandboxOptions): ChildProcess {
* by `./build python3`. Using system python works too, if all dependencies have
* been installed globally.
*/
function unsandboxed(options: ISandboxOptions): ChildProcess {
function unsandboxed(options: ISandboxOptions): SandboxProcess {
const {args: pythonArgs, importDir} = options;
const paths = getAbsolutePaths(options);
if (options.useGristEntrypoint) {
@ -485,8 +490,9 @@ function unsandboxed(options: ISandboxOptions): ChildProcess {
spawnOptions.stdio.push('pipe', 'pipe');
}
const command = findPython(options.command);
return spawn(command, pythonArgs,
{cwd: path.join(process.cwd(), 'sandbox'), ...spawnOptions});
const child = spawn(command, pythonArgs,
{cwd: path.join(process.cwd(), 'sandbox'), ...spawnOptions});
return {child, control: new DirectProcessControl(child, options.logMeta)};
}
/**
@ -496,7 +502,7 @@ function unsandboxed(options: ISandboxOptions): ChildProcess {
* directly) or `wrap_in_docker.sh` (to use runsc within a container).
* Be sure to read setup instructions in that directory.
*/
function gvisor(options: ISandboxOptions): ChildProcess {
function gvisor(options: ISandboxOptions): SandboxProcess {
const {command, args: pythonArgs} = options;
if (!command) { throw new Error("gvisor operation requires GRIST_SANDBOX"); }
if (!options.minimalPipeMode) {
@ -530,13 +536,36 @@ function gvisor(options: ISandboxOptions): ChildProcess {
if (options.useGristEntrypoint && pythonVersion === '3' && !paths.importDir &&
process.env.GRIST_CHECKPOINT) {
if (process.env.GRIST_CHECKPOINT_MAKE) {
return spawn(command, [...wrapperArgs.get(), '--checkpoint', process.env.GRIST_CHECKPOINT,
`python${pythonVersion}`, '--', ...pythonArgs]);
const child =
spawn(command, [...wrapperArgs.get(), '--checkpoint', process.env.GRIST_CHECKPOINT,
`python${pythonVersion}`, '--', ...pythonArgs]);
// We don't want process control for this.
return {child, control: new NoProcessControl(child)};
}
wrapperArgs.push('--restore');
wrapperArgs.push(process.env.GRIST_CHECKPOINT);
}
return spawn(command, [...wrapperArgs.get(), `python${pythonVersion}`, '--', ...pythonArgs]);
const child = spawn(command, [...wrapperArgs.get(), `python${pythonVersion}`, '--', ...pythonArgs]);
// For gvisor under ptrace, main work is done by a traced process identifiable as
// being labeled "exe" and having a parent also labeled "exe".
const recognizeTracedProcess = (p: ProcessInfo) => {
return p.label.includes('exe') && p.parentLabel.includes('exe');
};
// The traced process is managed by a regular process called "runsc-sandbox"
const recognizeSandboxProcess = (p: ProcessInfo) => {
return p.label.includes('runsc-sandbox');
};
// If docker is in use, this process control will log a warning message and do nothing.
return {child, control: new SubprocessControl({
pid: child.pid,
recognizers: {
sandbox: recognizeSandboxProcess, // this process we start and stop
memory: recognizeTracedProcess, // measure memory for the ptraced process
cpu: recognizeTracedProcess, // measure cpu for the ptraced process
traced: recognizeTracedProcess, // the ptraced process
},
logMeta: options.logMeta
})};
}
/**
@ -545,7 +574,7 @@ function gvisor(options: ISandboxOptions): ChildProcess {
* `python` can be run and all Grist dependencies are installed. See
* `sandbox/docker` for more.
*/
function docker(options: ISandboxOptions): ChildProcess {
function docker(options: ISandboxOptions): SandboxProcess {
const {args: pythonArgs, command} = options;
if (options.useGristEntrypoint) {
pythonArgs.unshift('grist/main.py');
@ -568,13 +597,15 @@ function docker(options: ISandboxOptions): ChildProcess {
commandParts.unshift('faketime', '-f', FAKETIME);
}
const dockerPath = which.sync('docker');
return spawn(dockerPath, [
const child = spawn(dockerPath, [
'run', '--rm', '-i', '--network', 'none',
...wrapperArgs.get(),
command || 'grist-docker-sandbox', // this is the docker image to use
...commandParts,
...pythonArgs,
]);
log.rawDebug("cannot do process control via docker yet", {...options.logMeta});
return {child, control: new NoProcessControl(child)};
}
/**
@ -585,7 +616,7 @@ function docker(options: ISandboxOptions): ChildProcess {
* the infrastructure this command is a thin wrapper around, and there's
* no obvious native sandboxing alternative.
*/
function macSandboxExec(options: ISandboxOptions): ChildProcess {
function macSandboxExec(options: ISandboxOptions): SandboxProcess {
const {args: pythonArgs} = options;
if (!options.minimalPipeMode) {
throw new Error("macSandboxExec flavor only supports 3-pipe operation");
@ -661,8 +692,9 @@ function macSandboxExec(options: ISandboxOptions): ChildProcess {
}
const profileString = profile.join('\n');
return spawn('/usr/bin/sandbox-exec', ['-p', profileString, command, ...pythonArgs],
{cwd, env});
const child = spawn('/usr/bin/sandbox-exec', ['-p', profileString, command, ...pythonArgs],
{cwd, env});
return {child, control: new DirectProcessControl(child, options.logMeta)};
}
/**

View File

@ -0,0 +1,279 @@
import { delay } from 'app/common/delay';
import * as log from 'app/server/lib/log';
import { Throttle } from 'app/server/lib/Throttle';
import * as pidusage from '@gristlabs/pidusage';
import * as childProcess from 'child_process';
import * as util from 'util';
const execFile = util.promisify(childProcess.execFile);
/**
 * Sandbox usage information that we log periodically (currently just memory).
 */
export interface ISandboxUsage {
  // Memory use of the sandbox, in bytes (RSS). Controls that cannot measure
  // (e.g. NoProcessControl) report Infinity.
  memory: number;
}
/**
 * Control interface for a sandbox. Looks like it doesn't do much, but there may be
 * background activities (specifically, throttling), so implementations distinguish
 * between starting a shutdown (prepareToClose) and waiting for it (close).
 */
export interface ISandboxControl {
  getUsage(): Promise<ISandboxUsage>; // Poll usage information for the sandbox.
  prepareToClose(): void; // Start shutting down (but don't wait).
  close(): Promise<void>; // Wait for shut down.
  kill(): Promise<void>; // Send kill signals to any related processes.
}
/**
 * Control a single process directly. A thin wrapper around the Throttle class.
 */
export class DirectProcessControl implements ISandboxControl {
  private _throttle?: Throttle;

  constructor(private _process: childProcess.ChildProcess, logMeta?: log.ILogMeta) {
    // Throttling is opt-in via the environment.
    if (!process.env.GRIST_THROTTLE_CPU) { return; }
    this._throttle = new Throttle({
      pid: _process.pid,
      logMeta: {...logMeta, pid: _process.pid},
    });
  }

  /** Shut down; there is nothing to wait for beyond stopping the throttle. */
  public async close() {
    this.prepareToClose();
  }

  /** Stop any throttling activity. Safe to call more than once. */
  public prepareToClose() {
    const throttle = this._throttle;
    this._throttle = undefined;
    if (throttle) { throttle.stop(); }
  }

  /** Forcefully terminate the controlled process. */
  public async kill() {
    this._process.kill('SIGKILL');
  }

  /** Read current memory use (in bytes) of the controlled process. */
  public async getUsage() {
    const stats = await pidusage(this._process.pid);
    return { memory: stats.memory };
  }
}
/**
 * Dummy control interface that does no monitoring or throttling.
 */
export class NoProcessControl implements ISandboxControl {
  constructor(private readonly _process: childProcess.ChildProcess) {
  }

  /** Forcefully terminate the wrapped process. */
  public async kill() {
    this._process.kill('SIGKILL');
  }

  /** No measurement is done; report unbounded memory. */
  public async getUsage() {
    return { memory: Infinity };
  }

  /** Nothing to prepare; shutdown is a no-op. */
  public prepareToClose() {
  }

  /** Nothing to wait for; shutdown is a no-op. */
  public async close() {
  }
}
/**
 * Control interface when multiple processes are involved, playing different roles.
 * This is entirely conceived with gvisor's runsc in mind.
 *
 * As a process is starting up, we scan it and its children (recursively) for processes
 * that match certain "recognizers". For gvisor runsc, we'll be picking out a sandbox
 * process from its peers handling filesystem access, and a ptraced process that is
 * effectively the data engine.
 *
 * This setup is very much developed by inspection, and could have weaknesses.
 * TODO: check if more processes need to be included in memory counting.
 * TODO: check if there could be multiple ptraced processes to deal with if user were
 * to create extra processes within sandbox (which we don't yet attempt to prevent).
 *
 * The gvisor container could be configured with operating system help to limit
 * CPU usage in various ways, but I don't yet see a way to get something analogous
 * to Throttle's operation.
 */
export class SubprocessControl implements ISandboxControl {
  private _throttle?: Throttle;                          // set up by _configure() once children are found
  private _monitoredProcess: Promise<ProcessInfo|null>;  // resolves to the process getUsage() measures
  private _active: boolean;                              // cleared by prepareToClose() to end scanning

  constructor(private _options: {
    pid: number, // pid of process opened by Grist
    recognizers: {
      sandbox: (p: ProcessInfo) => boolean, // we will stop/start this process for throttling
      memory?: (p: ProcessInfo) => boolean, // read memory from this process (default: sandbox)
      cpu?: (p: ProcessInfo) => boolean, // read cpu from this process (default: sandbox)
      traced?: (p: ProcessInfo) => boolean, // stop this as well for throttling (default: none)
    },
    logMeta?: log.ILogMeta,
  }) {
    this._active = true;
    // Kick off the child-process scan in the background. Any failure (including the
    // "docker barrier" thrown in _scan) leaves us with a null monitored process and
    // therefore no throttling and Infinity memory readings.
    this._monitoredProcess = this._scan().catch(e => {
      log.rawDebug(`Subprocess control failure: ${e}`, this._options.logMeta || {});
      return null;
    });
  }

  // Stop throttling and wait for any in-flight scan to settle.
  public async close() {
    this.prepareToClose();
    await this._monitoredProcess.catch(() => null);
  }

  // Stop throttling and signal the scan loop to give up; does not wait.
  public prepareToClose() {
    this._active = false;
    this._throttle?.stop();
    this._throttle = undefined;
  }

  // Send SIGKILL to the root process and every (recursively found) child.
  public async kill() {
    for (const proc of await this._getAllProcesses()) {
      try {
        process.kill(proc.pid, 'SIGKILL');
      } catch (e) {
        // Don't worry if process is already killed.
        if (e.code !== 'ESRCH') { throw e; }
      }
    }
  }

  // Report memory (bytes) of the monitored process; Infinity when that process
  // was never found or measurement fails.
  public async getUsage() {
    try {
      const monitoredProcess = await this._monitoredProcess;
      if (!monitoredProcess) { return { memory: Infinity }; }
      const pid = monitoredProcess.pid;
      const memory = (await pidusage(pid)).memory;
      return { memory };
    } catch (e) {
      return { memory: Infinity };
    }
  }

  /**
   * Look for the desired children. Should be run once on process startup.
   * This method will check all children once per second until it finds the
   * desired ones or we are closed.
   *
   * It returns information about the child to be monitored by getUsage().
   * It also has a side effect of kicking off throttling.
   */
  private async _scan(): Promise<ProcessInfo> {
    while (this._active) {
      const processes = await this._getAllProcesses();
      const unrecognizedProcess = undefined as ProcessInfo|undefined;
      const recognizedProcesses = {
        sandbox: unrecognizedProcess,
        memory: unrecognizedProcess,
        cpu: unrecognizedProcess,
        traced: unrecognizedProcess,
      };
      let missing = false;
      for (const key of Object.keys(recognizedProcesses) as Array<keyof typeof recognizedProcesses>) {
        const recognizer = this._options.recognizers[key];
        if (!recognizer) { continue; }  // roles without a recognizer are optional
        for (const proc of processes) {
          // A "docker" label means node and the sandbox are in different pid
          // namespaces, so process control from here cannot work; abort the scan.
          if (proc.label.includes('docker')) { throw new Error('docker barrier found'); }
          if (recognizer(proc)) {
            recognizedProcesses[key] = proc;
            // NOTE(review): `continue` keeps scanning, so when several processes
            // satisfy a recognizer the LAST match wins -- confirm that is intended
            // (a `break` here would keep the first match instead).
            continue;
          }
        }
        if (!recognizedProcesses[key]) { missing = true; }
      }
      if (!missing) {
        this._configure(recognizedProcesses);
        return recognizedProcesses.memory || recognizedProcesses.sandbox!; // sandbox recognizer is mandatory
      }
      await delay(1000);
    }
    throw new Error('not found');
  }

  /**
   * Having found the desired children, we configure ourselves here, kicking off
   * throttling if needed.
   */
  private _configure(processes: { sandbox?: ProcessInfo, cpu?: ProcessInfo,
                                  memory?: ProcessInfo, traced?: ProcessInfo }) {
    if (!processes.sandbox) { return; }
    // Throttling is opt-in via the environment, as elsewhere.
    if (process.env.GRIST_THROTTLE_CPU) {
      this._throttle = new Throttle({
        pid: processes.sandbox.pid,
        readPid: processes.cpu?.pid,
        tracedPid: processes.traced?.pid,
        logMeta: {...this._options.logMeta,
                  pid: processes.sandbox.pid,
                  otherPids: [processes.cpu?.pid,
                              processes.memory?.pid,
                              processes.traced?.pid]},
      });
    }
  }

  /**
   * Return the root process and all its (nested) children.
   */
  private _getAllProcesses(): Promise<ProcessInfo[]> {
    const rootProcess = {pid: this._options.pid, label: 'root', parentLabel: ''};
    return this._addChildren([rootProcess]);
  }

  /**
   * Take a list of processes, and add children of all those processes,
   * recursively.
   */
  private async _addChildren(processes: ProcessInfo[]): Promise<ProcessInfo[]> {
    const nestedProcesses = await Promise.all(processes.map(async proc => {
      const children = await this._getChildren(proc.pid, proc.label);
      return [proc, ...await this._addChildren(children)];
    }));
    return ([] as ProcessInfo[]).concat(...nestedProcesses);
  }

  /**
   * Figure out the direct children of a parent process.
   */
  private async _getChildren(pid: number, parentLabel: string): Promise<ProcessInfo[]> {
    // Use "pgrep" to find children of a process, in the absence of any better way.
    // This only needs to happen a few times as sandbox is starting up, so doesn't need
    // to be super-optimized.
    // This currently is only good for Linux. Mechanically, it will run on Macs too,
    // but process naming is slightly different. But this class is currently only useful
    // for gvisor's runsc, which runs on Linux only.
    const cmd =
      execFile('pgrep', ['--list-full', '--parent', String(pid)])
        .catch(() => execFile('pgrep', ['-l', '-P', String(pid)])) // mac version of pgrep
        .catch(() => ({ stdout: '' }));  // pgrep missing or no children: treat as childless
    const result = (await cmd).stdout;
    // Each pgrep output line is "<pid> <command...>"; split into pid and label,
    // dropping any line whose pid fails to parse.
    const parts = result
      .split('\n')
      .map(line => line.trim())
      .map(line => line.split(' ', 2))
      .map(part => {
        return {
          pid: parseInt(part[0], 10) || 0,
          label: part[1] || '',
          parentLabel,
        };
      });
    return parts.filter(part => part.pid !== 0);
  }
}
/**
 * The information we need about processes is their pid, some kind of label (whatever
 * pgrep reports, which is a version of their command line), and the label of the process's
 * parent (blank if it has none).
 */
export interface ProcessInfo {
  pid: number;          // process id
  label: string;        // pgrep-reported label (a version of the command line)
  parentLabel: string;  // label of the parent process, or '' for the root
}

View File

@ -39,6 +39,8 @@ export interface ThrottleTiming {
// per unit time.
maxThrottle: number; // maximum ratio of negative duty cycle phases to
// positive.
traceNudgeOffset: number; // milliseconds to wait before sending a second signal
// to a traced process.
}
/**
@ -52,6 +54,7 @@ const defaultThrottleTiming: ThrottleTiming = {
minimumLogPeriodMs: 10000,
targetRate: 0.25,
maxThrottle: 10,
traceNudgeOffset: 5, // unlikely to be honored very precisely, but doesn't need to be.
};
/**
@ -71,6 +74,7 @@ export class Throttle {
private _timing: ThrottleTiming; // overall timing parameters
private _meteringInterval: NodeJS.Timeout | undefined; // timer for cpu measurements
private _dutyCycleTimeout: NodeJS.Timeout | undefined; // driver for throttle duty cycle
private _traceNudgeTimeout: NodeJS.Timeout | undefined; // schedule a nudge to a traced process
private _throttleFactor: number = 0; // relative length of paused phase
private _sample: MeterSample | undefined; // latest measurement.
private _anchor: MeterSample | undefined; // sample from past for averaging
@ -78,12 +82,48 @@ export class Throttle {
private _lastLogTime: number | undefined; // time of last throttle log message
private _offDuration: number = 0; // cumulative time spent paused
private _stopped: boolean = false; // set when stop has been called
private _active: boolean = true; // set when we are not trying to pause process
/**
* Start monitoring the given process and throttle as needed.
* If readPid is set, CPU usage will be read for that process.
* If tracedPid is set, then that process will be sent a STOP signal
* whenever the main process is sent a STOP, and then another STOP
* signal will be sent again shortly after.
*
* The tracedPid wrinkle is to deal with gvisor on a ptrace platform.
* From `man ptrace`:
*
* "While being traced, the tracee will stop each time a signal is
* delivered, even if the signal is being ignored. (An exception is
* SIGKILL, which has its usual effect.) The tracer will be
* notified at its next call to waitpid(2) (or one of the related
* "wait" system calls); that call will return a status value
* containing information that indicates the cause of the stop in
* the tracee. While the tracee is stopped, the tracer can use
* various ptrace requests to inspect and modify the tracee. The
* tracer then causes the tracee to continue, optionally ignoring
* the delivered signal (or even delivering a different signal
* instead)."
*
* So what sending a STOP to a process being traced by gvisor will
* do is not obvious. In practice it appears to have no effect
* (other than presumably giving gvisor a chance to examine it).
* So for gvisor, we send a STOP to the tracing process, and a STOP
* to the tracee, and then a little later a STOP to the tracee again
* (since there's no particular guarantee about order of signal
* delivery). This isn't particularly elegant, but in tests, this
* seems to do the job, while sending STOP to any one process does
* not.
*
* Alternatively, gvisor runsc does have "pause" and "resume"
* commands that could be looked into more.
*
*/
constructor(private readonly _options: {
pid: number,
pid: number, // main pid to stop/continue
readPid?: number, // pid to read cpu usage of, if different to main
tracedPid?: number, // pid of a traced process to signal
logMeta: log.ILogMeta,
timing?: ThrottleTiming
}) {
@ -97,6 +137,7 @@ export class Throttle {
public stop() {
this._stopped = true;
this._stopMetering();
this._stopTraceNudge();
this._stopThrottling();
}
@ -114,10 +155,10 @@ export class Throttle {
// Measure cpu usage to date.
let cpuDuration: number;
try {
cpuDuration = (await pidusage(this._options.pid)).ctime;
cpuDuration = (await pidusage(this._options.readPid || this._options.pid)).ctime;
} catch (e) {
// process may have disappeared.
log.rawDebug(`Throttle measurement error: ${e}`, this._options.logMeta);
this._log(`Throttle measurement error: ${e}`, this._options.logMeta);
return;
}
const now = Date.now();
@ -184,10 +225,10 @@ export class Throttle {
if (!this._lastLogTime || now - this._lastLogTime > this._timing.minimumLogPeriodMs) {
this._lastLogTime = now;
log.rawDebug('throttle', {...this._options.logMeta,
throttle: Math.round(this._throttleFactor),
throttledRate: Math.round(rate * 100),
rate: Math.round(rateWithoutThrottling * 100)});
this._log('throttle', {...this._options.logMeta,
throttle: Math.round(this._throttleFactor),
throttledRate: Math.round(rate * 100),
rate: Math.round(rateWithoutThrottling * 100)});
}
}
@ -210,11 +251,22 @@ export class Throttle {
* Send CONTinue or STOP signal to process.
*/
private _letProcessRun(on: boolean) {
this._active = on;
try {
process.kill(this._options.pid, on ? 'SIGCONT' : 'SIGSTOP');
const tracedPid = this._options.tracedPid;
if (tracedPid && !on) {
process.kill(tracedPid, 'SIGSTOP');
if (this._timing.traceNudgeOffset > 0) {
this._stopTraceNudge();
this._traceNudgeTimeout = setTimeout(() => {
if (!this._active) { process.kill(tracedPid, 'SIGSTOP'); }
}, this._timing.traceNudgeOffset);
}
}
} catch (e) {
// process may have disappeared
log.rawDebug(`Throttle error: ${e}`, this._options.logMeta);
this._log(`Throttle error: ${e}`, this._options.logMeta);
}
}
@ -239,6 +291,13 @@ export class Throttle {
}
}
private _stopTraceNudge() {
if (this._traceNudgeTimeout) {
clearTimeout(this._traceNudgeTimeout);
this._traceNudgeTimeout = undefined;
}
}
/**
* Make sure duty cycle is stopped and process is left in running state.
*/
@ -249,4 +308,8 @@ export class Throttle {
this._letProcessRun(true);
}
}
private _log(msg: string, meta: log.ILogMeta) {
log.rawDebug(msg, meta);
}
}