From 2ea27d46ed8fa7942616e8310fd9c19402172907 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Mon, 25 Jul 2016 14:59:45 +0300 Subject: [PATCH] implement proper child cleanup --- src/laminar.cpp | 28 ++++++++++++++++++++-------- src/laminar.h | 2 +- src/main.cpp | 26 ++++++++++++-------------- src/run.cpp | 9 ++++++++- src/server.cpp | 29 +++++++++++++++++------------ src/server.h | 16 ++++++++++------ 6 files changed, 68 insertions(+), 42 deletions(-) diff --git a/src/laminar.cpp b/src/laminar.cpp index 5aef5a4..5e979df 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -22,6 +22,7 @@ #include "log.h" #include +#include #include #include @@ -326,6 +327,19 @@ void Laminar::run() { srv = new Server(*this, listen_rpc, listen_http); + // handle SIGCHLD + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigprocmask(SIG_BLOCK, &mask, NULL); + int sigchld = signalfd(-1, &mask, 0); + srv->addDescriptor(sigchld, [this](char* buf, size_t sz){ + struct signalfd_siginfo* siginfo = (struct signalfd_siginfo*) buf; + KJ_ASSERT(siginfo->ssi_signo == SIGCHLD); + reapAdvance(); + assignNewJobs(); + }); + srv->start(); } @@ -474,25 +488,24 @@ kj::Promise Laminar::waitForRun(const Run* run) { bool Laminar::stepRun(std::shared_ptr run) { bool complete = run->step(); if(!complete) { - srv->addProcess(run->fd, [=](char* b,size_t n){ + srv->addDescriptor(run->fd, [=](char* b,size_t n){ std::string s(b,n); run->log += s; for(LaminarClient* c : clients) { if(c->scope.wantsLog(run->name, run->build)) c->sendMessage(s); } - }, [this,run](){ reapAdvance();}); + }); } return complete; } +// Reaps a zombie and steps the corresponding Run to its next state. +// Should be called on SIGCHLD void Laminar::reapAdvance() { int ret = 0; - // TODO: If we pass WNOHANG here for better asynchronicity, how do - // we re-schedule a poll to wait for finished child processes? 
- pid_t pid = waitpid(-1, &ret, 0); - // TODO: handle signalled child processes - if(pid > 0) { + pid_t pid; + while((pid = waitpid(-1, &ret, WNOHANG)) > 0) { LLOG(INFO, "Reaping", pid); auto it = activeJobs.get<0>().find(pid); std::shared_ptr run = *it; @@ -505,7 +518,6 @@ void Laminar::reapAdvance() { if(completed) run->complete(); } - assignNewJobs(); } bool Laminar::nodeCanQueue(const Node& node, const Run& run) const { diff --git a/src/laminar.h b/src/laminar.h index 9546766..5d34161 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -36,7 +36,7 @@ class Json; // It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC // interfaces and communicates via the LaminarInterface methods and // the LaminarClient objects (see interface.h) -class Laminar : public LaminarInterface { +class Laminar final : public LaminarInterface { public: Laminar(); ~Laminar(); diff --git a/src/main.cpp b/src/main.cpp index 3968ead..6eb95e5 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -21,8 +21,11 @@ #include -std::function sigHandler; -static void __sigHandler(int) { sigHandler(); } +static Laminar* laminar; + +static void laminar_quit(int) { + laminar->stop(); +} int main(int argc, char** argv) { for(int i = 1; i < argc; ++i) { @@ -31,19 +34,14 @@ int main(int argc, char** argv) { } } - do { - Laminar laminar; - sigHandler = [&](){ - LLOG(INFO, "Received SIGINT"); - laminar.stop(); - }; - signal(SIGINT, &__sigHandler); - signal(SIGTERM, &__sigHandler); + laminar = new Laminar; - laminar.run(); - } while(false); + signal(SIGINT, &laminar_quit); + signal(SIGTERM, &laminar_quit); + laminar->run(); - LLOG(INFO, "end of main"); + delete laminar; + + LLOG(INFO, "Clean exit"); return 0; } - diff --git a/src/run.cpp b/src/run.cpp index 6348361..8b0d175 100644 --- a/src/run.cpp +++ b/src/run.cpp @@ -23,6 +23,7 @@ #include #include +#include #include namespace fs = boost::filesystem; @@ -68,7 +69,13 @@ bool Run::step() { int pfd[2]; pipe(pfd); pid_t pid = fork(); - if(pid == 0) { + 
if(pid == 0) { // child + // reset signal mask (SIGCHLD blocked in Laminar::run) + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigprocmask(SIG_UNBLOCK, &mask, NULL); + close(pfd[0]); dup2(pfd[1], 1); dup2(pfd[1], 2); diff --git a/src/server.cpp b/src/server.cpp index a8512b7..aa57ef1 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -33,6 +33,10 @@ #include +// Size of buffer used to read from file descriptors. Should be +// a multiple of sizeof(struct signalfd_siginfo) == 128 +#define PROC_IO_BUFSIZE 4096 + // Configuration struct for the websocketpp template library. struct wsconfig : public websocketpp::config::core { // static const websocketpp::log::level elog_level = @@ -371,22 +375,22 @@ void Server::start() { // this eventfd is just to allow us to quit the server at some point // in the future by adding this event to the async loop. I couldn't see // a simpler way... - efd = eventfd(0,0); + efd_quit = eventfd(0,0); kj::Promise quit = kj::evalLater([this](){ static uint64_t _; - auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd); + auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd_quit); return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent)); }); quit.wait(ioContext.waitScope); } void Server::stop() { - eventfd_write(efd, 1); + eventfd_write(efd_quit, 1); } -void Server::addProcess(int fd, std::function readCb, std::function cb) { +void Server::addDescriptor(int fd, std::function cb) { auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd); - tasks.add(handleProcessOutput(event,readCb).attach(std::move(event)).then(std::move(cb))); + tasks.add(handleFdRead(event, cb).attach(std::move(event))); } void Server::acceptHttpClient(kj::Own&& listener) { @@ -416,14 +420,15 @@ void Server::acceptRpcClient(kj::Own&& listener) { ); } -// handles stdout/stderr from a child process by sending it to the provided -// callback function -kj::Promise Server::handleProcessOutput(kj::AsyncInputStream* 
stream, std::function readCb) { - static char* buffer = new char[131072]; - return stream->tryRead(buffer, 1, sizeof(buffer)).then([this,stream,readCb](size_t sz) { - readCb(buffer, sz); +// returns a promise which will read a chunk of data from the file descriptor +// wrapped by stream and invoke the provided callback with the read data. +// Repeats until ::read returns <= 0 +kj::Promise Server::handleFdRead(kj::AsyncInputStream* stream, std::function cb) { + static char* buffer = new char[PROC_IO_BUFSIZE]; + return stream->tryRead(buffer, 1, PROC_IO_BUFSIZE).then([this,stream,cb](size_t sz) { if(sz > 0) { - return handleProcessOutput(stream, readCb); + cb(buffer, sz); + return handleFdRead(stream, cb); } return kj::Promise(kj::READY_NOW); }); diff --git a/src/server.h b/src/server.h index d30dcca..c675b98 100644 --- a/src/server.h +++ b/src/server.h @@ -26,8 +26,8 @@ struct LaminarInterface; -// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces. -// It also manages the program's asynchronous event loop +// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces +// and manages the program's asynchronous event loop class Server final : public kj::TaskSet::ErrorHandler { public: // Initializes the server with a LaminarInterface to handle requests from @@ -38,22 +38,26 @@ public: ~Server(); void start(); void stop(); - void addProcess(int fd, std::function readCb, std::function cb); + + // add a file descriptor to be monitored for output. 
The callback will be + // invoked with the read data + void addDescriptor(int fd, std::function cb); private: void acceptHttpClient(kj::Own&& listener); void acceptRpcClient(kj::Own&& listener); - kj::Promise handleProcessOutput(kj::AsyncInputStream* stream, std::function readCb); + kj::Promise handleFdRead(kj::AsyncInputStream* stream, std::function cb); void taskFailed(kj::Exception&& exception) override { kj::throwFatalException(kj::mv(exception)); } private: - int efd; - capnp::Capability::Client rpcInterface; struct WebsocketConnection; struct HttpImpl; + + int efd_quit; + capnp::Capability::Client rpcInterface; HttpImpl* httpInterface; kj::AsyncIoContext ioContext; kj::TaskSet tasks;