allow adding job to front of queue

laminarc now supports {queue,start,run} --next to place the job at the
front of the queue instead of at the end.

resolves #162
pull/167/head
Oliver Giles 2 years ago
parent e581a0cf5d
commit 41ddd8fe4f

@ -713,6 +713,7 @@ Finally, variables supplied on the command-line call to `laminarc queue`, `lamin
- `queue [JOB [PARAMS...]]...` adds one or more jobs to the queue with optional parameters, returning immediately. - `queue [JOB [PARAMS...]]...` adds one or more jobs to the queue with optional parameters, returning immediately.
- `start [JOB [PARAMS...]]...` starts one or more jobs with optional parameters, returning when the jobs begin execution. - `start [JOB [PARAMS...]]...` starts one or more jobs with optional parameters, returning when the jobs begin execution.
- `run [JOB [PARAMS...]]...` triggers one or more jobs with optional parameters and waits for the completion of all jobs. - `run [JOB [PARAMS...]]...` triggers one or more jobs with optional parameters and waits for the completion of all jobs.
- `--next` may be passed before `JOB` in order to place the job at the front of the queue instead of at the end.
- `set [VARIABLE=VALUE]...` sets one or more variables to be exported in subsequent scripts for the run identified by the `$JOB` and `$RUN` environment variables - `set [VARIABLE=VALUE]...` sets one or more variables to be exported in subsequent scripts for the run identified by the `$JOB` and `$RUN` environment variables
- `show-jobs` shows the known jobs on the server (`$LAMINAR_HOME/cfg/jobs/*.run`). - `show-jobs` shows the known jobs on the server (`$LAMINAR_HOME/cfg/jobs/*.run`).
- `show-running` shows the currently running jobs with their numbers. - `show-running` shows the currently running jobs with their numbers.

@ -27,6 +27,9 @@ begin execution.
adds job(s) (with optional parameters) to the queue and returns when the jobs adds job(s) (with optional parameters) to the queue and returns when the jobs
complete execution. The exit code will be non-zero if any of the runs does complete execution. The exit code will be non-zero if any of the runs does
not complete successfully. not complete successfully.
.It \t
\fB--next\fR may be passed to \fBqueue\fR, \fBstart\fR or \fBrun\fR in order
to place the job at the front of the queue instead of at the end.
.It Sy set .It Sy set
sets one or more parameters to be exported as environment variables in subsequent sets one or more parameters to be exported as environment variables in subsequent
scripts for the run identified by the $JOB and $RUN environment variables. scripts for the run identified by the $JOB and $RUN environment variables.

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2021 Oliver Giles /// Copyright 2015-2022 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -87,12 +87,14 @@ static void printTriggerLink(const char* job, uint run) {
static void usage(std::ostream& out) { static void usage(std::ostream& out) {
out << "laminarc version " << laminar_version() << "\n"; out << "laminarc version " << laminar_version() << "\n";
out << "Usage: laminarc [-h|--help] COMMAND [PARAMETERS...]]\n"; out << "Usage: laminarc [-h|--help] COMMAND\n";
out << " -h|--help show this help message\n"; out << " -h|--help show this help message\n";
out << "where COMMAND is:\n"; out << "where COMMAND is:\n";
out << " queue JOB_LIST... queues one or more jobs for execution and returns immediately.\n"; out << " queue JOB_LIST... queues one or more jobs for execution and returns immediately.\n";
out << " start JOB_LIST... queues one or more jobs for execution and blocks until it starts.\n"; out << " start JOB_LIST... queues one or more jobs for execution and blocks until it starts.\n";
out << " run JOB_LIST... queues one or more jobs for execution and blocks until it finishes.\n"; out << " run JOB_LIST... queues one or more jobs for execution and blocks until it finishes.\n";
out << " JOB_LIST may be prepended with --next, in this case the job will\n";
out << " be pushed to the front of the queue instead of the end.\n";
out << " set PARAMETER_LIST... sets the given parameters as environment variables in the currently\n"; out << " set PARAMETER_LIST... sets the given parameters as environment variables in the currently\n";
out << " running job. Fails if run outside of a job context.\n"; out << " running job. Fails if run outside of a job context.\n";
out << " abort NAME NUMBER aborts the run identified by NAME and NUMBER.\n"; out << " abort NAME NUMBER aborts the run identified by NAME and NUMBER.\n";
@ -132,16 +134,25 @@ int main(int argc, char** argv) {
auto& waitScope = client.getWaitScope(); auto& waitScope = client.getWaitScope();
if(strcmp(argv[1], "queue") == 0) { int jobNameIndex = 2;
if(argc < 3) { bool frontOfQueue = false;
fprintf(stderr, "Usage %s queue <jobName>\n", argv[0]);
if(strcmp(argv[1], "queue") == 0 || strcmp(argv[1], "start") == 0 || strcmp(argv[1], "run") == 0) {
if(argc < 3 || (strcmp(argv[2], "--next") == 0 && argc < 4)) {
fprintf(stderr, "Usage %s %s JOB_LIST...\n", argv[0], argv[1]);
return EXIT_BAD_ARGUMENT; return EXIT_BAD_ARGUMENT;
} }
int jobNameIndex = 2; if(strcmp(argv[2], "--next") == 0) {
// make a request for each job specified on the commandline frontOfQueue = true;
jobNameIndex++;
}
}
if(strcmp(argv[1], "queue") == 0) {
do { do {
auto req = laminar.queueRequest(); auto req = laminar.queueRequest();
req.setJobName(argv[jobNameIndex]); req.setJobName(argv[jobNameIndex]);
req.setFrontOfQueue(frontOfQueue);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req); int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::QueueResults> resp){ ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::QueueResults> resp){
if(resp.getResult() != LaminarCi::MethodResult::SUCCESS) { if(resp.getResult() != LaminarCi::MethodResult::SUCCESS) {
@ -153,16 +164,10 @@ int main(int argc, char** argv) {
jobNameIndex += n + 1; jobNameIndex += n + 1;
} while(jobNameIndex < argc); } while(jobNameIndex < argc);
} else if(strcmp(argv[1], "start") == 0) { } else if(strcmp(argv[1], "start") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s queue <jobName>\n", argv[0]);
return EXIT_BAD_ARGUMENT;
}
kj::Vector<capnp::RemotePromise<LaminarCi::StartResults>> promises;
int jobNameIndex = 2;
// make a request for each job specified on the commandline
do { do {
auto req = laminar.startRequest(); auto req = laminar.startRequest();
req.setJobName(argv[jobNameIndex]); req.setJobName(argv[jobNameIndex]);
req.setFrontOfQueue(frontOfQueue);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req); int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::StartResults> resp){ ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::StartResults> resp){
if(resp.getResult() != LaminarCi::MethodResult::SUCCESS) { if(resp.getResult() != LaminarCi::MethodResult::SUCCESS) {
@ -174,21 +179,16 @@ int main(int argc, char** argv) {
jobNameIndex += n + 1; jobNameIndex += n + 1;
} while(jobNameIndex < argc); } while(jobNameIndex < argc);
} else if(strcmp(argv[1], "run") == 0) { } else if(strcmp(argv[1], "run") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s run <jobName>\n", argv[0]);
return EXIT_BAD_ARGUMENT;
}
int jobNameIndex = 2;
// make a request for each job specified on the commandline
do { do {
auto req = laminar.runRequest(); auto req = laminar.runRequest();
req.setJobName(argv[jobNameIndex]); req.setJobName(argv[jobNameIndex]);
req.setFrontOfQueue(frontOfQueue);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req); int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::RunResults> resp){ ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::RunResults> resp){
if(resp.getResult() == LaminarCi::JobResult::UNKNOWN) if(resp.getResult() == LaminarCi::JobResult::UNKNOWN)
fprintf(stderr, "Failed to start job '%s'\n", argv[2]); fprintf(stderr, "Failed to start job '%s'\n", argv[2]);
else else
printTriggerLink(argv[jobNameIndex], resp.getBuildNum()); printTriggerLink(argv[jobNameIndex], resp.getBuildNum());
if(resp.getResult() != LaminarCi::JobResult::SUCCESS) if(resp.getResult() != LaminarCi::JobResult::SUCCESS)
ret = EXIT_RUN_FAILED; ret = EXIT_RUN_FAILED;
})); }));

@ -2,9 +2,9 @@
interface LaminarCi { interface LaminarCi {
queue @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32); queue @0 (jobName :Text, params :List(JobParam), frontOfQueue :Bool) -> (result :MethodResult, buildNum :UInt32);
start @1 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32); start @1 (jobName :Text, params :List(JobParam), frontOfQueue :Bool) -> (result :MethodResult, buildNum :UInt32);
run @2 (jobName :Text, params :List(JobParam)) -> (result :JobResult, buildNum :UInt32); run @2 (jobName :Text, params :List(JobParam), frontOfQueue :Bool) -> (result :JobResult, buildNum :UInt32);
listQueued @3 () -> (result :List(Run)); listQueued @3 () -> (result :List(Run));
listRunning @4 () -> (result :List(Run)); listRunning @4 () -> (result :List(Run));
listKnown @5 () -> (result :List(Text)); listKnown @5 () -> (result :List(Text));

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2020 Oliver Giles /// Copyright 2015-2022 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -585,7 +585,7 @@ bool Laminar::loadConfiguration() {
return true; return true;
} }
std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) { std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params, bool frontOfQueue) {
if(!fsHome->exists(kj::Path{"cfg","jobs",name+".run"})) { if(!fsHome->exists(kj::Path{"cfg","jobs",name+".run"})) {
LLOG(ERROR, "Non-existent job", name); LLOG(ERROR, "Non-existent job", name);
return nullptr; return nullptr;
@ -596,7 +596,10 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
jobContexts.at(name).insert("default"); jobContexts.at(name).insert("default");
std::shared_ptr<Run> run = std::make_shared<Run>(name, ++buildNums[name], kj::mv(params), homePath.clone()); std::shared_ptr<Run> run = std::make_shared<Run>(name, ++buildNums[name], kj::mv(params), homePath.clone());
queuedJobs.push_back(run); if(frontOfQueue)
queuedJobs.push_front(run);
else
queuedJobs.push_back(run);
db->stmt("INSERT INTO builds(name,number,queuedAt,parentJob,parentBuild,reason) VALUES(?,?,?,?,?,?)") db->stmt("INSERT INTO builds(name,number,queuedAt,parentJob,parentBuild,reason) VALUES(?,?,?,?,?,?)")
.bind(run->name, run->build, run->queuedAt, run->parentName, run->parentBuild, run->reason()) .bind(run->name, run->build, run->queuedAt, run->parentName, run->parentBuild, run->reason())
@ -609,6 +612,7 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
.set("name", name) .set("name", name)
.set("number", run->build) .set("number", run->build)
.set("result", to_string(RunState::QUEUED)) .set("result", to_string(RunState::QUEUED))
.set("queueIndex", frontOfQueue ? 0 : (queuedJobs.size() - 1))
.set("reason", run->reason()) .set("reason", run->reason())
.EndObject(); .EndObject();
http->notifyEvent(j.str(), name.c_str()); http->notifyEvent(j.str(), name.c_str());

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2020 Oliver Giles /// Copyright 2015-2022 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -52,7 +52,7 @@ public:
// Queues a job, returns immediately. Return value will be nullptr if // Queues a job, returns immediately. Return value will be nullptr if
// the supplied name is not a known job. // the supplied name is not a known job.
std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap()); std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap(), bool frontOfQueue = false);
// Return the latest known number of the named job // Return the latest known number of the named job
uint latestRun(std::string job); uint latestRun(std::string job);

@ -415,7 +415,7 @@ const Home = templateId => {
}); });
}, },
job_queued: function(data) { job_queued: function(data) {
state.jobsQueued.splice(0, 0, data); state.jobsQueued.splice(state.jobsQueued.length - data.queueIndex, 0, data);
this.$forceUpdate(); this.$forceUpdate();
}, },
job_started: function(data) { job_started: function(data) {
@ -594,7 +594,7 @@ const Job = templateId => {
}); });
}, },
job_queued: function(data) { job_queued: function(data) {
state.jobsQueued.splice(0, 0, data); state.jobsQueued.splice(state.jobsQueued.length - data.queueIndex, 0, data);
this.$forceUpdate(); this.$forceUpdate();
}, },
job_started: function(data) { job_started: function(data) {

@ -1,5 +1,5 @@
/// ///
/// Copyright 2015-2019 Oliver Giles /// Copyright 2015-2022 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -53,7 +53,7 @@ public:
kj::Promise<void> queue(QueueContext context) override { kj::Promise<void> queue(QueueContext context) override {
std::string jobName = context.getParams().getJobName(); std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC queue", jobName); LLOG(INFO, "RPC queue", jobName);
std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams())); std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams()), context.getParams().getFrontOfQueue());
if(Run* r = run.get()) { if(Run* r = run.get()) {
context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); context.getResults().setResult(LaminarCi::MethodResult::SUCCESS);
context.getResults().setBuildNum(r->build); context.getResults().setBuildNum(r->build);
@ -67,7 +67,7 @@ public:
kj::Promise<void> start(StartContext context) override { kj::Promise<void> start(StartContext context) override {
std::string jobName = context.getParams().getJobName(); std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC start", jobName); LLOG(INFO, "RPC start", jobName);
std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams())); std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams()), context.getParams().getFrontOfQueue());
if(Run* r = run.get()) { if(Run* r = run.get()) {
return r->whenStarted().then([context,r]() mutable { return r->whenStarted().then([context,r]() mutable {
context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); context.getResults().setResult(LaminarCi::MethodResult::SUCCESS);
@ -83,7 +83,7 @@ public:
kj::Promise<void> run(RunContext context) override { kj::Promise<void> run(RunContext context) override {
std::string jobName = context.getParams().getJobName(); std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC run", jobName); LLOG(INFO, "RPC run", jobName);
std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams())); std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams()), context.getParams().getFrontOfQueue());
if(run) { if(run) {
return run->whenFinished().then([context,run](RunState state) mutable { return run->whenFinished().then([context,run](RunState state) mutable {
context.getResults().setResult(fromRunState(state)); context.getResults().setResult(fromRunState(state));

@ -1,5 +1,5 @@
/// ///
/// Copyright 2019 Oliver Giles /// Copyright 2019-2022 Oliver Giles
/// ///
/// This file is part of Laminar /// This file is part of Laminar
/// ///
@ -162,3 +162,27 @@ TEST_F(LaminarFixture, JobDescription) {
ASSERT_TRUE(data.HasMember("description")); ASSERT_TRUE(data.HasMember("description"));
EXPECT_STREQ("bar", data["description"].GetString()); EXPECT_STREQ("bar", data["description"].GetString());
} }
TEST_F(LaminarFixture, QueueFront) {
setNumExecutors(0);
defineJob("foo", "true");
defineJob("bar", "true");
auto es = eventSource("/");
auto req1 = client().queueRequest();
req1.setJobName("foo");
auto res1 = req1.send();
auto req2 = client().queueRequest();
req2.setFrontOfQueue(true);
req2.setJobName("bar");
auto res2 = req2.send();
ioContext->waitScope.poll();
setNumExecutors(2);
ioContext->waitScope.poll();
ASSERT_EQ(5, es->messages().size());
auto started1 = es->messages().at(3).GetObject();
EXPECT_STREQ("job_started", started1["type"].GetString());
EXPECT_STREQ("bar", started1["data"]["name"].GetString());
auto started2 = es->messages().at(4).GetObject();
EXPECT_STREQ("job_started", started2["type"].GetString());
EXPECT_STREQ("foo", started2["data"]["name"].GetString());
}

Loading…
Cancel
Save