1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

use promises to control job runs

This refactor uses the kj framework more cleanly to handle the processes
spawned by Runs. It obviates the workaround introduced in ff42dae7cc and,
as a side effect, now requires C++14.

Part of #49 refactor
This commit is contained in:
Oliver Giles 2018-07-20 14:15:59 +03:00
parent e506142fa4
commit 4ffc22c657
9 changed files with 279 additions and 294 deletions

View File

@ -22,7 +22,7 @@ cmake_policy(SET CMP0058 NEW)
set(CMAKE_INCLUDE_CURRENT_DIR ON) set(CMAKE_INCLUDE_CURRENT_DIR ON)
add_definitions("-std=c++11 -Wall -Wextra -Wno-unused-parameter -Wno-sign-compare") add_definitions("-std=c++14 -Wall -Wextra -Wno-unused-parameter -Wno-sign-compare")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Werror -DDEBUG") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Werror -DDEBUG")
# This macro takes a list of files, gzips them and converts the output into # This macro takes a list of files, gzips them and converts the output into

View File

@ -8,11 +8,13 @@ VERSION=$(cd "$SOURCE_DIR" && git describe --tags --abbrev=8 --dirty | tr - .)
DOCKER_TAG=$(docker build -q - <<EOS DOCKER_TAG=$(docker build -q - <<EOS
FROM centos:7 FROM centos:7
RUN yum -y install epel-release && yum -y install rpm-build cmake3 make gcc gcc-c++ wget sqlite-devel boost-devel zlib-devel RUN yum -y install epel-release centos-release-scl && yum-config-manager --enable rhel-server-rhscl-7-rpms && yum -y install rpm-build cmake3 make devtoolset-7-gcc-c++ wget sqlite-devel boost-devel zlib-devel
EOS EOS
) )
docker run --rm -i -v $SOURCE_DIR:/root/rpmbuild/SOURCES/laminar-$VERSION:ro -v $OUTPUT_DIR:/output $DOCKER_TAG bash -xe <<EOS docker run --rm -i -v $SOURCE_DIR:/root/rpmbuild/SOURCES/laminar-$VERSION:ro -v $OUTPUT_DIR:/output $DOCKER_TAG bash -xe <<EOS
# for new gcc
export PATH=/opt/rh/devtoolset-7/root/usr/bin:\$PATH
mkdir /build mkdir /build
cd /build cd /build

View File

