diff --git a/CMakeLists.txt b/CMakeLists.txt index 8faa6fb..dbd716e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,14 +86,15 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js # (see resources.cpp where these are fetched) set(LAMINARD_CORE_SOURCES + src/conf.cpp src/database.cpp - src/server.cpp src/laminar.cpp - src/conf.cpp + src/leader.cpp src/http.cpp src/resources.cpp src/rpc.cpp src/run.cpp + src/server.cpp laminar.capnp.c++ index_html_size.h ) @@ -111,7 +112,7 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests") if(BUILD_TESTS) find_package(GTest REQUIRED) include_directories(${GTEST_INCLUDE_DIRS} src) - add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp test/unit-run.cpp) + add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp) target_link_libraries(laminar-tests ${GTEST_LIBRARY} capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) endif() diff --git a/UserManual.md b/UserManual.md index 7b57a29..d3812e8 100644 --- a/UserManual.md +++ b/UserManual.md @@ -323,8 +323,6 @@ Then in `example.run` echo $foo # prints "bar" ``` -This works because laminarc reads `$JOB` and `$NUM` and passes them to the laminar daemon as part of the `set` request. (It is thus possible to set environment variables on other jobs by overriding these variables, but this is not very useful). - --- # Archiving artefacts diff --git a/src/client.cpp b/src/client.cpp index 25c32f6..bdb723a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -23,6 +23,7 @@ #include #include +#include #define EXIT_BAD_ARGUMENT 1 #define EXIT_OPERATION_FAILED 2 @@ -169,21 +170,10 @@ int main(int argc, char** argv) { fprintf(stderr, "Usage %s set param=value\n", argv[0]); return EXIT_BAD_ARGUMENT; } - auto req = laminar.setRequest(); - char* eq = strchr(argv[2], '='); - char* job = getenv("JOB"); - char* num = getenv("RUN"); - if(job && num && eq) { - char* name = argv[2]; - *eq++ = '\0'; - char* val = eq; - req.getRun().setJob(job); - req.getRun().setBuildNum(atoi(num)); - req.getParam().setName(name); - req.getParam().setValue(val); - req.send().wait(waitScope); + if(char* pipeNum = getenv("__LAMINAR_SETENV_PIPE")) { + write(atoi(pipeNum), argv[2], strlen(argv[2])); } else { - fprintf(stderr, "Missing $JOB or $RUN or param is not in the format key=value\n"); + fprintf(stderr, "Must be run from within a laminar job\n"); return EXIT_BAD_ARGUMENT; } } else if(strcmp(argv[1], "abort") == 0) { diff --git a/src/laminar.capnp b/src/laminar.capnp index 9e9e5c7..15c5297 100644 --- a/src/laminar.capnp +++ b/src/laminar.capnp @@ -5,11 +5,10 @@ interface LaminarCi { queue @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult); start @1 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32); run @2 (jobName :Text, params :List(JobParam)) -> (result :JobResult, buildNum :UInt32); - set @3 (run :Run, param :JobParam) -> (result :MethodResult); - listQueued @4 () -> (result :List(Text)); - listRunning @5 () -> (result :List(Run)); - listKnown @6 () -> (result :List(Text)); - abort @7 (run :Run) -> (result :MethodResult); + listQueued @3 () -> (result :List(Text)); + listRunning @4 () -> (result :List(Run)); + listKnown @5 () -> (result :List(Text)); + abort @6 (run :Run) -> (result :MethodResult); struct Run { job @0 :Text; diff --git a/src/laminar.cpp b/src/laminar.cpp index 5097bb4..fa3261e 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -581,16 +581,14 @@ std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { } bool Laminar::abort(std::string job, uint buildNum) { - if(Run* run = activeRun(job, buildNum)) { - run->abort(true); - return true; - } + if(Run* run = activeRun(job, buildNum)) + return run->abort(); return false; } void Laminar::abortAll() { for(std::shared_ptr run : activeJobs) { - run->abort(false); + run->abort(); } } @@ -598,7 +596,9 @@ bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { for(auto& sc : contexts) { std::shared_ptr ctx = sc.second; - if(ctx->canQueue(jobContexts.at(run->name)) && run->configure(buildNums[run->name] + 1, ctx, *fsHome)) { + if(ctx->canQueue(jobContexts.at(run->name))) { + kj::Promise onRunFinished = run->start(buildNums[run->name] + 1, ctx, *fsHome,[this](kj::Maybe& pid){return srv.onChildExit(pid);}); + ctx->busyExecutors++; // set the last known result if exists db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") @@ -607,13 +607,20 @@ bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { run->lastResult = RunState(result); }); - // Actually schedules the Run steps - kj::Promise exec = handleRunStep(run.get()).then([=]{ - runFinished(run.get()); + kj::Promise exec = srv.readDescriptor(run->output_fd, [this, run](const char*b, size_t n){ + // handle log output + std::string s(b, n); + run->log += s; + http->notifyLog(run->name, run->build, s, false); + }).then([run, p = kj::mv(onRunFinished)]() mutable { + // wait until leader reaped + return kj::mv(p); + }).then([this, run](RunState){ + handleRunFinished(run.get()); }); if(run->timeout > 0) { exec = exec.attach(srv.addTimeout(run->timeout, [r=run.get()](){ - r->abort(true); + r->abort(); })); } srv.addTask(kj::mv(exec)); @@ -657,31 +664,7 @@ void Laminar::assignNewJobs() { } } -kj::Promise Laminar::handleRunStep(Run* run) { - if(run->step()) { - // no more steps - return kj::READY_NOW; - } - - kj::Promise exited = srv.onChildExit(run->current_pid); - // promise is fulfilled when the process is reaped. But first we wait for all - // output from the pipe (Run::output_fd) to be consumed. - return srv.readDescriptor(run->output_fd, [this,run](const char*b,size_t n){ - // handle log output - std::string s(b, n); - run->log += s; - http->notifyLog(run->name, run->build, s, false); - }).then([p = std::move(exited)]() mutable { - // wait until the process is reaped - return kj::mv(p); - }).then([this, run](int status){ - run->reaped(status); - // next step in Run - return handleRunStep(run); - }); -} - -void Laminar::runFinished(Run * r) { +void Laminar::handleRunFinished(Run * r) { std::shared_ptr ctx = r->context; ctx->busyExecutors--; diff --git a/src/laminar.h b/src/laminar.h index febb419..20a37ea 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -105,8 +105,7 @@ private: bool loadConfiguration(); void assignNewJobs(); bool tryStartRun(std::shared_ptr run, int queueIndex); - kj::Promise handleRunStep(Run *run); - void runFinished(Run*); + void handleRunFinished(Run*); // expects that Json has started an array void populateArtifacts(Json& out, std::string job, uint num) const; diff --git a/src/leader.cpp b/src/leader.cpp new file mode 100644 index 0000000..15e70f4 --- /dev/null +++ b/src/leader.cpp @@ -0,0 +1,295 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "log.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "run.h" + +// short syntax helper for kj::Path +template +inline kj::Path operator/(const kj::Path& p, const T& ext) { + return p.append(ext); +} +template +inline kj::Path operator/(const kj::PathPtr& p, const T& ext) { + return p.append(ext); +} + +struct Script { + kj::Path path; + kj::Path cwd; + bool runOnAbort; +}; + +class Leader final : public kj::TaskSet::ErrorHandler { +public: + Leader(kj::AsyncIoContext& ioContext, kj::Filesystem& fs, const char* jobName, uint runNumber); + RunState run(); + +private: + void taskFailed(kj::Exception&& exception) override; + kj::Promise step(std::queue