From 4a07e24da3da987f9e42df5e760e3c4bde069592 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Fri, 27 Sep 2019 20:50:46 +0300 Subject: [PATCH] split server into http and rpc parts this is initial preparation for a larger refactor --- CMakeLists.txt | 16 +- src/http.cpp | 306 ++++++++++++++++++++++++++++ src/http.h | 36 ++++ src/rpc.cpp | 200 +++++++++++++++++++ src/rpc.h | 36 ++++ src/server.cpp | 463 +------------------------------------------ src/server.h | 11 +- test/test-server.cpp | 3 +- 8 files changed, 606 insertions(+), 465 deletions(-) create mode 100644 src/http.cpp create mode 100644 src/http.h create mode 100644 src/rpc.cpp create mode 100644 src/rpc.h diff --git a/CMakeLists.txt b/CMakeLists.txt index cf73a1b..b9aaebd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,9 +85,19 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js js/ansi_up.js js/Chart.min.js css/bootstrap.min.css) # (see resources.cpp where these are fetched) +set(LAMINARD_SOURCES + src/database.cpp + src/server.cpp + src/laminar.cpp + src/conf.cpp + src/http.cpp + src/resources.cpp + src/rpc.cpp + src/run.cpp +) + ## Server -add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp - src/conf.cpp src/resources.cpp src/run.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h) +add_executable(laminard ${LAMINARD_SOURCES} src/main.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h) target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) ## Client @@ -99,7 +109,7 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests") if(BUILD_TESTS) find_package(GTest REQUIRED) include_directories(${GTEST_INCLUDE_DIRS} src) - add_executable(laminar-tests src/conf.cpp src/database.cpp src/laminar.cpp src/run.cpp src/server.cpp laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) + add_executable(laminar-tests ${LAMINARD_SOURCES} laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) endif() diff --git a/src/http.cpp b/src/http.cpp new file mode 100644 index 0000000..aa6aba8 --- /dev/null +++ b/src/http.cpp @@ -0,0 +1,306 @@ +/// +/// Copyright 2015-2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "interface.h" +#include "http.h" +#include "resources.h" +#include "log.h" + +#include +#include + +// This is the implementation of the HTTP/Websocket interface. It creates +// websocket connections as LaminarClients and registers them with the +// LaminarInterface so that status messages will be delivered to the client. +// On opening a websocket connection, it delivers a status snapshot message +// (see LaminarInterface::sendStatus) +class HttpImpl : public kj::HttpService { +public: + HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : + laminar(laminar), + responseHeaders(tbl) + {} + virtual ~HttpImpl() {} + +private: + class HttpChunkedClient : public LaminarClient { + public: + HttpChunkedClient(LaminarInterface& laminar) : + laminar(laminar) + {} + ~HttpChunkedClient() override { + laminar.deregisterClient(this); + } + void sendMessage(std::string payload) override { + chunks.push_back(kj::mv(payload)); + fulfiller->fulfill(); + } + void notifyJobFinished() override { + done = true; + fulfiller->fulfill(); + } + LaminarInterface& laminar; + std::list chunks; + // cannot use chunks.empty() because multiple fulfill()s + // could be coalesced + bool done = false; + kj::Own> fulfiller; + }; + + // Implements LaminarClient and holds the Websocket connection object. + // Automatically destructed when the promise created in request() resolves + // or is cancelled + class WebsocketClient : public LaminarClient { + public: + WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) : + laminar(laminar), + ws(kj::mv(ws)) + {} + ~WebsocketClient() override { + laminar.deregisterClient(this); + } + virtual void sendMessage(std::string payload) override { + messages.emplace_back(kj::mv(payload)); + // sendMessage might be called several times before the event loop + // gets a chance to act on the fulfiller. So store the payload here + // where it can be fetched later and don't pass the payload with the + // fulfiller because subsequent calls to fulfill() are ignored. + fulfiller->fulfill(); + } + LaminarInterface& laminar; + kj::Own ws; + std::list messages; + kj::Own> fulfiller; + }; + + kj::Promise websocketRead(WebsocketClient& lc) + { + return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(str, kj::String) { + rapidjson::Document d; + d.ParseInsitu(const_cast(str.cStr())); + if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) { + lc.scope.page = d["page"].GetInt(); + lc.scope.field = d["field"].GetString(); + lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0; + laminar.sendStatus(&lc); + return websocketRead(lc); + } + } + KJ_CASE_ONEOF(close, kj::WebSocket::Close) { + // clean socket shutdown + return lc.ws->close(close.code, close.reason); + } + KJ_CASE_ONEOF_DEFAULT {} + } + // unhandled/unknown message + return lc.ws->disconnect(); + }, [](kj::Exception&& e){ + // server logs suggest early catching here avoids fatal exception later + // TODO: reproduce in unit test + LLOG(WARNING, e.getDescription()); + return kj::READY_NOW; + }); + } + + kj::Promise websocketWrite(WebsocketClient& lc) + { + auto paf = kj::newPromiseAndFulfiller(); + lc.fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([this,&lc]{ + kj::Promise p = kj::READY_NOW; + std::list messages = kj::mv(lc.messages); + for(std::string& s : messages) { + p = p.then([&s,&lc]{ + kj::String str = kj::str(s); + return lc.ws->send(str).attach(kj::mv(str)); + }); + } + return p.attach(kj::mv(messages)).then([this,&lc]{ + return websocketWrite(lc); + }); + }); + } + + kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) { + // convert the requested URL to a MonitorScope + if(resource.substr(0, 5) == "/jobs") { + if(resource.length() == 5) { + lc.scope.type = MonitorScope::ALL; + } else { + resource = resource.substr(5); + size_t split = resource.find('/',1); + std::string job = resource.substr(1,split-1); + if(!job.empty()) { + lc.scope.job = job; + lc.scope.type = MonitorScope::JOB; + } + if(split != std::string::npos) { + size_t split2 = resource.find('/', split+1); + std::string run = resource.substr(split+1, split2-split); + if(!run.empty()) { + lc.scope.num = static_cast(atoi(run.c_str())); + lc.scope.type = MonitorScope::RUN; + } + if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { + lc.scope.type = MonitorScope::LOG; + } + } + } + } + laminar.registerClient(&lc); + kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); + // registerClient can happen after a successful websocket handshake. + // However, the connection might not be closed gracefully, so the + // corresponding deregister operation happens in the WebsocketClient + // destructor rather than the close handler or a then() clause + laminar.sendStatus(&lc); + return connection; + } + + // Parses the url of the form /log/NAME/NUMBER, filling in the passed + // references and returning true if successful. /log/NAME/latest is + // also allowed, in which case the num reference is set to 0 + bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { + if(url.startsWith("/log/")) { + kj::StringPtr path = url.slice(5); + KJ_IF_MAYBE(sep, path.findFirst('/')) { + name = path.slice(0, *sep).begin(); + kj::StringPtr tail = path.slice(*sep+1); + num = static_cast(atoi(tail.begin())); + name.erase(*sep); + if(tail == "latest") + num = laminar.latestRun(name); + if(num > 0) + return true; + } + } + return false; + } + + kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) { + auto paf = kj::newPromiseAndFulfiller(); + client->fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([=]{ + kj::Promise p = kj::READY_NOW; + std::list chunks = kj::mv(client->chunks); + for(std::string& s : chunks) { + p = p.then([=,&s]{ + return stream->write(s.data(), s.size()); + }); + } + return p.attach(kj::mv(chunks)).then([=]{ + return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); + }); + }); + } + + virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, Response& response) override + { + if(headers.isWebSocket()) { + responseHeaders.clear(); + kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders)); + return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc)); + } else { + // handle regular HTTP request + const char* start, *end, *content_type; + std::string badge; + // for log requests + std::string name; + uint num; + responseHeaders.clear(); + // Clients usually expect that http servers will ignore unknown query parameters, + // and expect to use this feature to work around browser limitations like there + // being no way to programatically force a resource to be reloaded from the server + // (without "Cache-Control: no-store", which is overkill). See issue #89. + // Since we currently don't handle any query parameters at all, the easiest way + // to achieve this is unconditionally remove all query parameters from the request. + // This will need to be redone if we ever accept query parameters, which probably + // will happen as part of issue #90. + KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { + const_cast(url.begin())[*queryIdx] = '\0'; + url = url.begin(); + } + if(url.startsWith("/archive/")) { + KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { + auto array = (*file)->mmap(0, (*file)->stat().size); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, array.size()); + return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); + } + } else if(parseLogEndpoint(url, name, num)) { + kj::Own cc = kj::heap(laminar); + cc->scope.job = name; + cc->scope.num = num; + bool complete; + std::string output; + cc->scope.type = MonitorScope::LOG; + if(laminar.handleLogRequest(name, num, output, complete)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. + responseHeaders.add("X-Accel-Buffering", "no"); + auto stream = response.send(200, "OK", responseHeaders, nullptr); + laminar.registerClient(cc.get()); + return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{ + if(complete) + return kj::Promise(kj::READY_NOW); + return writeLogChunk(c, s); + }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc)); + } + } else if(url == "/custom/style.css") { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + std::string css = laminar.getCustomCss(); + auto stream = response.send(200, "OK", responseHeaders, css.size()); + return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); + } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); + responseHeaders.add("Content-Encoding", "gzip"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, end-start); + return stream->write(start, end-start).attach(kj::mv(stream)); + } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); + responseHeaders.add("Cache-Control", "no-cache"); + auto stream = response.send(200, "OK", responseHeaders, badge.size()); + return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); + } + return response.sendError(404, "Not Found", responseHeaders); + } + } + + LaminarInterface& laminar; + Resources resources; + kj::HttpHeaders responseHeaders; +}; + +Http::Http(LaminarInterface &li) : + headerTable(kj::heap()), + httpService(kj::heap(li, *headerTable)), + laminar(li) +{ + +} + +kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) { + auto httpServer = kj::heap(timer, *headerTable, *httpService); + return httpServer->listenHttp(*listener).attach(kj::mv(listener)).attach(kj::mv(httpServer)); +} diff --git a/src/http.h b/src/http.h new file mode 100644 index 0000000..5098b39 --- /dev/null +++ b/src/http.h @@ -0,0 +1,36 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_HTTP_H_ +#define LAMINAR_HTTP_H_ + +#include + +struct LaminarInterface; + +class Http { +public: + Http(LaminarInterface &li); + kj::Promise startServer(kj::Timer &timer, kj::Own&& listener); + + kj::Own headerTable; + kj::Own httpService; + LaminarInterface& laminar; +}; + +#endif //LAMINAR_HTTP_H_ diff --git a/src/rpc.cpp b/src/rpc.cpp new file mode 100644 index 0000000..c7a9a3e --- /dev/null +++ b/src/rpc.cpp @@ -0,0 +1,200 @@ +/// +/// Copyright 2015-2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "rpc.h" +#include "laminar.capnp.h" + +#include "interface.h" +#include "log.h" + +namespace { + +// Used for returning run state to RPC clients +LaminarCi::JobResult fromRunState(RunState state) { + switch(state) { + case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS; + case RunState::FAILED: return LaminarCi::JobResult::FAILED; + case RunState::ABORTED: return LaminarCi::JobResult::ABORTED; + default: + return LaminarCi::JobResult::UNKNOWN; + } +} + +} +// This is the implementation of the Laminar Cap'n Proto RPC interface. +// As such, it implements the pure virtual interface generated from +// laminar.capnp with calls to the LaminarInterface +class RpcImpl : public LaminarCi::Server, public LaminarWaiter { +public: + RpcImpl(LaminarInterface& l) : + LaminarCi::Server(), + laminar(l) + { + } + + ~RpcImpl() override { + } + + // Queue a job, without waiting for it to start + kj::Promise queue(QueueContext context) override { + std::string jobName = context.getParams().getJobName(); + LLOG(INFO, "RPC queue", jobName); + LaminarCi::MethodResult result = laminar.queueJob(jobName, params(context.getParams().getParams())) + ? LaminarCi::MethodResult::SUCCESS + : LaminarCi::MethodResult::FAILED; + context.getResults().setResult(result); + return kj::READY_NOW; + } + + // Start a job, without waiting for it to finish + kj::Promise start(StartContext context) override { + std::string jobName = context.getParams().getJobName(); + LLOG(INFO, "RPC start", jobName); + std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); + if(Run* r = run.get()) { + return r->whenStarted().then([context,r]() mutable { + context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); + context.getResults().setBuildNum(r->build); + }); + } else { + context.getResults().setResult(LaminarCi::MethodResult::FAILED); + return kj::READY_NOW; + } + } + + // Start a job and wait for the result + kj::Promise run(RunContext context) override { + std::string jobName = context.getParams().getJobName(); + LLOG(INFO, "RPC run", jobName); + std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); + if(Run* r = run.get()) { + runWaiters[r].emplace_back(kj::newPromiseAndFulfiller()); + return runWaiters[r].back().promise.then([context,run](RunState state) mutable { + context.getResults().setResult(fromRunState(state)); + context.getResults().setBuildNum(run->build); + }); + } else { + context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); + return kj::READY_NOW; + } + } + + // Set a parameter on a running build + kj::Promise set(SetContext context) override { + std::string jobName = context.getParams().getRun().getJob(); + uint buildNum = context.getParams().getRun().getBuildNum(); + LLOG(INFO, "RPC set", jobName, buildNum); + + LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum, + context.getParams().getParam().getName(), context.getParams().getParam().getValue()) + ? LaminarCi::MethodResult::SUCCESS + : LaminarCi::MethodResult::FAILED; + context.getResults().setResult(result); + return kj::READY_NOW; + } + + // List jobs in queue + kj::Promise listQueued(ListQueuedContext context) override { + const std::list>& queue = laminar.listQueuedJobs(); + auto res = context.getResults().initResult(queue.size()); + int i = 0; + for(auto it : queue) { + res.set(i++, it->name); + } + return kj::READY_NOW; + } + + // List running jobs + kj::Promise listRunning(ListRunningContext context) override { + const RunSet& active = laminar.listRunningJobs(); + auto res = context.getResults().initResult(active.size()); + int i = 0; + for(auto it : active) { + res[i].setJob(it->name); + res[i].setBuildNum(it->build); + i++; + } + return kj::READY_NOW; + } + + // List known jobs + kj::Promise listKnown(ListKnownContext context) override { + std::list known = laminar.listKnownJobs(); + auto res = context.getResults().initResult(known.size()); + int i = 0; + for(auto it : known) { + res.set(i++, it); + } + return kj::READY_NOW; + } + + kj::Promise abort(AbortContext context) override { + std::string jobName = context.getParams().getRun().getJob(); + uint buildNum = context.getParams().getRun().getBuildNum(); + LLOG(INFO, "RPC abort", jobName, buildNum); + LaminarCi::MethodResult result = laminar.abort(jobName, buildNum) + ? LaminarCi::MethodResult::SUCCESS + : LaminarCi::MethodResult::FAILED; + context.getResults().setResult(result); + return kj::READY_NOW; + } + +private: + // Helper to convert an RPC parameter list to a hash map + ParamMap params(const capnp::List::Reader& paramReader) { + ParamMap res; + for(auto p : paramReader) { + res[p.getName().cStr()] = p.getValue().cStr(); + } + return res; + } + + // Implements LaminarWaiter::complete + void complete(const Run* r) override { + for(kj::PromiseFulfillerPair& w : runWaiters[r]) + w.fulfiller->fulfill(RunState(r->result)); + runWaiters.erase(r); + } +private: + LaminarInterface& laminar; + std::unordered_map>> runWaiters; +}; + +Rpc::Rpc(LaminarInterface& li) : + rpcInterface(kj::heap(li)) +{} + +// Context for an RPC connection +struct RpcConnection { + RpcConnection(kj::Own&& stream, + capnp::Capability::Client bootstrap, + capnp::ReaderOptions readerOpts) : + stream(kj::mv(stream)), + network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts), + rpcSystem(capnp::makeRpcServer(network, bootstrap)) + { + } + kj::Own stream; + capnp::TwoPartyVatNetwork network; + capnp::RpcSystem rpcSystem; +}; + +kj::Promise Rpc::accept(kj::Own&& connection) { + auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions()); + return server->network.onDisconnect().attach(kj::mv(server)); +} diff --git a/src/rpc.h b/src/rpc.h new file mode 100644 index 0000000..823980e --- /dev/null +++ b/src/rpc.h @@ -0,0 +1,36 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_RPC_H_ +#define LAMINAR_RPC_H_ + +#include +#include +#include + +struct LaminarInterface; + +class Rpc { +public: + Rpc(LaminarInterface &li); + kj::Promise accept(kj::Own&& connection); + + capnp::Capability::Client rpcInterface; +}; + +#endif //LAMINAR_RPC_H_ diff --git a/src/server.cpp b/src/server.cpp index 484783c..f98ef4a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -18,13 +18,10 @@ /// #include "server.h" #include "interface.h" -#include "laminar.capnp.h" -#include "resources.h" #include "log.h" +#include "rpc.h" +#include "http.h" -#include -#include -#include #include #include #include @@ -34,463 +31,19 @@ #include #include -#include - // Size of buffer used to read from file descriptors. Should be // a multiple of sizeof(struct signalfd_siginfo) == 128 #define PROC_IO_BUFSIZE 4096 -namespace { - -// Used for returning run state to RPC clients -LaminarCi::JobResult fromRunState(RunState state) { - switch(state) { - case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS; - case RunState::FAILED: return LaminarCi::JobResult::FAILED; - case RunState::ABORTED: return LaminarCi::JobResult::ABORTED; - default: - return LaminarCi::JobResult::UNKNOWN; - } -} - -} - -// This is the implementation of the Laminar Cap'n Proto RPC interface. -// As such, it implements the pure virtual interface generated from -// laminar.capnp with calls to the LaminarInterface -class RpcImpl : public LaminarCi::Server, public LaminarWaiter { -public: - RpcImpl(LaminarInterface& l) : - LaminarCi::Server(), - laminar(l) - { - laminar.registerWaiter(this); - } - - ~RpcImpl() override { - laminar.deregisterWaiter(this); - } - - // Queue a job, without waiting for it to start - kj::Promise queue(QueueContext context) override { - std::string jobName = context.getParams().getJobName(); - LLOG(INFO, "RPC queue", jobName); - LaminarCi::MethodResult result = laminar.queueJob(jobName, params(context.getParams().getParams())) - ? LaminarCi::MethodResult::SUCCESS - : LaminarCi::MethodResult::FAILED; - context.getResults().setResult(result); - return kj::READY_NOW; - } - - // Start a job, without waiting for it to finish - kj::Promise start(StartContext context) override { - std::string jobName = context.getParams().getJobName(); - LLOG(INFO, "RPC start", jobName); - std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); - if(Run* r = run.get()) { - return r->whenStarted().then([context,r]() mutable { - context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); - context.getResults().setBuildNum(r->build); - }); - } else { - context.getResults().setResult(LaminarCi::MethodResult::FAILED); - return kj::READY_NOW; - } - } - - // Start a job and wait for the result - kj::Promise run(RunContext context) override { - std::string jobName = context.getParams().getJobName(); - LLOG(INFO, "RPC run", jobName); - std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); - if(const Run* r = run.get()) { - runWaiters[r].emplace_back(kj::newPromiseAndFulfiller()); - return runWaiters[r].back().promise.then([context,run](RunState state) mutable { - context.getResults().setResult(fromRunState(state)); - context.getResults().setBuildNum(run->build); - }); - } else { - context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); - return kj::READY_NOW; - } - } - - // Set a parameter on a running build - kj::Promise set(SetContext context) override { - std::string jobName = context.getParams().getRun().getJob(); - uint buildNum = context.getParams().getRun().getBuildNum(); - LLOG(INFO, "RPC set", jobName, buildNum); - - LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum, - context.getParams().getParam().getName(), context.getParams().getParam().getValue()) - ? LaminarCi::MethodResult::SUCCESS - : LaminarCi::MethodResult::FAILED; - context.getResults().setResult(result); - return kj::READY_NOW; - } - - // List jobs in queue - kj::Promise listQueued(ListQueuedContext context) override { - const std::list>& queue = laminar.listQueuedJobs(); - auto res = context.getResults().initResult(queue.size()); - int i = 0; - for(auto it : queue) { - res.set(i++, it->name); - } - return kj::READY_NOW; - } - - // List running jobs - kj::Promise listRunning(ListRunningContext context) override { - const RunSet& active = laminar.listRunningJobs(); - auto res = context.getResults().initResult(active.size()); - int i = 0; - for(auto it : active) { - res[i].setJob(it->name); - res[i].setBuildNum(it->build); - i++; - } - return kj::READY_NOW; - } - - // List known jobs - kj::Promise listKnown(ListKnownContext context) override { - std::list known = laminar.listKnownJobs(); - auto res = context.getResults().initResult(known.size()); - int i = 0; - for(auto it : known) { - res.set(i++, it); - } - return kj::READY_NOW; - } - - kj::Promise abort(AbortContext context) override { - std::string jobName = context.getParams().getRun().getJob(); - uint buildNum = context.getParams().getRun().getBuildNum(); - LLOG(INFO, "RPC abort", jobName, buildNum); - LaminarCi::MethodResult result = laminar.abort(jobName, buildNum) - ? LaminarCi::MethodResult::SUCCESS - : LaminarCi::MethodResult::FAILED; - context.getResults().setResult(result); - return kj::READY_NOW; - } - -private: - // Helper to convert an RPC parameter list to a hash map - ParamMap params(const capnp::List::Reader& paramReader) { - ParamMap res; - for(auto p : paramReader) { - res[p.getName().cStr()] = p.getValue().cStr(); - } - return res; - } - - // Implements LaminarWaiter::complete - void complete(const Run* r) override { - for(kj::PromiseFulfillerPair& w : runWaiters[r]) - w.fulfiller->fulfill(RunState(r->result)); - runWaiters.erase(r); - } -private: - LaminarInterface& laminar; - std::unordered_map>> runWaiters; -}; - -// This is the implementation of the HTTP/Websocket interface. It creates -// websocket connections as LaminarClients and registers them with the -// LaminarInterface so that status messages will be delivered to the client. -// On opening a websocket connection, it delivers a status snapshot message -// (see LaminarInterface::sendStatus) -class HttpImpl : public kj::HttpService { -public: - HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : - laminar(laminar), - responseHeaders(tbl) - {} - virtual ~HttpImpl() {} - -private: - class HttpChunkedClient : public LaminarClient { - public: - HttpChunkedClient(LaminarInterface& laminar) : - laminar(laminar) - {} - ~HttpChunkedClient() override { - laminar.deregisterClient(this); - } - void sendMessage(std::string payload) override { - chunks.push_back(kj::mv(payload)); - fulfiller->fulfill(); - } - void notifyJobFinished() override { - done = true; - fulfiller->fulfill(); - } - LaminarInterface& laminar; - std::list chunks; - // cannot use chunks.empty() because multiple fulfill()s - // could be coalesced - bool done = false; - kj::Own> fulfiller; - }; - - // Implements LaminarClient and holds the Websocket connection object. - // Automatically destructed when the promise created in request() resolves - // or is cancelled - class WebsocketClient : public LaminarClient { - public: - WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) : - laminar(laminar), - ws(kj::mv(ws)) - {} - ~WebsocketClient() override { - laminar.deregisterClient(this); - } - virtual void sendMessage(std::string payload) override { - messages.emplace_back(kj::mv(payload)); - // sendMessage might be called several times before the event loop - // gets a chance to act on the fulfiller. So store the payload here - // where it can be fetched later and don't pass the payload with the - // fulfiller because subsequent calls to fulfill() are ignored. - fulfiller->fulfill(); - } - LaminarInterface& laminar; - kj::Own ws; - std::list messages; - kj::Own> fulfiller; - }; - - kj::Promise websocketRead(WebsocketClient& lc) - { - return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { - KJ_SWITCH_ONEOF(message) { - KJ_CASE_ONEOF(str, kj::String) { - rapidjson::Document d; - d.ParseInsitu(const_cast(str.cStr())); - if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) { - lc.scope.page = d["page"].GetInt(); - lc.scope.field = d["field"].GetString(); - lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0; - laminar.sendStatus(&lc); - return websocketRead(lc); - } - } - KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - // clean socket shutdown - return lc.ws->close(close.code, close.reason); - } - KJ_CASE_ONEOF_DEFAULT {} - } - // unhandled/unknown message - return lc.ws->disconnect(); - }, [](kj::Exception&& e){ - // server logs suggest early catching here avoids fatal exception later - // TODO: reproduce in unit test - LLOG(WARNING, e.getDescription()); - return kj::READY_NOW; - }); - } - - kj::Promise websocketWrite(WebsocketClient& lc) - { - auto paf = kj::newPromiseAndFulfiller(); - lc.fulfiller = kj::mv(paf.fulfiller); - return paf.promise.then([this,&lc]{ - kj::Promise p = kj::READY_NOW; - std::list messages = kj::mv(lc.messages); - for(std::string& s : messages) { - p = p.then([&s,&lc]{ - kj::String str = kj::str(s); - return lc.ws->send(str).attach(kj::mv(str)); - }); - } - return p.attach(kj::mv(messages)).then([this,&lc]{ - return websocketWrite(lc); - }); - }); - } - - kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) { - // convert the requested URL to a MonitorScope - if(resource.substr(0, 5) == "/jobs") { - if(resource.length() == 5) { - lc.scope.type = MonitorScope::ALL; - } else { - resource = resource.substr(5); - size_t split = resource.find('/',1); - std::string job = resource.substr(1,split-1); - if(!job.empty()) { - lc.scope.job = job; - lc.scope.type = MonitorScope::JOB; - } - if(split != std::string::npos) { - size_t split2 = resource.find('/', split+1); - std::string run = resource.substr(split+1, split2-split); - if(!run.empty()) { - lc.scope.num = static_cast(atoi(run.c_str())); - lc.scope.type = MonitorScope::RUN; - } - if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { - lc.scope.type = MonitorScope::LOG; - } - } - } - } - laminar.registerClient(&lc); - kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); - // registerClient can happen after a successful websocket handshake. - // However, the connection might not be closed gracefully, so the - // corresponding deregister operation happens in the WebsocketClient - // destructor rather than the close handler or a then() clause - laminar.sendStatus(&lc); - return connection; - } - - // Parses the url of the form /log/NAME/NUMBER, filling in the passed - // references and returning true if successful. /log/NAME/latest is - // also allowed, in which case the num reference is set to 0 - bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { - if(url.startsWith("/log/")) { - kj::StringPtr path = url.slice(5); - KJ_IF_MAYBE(sep, path.findFirst('/')) { - name = path.slice(0, *sep).begin(); - kj::StringPtr tail = path.slice(*sep+1); - num = static_cast(atoi(tail.begin())); - name.erase(*sep); - if(tail == "latest") - num = laminar.latestRun(name); - if(num > 0) - return true; - } - } - return false; - } - - kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) { - auto paf = kj::newPromiseAndFulfiller(); - client->fulfiller = kj::mv(paf.fulfiller); - return paf.promise.then([=]{ - kj::Promise p = kj::READY_NOW; - std::list chunks = kj::mv(client->chunks); - for(std::string& s : chunks) { - p = p.then([=,&s]{ - return stream->write(s.data(), s.size()); - }); - } - return p.attach(kj::mv(chunks)).then([=]{ - return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); - }); - }); - } - - virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, - kj::AsyncInputStream& requestBody, Response& response) override - { - if(headers.isWebSocket()) { - responseHeaders.clear(); - kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders)); - return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc)); - } else { - // handle regular HTTP request - const char* start, *end, *content_type; - std::string badge; - // for log requests - std::string name; - uint num; - responseHeaders.clear(); - // Clients usually expect that http servers will ignore unknown query parameters, - // and expect to use this feature to work around browser limitations like there - // being no way to programatically force a resource to be reloaded from the server - // (without "Cache-Control: no-store", which is overkill). See issue #89. - // Since we currently don't handle any query parameters at all, the easiest way - // to achieve this is unconditionally remove all query parameters from the request. - // This will need to be redone if we ever accept query parameters, which probably - // will happen as part of issue #90. - KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { - const_cast(url.begin())[*queryIdx] = '\0'; - url = url.begin(); - } - if(url.startsWith("/archive/")) { - KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { - auto array = (*file)->mmap(0, (*file)->stat().size); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - auto stream = response.send(200, "OK", responseHeaders, array.size()); - return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); - } - } else if(parseLogEndpoint(url, name, num)) { - kj::Own cc = kj::heap(laminar); - cc->scope.job = name; - cc->scope.num = num; - bool complete; - std::string output; - cc->scope.type = MonitorScope::LOG; - if(laminar.handleLogRequest(name, num, output, complete)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. - responseHeaders.add("X-Accel-Buffering", "no"); - auto stream = response.send(200, "OK", responseHeaders, nullptr); - laminar.registerClient(cc.get()); - return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{ - if(complete) - return kj::Promise(kj::READY_NOW); - return writeLogChunk(c, s); - }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc)); - } - } else if(url == "/custom/style.css") { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - std::string css = laminar.getCustomCss(); - auto stream = response.send(200, "OK", responseHeaders, css.size()); - return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); - } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); - responseHeaders.add("Content-Encoding", "gzip"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - auto stream = response.send(200, "OK", responseHeaders, end-start); - return stream->write(start, end-start).attach(kj::mv(stream)); - } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); - responseHeaders.add("Cache-Control", "no-cache"); - auto stream = response.send(200, "OK", responseHeaders, badge.size()); - return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); - } - return response.sendError(404, "Not Found", responseHeaders); - } - } - - LaminarInterface& laminar; - Resources resources; - kj::HttpHeaders responseHeaders; -}; - -// Context for an RPC connection -struct RpcConnection { - RpcConnection(kj::Own&& stream, - capnp::Capability::Client bootstrap, - capnp::ReaderOptions readerOpts) : - stream(kj::mv(stream)), - network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts), - rpcSystem(capnp::makeRpcServer(network, bootstrap)) - { - } - kj::Own stream; - capnp::TwoPartyVatNetwork network; - capnp::RpcSystem rpcSystem; -}; - Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress) : - rpcInterface(kj::heap(li)), laminarInterface(li), ioContext(kj::setupAsyncIo()), - headerTable(), - httpService(kj::heap(li, headerTable)), - httpServer(kj::heap(ioContext.provider->getTimer(), headerTable, *httpService)), listeners(kj::heap(*this)), childTasks(*this), - httpConnections(*this), - httpReady(kj::newPromiseAndFulfiller()) + httpReady(kj::newPromiseAndFulfiller()), + http(kj::heap(li)), + rpc(kj::heap(li)) { // RPC task if(rpcBindAddress.startsWith("unix:")) @@ -507,8 +60,7 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, .then([this](kj::Own&& addr) { // TODO: a better way? Currently used only for testing httpReady.fulfiller->fulfill(); - kj::Own listener = addr->listen(); - return httpServer->listenHttp(*listener).attach(kj::mv(listener)); + return http->startServer(ioContext.lowLevelProvider->getTimer(), addr->listen()); })); // handle watched paths @@ -582,8 +134,7 @@ kj::Promise Server::acceptRpcClient(kj::Own&& list kj::ConnectionReceiver& cr = *listener.get(); return cr.accept().then(kj::mvCapture(kj::mv(listener), [this](kj::Own&& listener, kj::Own&& connection) { - auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions()); - childTasks.add(server->network.onDisconnect().attach(kj::mv(server))); + childTasks.add(rpc->accept(kj::mv(connection))); return acceptRpcClient(kj::mv(listener)); })); } diff --git a/src/server.h b/src/server.h index 233c82e..ccb4a8a 100644 --- a/src/server.h +++ b/src/server.h @@ -26,6 +26,8 @@ #include struct LaminarInterface; +struct Http; +struct Rpc; // This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces // and manages the program's asynchronous event loop @@ -61,15 +63,10 @@ private: private: int efd_quit; - capnp::Capability::Client rpcInterface; LaminarInterface& laminarInterface; kj::AsyncIoContext ioContext; - kj::HttpHeaderTable headerTable; - kj::Own httpService; - kj::Own httpServer; kj::Own listeners; kj::TaskSet childTasks; - kj::TaskSet httpConnections; kj::Maybe> reapWatch; int inotify_fd; kj::Maybe> pathWatch; @@ -77,6 +74,10 @@ private: // TODO: restructure so this isn't necessary friend class ServerTest; kj::PromiseFulfillerPair httpReady; + + // TODO: WIP + kj::Own http; + kj::Own rpc; }; #endif // LAMINAR_SERVER_H_ diff --git a/test/test-server.cpp b/test/test-server.cpp index 74601b1..c5d365c 100644 --- a/test/test-server.cpp +++ b/test/test-server.cpp @@ -25,6 +25,7 @@ #include "interface.h" #include "laminar.capnp.h" #include "tempdir.h" +#include "rpc.h" class MockLaminar : public LaminarInterface { public: @@ -75,7 +76,7 @@ protected: } LaminarCi::Client client() const { - return server->rpcInterface.castAs(); + return server->rpc->rpcInterface.castAs(); } kj::WaitScope& ws() const { return server->ioContext.waitScope;