1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2026-03-02 03:40:21 +00:00

Replace nodes/tags with contexts

This commit is contained in:
Oliver Giles
2019-11-09 21:59:37 +02:00
parent a0da7b109e
commit 7c99eae2bd
15 changed files with 193 additions and 208 deletions

View File

@@ -1,5 +1,5 @@
///
/// Copyright 2015 Oliver Giles
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
@@ -16,28 +16,36 @@
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef LAMINAR_NODE_H_
#define LAMINAR_NODE_H_
#ifndef LAMINAR_CONTEXT_H_
#define LAMINAR_CONTEXT_H_
#include <fnmatch.h>
#include <string>
#include <set>
class Run;
// Represents a group of executors. Currently almost unnecessary POD
// abstraction, but may be enhanced in the future to support e.g. tags
class Node {
// Represents a context within which a Run will be executed. Allows applying
// a certain environment to a set of Jobs, or setting a limit on the number
// of parallel Runs
class Context {
public:
Node() {}
Context() {}
std::string name;
int numExecutors;
int busyExecutors = 0;
std::set<std::string> tags;
// Attempts to queue the given run to this node. Returns true if succeeded.
bool queue(const Run& run);
bool canQueue(std::set<std::string>& patterns) {
if(busyExecutors >= numExecutors)
return false;
for(std::string pattern : patterns) {
if(fnmatch(pattern.c_str(), name.c_str(), FNM_EXTMATCH) == 0)
return true;
}
return false;
}
};
#endif // LAMINAR_NODE_H_
#endif // LAMINAR_CONTEXT_H_

View File

@@ -76,6 +76,11 @@ Laminar::Laminar(Server &server, Settings settings) :
{
LASSERT(settings.home[0] == '/');
if(fsHome->exists(homePath/"cfg"/"nodes")) {
LLOG(ERROR, "Found node configuration directory cfg/nodes. Nodes have been deprecated, please migrate to contexts. The helper script laminar-migrate-to-contexts.sh script may help with this. Laminar will now exit.");
exit(EXIT_FAILURE);
}
archiveUrl = settings.archive_url;
if(archiveUrl.back() != '/')
archiveUrl.append("/");
@@ -104,8 +109,9 @@ Laminar::Laminar(Server &server, Settings settings) :
loadConfiguration();
// config change may allow stuck jobs to dequeue
assignNewJobs();
}).addPath((homePath/"cfg"/"nodes").toString(true).cStr())
.addPath((homePath/"cfg"/"jobs").toString(true).cStr());
}).addPath((homePath/"cfg"/"contexts").toString(true).cStr())
.addPath((homePath/"cfg"/"jobs").toString(true).cStr())
.addPath((homePath/"cfg").toString(true).cStr()); // for groups.conf
srv.listenRpc(*rpc, settings.bind_rpc);
srv.listenHttp(*http, settings.bind_http);
@@ -283,7 +289,7 @@ std::string Laminar::getStatus(MonitorScope scope) {
const std::shared_ptr<Run> run = *it;
j.StartObject();
j.set("number", run->build);
j.set("node", run->node->name);
j.set("context", run->context->name);
j.set("started", run->startedAt);
j.set("result", to_string(RunState::RUNNING));
j.set("reason", run->reason());
@@ -322,11 +328,6 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.set("result", to_string(RunState(result)));
j.set("started", started);
j.set("completed", completed);
j.startArray("tags");
for(const str& t: jobTags[name]) {
j.String(t.c_str());
}
j.EndArray();
j.EndObject();
});
j.EndArray();
@@ -335,24 +336,23 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.StartObject();
j.set("name", run->name);
j.set("number", run->build);
j.set("node", run->node->name);
j.set("context", run->context->name);
j.set("started", run->startedAt);
j.startArray("tags");
for(const str& t: jobTags[run->name]) {
j.String(t.c_str());
}
j.EndArray();
j.EndObject();
}
j.EndArray();
j.startObject("groups");
for(const auto& group : jobGroups)
j.set(group.first.c_str(), group.second);
j.EndObject();
} else { // Home page
j.startArray("recent");
db->stmt("SELECT * FROM builds ORDER BY completedAt DESC LIMIT 15")
.fetch<str,uint,str,time_t,time_t,time_t,int>([&](str name,uint build,str node,time_t,time_t started,time_t completed,int result){
.fetch<str,uint,str,time_t,time_t,time_t,int>([&](str name,uint build,str context,time_t,time_t started,time_t completed,int result){
j.StartObject();
j.set("name", name)
.set("number", build)
.set("node", node)
.set("context", context)
.set("started", started)
.set("completed", completed)
.set("result", to_string(RunState(result)))
@@ -364,7 +364,7 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.StartObject();
j.set("name", run->name);
j.set("number", run->build);
j.set("node", run->node->name);
j.set("context", run->context->name);
j.set("started", run->startedAt);
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
@@ -383,10 +383,10 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.EndArray();
int execTotal = 0;
int execBusy = 0;
for(const auto& it : nodes) {
const std::shared_ptr<Node>& node = it.second;
execTotal += node->numExecutors;
execBusy += node->busyExecutors;
for(const auto& it : contexts) {
const std::shared_ptr<Context>& context = it.second;
execTotal += context->numExecutors;
execBusy += context->busyExecutors;
}
j.set("executorsTotal", execTotal);
j.set("executorsBusy", execBusy);
@@ -488,51 +488,41 @@ bool Laminar::loadConfiguration() {
if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS"))
numKeepRunDirs = static_cast<uint>(atoi(ndirs));
std::set<std::string> knownNodes;
std::set<std::string> knownContexts;
KJ_IF_MAYBE(nodeDir, fsHome->tryOpenSubdir(kj::Path{"cfg","nodes"})) {
for(kj::Directory::Entry& entry : (*nodeDir)->listEntries()) {
KJ_IF_MAYBE(contextsDir, fsHome->tryOpenSubdir(kj::Path{"cfg","contexts"})) {
for(kj::Directory::Entry& entry : (*contextsDir)->listEntries()) {
if(!entry.name.endsWith(".conf"))
continue;
StringMap conf = parseConfFile((homePath/"cfg"/"nodes"/entry.name).toString(true).cStr());
StringMap conf = parseConfFile((homePath/"cfg"/"contexts"/entry.name).toString(true).cStr());
std::string nodeName(entry.name.cStr(), entry.name.findLast('.').orDefault(0));
auto existingNode = nodes.find(nodeName);
std::shared_ptr<Node> node = existingNode == nodes.end() ? nodes.emplace(nodeName, std::shared_ptr<Node>(new Node)).first->second : existingNode->second;
node->name = nodeName;
node->numExecutors = conf.get<int>("EXECUTORS", 6);
std::string name(entry.name.cStr(), entry.name.findLast('.').orDefault(0));
auto existing = contexts.find(name);
std::shared_ptr<Context> context = existing == contexts.end() ? contexts.emplace(name, std::shared_ptr<Context>(new Context)).first->second : existing->second;
context->name = name;
context->numExecutors = conf.get<int>("EXECUTORS", 6);
std::string tagString = conf.get<std::string>("TAGS");
std::set<std::string> tagList;
if(!tagString.empty()) {
std::istringstream iss(tagString);
std::string tag;
while(std::getline(iss, tag, ','))
tagList.insert(tag);
}
std::swap(node->tags, tagList);
knownNodes.insert(nodeName);
knownContexts.insert(name);
}
}
// remove any nodes whose config files disappeared.
// if there are no known nodes, take care not to remove and re-add the default node
for(auto it = nodes.begin(); it != nodes.end();) {
if((it->first == "" && knownNodes.size() == 0) || knownNodes.find(it->first) != knownNodes.end())
// remove any contexts whose config files disappeared.
// if there are no known contexts, take care not to remove and re-add the default context.
for(auto it = contexts.begin(); it != contexts.end();) {
if((it->first == "default" && knownContexts.size() == 0) || knownContexts.find(it->first) != knownContexts.end())
it++;
else
it = nodes.erase(it);
it = contexts.erase(it);
}
// add a default node
if(nodes.empty()) {
LLOG(INFO, "Creating a default node with 6 executors");
std::shared_ptr<Node> node(new Node);
node->name = "";
node->numExecutors = 6;
nodes.emplace("", node);
// add a default context
if(contexts.empty()) {
LLOG(INFO, "Creating a default context with 6 executors");
std::shared_ptr<Context> context(new Context);
context->name = "default";
context->numExecutors = 6;
contexts.emplace("default", context);
}
KJ_IF_MAYBE(jobsDir, fsHome->tryOpenSubdir(kj::Path{"cfg","jobs"})) {
@@ -543,19 +533,25 @@ bool Laminar::loadConfiguration() {
std::string jobName(entry.name.cStr(), entry.name.findLast('.').orDefault(0));
std::string tags = conf.get<std::string>("TAGS");
if(!tags.empty()) {
std::istringstream iss(tags);
std::set<std::string> tagList;
std::string tag;
while(std::getline(iss, tag, ','))
tagList.insert(tag);
jobTags[jobName] = tagList;
}
std::string ctxPtns = conf.get<std::string>("CONTEXTS");
if(!ctxPtns.empty()) {
std::istringstream iss(ctxPtns);
std::set<std::string> ctxPtnList;
std::string ctx;
while(std::getline(iss, ctx, ','))
ctxPtnList.insert(ctx);
jobContexts[jobName].swap(ctxPtnList);
}
}
}
jobGroups.clear();
KJ_IF_MAYBE(groupsConf, fsHome->tryOpenFile(kj::Path{"cfg","groups.conf"}))
jobGroups = parseConfFile((homePath/"cfg"/"groups.conf").toString(true).cStr());
if(jobGroups.empty())
jobGroups["All Jobs"] = ".*";
return true;
}
@@ -565,6 +561,10 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
return nullptr;
}
// If the job has no contexts (maybe there is no .conf file at all), add the default context
if(jobContexts[name].empty())
jobContexts.at(name).insert("default");
std::shared_ptr<Run> run = std::make_shared<Run>(name, kj::mv(params), homePath.clone());
queuedJobs.push_back(run);
@@ -594,35 +594,12 @@ void Laminar::abortAll() {
}
}
bool Laminar::nodeCanQueue(const Node& node, std::string jobName) const {
// if a node is too busy, it can't take the job
if(node.busyExecutors >= node.numExecutors)
return false;
// if the node has no tags, allow the build
if(node.tags.size() == 0)
return true;
auto it = jobTags.find(jobName);
// if the job has no tags, it cannot be run on this node
if(it == jobTags.end())
return false;
// otherwise, allow the build if job and node have a tag in common
for(const std::string& tag : it->second) {
if(node.tags.find(tag) != node.tags.end())
return true;
}
return false;
}
bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
for(auto& sn : nodes) {
std::shared_ptr<Node> node = sn.second;
for(auto& sc : contexts) {
std::shared_ptr<Context> ctx = sc.second;
if(nodeCanQueue(*node.get(), run->name) && run->configure(buildNums[run->name] + 1, node, *fsHome)) {
node->busyExecutors++;
if(ctx->canQueue(jobContexts.at(run->name)) && run->configure(buildNums[run->name] + 1, ctx, *fsHome)) {
ctx->busyExecutors++;
// set the last known result if exists
db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
@@ -640,7 +617,7 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
}));
}
srv.addTask(kj::mv(exec));
LLOG(INFO, "Started job on node", run->name, run->build, node->name);
LLOG(INFO, "Started job", run->name, run->build, ctx->name);
// update next build number
buildNums[run->name]++;
@@ -660,11 +637,6 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
.fetch<uint>([&](uint etc){
j.set("etc", time(nullptr) + etc);
});
j.startArray("tags");
for(const str& t: jobTags[run->name]) {
j.String(t.c_str());
}
j.EndArray();
j.EndObject();
http->notifyEvent(j.str(), run->name.c_str());
return true;
@@ -710,9 +682,9 @@ kj::Promise<void> Laminar::handleRunStep(Run* run) {
}
void Laminar::runFinished(Run * r) {
std::shared_ptr<Node> node = r->node;
std::shared_ptr<Context> ctx = r->context;
node->busyExecutors--;
ctx->busyExecutors--;
LLOG(INFO, "Run completed", r->name, to_string(r->result));
time_t completedAt = time(nullptr);
@@ -731,7 +703,7 @@ void Laminar::runFinished(Run * r) {
std::string reason = r->reason();
db->stmt("INSERT INTO builds VALUES(?,?,?,?,?,?,?,?,?,?,?,?)")
.bind(r->name, r->build, node->name, r->queuedAt, r->startedAt, completedAt, int(r->result),
.bind(r->name, r->build, ctx->name, r->queuedAt, r->startedAt, completedAt, int(r->result),
maybeZipped, logsize, r->parentName, r->parentBuild, reason)
.exec();
@@ -746,11 +718,6 @@ void Laminar::runFinished(Run * r) {
.set("started", r->startedAt)
.set("result", to_string(r->result))
.set("reason", r->reason());
j.startArray("tags");
for(const str& t: jobTags[r->name]) {
j.String(t.c_str());
}
j.EndArray();
j.startArray("artifacts");
populateArtifacts(j, r->name, r->build);
j.EndArray();

View File

@@ -21,15 +21,15 @@
#include "run.h"
#include "monitorscope.h"
#include "node.h"
#include "context.h"
#include "database.h"
#include <unordered_map>
#include <kj/filesystem.h>
#include <kj/async-io.h>
// Node name to node object map
typedef std::unordered_map<std::string, std::shared_ptr<Node>> NodeMap;
// Context name to context object map
typedef std::unordered_map<std::string, std::shared_ptr<Context>> ContextMap;
struct Server;
class Json;
@@ -107,7 +107,6 @@ private:
bool tryStartRun(std::shared_ptr<Run> run, int queueIndex);
kj::Promise<void> handleRunStep(Run *run);
void runFinished(Run*);
bool nodeCanQueue(const Node&, std::string jobName) const;
// expects that Json has started an array
void populateArtifacts(Json& out, std::string job, uint num) const;
@@ -120,13 +119,15 @@ private:
std::unordered_map<std::string, uint> buildNums;
std::unordered_map<std::string, std::set<std::string>> jobTags;
std::unordered_map<std::string, std::set<std::string>> jobContexts;
std::unordered_map<std::string, std::string> jobGroups;
Settings settings;
RunSet activeJobs;
Database* db;
Server& srv;
NodeMap nodes;
ContextMap contexts;
kj::Path homePath;
kj::Own<const kj::Directory> fsHome;
uint numKeepRunDirs;

View File

@@ -207,8 +207,8 @@
<input class="form-control" id="jobFilter" v-model="search" placeholder="Filter...">
</div>
<ul class="nav nav-tabs">
<li :class="{'active':tag==null}"><a href v-on:click.prevent="tag = null">All Jobs</a></li>
<li v-for="t in tags" :class="{'active':t==tag}"><a href v-on:click.prevent="tag = t">{{t}}</a></li>
<li v-show="ungrouped.length" :class="{'active':group==null}"><a href v-on:click.prevent="group = null">Ungrouped Jobs</a></li>
<li v-for="g in Object.keys(groups)" :class="{'active':g==group}"><a href v-on:click.prevent="group = g">{{g}}</a></li>
</ul>
<table class="table table-striped" id="joblist">
<tr v-for="job in filteredJobs()">

View File

@@ -415,8 +415,9 @@ const Jobs = function() {
var state = {
jobs: [],
search: '',
tags: [],
tag: null
groups: {},
group: null,
ungrouped: []
};
return {
template: '#jobs',
@@ -437,13 +438,10 @@ const Jobs = function() {
state.jobs.sort(function(a, b){return a.name < b.name ? -1 : a.name > b.name ? 1 : 0;});
}
}
var tags = {};
for (var i in state.jobs) {
for (var j in state.jobs[i].tags) {
tags[state.jobs[i].tags[j]] = true;
}
}
state.tags = Object.keys(tags);
state.groups = {};
Object.keys(msg.groups).map(k => state.groups[k] = new RegExp(msg.groups[k]));
state.ungrouped = state.jobs.filter(j => !Object.values(state.groups).some(r => r.test(j.name))).map(j => j.name);
state.group = state.ungrouped.length ? null : Object.keys(state.groups)[0];
},
job_started: function(data) {
var updAt = null;
@@ -470,6 +468,8 @@ const Jobs = function() {
// first execution of new job. TODO insert without resort
state.jobs.unshift(data);
state.jobs.sort(function(a, b){return a.name < b.name ? -1 : a.name > b.name ? 1 : 0;});
if(!Object.values(state.groups).some(r => r.test(data.name)))
state.ungrouped.push(data.name);
} else {
state.jobs[updAt] = data;
}
@@ -492,19 +492,13 @@ const Jobs = function() {
}
},
filteredJobs: function() {
var ret = state.jobs;
var tag = state.tag;
if (tag) {
ret = ret.filter(function(job) {
return job.tags.indexOf(tag) >= 0;
});
}
var search = this.search;
if (search) {
ret = ret.filter(function(job) {
return job.name.indexOf(search) > -1;
});
}
let ret = [];
if (state.group)
ret = state.jobs.filter(job => state.groups[state.group].test(job.name));
else
ret = state.jobs.filter(job => state.ungrouped.includes(job.name));
if (this.search)
ret = ret.filter(job => job.name.indexOf(this.search) > -1);
return ret;
},
}

View File

@@ -17,7 +17,7 @@
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "run.h"
#include "node.h"
#include "context.h"
#include "conf.h"
#include "log.h"
@@ -75,7 +75,7 @@ Run::~Run() {
LLOG(INFO, "Run destroyed");
}
bool Run::configure(uint buildNum, std::shared_ptr<Node> nd, const kj::Directory& fsHome)
bool Run::configure(uint buildNum, std::shared_ptr<Context> nd, const kj::Directory& fsHome)
{
kj::Path cfgDir{"cfg"};
@@ -117,9 +117,6 @@ bool Run::configure(uint buildNum, std::shared_ptr<Node> nd, const kj::Directory
// global before-run script
if(fsHome.exists(cfgDir/"before"))
addScript(cfgDir/"before", rd.clone());
// per-node before-run script
if(fsHome.exists(cfgDir/"nodes"/(nd->name+".before")))
addScript(cfgDir/"nodes"/(nd->name+".before"), rd.clone());
// job before-run script
if(fsHome.exists(cfgDir/"jobs"/(name+".before")))
addScript(cfgDir/"jobs"/(name+".before"), rd.clone());
@@ -128,9 +125,6 @@ bool Run::configure(uint buildNum, std::shared_ptr<Node> nd, const kj::Directory
// job after-run script
if(fsHome.exists(cfgDir/"jobs"/(name+".after")))
addScript(cfgDir/"jobs"/(name+".after"), rd.clone(), true);
// per-node after-run script
if(fsHome.exists(cfgDir/"nodes"/(nd->name+".after")))
addScript(cfgDir/"nodes"/(nd->name+".after"), rd.clone(), true);
// global after-run script
if(fsHome.exists(cfgDir/"after"))
addScript(cfgDir/"after", rd.clone(), true);
@@ -138,8 +132,8 @@ bool Run::configure(uint buildNum, std::shared_ptr<Node> nd, const kj::Directory
// add environment files
if(fsHome.exists(cfgDir/"env"))
addEnv(cfgDir/"env");
if(fsHome.exists(cfgDir/"nodes"/(nd->name+".env")))
addEnv(cfgDir/"nodes"/(nd->name+".env"));
if(fsHome.exists(cfgDir/"contexts"/(nd->name+".env")))
addEnv(cfgDir/"contexts"/(nd->name+".env"));
if(fsHome.exists(cfgDir/"jobs"/(name+".env")))
addEnv(cfgDir/"jobs"/(name+".env"));
@@ -151,7 +145,7 @@ bool Run::configure(uint buildNum, std::shared_ptr<Node> nd, const kj::Directory
// All good, we've "started"
startedAt = time(nullptr);
build = buildNum;
node = nd;
context = nd;
// notifies the rpc client if the start command was used
started.fulfiller->fulfill();
@@ -212,8 +206,7 @@ bool Run::step() {
setenv("PATH", PATH.c_str(), true);
setenv("RUN", buildNum.c_str(), true);
setenv("JOB", name.c_str(), true);
if(!node->name.empty())
setenv("NODE", node->name.c_str(), true);
setenv("CONTEXT", context->name.c_str(), true);
setenv("RESULT", to_string(result).c_str(), true);
setenv("LAST_RESULT", to_string(lastResult).c_str(), true);
setenv("WORKSPACE", (rootPath/"run"/name/"workspace").toString(true).cStr(), true);

View File

@@ -43,7 +43,7 @@ enum class RunState {
std::string to_string(const RunState& rs);
class Node;
class Context;
typedef std::unordered_map<std::string, std::string> ParamMap;
@@ -57,8 +57,8 @@ public:
Run(const Run&) = delete;
Run& operator=(const Run&) = delete;
// Call this to "start" the run with a specific number and node
bool configure(uint buildNum, std::shared_ptr<Node> node, const kj::Directory &fsHome);
// Call this to "start" the run with a specific number and context
bool configure(uint buildNum, std::shared_ptr<Context> context, const kj::Directory &fsHome);
// executes the next script (if any), returning true if there is nothing
// more to be done.
@@ -76,7 +76,7 @@ public:
kj::Promise<void>&& whenStarted() { return kj::mv(started.promise); }
kj::Promise<RunState>&& whenFinished() { return kj::mv(finished.promise); }
std::shared_ptr<Node> node;
std::shared_ptr<Context> context;
RunState result;
RunState lastResult;
std::string name;