1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2026-03-02 03:40:21 +00:00

assign run numbers at queue time

This allows build chains to be traced in the common case where
an upstream job calls `laminarc queue' instead of `laminarc start'.
Incomplete job runs now have database entries, which requires
some adjustments in queries. Queued jobs can now be viewed in
the frontend and there is a corresponding status icon.
This commit is contained in:
Oliver Giles
2020-09-25 15:29:30 +12:00
parent 6d2c0b208b
commit 06a5f3d8ef
10 changed files with 119 additions and 112 deletions

View File

@@ -147,7 +147,8 @@ int main(int argc, char** argv) {
if(resp.getResult() != LaminarCi::MethodResult::SUCCESS) {
fprintf(stderr, "Failed to queue job '%s'\n", argv[jobNameIndex]);
ret = EXIT_OPERATION_FAILED;
}
} else
printTriggerLink(argv[jobNameIndex], resp.getBuildNum());
}));
jobNameIndex += n + 1;
} while(jobNameIndex < argc);

View File

@@ -2,7 +2,7 @@
interface LaminarCi {
queue @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult);
queue @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32);
start @1 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32);
run @2 (jobName :Text, params :List(JobParam)) -> (result :JobResult, buildNum :UInt32);
listQueued @3 () -> (result :List(Text));

View File

