1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2026-03-02 03:40:21 +00:00

resolves #36: queue/start/run

This commit is contained in:
Oliver Giles
2018-05-12 17:56:56 +03:00
parent 828b66682d
commit f1e4d10be3
7 changed files with 92 additions and 24 deletions

View File

@@ -84,16 +84,16 @@ int main(int argc, char** argv) {
auto& waitScope = client.getWaitScope();
if(strcmp(argv[1], "trigger") == 0) {
if(strcmp(argv[1], "queue") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s trigger <jobName>\n", argv[0]);
fprintf(stderr, "Usage %s queue <jobName>\n", argv[0]);
return EINVAL;
}
kj::Vector<capnp::RemotePromise<LaminarCi::TriggerResults>> promises;
kj::Vector<capnp::RemotePromise<LaminarCi::QueueResults>> promises;
int jobNameIndex = 2;
// make a request for each job specified on the commandline
do {
auto req = laminar.triggerRequest();
auto req = laminar.queueRequest();
req.setJobName(argv[jobNameIndex]);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
promises.add(req.send());
@@ -106,14 +106,39 @@ int main(int argc, char** argv) {
return ENOENT;
}
}
} else if(strcmp(argv[1], "run") == 0 || strcmp(argv[1], "start") == 0) {
} else if(strcmp(argv[1], "start") == 0 || strcmp(argv[1], "trigger") == 0) {
if(strcmp(argv[1], "trigger") == 0)
fprintf(stderr, "Warning: 'trigger' is deprecated, use 'queue' for the old behavior\n");
if(argc < 3) {
fprintf(stderr, "Usage %s queue <jobName>\n", argv[0]);
return EINVAL;
}
kj::Vector<capnp::RemotePromise<LaminarCi::StartResults>> promises;
struct: public kj::TaskSet::ErrorHandler {
void taskFailed(kj::Exception&&) override {}
} ignoreFailed;
kj::TaskSet ts(ignoreFailed);
int jobNameIndex = 2;
// make a request for each job specified on the commandline
do {
auto req = laminar.startRequest();
req.setJobName(argv[jobNameIndex]);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
ts.add(req.send().then([&ret,argv,jobNameIndex](capnp::Response<LaminarCi::StartResults> resp){
if(resp.getResult() != LaminarCi::MethodResult::SUCCESS) {
fprintf(stderr, "Failed to start job '%s'\n", argv[2]);
ret = ENOENT;
}
printf("%s:%d\n", argv[jobNameIndex], resp.getBuildNum());
}));
jobNameIndex += n + 1;
} while(jobNameIndex < argc);
ts.onEmpty().wait(waitScope);
} else if(strcmp(argv[1], "run") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s run <jobName>\n", argv[0]);
return EINVAL;
}
if(strcmp(argv[1], "start") == 0) {
fprintf(stderr, "Warning: \"start\" is deprecated, please use \"run\" instead\n");
}
struct: public kj::TaskSet::ErrorHandler {
void taskFailed(kj::Exception&&) override {}
} ignoreFailed;

View File

@@ -2,11 +2,12 @@
interface LaminarCi {
trigger @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult);
run @1 (jobName :Text, params :List(JobParam)) -> (result :JobResult, buildNum :UInt32);
set @2 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult);
lock @3 (lockName :Text) -> ();
release @4 (lockName :Text) -> ();
queue @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult);
start @1 (jobName :Text, params :List(JobParam)) -> (result :MethodResult, buildNum :UInt32);
run @2 (jobName :Text, params :List(JobParam)) -> (result :JobResult, buildNum :UInt32);
set @3 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult);
lock @4 (lockName :Text) -> ();
release @5 (lockName :Text) -> ();
struct JobParam {
name @0 :Text;

View File

@@ -712,6 +712,9 @@ void Laminar::assignNewJobs() {
c->sendMessage(msg);
}
// notify the rpc client if the start command was used
run->started.fulfiller->fulfill();
// setup run completion handler
run->notifyCompletion = [this](Run* r) { runFinished(r); };

View File

@@ -92,6 +92,7 @@ public:
int fd;
std::unordered_map<std::string, std::string> params;
kj::Promise<void> timeout = kj::NEVER_DONE;
kj::PromiseFulfillerPair<void> started = kj::newPromiseAndFulfiller<void>();
time_t queuedAt;
time_t startedAt;

View File

@@ -90,10 +90,10 @@ public:
laminar.deregisterWaiter(this);
}
// Start a job, without waiting for it to finish
kj::Promise<void> trigger(TriggerContext context) override {
// Queue a job, without waiting for it to start
kj::Promise<void> queue(QueueContext context) override {
std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC trigger", jobName);
LLOG(INFO, "RPC queue", jobName);
ParamMap params;
for(auto p : context.getParams().getParams()) {
params[p.getName().cStr()] = p.getValue().cStr();
@@ -105,6 +105,26 @@ public:
return kj::READY_NOW;
}
// Start a job, without waiting for it to finish
kj::Promise<void> start(StartContext context) override {
std::string jobName = context.getParams().getJobName();
LLOG(INFO, "RPC start", jobName);
ParamMap params;
for(auto p : context.getParams().getParams()) {
params[p.getName().cStr()] = p.getValue().cStr();
}
std::shared_ptr<Run> run = laminar.queueJob(jobName, params);
if(Run* r = run.get()) {
return r->started.promise.then([context,r]() mutable {
context.getResults().setResult(LaminarCi::MethodResult::SUCCESS);
context.getResults().setBuildNum(r->build);
});
} else {
context.getResults().setResult(LaminarCi::MethodResult::FAILED);
return kj::READY_NOW;
}
}
// Start a job and wait for the result
kj::Promise<void> run(RunContext context) override {
std::string jobName = context.getParams().getJobName();
@@ -115,11 +135,10 @@ public:
}
std::shared_ptr<Run> run = laminar.queueJob(jobName, params);
if(const Run* r = run.get()) {
uint num = r->build;
runWaiters[r].emplace_back(kj::newPromiseAndFulfiller<RunState>());
return runWaiters[r].back().promise.then([context,num](RunState state) mutable {
return runWaiters[r].back().promise.then([context,run](RunState state) mutable {
context.getResults().setResult(fromRunState(state));
context.getResults().setBuildNum(num);
context.getResults().setBuildNum(run->build);
});
} else {
context.getResults().setResult(LaminarCi::JobResult::UNKNOWN);