From 39ca7e86cf3f89e09611c0c25ad48407b7b1e3d6 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Sat, 5 Oct 2019 20:06:35 +0300 Subject: [PATCH] replace websockets with sse and refactor Large refactor that more closely aligns the codebase to the kj async style, more clearly exposes an interface for functional testing and removes cruft. There is a slight increase in coupling between the Laminar and Http/Rpc classes, but this was always an issue, just until now more obscured by the arbitrary pure virtual LaminarInterface class (which has been removed in this change) and the previous lumping together of all the async stuff in the Server class (which is now more spread around the code according to function). This change replaces the use of Websockets with Server Side Events (SSE). They are simpler and more suitable for the publish-style messages used by Laminar, and typically require less configuration of the reverse proxy HTTP server. Use of gmock is also removed, which eases testing in certain envs. Resolves #90. --- CMakeLists.txt | 10 +- UserManual.md | 8 +- src/http.cpp | 551 +++++++++--------- src/http.h | 42 +- src/interface.h | 175 ------ src/laminar.cpp | 199 ++----- src/laminar.h | 101 ++-- src/main.cpp | 32 +- src/monitorscope.h | 62 ++ src/resources/js/app.js | 78 ++- src/rpc.cpp | 27 +- src/rpc.h | 4 +- src/run.cpp | 5 +- src/run.h | 2 + src/server.cpp | 97 +-- src/server.h | 37 +- test/eventsource.h | 71 +++ test/laminar-fixture.h | 82 +++ test/laminar-functional.cpp | 75 +++ test/main.cpp | 28 + test/test-laminar.cpp | 47 -- test/test-server.cpp | 138 ----- test/{test-conf.cpp => unit-conf.cpp} | 0 test/{test-database.cpp => unit-database.cpp} | 0 test/{test-run.cpp => unit-run.cpp} | 0 25 files changed, 905 insertions(+), 966 deletions(-) delete mode 100644 src/interface.h create mode 100644 src/monitorscope.h create mode 100644 test/eventsource.h create mode 100644 test/laminar-fixture.h create mode 100644 test/laminar-functional.cpp create mode 100644 test/main.cpp delete mode 100644 test/test-laminar.cpp delete mode 100644 test/test-server.cpp rename test/{test-conf.cpp => unit-conf.cpp} (100%) rename test/{test-database.cpp => unit-database.cpp} (100%) rename test/{test-run.cpp => unit-run.cpp} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index b9aaebd..863ea12 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,7 +85,7 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js js/ansi_up.js js/Chart.min.js css/bootstrap.min.css) # (see resources.cpp where these are fetched) -set(LAMINARD_SOURCES +set(LAMINARD_CORE_SOURCES src/database.cpp src/server.cpp src/laminar.cpp @@ -94,10 +94,12 @@ set(LAMINARD_SOURCES src/resources.cpp src/rpc.cpp src/run.cpp + laminar.capnp.c++ + index_html_size.h ) ## Server -add_executable(laminard ${LAMINARD_SOURCES} src/main.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h) +add_executable(laminard ${LAMINARD_CORE_SOURCES} src/main.cpp ${COMPRESSED_BINS}) target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) ## Client @@ -109,8 +111,8 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests") if(BUILD_TESTS) find_package(GTest REQUIRED) include_directories(${GTEST_INCLUDE_DIRS} src) - add_executable(laminar-tests ${LAMINARD_SOURCES} laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) - target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) + add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp test/unit-run.cpp) + target_link_libraries(laminar-tests ${GTEST_LIBRARY} capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) endif() set(SYSTEMD_UNITDIR lib/systemd/system CACHE PATH "Path to systemd unit files") diff --git a/UserManual.md b/UserManual.md index b7c34e5..f5b2e71 100644 --- a/UserManual.md +++ b/UserManual.md @@ -88,13 +88,11 @@ Do not attempt to run laminar on port 80. This requires running as `root`, and L ## Running behind a reverse proxy -Laminar relies on WebSockets to provide a responsive, auto-updating display without polling. This may require extra support from your frontend webserver. +A reverse proxy is required if you want Laminar to share a port with other web services. It is also recommended to improve performance by serving artefacts directly or providing a caching layer for static assets. -For nginx, see [NGINX Reverse Proxy](https://www.nginx.com/resources/admin-guide/reverse-proxy/) and [WebSocket proxying](http://nginx.org/en/docs/http/websocket.html). +If you use [artefacts](#Archiving-artefacts), note that Laminar is not designed as a file server, and better performance will be achieved by allowing the frontend web server to serve the archive directory directly (e.g. using a `Location` directive). -For Apache, see [Apache Reverse Proxy](https://httpd.apache.org/docs/2.4/howto/reverse_proxy.html) and [mod_proxy_wstunnel](https://httpd.apache.org/docs/2.4/mod/mod_proxy_wstunnel.html). - -If you use [artefacts](#Archiving-artefacts), note that Laminar is not designed as a file server, and better performance will be achieved by allowing the frontend web server to directly serve the archive directory directly (e.g. using a `Location` directive). +Laminar uses Sever Sent Events to provide a responsive, auto-updating display without polling. Most frontend webservers should handle this without any extra configuration. If you use a reverse proxy to host Laminar at a subfolder instead of a subdomain root, the `` needs to be updated to ensure all links point to their proper targets. This can be done by setting `LAMINAR_BASE_URL` in `/etc/laminar.conf`. diff --git a/src/http.cpp b/src/http.cpp index aa6aba8..f575177 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -16,291 +16,290 @@ /// You should have received a copy of the GNU General Public License /// along with Laminar. If not, see /// -#include "interface.h" #include "http.h" #include "resources.h" +#include "monitorscope.h" #include "log.h" -#include -#include - -// This is the implementation of the HTTP/Websocket interface. It creates -// websocket connections as LaminarClients and registers them with the -// LaminarInterface so that status messages will be delivered to the client. -// On opening a websocket connection, it delivers a status snapshot message -// (see LaminarInterface::sendStatus) -class HttpImpl : public kj::HttpService { -public: - HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : - laminar(laminar), - responseHeaders(tbl) - {} - virtual ~HttpImpl() {} +#include "laminar.h" +// Helper class which wraps another class with calls to +// adding and removing a pointer to itself from a passed +// std::set reference. Used to keep track of currently +// connected clients +template +struct WithSetRef : public T { + WithSetRef(std::set& set, Args&& ...args) : + T(std::forward(args)...), + _set(set) + { + _set.insert(this); + } + ~WithSetRef() { + _set.erase(this); + } private: - class HttpChunkedClient : public LaminarClient { - public: - HttpChunkedClient(LaminarInterface& laminar) : - laminar(laminar) - {} - ~HttpChunkedClient() override { - laminar.deregisterClient(this); - } - void sendMessage(std::string payload) override { - chunks.push_back(kj::mv(payload)); - fulfiller->fulfill(); - } - void notifyJobFinished() override { - done = true; - fulfiller->fulfill(); - } - LaminarInterface& laminar; - std::list chunks; - // cannot use chunks.empty() because multiple fulfill()s - // could be coalesced - bool done = false; - kj::Own> fulfiller; - }; - - // Implements LaminarClient and holds the Websocket connection object. - // Automatically destructed when the promise created in request() resolves - // or is cancelled - class WebsocketClient : public LaminarClient { - public: - WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) : - laminar(laminar), - ws(kj::mv(ws)) - {} - ~WebsocketClient() override { - laminar.deregisterClient(this); - } - virtual void sendMessage(std::string payload) override { - messages.emplace_back(kj::mv(payload)); - // sendMessage might be called several times before the event loop - // gets a chance to act on the fulfiller. So store the payload here - // where it can be fetched later and don't pass the payload with the - // fulfiller because subsequent calls to fulfill() are ignored. - fulfiller->fulfill(); - } - LaminarInterface& laminar; - kj::Own ws; - std::list messages; - kj::Own> fulfiller; - }; - - kj::Promise websocketRead(WebsocketClient& lc) - { - return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { - KJ_SWITCH_ONEOF(message) { - KJ_CASE_ONEOF(str, kj::String) { - rapidjson::Document d; - d.ParseInsitu(const_cast(str.cStr())); - if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) { - lc.scope.page = d["page"].GetInt(); - lc.scope.field = d["field"].GetString(); - lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0; - laminar.sendStatus(&lc); - return websocketRead(lc); - } - } - KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - // clean socket shutdown - return lc.ws->close(close.code, close.reason); - } - KJ_CASE_ONEOF_DEFAULT {} - } - // unhandled/unknown message - return lc.ws->disconnect(); - }, [](kj::Exception&& e){ - // server logs suggest early catching here avoids fatal exception later - // TODO: reproduce in unit test - LLOG(WARNING, e.getDescription()); - return kj::READY_NOW; - }); - } - - kj::Promise websocketWrite(WebsocketClient& lc) - { - auto paf = kj::newPromiseAndFulfiller(); - lc.fulfiller = kj::mv(paf.fulfiller); - return paf.promise.then([this,&lc]{ - kj::Promise p = kj::READY_NOW; - std::list messages = kj::mv(lc.messages); - for(std::string& s : messages) { - p = p.then([&s,&lc]{ - kj::String str = kj::str(s); - return lc.ws->send(str).attach(kj::mv(str)); - }); - } - return p.attach(kj::mv(messages)).then([this,&lc]{ - return websocketWrite(lc); - }); - }); - } - - kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) { - // convert the requested URL to a MonitorScope - if(resource.substr(0, 5) == "/jobs") { - if(resource.length() == 5) { - lc.scope.type = MonitorScope::ALL; - } else { - resource = resource.substr(5); - size_t split = resource.find('/',1); - std::string job = resource.substr(1,split-1); - if(!job.empty()) { - lc.scope.job = job; - lc.scope.type = MonitorScope::JOB; - } - if(split != std::string::npos) { - size_t split2 = resource.find('/', split+1); - std::string run = resource.substr(split+1, split2-split); - if(!run.empty()) { - lc.scope.num = static_cast(atoi(run.c_str())); - lc.scope.type = MonitorScope::RUN; - } - if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { - lc.scope.type = MonitorScope::LOG; - } - } - } - } - laminar.registerClient(&lc); - kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); - // registerClient can happen after a successful websocket handshake. - // However, the connection might not be closed gracefully, so the - // corresponding deregister operation happens in the WebsocketClient - // destructor rather than the close handler or a then() clause - laminar.sendStatus(&lc); - return connection; - } - - // Parses the url of the form /log/NAME/NUMBER, filling in the passed - // references and returning true if successful. /log/NAME/latest is - // also allowed, in which case the num reference is set to 0 - bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { - if(url.startsWith("/log/")) { - kj::StringPtr path = url.slice(5); - KJ_IF_MAYBE(sep, path.findFirst('/')) { - name = path.slice(0, *sep).begin(); - kj::StringPtr tail = path.slice(*sep+1); - num = static_cast(atoi(tail.begin())); - name.erase(*sep); - if(tail == "latest") - num = laminar.latestRun(name); - if(num > 0) - return true; - } - } - return false; - } - - kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) { - auto paf = kj::newPromiseAndFulfiller(); - client->fulfiller = kj::mv(paf.fulfiller); - return paf.promise.then([=]{ - kj::Promise p = kj::READY_NOW; - std::list chunks = kj::mv(client->chunks); - for(std::string& s : chunks) { - p = p.then([=,&s]{ - return stream->write(s.data(), s.size()); - }); - } - return p.attach(kj::mv(chunks)).then([=]{ - return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); - }); - }); - } - - virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, - kj::AsyncInputStream& requestBody, Response& response) override - { - if(headers.isWebSocket()) { - responseHeaders.clear(); - kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders)); - return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc)); - } else { - // handle regular HTTP request - const char* start, *end, *content_type; - std::string badge; - // for log requests - std::string name; - uint num; - responseHeaders.clear(); - // Clients usually expect that http servers will ignore unknown query parameters, - // and expect to use this feature to work around browser limitations like there - // being no way to programatically force a resource to be reloaded from the server - // (without "Cache-Control: no-store", which is overkill). See issue #89. - // Since we currently don't handle any query parameters at all, the easiest way - // to achieve this is unconditionally remove all query parameters from the request. - // This will need to be redone if we ever accept query parameters, which probably - // will happen as part of issue #90. - KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { - const_cast(url.begin())[*queryIdx] = '\0'; - url = url.begin(); - } - if(url.startsWith("/archive/")) { - KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { - auto array = (*file)->mmap(0, (*file)->stat().size); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - auto stream = response.send(200, "OK", responseHeaders, array.size()); - return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); - } - } else if(parseLogEndpoint(url, name, num)) { - kj::Own cc = kj::heap(laminar); - cc->scope.job = name; - cc->scope.num = num; - bool complete; - std::string output; - cc->scope.type = MonitorScope::LOG; - if(laminar.handleLogRequest(name, num, output, complete)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. - responseHeaders.add("X-Accel-Buffering", "no"); - auto stream = response.send(200, "OK", responseHeaders, nullptr); - laminar.registerClient(cc.get()); - return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{ - if(complete) - return kj::Promise(kj::READY_NOW); - return writeLogChunk(c, s); - }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc)); - } - } else if(url == "/custom/style.css") { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - std::string css = laminar.getCustomCss(); - auto stream = response.send(200, "OK", responseHeaders, css.size()); - return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); - } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); - responseHeaders.add("Content-Encoding", "gzip"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - auto stream = response.send(200, "OK", responseHeaders, end-start); - return stream->write(start, end-start).attach(kj::mv(stream)); - } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); - responseHeaders.add("Cache-Control", "no-cache"); - auto stream = response.send(200, "OK", responseHeaders, badge.size()); - return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); - } - return response.sendError(404, "Not Found", responseHeaders); - } - } - - LaminarInterface& laminar; - Resources resources; - kj::HttpHeaders responseHeaders; + std::set& _set; }; -Http::Http(LaminarInterface &li) : - headerTable(kj::heap()), - httpService(kj::heap(li, *headerTable)), - laminar(li) +struct EventPeer { + MonitorScope scope; + std::list pendingOutput; + kj::Own> fulfiller; +}; + +struct LogWatcher { + std::string job; + uint run; + std::list pendingOutput; + kj::Own> fulfiller; +}; + +kj::Maybe fromUrl(std::string resource, char* query) { + MonitorScope scope; + + if(query) { + char *sk; + for(char* k = strtok_r(query, "&", &sk); k; k = strtok_r(nullptr, "&", &sk)) { + if(char* v = strchr(k, '=')) { + *v++ = '\0'; + if(strcmp(k, "page") == 0) + scope.page = atoi(v); + else if(strcmp(k, "field") == 0) + scope.field = v; + else if(strcmp(k, "order") == 0) + scope.order_desc = (strcmp(v, "dsc") == 0); + } + } + } + + if(resource == "/") { + scope.type = MonitorScope::HOME; + return kj::mv(scope); + } + + if(resource.substr(0, 5) != "/jobs") + return nullptr; + + if(resource.length() == 5) { + scope.type = MonitorScope::ALL; + return kj::mv(scope); + } + + resource = resource.substr(5); + size_t split = resource.find('/',1); + std::string job = resource.substr(1,split-1); + if(job.empty()) + return nullptr; + + scope.job = job; + scope.type = MonitorScope::JOB; + if(split == std::string::npos) + return kj::mv(scope); + + size_t split2 = resource.find('/', split+1); + std::string run = resource.substr(split+1, split2-split); + if(run.empty()) + return nullptr; + + scope.num = static_cast(atoi(run.c_str())); + scope.type = MonitorScope::RUN; + return kj::mv(scope); +} + +// Parses the url of the form /log/NAME/NUMBER, filling in the passed +// references and returning true if successful. /log/NAME/latest is +// also allowed, in which case the num reference is set to 0 +bool Http::parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { + if(url.startsWith("/log/")) { + kj::StringPtr path = url.slice(5); + KJ_IF_MAYBE(sep, path.findFirst('/')) { + name = path.slice(0, *sep).begin(); + kj::StringPtr tail = path.slice(*sep+1); + num = static_cast(atoi(tail.begin())); + name.erase(*sep); + if(tail == "latest") + num = laminar.latestRun(name); + if(num > 0) + return true; + } + } + return false; +} + +kj::Promise Http::cleanupPeers(kj::Timer& timer) { - + return timer.afterDelay(15 * kj::SECONDS).then([&]{ + for(EventPeer* p : eventPeers) { + // an empty SSE message is a colon followed by two newlines + p->pendingOutput.push_back(":\n\n"); + p->fulfiller->fulfill(); + } + return cleanupPeers(timer); + }).eagerlyEvaluate(nullptr); } -kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) { - auto httpServer = kj::heap(timer, *headerTable, *httpService); - return httpServer->listenHttp(*listener).attach(kj::mv(listener)).attach(kj::mv(httpServer)); +kj::Promise writeEvents(EventPeer* peer, kj::AsyncOutputStream* stream) { + auto paf = kj::newPromiseAndFulfiller(); + peer->fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([=]{ + kj::Promise p = kj::READY_NOW; + std::list chunks = kj::mv(peer->pendingOutput); + for(std::string& s : chunks) { + p = p.then([=,&s]{ + return stream->write(s.data(), s.size()); + }); + } + return p.attach(kj::mv(chunks)).then([=]{ + return writeEvents(peer, stream); + }); + }); +} + +kj::Promise writeLogChunk(LogWatcher* client, kj::AsyncOutputStream* stream) { + auto paf = kj::newPromiseAndFulfiller(); + client->fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([=](bool done){ + kj::Promise p = kj::READY_NOW; + std::list chunks = kj::mv(client->pendingOutput); + for(std::string& s : chunks) { + p = p.then([=,&s]{ + return stream->write(s.data(), s.size()); + }); + } + return p.attach(kj::mv(chunks)).then([=]{ + return done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); + }); + }); +} + +kj::Promise Http::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders &headers, kj::AsyncInputStream &requestBody, HttpService::Response &response) +{ + const char* start, *end, *content_type; + std::string badge; + // for log requests + std::string name; + uint num; + kj::HttpHeaders responseHeaders(*headerTable); + responseHeaders.clear(); + bool is_sse = false; + char* queryString = nullptr; + // Clients usually expect that http servers will ignore unknown query parameters, + // and expect to use this feature to work around browser limitations like there + // being no way to programatically force a resource to be reloaded from the server + // (without "Cache-Control: no-store", which is overkill). See issue #89. + // So first parse any query parameters we *are* interested in, then simply remove + // them from the URL, to make comparisions easier. + KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { + const_cast(url.begin())[*queryIdx] = '\0'; + queryString = const_cast(url.begin() + *queryIdx + 1); + url = url.begin(); + } + + KJ_IF_MAYBE(accept, headers.get(ACCEPT)) { + is_sse = (*accept == "text/event-stream"); + } + + if(is_sse) { + KJ_IF_MAYBE(s, fromUrl(url.cStr(), queryString)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/event-stream"); + auto peer = kj::heap>(eventPeers); + peer->scope = *s; + std::string st = "data: " + laminar.getStatus(peer->scope) + "\n\n"; + auto stream = response.send(200, "OK", responseHeaders); + return stream->write(st.data(), st.size()).attach(kj::mv(st)).then([=,s=stream.get(),p=peer.get()]{ + return writeEvents(p,s); + }).attach(kj::mv(stream)).attach(kj::mv(peer)); + } + } else if(url.startsWith("/archive/")) { + KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { + auto array = (*file)->mmap(0, (*file)->stat().size); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, array.size()); + return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); + } + } else if(parseLogEndpoint(url, name, num)) { + auto lw = kj::heap>(logWatchers); + lw->job = name; + lw->run = num; + bool complete; + std::string output; + if(laminar.handleLogRequest(name, num, output, complete)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. + responseHeaders.add("X-Accel-Buffering", "no"); + auto stream = response.send(200, "OK", responseHeaders, nullptr); + return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=lw.get()]{ + if(complete) + return kj::Promise(kj::READY_NOW); + return writeLogChunk(c, s); + }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(lw)); + } + } else if(url == "/custom/style.css") { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + std::string css = laminar.getCustomCss(); + auto stream = response.send(200, "OK", responseHeaders, css.size()); + return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); + } else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); + responseHeaders.add("Content-Encoding", "gzip"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, end-start); + return stream->write(start, end-start).attach(kj::mv(stream)); + } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); + responseHeaders.add("Cache-Control", "no-cache"); + auto stream = response.send(200, "OK", responseHeaders, badge.size()); + return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); + } + return response.sendError(404, "Not Found", responseHeaders); +} + +Http::Http(Laminar &li) : + laminar(li), + resources(kj::heap()) +{ + kj::HttpHeaderTable::Builder builder; + ACCEPT = builder.add("Accept"); + headerTable = builder.build(); +} + +Http::~Http() +{ + KJ_ASSERT(logWatchers.size() == 0); + KJ_ASSERT(eventPeers.size() == 0); +} + +kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) +{ + kj::Own server = kj::heap(timer, *headerTable, *this); + return server->listenHttp(*listener).attach(cleanupPeers(timer)).attach(kj::mv(listener)).attach(kj::mv(server)); +} + +void Http::notifyEvent(const char *type, const char *data, std::string job, uint run) +{ + for(EventPeer* c : eventPeers) { + if(c->scope.wantsStatus(job, run) + // The run page also should know that another job has started + // (so maybe it can show a previously hidden "next" button). + // Hence this small hack: + // TODO obviate + || (std::string(type)=="job_started" && c->scope.type == MonitorScope::Type::RUN && c->scope.job == job)) + { + c->pendingOutput.push_back("data: " + std::string(data) + "\n\n"); + c->fulfiller->fulfill(); + } + } +} + +void Http::notifyLog(std::string job, uint run, std::string log_chunk, bool eot) +{ + for(LogWatcher* lw : logWatchers) { + if(lw->job == job && lw->run == run) { + lw->pendingOutput.push_back(kj::mv(log_chunk)); + lw->fulfiller->fulfill(kj::mv(eot)); + } + } } diff --git a/src/http.h b/src/http.h index 5098b39..909a244 100644 --- a/src/http.h +++ b/src/http.h @@ -19,18 +19,48 @@ #ifndef LAMINAR_HTTP_H_ #define LAMINAR_HTTP_H_ +#include #include +#include +#include -struct LaminarInterface; +// Definition needed for musl +typedef unsigned int uint; +typedef unsigned long ulong; -class Http { +struct Laminar; +struct Resources; +struct LogWatcher; +struct EventPeer; + +class Http : public kj::HttpService { public: - Http(LaminarInterface &li); - kj::Promise startServer(kj::Timer &timer, kj::Own&& listener); + Http(Laminar&li); + virtual ~Http(); + kj::Promise startServer(kj::Timer &timer, kj::Own &&listener); + + void notifyEvent(const char* type, const char* data, std::string job = nullptr, uint run = 0); + void notifyLog(std::string job, uint run, std::string log_chunk, bool eot); + +private: + virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, Response& response) override; + bool parseLogEndpoint(kj::StringPtr url, std::string &name, uint &num); + + // With SSE, there is no notification if a client disappears. Also, an idle + // client must be kept alive if there is no activity in their MonitorScope. + // Deal with these by sending a periodic keepalive and reaping the client if + // the write fails. + kj::Promise cleanupPeers(kj::Timer &timer); + + Laminar& laminar; + std::set eventPeers; kj::Own headerTable; - kj::Own httpService; - LaminarInterface& laminar; + kj::Own resources; + std::set logWatchers; + + kj::HttpHeaderId ACCEPT; }; #endif //LAMINAR_HTTP_H_ diff --git a/src/interface.h b/src/interface.h deleted file mode 100644 index 25387ea..0000000 --- a/src/interface.h +++ /dev/null @@ -1,175 +0,0 @@ -/// -/// Copyright 2015-2019 Oliver Giles -/// -/// This file is part of Laminar -/// -/// Laminar is free software: you can redistribute it and/or modify -/// it under the terms of the GNU General Public License as published by -/// the Free Software Foundation, either version 3 of the License, or -/// (at your option) any later version. -/// -/// Laminar is distributed in the hope that it will be useful, -/// but WITHOUT ANY WARRANTY; without even the implied warranty of -/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -/// GNU General Public License for more details. -/// -/// You should have received a copy of the GNU General Public License -/// along with Laminar. If not, see -/// -#ifndef LAMINAR_INTERFACE_H_ -#define LAMINAR_INTERFACE_H_ - -#include "run.h" - -#include -#include -#include - -// Simple struct to define which information a frontend client is interested -// in, both in initial request phase and real-time updates. It corresponds -// loosely to frontend URLs -struct MonitorScope { - enum Type { - HOME, // home page: recent builds and statistics - ALL, // browse jobs - JOB, // a specific job page - RUN, // a specific run page - LOG // a run's log page - }; - - MonitorScope(Type type = HOME, std::string job = std::string(), uint num = 0) : - type(type), - job(job), - num(num), - page(0), - field("number"), - order_desc(true) - {} - - // whether this scope wants status information about the given job or run - bool wantsStatus(std::string ajob, uint anum = 0) const { - if(type == HOME || type == ALL) return true; - if(type == JOB) return ajob == job; - if(type == RUN) return ajob == job && anum == num; - return false; - } - - bool wantsLog(std::string ajob, uint anum) const { - return type == LOG && ajob == job && anum == num; - } - - Type type; - std::string job; - uint num = 0; - // sorting - uint page = 0; - std::string field; - bool order_desc; -}; - -// Represents a (websocket) client that wants to be notified about events -// matching the supplied scope. Pass instances of this to LaminarInterface -// registerClient and deregisterClient -struct LaminarClient { - virtual ~LaminarClient() noexcept(false) {} - virtual void sendMessage(std::string payload) = 0; - // TODO: redesign - virtual void notifyJobFinished() {} - MonitorScope scope; -}; - -// Represents a (rpc) client that wants to be notified about run completion. -// Pass instances of this to LaminarInterface registerWaiter and -// deregisterWaiter -struct LaminarWaiter { - virtual ~LaminarWaiter() =default; - virtual void complete(const Run*) = 0; -}; - -// Represents a file mapped in memory. Used to serve artefacts -struct MappedFile { - virtual ~MappedFile() =default; - virtual const void* address() = 0; - virtual size_t size() = 0; -}; - -// The interface connecting the network layer to the application business -// logic. These methods fulfil the requirements of both the HTTP/Websocket -// and RPC interfaces. -struct LaminarInterface { - virtual ~LaminarInterface() {} - - // Queues a job, returns immediately. Return value will be nullptr if - // the supplied name is not a known job. - virtual std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) = 0; - - // Register a client (but don't give up ownership). The client will be - // notified with a JSON message of any events matching its scope - // (see LaminarClient and MonitorScope above) - virtual void registerClient(LaminarClient* client) = 0; - - // Call this before destroying a client so that Laminar doesn't try - // to call LaminarClient::sendMessage on invalid data - virtual void deregisterClient(LaminarClient* client) = 0; - - // Register a waiter (but don't give up ownership). The waiter will be - // notified with a callback of any run completion (see LaminarWaiter above) - virtual void registerWaiter(LaminarWaiter* waiter) = 0; - - // Call this before destroying a waiter so that Laminar doesn't try - // to call LaminarWaiter::complete on invalid data - virtual void deregisterWaiter(LaminarWaiter* waiter) = 0; - - // Return the latest known number of the named job - virtual uint latestRun(std::string job) = 0; - - // Given a job name and number, return existence and (via reference params) - // its current log output and whether the job is ongoing - virtual bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) = 0; - - // Synchronously send a snapshot of the current status to the given - // client (as governed by the client's MonitorScope). This is called on - // initial websocket connect. - virtual void sendStatus(LaminarClient* client) = 0; - - // Implements the laminar client interface allowing the setting of - // arbitrary parameters on a run (usually itself) to be available in - // the environment of subsequent scripts. - virtual bool setParam(std::string job, uint buildNum, std::string param, std::string value) = 0; - - // Gets the list of jobs currently waiting in the execution queue - virtual const std::list>& listQueuedJobs() = 0; - - // Gets the list of currently executing jobs - virtual const RunSet& listRunningJobs() = 0; - - // Gets the list of known jobs - scans cfg/jobs for *.run files - virtual std::list listKnownJobs() = 0; - - // Fetches the content of an artifact given its filename relative to - // $LAMINAR_HOME/archive. Ideally, this would instead be served by a - // proper web server which handles this url. - virtual kj::Maybe> getArtefact(std::string path) = 0; - - // Given the name of a job, populate the provided string reference with - // SVG content describing the last known state of the job. Returns false - // if the job is unknown. - virtual bool handleBadgeRequest(std::string job, std::string& badge) = 0; - - // Fetches the content of $LAMINAR_HOME/custom/style.css or an empty - // string. Ideally, this would instead be served by a proper web server - // which handles this url. - virtual std::string getCustomCss() = 0; - - // Aborts a single job - virtual bool abort(std::string job, uint buildNum) = 0; - - // Abort all running jobs - virtual void abortAll() = 0; - - // Callback to handle a configuration modification notification - virtual void notifyConfigChanged() = 0; -}; - -#endif // LAMINAR_INTERFACE_H_ - diff --git a/src/laminar.cpp b/src/laminar.cpp index 166fc0d..5170319 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -20,6 +20,8 @@ #include "server.h" #include "conf.h" #include "log.h" +#include "http.h" +#include "rpc.h" #include #include @@ -35,7 +37,7 @@ #include // rapidjson::Writer with a StringBuffer is used a lot in Laminar for -// preparing JSON messages to send to Websocket clients. A small wrapper +// preparing JSON messages to send to HTTP clients. A small wrapper // class here reduces verbosity later for this common use case. class Json : public rapidjson::Writer { public: @@ -52,13 +54,6 @@ template<> Json& Json::set(const char* key, double value) { String(key); Double( template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; } template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; } -namespace { -// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf) -constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar"; -constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080"; -constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/"; -} - // short syntax helpers for kj::Path template inline kj::Path operator/(const kj::Path& p, const T& ext) { @@ -71,18 +66,19 @@ inline kj::Path operator/(const std::string& p, const T& ext) { typedef std::string str; -Laminar::Laminar(const char *home) : - homePath(kj::Path::parse(&home[1])), - fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY)) +Laminar::Laminar(Server &server, Settings settings) : + settings(settings), + srv(server), + homePath(kj::Path::parse(&settings.home[1])), + fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY)), + http(kj::heap(*this)), + rpc(kj::heap(*this)) { - LASSERT(home[0] == '/'); + LASSERT(settings.home[0] == '/'); - archiveUrl = ARCHIVE_URL_DEFAULT; - if(char* envArchive = getenv("LAMINAR_ARCHIVE_URL")) { - archiveUrl = envArchive; - if(archiveUrl.back() != '/') - archiveUrl.append("/"); - } + archiveUrl = settings.archive_url; + if(archiveUrl.back() != '/') + archiveUrl.append("/"); numKeepRunDirs = 0; @@ -103,29 +99,22 @@ Laminar::Laminar(const char *home) : buildNums[name] = build; }); - srv = nullptr; + srv.watchPaths([this]{ + LLOG(INFO, "Reloading configuration"); + loadConfiguration(); + // config change may allow stuck jobs to dequeue + assignNewJobs(); + }).addPath((homePath/"cfg"/"nodes").toString(true).cStr()) + .addPath((homePath/"cfg"/"jobs").toString(true).cStr()); + + srv.listenRpc(*rpc, settings.bind_rpc); + srv.listenHttp(*http, settings.bind_http); // Load configuration, may be called again in response to an inotify event // that the configuration files have been modified loadConfiguration(); } -void Laminar::registerClient(LaminarClient* client) { - clients.insert(client); -} - -void Laminar::deregisterClient(LaminarClient* client) { - clients.erase(client); -} - -void Laminar::registerWaiter(LaminarWaiter *waiter) { - waiters.insert(waiter); -} - -void Laminar::deregisterWaiter(LaminarWaiter *waiter) { - waiters.erase(waiter); -} - uint Laminar::latestRun(std::string job) { auto it = activeJobs.byJobName().equal_range(job); if(it.first == it.second) { @@ -141,8 +130,6 @@ uint Laminar::latestRun(std::string job) { } } -// TODO: reunify with sendStatus. The difference is that this method is capable of -// returning "not found" to the caller, and sendStatus isn't bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) { if(Run* run = activeRun(name, num)) { output = run->log; @@ -216,40 +203,15 @@ void Laminar::populateArtifacts(Json &j, std::string job, uint num) const { } } -void Laminar::sendStatus(LaminarClient* client) { - if(client->scope.type == MonitorScope::LOG) { - // If the requested job is currently in progress - if(const Run* run = activeRun(client->scope.job, client->scope.num)) { - client->sendMessage(run->log.c_str()); - } else { // it must be finished, fetch it from the database - db->stmt("SELECT output, outputLen FROM builds WHERE name = ? AND number = ?") - .bind(client->scope.job, client->scope.num) - .fetch([=](str maybeZipped, unsigned long sz) { - str log(sz+1,'\0'); - if(sz >= COMPRESS_LOG_MIN_SIZE) { - int res = ::uncompress((uint8_t*) log.data(), &sz, - (const uint8_t*) maybeZipped.data(), maybeZipped.size()); - if(res == Z_OK) - client->sendMessage(log); - else - LLOG(ERROR, "Failed to uncompress log"); - } else { - client->sendMessage(maybeZipped); - } - }); - client->notifyJobFinished(); - } - return; - } - +std::string Laminar::getStatus(MonitorScope scope) { Json j; j.set("type", "status"); j.set("title", getenv("LAMINAR_TITLE") ?: "Laminar"); j.set("time", time(nullptr)); j.startObject("data"); - if(client->scope.type == MonitorScope::RUN) { + if(scope.type == MonitorScope::RUN) { db->stmt("SELECT queuedAt,startedAt,completedAt,result,reason,parentJob,parentBuild FROM builds WHERE name = ? AND number = ?") - .bind(client->scope.job, client->scope.num) + .bind(scope.job, scope.num) .fetch([&](time_t queued, time_t started, time_t completed, int result, std::string reason, std::string parentJob, uint parentBuild) { j.set("queued", started-queued); j.set("started", started); @@ -258,7 +220,7 @@ void Laminar::sendStatus(LaminarClient* client) { j.set("reason", reason); j.startObject("upstream").set("name", parentJob).set("num", parentBuild).EndObject(2); }); - if(const Run* run = activeRun(client->scope.job, client->scope.num)) { + if(const Run* run = activeRun(scope.job, scope.num)) { j.set("queued", run->startedAt - run->queuedAt); j.set("started", run->startedAt); j.set("result", to_string(RunState::RUNNING)); @@ -270,30 +232,30 @@ void Laminar::sendStatus(LaminarClient* client) { j.set("etc", run->startedAt + lastRuntime); }); } - j.set("latestNum", int(buildNums[client->scope.job])); + j.set("latestNum", int(buildNums[scope.job])); j.startArray("artifacts"); - populateArtifacts(j, client->scope.job, client->scope.num); + populateArtifacts(j, scope.job, scope.num); j.EndArray(); - } else if(client->scope.type == MonitorScope::JOB) { + } else if(scope.type == MonitorScope::JOB) { const uint runsPerPage = 10; j.startArray("recent"); // ORDER BY param cannot be bound std::string order_by; - std::string direction = client->scope.order_desc ? "DESC" : "ASC"; - if(client->scope.field == "number") + std::string direction = scope.order_desc ? "DESC" : "ASC"; + if(scope.field == "number") order_by = "number " + direction; - else if(client->scope.field == "result") + else if(scope.field == "result") order_by = "result " + direction + ", number DESC"; - else if(client->scope.field == "started") + else if(scope.field == "started") order_by = "startedAt " + direction + ", number DESC"; - else if(client->scope.field == "duration") + else if(scope.field == "duration") order_by = "(completedAt-startedAt) " + direction + ", number DESC"; else order_by = "number DESC"; std::string stmt = "SELECT number,startedAt,completedAt,result,reason FROM builds WHERE name = ? ORDER BY " + order_by + " LIMIT ?,?"; db->stmt(stmt.c_str()) - .bind(client->scope.job, client->scope.page * runsPerPage, runsPerPage) + .bind(scope.job, scope.page * runsPerPage, runsPerPage) .fetch([&](uint build,time_t started,time_t completed,int result,str reason){ j.StartObject(); j.set("number", build) @@ -305,18 +267,18 @@ void Laminar::sendStatus(LaminarClient* client) { }); j.EndArray(); db->stmt("SELECT COUNT(*),AVG(completedAt-startedAt) FROM builds WHERE name = ?") - .bind(client->scope.job) + .bind(scope.job) .fetch([&](uint nRuns, uint averageRuntime){ j.set("averageRuntime", averageRuntime); j.set("pages", (nRuns-1) / runsPerPage + 1); j.startObject("sort"); - j.set("page", client->scope.page) - .set("field", client->scope.field) - .set("order", client->scope.order_desc ? "dsc" : "asc") + j.set("page", scope.page) + .set("field", scope.field) + .set("order", scope.order_desc ? "dsc" : "asc") .EndObject(); }); j.startArray("running"); - auto p = activeJobs.byJobName().equal_range(client->scope.job); + auto p = activeJobs.byJobName().equal_range(scope.job); for(auto it = p.first; it != p.second; ++it) { const std::shared_ptr run = *it; j.StartObject(); @@ -330,27 +292,27 @@ void Laminar::sendStatus(LaminarClient* client) { j.EndArray(); int nQueued = 0; for(const auto& run : queuedJobs) { - if (run->name == client->scope.job) { + if (run->name == scope.job) { nQueued++; } } j.set("nQueued", nQueued); db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? ORDER BY completedAt DESC LIMIT 1") - .bind(client->scope.job, int(RunState::SUCCESS)) + .bind(scope.job, int(RunState::SUCCESS)) .fetch([&](int build, time_t started){ j.startObject("lastSuccess"); j.set("number", build).set("started", started); j.EndObject(); }); db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result <> ? ORDER BY completedAt DESC LIMIT 1") - .bind(client->scope.job, int(RunState::SUCCESS)) + .bind(scope.job, int(RunState::SUCCESS)) .fetch([&](int build, time_t started){ j.startObject("lastFailed"); j.set("number", build).set("started", started); j.EndObject(); }); - } else if(client->scope.type == MonitorScope::ALL) { + } else if(scope.type == MonitorScope::ALL) { j.startArray("jobs"); db->stmt("SELECT name,number,startedAt,completedAt,result FROM builds b JOIN (SELECT name n,MAX(number) l FROM builds GROUP BY n) q ON b.name = q.n AND b.number = q.l") .fetch([&](str name,uint number, time_t started, time_t completed, int result){ @@ -512,31 +474,16 @@ void Laminar::sendStatus(LaminarClient* client) { } j.EndObject(); - client->sendMessage(j.str()); + return j.str(); } Laminar::~Laminar() noexcept try { delete db; - delete srv; } catch (std::exception& e) { LLOG(ERROR, e.what()); return; } -void Laminar::run() { - const char* listen_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT; - const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT; - - srv = new Server(*this, listen_rpc, listen_http); - srv->addWatchPath((homePath/"cfg"/"nodes").toString(true).cStr()); - srv->addWatchPath((homePath/"cfg"/"jobs").toString(true).cStr()); - srv->start(); -} - -void Laminar::stop() { - srv->stop(); -} - bool Laminar::loadConfiguration() { if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS")) numKeepRunDirs = static_cast(atoi(ndirs)); @@ -627,24 +574,12 @@ std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { .startObject("data") .set("name", name) .EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(name)) - c->sendMessage(msg); - } + http->notifyEvent("job_queued", j.str(), name.c_str()); assignNewJobs(); return run; } -void Laminar::notifyConfigChanged() -{ - LLOG(INFO, "Reloading configuration"); - loadConfiguration(); - // config change may allow stuck jobs to dequeue - assignNewJobs(); -} - bool Laminar::abort(std::string job, uint buildNum) { if(Run* run = activeRun(job, buildNum)) { run->abort(true); @@ -700,11 +635,11 @@ bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { runFinished(run.get()); }); if(run->timeout > 0) { - exec = exec.attach(srv->addTimeout(run->timeout, [r=run.get()](){ + exec = exec.attach(srv.addTimeout(run->timeout, [r=run.get()](){ r->abort(true); })); } - srv->addTask(kj::mv(exec)); + srv.addTask(kj::mv(exec)); LLOG(INFO, "Started job on node", run->name, run->build, node->name); // update next build number @@ -731,16 +666,7 @@ bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { } j.EndArray(); j.EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(run->name, run->build) - // The run page also should know that another job has started - // (so maybe it can show a previously hidden "next" button). - // Hence this small hack: - || (c->scope.type == MonitorScope::Type::RUN && c->scope.job == run->name)) - c->sendMessage(msg); - } - + http->notifyEvent("job_started", j.str(), run->name.c_str(), run->build); return true; } } @@ -765,17 +691,14 @@ kj::Promise Laminar::handleRunStep(Run* run) { return kj::READY_NOW; } - kj::Promise exited = srv->onChildExit(run->current_pid); + kj::Promise exited = srv.onChildExit(run->current_pid); // promise is fulfilled when the process is reaped. But first we wait for all // output from the pipe (Run::output_fd) to be consumed. - return srv->readDescriptor(run->output_fd, [this,run](const char*b,size_t n){ + return srv.readDescriptor(run->output_fd, [this,run](const char*b,size_t n){ // handle log output std::string s(b, n); run->log += s; - for(LaminarClient* c : clients) { - if(c->scope.wantsLog(run->name, run->build)) - c->sendMessage(s); - } + http->notifyLog(run->name, run->build, s, false); }).then([p = std::move(exited)]() mutable { // wait until the process is reaped return kj::mv(p); @@ -832,18 +755,8 @@ void Laminar::runFinished(Run * r) { populateArtifacts(j, r->name, r->build); j.EndArray(); j.EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(r->name, r->build)) - c->sendMessage(msg); - c->notifyJobFinished(); - } - - // notify the waiters - for(LaminarWaiter* w : waiters) { - w->complete(r); - } - + http->notifyEvent("job_completed", j.str(), r->name, r->build); + http->notifyLog(r->name, r->build, "", true); // erase reference to run from activeJobs. Since runFinished is called in a // lambda whose context contains a shared_ptr, the run won't be deleted // until the context is destroyed at the end of the lambda execution. diff --git a/src/laminar.h b/src/laminar.h index 8780d22..988db66 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -19,13 +19,14 @@ #ifndef LAMINAR_LAMINAR_H_ #define LAMINAR_LAMINAR_H_ -#include "interface.h" #include "run.h" +#include "monitorscope.h" #include "node.h" #include "database.h" #include #include +#include // Node name to node object map typedef std::unordered_map> NodeMap; @@ -33,40 +34,72 @@ typedef std::unordered_map> NodeMap; struct Server; class Json; +class Http; +class Rpc; + +struct Settings { + const char* home; + const char* bind_rpc; + const char* bind_http; + const char* archive_url; +}; + // The main class implementing the application's business logic. -// It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC -// interfaces and communicates via the LaminarInterface methods and -// the LaminarClient objects (see interface.h) -class Laminar final : public LaminarInterface { +class Laminar final { public: - Laminar(const char* homePath); - ~Laminar() noexcept override; + Laminar(Server& server, Settings settings); + ~Laminar() noexcept; - // Runs the application forever - void run(); - // Call this in a signal handler to make run() return - void stop(); + // Queues a job, returns immediately. Return value will be nullptr if + // the supplied name is not a known job. + std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()); - // Implementations of LaminarInterface - std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) override; - void registerClient(LaminarClient* client) override; - void deregisterClient(LaminarClient* client) override; - void registerWaiter(LaminarWaiter* waiter) override; - void deregisterWaiter(LaminarWaiter* waiter) override; - uint latestRun(std::string job) override; - bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) override; + // Return the latest known number of the named job + uint latestRun(std::string job); - void sendStatus(LaminarClient* client) override; - bool setParam(std::string job, uint buildNum, std::string param, std::string value) override; - const std::list>& listQueuedJobs() override; - const RunSet& listRunningJobs() override; - std::list listKnownJobs() override; - kj::Maybe> getArtefact(std::string path) override; - bool handleBadgeRequest(std::string job, std::string& badge) override; - std::string getCustomCss() override; - bool abort(std::string job, uint buildNum) override; - void abortAll() override; - void notifyConfigChanged() override; + // Given a job name and number, return existence and (via reference params) + // its current log output and whether the job is ongoing + bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete); + + // Given a relevant scope, returns a JSON string describing the current + // server status. Content differs depending on the page viewed by the user, + // which should be provided as part of the scope. + std::string getStatus(MonitorScope scope); + + // Implements the laminarc function of setting arbitrary parameters on a run, + // (typically the current run) which will be made available in the environment + // of subsequent scripts. + bool setParam(std::string job, uint buildNum, std::string param, std::string value); + + // Gets the list of jobs currently waiting in the execution queue + const std::list>& listQueuedJobs(); + + // Gets the list of currently executing jobs + const RunSet& listRunningJobs(); + + // Gets the list of known jobs - scans cfg/jobs for *.run files + std::list listKnownJobs(); + + // Fetches the content of an artifact given its filename relative to + // $LAMINAR_HOME/archive. Ideally, this would instead be served by a + // proper web server which handles this url. + kj::Maybe> getArtefact(std::string path); + + // Given the name of a job, populate the provided string reference with + // SVG content describing the last known state of the job. Returns false + // if the job is unknown. + bool handleBadgeRequest(std::string job, std::string& badge); + + // Fetches the content of $LAMINAR_HOME/custom/style.css or an empty + // string. Ideally, this would instead be served by a proper web server + // which handles this url. + std::string getCustomCss(); + + // Aborts a single job + bool abort(std::string job, uint buildNum); + + // Abort all running jobs + void abortAll(); private: bool loadConfiguration(); @@ -89,16 +122,18 @@ private: std::unordered_map> jobTags; + Settings settings; RunSet activeJobs; Database* db; - Server* srv; + Server& srv; NodeMap nodes; kj::Path homePath; kj::Own fsHome; - std::set clients; - std::set waiters; uint numKeepRunDirs; std::string archiveUrl; + + kj::Own http; + kj::Own rpc; }; #endif // LAMINAR_LAMINAR_H_ diff --git a/src/main.cpp b/src/main.cpp index ead42c9..9086761 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,17 +17,31 @@ /// along with Laminar. If not, see /// #include "laminar.h" +#include "server.h" #include "log.h" #include #include #include static Laminar* laminar; +static Server* server; static void laminar_quit(int) { - laminar->stop(); + // Abort current jobs. Most of the time this isn't necessary since + // systemd stop or other kill mechanism will send SIGTERM to the whole + // process group. + laminar->abortAll(); + server->stop(); } +namespace { +constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar"; +constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080"; +constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/"; +} + + + int main(int argc, char** argv) { for(int i = 1; i < argc; ++i) { if(strcmp(argv[i], "-v") == 0) { @@ -35,15 +49,27 @@ int main(int argc, char** argv) { } } - laminar = new Laminar(getenv("LAMINAR_HOME") ?: "/var/lib/laminar"); + auto ioContext = kj::setupAsyncIo(); + + Settings settings; + // Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf) + settings.home = getenv("LAMINAR_HOME") ?: "/var/lib/laminar"; + settings.bind_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT; + settings.bind_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT; + settings.archive_url = getenv("LAMINAR_ARCHIVE_URL") ?: ARCHIVE_URL_DEFAULT; + + server = new Server(ioContext); + laminar = new Laminar(*server, settings); + kj::UnixEventPort::captureChildExit(); signal(SIGINT, &laminar_quit); signal(SIGTERM, &laminar_quit); - laminar->run(); + server->start(); delete laminar; + delete server; LLOG(INFO, "Clean exit"); return 0; diff --git a/src/monitorscope.h b/src/monitorscope.h new file mode 100644 index 0000000..2b48850 --- /dev/null +++ b/src/monitorscope.h @@ -0,0 +1,62 @@ +/// +/// Copyright 2015-2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_MONITORSCOPE_H_ +#define LAMINAR_MONITORSCOPE_H_ + +#include + +// Simple struct to define which information a frontend client is interested +// in, both in initial request phase and real-time updates. It corresponds +// loosely to frontend URLs +struct MonitorScope { + enum Type { + HOME, // home page: recent builds and statistics + ALL, // browse jobs + JOB, // a specific job page + RUN, // a specific run page + }; + + MonitorScope(Type type = HOME, std::string job = std::string(), uint num = 0) : + type(type), + job(job), + num(num), + page(0), + field("number"), + order_desc(true) + {} + + // whether this scope wants status information about the given job or run + bool wantsStatus(std::string ajob, uint anum = 0) const { + if(type == HOME || type == ALL) return true; + if(type == JOB) return ajob == job; + if(type == RUN) return ajob == job && anum == num; + return false; + } + + Type type; + std::string job; + uint num = 0; + // sorting + uint page = 0; + std::string field; + bool order_desc; +}; + +#endif // LAMINAR_MONITORSCOPE_H_ + diff --git a/src/resources/js/app.js b/src/resources/js/app.js index 7ea8667..f7a0a31 100644 --- a/src/resources/js/app.js +++ b/src/resources/js/app.js @@ -22,30 +22,30 @@ const timeScale = function(max){ ? { scale:function(v){return Math.round(v/60)/10}, label:'Minutes' } : { scale:function(v){return v;}, label:'Seconds' }; } - -const WebsocketHandler = function() { - function setupWebsocket(path, next) { - let ws = new WebSocket(document.head.baseURI.replace(/^http/,'ws') + path.substr(1)); - ws.onmessage = function(msg) { +const ServerEventHandler = function() { + function setupEventSource(path, query, next) { + const es = new EventSource(document.head.baseURI + path.substr(1) + query); + es.path = path; // save for later in case we need to add query params + es.onmessage = function(msg) { msg = JSON.parse(msg.data); - // "status" is the first message the websocket always delivers. + // "status" is the first message the server always delivers. // Use this to confirm the navigation. The component is not // created until next() is called, so creating a reference // for other message types must be deferred. There are some extra - // subtle checks here. If this websocket already has a component, + // subtle checks here. If this eventsource already has a component, // then this is not the first time the status message has been // received. If the frontend requests an update, the status message // should not be handled here, but treated the same as any other // message. An exception is if the connection has been lost - in // that case we should treat this as a "first-time" status message. - // this.comp.ws is used as a proxy for this. - if (msg.type === 'status' && (!this.comp || !this.comp.ws)) { + // this.comp.es is used as a proxy for this. + if (msg.type === 'status' && (!this.comp || !this.comp.es)) { next(comp => { // Set up bidirectional reference // 1. needed to reference the component for other msg types this.comp = comp; // 2. needed to close the ws on navigation away - comp.ws = this; + comp.es = this; // Update html and nav titles document.title = comp.$root.title = msg.title; // Calculate clock offset (used by ProgressUpdater) @@ -59,47 +59,35 @@ const WebsocketHandler = function() { if (!this.comp) return console.error("Page component was undefined"); else { + this.comp.$root.connected = true; this.comp.$root.showNotify(msg.type, msg.data); if(typeof this.comp[msg.type] === 'function') this.comp[msg.type](msg.data); } } - }; - ws.onclose = function(ev) { - // if this.comp isn't set, this connection has never been used - // and a re-connection isn't meaningful - if(!ev.wasClean && 'comp' in this) { - this.comp.$root.connected = false; - // remove the reference to the websocket from the component. - // This not only cleans up an unneeded reference but ensures a - // status message on reconnection is treated as "first-time" - delete this.comp.ws; - this.reconnectTimeout = setTimeout(()=>{ - var newWs = setupWebsocket(path, (fn) => { fn(this.comp); }); - // the next() callback won't happen if the server is still - // unreachable. Save the reference to the last component - // here so we can recover if/when it does return. This means - // passing this.comp in the next() callback above is redundant - // but necessary to keep the same implementation. - newWs.comp = this.comp; - }, 2000); - } } - return ws; - }; + es.onerror = function() { + this.comp.$root.connected = false; + } + return es; + } return { beforeRouteEnter(to, from, next) { - setupWebsocket(to.path, (fn) => { next(fn); }); + setupEventSource(to.path, '', (fn) => { next(fn); }); }, beforeRouteUpdate(to, from, next) { - this.ws.close(); - clearTimeout(this.ws.reconnectTimeout); - setupWebsocket(to.path, (fn) => { fn(this); next(); }); + this.es.close(); + setupEventSource(to.path, '', (fn) => { fn(this); next(); }); }, beforeRouteLeave(to, from, next) { - this.ws.close(); - clearTimeout(this.ws.reconnectTimeout); + this.es.close(); next(); + }, + methods: { + query(q) { + this.es.close(); + setupEventSource(this.es.path, '?' + Object.entries(q).map(([k,v])=>`${k}=${v}`).join('&'), (fn) => { fn(this); }); + } } }; }(); @@ -195,7 +183,7 @@ const Home = function() { return { template: '#home', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, @@ -432,7 +420,7 @@ const Jobs = function() { }; return { template: '#jobs', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, methods: { status: function(msg) { @@ -536,7 +524,7 @@ var Job = function() { var chtBt = null; return Vue.extend({ template: '#job', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, @@ -634,11 +622,11 @@ var Job = function() { }, page_next: function() { state.sort.page++; - this.ws.send(JSON.stringify(state.sort)); + this.query(state.sort) }, page_prev: function() { state.sort.page--; - this.ws.send(JSON.stringify(state.sort)); + this.query(state.sort) }, do_sort: function(field) { if(state.sort.field == field) { @@ -647,7 +635,7 @@ var Job = function() { state.sort.order = 'dsc'; state.sort.field = field; } - this.ws.send(JSON.stringify(state.sort)); + this.query(state.sort) } } }); @@ -690,7 +678,7 @@ const Run = function() { return { template: '#run', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, diff --git a/src/rpc.cpp b/src/rpc.cpp index c7a9a3e..e68b696 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -18,8 +18,7 @@ /// #include "rpc.h" #include "laminar.capnp.h" - -#include "interface.h" +#include "laminar.h" #include "log.h" namespace { @@ -38,16 +37,16 @@ LaminarCi::JobResult fromRunState(RunState state) { } // This is the implementation of the Laminar Cap'n Proto RPC interface. // As such, it implements the pure virtual interface generated from -// laminar.capnp with calls to the LaminarInterface -class RpcImpl : public LaminarCi::Server, public LaminarWaiter { +// laminar.capnp with calls to the primary Laminar class +class RpcImpl : public LaminarCi::Server { public: - RpcImpl(LaminarInterface& l) : + RpcImpl(Laminar& l) : LaminarCi::Server(), laminar(l) { } - ~RpcImpl() override { + virtual ~RpcImpl() { } // Queue a job, without waiting for it to start @@ -83,10 +82,9 @@ public: LLOG(INFO, "RPC run", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); if(Run* r = run.get()) { - runWaiters[r].emplace_back(kj::newPromiseAndFulfiller()); - return runWaiters[r].back().promise.then([context,run](RunState state) mutable { + return r->whenFinished().then([context,r](RunState state) mutable { context.getResults().setResult(fromRunState(state)); - context.getResults().setBuildNum(run->build); + context.getResults().setBuildNum(r->build); }); } else { context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); @@ -164,18 +162,11 @@ private: return res; } - // Implements LaminarWaiter::complete - void complete(const Run* r) override { - for(kj::PromiseFulfillerPair& w : runWaiters[r]) - w.fulfiller->fulfill(RunState(r->result)); - runWaiters.erase(r); - } -private: - LaminarInterface& laminar; + Laminar& laminar; std::unordered_map>> runWaiters; }; -Rpc::Rpc(LaminarInterface& li) : +Rpc::Rpc(Laminar& li) : rpcInterface(kj::heap(li)) {} diff --git a/src/rpc.h b/src/rpc.h index 823980e..bb2d096 100644 --- a/src/rpc.h +++ b/src/rpc.h @@ -23,11 +23,11 @@ #include #include -struct LaminarInterface; +struct Laminar; class Rpc { public: - Rpc(LaminarInterface &li); + Rpc(Laminar&li); kj::Promise accept(kj::Own&& connection); capnp::Capability::Client rpcInterface; diff --git a/src/run.cpp b/src/run.cpp index 4862652..73b917f 100644 --- a/src/run.cpp +++ b/src/run.cpp @@ -51,7 +51,8 @@ Run::Run(std::string name, ParamMap pm, kj::Path&& rootPath) : params(kj::mv(pm)), queuedAt(time(nullptr)), rootPath(kj::mv(rootPath)), - started(kj::newPromiseAndFulfiller()) + started(kj::newPromiseAndFulfiller()), + finished(kj::newPromiseAndFulfiller()) { for(auto it = params.begin(); it != params.end();) { if(it->first[0] == '=') { @@ -261,4 +262,6 @@ void Run::reaped(int status) { else if(status != 0) result = RunState::FAILED; // otherwise preserve earlier status + + finished.fulfiller->fulfill(RunState(result)); } diff --git a/src/run.h b/src/run.h index 79955d8..5fde79e 100644 --- a/src/run.h +++ b/src/run.h @@ -74,6 +74,7 @@ public: std::string reason() const; kj::Promise&& whenStarted() { return kj::mv(started.promise); } + kj::Promise&& whenFinished() { return kj::mv(finished.promise); } std::shared_ptr node; RunState result; @@ -108,6 +109,7 @@ private: std::list env; std::string reasonMsg; kj::PromiseFulfillerPair started; + kj::PromiseFulfillerPair finished; }; diff --git a/src/server.cpp b/src/server.cpp index f98ef4a..0944633 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -17,10 +17,10 @@ /// along with Laminar. If not, see /// #include "server.h" -#include "interface.h" #include "log.h" #include "rpc.h" #include "http.h" +#include "laminar.h" #include #include @@ -35,41 +35,11 @@ // a multiple of sizeof(struct signalfd_siginfo) == 128 #define PROC_IO_BUFSIZE 4096 -Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, - kj::StringPtr httpBindAddress) : - laminarInterface(li), - ioContext(kj::setupAsyncIo()), +Server::Server(kj::AsyncIoContext& io) : + ioContext(io), listeners(kj::heap(*this)), - childTasks(*this), - httpReady(kj::newPromiseAndFulfiller()), - http(kj::heap(li)), - rpc(kj::heap(li)) + childTasks(*this) { - // RPC task - if(rpcBindAddress.startsWith("unix:")) - unlink(rpcBindAddress.slice(strlen("unix:")).cStr()); - listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress) - .then([this](kj::Own&& addr) { - return acceptRpcClient(addr->listen()); - })); - - // HTTP task - if(httpBindAddress.startsWith("unix:")) - unlink(httpBindAddress.slice(strlen("unix:")).cStr()); - listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress) - .then([this](kj::Own&& addr) { - // TODO: a better way? Currently used only for testing - httpReady.fulfiller->fulfill(); - return http->startServer(ioContext.lowLevelProvider->getTimer(), addr->listen()); - })); - - // handle watched paths - { - inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); - pathWatch = readDescriptor(inotify_fd, [this](const char*, size_t){ - laminarInterface.notifyConfigChanged(); - }); - } } Server::~Server() { @@ -89,15 +59,12 @@ void Server::start() { // Shutdown sequence: // 1. stop accepting new connections listeners = nullptr; - // 2. abort current jobs. Most of the time this isn't necessary since - // systemd stop or other kill mechanism will send SIGTERM to the whole - // process group. - laminarInterface.abortAll(); - // 3. wait for all children to close + // 2. wait for all children to close childTasks.onEmpty().wait(ioContext.waitScope); - // 4. run the loop once more to send any pending output to websocket clients + // TODO not sure the comments below are true + // 3. run the loop once more to send any pending output to http clients ioContext.waitScope.poll(); - // 5. return: websockets will be destructed when class is deleted + // 4. return: http connections will be destructed when class is deleted } void Server::stop() { @@ -126,16 +93,52 @@ kj::Promise Server::onChildExit(kj::Maybe &pid) { return ioContext.unixEventPort.onChildExit(pid); } -void Server::addWatchPath(const char* dpath) { - inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); +Server::PathWatcher& Server::watchPaths(std::function fn) +{ + struct PathWatcherImpl : public PathWatcher { + PathWatcher& addPath(const char* path) override { + inotify_add_watch(fd, path, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); + return *this; + } + int fd; + }; + auto pwi = kj::heap(); + PathWatcher* pw = pwi.get(); + + pwi->fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); + listeners->add(readDescriptor(pwi->fd, [fn](const char*, size_t){ + fn(); + }).attach(kj::mv(pwi))); + return *pw; } -kj::Promise Server::acceptRpcClient(kj::Own&& listener) { +void Server::listenRpc(Rpc &rpc, kj::StringPtr rpcBindAddress) +{ + if(rpcBindAddress.startsWith("unix:")) + unlink(rpcBindAddress.slice(strlen("unix:")).cStr()); + listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress) + .then([this,&rpc](kj::Own&& addr) { + return acceptRpcClient(rpc, addr->listen()); + })); + +} + +void Server::listenHttp(Http &http, kj::StringPtr httpBindAddress) +{ + if(httpBindAddress.startsWith("unix:")) + unlink(httpBindAddress.slice(strlen("unix:")).cStr()); + listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress) + .then([this,&http](kj::Own&& addr) { + return http.startServer(ioContext.lowLevelProvider->getTimer(), addr->listen()); + })); +} + +kj::Promise Server::acceptRpcClient(Rpc& rpc, kj::Own&& listener) { kj::ConnectionReceiver& cr = *listener.get(); return cr.accept().then(kj::mvCapture(kj::mv(listener), - [this](kj::Own&& listener, kj::Own&& connection) { - childTasks.add(rpc->accept(kj::mv(connection))); - return acceptRpcClient(kj::mv(listener)); + [this, &rpc](kj::Own&& listener, kj::Own&& connection) { + addTask(rpc.accept(kj::mv(connection))); + return acceptRpcClient(rpc, kj::mv(listener)); })); } diff --git a/src/server.h b/src/server.h index ccb4a8a..048f388 100644 --- a/src/server.h +++ b/src/server.h @@ -25,19 +25,14 @@ #include #include -struct LaminarInterface; +struct Laminar; struct Http; struct Rpc; -// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces -// and manages the program's asynchronous event loop +// This class manages the program's asynchronous event loop class Server final : public kj::TaskSet::ErrorHandler { public: - // Initializes the server with a LaminarInterface to handle requests from - // HTTP/Websocket or RPC clients and bind addresses for each of those - // interfaces. See the documentation for kj::AsyncIoProvider::getNetwork - // for a description of the address format - Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress); + Server(kj::AsyncIoContext& ioContext); ~Server(); void start(); void stop(); @@ -52,32 +47,28 @@ public: // get a promise which resolves when a child process exits kj::Promise onChildExit(kj::Maybe& pid); - // add a path to be watched for changes - void addWatchPath(const char* dpath); + + struct PathWatcher { + virtual PathWatcher& addPath(const char* path) = 0; + }; + + PathWatcher& watchPaths(std::function); + + void listenRpc(Rpc& rpc, kj::StringPtr rpcBindAddress); + void listenHttp(Http& http, kj::StringPtr httpBindAddress); private: - kj::Promise acceptRpcClient(kj::Own&& listener); + kj::Promise acceptRpcClient(Rpc& rpc, kj::Own&& listener); kj::Promise handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function cb); void taskFailed(kj::Exception&& exception) override; private: int efd_quit; - LaminarInterface& laminarInterface; - kj::AsyncIoContext ioContext; + kj::AsyncIoContext& ioContext; kj::Own listeners; kj::TaskSet childTasks; kj::Maybe> reapWatch; - int inotify_fd; - kj::Maybe> pathWatch; - - // TODO: restructure so this isn't necessary - friend class ServerTest; - kj::PromiseFulfillerPair httpReady; - - // TODO: WIP - kj::Own http; - kj::Own rpc; }; #endif // LAMINAR_SERVER_H_ diff --git a/test/eventsource.h b/test/eventsource.h new file mode 100644 index 0000000..c124381 --- /dev/null +++ b/test/eventsource.h @@ -0,0 +1,71 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_EVENTSOURCE_H_ +#define LAMINAR_EVENTSOURCE_H_ + +#include +#include +#include +#include + +class EventSource { +public: + EventSource(kj::AsyncIoContext& ctx, const char* httpConnectAddr, const char* path) : + networkAddress(ctx.provider->getNetwork().parseAddress(httpConnectAddr).wait(ctx.waitScope)), + httpClient(kj::newHttpClient(ctx.lowLevelProvider->getTimer(), headerTable, *networkAddress)), + headerTable(), + headers(headerTable), + buffer(kj::heapArrayBuilder(BUFFER_SIZE)) + { + headers.add("Accept", "text/event-stream"); + auto resp = httpClient->request(kj::HttpMethod::GET, path, headers).response.wait(ctx.waitScope); + promise = waitForMessages(resp.body.get(), 0).attach(kj::mv(resp)); + } + + const std::vector& messages() { + return receivedMessages; + } + +private: + kj::Own networkAddress; + kj::Own httpClient; + kj::HttpHeaderTable headerTable; + kj::HttpHeaders headers; + kj::ArrayBuilder buffer; + kj::Maybe> promise; + std::vector receivedMessages; + + kj::Promise waitForMessages(kj::AsyncInputStream* stream, ulong offset) { + return stream->read(buffer.asPtr().begin() + offset, 1, BUFFER_SIZE).then([=](size_t s) { + ulong end = offset + s; + buffer.asPtr().begin()[end] = '\0'; + if(strcmp(&buffer.asPtr().begin()[end - 2], "\n\n") == 0) { + rapidjson::Document d; + d.Parse(buffer.begin() + strlen("data: ")); + receivedMessages.emplace_back(kj::mv(d)); + end = 0; + } + return waitForMessages(stream, end); + }); + } + + static const int BUFFER_SIZE = 1024; +}; + +#endif // LAMINAR_EVENTSOURCE_H_ diff --git a/test/laminar-fixture.h b/test/laminar-fixture.h new file mode 100644 index 0000000..015e47c --- /dev/null +++ b/test/laminar-fixture.h @@ -0,0 +1,82 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_FIXTURE_H_ +#define LAMINAR_FIXTURE_H_ + +#include +#include +#include "laminar.capnp.h" +#include "eventsource.h" +#include "tempdir.h" +#include "laminar.h" +#include "server.h" + +class LaminarFixture : public ::testing::Test { +public: + LaminarFixture() { + bind_rpc = std::string("unix:/") + tmp.path.toString(true).cStr() + "/rpc.sock"; + bind_http = std::string("unix:/") + tmp.path.toString(true).cStr() + "/http.sock"; + home = tmp.path.toString(true).cStr(); + tmp.fs->openSubdir(kj::Path{"cfg", "jobs"}, kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT); + settings.home = home.c_str(); + settings.bind_rpc = bind_rpc.c_str(); + settings.bind_http = bind_http.c_str(); + settings.archive_url = "/test-archive/"; + server = new Server(ioContext); + laminar = new Laminar(*server, settings); + } + ~LaminarFixture() noexcept(true) { + delete server; + delete laminar; + } + + kj::Own eventSource(const char* path) { + return kj::heap(ioContext, bind_http.c_str(), path); + } + + void defineJob(const char* name, const char* scriptContent) { + KJ_IF_MAYBE(f, tmp.fs->tryOpenFile(kj::Path{"cfg", "jobs", std::string(name) + ".run"}, + kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT | kj::WriteMode::EXECUTABLE)) { + (*f)->writeAll(std::string("#!/bin/sh\n") + scriptContent + "\n"); + } + } + + LaminarCi::Client client() { + if(!rpc) { + auto stream = ioContext.provider->getNetwork().parseAddress(bind_rpc).wait(ioContext.waitScope)->connect().wait(ioContext.waitScope); + auto net = kj::heap(*stream, capnp::rpc::twoparty::Side::CLIENT); + rpc = kj::heap>(*net, nullptr).attach(kj::mv(net), kj::mv(stream)); + } + static capnp::word scratch[4]; + memset(scratch, 0, sizeof(scratch)); + auto hostId = capnp::MallocMessageBuilder(scratch).getRoot(); + hostId.setSide(capnp::rpc::twoparty::Side::SERVER); + return rpc->bootstrap(hostId).castAs(); + } + + kj::Own> rpc; + TempDir tmp; + std::string home, bind_rpc, bind_http; + Settings settings; + Server* server; + Laminar* laminar; + static kj::AsyncIoContext ioContext; +}; + +#endif // LAMINAR_FIXTURE_H_ diff --git a/test/laminar-functional.cpp b/test/laminar-functional.cpp new file mode 100644 index 0000000..6afac07 --- /dev/null +++ b/test/laminar-functional.cpp @@ -0,0 +1,75 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include +#include "laminar-fixture.h" + +// TODO: consider handling this differently +kj::AsyncIoContext LaminarFixture::ioContext = kj::setupAsyncIo(); + +TEST_F(LaminarFixture, EmptyStatusMessageStructure) { + auto es = eventSource("/"); + ioContext.waitScope.poll(); + ASSERT_EQ(1, es->messages().size()); + + auto json = es->messages().front().GetObject(); + EXPECT_STREQ("status", json["type"].GetString()); + EXPECT_STREQ("Laminar", json["title"].GetString()); + EXPECT_LT(time(nullptr) - json["time"].GetInt64(), 1); + + auto data = json["data"].GetObject(); + EXPECT_TRUE(data.HasMember("recent")); + EXPECT_TRUE(data.HasMember("running")); + EXPECT_TRUE(data.HasMember("queued")); + EXPECT_TRUE(data.HasMember("executorsTotal")); + EXPECT_TRUE(data.HasMember("executorsBusy")); + EXPECT_TRUE(data.HasMember("buildsPerDay")); + EXPECT_TRUE(data.HasMember("buildsPerJob")); + EXPECT_TRUE(data.HasMember("timePerJob")); + EXPECT_TRUE(data.HasMember("resultChanged")); + EXPECT_TRUE(data.HasMember("lowPassRates")); + EXPECT_TRUE(data.HasMember("buildTimeChanges")); + EXPECT_TRUE(data.HasMember("buildTimeDist")); +} + +TEST_F(LaminarFixture, JobNotifyHomePage) { + defineJob("foo", "true"); + auto es = eventSource("/"); + + auto req = client().runRequest(); + req.setJobName("foo"); + ASSERT_EQ(LaminarCi::JobResult::SUCCESS, req.send().wait(ioContext.waitScope).getResult()); + + // wait for job completed + ioContext.waitScope.poll(); + + ASSERT_EQ(4, es->messages().size()); + + auto job_queued = es->messages().at(1).GetObject(); + EXPECT_STREQ("job_queued", job_queued["type"].GetString()); + EXPECT_STREQ("foo", job_queued["data"]["name"].GetString()); + + auto job_started = es->messages().at(2).GetObject(); + EXPECT_STREQ("job_started", job_started["type"].GetString()); + EXPECT_STREQ("foo", job_started["data"]["name"].GetString()); + + auto job_completed = es->messages().at(3).GetObject(); + EXPECT_STREQ("job_completed", job_completed["type"].GetString()); + EXPECT_STREQ("foo", job_completed["data"]["name"].GetString()); +} + diff --git a/test/main.cpp b/test/main.cpp new file mode 100644 index 0000000..e783abb --- /dev/null +++ b/test/main.cpp @@ -0,0 +1,28 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include +#include + +// gtest main supplied in order to call captureChildExit +int main(int argc, char **argv) { + kj::UnixEventPort::captureChildExit(); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/test-laminar.cpp b/test/test-laminar.cpp deleted file mode 100644 index 216d150..0000000 --- a/test/test-laminar.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/// -/// Copyright 2018 Oliver Giles -/// -/// This file is part of Laminar -/// -/// Laminar is free software: you can redistribute it and/or modify -/// it under the terms of the GNU General Public License as published by -/// the Free Software Foundation, either version 3 of the License, or -/// (at your option) any later version. -/// -/// Laminar is distributed in the hope that it will be useful, -/// but WITHOUT ANY WARRANTY; without even the implied warranty of -/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -/// GNU General Public License for more details. -/// -/// You should have received a copy of the GNU General Public License -/// along with Laminar. If not, see -/// -#include -#include -#include -#include "laminar.h" - -class TestLaminarClient : public LaminarClient { -public: - virtual void sendMessage(std::string p) { payload = p; } - std::string payload; -}; - -class LaminarTest : public testing::Test { -protected: - LaminarTest() : - testing::Test(), - laminar("/tmp") - {} - Laminar laminar; -}; - -TEST_F(LaminarTest, StatusMessageContainsTime) { - TestLaminarClient testClient; - laminar.sendStatus(&testClient); - rapidjson::Document d; - d.Parse(testClient.payload.c_str()); - ASSERT_TRUE(d.IsObject()); - ASSERT_TRUE(d.HasMember("time")); - EXPECT_GE(1, d["time"].GetInt() - time(nullptr)); -} diff --git a/test/test-server.cpp b/test/test-server.cpp deleted file mode 100644 index c5d365c..0000000 --- a/test/test-server.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/// -/// Copyright 2018 Oliver Giles -/// -/// This file is part of Laminar -/// -/// Laminar is free software: you can redistribute it and/or modify -/// it under the terms of the GNU General Public License as published by -/// the Free Software Foundation, either version 3 of the License, or -/// (at your option) any later version. -/// -/// Laminar is distributed in the hope that it will be useful, -/// but WITHOUT ANY WARRANTY; without even the implied warranty of -/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -/// GNU General Public License for more details. -/// -/// You should have received a copy of the GNU General Public License -/// along with Laminar. If not, see -/// -#include -#include -#include -#include -#include "server.h" -#include "log.h" -#include "interface.h" -#include "laminar.capnp.h" -#include "tempdir.h" -#include "rpc.h" - -class MockLaminar : public LaminarInterface { -public: - LaminarClient* client = nullptr; - ~MockLaminar() {} - virtual void registerClient(LaminarClient* c) override { - ASSERT_EQ(nullptr, client); - client = c; - EXPECT_CALL(*this, sendStatus(client)).Times(testing::Exactly(1)); - } - - virtual void deregisterClient(LaminarClient* c) override { - ASSERT_EQ(client, c); - client = nullptr; - } - - // MOCK_METHOD does not seem to work with return values whose destructors have noexcept(false) - kj::Maybe> getArtefact(std::string path) override { return nullptr; } - - MOCK_METHOD2(queueJob, std::shared_ptr(std::string name, ParamMap params)); - MOCK_METHOD1(registerWaiter, void(LaminarWaiter* waiter)); - MOCK_METHOD1(deregisterWaiter, void(LaminarWaiter* waiter)); - MOCK_METHOD1(latestRun, uint(std::string)); - MOCK_METHOD4(handleLogRequest, bool(std::string, uint, std::string&, bool&)); - - MOCK_METHOD1(sendStatus, void(LaminarClient* client)); - MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value)); - MOCK_METHOD0(listQueuedJobs, const std::list>&()); - MOCK_METHOD0(listRunningJobs, const RunSet&()); - MOCK_METHOD0(listKnownJobs, std::list()); - - MOCK_METHOD0(getCustomCss, std::string()); - MOCK_METHOD2(handleBadgeRequest, bool(std::string, std::string&)); - MOCK_METHOD2(abort, bool(std::string, uint)); - MOCK_METHOD0(abortAll, void()); - MOCK_METHOD0(notifyConfigChanged, void()); -}; - -class ServerTest : public ::testing::Test { -protected: - void SetUp() override { - EXPECT_CALL(mockLaminar, registerWaiter(testing::_)); - EXPECT_CALL(mockLaminar, deregisterWaiter(testing::_)); - server = new Server(mockLaminar, "unix:"+std::string(tempDir.path.append("rpc.sock").toString(true).cStr()), "127.0.0.1:8080"); - } - void TearDown() override { - delete server; - } - - LaminarCi::Client client() const { - return server->rpc->rpcInterface.castAs(); - } - kj::WaitScope& ws() const { - return server->ioContext.waitScope; - } - void waitForHttpReady() { - server->httpReady.promise.wait(server->ioContext.waitScope); - } - - kj::Network& network() { return server->ioContext.provider->getNetwork(); } - TempDir tempDir; - MockLaminar mockLaminar; - Server* server; -}; - -TEST_F(ServerTest, RpcQueue) { - auto req = client().queueRequest(); - req.setJobName("foo"); - EXPECT_CALL(mockLaminar, queueJob("foo", ParamMap())).Times(testing::Exactly(1)); - req.send().wait(ws()); -} - -// Tests that agressively closed websockets are properly removed -// and will not be attempted to be contacted again -TEST_F(ServerTest, HttpWebsocketRST) { - waitForHttpReady(); - - // TODO: generalize - constexpr const char* WS = - "GET / HTTP/1.1\r\n" - "Host: localhost:8080\r\n" - "Connection: Upgrade\r\n" - "Upgrade: websocket\r\n" - "Sec-WebSocket-Key: GTFmrUCM9N6B32LdDE3Rzw==\r\n" - "Sec-WebSocket-Version: 13\r\n\r\n"; - - static char buffer[256]; - network().parseAddress("localhost:8080").then([this](kj::Own&& addr){ - return addr->connect().attach(kj::mv(addr)).then([this](kj::Own&& stream){ - kj::AsyncIoStream* s = stream.get(); - return s->write(WS, strlen(WS)).then([this,s](){ - // Read the websocket header response, ensure the client has been registered - return s->tryRead(buffer, 64, 256).then([this,s](size_t sz){ - EXPECT_LE(64, sz); - EXPECT_NE(nullptr, mockLaminar.client); - // agressively abort the connection - struct linger so_linger; - so_linger.l_onoff = 1; - so_linger.l_linger = 0; - s->setsockopt(SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); - return kj::Promise(kj::READY_NOW); - }); - }).attach(kj::mv(stream)); - }); - }).wait(ws()); - ws().poll(); - // Expect that the client has been cleared. If it has not, Laminar could - // try to write to the closed file descriptor, causing an exception - EXPECT_EQ(nullptr, mockLaminar.client); -} diff --git a/test/test-conf.cpp b/test/unit-conf.cpp similarity index 100% rename from test/test-conf.cpp rename to test/unit-conf.cpp diff --git a/test/test-database.cpp b/test/unit-database.cpp similarity index 100% rename from test/test-database.cpp rename to test/unit-database.cpp diff --git a/test/test-run.cpp b/test/unit-run.cpp similarity index 100% rename from test/test-run.cpp rename to test/unit-run.cpp