use kj's onChildExit mechanism

This reduces code and allows for more idiosyncratic use of Promises.
Requires latest capnproto git.

Part of #49 refactor
pull/70/head
Oliver Giles 6 years ago
parent 4ffc22c657
commit a81492e5bc

@ -19,10 +19,10 @@ export PATH=/opt/rh/devtoolset-7/root/usr/bin:\$PATH
mkdir /build
cd /build
wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/06a7136708955d91f8ddc1fa3d54e620eacba13e.tar.gz
wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/df67f26862c011c6efb31a28fb0d2a2ca1b94ac8.tar.gz
wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz
md5sum -c <<EOF
a24b4d6e671d97c8a2aacd0dd4677726 capnproto.tar.gz
80c28dc26842b84dcbe8d930a97c70cf capnproto.tar.gz
badd12c511e081fec6c89c43a7027bce rapidjson.tar.gz
EOF

@ -17,10 +17,10 @@ docker run --rm -i -v $SOURCE_DIR:/laminar:ro -v $OUTPUT_DIR:/output $DOCKER_TAG
mkdir /build
cd /build
wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/06a7136708955d91f8ddc1fa3d54e620eacba13e.tar.gz
wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/df67f26862c011c6efb31a28fb0d2a2ca1b94ac8.tar.gz
wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz
md5sum -c <<EOF
a24b4d6e671d97c8a2aacd0dd4677726 capnproto.tar.gz
80c28dc26842b84dcbe8d930a97c70cf capnproto.tar.gz
badd12c511e081fec6c89c43a7027bce rapidjson.tar.gz
EOF

@ -137,9 +137,6 @@ struct LaminarInterface {
// Abort all running jobs
virtual void abortAll() = 0;
// Callback for laminar to reap child processes.
virtual void reapChildren() = 0;
// Callback to handle a configuration modification notification
virtual void notifyConfigChanged() = 0;
};

