/// /// Copyright 2015-2022 Oliver Giles /// /// This file is part of Laminar /// /// Laminar is free software: you can redistribute it and/or modify /// it under the terms of the GNU General Public License as published by /// the Free Software Foundation, either version 3 of the License, or /// (at your option) any later version. /// /// Laminar is distributed in the hope that it will be useful, /// but WITHOUT ANY WARRANTY; without even the implied warranty of /// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the /// GNU General Public License for more details. /// /// You should have received a copy of the GNU General Public License /// along with Laminar. If not, see /// #include "rpc.h" #include "laminar.capnp.h" #include "laminar.h" #include "log.h" namespace { // Used for returning run state to RPC clients LaminarCi::JobResult fromRunState(RunState state) { switch(state) { case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS; case RunState::FAILED: return LaminarCi::JobResult::FAILED; case RunState::ABORTED: return LaminarCi::JobResult::ABORTED; default: return LaminarCi::JobResult::UNKNOWN; } } } // This is the implementation of the Laminar Cap'n Proto RPC interface. // As such, it implements the pure virtual interface generated from // laminar.capnp with calls to the primary Laminar class class RpcImpl : public LaminarCi::Server { public: RpcImpl(Laminar& l) : LaminarCi::Server(), laminar(l) { } virtual ~RpcImpl() { } // Queue a job, without waiting for it to start kj::Promise queue(QueueContext context) override { std::string jobName = context.getParams().getJobName(); LLOG(INFO, "RPC queue", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()), context.getParams().getFrontOfQueue()); if(Run* r = run.get()) { context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); context.getResults().setBuildNum(r->build); } else { context.getResults().setResult(LaminarCi::MethodResult::FAILED); } return kj::READY_NOW; } // Start a job, without waiting for it to finish kj::Promise start(StartContext context) override { std::string jobName = context.getParams().getJobName(); LLOG(INFO, "RPC start", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()), context.getParams().getFrontOfQueue()); if(Run* r = run.get()) { return r->whenStarted().then([context,r]() mutable { context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); context.getResults().setBuildNum(r->build); }); } else { context.getResults().setResult(LaminarCi::MethodResult::FAILED); return kj::READY_NOW; } } // Start a job and wait for the result kj::Promise run(RunContext context) override { std::string jobName = context.getParams().getJobName(); LLOG(INFO, "RPC run", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()), context.getParams().getFrontOfQueue()); if(run) { return run->whenFinished().then([context,run](RunState state) mutable { context.getResults().setResult(fromRunState(state)); context.getResults().setBuildNum(run->build); }); } else { context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); return kj::READY_NOW; } } // List jobs in queue kj::Promise listQueued(ListQueuedContext context) override { const std::list>& queue = laminar.listQueuedJobs(); auto res = context.getResults().initResult(queue.size()); int i = 0; for(auto it : queue) { res[i].setJob(it->name); res[i].setBuildNum(it->build); i++; } return kj::READY_NOW; } // List running jobs kj::Promise listRunning(ListRunningContext context) override { const RunSet& active = laminar.listRunningJobs(); auto res = context.getResults().initResult(active.size()); int i = 0; for(auto it : active) { res[i].setJob(it->name); res[i].setBuildNum(it->build); i++; } return kj::READY_NOW; } // List known jobs kj::Promise listKnown(ListKnownContext context) override { std::list known = laminar.listKnownJobs(); auto res = context.getResults().initResult(known.size()); int i = 0; for(auto it : known) { res.set(i++, it); } return kj::READY_NOW; } kj::Promise abort(AbortContext context) override { std::string jobName = context.getParams().getRun().getJob(); uint buildNum = context.getParams().getRun().getBuildNum(); LLOG(INFO, "RPC abort", jobName, buildNum); LaminarCi::MethodResult result = laminar.abort(jobName, buildNum) ? LaminarCi::MethodResult::SUCCESS : LaminarCi::MethodResult::FAILED; context.getResults().setResult(result); return kj::READY_NOW; } private: // Helper to convert an RPC parameter list to a hash map ParamMap params(const capnp::List::Reader& paramReader) { ParamMap res; for(auto p : paramReader) { res[p.getName().cStr()] = p.getValue().cStr(); } return res; } Laminar& laminar; std::unordered_map>> runWaiters; }; Rpc::Rpc(Laminar& li) : rpcInterface(kj::heap(li)) {} // Context for an RPC connection struct RpcConnection { RpcConnection(kj::Own&& stream, capnp::Capability::Client bootstrap, capnp::ReaderOptions readerOpts) : stream(kj::mv(stream)), network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts), rpcSystem(capnp::makeRpcServer(network, bootstrap)) { } kj::Own stream; capnp::TwoPartyVatNetwork network; capnp::RpcSystem rpcSystem; }; kj::Promise Rpc::accept(kj::Own&& connection) { auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions()); return server->network.onDisconnect().attach(kj::mv(server)); }