diff --git a/CMakeLists.txt b/CMakeLists.txt
index cf73a1b..b9aaebd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -85,9 +85,19 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js
js/ansi_up.js js/Chart.min.js css/bootstrap.min.css)
# (see resources.cpp where these are fetched)
+set(LAMINARD_SOURCES
+ src/database.cpp
+ src/server.cpp
+ src/laminar.cpp
+ src/conf.cpp
+ src/http.cpp
+ src/resources.cpp
+ src/rpc.cpp
+ src/run.cpp
+)
+
## Server
-add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp
- src/conf.cpp src/resources.cpp src/run.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h)
+add_executable(laminard ${LAMINARD_SOURCES} src/main.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h)
target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z)
## Client
@@ -99,7 +109,7 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests")
if(BUILD_TESTS)
find_package(GTest REQUIRED)
include_directories(${GTEST_INCLUDE_DIRS} src)
- add_executable(laminar-tests src/conf.cpp src/database.cpp src/laminar.cpp src/run.cpp src/server.cpp laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp)
+ add_executable(laminar-tests ${LAMINARD_SOURCES} laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp)
target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z)
endif()
diff --git a/src/http.cpp b/src/http.cpp
new file mode 100644
index 0000000..aa6aba8
--- /dev/null
+++ b/src/http.cpp
@@ -0,0 +1,306 @@
+///
+/// Copyright 2015-2019 Oliver Giles
+///
+/// This file is part of Laminar
+///
+/// Laminar is free software: you can redistribute it and/or modify
+/// it under the terms of the GNU General Public License as published by
+/// the Free Software Foundation, either version 3 of the License, or
+/// (at your option) any later version.
+///
+/// Laminar is distributed in the hope that it will be useful,
+/// but WITHOUT ANY WARRANTY; without even the implied warranty of
+/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+/// GNU General Public License for more details.
+///
+/// You should have received a copy of the GNU General Public License
+/// along with Laminar. If not, see
+///
+#include "interface.h"
+#include "http.h"
+#include "resources.h"
+#include "log.h"
+
+#include
+#include
+
+// This is the implementation of the HTTP/Websocket interface. It creates
+// websocket connections as LaminarClients and registers them with the
+// LaminarInterface so that status messages will be delivered to the client.
+// On opening a websocket connection, it delivers a status snapshot message
+// (see LaminarInterface::sendStatus)
+class HttpImpl : public kj::HttpService {
+public:
+ HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) :
+ laminar(laminar),
+ responseHeaders(tbl)
+ {}
+ virtual ~HttpImpl() {}
+
+private:
+ class HttpChunkedClient : public LaminarClient {
+ public:
+ HttpChunkedClient(LaminarInterface& laminar) :
+ laminar(laminar)
+ {}
+ ~HttpChunkedClient() override {
+ laminar.deregisterClient(this);
+ }
+ void sendMessage(std::string payload) override {
+ chunks.push_back(kj::mv(payload));
+ fulfiller->fulfill();
+ }
+ void notifyJobFinished() override {
+ done = true;
+ fulfiller->fulfill();
+ }
+ LaminarInterface& laminar;
+ std::list chunks;
+ // cannot use chunks.empty() because multiple fulfill()s
+ // could be coalesced
+ bool done = false;
+ kj::Own> fulfiller;
+ };
+
+ // Implements LaminarClient and holds the Websocket connection object.
+ // Automatically destructed when the promise created in request() resolves
+ // or is cancelled
+ class WebsocketClient : public LaminarClient {
+ public:
+ WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) :
+ laminar(laminar),
+ ws(kj::mv(ws))
+ {}
+ ~WebsocketClient() override {
+ laminar.deregisterClient(this);
+ }
+ virtual void sendMessage(std::string payload) override {
+ messages.emplace_back(kj::mv(payload));
+ // sendMessage might be called several times before the event loop
+ // gets a chance to act on the fulfiller. So store the payload here
+ // where it can be fetched later and don't pass the payload with the
+ // fulfiller because subsequent calls to fulfill() are ignored.
+ fulfiller->fulfill();
+ }
+ LaminarInterface& laminar;
+ kj::Own ws;
+ std::list messages;
+ kj::Own> fulfiller;
+ };
+
+ kj::Promise websocketRead(WebsocketClient& lc)
+ {
+ return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) {
+ KJ_SWITCH_ONEOF(message) {
+ KJ_CASE_ONEOF(str, kj::String) {
+ rapidjson::Document d;
+ d.ParseInsitu(const_cast(str.cStr()));
+ if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) {
+ lc.scope.page = d["page"].GetInt();
+ lc.scope.field = d["field"].GetString();
+ lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0;
+ laminar.sendStatus(&lc);
+ return websocketRead(lc);
+ }
+ }
+ KJ_CASE_ONEOF(close, kj::WebSocket::Close) {
+ // clean socket shutdown
+ return lc.ws->close(close.code, close.reason);
+ }
+ KJ_CASE_ONEOF_DEFAULT {}
+ }
+ // unhandled/unknown message
+ return lc.ws->disconnect();
+ }, [](kj::Exception&& e){
+ // server logs suggest early catching here avoids fatal exception later
+ // TODO: reproduce in unit test
+ LLOG(WARNING, e.getDescription());
+ return kj::READY_NOW;
+ });
+ }
+
+ kj::Promise websocketWrite(WebsocketClient& lc)
+ {
+ auto paf = kj::newPromiseAndFulfiller();
+ lc.fulfiller = kj::mv(paf.fulfiller);
+ return paf.promise.then([this,&lc]{
+ kj::Promise p = kj::READY_NOW;
+ std::list messages = kj::mv(lc.messages);
+ for(std::string& s : messages) {
+ p = p.then([&s,&lc]{
+ kj::String str = kj::str(s);
+ return lc.ws->send(str).attach(kj::mv(str));
+ });
+ }
+ return p.attach(kj::mv(messages)).then([this,&lc]{
+ return websocketWrite(lc);
+ });
+ });
+ }
+
+ kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) {
+ // convert the requested URL to a MonitorScope
+ if(resource.substr(0, 5) == "/jobs") {
+ if(resource.length() == 5) {
+ lc.scope.type = MonitorScope::ALL;
+ } else {
+ resource = resource.substr(5);
+ size_t split = resource.find('/',1);
+ std::string job = resource.substr(1,split-1);
+ if(!job.empty()) {
+ lc.scope.job = job;
+ lc.scope.type = MonitorScope::JOB;
+ }
+ if(split != std::string::npos) {
+ size_t split2 = resource.find('/', split+1);
+ std::string run = resource.substr(split+1, split2-split);
+ if(!run.empty()) {
+ lc.scope.num = static_cast(atoi(run.c_str()));
+ lc.scope.type = MonitorScope::RUN;
+ }
+ if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) {
+ lc.scope.type = MonitorScope::LOG;
+ }
+ }
+ }
+ }
+ laminar.registerClient(&lc);
+ kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc));
+ // registerClient can happen after a successful websocket handshake.
+ // However, the connection might not be closed gracefully, so the
+ // corresponding deregister operation happens in the WebsocketClient
+ // destructor rather than the close handler or a then() clause
+ laminar.sendStatus(&lc);
+ return connection;
+ }
+
+ // Parses the url of the form /log/NAME/NUMBER, filling in the passed
+ // references and returning true if successful. /log/NAME/latest is
+ // also allowed, in which case the num reference is set to 0
+ bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
+ if(url.startsWith("/log/")) {
+ kj::StringPtr path = url.slice(5);
+ KJ_IF_MAYBE(sep, path.findFirst('/')) {
+ name = path.slice(0, *sep).begin();
+ kj::StringPtr tail = path.slice(*sep+1);
+ num = static_cast(atoi(tail.begin()));
+ name.erase(*sep);
+ if(tail == "latest")
+ num = laminar.latestRun(name);
+ if(num > 0)
+ return true;
+ }
+ }
+ return false;
+ }
+
+ kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) {
+ auto paf = kj::newPromiseAndFulfiller();
+ client->fulfiller = kj::mv(paf.fulfiller);
+ return paf.promise.then([=]{
+ kj::Promise p = kj::READY_NOW;
+ std::list chunks = kj::mv(client->chunks);
+ for(std::string& s : chunks) {
+ p = p.then([=,&s]{
+ return stream->write(s.data(), s.size());
+ });
+ }
+ return p.attach(kj::mv(chunks)).then([=]{
+ return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream);
+ });
+ });
+ }
+
+ virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
+ kj::AsyncInputStream& requestBody, Response& response) override
+ {
+ if(headers.isWebSocket()) {
+ responseHeaders.clear();
+ kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders));
+ return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc));
+ } else {
+ // handle regular HTTP request
+ const char* start, *end, *content_type;
+ std::string badge;
+ // for log requests
+ std::string name;
+ uint num;
+ responseHeaders.clear();
+ // Clients usually expect that http servers will ignore unknown query parameters,
+ // and expect to use this feature to work around browser limitations like there
+ // being no way to programatically force a resource to be reloaded from the server
+ // (without "Cache-Control: no-store", which is overkill). See issue #89.
+ // Since we currently don't handle any query parameters at all, the easiest way
+ // to achieve this is unconditionally remove all query parameters from the request.
+ // This will need to be redone if we ever accept query parameters, which probably
+ // will happen as part of issue #90.
+ KJ_IF_MAYBE(queryIdx, url.findFirst('?')) {
+ const_cast(url.begin())[*queryIdx] = '\0';
+ url = url.begin();
+ }
+ if(url.startsWith("/archive/")) {
+ KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
+ auto array = (*file)->mmap(0, (*file)->stat().size);
+ responseHeaders.add("Content-Transfer-Encoding", "binary");
+ auto stream = response.send(200, "OK", responseHeaders, array.size());
+ return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
+ }
+ } else if(parseLogEndpoint(url, name, num)) {
+ kj::Own cc = kj::heap(laminar);
+ cc->scope.job = name;
+ cc->scope.num = num;
+ bool complete;
+ std::string output;
+ cc->scope.type = MonitorScope::LOG;
+ if(laminar.handleLogRequest(name, num, output, complete)) {
+ responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
+ responseHeaders.add("Content-Transfer-Encoding", "binary");
+ // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output.
+ responseHeaders.add("X-Accel-Buffering", "no");
+ auto stream = response.send(200, "OK", responseHeaders, nullptr);
+ laminar.registerClient(cc.get());
+ return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{
+ if(complete)
+ return kj::Promise(kj::READY_NOW);
+ return writeLogChunk(c, s);
+ }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc));
+ }
+ } else if(url == "/custom/style.css") {
+ responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
+ responseHeaders.add("Content-Transfer-Encoding", "binary");
+ std::string css = laminar.getCustomCss();
+ auto stream = response.send(200, "OK", responseHeaders, css.size());
+ return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
+ } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) {
+ responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
+ responseHeaders.add("Content-Encoding", "gzip");
+ responseHeaders.add("Content-Transfer-Encoding", "binary");
+ auto stream = response.send(200, "OK", responseHeaders, end-start);
+ return stream->write(start, end-start).attach(kj::mv(stream));
+ } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) {
+ responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
+ responseHeaders.add("Cache-Control", "no-cache");
+ auto stream = response.send(200, "OK", responseHeaders, badge.size());
+ return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream));
+ }
+ return response.sendError(404, "Not Found", responseHeaders);
+ }
+ }
+
+ LaminarInterface& laminar;
+ Resources resources;
+ kj::HttpHeaders responseHeaders;
+};
+
+Http::Http(LaminarInterface &li) :
+ headerTable(kj::heap()),
+ httpService(kj::heap(li, *headerTable)),
+ laminar(li)
+{
+
+}
+
+kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) {
+ auto httpServer = kj::heap(timer, *headerTable, *httpService);
+ return httpServer->listenHttp(*listener).attach(kj::mv(listener)).attach(kj::mv(httpServer));
+}
diff --git a/src/http.h b/src/http.h
new file mode 100644
index 0000000..5098b39
--- /dev/null
+++ b/src/http.h
@@ -0,0 +1,36 @@
+///
+/// Copyright 2019 Oliver Giles
+///
+/// This file is part of Laminar
+///
+/// Laminar is free software: you can redistribute it and/or modify
+/// it under the terms of the GNU General Public License as published by
+/// the Free Software Foundation, either version 3 of the License, or
+/// (at your option) any later version.
+///
+/// Laminar is distributed in the hope that it will be useful,
+/// but WITHOUT ANY WARRANTY; without even the implied warranty of
+/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+/// GNU General Public License for more details.
+///
+/// You should have received a copy of the GNU General Public License
+/// along with Laminar. If not, see
+///
+#ifndef LAMINAR_HTTP_H_
+#define LAMINAR_HTTP_H_
+
+#include
+
+struct LaminarInterface;
+
+class Http {
+public:
+ Http(LaminarInterface &li);
+ kj::Promise startServer(kj::Timer &timer, kj::Own&& listener);
+
+ kj::Own headerTable;
+ kj::Own httpService;
+ LaminarInterface& laminar;
+};
+
+#endif //LAMINAR_HTTP_H_
diff --git a/src/rpc.cpp b/src/rpc.cpp
new file mode 100644
index 0000000..c7a9a3e
--- /dev/null
+++ b/src/rpc.cpp
@@ -0,0 +1,200 @@
+///
+/// Copyright 2015-2019 Oliver Giles
+///
+/// This file is part of Laminar
+///
+/// Laminar is free software: you can redistribute it and/or modify
+/// it under the terms of the GNU General Public License as published by
+/// the Free Software Foundation, either version 3 of the License, or
+/// (at your option) any later version.
+///
+/// Laminar is distributed in the hope that it will be useful,
+/// but WITHOUT ANY WARRANTY; without even the implied warranty of
+/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+/// GNU General Public License for more details.
+///
+/// You should have received a copy of the GNU General Public License
+/// along with Laminar. If not, see
+///
+#include "rpc.h"
+#include "laminar.capnp.h"
+
+#include "interface.h"
+#include "log.h"
+
+namespace {
+
+// Used for returning run state to RPC clients
+LaminarCi::JobResult fromRunState(RunState state) {
+ switch(state) {
+ case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS;
+ case RunState::FAILED: return LaminarCi::JobResult::FAILED;
+ case RunState::ABORTED: return LaminarCi::JobResult::ABORTED;
+ default:
+ return LaminarCi::JobResult::UNKNOWN;
+ }
+}
+
+}
+// This is the implementation of the Laminar Cap'n Proto RPC interface.
+// As such, it implements the pure virtual interface generated from
+// laminar.capnp with calls to the LaminarInterface
+class RpcImpl : public LaminarCi::Server, public LaminarWaiter {
+public:
+ RpcImpl(LaminarInterface& l) :
+ LaminarCi::Server(),
+ laminar(l)
+ {
+ }
+
+ ~RpcImpl() override {
+ }
+
+ // Queue a job, without waiting for it to start
+ kj::Promise queue(QueueContext context) override {
+ std::string jobName = context.getParams().getJobName();
+ LLOG(INFO, "RPC queue", jobName);
+ LaminarCi::MethodResult result = laminar.queueJob(jobName, params(context.getParams().getParams()))
+ ? LaminarCi::MethodResult::SUCCESS
+ : LaminarCi::MethodResult::FAILED;
+ context.getResults().setResult(result);
+ return kj::READY_NOW;
+ }
+
+ // Start a job, without waiting for it to finish
+ kj::Promise start(StartContext context) override {
+ std::string jobName = context.getParams().getJobName();
+ LLOG(INFO, "RPC start", jobName);
+ std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()));
+ if(Run* r = run.get()) {
+ return r->whenStarted().then([context,r]() mutable {
+ context.getResults().setResult(LaminarCi::MethodResult::SUCCESS);
+ context.getResults().setBuildNum(r->build);
+ });
+ } else {
+ context.getResults().setResult(LaminarCi::MethodResult::FAILED);
+ return kj::READY_NOW;
+ }
+ }
+
+ // Start a job and wait for the result
+ kj::Promise run(RunContext context) override {
+ std::string jobName = context.getParams().getJobName();
+ LLOG(INFO, "RPC run", jobName);
+ std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()));
+ if(Run* r = run.get()) {
+ runWaiters[r].emplace_back(kj::newPromiseAndFulfiller());
+ return runWaiters[r].back().promise.then([context,run](RunState state) mutable {
+ context.getResults().setResult(fromRunState(state));
+ context.getResults().setBuildNum(run->build);
+ });
+ } else {
+ context.getResults().setResult(LaminarCi::JobResult::UNKNOWN);
+ return kj::READY_NOW;
+ }
+ }
+
+ // Set a parameter on a running build
+ kj::Promise set(SetContext context) override {
+ std::string jobName = context.getParams().getRun().getJob();
+ uint buildNum = context.getParams().getRun().getBuildNum();
+ LLOG(INFO, "RPC set", jobName, buildNum);
+
+ LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum,
+ context.getParams().getParam().getName(), context.getParams().getParam().getValue())
+ ? LaminarCi::MethodResult::SUCCESS
+ : LaminarCi::MethodResult::FAILED;
+ context.getResults().setResult(result);
+ return kj::READY_NOW;
+ }
+
+ // List jobs in queue
+ kj::Promise listQueued(ListQueuedContext context) override {
+ const std::list>& queue = laminar.listQueuedJobs();
+ auto res = context.getResults().initResult(queue.size());
+ int i = 0;
+ for(auto it : queue) {
+ res.set(i++, it->name);
+ }
+ return kj::READY_NOW;
+ }
+
+ // List running jobs
+ kj::Promise listRunning(ListRunningContext context) override {
+ const RunSet& active = laminar.listRunningJobs();
+ auto res = context.getResults().initResult(active.size());
+ int i = 0;
+ for(auto it : active) {
+ res[i].setJob(it->name);
+ res[i].setBuildNum(it->build);
+ i++;
+ }
+ return kj::READY_NOW;
+ }
+
+ // List known jobs
+ kj::Promise listKnown(ListKnownContext context) override {
+ std::list known = laminar.listKnownJobs();
+ auto res = context.getResults().initResult(known.size());
+ int i = 0;
+ for(auto it : known) {
+ res.set(i++, it);
+ }
+ return kj::READY_NOW;
+ }
+
+ kj::Promise abort(AbortContext context) override {
+ std::string jobName = context.getParams().getRun().getJob();
+ uint buildNum = context.getParams().getRun().getBuildNum();
+ LLOG(INFO, "RPC abort", jobName, buildNum);
+ LaminarCi::MethodResult result = laminar.abort(jobName, buildNum)
+ ? LaminarCi::MethodResult::SUCCESS
+ : LaminarCi::MethodResult::FAILED;
+ context.getResults().setResult(result);
+ return kj::READY_NOW;
+ }
+
+private:
+ // Helper to convert an RPC parameter list to a hash map
+ ParamMap params(const capnp::List::Reader& paramReader) {
+ ParamMap res;
+ for(auto p : paramReader) {
+ res[p.getName().cStr()] = p.getValue().cStr();
+ }
+ return res;
+ }
+
+ // Implements LaminarWaiter::complete
+ void complete(const Run* r) override {
+ for(kj::PromiseFulfillerPair& w : runWaiters[r])
+ w.fulfiller->fulfill(RunState(r->result));
+ runWaiters.erase(r);
+ }
+private:
+ LaminarInterface& laminar;
+ std::unordered_map>> runWaiters;
+};
+
+Rpc::Rpc(LaminarInterface& li) :
+ rpcInterface(kj::heap(li))
+{}
+
+// Context for an RPC connection
+struct RpcConnection {
+ RpcConnection(kj::Own&& stream,
+ capnp::Capability::Client bootstrap,
+ capnp::ReaderOptions readerOpts) :
+ stream(kj::mv(stream)),
+ network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts),
+ rpcSystem(capnp::makeRpcServer(network, bootstrap))
+ {
+ }
+ kj::Own stream;
+ capnp::TwoPartyVatNetwork network;
+ capnp::RpcSystem rpcSystem;
+};
+
+kj::Promise Rpc::accept(kj::Own&& connection) {
+ auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions());
+ return server->network.onDisconnect().attach(kj::mv(server));
+}
diff --git a/src/rpc.h b/src/rpc.h
new file mode 100644
index 0000000..823980e
--- /dev/null
+++ b/src/rpc.h
@@ -0,0 +1,36 @@
+///
+/// Copyright 2019 Oliver Giles
+///
+/// This file is part of Laminar
+///
+/// Laminar is free software: you can redistribute it and/or modify
+/// it under the terms of the GNU General Public License as published by
+/// the Free Software Foundation, either version 3 of the License, or
+/// (at your option) any later version.
+///
+/// Laminar is distributed in the hope that it will be useful,
+/// but WITHOUT ANY WARRANTY; without even the implied warranty of
+/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+/// GNU General Public License for more details.
+///
+/// You should have received a copy of the GNU General Public License
+/// along with Laminar. If not, see
+///
+#ifndef LAMINAR_RPC_H_
+#define LAMINAR_RPC_H_
+
+#include
+#include
+#include
+
+struct LaminarInterface;
+
+class Rpc {
+public:
+ Rpc(LaminarInterface &li);
+ kj::Promise accept(kj::Own&& connection);
+
+ capnp::Capability::Client rpcInterface;
+};
+
+#endif //LAMINAR_RPC_H_
diff --git a/src/server.cpp b/src/server.cpp
index 484783c..f98ef4a 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -18,13 +18,10 @@
///
#include "server.h"
#include "interface.h"
-#include "laminar.capnp.h"
-#include "resources.h"
#include "log.h"
+#include "rpc.h"
+#include "http.h"
-#include
-#include
-#include
#include
#include
#include
@@ -34,463 +31,19 @@
#include
#include
-#include
-
// Size of buffer used to read from file descriptors. Should be
// a multiple of sizeof(struct signalfd_siginfo) == 128
#define PROC_IO_BUFSIZE 4096
-namespace {
-
-// Used for returning run state to RPC clients
-LaminarCi::JobResult fromRunState(RunState state) {
- switch(state) {
- case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS;
- case RunState::FAILED: return LaminarCi::JobResult::FAILED;
- case RunState::ABORTED: return LaminarCi::JobResult::ABORTED;
- default:
- return LaminarCi::JobResult::UNKNOWN;
- }
-}
-
-}
-
-// This is the implementation of the Laminar Cap'n Proto RPC interface.
-// As such, it implements the pure virtual interface generated from
-// laminar.capnp with calls to the LaminarInterface
-class RpcImpl : public LaminarCi::Server, public LaminarWaiter {
-public:
- RpcImpl(LaminarInterface& l) :
- LaminarCi::Server(),
- laminar(l)
- {
- laminar.registerWaiter(this);
- }
-
- ~RpcImpl() override {
- laminar.deregisterWaiter(this);
- }
-
- // Queue a job, without waiting for it to start
- kj::Promise queue(QueueContext context) override {
- std::string jobName = context.getParams().getJobName();
- LLOG(INFO, "RPC queue", jobName);
- LaminarCi::MethodResult result = laminar.queueJob(jobName, params(context.getParams().getParams()))
- ? LaminarCi::MethodResult::SUCCESS
- : LaminarCi::MethodResult::FAILED;
- context.getResults().setResult(result);
- return kj::READY_NOW;
- }
-
- // Start a job, without waiting for it to finish
- kj::Promise start(StartContext context) override {
- std::string jobName = context.getParams().getJobName();
- LLOG(INFO, "RPC start", jobName);
- std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()));
- if(Run* r = run.get()) {
- return r->whenStarted().then([context,r]() mutable {
- context.getResults().setResult(LaminarCi::MethodResult::SUCCESS);
- context.getResults().setBuildNum(r->build);
- });
- } else {
- context.getResults().setResult(LaminarCi::MethodResult::FAILED);
- return kj::READY_NOW;
- }
- }
-
- // Start a job and wait for the result
- kj::Promise run(RunContext context) override {
- std::string jobName = context.getParams().getJobName();
- LLOG(INFO, "RPC run", jobName);
- std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams()));
- if(const Run* r = run.get()) {
- runWaiters[r].emplace_back(kj::newPromiseAndFulfiller());
- return runWaiters[r].back().promise.then([context,run](RunState state) mutable {
- context.getResults().setResult(fromRunState(state));
- context.getResults().setBuildNum(run->build);
- });
- } else {
- context.getResults().setResult(LaminarCi::JobResult::UNKNOWN);
- return kj::READY_NOW;
- }
- }
-
- // Set a parameter on a running build
- kj::Promise set(SetContext context) override {
- std::string jobName = context.getParams().getRun().getJob();
- uint buildNum = context.getParams().getRun().getBuildNum();
- LLOG(INFO, "RPC set", jobName, buildNum);
-
- LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum,
- context.getParams().getParam().getName(), context.getParams().getParam().getValue())
- ? LaminarCi::MethodResult::SUCCESS
- : LaminarCi::MethodResult::FAILED;
- context.getResults().setResult(result);
- return kj::READY_NOW;
- }
-
- // List jobs in queue
- kj::Promise listQueued(ListQueuedContext context) override {
- const std::list>& queue = laminar.listQueuedJobs();
- auto res = context.getResults().initResult(queue.size());
- int i = 0;
- for(auto it : queue) {
- res.set(i++, it->name);
- }
- return kj::READY_NOW;
- }
-
- // List running jobs
- kj::Promise listRunning(ListRunningContext context) override {
- const RunSet& active = laminar.listRunningJobs();
- auto res = context.getResults().initResult(active.size());
- int i = 0;
- for(auto it : active) {
- res[i].setJob(it->name);
- res[i].setBuildNum(it->build);
- i++;
- }
- return kj::READY_NOW;
- }
-
- // List known jobs
- kj::Promise listKnown(ListKnownContext context) override {
- std::list known = laminar.listKnownJobs();
- auto res = context.getResults().initResult(known.size());
- int i = 0;
- for(auto it : known) {
- res.set(i++, it);
- }
- return kj::READY_NOW;
- }
-
- kj::Promise abort(AbortContext context) override {
- std::string jobName = context.getParams().getRun().getJob();
- uint buildNum = context.getParams().getRun().getBuildNum();
- LLOG(INFO, "RPC abort", jobName, buildNum);
- LaminarCi::MethodResult result = laminar.abort(jobName, buildNum)
- ? LaminarCi::MethodResult::SUCCESS
- : LaminarCi::MethodResult::FAILED;
- context.getResults().setResult(result);
- return kj::READY_NOW;
- }
-
-private:
- // Helper to convert an RPC parameter list to a hash map
- ParamMap params(const capnp::List::Reader& paramReader) {
- ParamMap res;
- for(auto p : paramReader) {
- res[p.getName().cStr()] = p.getValue().cStr();
- }
- return res;
- }
-
- // Implements LaminarWaiter::complete
- void complete(const Run* r) override {
- for(kj::PromiseFulfillerPair& w : runWaiters[r])
- w.fulfiller->fulfill(RunState(r->result));
- runWaiters.erase(r);
- }
-private:
- LaminarInterface& laminar;
- std::unordered_map>> runWaiters;
-};
-
-// This is the implementation of the HTTP/Websocket interface. It creates
-// websocket connections as LaminarClients and registers them with the
-// LaminarInterface so that status messages will be delivered to the client.
-// On opening a websocket connection, it delivers a status snapshot message
-// (see LaminarInterface::sendStatus)
-class HttpImpl : public kj::HttpService {
-public:
- HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) :
- laminar(laminar),
- responseHeaders(tbl)
- {}
- virtual ~HttpImpl() {}
-
-private:
- class HttpChunkedClient : public LaminarClient {
- public:
- HttpChunkedClient(LaminarInterface& laminar) :
- laminar(laminar)
- {}
- ~HttpChunkedClient() override {
- laminar.deregisterClient(this);
- }
- void sendMessage(std::string payload) override {
- chunks.push_back(kj::mv(payload));
- fulfiller->fulfill();
- }
- void notifyJobFinished() override {
- done = true;
- fulfiller->fulfill();
- }
- LaminarInterface& laminar;
- std::list chunks;
- // cannot use chunks.empty() because multiple fulfill()s
- // could be coalesced
- bool done = false;
- kj::Own> fulfiller;
- };
-
- // Implements LaminarClient and holds the Websocket connection object.
- // Automatically destructed when the promise created in request() resolves
- // or is cancelled
- class WebsocketClient : public LaminarClient {
- public:
- WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) :
- laminar(laminar),
- ws(kj::mv(ws))
- {}
- ~WebsocketClient() override {
- laminar.deregisterClient(this);
- }
- virtual void sendMessage(std::string payload) override {
- messages.emplace_back(kj::mv(payload));
- // sendMessage might be called several times before the event loop
- // gets a chance to act on the fulfiller. So store the payload here
- // where it can be fetched later and don't pass the payload with the
- // fulfiller because subsequent calls to fulfill() are ignored.
- fulfiller->fulfill();
- }
- LaminarInterface& laminar;
- kj::Own ws;
- std::list messages;
- kj::Own> fulfiller;
- };
-
- kj::Promise websocketRead(WebsocketClient& lc)
- {
- return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) {
- KJ_SWITCH_ONEOF(message) {
- KJ_CASE_ONEOF(str, kj::String) {
- rapidjson::Document d;
- d.ParseInsitu(const_cast(str.cStr()));
- if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) {
- lc.scope.page = d["page"].GetInt();
- lc.scope.field = d["field"].GetString();
- lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0;
- laminar.sendStatus(&lc);
- return websocketRead(lc);
- }
- }
- KJ_CASE_ONEOF(close, kj::WebSocket::Close) {
- // clean socket shutdown
- return lc.ws->close(close.code, close.reason);
- }
- KJ_CASE_ONEOF_DEFAULT {}
- }
- // unhandled/unknown message
- return lc.ws->disconnect();
- }, [](kj::Exception&& e){
- // server logs suggest early catching here avoids fatal exception later
- // TODO: reproduce in unit test
- LLOG(WARNING, e.getDescription());
- return kj::READY_NOW;
- });
- }
-
- kj::Promise websocketWrite(WebsocketClient& lc)
- {
- auto paf = kj::newPromiseAndFulfiller();
- lc.fulfiller = kj::mv(paf.fulfiller);
- return paf.promise.then([this,&lc]{
- kj::Promise p = kj::READY_NOW;
- std::list messages = kj::mv(lc.messages);
- for(std::string& s : messages) {
- p = p.then([&s,&lc]{
- kj::String str = kj::str(s);
- return lc.ws->send(str).attach(kj::mv(str));
- });
- }
- return p.attach(kj::mv(messages)).then([this,&lc]{
- return websocketWrite(lc);
- });
- });
- }
-
- kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) {
- // convert the requested URL to a MonitorScope
- if(resource.substr(0, 5) == "/jobs") {
- if(resource.length() == 5) {
- lc.scope.type = MonitorScope::ALL;
- } else {
- resource = resource.substr(5);
- size_t split = resource.find('/',1);
- std::string job = resource.substr(1,split-1);
- if(!job.empty()) {
- lc.scope.job = job;
- lc.scope.type = MonitorScope::JOB;
- }
- if(split != std::string::npos) {
- size_t split2 = resource.find('/', split+1);
- std::string run = resource.substr(split+1, split2-split);
- if(!run.empty()) {
- lc.scope.num = static_cast(atoi(run.c_str()));
- lc.scope.type = MonitorScope::RUN;
- }
- if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) {
- lc.scope.type = MonitorScope::LOG;
- }
- }
- }
- }
- laminar.registerClient(&lc);
- kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc));
- // registerClient can happen after a successful websocket handshake.
- // However, the connection might not be closed gracefully, so the
- // corresponding deregister operation happens in the WebsocketClient
- // destructor rather than the close handler or a then() clause
- laminar.sendStatus(&lc);
- return connection;
- }
-
- // Parses the url of the form /log/NAME/NUMBER, filling in the passed
- // references and returning true if successful. /log/NAME/latest is
- // also allowed, in which case the num reference is set to 0
- bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
- if(url.startsWith("/log/")) {
- kj::StringPtr path = url.slice(5);
- KJ_IF_MAYBE(sep, path.findFirst('/')) {
- name = path.slice(0, *sep).begin();
- kj::StringPtr tail = path.slice(*sep+1);
- num = static_cast(atoi(tail.begin()));
- name.erase(*sep);
- if(tail == "latest")
- num = laminar.latestRun(name);
- if(num > 0)
- return true;
- }
- }
- return false;
- }
-
- kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) {
- auto paf = kj::newPromiseAndFulfiller();
- client->fulfiller = kj::mv(paf.fulfiller);
- return paf.promise.then([=]{
- kj::Promise p = kj::READY_NOW;
- std::list chunks = kj::mv(client->chunks);
- for(std::string& s : chunks) {
- p = p.then([=,&s]{
- return stream->write(s.data(), s.size());
- });
- }
- return p.attach(kj::mv(chunks)).then([=]{
- return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream);
- });
- });
- }
-
- virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
- kj::AsyncInputStream& requestBody, Response& response) override
- {
- if(headers.isWebSocket()) {
- responseHeaders.clear();
- kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders));
- return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc));
- } else {
- // handle regular HTTP request
- const char* start, *end, *content_type;
- std::string badge;
- // for log requests
- std::string name;
- uint num;
- responseHeaders.clear();
- // Clients usually expect that http servers will ignore unknown query parameters,
- // and expect to use this feature to work around browser limitations like there
- // being no way to programatically force a resource to be reloaded from the server
- // (without "Cache-Control: no-store", which is overkill). See issue #89.
- // Since we currently don't handle any query parameters at all, the easiest way
- // to achieve this is unconditionally remove all query parameters from the request.
- // This will need to be redone if we ever accept query parameters, which probably
- // will happen as part of issue #90.
- KJ_IF_MAYBE(queryIdx, url.findFirst('?')) {
- const_cast(url.begin())[*queryIdx] = '\0';
- url = url.begin();
- }
- if(url.startsWith("/archive/")) {
- KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
- auto array = (*file)->mmap(0, (*file)->stat().size);
- responseHeaders.add("Content-Transfer-Encoding", "binary");
- auto stream = response.send(200, "OK", responseHeaders, array.size());
- return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
- }
- } else if(parseLogEndpoint(url, name, num)) {
- kj::Own cc = kj::heap(laminar);
- cc->scope.job = name;
- cc->scope.num = num;
- bool complete;
- std::string output;
- cc->scope.type = MonitorScope::LOG;
- if(laminar.handleLogRequest(name, num, output, complete)) {
- responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
- responseHeaders.add("Content-Transfer-Encoding", "binary");
- // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output.
- responseHeaders.add("X-Accel-Buffering", "no");
- auto stream = response.send(200, "OK", responseHeaders, nullptr);
- laminar.registerClient(cc.get());
- return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{
- if(complete)
- return kj::Promise(kj::READY_NOW);
- return writeLogChunk(c, s);
- }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc));
- }
- } else if(url == "/custom/style.css") {
- responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
- responseHeaders.add("Content-Transfer-Encoding", "binary");
- std::string css = laminar.getCustomCss();
- auto stream = response.send(200, "OK", responseHeaders, css.size());
- return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
- } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) {
- responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
- responseHeaders.add("Content-Encoding", "gzip");
- responseHeaders.add("Content-Transfer-Encoding", "binary");
- auto stream = response.send(200, "OK", responseHeaders, end-start);
- return stream->write(start, end-start).attach(kj::mv(stream));
- } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) {
- responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
- responseHeaders.add("Cache-Control", "no-cache");
- auto stream = response.send(200, "OK", responseHeaders, badge.size());
- return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream));
- }
- return response.sendError(404, "Not Found", responseHeaders);
- }
- }
-
- LaminarInterface& laminar;
- Resources resources;
- kj::HttpHeaders responseHeaders;
-};
-
-// Context for an RPC connection
-struct RpcConnection {
- RpcConnection(kj::Own&& stream,
- capnp::Capability::Client bootstrap,
- capnp::ReaderOptions readerOpts) :
- stream(kj::mv(stream)),
- network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts),
- rpcSystem(capnp::makeRpcServer(network, bootstrap))
- {
- }
- kj::Own stream;
- capnp::TwoPartyVatNetwork network;
- capnp::RpcSystem rpcSystem;
-};
-
Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
kj::StringPtr httpBindAddress) :
- rpcInterface(kj::heap(li)),
laminarInterface(li),
ioContext(kj::setupAsyncIo()),
- headerTable(),
- httpService(kj::heap(li, headerTable)),
- httpServer(kj::heap(ioContext.provider->getTimer(), headerTable, *httpService)),
listeners(kj::heap(*this)),
childTasks(*this),
- httpConnections(*this),
- httpReady(kj::newPromiseAndFulfiller())
+ httpReady(kj::newPromiseAndFulfiller()),
+ http(kj::heap(li)),
+ rpc(kj::heap(li))
{
// RPC task
if(rpcBindAddress.startsWith("unix:"))
@@ -507,8 +60,7 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
.then([this](kj::Own&& addr) {
// TODO: a better way? Currently used only for testing
httpReady.fulfiller->fulfill();
- kj::Own listener = addr->listen();
- return httpServer->listenHttp(*listener).attach(kj::mv(listener));
+ return http->startServer(ioContext.lowLevelProvider->getTimer(), addr->listen());
}));
// handle watched paths
@@ -582,8 +134,7 @@ kj::Promise Server::acceptRpcClient(kj::Own&& list
kj::ConnectionReceiver& cr = *listener.get();
return cr.accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own&& listener, kj::Own&& connection) {
- auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions());
- childTasks.add(server->network.onDisconnect().attach(kj::mv(server)));
+ childTasks.add(rpc->accept(kj::mv(connection)));
return acceptRpcClient(kj::mv(listener));
}));
}
diff --git a/src/server.h b/src/server.h
index 233c82e..ccb4a8a 100644
--- a/src/server.h
+++ b/src/server.h
@@ -26,6 +26,8 @@
#include
struct LaminarInterface;
+struct Http;
+struct Rpc;
// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces
// and manages the program's asynchronous event loop
@@ -61,15 +63,10 @@ private:
private:
int efd_quit;
- capnp::Capability::Client rpcInterface;
LaminarInterface& laminarInterface;
kj::AsyncIoContext ioContext;
- kj::HttpHeaderTable headerTable;
- kj::Own httpService;
- kj::Own httpServer;
kj::Own listeners;
kj::TaskSet childTasks;
- kj::TaskSet httpConnections;
kj::Maybe> reapWatch;
int inotify_fd;
kj::Maybe> pathWatch;
@@ -77,6 +74,10 @@ private:
// TODO: restructure so this isn't necessary
friend class ServerTest;
kj::PromiseFulfillerPair httpReady;
+
+ // TODO: WIP
+ kj::Own http;
+ kj::Own rpc;
};
#endif // LAMINAR_SERVER_H_
diff --git a/test/test-server.cpp b/test/test-server.cpp
index 74601b1..c5d365c 100644
--- a/test/test-server.cpp
+++ b/test/test-server.cpp
@@ -25,6 +25,7 @@
#include "interface.h"
#include "laminar.capnp.h"
#include "tempdir.h"
+#include "rpc.h"
class MockLaminar : public LaminarInterface {
public:
@@ -75,7 +76,7 @@ protected:
}
LaminarCi::Client client() const {
- return server->rpcInterface.castAs();
+ return server->rpc->rpcInterface.castAs();
}
kj::WaitScope& ws() const {
return server->ioContext.waitScope;