diff --git a/src/client.cpp b/src/client.cpp index af3af2b..7e4ccdc 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -158,6 +158,14 @@ int main(int argc, char** argv) { auto response = req.send().wait(waitScope); if(response.getResult() != LaminarCi::JobResult::SUCCESS) return EFAILED; + } else if(strcmp(argv[1], "lock") == 0) { + auto req = laminar.lockRequest(); + req.setLockName(argv[2]); + req.send().wait(waitScope); + } else if(strcmp(argv[1], "release") == 0) { + auto req = laminar.releaseRequest(); + req.setLockName(argv[2]); + req.send().wait(waitScope); } else { fprintf(stderr, "Unknown comand %s\n", argv[1]); return EINVAL; diff --git a/src/laminar.capnp b/src/laminar.capnp index 25ffdc7..f947bf4 100644 --- a/src/laminar.capnp +++ b/src/laminar.capnp @@ -6,6 +6,8 @@ interface LaminarCi { start @1 (jobName :Text, params :List(JobParam)) -> (result :JobResult); pend @2 (jobName :Text, buildNum :UInt32) -> (result :JobResult); set @3 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult); + lock @4 (lockName :Text) -> (); + release @5 (lockName :Text) -> (); struct JobParam { name @0 :Text; diff --git a/src/server.cpp b/src/server.cpp index a4547af..8b8785c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -75,6 +75,18 @@ LaminarCi::JobResult fromRunState(RunState state) { // As such, it implements the pure virtual interface generated from // laminar.capnp with calls to the LaminarInterface class RpcImpl : public LaminarCi::Server { +private: + struct Lock { + Lock() : paf(kj::newPromiseAndFulfiller()) {} + void release() { + paf.fulfiller->fulfill(); + } + kj::Promise takePromise() { return std::move(paf.promise); } + private: + kj::PromiseFulfillerPair paf; + }; + + public: RpcImpl(LaminarInterface& l) : LaminarCi::Server(), @@ -143,9 +155,36 @@ public: return kj::READY_NOW; } + // Take a named lock + kj::Promise lock(LockContext context) override { + std::string lockName = context.getParams().getLockName(); + LLOG(INFO, "RPC lock", lockName); + auto& lockList = locks[lockName]; + lockList.emplace_back(Lock{}); + if(lockList.size() == 1) + lockList.front().release(); + return lockList.back().takePromise(); + } + + // Release a named lock + kj::Promise release(ReleaseContext context) override { + std::string lockName = context.getParams().getLockName(); + LLOG(INFO, "RPC release", lockName); + auto& lockList = locks[lockName]; + if(lockList.size() == 0) { + LLOG(INFO, "Attempt to release unheld lock", lockName); + return kj::READY_NOW; + } + lockList.erase(lockList.begin()); + if(lockList.size() > 0) + lockList.front().release(); + return kj::READY_NOW; + } + private: LaminarInterface& laminar; kj::LowLevelAsyncIoProvider* asyncio; + std::unordered_map> locks; };