@ -501,50 +501,16 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
return run; return run;
} }
bool Laminar::stepRun(std::shared_ptr<Run> run) {
bool complete = run->step();
if(!complete) {
srv->addDescriptor(run->fd, [this, run](const char* b,size_t n){
handleRunLog(run, std::string(b,n));
});
}
return complete;
}
void Laminar::handleRunLog(std::shared_ptr<Run> run, std::string s) {
run->log += s;
for(LaminarClient* c : clients) {
if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s);
}
}
// Reaps a zombie and steps the corresponding Run to its next state. // Reaps a zombie and steps the corresponding Run to its next state.
// Should be called on SIGCHLD // Should be called on SIGCHLD
void Laminar::reapChildren() { void Laminar::reapChildren() {
int ret = 0; int ret = 0;
pid_t pid; pid_t pid;
constexpr int bufsz = 1024;
static thread_local char buf[bufsz];
while((pid = waitpid(-1, &ret, WNOHANG)) > 0) { while((pid = waitpid(-1, &ret, WNOHANG)) > 0) {
LLOG(INFO, "Reaping", pid); LLOG(INFO, "Reaping", pid);
auto it = activeJobs.byPid().find(pid); auto it = pids.find(pid);
std::shared_ptr<Run> run = *it; it->second->fulfill(kj::mv(ret));
// The main event loop might schedule this SIGCHLD handler before the final pids.erase(it);
// output handler (from addDescriptor). In that case, because it keeps a
// shared_ptr to the run it would successfully add to the log output buffer,
// but by then reapAdvance would have stored the log and ensured no-one cares.
// Preempt this case by forcing a final (non-blocking) read here.
for(ssize_t n = read(run->fd, buf, bufsz); n > 0; n = read(run->fd, buf, bufsz)) {
handleRunLog(run, std::string(buf, static_cast<size_t>(n)));
}
bool completed = true;
activeJobs.byPid().modify(it, [&](std::shared_ptr<Run> run){
run->reaped(ret);
completed = stepRun(run);
});
if(completed)
run->complete();
} }
assignNewJobs(); assignNewJobs();
@ -586,13 +552,9 @@ bool Laminar::nodeCanQueue(const Node& node, const Run& run) const {
return false; return false;
} }
void Laminar::assignNewJobs() { bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
auto it = queuedJobs.begin();
while(it != queuedJobs.end()) {
bool assigned = false;
for(auto& sn : nodes) { for(auto& sn : nodes) {
std::shared_ptr<Node> node = sn.second; std::shared_ptr<Node> node = sn.second;
std::shared_ptr<Run> run = *it;
if(nodeCanQueue(*node.get(), *run)) { if(nodeCanQueue(*node.get(), *run)) {
fs::path cfgDir = fs::path(homeDir)/"cfg"; fs::path cfgDir = fs::path(homeDir)/"cfg";
boost::system::error_code err; boost::system::error_code err;
@ -701,7 +663,7 @@ void Laminar::assignNewJobs() {
Json j; Json j;
j.set("type", "job_started") j.set("type", "job_started")
.startObject("data") .startObject("data")
.set("queueIndex", std::distance(it,queuedJobs.begin())) .set("queueIndex", queueIndex)
.set("name", run->name) .set("name", run->name)
.set("queued", run->startedAt - run->queuedAt) .set("queued", run->startedAt - run->queuedAt)
.set("started", run->startedAt) .set("started", run->startedAt)
@ -726,27 +688,55 @@ void Laminar::assignNewJobs() {
// notify the rpc client if the start command was used // notify the rpc client if the start command was used
run->started.fulfiller->fulfill(); run->started.fulfiller->fulfill();
// setup run completion handler srv->addTask(handleRunStep(run.get()).then([this,run]{
run->notifyCompletion = [this](Run* r) { runFinished(r); }; runFinished(run.get());
}));
// trigger the first step of the run return true;
if(stepRun(run)) {
// should never happen
LLOG(INFO, "No steps for run");
run->complete();
} }
}
return false;
}
assigned = true; void Laminar::assignNewJobs() {
break; auto it = queuedJobs.begin();
} while(it != queuedJobs.end()) {
} if(tryStartRun(*it, std::distance(it, queuedJobs.begin()))) {
if(assigned) {
activeJobs.insert(*it); activeJobs.insert(*it);
it = queuedJobs.erase(it); it = queuedJobs.erase(it);
} else } else {
++it; ++it;
} }
}
}
kj::Promise<void> Laminar::handleRunStep(Run* run) {
if(run->step()) {
// no more steps
return kj::READY_NOW;
}
// promise is fulfilled when the process is reaped. But first we wait for all
// output from the pipe (Run::output_fd) to be consumed.
auto paf = kj::newPromiseAndFulfiller<int>();
pids.emplace(run->current_pid, kj::mv(paf.fulfiller));
return srv->readDescriptor(run->output_fd, [this,run](const char*b,size_t n){
// handle log output
std::string s(b, n);
run->log += s;
for(LaminarClient* c : clients) {
if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s);
}
}).then([p = std::move(paf.promise)]() mutable {
// wait until the process is reaped
return kj::mv(p);
}).then([this, run](int status){
run->reaped(status);
// next step in Run
return handleRunStep(run);
});
} }
void Laminar::runFinished(Run * r) { void Laminar::runFinished(Run * r) {

View File

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2017 Oliver Giles /// Copyright 2015-2018 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -64,16 +64,16 @@ public:
private: private:
bool loadConfiguration(); bool loadConfiguration();
void assignNewJobs(); void assignNewJobs();
bool stepRun(std::shared_ptr<Run> run); bool tryStartRun(std::shared_ptr<Run> run, int queueIndex);
void handleRunLog(std::shared_ptr<Run> run, std::string log); kj::Promise<void> handleRunStep(Run *run);
void runFinished(Run*); void runFinished(Run*);
bool nodeCanQueue(const Node&, const Run&) const; bool nodeCanQueue(const Node&, const Run&) const;
// expects that Json has started an array // expects that Json has started an array
void populateArtifacts(Json& out, std::string job, uint num) const; void populateArtifacts(Json& out, std::string job, uint num) const;
Run* activeRun(std::string name, uint num) { Run* activeRun(const std::string name, uint num) {
auto it = activeJobs.byRun().find(boost::make_tuple(name, num)); auto it = activeJobs.byNameNumber().find(boost::make_tuple(name, num));
return it == activeJobs.byRun().end() ? nullptr : it->get(); return it == activeJobs.byNameNumber().end() ? nullptr : it->get();
} }
std::list<std::shared_ptr<Run>> queuedJobs; std::list<std::shared_ptr<Run>> queuedJobs;
@ -83,6 +83,7 @@ private:
std::unordered_map<std::string, std::set<std::string>> jobTags; std::unordered_map<std::string, std::set<std::string>> jobTags;
RunSet activeJobs; RunSet activeJobs;
std::map<pid_t, kj::Own<kj::PromiseFulfiller<int>>> pids;
Database* db; Database* db;
Server* srv; Server* srv;
NodeMap nodes; NodeMap nodes;

View File

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2017 Oliver Giles /// Copyright 2015-2018 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -41,9 +41,10 @@ std::string to_string(const RunState& rs) {
} }
Run::Run() { Run::Run() :
result = RunState::SUCCESS; result(RunState::SUCCESS),
lastResult = RunState::UNKNOWN; lastResult(RunState::UNKNOWN)
{
} }
Run::~Run() { Run::~Run() {
@ -58,7 +59,9 @@ std::string Run::reason() const {
} }
bool Run::step() { bool Run::step() {
if(scripts.size()) { if(!scripts.size())
return true;
currentScript = scripts.front(); currentScript = scripts.front();
scripts.pop(); scripts.pop();
@ -119,13 +122,10 @@ bool Run::step() {
LLOG(INFO, "Forked", currentScript.path, currentScript.cwd, pid); LLOG(INFO, "Forked", currentScript.path, currentScript.cwd, pid);
close(pfd[1]); close(pfd[1]);
fd = pfd[0];
this->pid = pid;
current_pid = pid;
output_fd = pfd[0];
return false; return false;
} else {
return true;
}
} }
void Run::addScript(std::string scriptPath, std::string scriptWorkingDir) { void Run::addScript(std::string scriptPath, std::string scriptWorkingDir) {
@ -139,7 +139,7 @@ void Run::addEnv(std::string path) {
void Run::abort() { void Run::abort() {
// clear all pending scripts // clear all pending scripts
std::queue<Script>().swap(scripts); std::queue<Script>().swap(scripts);
kill(-pid, SIGTERM); kill(-current_pid, SIGTERM);
} }
void Run::reaped(int status) { void Run::reaped(int status) {
@ -153,7 +153,3 @@ void Run::reaped(int status) {
result = RunState::FAILED; result = RunState::FAILED;
// otherwise preserve earlier status // otherwise preserve earlier status
} }
void Run::complete() {
notifyCompletion(this);
}

View File

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2017 Oliver Giles /// Copyright 2015-2018 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -52,12 +52,9 @@ public:
Run& operator=(const Run&) = delete; Run& operator=(const Run&) = delete;
// executes the next script (if any), returning true if there is nothing // executes the next script (if any), returning true if there is nothing
// more to be done - in this case the caller should call complete() // more to be done.
bool step(); bool step();
// call this when all scripts are done to get the notifyCompletion callback
void complete();
// adds a script to the queue of scripts to be executed by this run // adds a script to the queue of scripts to be executed by this run
void addScript(std::string scriptPath, std::string scriptWorkingDir); void addScript(std::string scriptPath, std::string scriptWorkingDir);
@ -76,7 +73,6 @@ public:
std::string reason() const; std::string reason() const;
std::function<void(Run*)> notifyCompletion;
std::shared_ptr<Node> node; std::shared_ptr<Node> node;
RunState result; RunState result;
RunState lastResult; RunState lastResult;
@ -88,8 +84,8 @@ public:
std::string reasonMsg; std::string reasonMsg;
uint build = 0; uint build = 0;
std::string log; std::string log;
pid_t pid; pid_t current_pid;
int fd; int output_fd;
std::unordered_map<std::string, std::string> params; std::unordered_map<std::string, std::string> params;
kj::Promise<void> timeout = kj::NEVER_DONE; kj::Promise<void> timeout = kj::NEVER_DONE;
kj::PromiseFulfillerPair<void> started = kj::newPromiseAndFulfiller<void>(); kj::PromiseFulfillerPair<void> started = kj::newPromiseAndFulfiller<void>();
@ -97,7 +93,6 @@ public:
time_t queuedAt; time_t queuedAt;
time_t startedAt; time_t startedAt;
private: private:
struct Script { struct Script {
std::string path; std::string path;
std::string cwd; std::string cwd;
@ -132,8 +127,6 @@ struct _run_same {
// A single Run can be fetched by... // A single Run can be fetched by...
struct _run_index : bmi::indexed_by< struct _run_index : bmi::indexed_by<
// their current running pid
bmi::hashed_unique<bmi::member<Run, pid_t, &Run::pid>>,
bmi::hashed_unique<bmi::composite_key< bmi::hashed_unique<bmi::composite_key<
std::shared_ptr<Run>, std::shared_ptr<Run>,
// a combination of their job name and build number // a combination of their job name and build number
@ -153,20 +146,17 @@ struct RunSet: public boost::multi_index_container<
std::shared_ptr<Run>, std::shared_ptr<Run>,
_run_index _run_index
> { > {
typename bmi::nth_index<RunSet, 0>::type& byPid() { return get<0>(); } typename bmi::nth_index<RunSet, 0>::type& byNameNumber() { return get<0>(); }
typename bmi::nth_index<RunSet, 0>::type const& byPid() const { return get<0>(); } typename bmi::nth_index<RunSet, 0>::type const& byNameNumber() const { return get<0>(); }
typename bmi::nth_index<RunSet, 1>::type& byRun() { return get<1>(); } typename bmi::nth_index<RunSet, 1>::type& byRunPtr() { return get<1>(); }
typename bmi::nth_index<RunSet, 1>::type const& byRun() const { return get<1>(); } typename bmi::nth_index<RunSet, 1>::type const& byRunPtr() const { return get<1>(); }
typename bmi::nth_index<RunSet, 2>::type& byRunPtr() { return get<2>(); } typename bmi::nth_index<RunSet, 2>::type& byStartedAt() { return get<2>(); }
typename bmi::nth_index<RunSet, 2>::type const& byRunPtr() const { return get<2>(); } typename bmi::nth_index<RunSet, 2>::type const& byStartedAt() const { return get<2>(); }
typename bmi::nth_index<RunSet, 3>::type& byStartedAt() { return get<3>(); } typename bmi::nth_index<RunSet, 3>::type& byJobName() { return get<3>(); }
typename bmi::nth_index<RunSet, 3>::type const& byStartedAt() const { return get<3>(); } typename bmi::nth_index<RunSet, 3>::type const& byJobName() const { return get<3>(); }
typename bmi::nth_index<RunSet, 4>::type& byJobName() { return get<4>(); }
typename bmi::nth_index<RunSet, 4>::type const& byJobName() const { return get<4>(); }
}; };
#endif // LAMINAR_RUN_H_ #endif // LAMINAR_RUN_H_

View File

@ -447,10 +447,15 @@ void Server::stop() {
eventfd_write(efd_quit, 1); eventfd_write(efd_quit, 1);
} }
void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) { kj::Promise<void> Server::readDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE); auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer))); return handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer));
}
void Server::addTask(kj::Promise<void>&& task)
{
childTasks.add(kj::mv(task));
} }
kj::Promise<void> Server::addTimeout(int seconds, std::function<void ()> cb) { kj::Promise<void> Server::addTimeout(int seconds, std::function<void ()> cb) {

View File

@ -42,8 +42,9 @@ public:
// add a file descriptor to be monitored for output. The callback will be // add a file descriptor to be monitored for output. The callback will be
// invoked with the read data // invoked with the read data
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb); kj::Promise<void> readDescriptor(int fd, std::function<void(const char*,size_t)> cb);
void addTask(kj::Promise<void> &&task);
// add a one-shot timer callback // add a one-shot timer callback
kj::Promise<void> addTimeout(int seconds, std::function<void()> cb); kj::Promise<void> addTimeout(int seconds, std::function<void()> cb);

View File

@ -31,7 +31,7 @@ protected:
} }
void wait() { void wait() {
int state = -1; int state = -1;
waitpid(run.pid, &state, 0); waitpid(run.current_pid, &state, 0);
run.reaped(state); run.reaped(state);
} }
void runAll() { void runAll() {
@ -41,7 +41,7 @@ protected:
std::string readAllOutput() { std::string readAllOutput() {
std::string res; std::string res;
char tmp[64]; char tmp[64];
for(ssize_t n = read(run.fd, tmp, 64); n > 0; n = read(run.fd, tmp, 64)) for(ssize_t n = read(run.output_fd, tmp, 64); n > 0; n = read(run.output_fd, tmp, 64))
res += std::string(tmp, n); res += std::string(tmp, n);
// strip the first "[laminar] executing.. line // strip the first "[laminar] executing.. line
return strchr(res.c_str(), '\n') + 1; return strchr(res.c_str(), '\n') + 1;