From eda906b805add63c681de8a4f11cedd391257a30 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Thu, 10 Aug 2017 08:25:20 +0300 Subject: [PATCH] refactor: remove transport knowledge from Laminar class Improve the boundary between RpcImpl and LaminarInterface such that the Laminar class doesn't require any types from kj/async.h. The necessary logic moved from Laminar to RpcImpl and the notification now happens by abstract virtual callback instead of kj::Promise. Also remove the fairly useless 'wait' RPC call and drop the wrappers around kj::PromiseFulfillerPair --- src/client.cpp | 7 ------ src/interface.h | 26 ++++++++++++---------- src/laminar.capnp | 7 +++--- src/laminar.cpp | 27 ++++++++++------------- src/laminar.h | 19 ++++------------ src/server.cpp | 56 +++++++++++++++++++---------------------------- 6 files changed, 56 insertions(+), 86 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 7e4ccdc..b89b8ee 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -151,13 +151,6 @@ int main(int argc, char** argv) { fprintf(stderr, "Missing lJobName and lBuildNum or param is not in the format key=value\n"); return EINVAL; } - } else if(strcmp(argv[1], "wait") == 0) { - auto req = laminar.pendRequest(); - req.setJobName(argv[2]); - req.setBuildNum(atoi(argv[3])); - auto response = req.send().wait(waitScope); - if(response.getResult() != LaminarCi::JobResult::SUCCESS) - return EFAILED; } else if(strcmp(argv[1], "lock") == 0) { auto req = laminar.lockRequest(); req.setLockName(argv[2]); diff --git a/src/interface.h b/src/interface.h index 317993c..c4b0410 100644 --- a/src/interface.h +++ b/src/interface.h @@ -21,8 +21,6 @@ #include "run.h" -#include - #include #include #include @@ -73,6 +71,13 @@ struct LaminarClient { MonitorScope scope; }; +// Represents a (rpc) client that wants to be notified about run completion. +// Pass instances of this to LaminarInterface registerWaiter and +// deregisterWaiter +struct LaminarWaiter { + virtual void complete(const Run*) = 0; +}; + // The interface connecting the network layer to the application business // logic. These methods fulfil the requirements of both the HTTP/Websocket // and RPC interfaces. @@ -81,15 +86,6 @@ struct LaminarInterface { // the supplied name is not a known job. virtual std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) = 0; - // Returns a promise that will wait for a run matching the given name - // and build number to complete. The promise will resolve to the result - // of the run. If no such run exists, the status will be RunState::UNKNOWN - virtual kj::Promise waitForRun(std::string name, int buildNum) = 0; - - // Specialization of above for an existing Run object (for example returned - // from queueJob). Returned promise will never resolve to RunState::UNKNOWN - virtual kj::Promise waitForRun(const Run*) = 0; - // Register a client (but don't give up ownership). The client will be // notified with a JSON message of any events matching its scope // (see LaminarClient and MonitorScope above) @@ -99,6 +95,14 @@ struct LaminarInterface { // to call LaminarClient::sendMessage on invalid data virtual void deregisterClient(LaminarClient* client) = 0; + // Register a waiter (but don't give up ownership). The waiter will be + // notified with a callback of any run completion (see LaminarWaiter above) + virtual void registerWaiter(LaminarWaiter* waiter) = 0; + + // Call this before destroying a waiter so that Laminar doesn't try + // to call LaminarWaiter::complete on invalid data + virtual void deregisterWaiter(LaminarWaiter* waiter) = 0; + // Synchronously send a snapshot of the current status to the given // client (as governed by the client's MonitorScope). This is called on // initial websocket connect. diff --git a/src/laminar.capnp b/src/laminar.capnp index f947bf4..339100c 100644 --- a/src/laminar.capnp +++ b/src/laminar.capnp @@ -4,10 +4,9 @@ interface LaminarCi { trigger @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult); start @1 (jobName :Text, params :List(JobParam)) -> (result :JobResult); - pend @2 (jobName :Text, buildNum :UInt32) -> (result :JobResult); - set @3 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult); - lock @4 (lockName :Text) -> (); - release @5 (lockName :Text) -> (); + set @2 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult); + lock @3 (lockName :Text) -> (); + release @4 (lockName :Text) -> (); struct JobParam { name @0 :Text; diff --git a/src/laminar.cpp b/src/laminar.cpp index 31b9b6a..a5d36dc 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -108,6 +108,14 @@ void Laminar::deregisterClient(LaminarClient* client) { clients.erase(client); } +void Laminar::registerWaiter(LaminarWaiter *waiter) { + waiters.insert(waiter); +} + +void Laminar::deregisterWaiter(LaminarWaiter *waiter) { + waiters.erase(waiter); +} + bool Laminar::setParam(std::string job, int buildNum, std::string param, std::string value) { if(Run* run = activeRun(job, buildNum)) { run->params[param] = value; @@ -471,17 +479,6 @@ std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { return run; } -kj::Promise Laminar::waitForRun(std::string name, int buildNum) { - if(const Run* run = activeRun(name, buildNum)) - return waitForRun(run); - return RunState::UNKNOWN; -} - -kj::Promise Laminar::waitForRun(const Run* run) { - waiters[run].emplace_back(Waiter{}); - return waiters[run].back().takePromise(); -} - bool Laminar::stepRun(std::shared_ptr run) { bool complete = run->step(); if(!complete) { @@ -723,10 +720,10 @@ void Laminar::runFinished(Run * r) { c->sendMessage(msg); } - // wake the waiters - for(Waiter& waiter : waiters[r]) - waiter.release(r->result); - waiters.erase(r); + // notify the waiters + for(LaminarWaiter* w : waiters) { + w->complete(r); + } // remove the rundir fs::remove_all(r->runDir); diff --git a/src/laminar.h b/src/laminar.h index dca21f0..0c39ae9 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -48,10 +48,11 @@ public: // Implementations of LaminarInterface std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) override; - kj::Promise waitForRun(std::string name, int buildNum) override; - kj::Promise waitForRun(const Run* run) override; void registerClient(LaminarClient* client) override; void deregisterClient(LaminarClient* client) override; + void registerWaiter(LaminarWaiter* waiter) override; + void deregisterWaiter(LaminarWaiter* waiter) override; + void sendStatus(LaminarClient* client) override; bool setParam(std::string job, int buildNum, std::string param, std::string value) override; bool getArtefact(std::string path, std::string& result) override; @@ -73,19 +74,6 @@ private: std::list> queuedJobs; - // Implements the waitForRun API. - // TODO: refactor - struct Waiter { - Waiter() : paf(kj::newPromiseAndFulfiller()) {} - void release(RunState state) { - paf.fulfiller->fulfill(RunState(state)); - } - kj::Promise takePromise() { return std::move(paf.promise); } - private: - kj::PromiseFulfillerPair paf; - }; - std::unordered_map> waiters; - std::unordered_map buildNums; std::unordered_map> jobTags; @@ -96,6 +84,7 @@ private: NodeMap nodes; std::string homeDir; std::set clients; + std::set waiters; bool eraseWorkdir; std::string archiveUrl; }; diff --git a/src/server.cpp b/src/server.cpp index 8b8785c..ef4d89a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -74,24 +74,17 @@ LaminarCi::JobResult fromRunState(RunState state) { // This is the implementation of the Laminar Cap'n Proto RPC interface. // As such, it implements the pure virtual interface generated from // laminar.capnp with calls to the LaminarInterface -class RpcImpl : public LaminarCi::Server { -private: - struct Lock { - Lock() : paf(kj::newPromiseAndFulfiller()) {} - void release() { - paf.fulfiller->fulfill(); - } - kj::Promise takePromise() { return std::move(paf.promise); } - private: - kj::PromiseFulfillerPair paf; - }; - - +class RpcImpl : public LaminarCi::Server, public LaminarWaiter { public: RpcImpl(LaminarInterface& l) : LaminarCi::Server(), laminar(l) { + laminar.registerWaiter(this); + } + + ~RpcImpl() { + laminar.deregisterWaiter(this); } // Start a job, without waiting for it to finish @@ -118,8 +111,9 @@ public: params[p.getName().cStr()] = p.getValue().cStr(); } std::shared_ptr run = laminar.queueJob(jobName, params); - if(run.get()) { - return laminar.waitForRun(run.get()).then([context](RunState state) mutable { + if(const Run* r = run.get()) { + runWaiters[r].emplace_back(kj::newPromiseAndFulfiller()); + return runWaiters[r].back().promise.then([context](RunState state) mutable { context.getResults().setResult(fromRunState(state)); }); } else { @@ -128,19 +122,6 @@ public: } } - // Wait for an already-running job to complete, returning the result - kj::Promise pend(PendContext context) override { - std::string jobName = context.getParams().getJobName(); - int buildNum = context.getParams().getBuildNum(); - LLOG(INFO, "RPC pend", jobName, buildNum); - - kj::Promise promise = laminar.waitForRun(jobName, buildNum); - - return promise.then([context](RunState state) mutable { - context.getResults().setResult(fromRunState(state)); - }); - } - // Set a parameter on a running build kj::Promise set(SetContext context) override { std::string jobName = context.getParams().getJobName(); @@ -160,10 +141,10 @@ public: std::string lockName = context.getParams().getLockName(); LLOG(INFO, "RPC lock", lockName); auto& lockList = locks[lockName]; - lockList.emplace_back(Lock{}); + lockList.emplace_back(kj::newPromiseAndFulfiller()); if(lockList.size() == 1) - lockList.front().release(); - return lockList.back().takePromise(); + lockList.front().fulfiller->fulfill(); + return std::move(lockList.back().promise); } // Release a named lock @@ -177,14 +158,21 @@ public: } lockList.erase(lockList.begin()); if(lockList.size() > 0) - lockList.front().release(); + lockList.front().fulfiller->fulfill(); return kj::READY_NOW; } - +private: + // Implements LaminarWaiter::complete + void complete(const Run* r) override { + for(kj::PromiseFulfillerPair& w : runWaiters[r]) + w.fulfiller->fulfill(RunState(r->result)); + runWaiters.erase(r); + } private: LaminarInterface& laminar; kj::LowLevelAsyncIoProvider* asyncio; - std::unordered_map> locks; + std::unordered_map>> locks; + std::unordered_map>> runWaiters; };