@ -360,7 +360,6 @@ void Laminar::sendStatus(LaminarClient* client) {
}
j.EndObject();
client->sendMessage(j.str());
}
Laminar::~Laminar() {
@ -501,21 +500,6 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
return run;
}
// Reaps a zombie and steps the corresponding Run to its next state.
// Should be called on SIGCHLD
void Laminar::reapChildren() {
int ret = 0;
pid_t pid;
while((pid = waitpid(-1, &ret, WNOHANG)) > 0) {
LLOG(INFO, "Reaping", pid);
auto it = pids.find(pid);
it->second->fulfill(kj::mv(ret));
pids.erase(it);
}
assignNewJobs();
}
void Laminar::notifyConfigChanged()
{
loadConfiguration();
@ -688,6 +672,7 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
// notify the rpc client if the start command was used
run->started.fulfiller->fulfill();
// this actually spawns the first step
srv->addTask(handleRunStep(run.get()).then([this,run]{
runFinished(run.get());
}));
@ -716,11 +701,9 @@ kj::Promise<void> Laminar::handleRunStep(Run* run) {
return kj::READY_NOW;
}
kj::Promise<int> exited = srv->onChildExit(run->current_pid);
// promise is fulfilled when the process is reaped. But first we wait for all
// output from the pipe (Run::output_fd) to be consumed.
auto paf = kj::newPromiseAndFulfiller<int>();
pids.emplace(run->current_pid, kj::mv(paf.fulfiller));
return srv->readDescriptor(run->output_fd, [this,run](const char*b,size_t n){
// handle log output
std::string s(b, n);
@ -729,7 +712,7 @@ kj::Promise<void> Laminar::handleRunStep(Run* run) {
if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s);
}
}).then([p = std::move(paf.promise)]() mutable {
}).then([p = std::move(exited)]() mutable {
// wait until the process is reaped
return kj::mv(p);
}).then([this, run](int status){
@ -791,7 +774,9 @@ void Laminar::runFinished(Run * r) {
w->complete(r);
}
// erase reference to run from activeJobs
// erase reference to run from activeJobs. Since runFinished is called in a
// lambda whose context contains a shared_ptr<Run>, the run won't be deleted
// until the context is destroyed at the end of the lambda execution.
activeJobs.byRunPtr().erase(r);
// remove old run directories
@ -813,6 +798,9 @@ void Laminar::runFinished(Run * r) {
break;
fs::remove_all(d);
}
// in case we freed up an executor, check the queue
assignNewJobs();
}
class MappedFileImpl : public MappedFile {

@ -58,7 +58,6 @@ public:
kj::Own<MappedFile> getArtefact(std::string path) override;
std::string getCustomCss() override;
void abortAll() override;
void reapChildren() override;
void notifyConfigChanged() override;
private:
@ -83,7 +82,6 @@ private:
std::unordered_map<std::string, std::set<std::string>> jobTags;
RunSet activeJobs;
std::map<pid_t, kj::Own<kj::PromiseFulfiller<int>>> pids;
Database* db;
Server* srv;
NodeMap nodes;

@ -18,8 +18,8 @@
///
#include "laminar.h"
#include "log.h"
#include <signal.h>
#include <kj/async-unix.h>
static Laminar* laminar;
@ -35,9 +35,10 @@ int main(int argc, char** argv) {
}
laminar = new Laminar;
kj::UnixEventPort::captureChildExit();
signal(SIGINT, &laminar_quit);
signal(SIGTERM, &laminar_quit);
laminar->run();
delete laminar;

@ -139,7 +139,10 @@ void Run::addEnv(std::string path) {
void Run::abort() {
// clear all pending scripts
std::queue<Script>().swap(scripts);
kill(-current_pid, SIGTERM);
// if the Maybe is empty, wait() was already called on this process
KJ_IF_MAYBE(p, current_pid) {
kill(-*p, SIGTERM);
}
}
void Run::reaped(int status) {

@ -84,7 +84,7 @@ public:
std::string reasonMsg;
uint build = 0;
std::string log;
pid_t current_pid;
kj::Maybe<pid_t> current_pid;
int output_fd;
std::unordered_map<std::string, std::string> params;
kj::Promise<void> timeout = kj::NEVER_DONE;

@ -26,6 +26,7 @@
#include <capnp/rpc-twoparty.h>
#include <capnp/rpc.capnp.h>
#include <kj/async-io.h>
#include <kj/async-unix.h>
#include <kj/threadlocal.h>
#include <sys/eventfd.h>
@ -386,30 +387,12 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
return httpServer->listenHttp(*listener).attach(kj::mv(listener));
}));
// handle SIGCHLD
{
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_BLOCK, &mask, nullptr);
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
laminarInterface.reapChildren();
}).attach(std::move(event)).attach(std::move(buffer));
}
// handle watched paths
{
inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
auto event = ioContext.lowLevelProvider->wrapInputFd(inotify_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
pathWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char*, size_t){
pathWatch = readDescriptor(inotify_fd, [this](const char*, size_t){
laminarInterface.notifyConfigChanged();
}).attach(std::move(event)).attach(std::move(buffer));
});
}
}
@ -438,7 +421,7 @@ void Server::start() {
childTasks.onEmpty().wait(ioContext.waitScope);
// 4. run the loop once more to send any pending output to websocket clients
ioContext.waitScope.poll();
// 5. return: websockets will be destructed
// 5. return: websockets will be destructed when class is deleted
}
void Server::stop() {
@ -453,8 +436,7 @@ kj::Promise<void> Server::readDescriptor(int fd, std::function<void(const char*,
return handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer));
}
void Server::addTask(kj::Promise<void>&& task)
{
void Server::addTask(kj::Promise<void>&& task) {
childTasks.add(kj::mv(task));
}
@ -464,6 +446,10 @@ kj::Promise<void> Server::addTimeout(int seconds, std::function<void ()> cb) {
}).eagerlyEvaluate(nullptr);
}
kj::Promise<int> Server::onChildExit(kj::Maybe<pid_t> &pid) {
return ioContext.unixEventPort.onChildExit(pid);
}
void Server::addWatchPath(const char* dpath) {
inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE);
}

@ -48,6 +48,8 @@ public:
// add a one-shot timer callback
kj::Promise<void> addTimeout(int seconds, std::function<void()> cb);
// get a promise which resolves when a child process exits
kj::Promise<int> onChildExit(kj::Maybe<pid_t>& pid);
// add a path to be watched for changes
void addWatchPath(const char* dpath);

@ -31,7 +31,7 @@ protected:
}
void wait() {
int state = -1;
waitpid(run.current_pid, &state, 0);
waitpid(run.current_pid.orDefault(0), &state, 0);
run.reaped(state);
}
void runAll() {

@ -66,7 +66,6 @@ public:
MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value));
MOCK_METHOD0(getCustomCss, std::string());
MOCK_METHOD0(abortAll, void());
MOCK_METHOD0(reapChildren, void());
MOCK_METHOD0(notifyConfigChanged, void());
};

Loading…
Cancel
Save