job leader process

Implement a separate process, the "leader", which runs all the
scripts for a job run, instead of directly from the main laminard
process. This makes for a cleaner process tree view, where the
owning job for a given script is clear; also the leader process
acts as a subreaper to clean up any wayward descendent processes.

Resolves #78.
pull/113/head
Oliver Giles 4 years ago
parent 304ef797b8
commit 3fde38c6b8

@ -86,14 +86,15 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js
# (see resources.cpp where these are fetched)
set(LAMINARD_CORE_SOURCES
src/conf.cpp
src/database.cpp
src/server.cpp
src/laminar.cpp
src/conf.cpp
src/leader.cpp
src/http.cpp
src/resources.cpp
src/rpc.cpp
src/run.cpp
src/server.cpp
laminar.capnp.c++
index_html_size.h
)
@ -111,7 +112,7 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests")
if(BUILD_TESTS)
find_package(GTest REQUIRED)
include_directories(${GTEST_INCLUDE_DIRS} src)
add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp test/unit-run.cpp)
add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp)
target_link_libraries(laminar-tests ${GTEST_LIBRARY} capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z)
endif()

@ -323,8 +323,6 @@ Then in `example.run`
echo $foo # prints "bar"
```
This works because laminarc reads `$JOB` and `$NUM` and passes them to the laminar daemon as part of the `set` request. (It is thus possible to set environment variables on other jobs by overriding these variables, but this is not very useful).
---
# Archiving artefacts

@ -23,6 +23,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define EXIT_BAD_ARGUMENT 1
#define EXIT_OPERATION_FAILED 2
@ -169,21 +170,10 @@ int main(int argc, char** argv) {
fprintf(stderr, "Usage %s set param=value\n", argv[0]);
return EXIT_BAD_ARGUMENT;
}
auto req = laminar.setRequest();
char* eq = strchr(argv[2], '=');
char* job = getenv("JOB");
char* num = getenv("RUN");
if(job && num && eq) {
char* name = argv[2];
*eq++ = '\0';
char* val = eq;
req.getRun().setJob(job);
req.getRun().setBuildNum(atoi(num));
req.getParam().setName(name);
req.getParam().setValue(val);
req.send().wait(waitScope);
if(char* pipeNum = getenv("__LAMINAR_SETENV_PIPE")) {
write(atoi(pipeNum), argv[2], strlen(argv[2]));
} else {
fprintf(stderr, "Missing $JOB or $RUN or param is not in the format key=value\n");
fprintf(stderr, "Must be run from within a laminar job\n");
return EXIT_BAD_ARGUMENT;
}
} else if(strcmp(argv[1], "abort") == 0) {

@ -5,11 +5,10 @@ interface LaminarCi {
queue @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult);
start @1 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32);
run @2 (jobName :Text, params :List(JobParam)) -> (result :JobResult, buildNum :UInt32);
set @3 (run :Run, param :JobParam) -> (result :MethodResult);
listQueued @4 () -> (result :List(Text));
listRunning @5 () -> (result :List(Run));
listKnown @6 () -> (result :List(Text));
abort @7 (run :Run) -> (result :MethodResult);
listQueued @3 () -> (result :List(Text));
listRunning @4 () -> (result :List(Run));
listKnown @5 () -> (result :List(Text));
abort @6 (run :Run) -> (result :MethodResult);
struct Run {
job @0 :Text;

@ -581,16 +581,14 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
}
bool Laminar::abort(std::string job, uint buildNum) {
if(Run* run = activeRun(job, buildNum)) {
run->abort(true);
return true;
}
if(Run* run = activeRun(job, buildNum))
return run->abort();
return false;
}
void Laminar::abortAll() {
for(std::shared_ptr<Run> run : activeJobs) {
run->abort(false);
run->abort();
}
}
@ -598,7 +596,9 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
for(auto& sc : contexts) {
std::shared_ptr<Context> ctx = sc.second;
if(ctx->canQueue(jobContexts.at(run->name)) && run->configure(buildNums[run->name] + 1, ctx, *fsHome)) {
if(ctx->canQueue(jobContexts.at(run->name))) {
kj::Promise<RunState> onRunFinished = run->start(buildNums[run->name] + 1, ctx, *fsHome,[this](kj::Maybe<pid_t>& pid){return srv.onChildExit(pid);});
ctx->busyExecutors++;
// set the last known result if exists
db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
@ -607,13 +607,20 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
run->lastResult = RunState(result);
});
// Actually schedules the Run steps
kj::Promise<void> exec = handleRunStep(run.get()).then([=]{
runFinished(run.get());
kj::Promise<void> exec = srv.readDescriptor(run->output_fd, [this, run](const char*b, size_t n){
// handle log output
std::string s(b, n);
run->log += s;
http->notifyLog(run->name, run->build, s, false);
}).then([run, p = kj::mv(onRunFinished)]() mutable {
// wait until leader reaped
return kj::mv(p);
}).then([this, run](RunState){
handleRunFinished(run.get());
});
if(run->timeout > 0) {
exec = exec.attach(srv.addTimeout(run->timeout, [r=run.get()](){
r->abort(true);
r->abort();
}));
}
srv.addTask(kj::mv(exec));
@ -657,31 +664,7 @@ void Laminar::assignNewJobs() {
}
}
kj::Promise<void> Laminar::handleRunStep(Run* run) {
if(run->step()) {
// no more steps
return kj::READY_NOW;
}
kj::Promise<int> exited = srv.onChildExit(run->current_pid);
// promise is fulfilled when the process is reaped. But first we wait for all
// output from the pipe (Run::output_fd) to be consumed.
return srv.readDescriptor(run->output_fd, [this,run](const char*b,size_t n){
// handle log output
std::string s(b, n);
run->log += s;
http->notifyLog(run->name, run->build, s, false);
}).then([p = std::move(exited)]() mutable {
// wait until the process is reaped
return kj::mv(p);
}).then([this, run](int status){
run->reaped(status);
// next step in Run
return handleRunStep(run);
});
}
void Laminar::runFinished(Run * r) {
void Laminar::handleRunFinished(Run * r) {
std::shared_ptr<Context> ctx = r->context;
ctx->busyExecutors--;

@ -105,8 +105,7 @@ private:
bool loadConfiguration();
void assignNewJobs();
bool tryStartRun(std::shared_ptr<Run> run, int queueIndex);
kj::Promise<void> handleRunStep(Run *run);
void runFinished(Run*);
void handleRunFinished(Run*);
// expects that Json has started an array
void populateArtifacts(Json& out, std::string job, uint num) const;

@ -0,0 +1,295 @@
///
/// Copyright 2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "log.h"
#include <string>
#include <unistd.h>
#include <queue>
#include <sys/prctl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <kj/async-io.h>
#include <kj/async-unix.h>
#include <kj/filesystem.h>
#include "run.h"
// short syntax helper for kj::Path
template<typename T>
inline kj::Path operator/(const kj::Path& p, const T& ext) {
return p.append(ext);
}
template<typename T>
inline kj::Path operator/(const kj::PathPtr& p, const T& ext) {
return p.append(ext);
}
struct Script {
kj::Path path;
kj::Path cwd;
bool runOnAbort;
};
class Leader final : public kj::TaskSet::ErrorHandler {
public:
Leader(kj::AsyncIoContext& ioContext, kj::Filesystem& fs, const char* jobName, uint runNumber);
RunState run();
private:
void taskFailed(kj::Exception&& exception) override;
kj::Promise<void> step(std::queue<Script>& scripts);
kj::Promise<void> reapChildProcesses();
kj::Promise<void> readEnvPipe(kj::AsyncInputStream* stream, char* buffer);
kj::TaskSet tasks;
RunState result;
kj::AsyncIoContext& ioContext;
const kj::Directory& home;
kj::PathPtr rootPath;
std::string jobName;
uint runNumber;
pid_t currentGroupId;
pid_t currentScriptPid;
std::queue<Script> scripts;
int setEnvPipe[2];
};
Leader::Leader(kj::AsyncIoContext &ioContext, kj::Filesystem &fs, const char *jobName, uint runNumber) :
tasks(*this),
result(RunState::SUCCESS),
ioContext(ioContext),
home(fs.getCurrent()),
rootPath(fs.getCurrentPath()),
jobName(jobName),
runNumber(runNumber)
{
tasks.add(ioContext.unixEventPort.onSignal(SIGTERM).then([this](siginfo_t) {
while(scripts.size() && (!scripts.front().runOnAbort))
scripts.pop();
// TODO: probably shouldn't do this if we are already in a runOnAbort script
kill(-currentGroupId, SIGTERM);
// TODO: wait a few seconds for exit, then send KILL?
}));
pipe(setEnvPipe);
auto event = ioContext.lowLevelProvider->wrapInputFd(setEnvPipe[0], kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(1024);
tasks.add(readEnvPipe(event, buffer.asPtr().begin()).attach(kj::mv(event), kj::mv(buffer)));
}
RunState Leader::run()
{
kj::Path cfgDir{"cfg"};
// create the run directory
kj::Path rd{"run",jobName,std::to_string(runNumber)};
bool createWorkdir = true;
KJ_IF_MAYBE(ls, home.tryLstat(rd)) {
LASSERT(ls->type == kj::FsNode::Type::DIRECTORY);
LLOG(WARNING, "Working directory already exists, removing", rd.toString());
if(home.tryRemove(rd) == false) {
LLOG(WARNING, "Failed to remove working directory");
createWorkdir = false;
}
}
if(createWorkdir && home.tryOpenSubdir(rd, kj::WriteMode::CREATE|kj::WriteMode::CREATE_PARENT) == nullptr) {
LLOG(ERROR, "Could not create working directory", rd.toString());
return RunState::FAILED;
}
// create an archive directory
kj::Path archive = kj::Path{"archive",jobName,std::to_string(runNumber)};
if(home.exists(archive)) {
LLOG(WARNING, "Archive directory already exists", archive.toString());
} else if(home.tryOpenSubdir(archive, kj::WriteMode::CREATE|kj::WriteMode::CREATE_PARENT) == nullptr) {
LLOG(ERROR, "Could not create archive directory", archive.toString());
return RunState::FAILED;
}
// create a workspace for this job if it doesn't exist
kj::Path ws{"run",jobName,"workspace"};
if(!home.exists(ws)) {
home.openSubdir(ws, kj::WriteMode::CREATE|kj::WriteMode::CREATE_PARENT);
// prepend the workspace init script
if(home.exists(cfgDir/"jobs"/(jobName+".init")))
scripts.push({cfgDir/"jobs"/(jobName+".init"), kj::mv(ws), false});
}
// add scripts
// global before-run script
if(home.exists(cfgDir/"before"))
scripts.push({cfgDir/"before", rd.clone(), false});
// job before-run script
if(home.exists(cfgDir/"jobs"/(jobName+".before")))
scripts.push({cfgDir/"jobs"/(jobName+".before"), rd.clone(), false});
// main run script. must exist.
scripts.push({cfgDir/"jobs"/(jobName+".run"), rd.clone(), false});
// job after-run script
if(home.exists(cfgDir/"jobs"/(jobName+".after")))
scripts.push({cfgDir/"jobs"/(jobName+".after"), rd.clone(), true});
// global after-run script
if(home.exists(cfgDir/"after"))
scripts.push({cfgDir/"after", rd.clone(), true});
// Start executing scripts
return step(scripts).then([this](){
return result;
}).wait(ioContext.waitScope);
}
void Leader::taskFailed(kj::Exception &&exception)
{
LLOG(ERROR, exception);
}
kj::Promise<void> Leader::step(std::queue<Script> &scripts)
{
if(scripts.empty())
return kj::READY_NOW;
Script currentScript = kj::mv(scripts.front());
scripts.pop();
pid_t pid = fork();
if(pid == 0) { // child
// unblock all signals
sigset_t mask;
sigfillset(&mask);
sigprocmask(SIG_UNBLOCK, &mask, nullptr);
// create a new process group to help us deal with any wayward forks
setpgid(0, 0);
std::string buildNum = std::to_string(runNumber);
LSYSCALL(chdir(currentScript.cwd.toString(false).cStr()));
setenv("RESULT", to_string(result).c_str(), true);
// pass the pipe through a variable to allow laminarc to send new env back
char pipeNum[4];
sprintf(pipeNum, "%d", setEnvPipe[1]);
setenv("__LAMINAR_SETENV_PIPE", pipeNum, 1);
fprintf(stderr, "[laminar] Executing %s\n", currentScript.path.toString().cStr());
kj::String execPath = (rootPath/currentScript.path).toString(true);
execl(execPath.cStr(), execPath.cStr(), NULL);
fprintf(stderr, "[laminar] Failed to execute %s\n", currentScript.path.toString().cStr());
_exit(1);
}
currentScriptPid = pid;
currentGroupId = pid;
return reapChildProcesses().then([&](){
return step(scripts);
});
}
kj::Promise<void> Leader::reapChildProcesses()
{
return ioContext.unixEventPort.onSignal(SIGCHLD).then([this](siginfo_t) -> kj::Promise<void> {
while(true) {
int status;
errno = 0;
pid_t pid = waitpid(-1, &status, WNOHANG);
if(pid == -1 && errno == ECHILD) {
// all children exited
return kj::READY_NOW;
} else if(pid == 0) {
// child processes are still running
if(currentScriptPid) {
// We could get here if a more deeply nested process was reparented to us
// before the primary script executed. Quietly wait until the process we're
// waiting for is done
return reapChildProcesses();
}
// Otherwise, reparented orphans are on borrowed time
// TODO list wayward processes?
fprintf(stderr, "[laminar] sending SIGHUP to adopted child processes\n");
kill(-currentGroupId, SIGHUP);
return ioContext.provider->getTimer().afterDelay(5*kj::SECONDS).then([this]{
fprintf(stderr, "[laminar] sending SIGKILL to process group %d\n", currentGroupId);
// TODO: should we mark the job as failed if we had to kill reparented processes?
kill(-currentGroupId, SIGKILL);
return reapChildProcesses();
}).exclusiveJoin(reapChildProcesses());
} else if(pid == currentScriptPid) {
// the script we were waiting for is done
// if we already marked as failed, preserve that
if(result == RunState::SUCCESS) {
if(WIFSIGNALED(status) && (WTERMSIG(status) == SIGTERM || WTERMSIG(status) == SIGKILL))
result = RunState::ABORTED;
else if(WEXITSTATUS(status) != 0)
result = RunState::FAILED;
}
currentScriptPid = 0;
} else {
// some reparented process was reaped
}
}
});
}
kj::Promise<void> Leader::readEnvPipe(kj::AsyncInputStream *stream, char *buffer) {
return stream->tryRead(buffer, 1, 1024).then([this,stream,buffer](size_t sz) {
if(sz > 0) {
buffer[sz] = '\0';
if(char* eq = strchr(buffer, '=')) {
*eq++ = '\0';
setenv(buffer, eq, 1);
}
return readEnvPipe(stream, kj::mv(buffer));
}
return kj::Promise<void>(kj::READY_NOW);
});
}
int leader_main(void) {
auto ioContext = kj::setupAsyncIo();
auto fs = kj::newDiskFilesystem();
kj::UnixEventPort::captureSignal(SIGTERM);
// Don't use captureChildExit or onChildExit because they don't provide a way to
// reap orphaned child processes. Stick with the more fundamental onSignal.
kj::UnixEventPort::captureSignal(SIGCHLD);
// Becoming a subreaper means any descendent process whose parent process disappears
// will be reparented to this one instead of init (or higher layer subreaper).
// We do this so that the run will wait until all descedents exit before executing
// the next step.
prctl(PR_SET_CHILD_SUBREAPER, 1, NULL, NULL, NULL);
// Become the leader of a new process group. This is so that all child processes
// will also get a kill signal when the run is aborted
setpgid(0, 0);
// Environment inherited from main laminard process
const char* jobName = getenv("JOB");
std::string name(jobName);
uint runNumber = atoi(getenv("RUN"));
if(!jobName || !runNumber)
return EXIT_FAILURE;
Leader leader(ioContext, *fs, jobName, runNumber);
// Parent process will cast back to RunState
return int(leader.run());
}

@ -0,0 +1,36 @@
///
/// Copyright 2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef LAMINAR_LEADER_H_
#define LAMINAR_LEADER_H_
// Main function for the leader process which is responsible for
// executing all the scripts which make up a Run. Separating this
// into its own process allows for a cleaner process tree view,
// where it's obvious which script belongs to which run of which
// job, and allows this leader process to act as a subreaper for
// any wayward child processes.
// This could have been implemented as a separate process, but
// instead we just fork & exec /proc/self/exe from the main laminar
// daemon, and distinguish based on argv[0]. This saves installing
// another binary and avoids some associated pitfalls.
int leader_main(void);
#endif // LAMINAR_LEADER_H_

@ -17,6 +17,7 @@
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "laminar.h"
#include "leader.h"
#include "server.h"
#include "log.h"
#include <signal.h>
@ -40,9 +41,10 @@ constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080";
constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/";
}
int main(int argc, char** argv) {
if(argv[0][0] == '{')
return leader_main();
for(int i = 1; i < argc; ++i) {
if(strcmp(argv[i], "-v") == 0) {
kj::_::Debug::setLogLevel(kj::_::Debug::Severity::INFO);

@ -81,10 +81,10 @@ public:
std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC run", jobName);
std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams()));
if(Run* r = run.get()) {
return r->whenFinished().then([context,r](RunState state) mutable {
if(run) {
return run->whenFinished().then([context,run](RunState state) mutable {
context.getResults().setResult(fromRunState(state));
context.getResults().setBuildNum(r->build);
context.getResults().setBuildNum(run->build);
});
} else {
context.getResults().setResult(LaminarCi::JobResult::UNKNOWN);
@ -92,20 +92,6 @@ public:
}
}
// Set a parameter on a running build
kj::Promise<void> set(SetContext context) override {
std::string jobName = context.getParams().getRun().getJob();
uint buildNum = context.getParams().getRun().getBuildNum();
LLOG(INFO, "RPC set", jobName, buildNum);
LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum,
context.getParams().getParam().getName(), context.getParams().getParam().getValue())
? LaminarCi::MethodResult::SUCCESS
: LaminarCi::MethodResult::FAILED;
context.getResults().setResult(result);
return kj::READY_NOW;
}
// List jobs in queue
kj::Promise<void> listQueued(ListQueuedContext context) override {
const std::list<std::shared_ptr<Run>>& queue = laminar.listQueuedJobs();

@ -52,7 +52,9 @@ Run::Run(std::string name, ParamMap pm, kj::Path&& rootPath) :
queuedAt(time(nullptr)),
rootPath(kj::mv(rootPath)),
started(kj::newPromiseAndFulfiller<void>()),
finished(kj::newPromiseAndFulfiller<RunState>())
startedFork(started.promise.fork()),
finished(kj::newPromiseAndFulfiller<RunState>()),
finishedFork(finished.promise.fork())
{
for(auto it = params.begin(); it != params.end();) {
if(it->first[0] == '=') {
@ -75,113 +77,53 @@ Run::~Run() {
LLOG(INFO, "Run destroyed");
}
bool Run::configure(uint buildNum, std::shared_ptr<Context> nd, const kj::Directory& fsHome)
{
kj::Path cfgDir{"cfg"};
// create the run directory
kj::Path rd{"run",name,std::to_string(buildNum)};
bool createWorkdir = true;
KJ_IF_MAYBE(ls, fsHome.tryLstat(rd)) {
LASSERT(ls->type == kj::FsNode::Type::DIRECTORY);
LLOG(WARNING, "Working directory already exists, removing", rd.toString());
if(fsHome.tryRemove(rd) == false) {
LLOG(WARNING, "Failed to remove working directory");
createWorkdir = false;
}
static void setEnvFromFile(const kj::Path& rootPath, kj::Path file) {
StringMap vars = parseConfFile((rootPath/file).toString(true).cStr());
for(auto& it : vars) {
setenv(it.first.c_str(), it.second.c_str(), true);
}
if(createWorkdir && fsHome.tryOpenSubdir(rd, kj::WriteMode::CREATE|kj::WriteMode::CREATE_PARENT) == nullptr) {
LLOG(ERROR, "Could not create working directory", rd.toString());
return false;
}
// create an archive directory
kj::Path archive = kj::Path{"archive",name,std::to_string(buildNum)};
if(fsHome.exists(archive)) {
LLOG(WARNING, "Archive directory already exists", archive.toString());
} else if(fsHome.tryOpenSubdir(archive, kj::WriteMode::CREATE|kj::WriteMode::CREATE_PARENT) == nullptr) {
LLOG(ERROR, "Could not create archive directory", archive.toString());
return false;
}
// create a workspace for this job if it doesn't exist
kj::Path ws{"run",name,"workspace"};
if(!fsHome.exists(ws)) {
fsHome.openSubdir(ws, kj::WriteMode::CREATE|kj::WriteMode::CREATE_PARENT);
// prepend the workspace init script
if(fsHome.exists(cfgDir/"jobs"/(name+".init")))
addScript(cfgDir/"jobs"/(name+".init"), kj::mv(ws));
}
// add scripts
// global before-run script
if(fsHome.exists(cfgDir/"before"))
addScript(cfgDir/"before", rd.clone());
// job before-run script
if(fsHome.exists(cfgDir/"jobs"/(name+".before")))
addScript(cfgDir/"jobs"/(name+".before"), rd.clone());
// main run script. must exist.
addScript(cfgDir/"jobs"/(name+".run"), rd.clone());
// job after-run script
if(fsHome.exists(cfgDir/"jobs"/(name+".after")))
addScript(cfgDir/"jobs"/(name+".after"), rd.clone(), true);
// global after-run script
if(fsHome.exists(cfgDir/"after"))
addScript(cfgDir/"after", rd.clone(), true);
}
// add environment files
if(fsHome.exists(cfgDir/"env"))
addEnv(cfgDir/"env");
if(fsHome.exists(cfgDir/"contexts"/(nd->name+".env")))
addEnv(cfgDir/"contexts"/(nd->name+".env"));
if(fsHome.exists(cfgDir/"jobs"/(name+".env")))
addEnv(cfgDir/"jobs"/(name+".env"));
kj::Promise<RunState> Run::start(uint buildNum, std::shared_ptr<Context> ctx, const kj::Directory &fsHome, std::function<kj::Promise<int>(kj::Maybe<pid_t>&)> getPromise)
{
kj::Path cfgDir{"cfg"};
// add job timeout if specified
if(fsHome.exists(cfgDir/"jobs"/(name+".conf"))) {
timeout = parseConfFile((rootPath/cfgDir/"jobs"/(name+".conf")).toString(true).cStr()).get<int>("TIMEOUT", 0);
}
// All good, we've "started"
startedAt = time(nullptr);
build = buildNum;
context = nd;
// notifies the rpc client if the start command was used
started.fulfiller->fulfill();
return true;
}
std::string Run::reason() const {
return reasonMsg;
}
bool Run::step() {
if(!scripts.size())
return true;
Script currentScript = kj::mv(scripts.front());
scripts.pop();
int pfd[2];
pipe(pfd);
pid_t pid = fork();
if(pid == 0) { // child
// reset signal mask (SIGCHLD blocked in Laminar::start)
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_UNBLOCK, &mask, nullptr);
int plog[2];
LSYSCALL(pipe(plog));
// Fork a process leader to run all the steps of the job. This gives us a nice
// process tree output (job name and number as the process name) and helps
// contain any wayward descendent processes.
pid_t leader;
LSYSCALL(leader = fork());
if(leader == 0) {
// All output from this process will be captured in the plog pipe
close(plog[0]);
dup2(plog[1], STDOUT_FILENO);
dup2(plog[1], STDERR_FILENO);
close(plog[1]);
// All initial/fixed env vars can be set here. Dynamic ones, including
// "RESULT" and any set by `laminarc set` have to be handled in the subprocess.
// add environment files
if(fsHome.exists(cfgDir/"env"))
setEnvFromFile(rootPath, cfgDir/"env");
if(fsHome.exists(cfgDir/"contexts"/(ctx->name+".env")))
setEnvFromFile(rootPath, cfgDir/"contexts"/(ctx->name+".env"));
if(fsHome.exists(cfgDir/"jobs"/(name+".env")))
setEnvFromFile(rootPath, cfgDir/"jobs"/(name+".env"));
// set pgid == pid for easy killing on abort
setpgid(0, 0);
close(pfd[0]);
dup2(pfd[1], 1);
dup2(pfd[1], 2);
close(pfd[1]);
std::string buildNum = std::to_string(build);
// parameterized vars
for(auto& pair : params) {
setenv(pair.first.c_str(), pair.second.c_str(), false);
}
std::string PATH = (rootPath/"cfg"/"scripts").toString(true).cStr();
if(const char* p = getenv("PATH")) {
@ -189,72 +131,62 @@ bool Run::step() {
PATH.append(p);
}
LSYSCALL(chdir((rootPath/currentScript.cwd).toString(true).cStr()));
// conf file env vars
for(kj::Path& file : env) {
StringMap vars = parseConfFile((rootPath/file).toString(true).cStr());
for(auto& it : vars) {
setenv(it.first.c_str(), it.second.c_str(), true);
}
}
// parameterized vars
for(auto& pair : params) {
setenv(pair.first.c_str(), pair.second.c_str(), false);
}
std::string runNumStr = std::to_string(buildNum);
setenv("PATH", PATH.c_str(), true);
setenv("RUN", buildNum.c_str(), true);
setenv("RUN", runNumStr.c_str(), true);
setenv("JOB", name.c_str(), true);
setenv("CONTEXT", context->name.c_str(), true);
setenv("RESULT", to_string(result).c_str(), true);
setenv("CONTEXT", ctx->name.c_str(), true);
setenv("LAST_RESULT", to_string(lastResult).c_str(), true);
setenv("WORKSPACE", (rootPath/"run"/name/"workspace").toString(true).cStr(), true);
setenv("ARCHIVE", (rootPath/"archive"/name/buildNum).toString(true).cStr(), true);
fprintf(stderr, "[laminar] Executing %s\n", currentScript.path.toString().cStr());
kj::String execPath = (rootPath/currentScript.path).toString(true);
execl(execPath.cStr(), execPath.cStr(), NULL);
// cannot use LLOG because stdout/stderr are captured
fprintf(stderr, "[laminar] Failed to execute %s\n", currentScript.path.toString().cStr());
_exit(1);
setenv("ARCHIVE", (rootPath/"archive"/name/runNumStr).toString(true).cStr(), true);
// RESULT set in leader process
// leader process assumes $LAMINAR_HOME as CWD
LSYSCALL(chdir(rootPath.toString(true).cStr()));
setenv("PWD", rootPath.toString(true).cStr(), 1);
// We could just fork/wait over all the steps here directly, but then we
// can't set a nice name for the process tree. There is pthread_setname_np,
// but it's limited to 16 characters, which most of the time probably isn't
// enough. Instead, we'll just exec ourselves and handle that in laminard's
// main() by calling leader_main()
char* procName;
asprintf(&procName, "{laminar} %s:%d", name.data(), buildNum);
execl("/proc/self/exe", procName, NULL); // does not return
_exit(EXIT_FAILURE);
}
LLOG(INFO, "Forked", currentScript.path, currentScript.cwd, pid);
close(pfd[1]);
// All good, we've "started"
startedAt = time(nullptr);
build = buildNum;
context = ctx;
current_pid = pid;
output_fd = pfd[0];
return false;
}
output_fd = plog[0];
close(plog[1]);
pid = leader;
void Run::addScript(kj::Path scriptPath, kj::Path scriptWorkingDir, bool runOnAbort) {
scripts.push({kj::mv(scriptPath), kj::mv(scriptWorkingDir), runOnAbort});
// notifies the rpc client if the start command was used
started.fulfiller->fulfill();
return getPromise(pid).then([this](int status){
// The leader process passes a RunState through the return value.
// Check it didn't die abnormally, then cast to get it back.
result = WIFEXITED(status) ? RunState(WEXITSTATUS(status)) : RunState::ABORTED;
finished.fulfiller->fulfill(RunState(result));
return result;
});
}
void Run::addEnv(kj::Path path) {
env.push_back(kj::mv(path));
std::string Run::reason() const {
return reasonMsg;
}
void Run::abort(bool respectRunOnAbort) {
while(scripts.size() && (!respectRunOnAbort || !scripts.front().runOnAbort))
scripts.pop();
bool Run::abort() {
// if the Maybe is empty, wait() was already called on this process
KJ_IF_MAYBE(p, current_pid) {
KJ_IF_MAYBE(p, pid) {
kill(-*p, SIGTERM);
return true;
}
}
void Run::reaped(int status) {
// once state is non-success it cannot change again
if(result != RunState::SUCCESS)
return;
if(WIFSIGNALED(status) && (WTERMSIG(status) == SIGTERM || WTERMSIG(status) == SIGKILL))
result = RunState::ABORTED;
else if(status != 0)
result = RunState::FAILED;
// otherwise preserve earlier status
finished.fulfiller->fulfill(RunState(result));
return false;
}

@ -57,24 +57,15 @@ public:
Run(const Run&) = delete;
Run& operator=(const Run&) = delete;
// Call this to "start" the run with a specific number and context
bool configure(uint buildNum, std::shared_ptr<Context> context, const kj::Directory &fsHome);
// executes the next script (if any), returning true if there is nothing
// more to be done.
bool step();
kj::Promise<RunState> start(uint buildNum, std::shared_ptr<Context> ctx, const kj::Directory &fsHome, std::function<kj::Promise<int>(kj::Maybe<pid_t>&)> getPromise);
// aborts this run
void abort(bool respectRunOnAbort);
// called when a process owned by this run has been reaped. The status
// may be used to set the run's job status
void reaped(int status);
bool abort();
std::string reason() const;
kj::Promise<void>&& whenStarted() { return kj::mv(started.promise); }
kj::Promise<RunState>&& whenFinished() { return kj::mv(finished.promise); }
kj::Promise<void> whenStarted() { return startedFork.addBranch(); }
kj::Promise<RunState> whenFinished() { return finishedFork.addBranch(); }
std::shared_ptr<Context> context;
RunState result;
@ -84,7 +75,7 @@ public:
int parentBuild = 0;
uint build = 0;
std::string log;
kj::Maybe<pid_t> current_pid;
kj::Maybe<pid_t> pid;
int output_fd;
std::unordered_map<std::string, std::string> params;
int timeout = 0;
@ -105,14 +96,14 @@ private:
};
kj::Path rootPath;
std::queue<Script> scripts;
std::list<kj::Path> env;
std::string reasonMsg;
kj::PromiseFulfillerPair<void> started;
kj::ForkedPromise<void> startedFork;
kj::PromiseFulfillerPair<RunState> finished;
kj::ForkedPromise<RunState> finishedFork;
};
// All this below is a somewhat overengineered method of keeping track of
// currently executing builds (Run objects). This would probably scale
// very well, but it's completely gratuitous since we are not likely to

@ -26,6 +26,7 @@
#include "tempdir.h"
#include "laminar.h"
#include "server.h"
#include "conf.h"
class LaminarFixture : public ::testing::Test {
public:
@ -38,7 +39,7 @@ public:
settings.bind_rpc = bind_rpc.c_str();
settings.bind_http = bind_http.c_str();
settings.archive_url = "/test-archive/";
server = new Server(ioContext);
server = new Server(*ioContext);
laminar = new Laminar(*server, settings);
}
~LaminarFixture() noexcept(true) {
@ -47,7 +48,7 @@ public:
}
kj::Own<EventSource> eventSource(const char* path) {
return kj::heap<EventSource>(ioContext, bind_http.c_str(), path);
return kj::heap<EventSource>(*ioContext, bind_http.c_str(), path);
}
void defineJob(const char* name, const char* scriptContent) {
@ -57,9 +58,61 @@ public:
}
}
struct RunExec {
LaminarCi::JobResult result;
kj::String log;
};
RunExec runJob(const char* name, kj::Maybe<StringMap> params = nullptr) {
auto req = client().runRequest();
req.setJobName(name);
KJ_IF_MAYBE(p, params) {
auto params = req.initParams(p->size());
int i = 0;
for(auto kv : *p) {
params[i].setName(kv.first);
params[i].setValue(kv.second);
i++;
}
}
auto res = req.send().wait(ioContext->waitScope);
std::string path = std::string{"/log/"} + name + "/" + std::to_string(res.getBuildNum());
kj::HttpHeaderTable headerTable;
kj::String log = kj::newHttpClient(ioContext->lowLevelProvider->getTimer(), headerTable,
*ioContext->provider->getNetwork().parseAddress(bind_http.c_str()).wait(ioContext->waitScope))
->request(kj::HttpMethod::GET, path, kj::HttpHeaders(headerTable)).response.wait(ioContext->waitScope).body
->readAllText().wait(ioContext->waitScope);
return { res.getResult(), kj::mv(log) };
}
kj::String stripLaminarLogLines(const kj::String& str) {
auto out = kj::heapString(str.size());
char *o = out.begin();
for(const char *p = str.cStr(), *e = p + str.size(); p < e;) {
const char *nl = strchrnul(p, '\n');
if(!kj::StringPtr{p}.startsWith("[laminar]")) {
memcpy(o, p, nl - p + 1);
o += nl - p + 1;
}
p = nl + 1;
}
*o = '\0';
return out;
}
StringMap parseFromString(kj::StringPtr content) {
char tmp[16] = "/tmp/lt.XXXXXX";
int fd = mkstemp(tmp);
write(fd, content.begin(), content.size());
close(fd);
StringMap map = parseConfFile(tmp);
unlink(tmp);
return map;
}
LaminarCi::Client client() {
if(!rpc) {
auto stream = ioContext.provider->getNetwork().parseAddress(bind_rpc).wait(ioContext.waitScope)->connect().wait(ioContext.waitScope);
auto stream = ioContext->provider->getNetwork().parseAddress(bind_rpc).wait(ioContext->waitScope)->connect().wait(ioContext->waitScope);
auto net = kj::heap<capnp::TwoPartyVatNetwork>(*stream, capnp::rpc::twoparty::Side::CLIENT);
rpc = kj::heap<capnp::RpcSystem<capnp::rpc::twoparty::VatId>>(*net, nullptr).attach(kj::mv(net), kj::mv(stream));
}
@ -76,7 +129,7 @@ public:
Settings settings;
Server* server;
Laminar* laminar;
static kj::AsyncIoContext ioContext;
static kj::AsyncIoContext* ioContext;
};
#endif // LAMINAR_FIXTURE_H_

@ -18,13 +18,14 @@
///
#include <kj/async-unix.h>
#include "laminar-fixture.h"
#include "conf.h"
// TODO: consider handling this differently
kj::AsyncIoContext LaminarFixture::ioContext = kj::setupAsyncIo();
kj::AsyncIoContext* LaminarFixture::ioContext;
TEST_F(LaminarFixture, EmptyStatusMessageStructure) {
auto es = eventSource("/");
ioContext.waitScope.poll();
ioContext->waitScope.poll();
ASSERT_EQ(1, es->messages().size());
auto json = es->messages().front().GetObject();
@ -51,12 +52,7 @@ TEST_F(LaminarFixture, JobNotifyHomePage) {
defineJob("foo", "true");
auto es = eventSource("/");
auto req = client().runRequest();
req.setJobName("foo");
ASSERT_EQ(LaminarCi::JobResult::SUCCESS, req.send().wait(ioContext.waitScope).getResult());
// wait for job completed
ioContext.waitScope.poll();
runJob("foo");
ASSERT_EQ(4, es->messages().size());
@ -84,13 +80,8 @@ TEST_F(LaminarFixture, OnlyRelevantNotifications) {
auto es1Run = eventSource("/jobs/job1/1");
auto es2Run = eventSource("/jobs/job2/1");
auto req1 = client().runRequest();
req1.setJobName("job1");
ASSERT_EQ(LaminarCi::JobResult::SUCCESS, req1.send().wait(ioContext.waitScope).getResult());
auto req2 = client().runRequest();
req2.setJobName("job2");
ASSERT_EQ(LaminarCi::JobResult::SUCCESS, req2.send().wait(ioContext.waitScope).getResult());
ioContext.waitScope.poll();
runJob("job1");
runJob("job2");
EXPECT_EQ(7, esHome->messages().size());
EXPECT_EQ(7, esJobs->messages().size());
@ -101,3 +92,62 @@ TEST_F(LaminarFixture, OnlyRelevantNotifications) {
EXPECT_EQ(4, es1Run->messages().size());
EXPECT_EQ(4, es2Run->messages().size());
}
TEST_F(LaminarFixture, FailedStatus) {
defineJob("job1", "false");
auto run = runJob("job1");
ASSERT_EQ(LaminarCi::JobResult::FAILED, run.result);
}
TEST_F(LaminarFixture, WorkingDirectory) {
defineJob("job1", "pwd");
auto run = runJob("job1");
ASSERT_EQ(LaminarCi::JobResult::SUCCESS, run.result);
std::string cwd{tmp.path.append(kj::Path{"run","job1","1"}).toString(true).cStr()};
EXPECT_EQ(cwd + "\n", stripLaminarLogLines(run.log).cStr());
}
TEST_F(LaminarFixture, Environment) {
defineJob("foo", "env");
auto run = runJob("foo");
std::string ws{tmp.path.append(kj::Path{"run","foo","workspace"}).toString(true).cStr()};
std::string archive{tmp.path.append(kj::Path{"archive","foo","1"}).toString(true).cStr()};
StringMap map = parseFromString(run.log);
EXPECT_EQ("1", map["RUN"]);
EXPECT_EQ("foo", map["JOB"]);
EXPECT_EQ("success", map["RESULT"]);
EXPECT_EQ("unknown", map["LAST_RESULT"]);
EXPECT_EQ(ws, map["WORKSPACE"]);
EXPECT_EQ(archive, map["ARCHIVE"]);
}
TEST_F(LaminarFixture, ParamsToEnv) {
defineJob("foo", "env");
StringMap params;
params["foo"] = "bar";
auto run = runJob("foo", params);
StringMap map = parseFromString(run.log);
EXPECT_EQ("bar", map["foo"]);
}
TEST_F(LaminarFixture, Abort) {
defineJob("job1", "yes");
auto req = client().runRequest();
req.setJobName("job1");
auto res = req.send();
// There isn't a nice way of knowing when the leader process is ready to
// handle SIGTERM. Just wait until it prints something to the log
ioContext->waitScope.poll();
kj::HttpHeaderTable headerTable;
char _;
kj::newHttpClient(ioContext->lowLevelProvider->getTimer(), headerTable,
*ioContext->provider->getNetwork().parseAddress(bind_http.c_str()).wait(ioContext->waitScope))
->request(kj::HttpMethod::GET, "/log/job1/1", kj::HttpHeaders(headerTable)).response.wait(ioContext->waitScope).body
->tryRead(&_, 1, 1).wait(ioContext->waitScope);
// now it should be ready to abort
ASSERT_TRUE(laminar->abort("job1", 1));
EXPECT_EQ(LaminarCi::JobResult::ABORTED, res.wait(ioContext->waitScope).getResult());
}

@ -18,10 +18,22 @@
///
#include <kj/async-unix.h>
#include <gtest/gtest.h>
#include <kj/debug.h>
// gtest main supplied in order to call captureChildExit
#include "laminar-fixture.h"
#include "leader.h"
// gtest main supplied in order to call captureChildExit and handle process leader
int main(int argc, char **argv) {
if(argv[0][0] == '{')
return leader_main();
// TODO: consider handling this differently
auto ioContext = kj::setupAsyncIo();
LaminarFixture::ioContext = &ioContext;
kj::UnixEventPort::captureChildExit();
//kj::_::Debug::setLogLevel(kj::_::Debug::Severity::INFO);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();

@ -1,136 +0,0 @@
///
/// Copyright 2018 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include <gtest/gtest.h>
#include "run.h"
#include "log.h"
#include "context.h"
#include "conf.h"
#include "tempdir.h"
class RunTest : public testing::Test {
protected:
RunTest() :
testing::Test(),
context(std::make_shared<Context>()),
tmp(),
run("foo", ParamMap{}, tmp.path.clone())
{
}
~RunTest() noexcept {}
void wait() {
int state = -1;
waitpid(run.current_pid.orDefault(0), &state, 0);
run.reaped(state);
}
void runAll() {
while(!run.step())
wait();
}
std::string readAllOutput() {
std::string res;
char tmp[64];
for(ssize_t n = read(run.output_fd, tmp, 64); n > 0; n = read(run.output_fd, tmp, 64))
res += std::string(tmp, n);
// strip the first "[laminar] executing.. line
return strchr(res.c_str(), '\n') + 1;
}
StringMap parseFromString(std::string content) {
char tmp[16] = "/tmp/lt.XXXXXX";
int fd = mkstemp(tmp);
write(fd, content.data(), content.size());
close(fd);
StringMap map = parseConfFile(tmp);
unlink(tmp);
return map;
}
std::shared_ptr<Context> context;
TempDir tmp;
class Run run;
void setRunLink(const char * path) {
KJ_IF_MAYBE(f, tmp.fs->tryOpenFile(kj::Path{"cfg", "jobs", run.name + ".run"},
kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT | kj::WriteMode::EXECUTABLE)) {
(f->get())->writeAll(std::string("#!/bin/sh\nexec ") + path + "\n");
}
}
};
TEST_F(RunTest, WorkingDirectory) {
setRunLink("pwd");
run.configure(1, context, *tmp.fs);
runAll();
std::string cwd{tmp.path.append(kj::Path{"run","foo","1"}).toString(true).cStr()};
EXPECT_EQ(cwd + "\n", readAllOutput());
}
TEST_F(RunTest, SuccessStatus) {
setRunLink("true");
run.configure(1, context, *tmp.fs);
runAll();
EXPECT_EQ(RunState::SUCCESS, run.result);
}
TEST_F(RunTest, FailedStatus) {
setRunLink("false");
run.configure(1, context, *tmp.fs);
runAll();
EXPECT_EQ(RunState::FAILED, run.result);
}
TEST_F(RunTest, Environment) {
setRunLink("env");
run.configure(1234, context, *tmp.fs);
runAll();
std::string ws{tmp.path.append(kj::Path{"run","foo","workspace"}).toString(true).cStr()};
std::string archive{tmp.path.append(kj::Path{"archive","foo","1234"}).toString(true).cStr()};
StringMap map = parseFromString(readAllOutput());
EXPECT_EQ("1234", map["RUN"]);
EXPECT_EQ("foo", map["JOB"]);
EXPECT_EQ("success", map["RESULT"]);
EXPECT_EQ("unknown", map["LAST_RESULT"]);
EXPECT_EQ(ws, map["WORKSPACE"]);
EXPECT_EQ(archive, map["ARCHIVE"]);
}
TEST_F(RunTest, ParamsToEnv) {
setRunLink("env");
run.params["foo"] = "bar";
run.configure(1, context, *tmp.fs);
runAll();
StringMap map = parseFromString(readAllOutput());
EXPECT_EQ("bar", map["foo"]);
}
TEST_F(RunTest, Abort) {
setRunLink("yes");
run.configure(1, context, *tmp.fs);
run.step();
usleep(200); // TODO fix
run.abort(false);
wait();
EXPECT_EQ(RunState::ABORTED, run.result);
}
Loading…
Cancel
Save