From 4ffc22c6575675e5707d036652c495ef5ca09bf0 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Fri, 20 Jul 2018 14:15:59 +0300 Subject: [PATCH] use promises to control job runs This is a refactor that more cleanly uses the kj framework for handling processes spawned by Runs. This obviates the workaround introduced back in ff42dae7cc3, and incidentally now requires c++14. Part of #49 refactor --- CMakeLists.txt | 2 +- docker-build-centos.sh | 4 +- src/laminar.cpp | 372 ++++++++++++++++++++--------------------- src/laminar.h | 13 +- src/run.cpp | 132 +++++++-------- src/run.h | 34 ++-- src/server.cpp | 9 +- src/server.h | 3 +- test/test-run.cpp | 4 +- 9 files changed, 279 insertions(+), 294 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f8b4c7..0a9f46c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ cmake_policy(SET CMP0058 NEW) set(CMAKE_INCLUDE_CURRENT_DIR ON) -add_definitions("-std=c++11 -Wall -Wextra -Wno-unused-parameter -Wno-sign-compare") +add_definitions("-std=c++14 -Wall -Wextra -Wno-unused-parameter -Wno-sign-compare") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Werror -DDEBUG") # This macro takes a list of files, gzips them and converts the output into diff --git a/docker-build-centos.sh b/docker-build-centos.sh index ee71fab..ea09c2e 100755 --- a/docker-build-centos.sh +++ b/docker-build-centos.sh @@ -8,11 +8,13 @@ VERSION=$(cd "$SOURCE_DIR" && git describe --tags --abbrev=8 --dirty | tr - .) 
DOCKER_TAG=$(docker build -q - < Laminar::queueJob(std::string name, ParamMap params) { return run; } -bool Laminar::stepRun(std::shared_ptr run) { - bool complete = run->step(); - if(!complete) { - srv->addDescriptor(run->fd, [this, run](const char* b,size_t n){ - handleRunLog(run, std::string(b,n)); - }); - } - return complete; -} - -void Laminar::handleRunLog(std::shared_ptr run, std::string s) { - run->log += s; - for(LaminarClient* c : clients) { - if(c->scope.wantsLog(run->name, run->build)) - c->sendMessage(s); - } -} - // Reaps a zombie and steps the corresponding Run to its next state. // Should be called on SIGCHLD void Laminar::reapChildren() { int ret = 0; pid_t pid; - constexpr int bufsz = 1024; - static thread_local char buf[bufsz]; while((pid = waitpid(-1, &ret, WNOHANG)) > 0) { LLOG(INFO, "Reaping", pid); - auto it = activeJobs.byPid().find(pid); - std::shared_ptr run = *it; - // The main event loop might schedule this SIGCHLD handler before the final - // output handler (from addDescriptor). In that case, because it keeps a - // shared_ptr to the run it would successfully add to the log output buffer, - // but by then reapAdvance would have stored the log and ensured no-one cares. - // Preempt this case by forcing a final (non-blocking) read here. 
- for(ssize_t n = read(run->fd, buf, bufsz); n > 0; n = read(run->fd, buf, bufsz)) { - handleRunLog(run, std::string(buf, static_cast(n))); - } - bool completed = true; - activeJobs.byPid().modify(it, [&](std::shared_ptr run){ - run->reaped(ret); - completed = stepRun(run); - }); - if(completed) - run->complete(); + auto it = pids.find(pid); + it->second->fulfill(kj::mv(ret)); + pids.erase(it); } assignNewJobs(); @@ -586,169 +552,193 @@ bool Laminar::nodeCanQueue(const Node& node, const Run& run) const { return false; } +bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { + for(auto& sn : nodes) { + std::shared_ptr node = sn.second; + if(nodeCanQueue(*node.get(), *run)) { + fs::path cfgDir = fs::path(homeDir)/"cfg"; + boost::system::error_code err; + + // create a workspace for this job if it doesn't exist + fs::path ws = fs::path(homeDir)/"run"/run->name/"workspace"; + if(!fs::exists(ws)) { + if(!fs::create_directories(ws, err)) { + LLOG(ERROR, "Could not create job workspace", run->name); + break; + } + // prepend the workspace init script + if(fs::exists(cfgDir/"jobs"/run->name+".init")) + run->addScript((cfgDir/"jobs"/run->name+".init").string(), ws.string()); + } + + uint buildNum = buildNums[run->name] + 1; + // create the run directory + fs::path rd = fs::path(homeDir)/"run"/run->name/std::to_string(buildNum); + bool createWorkdir = true; + if(fs::is_directory(rd)) { + LLOG(WARNING, "Working directory already exists, removing", rd.string()); + fs::remove_all(rd, err); + if(err) { + LLOG(WARNING, "Failed to remove working directory", err.message()); + createWorkdir = false; + } + } + if(createWorkdir && !fs::create_directory(rd, err)) { + LLOG(ERROR, "Could not create working directory", rd.string()); + break; + } + run->runDir = rd.string(); + + // create an archive directory + fs::path archive = fs::path(homeDir)/"archive"/run->name/std::to_string(buildNum); + if(fs::is_directory(archive)) { + LLOG(WARNING, "Archive directory already exists", 
archive.string()); + } else if(!fs::create_directories(archive)) { + LLOG(ERROR, "Could not create archive directory", archive.string()); + break; + } + + // add scripts + // global before-run script + if(fs::exists(cfgDir/"before")) + run->addScript((cfgDir/"before").string()); + // per-node before-run script + if(fs::exists(cfgDir/"nodes"/node->name+".before")) + run->addScript((cfgDir/"nodes"/node->name+".before").string()); + // job before-run script + if(fs::exists(cfgDir/"jobs"/run->name+".before")) + run->addScript((cfgDir/"jobs"/run->name+".before").string()); + // main run script. must exist. + run->addScript((cfgDir/"jobs"/run->name+".run").string()); + // job after-run script + if(fs::exists(cfgDir/"jobs"/run->name+".after")) + run->addScript((cfgDir/"jobs"/run->name+".after").string()); + // per-node after-run script + if(fs::exists(cfgDir/"nodes"/node->name+".after")) + run->addScript((cfgDir/"nodes"/node->name+".after").string()); + // global after-run script + if(fs::exists(cfgDir/"after")) + run->addScript((cfgDir/"after").string()); + + // add environment files + if(fs::exists(cfgDir/"env")) + run->addEnv((cfgDir/"env").string()); + if(fs::exists(cfgDir/"nodes"/node->name+".env")) + run->addEnv((cfgDir/"nodes"/node->name+".env").string()); + if(fs::exists(cfgDir/"jobs"/run->name+".env")) + run->addEnv((cfgDir/"jobs"/run->name+".env").string()); + + // add job timeout if specified + if(fs::exists(cfgDir/"jobs"/run->name+".conf")) { + int timeout = parseConfFile(fs::path(cfgDir/"jobs"/run->name+".conf").string().c_str()).get("TIMEOUT", 0); + if(timeout > 0) { + // A raw pointer to run is used here so as not to have a circular reference. + // The captured raw pointer is safe because if the Run is destroyed the Promise + // will be cancelled and the callback never called. 
+ Run* r = run.get(); + r->timeout = srv->addTimeout(timeout, [r](){ + r->abort(); + }); + } + } + + // start the job + node->busyExecutors++; + run->node = node; + run->startedAt = time(nullptr); + run->laminarHome = homeDir; + run->build = buildNum; + // set the last known result if exists + db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") + .bind(run->name) + .fetch([=](int result){ + run->lastResult = RunState(result); + }); + // update next build number + buildNums[run->name] = buildNum; + + LLOG(INFO, "Queued job to node", run->name, run->build, node->name); + + // notify clients + Json j; + j.set("type", "job_started") + .startObject("data") + .set("queueIndex", queueIndex) + .set("name", run->name) + .set("queued", run->startedAt - run->queuedAt) + .set("started", run->startedAt) + .set("number", run->build) + .set("reason", run->reason()); + db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") + .bind(run->name) + .fetch([&](uint etc){ + j.set("etc", time(nullptr) + etc); + }); + j.EndObject(); + const char* msg = j.str(); + for(LaminarClient* c : clients) { + if(c->scope.wantsStatus(run->name, run->build) + // The run page also should know that another job has started + // (so maybe it can show a previously hidden "next" button). 
+ // Hence this small hack: + || (c->scope.type == MonitorScope::Type::RUN && c->scope.job == run->name)) + c->sendMessage(msg); + } + + // notify the rpc client if the start command was used + run->started.fulfiller->fulfill(); + + srv->addTask(handleRunStep(run.get()).then([this,run]{ + runFinished(run.get()); + })); + + return true; + } + } + return false; +} + void Laminar::assignNewJobs() { auto it = queuedJobs.begin(); while(it != queuedJobs.end()) { - bool assigned = false; - for(auto& sn : nodes) { - std::shared_ptr node = sn.second; - std::shared_ptr run = *it; - if(nodeCanQueue(*node.get(), *run)) { - fs::path cfgDir = fs::path(homeDir)/"cfg"; - boost::system::error_code err; - - // create a workspace for this job if it doesn't exist - fs::path ws = fs::path(homeDir)/"run"/run->name/"workspace"; - if(!fs::exists(ws)) { - if(!fs::create_directories(ws, err)) { - LLOG(ERROR, "Could not create job workspace", run->name); - break; - } - // prepend the workspace init script - if(fs::exists(cfgDir/"jobs"/run->name+".init")) - run->addScript((cfgDir/"jobs"/run->name+".init").string(), ws.string()); - } - - uint buildNum = buildNums[run->name] + 1; - // create the run directory - fs::path rd = fs::path(homeDir)/"run"/run->name/std::to_string(buildNum); - bool createWorkdir = true; - if(fs::is_directory(rd)) { - LLOG(WARNING, "Working directory already exists, removing", rd.string()); - fs::remove_all(rd, err); - if(err) { - LLOG(WARNING, "Failed to remove working directory", err.message()); - createWorkdir = false; - } - } - if(createWorkdir && !fs::create_directory(rd, err)) { - LLOG(ERROR, "Could not create working directory", rd.string()); - break; - } - run->runDir = rd.string(); - - // create an archive directory - fs::path archive = fs::path(homeDir)/"archive"/run->name/std::to_string(buildNum); - if(fs::is_directory(archive)) { - LLOG(WARNING, "Archive directory already exists", archive.string()); - } else if(!fs::create_directories(archive)) { - 
LLOG(ERROR, "Could not create archive directory", archive.string()); - break; - } - - // add scripts - // global before-run script - if(fs::exists(cfgDir/"before")) - run->addScript((cfgDir/"before").string()); - // per-node before-run script - if(fs::exists(cfgDir/"nodes"/node->name+".before")) - run->addScript((cfgDir/"nodes"/node->name+".before").string()); - // job before-run script - if(fs::exists(cfgDir/"jobs"/run->name+".before")) - run->addScript((cfgDir/"jobs"/run->name+".before").string()); - // main run script. must exist. - run->addScript((cfgDir/"jobs"/run->name+".run").string()); - // job after-run script - if(fs::exists(cfgDir/"jobs"/run->name+".after")) - run->addScript((cfgDir/"jobs"/run->name+".after").string()); - // per-node after-run script - if(fs::exists(cfgDir/"nodes"/node->name+".after")) - run->addScript((cfgDir/"nodes"/node->name+".after").string()); - // global after-run script - if(fs::exists(cfgDir/"after")) - run->addScript((cfgDir/"after").string()); - - // add environment files - if(fs::exists(cfgDir/"env")) - run->addEnv((cfgDir/"env").string()); - if(fs::exists(cfgDir/"nodes"/node->name+".env")) - run->addEnv((cfgDir/"nodes"/node->name+".env").string()); - if(fs::exists(cfgDir/"jobs"/run->name+".env")) - run->addEnv((cfgDir/"jobs"/run->name+".env").string()); - - // add job timeout if specified - if(fs::exists(cfgDir/"jobs"/run->name+".conf")) { - int timeout = parseConfFile(fs::path(cfgDir/"jobs"/run->name+".conf").string().c_str()).get("TIMEOUT", 0); - if(timeout > 0) { - // A raw pointer to run is used here so as not to have a circular reference. - // The captured raw pointer is safe because if the Run is destroyed the Promise - // will be cancelled and the callback never called. 
- Run* r = run.get(); - r->timeout = srv->addTimeout(timeout, [r](){ - r->abort(); - }); - } - } - - // start the job - node->busyExecutors++; - run->node = node; - run->startedAt = time(nullptr); - run->laminarHome = homeDir; - run->build = buildNum; - // set the last known result if exists - db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") - .bind(run->name) - .fetch([=](int result){ - run->lastResult = RunState(result); - }); - // update next build number - buildNums[run->name] = buildNum; - - LLOG(INFO, "Queued job to node", run->name, run->build, node->name); - - // notify clients - Json j; - j.set("type", "job_started") - .startObject("data") - .set("queueIndex", std::distance(it,queuedJobs.begin())) - .set("name", run->name) - .set("queued", run->startedAt - run->queuedAt) - .set("started", run->startedAt) - .set("number", run->build) - .set("reason", run->reason()); - db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") - .bind(run->name) - .fetch([&](uint etc){ - j.set("etc", time(nullptr) + etc); - }); - j.EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(run->name, run->build) - // The run page also should know that another job has started - // (so maybe it can show a previously hidden "next" button). 
- // Hence this small hack: - || (c->scope.type == MonitorScope::Type::RUN && c->scope.job == run->name)) - c->sendMessage(msg); - } - - // notify the rpc client if the start command was used - run->started.fulfiller->fulfill(); - - // setup run completion handler - run->notifyCompletion = [this](Run* r) { runFinished(r); }; - - // trigger the first step of the run - if(stepRun(run)) { - // should never happen - LLOG(INFO, "No steps for run"); - run->complete(); - } - - assigned = true; - break; - } - } - if(assigned) { + if(tryStartRun(*it, std::distance(queuedJobs.begin(), it))) { activeJobs.insert(*it); it = queuedJobs.erase(it); - } else + } else { ++it; - + } } } +kj::Promise Laminar::handleRunStep(Run* run) { + if(run->step()) { + // no more steps + return kj::READY_NOW; + } + + // promise is fulfilled when the process is reaped. But first we wait for all + // output from the pipe (Run::output_fd) to be consumed. + auto paf = kj::newPromiseAndFulfiller(); + pids.emplace(run->current_pid, kj::mv(paf.fulfiller)); + + return srv->readDescriptor(run->output_fd, [this,run](const char*b,size_t n){ + // handle log output + std::string s(b, n); + run->log += s; + for(LaminarClient* c : clients) { + if(c->scope.wantsLog(run->name, run->build)) + c->sendMessage(s); + } + }).then([p = std::move(paf.promise)]() mutable { + // wait until the process is reaped + return kj::mv(p); + }).then([this, run](int status){ + run->reaped(status); + // next step in Run + return handleRunStep(run); + }); +} + void Laminar::runFinished(Run * r) { std::shared_ptr node = r->node; diff --git a/src/laminar.h b/src/laminar.h index 1f8efd3..ce00895 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -1,5 +1,5 @@ /// -/// Copyright 2015-2017 Oliver Giles +/// Copyright 2015-2018 Oliver Giles /// /// This file is part of Laminar /// @@ -64,16 +64,16 @@ public: private: bool loadConfiguration(); void assignNewJobs(); - bool stepRun(std::shared_ptr run); - void handleRunLog(std::shared_ptr run, 
std::string log); + bool tryStartRun(std::shared_ptr run, int queueIndex); + kj::Promise handleRunStep(Run *run); void runFinished(Run*); bool nodeCanQueue(const Node&, const Run&) const; // expects that Json has started an array void populateArtifacts(Json& out, std::string job, uint num) const; - Run* activeRun(std::string name, uint num) { - auto it = activeJobs.byRun().find(boost::make_tuple(name, num)); - return it == activeJobs.byRun().end() ? nullptr : it->get(); + Run* activeRun(const std::string name, uint num) { + auto it = activeJobs.byNameNumber().find(boost::make_tuple(name, num)); + return it == activeJobs.byNameNumber().end() ? nullptr : it->get(); } std::list> queuedJobs; @@ -83,6 +83,7 @@ private: std::unordered_map> jobTags; RunSet activeJobs; + std::map>> pids; Database* db; Server* srv; NodeMap nodes; diff --git a/src/run.cpp b/src/run.cpp index ec96151..2fc8169 100644 --- a/src/run.cpp +++ b/src/run.cpp @@ -1,5 +1,5 @@ /// -/// Copyright 2015-2017 Oliver Giles +/// Copyright 2015-2018 Oliver Giles /// /// This file is part of Laminar /// @@ -41,9 +41,10 @@ std::string to_string(const RunState& rs) { } -Run::Run() { - result = RunState::SUCCESS; - lastResult = RunState::UNKNOWN; +Run::Run() : + result(RunState::SUCCESS), + lastResult(RunState::UNKNOWN) +{ } Run::~Run() { @@ -58,74 +59,73 @@ std::string Run::reason() const { } bool Run::step() { - if(scripts.size()) { - currentScript = scripts.front(); - scripts.pop(); + if(!scripts.size()) + return true; - int pfd[2]; - pipe(pfd); - pid_t pid = fork(); - if(pid == 0) { // child - // reset signal mask (SIGCHLD blocked in Laminar::start) - sigset_t mask; - sigemptyset(&mask); - sigaddset(&mask, SIGCHLD); - sigprocmask(SIG_UNBLOCK, &mask, nullptr); + currentScript = scripts.front(); + scripts.pop(); - // set pgid == pid for easy killing on abort - setpgid(0, 0); + int pfd[2]; + pipe(pfd); + pid_t pid = fork(); + if(pid == 0) { // child + // reset signal mask (SIGCHLD blocked in Laminar::start) + 
sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigprocmask(SIG_UNBLOCK, &mask, nullptr); - close(pfd[0]); - dup2(pfd[1], 1); - dup2(pfd[1], 2); - close(pfd[1]); - std::string buildNum = std::to_string(build); + // set pgid == pid for easy killing on abort + setpgid(0, 0); - std::string PATH = (fs::path(laminarHome)/"cfg"/"scripts").string() + ":"; - if(const char* p = getenv("PATH")) { - PATH.append(p); - } + close(pfd[0]); + dup2(pfd[1], 1); + dup2(pfd[1], 2); + close(pfd[1]); + std::string buildNum = std::to_string(build); - chdir(currentScript.cwd.c_str()); - - // conf file env vars - for(std::string file : env) { - StringMap vars = parseConfFile(file.c_str()); - for(auto& it : vars) { - setenv(it.first.c_str(), it.second.c_str(), true); - } - } - // parameterized vars - for(auto& pair : params) { - setenv(pair.first.c_str(), pair.second.c_str(), false); - } - - setenv("PATH", PATH.c_str(), true); - setenv("RUN", buildNum.c_str(), true); - setenv("JOB", name.c_str(), true); - if(!node->name.empty()) - setenv("NODE", node->name.c_str(), true); - setenv("RESULT", to_string(result).c_str(), true); - setenv("LAST_RESULT", to_string(lastResult).c_str(), true); - setenv("WORKSPACE", (fs::path(laminarHome)/"run"/name/"workspace").string().c_str(), true); - setenv("ARCHIVE", (fs::path(laminarHome)/"archive"/name/buildNum.c_str()).string().c_str(), true); - - fprintf(stderr, "[laminar] Executing %s\n", currentScript.path.c_str()); - execl(currentScript.path.c_str(), currentScript.path.c_str(), NULL); - // cannot use LLOG because stdout/stderr are captured - fprintf(stderr, "[laminar] Failed to execute %s\n", currentScript.path.c_str()); - _exit(1); + std::string PATH = (fs::path(laminarHome)/"cfg"/"scripts").string() + ":"; + if(const char* p = getenv("PATH")) { + PATH.append(p); } - LLOG(INFO, "Forked", currentScript.path, currentScript.cwd, pid); - close(pfd[1]); - fd = pfd[0]; - this->pid = pid; + chdir(currentScript.cwd.c_str()); - return false; 
- } else { - return true; + // conf file env vars + for(std::string file : env) { + StringMap vars = parseConfFile(file.c_str()); + for(auto& it : vars) { + setenv(it.first.c_str(), it.second.c_str(), true); + } + } + // parameterized vars + for(auto& pair : params) { + setenv(pair.first.c_str(), pair.second.c_str(), false); + } + + setenv("PATH", PATH.c_str(), true); + setenv("RUN", buildNum.c_str(), true); + setenv("JOB", name.c_str(), true); + if(!node->name.empty()) + setenv("NODE", node->name.c_str(), true); + setenv("RESULT", to_string(result).c_str(), true); + setenv("LAST_RESULT", to_string(lastResult).c_str(), true); + setenv("WORKSPACE", (fs::path(laminarHome)/"run"/name/"workspace").string().c_str(), true); + setenv("ARCHIVE", (fs::path(laminarHome)/"archive"/name/buildNum.c_str()).string().c_str(), true); + + fprintf(stderr, "[laminar] Executing %s\n", currentScript.path.c_str()); + execl(currentScript.path.c_str(), currentScript.path.c_str(), NULL); + // cannot use LLOG because stdout/stderr are captured + fprintf(stderr, "[laminar] Failed to execute %s\n", currentScript.path.c_str()); + _exit(1); } + + LLOG(INFO, "Forked", currentScript.path, currentScript.cwd, pid); + close(pfd[1]); + + current_pid = pid; + output_fd = pfd[0]; + return false; } void Run::addScript(std::string scriptPath, std::string scriptWorkingDir) { @@ -139,7 +139,7 @@ void Run::addEnv(std::string path) { void Run::abort() { // clear all pending scripts std::queue