1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

implement proper child cleanup

This commit is contained in:
Oliver Giles 2016-07-25 14:59:45 +03:00
parent 7b7de751e3
commit 2ea27d46ed
6 changed files with 68 additions and 42 deletions

View File

@ -22,6 +22,7 @@
#include "log.h" #include "log.h"
#include <sys/wait.h> #include <sys/wait.h>
#include <sys/signalfd.h>
#include <fstream> #include <fstream>
#include <zlib.h> #include <zlib.h>
@ -326,6 +327,19 @@ void Laminar::run() {
srv = new Server(*this, listen_rpc, listen_http); srv = new Server(*this, listen_rpc, listen_http);
// handle SIGCHLD
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_BLOCK, &mask, NULL);
int sigchld = signalfd(-1, &mask, 0);
srv->addDescriptor(sigchld, [this](char* buf, size_t sz){
struct signalfd_siginfo* siginfo = (struct signalfd_siginfo*) buf;
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
reapAdvance();
assignNewJobs();
});
srv->start(); srv->start();
} }
@ -474,25 +488,24 @@ kj::Promise<RunState> Laminar::waitForRun(const Run* run) {
bool Laminar::stepRun(std::shared_ptr<Run> run) { bool Laminar::stepRun(std::shared_ptr<Run> run) {
bool complete = run->step(); bool complete = run->step();
if(!complete) { if(!complete) {
srv->addProcess(run->fd, [=](char* b,size_t n){ srv->addDescriptor(run->fd, [=](char* b,size_t n){
std::string s(b,n); std::string s(b,n);
run->log += s; run->log += s;
for(LaminarClient* c : clients) { for(LaminarClient* c : clients) {
if(c->scope.wantsLog(run->name, run->build)) if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s); c->sendMessage(s);
} }
}, [this,run](){ reapAdvance();}); });
} }
return complete; return complete;
} }
// Reaps a zombie and steps the corresponding Run to its next state.
// Should be called on SIGCHLD
void Laminar::reapAdvance() { void Laminar::reapAdvance() {
int ret = 0; int ret = 0;
// TODO: If we pass WNOHANG here for better asynchronicity, how do pid_t pid;
// we re-schedule a poll to wait for finished child processes? while((pid = waitpid(-1, &ret, WNOHANG)) > 0) {
pid_t pid = waitpid(-1, &ret, 0);
// TODO: handle signalled child processes
if(pid > 0) {
LLOG(INFO, "Reaping", pid); LLOG(INFO, "Reaping", pid);
auto it = activeJobs.get<0>().find(pid); auto it = activeJobs.get<0>().find(pid);
std::shared_ptr<Run> run = *it; std::shared_ptr<Run> run = *it;
@ -505,7 +518,6 @@ void Laminar::reapAdvance() {
if(completed) if(completed)
run->complete(); run->complete();
} }
assignNewJobs();
} }
bool Laminar::nodeCanQueue(const Node& node, const Run& run) const { bool Laminar::nodeCanQueue(const Node& node, const Run& run) const {

View File

@ -36,7 +36,7 @@ class Json;
// It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC // It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC
// interfaces and communicates via the LaminarInterface methods and // interfaces and communicates via the LaminarInterface methods and
// the LaminarClient objects (see interface.h) // the LaminarClient objects (see interface.h)
class Laminar : public LaminarInterface { class Laminar final : public LaminarInterface {
public: public:
Laminar(); Laminar();
~Laminar(); ~Laminar();

View File

@ -21,8 +21,11 @@
#include <signal.h> #include <signal.h>
std::function<void()> sigHandler; static Laminar* laminar;
static void __sigHandler(int) { sigHandler(); }
static void laminar_quit(int) {
laminar->stop();
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
for(int i = 1; i < argc; ++i) { for(int i = 1; i < argc; ++i) {
@ -31,19 +34,14 @@ int main(int argc, char** argv) {
} }
} }
do { laminar = new Laminar;
Laminar laminar;
sigHandler = [&](){
LLOG(INFO, "Received SIGINT");
laminar.stop();
};
signal(SIGINT, &__sigHandler);
signal(SIGTERM, &__sigHandler);
laminar.run(); signal(SIGINT, &laminar_quit);
} while(false); signal(SIGTERM, &laminar_quit);
laminar->run();
LLOG(INFO, "end of main"); delete laminar;
LLOG(INFO, "Clean exit");
return 0; return 0;
} }

View File

@ -23,6 +23,7 @@
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <signal.h>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
namespace fs = boost::filesystem; namespace fs = boost::filesystem;
@ -68,7 +69,13 @@ bool Run::step() {
int pfd[2]; int pfd[2];
pipe(pfd); pipe(pfd);
pid_t pid = fork(); pid_t pid = fork();
if(pid == 0) { if(pid == 0) { // child
// reset signal mask (SIGCHLD blocked in Laminar::start)
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_UNBLOCK, &mask, NULL);
close(pfd[0]); close(pfd[0]);
dup2(pfd[1], 1); dup2(pfd[1], 1);
dup2(pfd[1], 2); dup2(pfd[1], 2);

View File

@ -33,6 +33,10 @@
#include <sys/eventfd.h> #include <sys/eventfd.h>
// Size of buffer used to read from file descriptors. Should be
// a multiple of sizeof(struct signalfd_siginfo) == 128
#define PROC_IO_BUFSIZE 4096
// Configuration struct for the websocketpp template library. // Configuration struct for the websocketpp template library.
struct wsconfig : public websocketpp::config::core { struct wsconfig : public websocketpp::config::core {
// static const websocketpp::log::level elog_level = // static const websocketpp::log::level elog_level =
@ -371,22 +375,22 @@ void Server::start() {
// this eventfd is just to allow us to quit the server at some point // this eventfd is just to allow us to quit the server at some point
// in the future by adding this event to the async loop. I couldn't see // in the future by adding this event to the async loop. I couldn't see
// a simpler way... // a simpler way...
efd = eventfd(0,0); efd_quit = eventfd(0,0);
kj::Promise<void> quit = kj::evalLater([this](){ kj::Promise<void> quit = kj::evalLater([this](){
static uint64_t _; static uint64_t _;
auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd); auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd_quit);
return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent)); return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent));
}); });
quit.wait(ioContext.waitScope); quit.wait(ioContext.waitScope);
} }
void Server::stop() { void Server::stop() {
eventfd_write(efd, 1); eventfd_write(efd_quit, 1);
} }
void Server::addProcess(int fd, std::function<void(char*,size_t)> readCb, std::function<void()> cb) { void Server::addDescriptor(int fd, std::function<void(char*,size_t)> cb) {
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd); auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd);
tasks.add(handleProcessOutput(event,readCb).attach(std::move(event)).then(std::move(cb))); tasks.add(handleFdRead(event, cb).attach(std::move(event)));
} }
void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) { void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
@ -416,14 +420,15 @@ void Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
); );
} }
// handles stdout/stderr from a child process by sending it to the provided // returns a promise which will read a chunk of data from the file descriptor
// callback function // wrapped by stream and invoke the provided callback with the read data.
kj::Promise<void> Server::handleProcessOutput(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> readCb) { // Repeats until ::read returns <= 0
static char* buffer = new char[131072]; kj::Promise<void> Server::handleFdRead(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> cb) {
return stream->tryRead(buffer, 1, sizeof(buffer)).then([this,stream,readCb](size_t sz) { static char* buffer = new char[PROC_IO_BUFSIZE];
readCb(buffer, sz); return stream->tryRead(buffer, 1, PROC_IO_BUFSIZE).then([this,stream,cb](size_t sz) {
if(sz > 0) { if(sz > 0) {
return handleProcessOutput(stream, readCb); cb(buffer, sz);
return handleFdRead(stream, cb);
} }
return kj::Promise<void>(kj::READY_NOW); return kj::Promise<void>(kj::READY_NOW);
}); });

View File

@ -26,8 +26,8 @@
struct LaminarInterface; struct LaminarInterface;
// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces. // This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces
// It also manages the program's asynchronous event loop // and manages the program's asynchronous event loop
class Server final : public kj::TaskSet::ErrorHandler { class Server final : public kj::TaskSet::ErrorHandler {
public: public:
// Initializes the server with a LaminarInterface to handle requests from // Initializes the server with a LaminarInterface to handle requests from
@ -38,22 +38,26 @@ public:
~Server(); ~Server();
void start(); void start();
void stop(); void stop();
void addProcess(int fd, std::function<void(char*,size_t)> readCb, std::function<void()> cb);
// add a file descriptor to be monitored for output. The callback will be
// invoked with the read data
void addDescriptor(int fd, std::function<void(char*,size_t)> cb);
private: private:
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener); void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener); void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> handleProcessOutput(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> readCb); kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> cb);
void taskFailed(kj::Exception&& exception) override { void taskFailed(kj::Exception&& exception) override {
kj::throwFatalException(kj::mv(exception)); kj::throwFatalException(kj::mv(exception));
} }
private: private:
int efd;
capnp::Capability::Client rpcInterface;
struct WebsocketConnection; struct WebsocketConnection;
struct HttpImpl; struct HttpImpl;
int efd_quit;
capnp::Capability::Client rpcInterface;
HttpImpl* httpInterface; HttpImpl* httpInterface;
kj::AsyncIoContext ioContext; kj::AsyncIoContext ioContext;
kj::TaskSet tasks; kj::TaskSet tasks;