///
/// Copyright 2016 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see
///
#include "laminar.h"
#include "server.h"
#include "conf.h"
#include "log.h"
#include
#include
#include
#include
#include
namespace fs = boost::filesystem;
#define COMPRESS_LOG_MIN_SIZE 1024
#include
#include
// rapidjson::Writer with a StringBuffer is used a lot in Laminar for
// preparing JSON messages to send to Websocket clients. A small wrapper
// class here reduces verbosity later for this common use case.
class Json : public rapidjson::Writer {
public:
Json() : rapidjson::Writer(buf) { StartObject(); }
template
Json& set(const char* key, T value);
Json& startObject(const char* key) { String(key); StartObject(); return *this; }
Json& startArray(const char* key) { String(key); StartArray(); return *this; }
const char* str() { EndObject(); return buf.GetString(); }
private:
rapidjson::StringBuffer buf;
};
template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; }
template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; }
template<> Json& Json::set(const char* key, int value) { String(key); Int(value); return *this; }
template<> Json& Json::set(const char* key, time_t value) { String(key); Int64(value); return *this; }
namespace {
// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf)
constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar";
constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080";
constexpr const char* ARCHIVE_URL_DEFAULT = "/archive";
}
// helper for appending to boost::filesystem::path
fs::path operator+(fs::path p, const char* ext) {
std::string leaf = p.leaf().string();
leaf += ext;
return p.remove_leaf()/leaf;
}
typedef std::string str;
Laminar::Laminar() {
archiveUrl = ARCHIVE_URL_DEFAULT;
if(char* envArchive = getenv("LAMINAR_ARCHIVE_URL"))
archiveUrl = envArchive;
eraseWorkdir = true;
homeDir = getenv("LAMINAR_HOME") ?: "/var/lib/laminar";
db = new Database((fs::path(homeDir)/"laminar.sqlite").string().c_str());
// Prepare database for first use
// TODO: error handling
db->exec("CREATE TABLE IF NOT EXISTS builds("
"name TEXT, number INT UNSIGNED, node TEXT, queuedAt INT, "
"startedAt INT, completedAt INT, result INT, output TEXT, "
"outputLen INT, parentJob TEXT, parentBuild INT, reason TEXT, "
"PRIMARY KEY (name, number))");
db->exec("CREATE INDEX IF NOT EXISTS idx_completion_time ON builds("
"completedAt DESC)");
// retrieve the last build numbers
db->stmt("SELECT name, MAX(number) FROM builds GROUP BY name")
.fetch([this](str name, int build){
buildNums[name] = build;
});
// This is only a separate function because I imagined that it would
// be nice to reload some configuration during runtime without restarting
// the server completely. Currently not called from anywhere else
// TODO: implement that
loadConfiguration();
}
void Laminar::registerClient(LaminarClient* client) {
clients.insert(client);
}
void Laminar::deregisterClient(LaminarClient* client) {
clients.erase(client);
}
void Laminar::registerWaiter(LaminarWaiter *waiter) {
waiters.insert(waiter);
}
void Laminar::deregisterWaiter(LaminarWaiter *waiter) {
waiters.erase(waiter);
}
bool Laminar::setParam(std::string job, int buildNum, std::string param, std::string value) {
if(Run* run = activeRun(job, buildNum)) {
run->params[param] = value;
return true;
}
return false;
}
void Laminar::populateArtifacts(Json &j, std::string job, int num) const {
fs::path dir(fs::path(homeDir)/"archive"/job/std::to_string(num));
if(fs::is_directory(dir)) {
int prefixLen = (fs::path(homeDir)/"archive").string().length();
int scopeLen = dir.string().length();
for(fs::recursive_directory_iterator it(dir); it != fs::recursive_directory_iterator(); ++it) {
if(!fs::is_regular_file(*it))
continue;
j.StartObject();
j.set("url", archiveUrl + it->path().string().substr(prefixLen));
j.set("filename", it->path().string().substr(scopeLen+1));
j.EndObject();
}
}
}
void Laminar::sendStatus(LaminarClient* client) {
if(client->scope.type == MonitorScope::LOG) {
// If the requested job is currently in progress
if(const Run* run = activeRun(client->scope.job, client->scope.num)) {
client->sendMessage(run->log.c_str());
} else { // it must be finished, fetch it from the database
db->stmt("SELECT output, outputLen FROM builds WHERE name = ? AND number = ?")
.bind(client->scope.job, client->scope.num)
.fetch([=](str maybeZipped, unsigned long sz) {
std::string log(sz+1,'\0');
if(sz >= COMPRESS_LOG_MIN_SIZE) {
int res = ::uncompress((unsigned char*)&log[0], &sz,
(unsigned char*)maybeZipped.data(), maybeZipped.size());
if(res == Z_OK)
client->sendMessage(log);
else
LLOG(ERROR, "Failed to uncompress log");
} else {
client->sendMessage(maybeZipped);
}
});
}
return;
}
Json j;
j.set("type", "status");
j.set("title", getenv("LAMINAR_TITLE") ?: "Laminar");
j.startObject("data");
if(client->scope.type == MonitorScope::RUN) {
db->stmt("SELECT queuedAt,startedAt,completedAt, result, reason FROM builds WHERE name = ? AND number = ?")
.bind(client->scope.job, client->scope.num)
.fetch([&](time_t queued, time_t started, time_t completed, int result, std::string reason) {
j.set("queued", started-queued);
j.set("started", started);
j.set("completed", completed);
j.set("duration", completed-started);
j.set("result", to_string(RunState(result)));
j.set("reason", reason);
});
if(const Run* run = activeRun(client->scope.job, client->scope.num)) {
j.set("queued", run->startedAt - run->queuedAt);
j.set("started", run->startedAt);
j.set("reason", run->reason());
j.set("result", to_string(RunState::RUNNING));
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch([&](int lastRuntime){
j.set("etc", run->startedAt + lastRuntime);
});
}
j.set("latestNum", int(buildNums[client->scope.job]));
j.startArray("artifacts");
populateArtifacts(j, client->scope.job, client->scope.num);
j.EndArray();
} else if(client->scope.type == MonitorScope::JOB) {
j.startArray("recent");
db->stmt("SELECT number,startedAt,completedAt,result,reason FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 25")
.bind(client->scope.job)
.fetch([&](int build,time_t started,time_t completed,int result,str reason){
j.StartObject();
j.set("number", build)
.set("duration", completed - started)
.set("started", started)
.set("result", to_string(RunState(result)))
.set("reason", reason)
.EndObject();
});
j.EndArray();
j.startArray("running");
auto p = activeJobs.get<4>().equal_range(client->scope.job);
for(auto it = p.first; it != p.second; ++it) {
const std::shared_ptr run = *it;
j.StartObject();
j.set("number", run->build);
j.set("node", run->node->name);
j.set("started", run->startedAt);
j.set("result", to_string(RunState::RUNNING));
j.set("reason", run->reason());
j.EndObject();
}
j.EndArray();
int nQueued = 0;
for(const std::shared_ptr run : queuedJobs) {
if (run->name == client->scope.job) {
nQueued++;
}
}
j.set("nQueued", nQueued);
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? ORDER BY completedAt DESC LIMIT 1")
.bind(client->scope.job, int(RunState::SUCCESS))
.fetch([&](int build, time_t started){
j.startObject("lastSuccess");
j.set("number", build).set("started", started);
j.EndObject();
});
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result <> ? ORDER BY completedAt DESC LIMIT 1")
.bind(client->scope.job, int(RunState::SUCCESS))
.fetch([&](int build, time_t started){
j.startObject("lastFailed");
j.set("number", build).set("started", started);
j.EndObject();
});
} else if(client->scope.type == MonitorScope::ALL) {
j.startArray("jobs");
db->stmt("SELECT name,number,startedAt,result FROM builds GROUP BY name ORDER BY number DESC")
.fetch([&](str name,int number, time_t started, int result){
j.StartObject();
j.set("name", name);
j.set("number", number).set("result", to_string(RunState(result))).set("started", started);
j.startArray("tags");
for(const str& t: jobTags[name]) {
j.String(t.c_str());
}
j.EndArray();
j.EndObject();
});
j.EndArray();
} else { // Home page
j.startArray("recent");
db->stmt("SELECT * FROM builds ORDER BY completedAt DESC LIMIT 15")
.fetch([&](str name,int build,str node,time_t,time_t started,time_t completed,int result){
j.StartObject();
j.set("name", name)
.set("number", build)
.set("node", node)
.set("duration", completed - started)
.set("started", started)
.set("result", to_string(RunState(result)))
.EndObject();
});
j.EndArray();
j.startArray("running");
for(const std::shared_ptr run : activeJobs.get<3>()) {
j.StartObject();
j.set("name", run->name);
j.set("number", run->build);
j.set("node", run->node->name);
j.set("started", run->startedAt);
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch([&](int lastRuntime){
j.set("etc", run->startedAt + lastRuntime);
});
j.EndObject();
}
j.EndArray();
j.startArray("queued");
for(const std::shared_ptr run : queuedJobs) {
j.StartObject();
j.set("name", run->name);
j.EndObject();
}
j.EndArray();
int execTotal = 0;
int execBusy = 0;
for(const auto& it : nodes) {
const Node& node = it.second;
execTotal += node.numExecutors;
execBusy += node.busyExecutors;
}
j.set("executorsTotal", execTotal);
j.set("executorsBusy", execBusy);
j.startArray("buildsPerDay");
for(int i = 6; i >= 0; --i) {
j.StartObject();
db->stmt("SELECT result, COUNT(*) FROM builds WHERE completedAt > ? AND completedAt < ? GROUP by result")
.bind(86400*(time(0)/86400 - i), 86400*(time(0)/86400 - (i-1)))
.fetch([&](int result, int num){
j.set(to_string(RunState(result)).c_str(), num);
});
j.EndObject();
}
j.EndArray();
j.startObject("buildsPerJob");
db->stmt("SELECT name, COUNT(*) FROM builds WHERE completedAt > ? GROUP BY name")
.bind(time(0) - 86400)
.fetch([&](str job, int count){
j.set(job.c_str(), count);
});
j.EndObject();
j.startObject("timePerJob");
db->stmt("SELECT name, AVG(completedAt-startedAt) FROM builds WHERE completedAt > ? GROUP BY name")
.bind(time(0) - 7 * 86400)
.fetch([&](str job, int time){
j.set(job.c_str(), time);
});
j.EndObject();
}
j.EndObject();
client->sendMessage(j.str());
}
Laminar::~Laminar() {
delete db;
delete srv;
}
void Laminar::run() {
const char* listen_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT;
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
srv = new Server(*this, listen_rpc, listen_http);
// handle SIGCHLD
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_BLOCK, &mask, NULL);
int sigchld = signalfd(-1, &mask, 0);
srv->addDescriptor(sigchld, [this](char* buf, size_t sz){
struct signalfd_siginfo* siginfo = (struct signalfd_siginfo*) buf;
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
reapAdvance();
assignNewJobs();
});
srv->start();
}
void Laminar::stop() {
clients.clear();
srv->stop();
}
bool Laminar::loadConfiguration() {
if(getenv("LAMINAR_KEEP_WORKDIR"))
eraseWorkdir = false;
NodeMap nm;
fs::path nodeCfg = fs::path(homeDir)/"cfg"/"nodes";
if(fs::is_directory(nodeCfg)) {
for(fs::directory_iterator it(nodeCfg); it != fs::directory_iterator(); ++it) {
if(!fs::is_regular_file(it->status()) || it->path().extension() != ".conf")
continue;
StringMap conf = parseConfFile(it->path().string().c_str());
Node node;
node.name = it->path().stem().string();
node.numExecutors = conf.get("EXECUTORS", 6);
std::string tags = conf.get("TAGS");
if(!tags.empty()) {
std::istringstream iss(tags);
std::set tagList;
std::string tag;
while(std::getline(iss, tag, ','))
tagList.insert(tag);
node.tags = tagList;
}
nm.emplace(node.name, std::move(node));
}
}
if(nm.empty()) {
// add a default node
Node node;
node.name = "";
node.numExecutors = 6;
nm.emplace("", std::move(node));
}
nodes = nm;
fs::path jobsDir = fs::path(homeDir)/"cfg"/"jobs";
if(fs::is_directory(jobsDir)) {
for(fs::directory_iterator it(jobsDir); it != fs::directory_iterator(); ++it) {
if(!fs::is_regular_file(it->status()) || it->path().extension() != ".conf")
continue;
StringMap conf = parseConfFile(it->path().string().c_str());
std::string tags = conf.get("TAGS");
if(!tags.empty()) {
std::istringstream iss(tags);
std::set tagList;
std::string tag;
while(std::getline(iss, tag, ','))
tagList.insert(tag);
jobTags[it->path().stem().string()] = tagList;
}
}
}
return true;
}
std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) {
if(!fs::exists(fs::path(homeDir)/"cfg"/"jobs"/name+".run")) {
LLOG(ERROR, "Non-existent job", name);
return nullptr;
}
std::shared_ptr run = std::make_shared();
run->name = name;
run->queuedAt = time(0);
for(auto it = params.begin(); it != params.end();) {
if(it->first[0] == '=') {
if(it->first == "=parentJob") {
run->parentName = it->second;
} else if(it->first == "=parentBuild") {
run->parentBuild = atoi(it->second.c_str());
} else if(it->first == "=reason") {
run->reasonMsg = it->second;
} else {
LLOG(ERROR, "Unknown internal job parameter", it->first);
}
it = params.erase(it);
} else
++it;
}
run->params = params;
queuedJobs.push_back(run);
// notify clients
Json j;
j.set("type", "job_queued")
.startObject("data")
.set("name", name)
.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(name))
c->sendMessage(msg);
}
assignNewJobs();
return run;
}
bool Laminar::stepRun(std::shared_ptr run) {
bool complete = run->step();
if(!complete) {
srv->addDescriptor(run->fd, [=](char* b,size_t n){
std::string s(b,n);
run->log += s;
for(LaminarClient* c : clients) {
if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s);
}
});
}
return complete;
}
// Reaps a zombie and steps the corresponding Run to its next state.
// Should be called on SIGCHLD
void Laminar::reapAdvance() {
int ret = 0;
pid_t pid;
while((pid = waitpid(-1, &ret, WNOHANG)) > 0) {
LLOG(INFO, "Reaping", pid);
auto it = activeJobs.get<0>().find(pid);
std::shared_ptr run = *it;
bool completed = true;
activeJobs.get<0>().modify(it, [&](std::shared_ptr run){
run->reaped(ret);
completed = stepRun(run);
});
if(completed)
run->complete();
}
}
bool Laminar::nodeCanQueue(const Node& node, const Run& run) const {
// if a node is too busy, it can't take the job
if(node.busyExecutors >= node.numExecutors)
return false;
// if the node has no tags, allow the build
if(node.tags.size() == 0)
return true;
auto it = jobTags.find(run.name);
// if the job has no tags, it cannot be run on this node
if(it == jobTags.end())
return false;
// otherwise, allow the build if job and node have a tag in common
for(const std::string& tag : it->second) {
if(node.tags.find(tag) != node.tags.end())
return true;
}
return false;
}
void Laminar::assignNewJobs() {
auto it = queuedJobs.begin();
while(it != queuedJobs.end()) {
bool assigned = false;
for(auto& sn : nodes) {
Node& node = sn.second;
std::shared_ptr run = *it;
if(nodeCanQueue(node, *run)) {
fs::path cfgDir = fs::path(homeDir)/"cfg";
// create a workspace for this job if it doesn't exist
fs::path ws = fs::path(homeDir)/"run"/run->name/"workspace";
if(!fs::exists(ws)) {
if(!fs::create_directories(ws)) {
LLOG(ERROR, "Could not create job workspace", run->name);
break;
}
// prepend the workspace init script
if(fs::exists(cfgDir/"jobs"/run->name+".init"))
run->addScript((cfgDir/"jobs"/run->name+".init").string(), ws.string());
}
int buildNum = buildNums[run->name] + 1;
// create the run directory
fs::path rd = fs::path(homeDir)/"run"/run->name/std::to_string(buildNum);
if(!fs::create_directory(rd)) {
LLOG(ERROR, "Could not create working directory", rd.string());
break;
}
run->runDir = rd.string();
// create an archive directory
fs::path archive = fs::path(homeDir)/"archive"/run->name/std::to_string(buildNum);
if(fs::is_directory(archive)) {
LLOG(WARNING, "Archive directory already exists", archive.string());
} else if(!fs::create_directories(archive)) {
LLOG(ERROR, "Could not create archive directory", archive.string());
break;
}
// add scripts
// global before-run script
if(fs::exists(cfgDir/"before"))
run->addScript((cfgDir/"before").string());
// per-node before-run script
if(fs::exists(cfgDir/"nodes"/node.name+".before"))
run->addScript((cfgDir/"nodes"/node.name+".before").string());
// job before-run script
if(fs::exists(cfgDir/"jobs"/run->name+".before"))
run->addScript((cfgDir/"jobs"/run->name+".before").string());
// main run script. must exist.
run->addScript((cfgDir/"jobs"/run->name+".run").string());
// job after-run script
if(fs::exists(cfgDir/"jobs"/run->name+".after"))
run->addScript((cfgDir/"jobs"/run->name+".after").string());
// per-node after-run script
if(fs::exists(cfgDir/"nodes"/node.name+".after"))
run->addScript((cfgDir/"nodes"/node.name+".after").string());
// global after-run script
if(fs::exists(cfgDir/"after"))
run->addScript((cfgDir/"after").string());
// add environment files
if(fs::exists(cfgDir/"env"))
run->addEnv((cfgDir/"env").string());
if(fs::exists(cfgDir/"nodes"/node.name+".env"))
run->addEnv((cfgDir/"nodes"/node.name+".env").string());
if(fs::exists(cfgDir/"jobs"/run->name+".env"))
run->addEnv((cfgDir/"jobs"/run->name+".env").string());
// start the job
node.busyExecutors++;
run->node = &node;
run->startedAt = time(0);
run->laminarHome = homeDir;
run->build = buildNum;
// set the last known result if exists
db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch([=](int result){
run->lastResult = RunState(result);
});
// update next build number
buildNums[run->name] = buildNum;
LLOG(INFO, "Queued job to node", run->name, run->build, node.name);
// notify clients
Json j;
j.set("type", "job_started")
.startObject("data")
.set("queueIndex", std::distance(it,queuedJobs.begin()))
.set("name", run->name)
.set("queued", run->startedAt - run->queuedAt)
.set("started", run->startedAt)
.set("number", run->build)
.set("reason", run->reason());
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch([&](int etc){
j.set("etc", time(0) + etc);
});
j.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(run->name, run->build)
// The run page also should know that another job has started
// (so maybe it can show a previously hidden "next" button).
// Hence this small hack:
|| (c->scope.type == MonitorScope::Type::RUN && c->scope.job == run->name))
c->sendMessage(msg);
}
// setup run completion handler
run->notifyCompletion = [this](Run* r) { runFinished(r); };
// trigger the first step of the run
if(stepRun(run)) {
// should never happen
LLOG(INFO, "No steps for run");
run->complete();
}
assigned = true;
break;
}
}
if(assigned) {
activeJobs.insert(*it);
it = queuedJobs.erase(it);
} else
++it;
}
}
void Laminar::runFinished(Run * r) {
Node* node = r->node;
node->busyExecutors--;
LLOG(INFO, "Run completed", r->name, to_string(r->result));
time_t completedAt = time(0);
// compress log
std::string maybeZipped = r->log;
size_t logsize = r->log.length();
if(r->log.length() >= COMPRESS_LOG_MIN_SIZE) {
std::string zipped(r->log.size(), '\0');
unsigned long zippedSize = zipped.size();
if(::compress((unsigned char*)&zipped[0], &zippedSize,
(unsigned char*)&r->log[0], logsize) == Z_OK) {
zipped.resize(zippedSize);
std::swap(maybeZipped, zipped);
}
}
std::string reason = r->reason();
db->stmt("INSERT INTO builds VALUES(?,?,?,?,?,?,?,?,?,?,?,?)")
.bind(r->name, r->build, node->name, r->queuedAt, r->startedAt, completedAt, int(r->result),
maybeZipped, logsize, r->parentName, r->parentBuild, reason)
.exec();
// notify clients
Json j;
j.set("type", "job_completed")
.startObject("data")
.set("name", r->name)
.set("number", r->build)
.set("queued", r->startedAt - r->queuedAt)
.set("completed", completedAt)
.set("duration", completedAt - r->startedAt)
.set("started", r->startedAt)
.set("result", to_string(r->result))
.set("reason", r->reason());
j.startArray("artifacts");
populateArtifacts(j, r->name, r->build);
j.EndArray();
j.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(r->name, r->build))
c->sendMessage(msg);
}
// notify the waiters
for(LaminarWaiter* w : waiters) {
w->complete(r);
}
// remove the rundir
fs::remove_all(r->runDir);
// will delete the job
activeJobs.get<2>().erase(r);
}
bool Laminar::getArtefact(std::string path, std::string& result) {
if(archiveUrl != ARCHIVE_URL_DEFAULT) {
// we shouldn't have got here. Probably an invalid link.
return false;
}
// Reads in the whole file into the given string reference.
// This is a terrible way to serve files (especially large ones).
fs::path file(fs::path(homeDir)/"archive"/path);
// no support for browsing archived directories
if(fs::is_directory(file))
return false;
std::ifstream fstr(file.string());
fstr.seekg(0, std::ios::end);
size_t sz = fstr.tellg();
if(fstr.rdstate() == 0) {
result.resize(sz);
fstr.seekg(0);
fstr.read(&result[0], sz);
return true;
}
return false;
}