@@ -136,18 +136,9 @@ void Laminar::loadCustomizations() {
}
uint Laminar::latestRun(std::string job) {
auto it = activeJobs.byJobName().equal_range(job);
if(it.first == it.second) {
uint result = 0;
db->stmt("SELECT MAX(number) FROM builds WHERE name = ?")
.bind(job)
.fetch<uint>([&](uint x){
result = x;
});
return result;
} else {
return (*--it.second)->build;
}
if(auto it = buildNums.find(job); it != buildNums.end())
return it->second;
return 0;
}
bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) {
@@ -231,30 +222,24 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.set("time", time(nullptr));
j.startObject("data");
if(scope.type == MonitorScope::RUN) {
db->stmt("SELECT queuedAt,startedAt,completedAt,result,reason,parentJob,parentBuild FROM builds WHERE name = ? AND number = ?")
db->stmt("SELECT queuedAt,startedAt,completedAt,result,reason,parentJob,parentBuild,q.lr IS NOT NULL,q.lr FROM builds "
"LEFT JOIN (SELECT name n, MAX(number), completedAt-startedAt lr FROM builds WHERE result IS NOT NULL GROUP BY n) q ON q.n = name "
"WHERE name = ? AND number = ?")
.bind(scope.job, scope.num)
.fetch<time_t, time_t, time_t, int, std::string, std::string, uint>([&](time_t queued, time_t started, time_t completed, int result, std::string reason, std::string parentJob, uint parentBuild) {
j.set("queued", started-queued);
.fetch<time_t, time_t, time_t, int, std::string, std::string, uint, uint, uint>([&](time_t queued, time_t started, time_t completed, int result, std::string reason, std::string parentJob, uint parentBuild, uint lastRuntimeKnown, uint lastRuntime) {
j.set("queued", queued);
j.set("started", started);
j.set("completed", completed);
j.set("result", to_string(RunState(result)));
if(completed)
j.set("completed", completed);
j.set("result", to_string(completed ? RunState(result) : started ? RunState::RUNNING : RunState::QUEUED));
j.set("reason", reason);
j.startObject("upstream").set("name", parentJob).set("num", parentBuild).EndObject(2);
if(lastRuntimeKnown)
j.set("etc", started + lastRuntime);
});
if(const Run* run = activeRun(scope.job, scope.num)) {
j.set("queued", run->startedAt - run->queuedAt);
j.set("started", run->startedAt);
j.set("result", to_string(RunState::RUNNING));
j.set("reason", run->reason());
j.startObject("upstream").set("name", run->parentName).set("num", run->parentBuild).EndObject(2);
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch<uint>([&](uint lastRuntime){
j.set("etc", run->startedAt + lastRuntime);
});
}
if(auto it = buildNums.find(scope.job); it != buildNums.end())
j.set("latestNum", int(it->second));
j.startArray("artifacts");
populateArtifacts(j, scope.job, scope.num);
j.EndArray();
@@ -274,7 +259,8 @@ std::string Laminar::getStatus(MonitorScope scope) {
order_by = "(completedAt-startedAt) " + direction + ", number DESC";
else
order_by = "number DESC";
std::string stmt = "SELECT number,startedAt,completedAt,result,reason FROM builds WHERE name = ? ORDER BY "
std::string stmt = "SELECT number,startedAt,completedAt,result,reason FROM builds "
"WHERE name = ? AND result IS NOT NULL ORDER BY "
+ order_by + " LIMIT ?,?";
db->stmt(stmt.c_str())
.bind(scope.job, scope.page * runsPerPage, runsPerPage)
@@ -288,7 +274,7 @@ std::string Laminar::getStatus(MonitorScope scope) {
.EndObject();
});
j.EndArray();
db->stmt("SELECT COUNT(*),AVG(completedAt-startedAt) FROM builds WHERE name = ?")
db->stmt("SELECT COUNT(*),AVG(completedAt-startedAt) FROM builds WHERE name = ? AND result IS NOT NULL")
.bind(scope.job)
.fetch<uint,uint>([&](uint nRuns, uint averageRuntime){
j.set("averageRuntime", averageRuntime);
@@ -319,14 +305,17 @@ std::string Laminar::getStatus(MonitorScope scope) {
}
}
j.set("nQueued", nQueued);
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? ORDER BY completedAt DESC LIMIT 1")
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? "
"ORDER BY completedAt DESC LIMIT 1")
.bind(scope.job, int(RunState::SUCCESS))
.fetch<int,time_t>([&](int build, time_t started){
j.startObject("lastSuccess");
j.set("number", build).set("started", started);
j.EndObject();
});
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result <> ? ORDER BY completedAt DESC LIMIT 1")
db->stmt("SELECT number,startedAt FROM builds "
"WHERE name = ? AND result <> ? "
"ORDER BY completedAt DESC LIMIT 1")
.bind(scope.job, int(RunState::SUCCESS))
.fetch<int,time_t>([&](int build, time_t started){
j.startObject("lastFailed");
@@ -337,7 +326,9 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.set("description", desc == jobDescriptions.end() ? "" : desc->second);
} else if(scope.type == MonitorScope::ALL) {
j.startArray("jobs");
db->stmt("SELECT name,number,startedAt,completedAt,result FROM builds b JOIN (SELECT name n,MAX(number) l FROM builds GROUP BY n) q ON b.name = q.n AND b.number = q.l")
db->stmt("SELECT name,number,startedAt,completedAt,result FROM builds b "
"JOIN (SELECT name n,MAX(number) latest FROM builds WHERE result IS NOT NULL GROUP BY n) q "
"ON b.name = q.n AND b.number = latest")
.fetch<str,uint,time_t,time_t,int>([&](str name,uint number, time_t started, time_t completed, int result){
j.StartObject();
j.set("name", name);
@@ -364,7 +355,7 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.EndObject();
} else { // Home page
j.startArray("recent");
db->stmt("SELECT * FROM builds ORDER BY completedAt DESC LIMIT 20")
db->stmt("SELECT * FROM builds WHERE completedAt IS NOT NULL ORDER BY completedAt DESC LIMIT 20")
.fetch<str,uint,str,time_t,time_t,time_t,int>([&](str name,uint build,str context,time_t,time_t started,time_t completed,int result){
j.StartObject();
j.set("name", name)
@@ -383,7 +374,9 @@ std::string Laminar::getStatus(MonitorScope scope) {
j.set("number", run->build);
j.set("context", run->context->name);
j.set("started", run->startedAt);
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
db->stmt("SELECT completedAt - startedAt FROM builds "
"WHERE completedAt IS NOT NULL AND name = ? "
"ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch<uint>([&](uint lastRuntime){
j.set("etc", run->startedAt + lastRuntime);
@@ -586,14 +579,19 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
if(jobContexts[name].empty())
jobContexts.at(name).insert("default");
std::shared_ptr<Run> run = std::make_shared<Run>(name, kj::mv(params), homePath.clone());
std::shared_ptr<Run> run = std::make_shared<Run>(name, ++buildNums[name], kj::mv(params), homePath.clone());
queuedJobs.push_back(run);
db->stmt("INSERT INTO builds(name,number,queuedAt,parentJob,parentBuild,reason) VALUES(?,?,?,?,?,?)")
.bind(run->name, run->build, run->queuedAt, run->parentName, run->parentBuild, run->reason())
.exec();
// notify clients
Json j;
j.set("type", "job_queued")
.startObject("data")
.set("name", name)
.set("number", run->build)
.EndObject();
http->notifyEvent(j.str(), name.c_str());
@@ -620,14 +618,19 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
if(ctx->canQueue(jobContexts.at(run->name))) {
RunState lastResult = RunState::UNKNOWN;
// set the last known result if exists
// set the last known result if exists. Runs which haven't started yet should
// have completedAt == NULL and thus be at the end of a DESC ordered query
db->stmt("SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch<int>([&](int result){
lastResult = RunState(result);
});
kj::Promise<RunState> onRunFinished = run->start(buildNums[run->name] + 1, lastResult, ctx, *fsHome,[this](kj::Maybe<pid_t>& pid){return srv.onChildExit(pid);});
kj::Promise<RunState> onRunFinished = run->start(lastResult, ctx, *fsHome,[this](kj::Maybe<pid_t>& pid){return srv.onChildExit(pid);});
db->stmt("UPDATE builds SET node = ?, startedAt = ? WHERE name = ? AND number = ?")
.bind(ctx->name, run->startedAt, run->name, run->build)
.exec();
ctx->busyExecutors++;
@@ -650,16 +653,13 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
srv.addTask(kj::mv(exec));
LLOG(INFO, "Started job", run->name, run->build, ctx->name);
// update next build number
buildNums[run->name]++;
// notify clients
Json j;
j.set("type", "job_started")
.startObject("data")
.set("queueIndex", queueIndex)
.set("name", run->name)
.set("queued", run->startedAt - run->queuedAt)
.set("queued", run->queuedAt)
.set("started", run->startedAt)
.set("number", run->build)
.set("reason", run->reason());
@@ -708,10 +708,8 @@ void Laminar::handleRunFinished(Run * r) {
}
}
std::string reason = r->reason();
db->stmt("INSERT INTO builds VALUES(?,?,?,?,?,?,?,?,?,?,?,?)")
.bind(r->name, r->build, ctx->name, r->queuedAt, r->startedAt, completedAt, int(r->result),
maybeZipped, logsize, r->parentName, r->parentBuild, reason)
db->stmt("UPDATE builds SET completedAt = ?, result = ?, output = ?, outputLen = ? WHERE name = ? AND number = ?")
.bind(completedAt, int(r->result), maybeZipped, logsize, r->name, r->build)
.exec();
// notify clients
@@ -720,7 +718,7 @@ void Laminar::handleRunFinished(Run * r) {
.startObject("data")
.set("name", r->name)
.set("number", r->build)
.set("queued", r->startedAt - r->queuedAt)
.set("queued", r->queuedAt)
.set("completed", completedAt)
.set("started", r->startedAt)
.set("result", to_string(r->result))

View File

@@ -118,7 +118,8 @@
<th class="text-center vp-sm-hide">Reason <a class="sort" :class="(sort.field=='reason'?sort.order:'')" v-on:click="do_sort('reason')">&nbsp;</a></th>
</tr></thead>
<tr v-show="nQueued">
<td colspan="5"><i>{{nQueued}} run(s) queued</i></td>
<td style="width:1px"><span v-html="runIcon('queued')"></span></td>
<td colspan="4"><i>{{nQueued}} run(s) queued</i></td>
</tr>
<tr v-for="job in jobsRunning.concat(jobsRecent)" track-by="$index">
<td style="width:1px"><span v-html="runIcon(job.result)"></span></td>
@@ -152,10 +153,10 @@
<dl>
<dt>Reason</dt><dd>{{job.reason}}</dd>
<dt v-show="job.upstream.num > 0">Upstream</dt><dd v-show="job.upstream.num > 0"><router-link :to="'/jobs/'+job.upstream.name">{{job.upstream.name}}</router-link> <router-link :to="'/jobs/'+job.upstream.name+'/'+job.upstream.num">#{{job.upstream.num}}</router-link></li></dd>
<dt>Queued for</dt><dd>{{job.queued}}s</dd>
<dt>Started</dt><dd>{{formatDate(job.started)}}</dd>
<dt>Queued for</dt><dd>{{formatDuration(job.queued, job.started ? job.started : Math.floor(Date.now()/1000))}}</dd>
<dt v-show="job.started">Started</dt><dd v-show="job.started">{{formatDate(job.started)}}</dd>
<dt v-show="runComplete(job)">Completed</dt><dd v-show="job.completed">{{formatDate(job.completed)}}</dd>
<dt>Duration</dt><dd>{{formatDuration(job.started, job.completed)}}</dd>
<dt v-show="job.started">Duration</dt><dd v-show="job.started">{{formatDuration(job.started, job.completed)}}</dd>
</dl>
<dl v-show="job.artifacts.length">
<dt>Artifacts</dt>

View File

@@ -23,10 +23,10 @@ const timeScale = function(max){
: { scale:function(v){return v;}, label:'Seconds' };
}
const ServerEventHandler = function() {
function setupEventSource(path, query, next, comp) {
const es = new EventSource(document.head.baseURI + path.substr(1) + query);
function setupEventSource(to, query, next, comp) {
const es = new EventSource(document.head.baseURI + to.path.substr(1) + query);
es.comp = comp;
es.path = path; // save for later in case we need to add query params
es.path = to.path; // save for later in case we need to add query params
es.onmessage = function(msg) {
msg = JSON.parse(msg.data);
// "status" is the first message the server always delivers.
@@ -55,7 +55,7 @@ const ServerEventHandler = function() {
comp.$root.clockSkew = msg.time - Math.floor((new Date()).getTime()/1000);
comp.$root.connected = true;
// Component-specific callback handler
comp[msg.type](msg.data);
comp[msg.type](msg.data, to.params);
});
} else {
// at this point, the component must be defined
@@ -72,7 +72,7 @@ const ServerEventHandler = function() {
es.onerror = function(e) {
this.comp.$root.connected = false;
setTimeout(() => {
this.comp.es = setupEventSource(path, query, null, this.comp);
this.comp.es = setupEventSource(to, query, null, this.comp);
}, this.comp.esReconnectInterval);
if(this.comp.esReconnectInterval < 7500)
this.comp.esReconnectInterval *= 1.5;
@@ -82,11 +82,11 @@ const ServerEventHandler = function() {
}
return {
beforeRouteEnter(to, from, next) {
setupEventSource(to.path, '', (fn) => { next(fn); });
setupEventSource(to, '', (fn) => { next(fn); });
},
beforeRouteUpdate(to, from, next) {
this.es.close();
setupEventSource(to.path, '', (fn) => { fn(this); next(); });
setupEventSource(to, '', (fn) => { fn(this); next(); });
},
beforeRouteLeave(to, from, next) {
this.es.close();
@@ -115,6 +115,11 @@ const Utils = {
21,-26 5,5 11,15 15,20 8,-2 15,-9 20,-15 -3,-3 -17,-18 -20,-24 3,-5 23,-26 30,-33 -3,-5 -8,-9
-12,-12 -6,5 -26,26 -29,30 -6,-8 -11,-15 -15,-23 -3,0 -12,5 -15,7 z" />
</svg>`
: (result == 'queued') ? /* clock */
`<svg class="status queued" viewBox="0 0 100 100">
<circle r="50" cy="50" cx="50" />
<path d="m 50,15 0,35 17,17" stroke-width="10" fill="none" />
</svg>`
: /* spinner */
`<svg class="status running" viewBox="0 0 100 100">
<circle cx="50" cy="50" r="40" stroke-width="15" fill="none" stroke-dasharray="175">
@@ -656,49 +661,46 @@ const Run = function() {
return state;
},
methods: {
status: function(data) {
// Check for the /latest endpoint. An intuitive check might be
// if(this.$route.params.number == 'latest'), but unfortunately
// after calling $router.replace, we re-enter status() before
// $route.params is updated. Instead, assume that if there is
// no 'started' field, we should redirect to the latest number
if(!('started' in data) && 'latestNum' in data)
return this.$router.replace('/jobs/' + this.$route.params.name + '/' + data.latestNum);
status: function(data, params) {
// Check for the /latest endpoint
if(params.number === 'latest')
return this.$router.replace('/jobs/' + params.name + '/' + data.latestNum);
state.number = parseInt(params.number);
state.jobsRunning = [];
state.job = data;
state.latestNum = data.latestNum;
state.jobsRunning = [data];
state.log = '';
if(this.logstream)
this.logstream.abort();
if(data.started)
this.logstream = logFetcher(this, params.name, params.number);
},
job_queued: function(data) {
state.latestNum = data.number;
this.$forceUpdate();
},
job_started: function(data) {
state.latestNum++;
this.$forceUpdate();
if(data.number === state.number) {
state.job = Object.assign(state.job, data);
state.job.result = 'running';
if(this.logstream)
this.logstream.abort();
this.logstream = logFetcher(this, data.name, data.number);
this.$forceUpdate();
}
},
job_completed: function(data) {
state.job = Object.assign(state.job, data);
state.jobsRunning = [];
this.$forceUpdate();
if(data.number === state.number) {
state.job = Object.assign(state.job, data);
state.jobsRunning = [];
this.$forceUpdate();
}
},
runComplete: function(run) {
return !!run && (run.result === 'aborted' || run.result === 'failed' || run.result === 'success');
},
},
beforeRouteEnter(to, from, next) {
next(vm => {
state.log = '';
vm.logstream = logFetcher(vm, to.params.name, to.params.number);
});
},
beforeRouteUpdate(to, from, next) {
var vm = this;
vm.logstream.abort();
state.log = '';
vm.logstream = logFetcher(vm, to.params.name, to.params.number);
next();
},
beforeRouteLeave(to, from, next) {
this.logstream.abort();
next();
}
};
}();

View File

@@ -84,7 +84,9 @@ canvas {
}
svg.success path { fill: var(--success); }
svg.failed path { fill: var(--failure); }
svg.running circle { stroke: var(--running); }
svg.running circle { stroke: var(--running); }
svg.queued circle { fill: var(--nav-fg); }
svg.queued path { stroke: white; }
/* sort indicators */
a.sort {

View File

@@ -53,10 +53,13 @@ public:
kj::Promise<void> queue(QueueContext context) override {
std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC queue", jobName);
LaminarCi::MethodResult result = laminar.queueJob(jobName, params(context.getParams().getParams()))
? LaminarCi::MethodResult::SUCCESS
: LaminarCi::MethodResult::FAILED;
context.getResults().setResult(result);
std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams()));
if(Run* r = run.get()) {
context.getResults().setResult(LaminarCi::MethodResult::SUCCESS);
context.getResults().setBuildNum(r->build);
} else {
context.getResults().setResult(LaminarCi::MethodResult::FAILED);
}
return kj::READY_NOW;
}

View File

@@ -33,7 +33,7 @@ inline kj::Path operator/(const kj::Path& p, const T& ext) {
std::string to_string(const RunState& rs) {
switch(rs) {
case RunState::PENDING: return "pending";
case RunState::QUEUED: return "queued";
case RunState::RUNNING: return "running";
case RunState::ABORTED: return "aborted";
case RunState::FAILED: return "failed";
@@ -44,9 +44,10 @@ std::string to_string(const RunState& rs) {
}
Run::Run(std::string name, ParamMap pm, kj::Path&& rootPath) :
Run::Run(std::string name, uint num, ParamMap pm, kj::Path&& rootPath) :
result(RunState::SUCCESS),
name(name),
build(num),
params(kj::mv(pm)),
queuedAt(time(nullptr)),
rootPath(kj::mv(rootPath)),
@@ -83,7 +84,7 @@ static void setEnvFromFile(const kj::Path& rootPath, kj::Path file) {
}
}
kj::Promise<RunState> Run::start(uint buildNum, RunState lastResult, std::shared_ptr<Context> ctx, const kj::Directory &fsHome, std::function<kj::Promise<int>(kj::Maybe<pid_t>&)> getPromise)
kj::Promise<RunState> Run::start(RunState lastResult, std::shared_ptr<Context> ctx, const kj::Directory &fsHome, std::function<kj::Promise<int>(kj::Maybe<pid_t>&)> getPromise)
{
kj::Path cfgDir{"cfg"};
@@ -130,7 +131,7 @@ kj::Promise<RunState> Run::start(uint buildNum, RunState lastResult, std::shared
PATH.append(p);
}
std::string runNumStr = std::to_string(buildNum);
std::string runNumStr = std::to_string(build);
setenv("PATH", PATH.c_str(), true);
setenv("RUN", runNumStr.c_str(), true);
@@ -151,14 +152,13 @@ kj::Promise<RunState> Run::start(uint buildNum, RunState lastResult, std::shared
// enough. Instead, we'll just exec ourselves and handle that in laminard's
// main() by calling leader_main()
char* procName;
if(asprintf(&procName, "{laminar} %s:%d", name.data(), buildNum) > 0)
if(asprintf(&procName, "{laminar} %s:%d", name.data(), build) > 0)
execl("/proc/self/exe", procName, NULL); // does not return
_exit(EXIT_FAILURE);
}
// All good, we've "started"
startedAt = time(nullptr);
build = buildNum;
context = ctx;
output_fd = plog[0];

View File

@@ -34,7 +34,7 @@ typedef unsigned int uint;
enum class RunState {
UNKNOWN,
PENDING,
QUEUED,
RUNNING,
ABORTED,
FAILED,
@@ -50,14 +50,14 @@ typedef std::unordered_map<std::string, std::string> ParamMap;
// Represents an execution of a job.
class Run {
public:
Run(std::string name, ParamMap params, kj::Path&& rootPath);
Run(std::string name, uint num, ParamMap params, kj::Path&& rootPath);
~Run();
// copying this class would be asking for trouble...
Run(const Run&) = delete;
Run& operator=(const Run&) = delete;
kj::Promise<RunState> start(uint buildNum, RunState lastResult, std::shared_ptr<Context> ctx, const kj::Directory &fsHome, std::function<kj::Promise<int>(kj::Maybe<pid_t>&)> getPromise);
kj::Promise<RunState> start(RunState lastResult, std::shared_ptr<Context> ctx, const kj::Directory &fsHome, std::function<kj::Promise<int>(kj::Maybe<pid_t>&)> getPromise);
// aborts this run
bool abort();