/// /// Copyright 2015-2020 Oliver Giles /// /// This file is part of Laminar /// /// Laminar is free software: you can redistribute it and/or modify /// it under the terms of the GNU General Public License as published by /// the Free Software Foundation, either version 3 of the License, or /// (at your option) any later version. /// /// Laminar is distributed in the hope that it will be useful, /// but WITHOUT ANY WARRANTY; without even the implied warranty of /// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the /// GNU General Public License for more details. /// /// You should have received a copy of the GNU General Public License /// along with Laminar. If not, see /// #include "laminar.h" #include "server.h" #include "conf.h" #include "log.h" #include "http.h" #include "rpc.h" #include #include #include #include #include #include #include #include #define COMPRESS_LOG_MIN_SIZE 1024 #include #include // FNM_EXTMATCH isn't supported under musl #if !defined(FNM_EXTMATCH) #define FNM_EXTMATCH 0 #endif // rapidjson::Writer with a StringBuffer is used a lot in Laminar for // preparing JSON messages to send to HTTP clients. A small wrapper // class here reduces verbosity later for this common use case. class Json : public rapidjson::Writer { public: Json() : rapidjson::Writer(buf) { StartObject(); } template Json& set(const char* key, T value) { String(key); Int64(value); return *this; } Json& startObject(const char* key) { String(key); StartObject(); return *this; } Json& startArray(const char* key) { String(key); StartArray(); return *this; } const char* str() { EndObject(); return buf.GetString(); } private: rapidjson::StringBuffer buf; }; template<> Json& Json::set(const char* key, double value) { String(key); Double(value); return *this; } template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; } template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; } // short syntax helpers for kj::Path template inline kj::Path operator/(const kj::Path& p, const T& ext) { return p.append(ext); } template inline kj::Path operator/(const std::string& p, const T& ext) { return kj::Path{p}/ext; } typedef std::string str; Laminar::Laminar(Server &server, Settings settings) : srv(server), homePath(kj::Path::parse(&settings.home[1])), fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY)), http(kj::heap(*this)), rpc(kj::heap(*this)) { LASSERT(settings.home[0] == '/'); if(fsHome->exists(homePath/"cfg"/"nodes")) { LLOG(ERROR, "Found node configuration directory cfg/nodes. Nodes have been deprecated, please migrate to contexts. Laminar will now exit."); exit(EXIT_FAILURE); } archiveUrl = settings.archive_url; if(archiveUrl.back() != '/') archiveUrl.append("/"); numKeepRunDirs = 0; db = new Database((homePath/"laminar.sqlite").toString(true).cStr()); // Prepare database for first use // TODO: error handling db->exec("CREATE TABLE IF NOT EXISTS builds(" "name TEXT, number INT UNSIGNED, node TEXT, queuedAt INT, " "startedAt INT, completedAt INT, result INT, output TEXT, " "outputLen INT, parentJob TEXT, parentBuild INT, reason TEXT, " "PRIMARY KEY (name, number))"); db->exec("CREATE INDEX IF NOT EXISTS idx_completion_time ON builds(" "completedAt DESC)"); // retrieve the last build numbers db->stmt("SELECT name, MAX(number) FROM builds GROUP BY name") .fetch([this](str name, uint build){ buildNums[name] = build; }); srv.watchPaths([this]{ LLOG(INFO, "Reloading configuration"); loadConfiguration(); // config change may allow stuck jobs to dequeue assignNewJobs(); }).addPath((homePath/"cfg"/"contexts").toString(true).cStr()) .addPath((homePath/"cfg"/"jobs").toString(true).cStr()) .addPath((homePath/"cfg").toString(true).cStr()); // for groups.conf loadCustomizations(); srv.watchPaths([this]{ LLOG(INFO, "Reloading customizations"); loadCustomizations(); }).addPath((homePath/"custom").toString(true).cStr()); srv.listenRpc(*rpc, settings.bind_rpc); srv.listenHttp(*http, settings.bind_http); // Load configuration, may be called again in response to an inotify event // that the configuration files have been modified loadConfiguration(); } void Laminar::loadCustomizations() { KJ_IF_MAYBE(templ, fsHome->tryOpenFile(kj::Path{"custom","index.html"})) { http->setHtmlTemplate((*templ)->readAllText().cStr()); } else { http->setHtmlTemplate(); } } uint Laminar::latestRun(std::string job) { if(auto it = buildNums.find(job); it != buildNums.end()) return it->second; return 0; } bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) { if(Run* run = activeRun(name, num)) { output = run->log; complete = false; return true; } else { // it must be finished, fetch it from the database db->stmt("SELECT output, outputLen FROM builds WHERE name = ? AND number = ?") .bind(name, num) .fetch([&](str maybeZipped, unsigned long sz) { str log(sz,'\0'); if(sz >= COMPRESS_LOG_MIN_SIZE) { int res = ::uncompress((uint8_t*) log.data(), &sz, (const uint8_t*) maybeZipped.data(), maybeZipped.size()); if(res == Z_OK) std::swap(output, log); else LLOG(ERROR, "Failed to uncompress log", res); } else { std::swap(output, maybeZipped); } }); if(output.size()) { complete = true; return true; } } return false; } bool Laminar::setParam(std::string job, uint buildNum, std::string param, std::string value) { if(Run* run = activeRun(job, buildNum)) { run->params[param] = value; return true; } return false; } const std::list>& Laminar::listQueuedJobs() { return queuedJobs; } const RunSet& Laminar::listRunningJobs() { return activeJobs; } std::list Laminar::listKnownJobs() { std::list res; KJ_IF_MAYBE(dir, fsHome->tryOpenSubdir(kj::Path{"cfg","jobs"})) { for(kj::Directory::Entry& entry : (*dir)->listEntries()) { if(entry.name.endsWith(".run")) { res.emplace_back(entry.name.cStr(), entry.name.findLast('.').orDefault(0)); } } } return res; } void Laminar::populateArtifacts(Json &j, std::string job, uint num, kj::Path subdir) const { kj::Path runArchive{job,std::to_string(num)}; runArchive = runArchive.append(subdir); KJ_IF_MAYBE(dir, fsHome->tryOpenSubdir("archive"/runArchive)) { for(kj::StringPtr file : (*dir)->listNames()) { kj::FsNode::Metadata meta = (*dir)->lstat(kj::Path{file}); if(meta.type == kj::FsNode::Type::FILE) { j.StartObject(); j.set("url", archiveUrl + (runArchive/file).toString().cStr()); j.set("filename", (subdir/file).toString().cStr()); j.set("size", meta.size); j.EndObject(); } else if(meta.type == kj::FsNode::Type::DIRECTORY) { populateArtifacts(j, job, num, subdir/file); } } } } std::string Laminar::getStatus(MonitorScope scope) { Json j; j.set("type", "status"); j.set("title", getenv("LAMINAR_TITLE") ?: "Laminar"); j.set("version", laminar_version()); j.set("time", time(nullptr)); j.startObject("data"); if(scope.type == MonitorScope::RUN) { db->stmt("SELECT queuedAt,startedAt,completedAt,result,reason,parentJob,parentBuild,q.lr IS NOT NULL,q.lr FROM builds " "LEFT JOIN (SELECT name n, MAX(number), completedAt-startedAt lr FROM builds WHERE result IS NOT NULL GROUP BY n) q ON q.n = name " "WHERE name = ? AND number = ?") .bind(scope.job, scope.num) .fetch([&](time_t queued, time_t started, time_t completed, int result, std::string reason, std::string parentJob, uint parentBuild, uint lastRuntimeKnown, uint lastRuntime) { j.set("queued", queued); j.set("started", started); if(completed) j.set("completed", completed); j.set("result", to_string(completed ? RunState(result) : started ? RunState::RUNNING : RunState::QUEUED)); j.set("reason", reason); j.startObject("upstream").set("name", parentJob).set("num", parentBuild).EndObject(2); if(lastRuntimeKnown) j.set("etc", started + lastRuntime); }); if(auto it = buildNums.find(scope.job); it != buildNums.end()) j.set("latestNum", int(it->second)); j.startArray("artifacts"); populateArtifacts(j, scope.job, scope.num); j.EndArray(); } else if(scope.type == MonitorScope::JOB) { const uint runsPerPage = 20; j.startArray("recent"); // ORDER BY param cannot be bound std::string order_by; std::string direction = scope.order_desc ? "DESC" : "ASC"; if(scope.field == "number") order_by = "number " + direction; else if(scope.field == "result") order_by = "result " + direction + ", number DESC"; else if(scope.field == "started") order_by = "startedAt " + direction + ", number DESC"; else if(scope.field == "duration") order_by = "(completedAt-startedAt) " + direction + ", number DESC"; else order_by = "number DESC"; std::string stmt = "SELECT number,startedAt,completedAt,result,reason FROM builds " "WHERE name = ? AND result IS NOT NULL ORDER BY " + order_by + " LIMIT ?,?"; db->stmt(stmt.c_str()) .bind(scope.job, scope.page * runsPerPage, runsPerPage) .fetch([&](uint build,time_t started,time_t completed,int result,str reason){ j.StartObject(); j.set("number", build) .set("completed", completed) .set("started", started) .set("result", to_string(RunState(result))) .set("reason", reason) .EndObject(); }); j.EndArray(); db->stmt("SELECT COUNT(*),AVG(completedAt-startedAt) FROM builds WHERE name = ? AND result IS NOT NULL") .bind(scope.job) .fetch([&](uint nRuns, uint averageRuntime){ j.set("averageRuntime", averageRuntime); j.set("pages", (nRuns-1) / runsPerPage + 1); j.startObject("sort"); j.set("page", scope.page) .set("field", scope.field) .set("order", scope.order_desc ? "dsc" : "asc") .EndObject(); }); j.startArray("running"); auto p = activeJobs.byJobName().equal_range(scope.job); for(auto it = p.first; it != p.second; ++it) { const std::shared_ptr run = *it; j.StartObject(); j.set("number", run->build); j.set("context", run->context->name); j.set("started", run->startedAt); j.set("result", to_string(RunState::RUNNING)); j.set("reason", run->reason()); j.EndObject(); } j.EndArray(); int nQueued = 0; for(const auto& run : queuedJobs) { if (run->name == scope.job) { nQueued++; } } j.set("nQueued", nQueued); db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? " "ORDER BY completedAt DESC LIMIT 1") .bind(scope.job, int(RunState::SUCCESS)) .fetch([&](int build, time_t started){ j.startObject("lastSuccess"); j.set("number", build).set("started", started); j.EndObject(); }); db->stmt("SELECT number,startedAt FROM builds " "WHERE name = ? AND result <> ? " "ORDER BY completedAt DESC LIMIT 1") .bind(scope.job, int(RunState::SUCCESS)) .fetch([&](int build, time_t started){ j.startObject("lastFailed"); j.set("number", build).set("started", started); j.EndObject(); }); auto desc = jobDescriptions.find(scope.job); j.set("description", desc == jobDescriptions.end() ? "" : desc->second); } else if(scope.type == MonitorScope::ALL) { j.startArray("jobs"); db->stmt("SELECT name,number,startedAt,completedAt,result,reason FROM builds b " "JOIN (SELECT name n,MAX(number) latest FROM builds WHERE result IS NOT NULL GROUP BY n) q " "ON b.name = q.n AND b.number = latest") .fetch([&](str name,uint number, time_t started, time_t completed, int result, str reason){ j.StartObject(); j.set("name", name); j.set("number", number); j.set("result", to_string(RunState(result))); j.set("started", started); j.set("completed", completed); j.set("reason", reason); j.EndObject(); }); j.EndArray(); j.startArray("running"); for(const auto& run : activeJobs.byStartedAt()) { j.StartObject(); j.set("name", run->name); j.set("number", run->build); j.set("context", run->context->name); j.set("started", run->startedAt); j.EndObject(); } j.EndArray(); j.startObject("groups"); for(const auto& group : jobGroups) j.set(group.first.c_str(), group.second); j.EndObject(); } else { // Home page j.startArray("recent"); db->stmt("SELECT name,number,node,queuedAt,startedAt,completedAt,result,reason FROM builds WHERE completedAt IS NOT NULL ORDER BY completedAt DESC LIMIT 20") .fetch([&](str name,uint build,str context,time_t queued,time_t started,time_t completed,int result,str reason){ j.StartObject(); j.set("name", name) .set("number", build) .set("context", context) .set("queued", queued) .set("started", started) .set("completed", completed) .set("result", to_string(RunState(result))) .set("reason", reason) .EndObject(); }); j.EndArray(); j.startArray("running"); for(const auto& run : activeJobs.byStartedAt()) { j.StartObject(); j.set("name", run->name); j.set("number", run->build); j.set("context", run->context->name); j.set("started", run->startedAt); db->stmt("SELECT completedAt - startedAt FROM builds " "WHERE completedAt IS NOT NULL AND name = ? " "ORDER BY completedAt DESC LIMIT 1") .bind(run->name) .fetch([&](uint lastRuntime){ j.set("etc", run->startedAt + lastRuntime); }); j.EndObject(); } j.EndArray(); j.startArray("queued"); for(const auto& run : queuedJobs) { j.StartObject(); j.set("name", run->name); j.EndObject(); } j.EndArray(); int execTotal = 0; int execBusy = 0; for(const auto& it : contexts) { const std::shared_ptr& context = it.second; execTotal += context->numExecutors; execBusy += context->busyExecutors; } j.set("executorsTotal", execTotal); j.set("executorsBusy", execBusy); j.startArray("buildsPerDay"); for(int i = 6; i >= 0; --i) { j.StartObject(); db->stmt("SELECT result, COUNT(*) FROM builds WHERE completedAt > ? AND completedAt < ? GROUP BY result") .bind(86400*(time(nullptr)/86400 - i), 86400*(time(nullptr)/86400 - (i-1))) .fetch([&](int result, int num){ j.set(to_string(RunState(result)).c_str(), num); }); j.EndObject(); } j.EndArray(); j.startObject("buildsPerJob"); db->stmt("SELECT name, COUNT(*) c FROM builds WHERE completedAt > ? GROUP BY name ORDER BY c DESC LIMIT 5") .bind(time(nullptr) - 86400) .fetch([&](str job, int count){ j.set(job.c_str(), count); }); j.EndObject(); j.startObject("timePerJob"); db->stmt("SELECT name, AVG(completedAt-startedAt) av FROM builds WHERE completedAt > ? GROUP BY name ORDER BY av DESC LIMIT 8") .bind(time(nullptr) - 7 * 86400) .fetch([&](str job, double time){ j.set(job.c_str(), time); }); j.EndObject(); j.startArray("resultChanged"); db->stmt("SELECT b.name,MAX(b.number) as lastSuccess,lastFailure FROM builds AS b JOIN (SELECT name,MAX(number) AS lastFailure FROM builds WHERE result<>? GROUP BY name) AS t ON t.name=b.name WHERE b.result=? GROUP BY b.name ORDER BY lastSuccess>lastFailure, lastFailure-lastSuccess DESC LIMIT 8") .bind(int(RunState::SUCCESS), int(RunState::SUCCESS)) .fetch([&](str job, uint lastSuccess, uint lastFailure){ j.StartObject(); j.set("name", job) .set("lastSuccess", lastSuccess) .set("lastFailure", lastFailure); j.EndObject(); }); j.EndArray(); j.startArray("lowPassRates"); db->stmt("SELECT name,CAST(SUM(result==?) AS FLOAT)/COUNT(*) AS passRate FROM builds GROUP BY name ORDER BY passRate ASC LIMIT 8") .bind(int(RunState::SUCCESS)) .fetch([&](str job, double passRate){ j.StartObject(); j.set("name", job).set("passRate", passRate); j.EndObject(); }); j.EndArray(); j.startArray("buildTimeChanges"); db->stmt("SELECT name,GROUP_CONCAT(number),GROUP_CONCAT(completedAt-startedAt) FROM builds WHERE number > (SELECT MAX(number)-10 FROM builds b WHERE b.name=builds.name) GROUP BY name ORDER BY (MAX(completedAt-startedAt)-MIN(completedAt-startedAt))-STDEV(completedAt-startedAt) DESC LIMIT 8") .fetch([&](str name, str numbers, str durations){ j.StartObject(); j.set("name", name); j.startArray("numbers"); j.RawValue(numbers.data(), numbers.length(), rapidjson::Type::kArrayType); j.EndArray(); j.startArray("durations"); j.RawValue(durations.data(), durations.length(), rapidjson::Type::kArrayType); j.EndArray(); j.EndObject(); }); j.EndArray(); j.startObject("completedCounts"); db->stmt("SELECT name, COUNT(*) FROM builds WHERE result IS NOT NULL GROUP BY name") .fetch([&](str job, uint count){ j.set(job.c_str(), count); }); j.EndObject(); } j.EndObject(); return j.str(); } Laminar::~Laminar() noexcept try { delete db; } catch (std::exception& e) { LLOG(ERROR, e.what()); return; } bool Laminar::loadConfiguration() { if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS")) numKeepRunDirs = static_cast(atoi(ndirs)); std::set knownContexts; KJ_IF_MAYBE(contextsDir, fsHome->tryOpenSubdir(kj::Path{"cfg","contexts"})) { for(kj::Directory::Entry& entry : (*contextsDir)->listEntries()) { if(!entry.name.endsWith(".conf")) continue; StringMap conf = parseConfFile((homePath/"cfg"/"contexts"/entry.name).toString(true).cStr()); std::string name(entry.name.cStr(), entry.name.findLast('.').orDefault(0)); auto existing = contexts.find(name); std::shared_ptr context = existing == contexts.end() ? contexts.emplace(name, std::shared_ptr(new Context)).first->second : existing->second; context->name = name; context->numExecutors = conf.get("EXECUTORS", 6); std::string jobPtns = conf.get("JOBS"); std::set jobPtnsList; if(!jobPtns.empty()) { std::istringstream iss(jobPtns); std::string job; while(std::getline(iss, job, ',')) jobPtnsList.insert(job); } context->jobPatterns.swap(jobPtnsList); knownContexts.insert(name); } } // remove any contexts whose config files disappeared. // if there are no known contexts, take care not to remove and re-add the default context. for(auto it = contexts.begin(); it != contexts.end();) { if((it->first == "default" && knownContexts.size() == 0) || knownContexts.find(it->first) != knownContexts.end()) it++; else it = contexts.erase(it); } // add a default context if(contexts.empty()) { LLOG(INFO, "Creating a default context with 6 executors"); std::shared_ptr context(new Context); context->name = "default"; context->numExecutors = 6; contexts.emplace("default", context); } KJ_IF_MAYBE(jobsDir, fsHome->tryOpenSubdir(kj::Path{"cfg","jobs"})) { for(kj::Directory::Entry& entry : (*jobsDir)->listEntries()) { if(!entry.name.endsWith(".conf")) continue; StringMap conf = parseConfFile((homePath/"cfg"/"jobs"/entry.name).toString(true).cStr()); std::string jobName(entry.name.cStr(), entry.name.findLast('.').orDefault(0)); std::string ctxPtns = conf.get("CONTEXTS"); std::set ctxPtnList; if(!ctxPtns.empty()) { std::istringstream iss(ctxPtns); std::string ctx; while(std::getline(iss, ctx, ',')) ctxPtnList.insert(ctx); } // Must be present both here and in queueJob because otherwise if a context // were created while a job is already queued, the default context would be // dropped when the set of contexts is updated here. if(ctxPtnList.empty()) ctxPtnList.insert("default"); jobContexts[jobName].swap(ctxPtnList); std::string desc = conf.get("DESCRIPTION"); if(!desc.empty()) { jobDescriptions[jobName] = desc; } } } jobGroups.clear(); KJ_IF_MAYBE(groupsConf, fsHome->tryOpenFile(kj::Path{"cfg","groups.conf"})) jobGroups = parseConfFile((homePath/"cfg"/"groups.conf").toString(true).cStr()); if(jobGroups.empty()) jobGroups["All Jobs"] = ".*"; return true; } std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { if(!fsHome->exists(kj::Path{"cfg","jobs",name+".run"})) { LLOG(ERROR, "Non-existent job", name); return nullptr; } // jobContexts[name] can be empty if there is no .conf file at all if(jobContexts[name].empty()) jobContexts.at(name).insert("default"); std::shared_ptr run = std::make_shared(name, ++buildNums[name], kj::mv(params), homePath.clone()); queuedJobs.push_back(run); db->stmt("INSERT INTO builds(name,number,queuedAt,parentJob,parentBuild,reason) VALUES(?,?,?,?,?,?)") .bind(run->name, run->build, run->queuedAt, run->parentName, run->parentBuild, run->reason()) .exec(); // notify clients Json j; j.set("type", "job_queued") .startObject("data") .set("name", name) .set("number", run->build) .EndObject(); http->notifyEvent(j.str(), name.c_str()); assignNewJobs(); return run; } bool Laminar::abort(std::string job, uint buildNum) { if(Run* run = activeRun(job, buildNum)) return run->abort(); return false; } void Laminar::abortAll() { for(std::shared_ptr run : activeJobs) { run->abort(); } } bool Laminar::canQueue(const Context& ctx, const Run& run) const { if(ctx.busyExecutors >= ctx.numExecutors) return false; // match may be jobs as defined by the context... for(std::string p : ctx.jobPatterns) { if(fnmatch(p.c_str(), run.name.c_str(), FNM_EXTMATCH) == 0) return true; } // ...or context as defined by the job. for(std::string p : jobContexts.at(run.name)) { if(fnmatch(p.c_str(), ctx.name.c_str(), FNM_EXTMATCH) == 0) return true; } return false; } bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { for(auto& sc : contexts) { std::shared_ptr ctx = sc.second; if(canQueue(*ctx, *run)) { RunState lastResult = RunState::UNKNOWN; // set the last known result if exists. Runs which haven't started yet should // have completedAt == NULL and thus be at the end of a DESC ordered query db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") .bind(run->name) .fetch([&](int result){ lastResult = RunState(result); }); kj::Promise onRunFinished = run->start(lastResult, ctx, *fsHome,[this](kj::Maybe& pid){return srv.onChildExit(pid);}); db->stmt("UPDATE builds SET node = ?, startedAt = ? WHERE name = ? AND number = ?") .bind(ctx->name, run->startedAt, run->name, run->build) .exec(); ctx->busyExecutors++; kj::Promise exec = srv.readDescriptor(run->output_fd, [this, run](const char*b, size_t n){ // handle log output std::string s(b, n); run->log += s; http->notifyLog(run->name, run->build, s, false); }).then([run, p = kj::mv(onRunFinished)]() mutable { // wait until leader reaped return kj::mv(p); }).then([this, run](RunState){ handleRunFinished(run.get()); }); if(run->timeout > 0) { exec = exec.attach(srv.addTimeout(run->timeout, [r=run.get()](){ r->abort(); })); } srv.addTask(kj::mv(exec)); LLOG(INFO, "Started job", run->name, run->build, ctx->name); // notify clients Json j; j.set("type", "job_started") .startObject("data") .set("queueIndex", queueIndex) .set("name", run->name) .set("queued", run->queuedAt) .set("started", run->startedAt) .set("number", run->build) .set("reason", run->reason()); db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") .bind(run->name) .fetch([&](uint etc){ j.set("etc", time(nullptr) + etc); }); j.EndObject(); http->notifyEvent(j.str(), run->name.c_str()); return true; } } return false; } void Laminar::assignNewJobs() { auto it = queuedJobs.begin(); while(it != queuedJobs.end()) { if(tryStartRun(*it, std::distance(it, queuedJobs.begin()))) { activeJobs.insert(*it); it = queuedJobs.erase(it); } else { ++it; } } } void Laminar::handleRunFinished(Run * r) { std::shared_ptr ctx = r->context; ctx->busyExecutors--; LLOG(INFO, "Run completed", r->name, to_string(r->result)); time_t completedAt = time(nullptr); // compress log std::string maybeZipped = r->log; size_t logsize = r->log.length(); if(r->log.length() >= COMPRESS_LOG_MIN_SIZE) { std::string zipped(r->log.size(), '\0'); unsigned long zippedSize = zipped.size(); if(::compress((uint8_t*) zipped.data(), &zippedSize, (const uint8_t*) r->log.data(), logsize) == Z_OK) { zipped.resize(zippedSize); std::swap(maybeZipped, zipped); } } db->stmt("UPDATE builds SET completedAt = ?, result = ?, output = ?, outputLen = ? WHERE name = ? AND number = ?") .bind(completedAt, int(r->result), maybeZipped, logsize, r->name, r->build) .exec(); // notify clients Json j; j.set("type", "job_completed") .startObject("data") .set("name", r->name) .set("number", r->build) .set("queued", r->queuedAt) .set("completed", completedAt) .set("started", r->startedAt) .set("result", to_string(r->result)) .set("reason", r->reason()); j.startArray("artifacts"); populateArtifacts(j, r->name, r->build); j.EndArray(); j.EndObject(); http->notifyEvent(j.str(), r->name); http->notifyLog(r->name, r->build, "", true); // erase reference to run from activeJobs. Since runFinished is called in a // lambda whose context contains a shared_ptr, the run won't be deleted // until the context is destroyed at the end of the lambda execution. activeJobs.byRunPtr().erase(r); // remove old run directories // We cannot count back the number of directories to keep from the currently // finishing job because there may well be older, still-running instances of // this job and we don't want to delete their rundirs. So instead, check // whether there are any more active runs of this job, and if so, count back // from the oldest among them. If there are none, count back from the latest // known build number of this job, which may not be that of the run that // finished here. auto it = activeJobs.byJobName().equal_range(r->name); uint oldestActive = (it.first == it.second)? buildNums[r->name] : (*it.first)->build - 1; for(int i = static_cast(oldestActive - numKeepRunDirs); i > 0; i--) { kj::Path d{"run",r->name,std::to_string(i)}; // Once the directory does not exist, it's probably not worth checking // any further. 99% of the time this loop should only ever have 1 iteration // anyway so hence this (admittedly debatable) optimization. if(!fsHome->exists(d)) break; // must use a try/catch because remove will throw if deletion fails. Using // tryRemove does not help because it still throws an exception for some // errors such as EACCES try { fsHome->remove(d); } catch(kj::Exception& e) { LLOG(ERROR, "Could not remove directory", e.getDescription()); } } fsHome->symlink(kj::Path{"archive", r->name, "latest"}, std::to_string(r->build), kj::WriteMode::CREATE|kj::WriteMode::MODIFY); // in case we freed up an executor, check the queue assignNewJobs(); } kj::Maybe> Laminar::getArtefact(std::string path) { return fsHome->openFile(kj::Path("archive").append(kj::Path::parse(path))); } bool Laminar::handleBadgeRequest(std::string job, std::string &badge) { RunState rs = RunState::UNKNOWN; db->stmt("SELECT result FROM builds WHERE name = ? AND result IS NOT NULL ORDER BY number DESC LIMIT 1") .bind(job) .fetch([&](int result){ rs = RunState(result); }); if(rs == RunState::UNKNOWN) return false; std::string status = to_string(rs); // Empirical approximation of pixel width. Not particularly stable. const int jobNameWidth = job.size() * 7 + 10; const int statusWidth = status.size() * 7 + 10; const char* gradient1 = (rs == RunState::SUCCESS) ? "#2aff4d" : "#ff2a2a"; const char* gradient2 = (rs == RunState::SUCCESS) ? "#24b43c" : "#b42424"; char* svg = NULL; if(asprintf(&svg, R"x( %s %s )x", jobNameWidth+statusWidth, jobNameWidth+statusWidth, gradient1, gradient2, jobNameWidth, jobNameWidth/2+1, job.data(), jobNameWidth, statusWidth, jobNameWidth+statusWidth/2, status.data()) < 0) return false; badge = svg; return true; }