diff --git a/CMakeLists.txt b/CMakeLists.txt index b9aaebd..863ea12 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,7 +85,7 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js js/ansi_up.js js/Chart.min.js css/bootstrap.min.css) # (see resources.cpp where these are fetched) -set(LAMINARD_SOURCES +set(LAMINARD_CORE_SOURCES src/database.cpp src/server.cpp src/laminar.cpp @@ -94,10 +94,12 @@ set(LAMINARD_SOURCES src/resources.cpp src/rpc.cpp src/run.cpp + laminar.capnp.c++ + index_html_size.h ) ## Server -add_executable(laminard ${LAMINARD_SOURCES} src/main.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h) +add_executable(laminard ${LAMINARD_CORE_SOURCES} src/main.cpp ${COMPRESSED_BINS}) target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) ## Client @@ -109,8 +111,8 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests") if(BUILD_TESTS) find_package(GTest REQUIRED) include_directories(${GTEST_INCLUDE_DIRS} src) - add_executable(laminar-tests ${LAMINARD_SOURCES} laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) - target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) + add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp test/unit-run.cpp) + target_link_libraries(laminar-tests ${GTEST_LIBRARY} capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z) endif() set(SYSTEMD_UNITDIR lib/systemd/system CACHE PATH "Path to systemd unit files") diff --git a/UserManual.md b/UserManual.md index b7c34e5..f5b2e71 100644 --- a/UserManual.md +++ b/UserManual.md @@ -88,13 +88,11 @@ Do not attempt to run laminar on port 80. This requires running as `root`, and L ## Running behind a reverse proxy -Laminar relies on WebSockets to provide a responsive, auto-updating display without polling. This may require extra support from your frontend webserver. +A reverse proxy is required if you want Laminar to share a port with other web services. It is also recommended to improve performance by serving artefacts directly or providing a caching layer for static assets. -For nginx, see [NGINX Reverse Proxy](https://www.nginx.com/resources/admin-guide/reverse-proxy/) and [WebSocket proxying](http://nginx.org/en/docs/http/websocket.html). +If you use [artefacts](#Archiving-artefacts), note that Laminar is not designed as a file server, and better performance will be achieved by allowing the frontend web server to serve the archive directory directly (e.g. using a `Location` directive). -For Apache, see [Apache Reverse Proxy](https://httpd.apache.org/docs/2.4/howto/reverse_proxy.html) and [mod_proxy_wstunnel](https://httpd.apache.org/docs/2.4/mod/mod_proxy_wstunnel.html). - -If you use [artefacts](#Archiving-artefacts), note that Laminar is not designed as a file server, and better performance will be achieved by allowing the frontend web server to directly serve the archive directory directly (e.g. using a `Location` directive). +Laminar uses Sever Sent Events to provide a responsive, auto-updating display without polling. Most frontend webservers should handle this without any extra configuration. If you use a reverse proxy to host Laminar at a subfolder instead of a subdomain root, the `` needs to be updated to ensure all links point to their proper targets. This can be done by setting `LAMINAR_BASE_URL` in `/etc/laminar.conf`. diff --git a/src/http.cpp b/src/http.cpp index aa6aba8..f575177 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -16,291 +16,290 @@ /// You should have received a copy of the GNU General Public License /// along with Laminar. If not, see /// -#include "interface.h" #include "http.h" #include "resources.h" +#include "monitorscope.h" #include "log.h" -#include -#include - -// This is the implementation of the HTTP/Websocket interface. It creates -// websocket connections as LaminarClients and registers them with the -// LaminarInterface so that status messages will be delivered to the client. -// On opening a websocket connection, it delivers a status snapshot message -// (see LaminarInterface::sendStatus) -class HttpImpl : public kj::HttpService { -public: - HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : - laminar(laminar), - responseHeaders(tbl) - {} - virtual ~HttpImpl() {} +#include "laminar.h" +// Helper class which wraps another class with calls to +// adding and removing a pointer to itself from a passed +// std::set reference. Used to keep track of currently +// connected clients +template +struct WithSetRef : public T { + WithSetRef(std::set& set, Args&& ...args) : + T(std::forward(args)...), + _set(set) + { + _set.insert(this); + } + ~WithSetRef() { + _set.erase(this); + } private: - class HttpChunkedClient : public LaminarClient { - public: - HttpChunkedClient(LaminarInterface& laminar) : - laminar(laminar) - {} - ~HttpChunkedClient() override { - laminar.deregisterClient(this); - } - void sendMessage(std::string payload) override { - chunks.push_back(kj::mv(payload)); - fulfiller->fulfill(); - } - void notifyJobFinished() override { - done = true; - fulfiller->fulfill(); - } - LaminarInterface& laminar; - std::list chunks; - // cannot use chunks.empty() because multiple fulfill()s - // could be coalesced - bool done = false; - kj::Own> fulfiller; - }; + std::set& _set; +}; - // Implements LaminarClient and holds the Websocket connection object. - // Automatically destructed when the promise created in request() resolves - // or is cancelled - class WebsocketClient : public LaminarClient { - public: - WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) : - laminar(laminar), - ws(kj::mv(ws)) - {} - ~WebsocketClient() override { - laminar.deregisterClient(this); - } - virtual void sendMessage(std::string payload) override { - messages.emplace_back(kj::mv(payload)); - // sendMessage might be called several times before the event loop - // gets a chance to act on the fulfiller. So store the payload here - // where it can be fetched later and don't pass the payload with the - // fulfiller because subsequent calls to fulfill() are ignored. - fulfiller->fulfill(); - } - LaminarInterface& laminar; - kj::Own ws; - std::list messages; - kj::Own> fulfiller; - }; +struct EventPeer { + MonitorScope scope; + std::list pendingOutput; + kj::Own> fulfiller; +}; - kj::Promise websocketRead(WebsocketClient& lc) - { - return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { - KJ_SWITCH_ONEOF(message) { - KJ_CASE_ONEOF(str, kj::String) { - rapidjson::Document d; - d.ParseInsitu(const_cast(str.cStr())); - if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) { - lc.scope.page = d["page"].GetInt(); - lc.scope.field = d["field"].GetString(); - lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0; - laminar.sendStatus(&lc); - return websocketRead(lc); - } - } - KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - // clean socket shutdown - return lc.ws->close(close.code, close.reason); - } - KJ_CASE_ONEOF_DEFAULT {} +struct LogWatcher { + std::string job; + uint run; + std::list pendingOutput; + kj::Own> fulfiller; +}; + +kj::Maybe fromUrl(std::string resource, char* query) { + MonitorScope scope; + + if(query) { + char *sk; + for(char* k = strtok_r(query, "&", &sk); k; k = strtok_r(nullptr, "&", &sk)) { + if(char* v = strchr(k, '=')) { + *v++ = '\0'; + if(strcmp(k, "page") == 0) + scope.page = atoi(v); + else if(strcmp(k, "field") == 0) + scope.field = v; + else if(strcmp(k, "order") == 0) + scope.order_desc = (strcmp(v, "dsc") == 0); } - // unhandled/unknown message - return lc.ws->disconnect(); - }, [](kj::Exception&& e){ - // server logs suggest early catching here avoids fatal exception later - // TODO: reproduce in unit test - LLOG(WARNING, e.getDescription()); - return kj::READY_NOW; - }); + } } - kj::Promise websocketWrite(WebsocketClient& lc) - { - auto paf = kj::newPromiseAndFulfiller(); - lc.fulfiller = kj::mv(paf.fulfiller); - return paf.promise.then([this,&lc]{ - kj::Promise p = kj::READY_NOW; - std::list messages = kj::mv(lc.messages); - for(std::string& s : messages) { - p = p.then([&s,&lc]{ - kj::String str = kj::str(s); - return lc.ws->send(str).attach(kj::mv(str)); - }); - } - return p.attach(kj::mv(messages)).then([this,&lc]{ - return websocketWrite(lc); - }); - }); + if(resource == "/") { + scope.type = MonitorScope::HOME; + return kj::mv(scope); } - kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) { - // convert the requested URL to a MonitorScope - if(resource.substr(0, 5) == "/jobs") { - if(resource.length() == 5) { - lc.scope.type = MonitorScope::ALL; - } else { - resource = resource.substr(5); - size_t split = resource.find('/',1); - std::string job = resource.substr(1,split-1); - if(!job.empty()) { - lc.scope.job = job; - lc.scope.type = MonitorScope::JOB; - } - if(split != std::string::npos) { - size_t split2 = resource.find('/', split+1); - std::string run = resource.substr(split+1, split2-split); - if(!run.empty()) { - lc.scope.num = static_cast(atoi(run.c_str())); - lc.scope.type = MonitorScope::RUN; - } - if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { - lc.scope.type = MonitorScope::LOG; - } - } - } - } - laminar.registerClient(&lc); - kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); - // registerClient can happen after a successful websocket handshake. - // However, the connection might not be closed gracefully, so the - // corresponding deregister operation happens in the WebsocketClient - // destructor rather than the close handler or a then() clause - laminar.sendStatus(&lc); - return connection; + if(resource.substr(0, 5) != "/jobs") + return nullptr; + + if(resource.length() == 5) { + scope.type = MonitorScope::ALL; + return kj::mv(scope); } - // Parses the url of the form /log/NAME/NUMBER, filling in the passed - // references and returning true if successful. /log/NAME/latest is - // also allowed, in which case the num reference is set to 0 - bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { - if(url.startsWith("/log/")) { - kj::StringPtr path = url.slice(5); - KJ_IF_MAYBE(sep, path.findFirst('/')) { - name = path.slice(0, *sep).begin(); - kj::StringPtr tail = path.slice(*sep+1); - num = static_cast(atoi(tail.begin())); - name.erase(*sep); - if(tail == "latest") - num = laminar.latestRun(name); - if(num > 0) - return true; - } + resource = resource.substr(5); + size_t split = resource.find('/',1); + std::string job = resource.substr(1,split-1); + if(job.empty()) + return nullptr; + + scope.job = job; + scope.type = MonitorScope::JOB; + if(split == std::string::npos) + return kj::mv(scope); + + size_t split2 = resource.find('/', split+1); + std::string run = resource.substr(split+1, split2-split); + if(run.empty()) + return nullptr; + + scope.num = static_cast(atoi(run.c_str())); + scope.type = MonitorScope::RUN; + return kj::mv(scope); +} + +// Parses the url of the form /log/NAME/NUMBER, filling in the passed +// references and returning true if successful. /log/NAME/latest is +// also allowed, in which case the num reference is set to 0 +bool Http::parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { + if(url.startsWith("/log/")) { + kj::StringPtr path = url.slice(5); + KJ_IF_MAYBE(sep, path.findFirst('/')) { + name = path.slice(0, *sep).begin(); + kj::StringPtr tail = path.slice(*sep+1); + num = static_cast(atoi(tail.begin())); + name.erase(*sep); + if(tail == "latest") + num = laminar.latestRun(name); + if(num > 0) + return true; } - return false; } + return false; +} - kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) { - auto paf = kj::newPromiseAndFulfiller(); - client->fulfiller = kj::mv(paf.fulfiller); - return paf.promise.then([=]{ - kj::Promise p = kj::READY_NOW; - std::list chunks = kj::mv(client->chunks); - for(std::string& s : chunks) { - p = p.then([=,&s]{ - return stream->write(s.data(), s.size()); - }); - } - return p.attach(kj::mv(chunks)).then([=]{ - return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); +kj::Promise Http::cleanupPeers(kj::Timer& timer) +{ + return timer.afterDelay(15 * kj::SECONDS).then([&]{ + for(EventPeer* p : eventPeers) { + // an empty SSE message is a colon followed by two newlines + p->pendingOutput.push_back(":\n\n"); + p->fulfiller->fulfill(); + } + return cleanupPeers(timer); + }).eagerlyEvaluate(nullptr); +} + +kj::Promise writeEvents(EventPeer* peer, kj::AsyncOutputStream* stream) { + auto paf = kj::newPromiseAndFulfiller(); + peer->fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([=]{ + kj::Promise p = kj::READY_NOW; + std::list chunks = kj::mv(peer->pendingOutput); + for(std::string& s : chunks) { + p = p.then([=,&s]{ + return stream->write(s.data(), s.size()); }); + } + return p.attach(kj::mv(chunks)).then([=]{ + return writeEvents(peer, stream); + }); + }); +} + +kj::Promise writeLogChunk(LogWatcher* client, kj::AsyncOutputStream* stream) { + auto paf = kj::newPromiseAndFulfiller(); + client->fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([=](bool done){ + kj::Promise p = kj::READY_NOW; + std::list chunks = kj::mv(client->pendingOutput); + for(std::string& s : chunks) { + p = p.then([=,&s]{ + return stream->write(s.data(), s.size()); + }); + } + return p.attach(kj::mv(chunks)).then([=]{ + return done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); }); + }); +} + +kj::Promise Http::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders &headers, kj::AsyncInputStream &requestBody, HttpService::Response &response) +{ + const char* start, *end, *content_type; + std::string badge; + // for log requests + std::string name; + uint num; + kj::HttpHeaders responseHeaders(*headerTable); + responseHeaders.clear(); + bool is_sse = false; + char* queryString = nullptr; + // Clients usually expect that http servers will ignore unknown query parameters, + // and expect to use this feature to work around browser limitations like there + // being no way to programatically force a resource to be reloaded from the server + // (without "Cache-Control: no-store", which is overkill). See issue #89. + // So first parse any query parameters we *are* interested in, then simply remove + // them from the URL, to make comparisions easier. + KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { + const_cast(url.begin())[*queryIdx] = '\0'; + queryString = const_cast(url.begin() + *queryIdx + 1); + url = url.begin(); } - virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, - kj::AsyncInputStream& requestBody, Response& response) override - { - if(headers.isWebSocket()) { - responseHeaders.clear(); - kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders)); - return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc)); - } else { - // handle regular HTTP request - const char* start, *end, *content_type; - std::string badge; - // for log requests - std::string name; - uint num; - responseHeaders.clear(); - // Clients usually expect that http servers will ignore unknown query parameters, - // and expect to use this feature to work around browser limitations like there - // being no way to programatically force a resource to be reloaded from the server - // (without "Cache-Control: no-store", which is overkill). See issue #89. - // Since we currently don't handle any query parameters at all, the easiest way - // to achieve this is unconditionally remove all query parameters from the request. - // This will need to be redone if we ever accept query parameters, which probably - // will happen as part of issue #90. - KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { - const_cast(url.begin())[*queryIdx] = '\0'; - url = url.begin(); - } - if(url.startsWith("/archive/")) { - KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { - auto array = (*file)->mmap(0, (*file)->stat().size); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - auto stream = response.send(200, "OK", responseHeaders, array.size()); - return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); - } - } else if(parseLogEndpoint(url, name, num)) { - kj::Own cc = kj::heap(laminar); - cc->scope.job = name; - cc->scope.num = num; - bool complete; - std::string output; - cc->scope.type = MonitorScope::LOG; - if(laminar.handleLogRequest(name, num, output, complete)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. - responseHeaders.add("X-Accel-Buffering", "no"); - auto stream = response.send(200, "OK", responseHeaders, nullptr); - laminar.registerClient(cc.get()); - return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{ - if(complete) - return kj::Promise(kj::READY_NOW); - return writeLogChunk(c, s); - }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc)); - } - } else if(url == "/custom/style.css") { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - std::string css = laminar.getCustomCss(); - auto stream = response.send(200, "OK", responseHeaders, css.size()); - return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); - } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); - responseHeaders.add("Content-Encoding", "gzip"); - responseHeaders.add("Content-Transfer-Encoding", "binary"); - auto stream = response.send(200, "OK", responseHeaders, end-start); - return stream->write(start, end-start).attach(kj::mv(stream)); - } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { - responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); - responseHeaders.add("Cache-Control", "no-cache"); - auto stream = response.send(200, "OK", responseHeaders, badge.size()); - return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); - } - return response.sendError(404, "Not Found", responseHeaders); + KJ_IF_MAYBE(accept, headers.get(ACCEPT)) { + is_sse = (*accept == "text/event-stream"); + } + + if(is_sse) { + KJ_IF_MAYBE(s, fromUrl(url.cStr(), queryString)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/event-stream"); + auto peer = kj::heap>(eventPeers); + peer->scope = *s; + std::string st = "data: " + laminar.getStatus(peer->scope) + "\n\n"; + auto stream = response.send(200, "OK", responseHeaders); + return stream->write(st.data(), st.size()).attach(kj::mv(st)).then([=,s=stream.get(),p=peer.get()]{ + return writeEvents(p,s); + }).attach(kj::mv(stream)).attach(kj::mv(peer)); } + } else if(url.startsWith("/archive/")) { + KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { + auto array = (*file)->mmap(0, (*file)->stat().size); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, array.size()); + return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); + } + } else if(parseLogEndpoint(url, name, num)) { + auto lw = kj::heap>(logWatchers); + lw->job = name; + lw->run = num; + bool complete; + std::string output; + if(laminar.handleLogRequest(name, num, output, complete)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. + responseHeaders.add("X-Accel-Buffering", "no"); + auto stream = response.send(200, "OK", responseHeaders, nullptr); + return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=lw.get()]{ + if(complete) + return kj::Promise(kj::READY_NOW); + return writeLogChunk(c, s); + }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(lw)); + } + } else if(url == "/custom/style.css") { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + std::string css = laminar.getCustomCss(); + auto stream = response.send(200, "OK", responseHeaders, css.size()); + return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); + } else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); + responseHeaders.add("Content-Encoding", "gzip"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, end-start); + return stream->write(start, end-start).attach(kj::mv(stream)); + } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); + responseHeaders.add("Cache-Control", "no-cache"); + auto stream = response.send(200, "OK", responseHeaders, badge.size()); + return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); } + return response.sendError(404, "Not Found", responseHeaders); +} - LaminarInterface& laminar; - Resources resources; - kj::HttpHeaders responseHeaders; -}; +Http::Http(Laminar &li) : + laminar(li), + resources(kj::heap()) +{ + kj::HttpHeaderTable::Builder builder; + ACCEPT = builder.add("Accept"); + headerTable = builder.build(); +} + +Http::~Http() +{ + KJ_ASSERT(logWatchers.size() == 0); + KJ_ASSERT(eventPeers.size() == 0); +} -Http::Http(LaminarInterface &li) : - headerTable(kj::heap()), - httpService(kj::heap(li, *headerTable)), - laminar(li) +kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) { + kj::Own server = kj::heap(timer, *headerTable, *this); + return server->listenHttp(*listener).attach(cleanupPeers(timer)).attach(kj::mv(listener)).attach(kj::mv(server)); +} +void Http::notifyEvent(const char *type, const char *data, std::string job, uint run) +{ + for(EventPeer* c : eventPeers) { + if(c->scope.wantsStatus(job, run) + // The run page also should know that another job has started + // (so maybe it can show a previously hidden "next" button). + // Hence this small hack: + // TODO obviate + || (std::string(type)=="job_started" && c->scope.type == MonitorScope::Type::RUN && c->scope.job == job)) + { + c->pendingOutput.push_back("data: " + std::string(data) + "\n\n"); + c->fulfiller->fulfill(); + } + } } -kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) { - auto httpServer = kj::heap(timer, *headerTable, *httpService); - return httpServer->listenHttp(*listener).attach(kj::mv(listener)).attach(kj::mv(httpServer)); +void Http::notifyLog(std::string job, uint run, std::string log_chunk, bool eot) +{ + for(LogWatcher* lw : logWatchers) { + if(lw->job == job && lw->run == run) { + lw->pendingOutput.push_back(kj::mv(log_chunk)); + lw->fulfiller->fulfill(kj::mv(eot)); + } + } } diff --git a/src/http.h b/src/http.h index 5098b39..909a244 100644 --- a/src/http.h +++ b/src/http.h @@ -19,18 +19,48 @@ #ifndef LAMINAR_HTTP_H_ #define LAMINAR_HTTP_H_ +#include #include +#include +#include -struct LaminarInterface; +// Definition needed for musl +typedef unsigned int uint; +typedef unsigned long ulong; -class Http { +struct Laminar; +struct Resources; +struct LogWatcher; +struct EventPeer; + +class Http : public kj::HttpService { public: - Http(LaminarInterface &li); - kj::Promise startServer(kj::Timer &timer, kj::Own&& listener); + Http(Laminar&li); + virtual ~Http(); + + kj::Promise startServer(kj::Timer &timer, kj::Own &&listener); + + void notifyEvent(const char* type, const char* data, std::string job = nullptr, uint run = 0); + void notifyLog(std::string job, uint run, std::string log_chunk, bool eot); +private: + virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, Response& response) override; + bool parseLogEndpoint(kj::StringPtr url, std::string &name, uint &num); + + // With SSE, there is no notification if a client disappears. Also, an idle + // client must be kept alive if there is no activity in their MonitorScope. + // Deal with these by sending a periodic keepalive and reaping the client if + // the write fails. + kj::Promise cleanupPeers(kj::Timer &timer); + + Laminar& laminar; + std::set eventPeers; kj::Own headerTable; - kj::Own httpService; - LaminarInterface& laminar; + kj::Own resources; + std::set logWatchers; + + kj::HttpHeaderId ACCEPT; }; #endif //LAMINAR_HTTP_H_ diff --git a/src/interface.h b/src/interface.h deleted file mode 100644 index 25387ea..0000000 --- a/src/interface.h +++ /dev/null @@ -1,175 +0,0 @@ -/// -/// Copyright 2015-2019 Oliver Giles -/// -/// This file is part of Laminar -/// -/// Laminar is free software: you can redistribute it and/or modify -/// it under the terms of the GNU General Public License as published by -/// the Free Software Foundation, either version 3 of the License, or -/// (at your option) any later version. -/// -/// Laminar is distributed in the hope that it will be useful, -/// but WITHOUT ANY WARRANTY; without even the implied warranty of -/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -/// GNU General Public License for more details. -/// -/// You should have received a copy of the GNU General Public License -/// along with Laminar. If not, see -/// -#ifndef LAMINAR_INTERFACE_H_ -#define LAMINAR_INTERFACE_H_ - -#include "run.h" - -#include -#include -#include - -// Simple struct to define which information a frontend client is interested -// in, both in initial request phase and real-time updates. It corresponds -// loosely to frontend URLs -struct MonitorScope { - enum Type { - HOME, // home page: recent builds and statistics - ALL, // browse jobs - JOB, // a specific job page - RUN, // a specific run page - LOG // a run's log page - }; - - MonitorScope(Type type = HOME, std::string job = std::string(), uint num = 0) : - type(type), - job(job), - num(num), - page(0), - field("number"), - order_desc(true) - {} - - // whether this scope wants status information about the given job or run - bool wantsStatus(std::string ajob, uint anum = 0) const { - if(type == HOME || type == ALL) return true; - if(type == JOB) return ajob == job; - if(type == RUN) return ajob == job && anum == num; - return false; - } - - bool wantsLog(std::string ajob, uint anum) const { - return type == LOG && ajob == job && anum == num; - } - - Type type; - std::string job; - uint num = 0; - // sorting - uint page = 0; - std::string field; - bool order_desc; -}; - -// Represents a (websocket) client that wants to be notified about events -// matching the supplied scope. Pass instances of this to LaminarInterface -// registerClient and deregisterClient -struct LaminarClient { - virtual ~LaminarClient() noexcept(false) {} - virtual void sendMessage(std::string payload) = 0; - // TODO: redesign - virtual void notifyJobFinished() {} - MonitorScope scope; -}; - -// Represents a (rpc) client that wants to be notified about run completion. -// Pass instances of this to LaminarInterface registerWaiter and -// deregisterWaiter -struct LaminarWaiter { - virtual ~LaminarWaiter() =default; - virtual void complete(const Run*) = 0; -}; - -// Represents a file mapped in memory. Used to serve artefacts -struct MappedFile { - virtual ~MappedFile() =default; - virtual const void* address() = 0; - virtual size_t size() = 0; -}; - -// The interface connecting the network layer to the application business -// logic. These methods fulfil the requirements of both the HTTP/Websocket -// and RPC interfaces. -struct LaminarInterface { - virtual ~LaminarInterface() {} - - // Queues a job, returns immediately. Return value will be nullptr if - // the supplied name is not a known job. - virtual std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) = 0; - - // Register a client (but don't give up ownership). The client will be - // notified with a JSON message of any events matching its scope - // (see LaminarClient and MonitorScope above) - virtual void registerClient(LaminarClient* client) = 0; - - // Call this before destroying a client so that Laminar doesn't try - // to call LaminarClient::sendMessage on invalid data - virtual void deregisterClient(LaminarClient* client) = 0; - - // Register a waiter (but don't give up ownership). The waiter will be - // notified with a callback of any run completion (see LaminarWaiter above) - virtual void registerWaiter(LaminarWaiter* waiter) = 0; - - // Call this before destroying a waiter so that Laminar doesn't try - // to call LaminarWaiter::complete on invalid data - virtual void deregisterWaiter(LaminarWaiter* waiter) = 0; - - // Return the latest known number of the named job - virtual uint latestRun(std::string job) = 0; - - // Given a job name and number, return existence and (via reference params) - // its current log output and whether the job is ongoing - virtual bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) = 0; - - // Synchronously send a snapshot of the current status to the given - // client (as governed by the client's MonitorScope). This is called on - // initial websocket connect. - virtual void sendStatus(LaminarClient* client) = 0; - - // Implements the laminar client interface allowing the setting of - // arbitrary parameters on a run (usually itself) to be available in - // the environment of subsequent scripts. - virtual bool setParam(std::string job, uint buildNum, std::string param, std::string value) = 0; - - // Gets the list of jobs currently waiting in the execution queue - virtual const std::list>& listQueuedJobs() = 0; - - // Gets the list of currently executing jobs - virtual const RunSet& listRunningJobs() = 0; - - // Gets the list of known jobs - scans cfg/jobs for *.run files - virtual std::list listKnownJobs() = 0; - - // Fetches the content of an artifact given its filename relative to - // $LAMINAR_HOME/archive. Ideally, this would instead be served by a - // proper web server which handles this url. - virtual kj::Maybe> getArtefact(std::string path) = 0; - - // Given the name of a job, populate the provided string reference with - // SVG content describing the last known state of the job. Returns false - // if the job is unknown. - virtual bool handleBadgeRequest(std::string job, std::string& badge) = 0; - - // Fetches the content of $LAMINAR_HOME/custom/style.css or an empty - // string. Ideally, this would instead be served by a proper web server - // which handles this url. - virtual std::string getCustomCss() = 0; - - // Aborts a single job - virtual bool abort(std::string job, uint buildNum) = 0; - - // Abort all running jobs - virtual void abortAll() = 0; - - // Callback to handle a configuration modification notification - virtual void notifyConfigChanged() = 0; -}; - -#endif // LAMINAR_INTERFACE_H_ - diff --git a/src/laminar.cpp b/src/laminar.cpp index 166fc0d..5170319 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -20,6 +20,8 @@ #include "server.h" #include "conf.h" #include "log.h" +#include "http.h" +#include "rpc.h" #include #include @@ -35,7 +37,7 @@ #include // rapidjson::Writer with a StringBuffer is used a lot in Laminar for -// preparing JSON messages to send to Websocket clients. A small wrapper +// preparing JSON messages to send to HTTP clients. A small wrapper // class here reduces verbosity later for this common use case. class Json : public rapidjson::Writer { public: @@ -52,13 +54,6 @@ template<> Json& Json::set(const char* key, double value) { String(key); Double( template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; } template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; } -namespace { -// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf) -constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar"; -constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080"; -constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/"; -} - // short syntax helpers for kj::Path template inline kj::Path operator/(const kj::Path& p, const T& ext) { @@ -71,18 +66,19 @@ inline kj::Path operator/(const std::string& p, const T& ext) { typedef std::string str; -Laminar::Laminar(const char *home) : - homePath(kj::Path::parse(&home[1])), - fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY)) +Laminar::Laminar(Server &server, Settings settings) : + settings(settings), + srv(server), + homePath(kj::Path::parse(&settings.home[1])), + fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY)), + http(kj::heap(*this)), + rpc(kj::heap(*this)) { - LASSERT(home[0] == '/'); + LASSERT(settings.home[0] == '/'); - archiveUrl = ARCHIVE_URL_DEFAULT; - if(char* envArchive = getenv("LAMINAR_ARCHIVE_URL")) { - archiveUrl = envArchive; - if(archiveUrl.back() != '/') - archiveUrl.append("/"); - } + archiveUrl = settings.archive_url; + if(archiveUrl.back() != '/') + archiveUrl.append("/"); numKeepRunDirs = 0; @@ -103,29 +99,22 @@ Laminar::Laminar(const char *home) : buildNums[name] = build; }); - srv = nullptr; + srv.watchPaths([this]{ + LLOG(INFO, "Reloading configuration"); + loadConfiguration(); + // config change may allow stuck jobs to dequeue + assignNewJobs(); + }).addPath((homePath/"cfg"/"nodes").toString(true).cStr()) + .addPath((homePath/"cfg"/"jobs").toString(true).cStr()); + + srv.listenRpc(*rpc, settings.bind_rpc); + srv.listenHttp(*http, settings.bind_http); // Load configuration, may be called again in response to an inotify event // that the configuration files have been modified loadConfiguration(); } -void Laminar::registerClient(LaminarClient* client) { - clients.insert(client); -} - -void Laminar::deregisterClient(LaminarClient* client) { - clients.erase(client); -} - -void Laminar::registerWaiter(LaminarWaiter *waiter) { - waiters.insert(waiter); -} - -void Laminar::deregisterWaiter(LaminarWaiter *waiter) { - waiters.erase(waiter); -} - uint Laminar::latestRun(std::string job) { auto it = activeJobs.byJobName().equal_range(job); if(it.first == it.second) { @@ -141,8 +130,6 @@ uint Laminar::latestRun(std::string job) { } } -// TODO: reunify with sendStatus. The difference is that this method is capable of -// returning "not found" to the caller, and sendStatus isn't bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) { if(Run* run = activeRun(name, num)) { output = run->log; @@ -216,40 +203,15 @@ void Laminar::populateArtifacts(Json &j, std::string job, uint num) const { } } -void Laminar::sendStatus(LaminarClient* client) { - if(client->scope.type == MonitorScope::LOG) { - // If the requested job is currently in progress - if(const Run* run = activeRun(client->scope.job, client->scope.num)) { - client->sendMessage(run->log.c_str()); - } else { // it must be finished, fetch it from the database - db->stmt("SELECT output, outputLen FROM builds WHERE name = ? AND number = ?") - .bind(client->scope.job, client->scope.num) - .fetch([=](str maybeZipped, unsigned long sz) { - str log(sz+1,'\0'); - if(sz >= COMPRESS_LOG_MIN_SIZE) { - int res = ::uncompress((uint8_t*) log.data(), &sz, - (const uint8_t*) maybeZipped.data(), maybeZipped.size()); - if(res == Z_OK) - client->sendMessage(log); - else - LLOG(ERROR, "Failed to uncompress log"); - } else { - client->sendMessage(maybeZipped); - } - }); - client->notifyJobFinished(); - } - return; - } - +std::string Laminar::getStatus(MonitorScope scope) { Json j; j.set("type", "status"); j.set("title", getenv("LAMINAR_TITLE") ?: "Laminar"); j.set("time", time(nullptr)); j.startObject("data"); - if(client->scope.type == MonitorScope::RUN) { + if(scope.type == MonitorScope::RUN) { db->stmt("SELECT queuedAt,startedAt,completedAt,result,reason,parentJob,parentBuild FROM builds WHERE name = ? AND number = ?") - .bind(client->scope.job, client->scope.num) + .bind(scope.job, scope.num) .fetch([&](time_t queued, time_t started, time_t completed, int result, std::string reason, std::string parentJob, uint parentBuild) { j.set("queued", started-queued); j.set("started", started); @@ -258,7 +220,7 @@ void Laminar::sendStatus(LaminarClient* client) { j.set("reason", reason); j.startObject("upstream").set("name", parentJob).set("num", parentBuild).EndObject(2); }); - if(const Run* run = activeRun(client->scope.job, client->scope.num)) { + if(const Run* run = activeRun(scope.job, scope.num)) { j.set("queued", run->startedAt - run->queuedAt); j.set("started", run->startedAt); j.set("result", to_string(RunState::RUNNING)); @@ -270,30 +232,30 @@ void Laminar::sendStatus(LaminarClient* client) { j.set("etc", run->startedAt + lastRuntime); }); } - j.set("latestNum", int(buildNums[client->scope.job])); + j.set("latestNum", int(buildNums[scope.job])); j.startArray("artifacts"); - populateArtifacts(j, client->scope.job, client->scope.num); + populateArtifacts(j, scope.job, scope.num); j.EndArray(); - } else if(client->scope.type == MonitorScope::JOB) { + } else if(scope.type == MonitorScope::JOB) { const uint runsPerPage = 10; j.startArray("recent"); // ORDER BY param cannot be bound std::string order_by; - std::string direction = client->scope.order_desc ? "DESC" : "ASC"; - if(client->scope.field == "number") + std::string direction = scope.order_desc ? "DESC" : "ASC"; + if(scope.field == "number") order_by = "number " + direction; - else if(client->scope.field == "result") + else if(scope.field == "result") order_by = "result " + direction + ", number DESC"; - else if(client->scope.field == "started") + else if(scope.field == "started") order_by = "startedAt " + direction + ", number DESC"; - else if(client->scope.field == "duration") + else if(scope.field == "duration") order_by = "(completedAt-startedAt) " + direction + ", number DESC"; else order_by = "number DESC"; std::string stmt = "SELECT number,startedAt,completedAt,result,reason FROM builds WHERE name = ? ORDER BY " + order_by + " LIMIT ?,?"; db->stmt(stmt.c_str()) - .bind(client->scope.job, client->scope.page * runsPerPage, runsPerPage) + .bind(scope.job, scope.page * runsPerPage, runsPerPage) .fetch([&](uint build,time_t started,time_t completed,int result,str reason){ j.StartObject(); j.set("number", build) @@ -305,18 +267,18 @@ void Laminar::sendStatus(LaminarClient* client) { }); j.EndArray(); db->stmt("SELECT COUNT(*),AVG(completedAt-startedAt) FROM builds WHERE name = ?") - .bind(client->scope.job) + .bind(scope.job) .fetch([&](uint nRuns, uint averageRuntime){ j.set("averageRuntime", averageRuntime); j.set("pages", (nRuns-1) / runsPerPage + 1); j.startObject("sort"); - j.set("page", client->scope.page) - .set("field", client->scope.field) - .set("order", client->scope.order_desc ? "dsc" : "asc") + j.set("page", scope.page) + .set("field", scope.field) + .set("order", scope.order_desc ? "dsc" : "asc") .EndObject(); }); j.startArray("running"); - auto p = activeJobs.byJobName().equal_range(client->scope.job); + auto p = activeJobs.byJobName().equal_range(scope.job); for(auto it = p.first; it != p.second; ++it) { const std::shared_ptr run = *it; j.StartObject(); @@ -330,27 +292,27 @@ void Laminar::sendStatus(LaminarClient* client) { j.EndArray(); int nQueued = 0; for(const auto& run : queuedJobs) { - if (run->name == client->scope.job) { + if (run->name == scope.job) { nQueued++; } } j.set("nQueued", nQueued); db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? ORDER BY completedAt DESC LIMIT 1") - .bind(client->scope.job, int(RunState::SUCCESS)) + .bind(scope.job, int(RunState::SUCCESS)) .fetch([&](int build, time_t started){ j.startObject("lastSuccess"); j.set("number", build).set("started", started); j.EndObject(); }); db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result <> ? ORDER BY completedAt DESC LIMIT 1") - .bind(client->scope.job, int(RunState::SUCCESS)) + .bind(scope.job, int(RunState::SUCCESS)) .fetch([&](int build, time_t started){ j.startObject("lastFailed"); j.set("number", build).set("started", started); j.EndObject(); }); - } else if(client->scope.type == MonitorScope::ALL) { + } else if(scope.type == MonitorScope::ALL) { j.startArray("jobs"); db->stmt("SELECT name,number,startedAt,completedAt,result FROM builds b JOIN (SELECT name n,MAX(number) l FROM builds GROUP BY n) q ON b.name = q.n AND b.number = q.l") .fetch([&](str name,uint number, time_t started, time_t completed, int result){ @@ -512,31 +474,16 @@ void Laminar::sendStatus(LaminarClient* client) { } j.EndObject(); - client->sendMessage(j.str()); + return j.str(); } Laminar::~Laminar() noexcept try { delete db; - delete srv; } catch (std::exception& e) { LLOG(ERROR, e.what()); return; } -void Laminar::run() { - const char* listen_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT; - const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT; - - srv = new Server(*this, listen_rpc, listen_http); - srv->addWatchPath((homePath/"cfg"/"nodes").toString(true).cStr()); - srv->addWatchPath((homePath/"cfg"/"jobs").toString(true).cStr()); - srv->start(); -} - -void Laminar::stop() { - srv->stop(); -} - bool Laminar::loadConfiguration() { if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS")) numKeepRunDirs = static_cast(atoi(ndirs)); @@ -627,24 +574,12 @@ std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { .startObject("data") .set("name", name) .EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(name)) - c->sendMessage(msg); - } + http->notifyEvent("job_queued", j.str(), name.c_str()); assignNewJobs(); return run; } -void Laminar::notifyConfigChanged() -{ - LLOG(INFO, "Reloading configuration"); - loadConfiguration(); - // config change may allow stuck jobs to dequeue - assignNewJobs(); -} - bool Laminar::abort(std::string job, uint buildNum) { if(Run* run = activeRun(job, buildNum)) { run->abort(true); @@ -700,11 +635,11 @@ bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { runFinished(run.get()); }); if(run->timeout > 0) { - exec = exec.attach(srv->addTimeout(run->timeout, [r=run.get()](){ + exec = exec.attach(srv.addTimeout(run->timeout, [r=run.get()](){ r->abort(true); })); } - srv->addTask(kj::mv(exec)); + srv.addTask(kj::mv(exec)); LLOG(INFO, "Started job on node", run->name, run->build, node->name); // update next build number @@ -731,16 +666,7 @@ bool Laminar::tryStartRun(std::shared_ptr run, int queueIndex) { } j.EndArray(); j.EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(run->name, run->build) - // The run page also should know that another job has started - // (so maybe it can show a previously hidden "next" button). - // Hence this small hack: - || (c->scope.type == MonitorScope::Type::RUN && c->scope.job == run->name)) - c->sendMessage(msg); - } - + http->notifyEvent("job_started", j.str(), run->name.c_str(), run->build); return true; } } @@ -765,17 +691,14 @@ kj::Promise Laminar::handleRunStep(Run* run) { return kj::READY_NOW; } - kj::Promise exited = srv->onChildExit(run->current_pid); + kj::Promise exited = srv.onChildExit(run->current_pid); // promise is fulfilled when the process is reaped. But first we wait for all // output from the pipe (Run::output_fd) to be consumed. - return srv->readDescriptor(run->output_fd, [this,run](const char*b,size_t n){ + return srv.readDescriptor(run->output_fd, [this,run](const char*b,size_t n){ // handle log output std::string s(b, n); run->log += s; - for(LaminarClient* c : clients) { - if(c->scope.wantsLog(run->name, run->build)) - c->sendMessage(s); - } + http->notifyLog(run->name, run->build, s, false); }).then([p = std::move(exited)]() mutable { // wait until the process is reaped return kj::mv(p); @@ -832,18 +755,8 @@ void Laminar::runFinished(Run * r) { populateArtifacts(j, r->name, r->build); j.EndArray(); j.EndObject(); - const char* msg = j.str(); - for(LaminarClient* c : clients) { - if(c->scope.wantsStatus(r->name, r->build)) - c->sendMessage(msg); - c->notifyJobFinished(); - } - - // notify the waiters - for(LaminarWaiter* w : waiters) { - w->complete(r); - } - + http->notifyEvent("job_completed", j.str(), r->name, r->build); + http->notifyLog(r->name, r->build, "", true); // erase reference to run from activeJobs. Since runFinished is called in a // lambda whose context contains a shared_ptr, the run won't be deleted // until the context is destroyed at the end of the lambda execution. diff --git a/src/laminar.h b/src/laminar.h index 8780d22..988db66 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -19,13 +19,14 @@ #ifndef LAMINAR_LAMINAR_H_ #define LAMINAR_LAMINAR_H_ -#include "interface.h" #include "run.h" +#include "monitorscope.h" #include "node.h" #include "database.h" #include #include +#include // Node name to node object map typedef std::unordered_map> NodeMap; @@ -33,40 +34,72 @@ typedef std::unordered_map> NodeMap; struct Server; class Json; +class Http; +class Rpc; + +struct Settings { + const char* home; + const char* bind_rpc; + const char* bind_http; + const char* archive_url; +}; + // The main class implementing the application's business logic. -// It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC -// interfaces and communicates via the LaminarInterface methods and -// the LaminarClient objects (see interface.h) -class Laminar final : public LaminarInterface { +class Laminar final { public: - Laminar(const char* homePath); - ~Laminar() noexcept override; - - // Runs the application forever - void run(); - // Call this in a signal handler to make run() return - void stop(); - - // Implementations of LaminarInterface - std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) override; - void registerClient(LaminarClient* client) override; - void deregisterClient(LaminarClient* client) override; - void registerWaiter(LaminarWaiter* waiter) override; - void deregisterWaiter(LaminarWaiter* waiter) override; - uint latestRun(std::string job) override; - bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) override; - - void sendStatus(LaminarClient* client) override; - bool setParam(std::string job, uint buildNum, std::string param, std::string value) override; - const std::list>& listQueuedJobs() override; - const RunSet& listRunningJobs() override; - std::list listKnownJobs() override; - kj::Maybe> getArtefact(std::string path) override; - bool handleBadgeRequest(std::string job, std::string& badge) override; - std::string getCustomCss() override; - bool abort(std::string job, uint buildNum) override; - void abortAll() override; - void notifyConfigChanged() override; + Laminar(Server& server, Settings settings); + ~Laminar() noexcept; + + // Queues a job, returns immediately. Return value will be nullptr if + // the supplied name is not a known job. + std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()); + + // Return the latest known number of the named job + uint latestRun(std::string job); + + // Given a job name and number, return existence and (via reference params) + // its current log output and whether the job is ongoing + bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete); + + // Given a relevant scope, returns a JSON string describing the current + // server status. Content differs depending on the page viewed by the user, + // which should be provided as part of the scope. + std::string getStatus(MonitorScope scope); + + // Implements the laminarc function of setting arbitrary parameters on a run, + // (typically the current run) which will be made available in the environment + // of subsequent scripts. + bool setParam(std::string job, uint buildNum, std::string param, std::string value); + + // Gets the list of jobs currently waiting in the execution queue + const std::list>& listQueuedJobs(); + + // Gets the list of currently executing jobs + const RunSet& listRunningJobs(); + + // Gets the list of known jobs - scans cfg/jobs for *.run files + std::list listKnownJobs(); + + // Fetches the content of an artifact given its filename relative to + // $LAMINAR_HOME/archive. Ideally, this would instead be served by a + // proper web server which handles this url. + kj::Maybe> getArtefact(std::string path); + + // Given the name of a job, populate the provided string reference with + // SVG content describing the last known state of the job. Returns false + // if the job is unknown. + bool handleBadgeRequest(std::string job, std::string& badge); + + // Fetches the content of $LAMINAR_HOME/custom/style.css or an empty + // string. Ideally, this would instead be served by a proper web server + // which handles this url. + std::string getCustomCss(); + + // Aborts a single job + bool abort(std::string job, uint buildNum); + + // Abort all running jobs + void abortAll(); private: bool loadConfiguration(); @@ -89,16 +122,18 @@ private: std::unordered_map> jobTags; + Settings settings; RunSet activeJobs; Database* db; - Server* srv; + Server& srv; NodeMap nodes; kj::Path homePath; kj::Own fsHome; - std::set clients; - std::set waiters; uint numKeepRunDirs; std::string archiveUrl; + + kj::Own http; + kj::Own rpc; }; #endif // LAMINAR_LAMINAR_H_ diff --git a/src/main.cpp b/src/main.cpp index ead42c9..9086761 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,17 +17,31 @@ /// along with Laminar. If not, see /// #include "laminar.h" +#include "server.h" #include "log.h" #include #include #include static Laminar* laminar; +static Server* server; static void laminar_quit(int) { - laminar->stop(); + // Abort current jobs. Most of the time this isn't necessary since + // systemd stop or other kill mechanism will send SIGTERM to the whole + // process group. + laminar->abortAll(); + server->stop(); } +namespace { +constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar"; +constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080"; +constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/"; +} + + + int main(int argc, char** argv) { for(int i = 1; i < argc; ++i) { if(strcmp(argv[i], "-v") == 0) { @@ -35,15 +49,27 @@ int main(int argc, char** argv) { } } - laminar = new Laminar(getenv("LAMINAR_HOME") ?: "/var/lib/laminar"); + auto ioContext = kj::setupAsyncIo(); + + Settings settings; + // Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf) + settings.home = getenv("LAMINAR_HOME") ?: "/var/lib/laminar"; + settings.bind_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT; + settings.bind_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT; + settings.archive_url = getenv("LAMINAR_ARCHIVE_URL") ?: ARCHIVE_URL_DEFAULT; + + server = new Server(ioContext); + laminar = new Laminar(*server, settings); + kj::UnixEventPort::captureChildExit(); signal(SIGINT, &laminar_quit); signal(SIGTERM, &laminar_quit); - laminar->run(); + server->start(); delete laminar; + delete server; LLOG(INFO, "Clean exit"); return 0; diff --git a/src/monitorscope.h b/src/monitorscope.h new file mode 100644 index 0000000..2b48850 --- /dev/null +++ b/src/monitorscope.h @@ -0,0 +1,62 @@ +/// +/// Copyright 2015-2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_MONITORSCOPE_H_ +#define LAMINAR_MONITORSCOPE_H_ + +#include + +// Simple struct to define which information a frontend client is interested +// in, both in initial request phase and real-time updates. It corresponds +// loosely to frontend URLs +struct MonitorScope { + enum Type { + HOME, // home page: recent builds and statistics + ALL, // browse jobs + JOB, // a specific job page + RUN, // a specific run page + }; + + MonitorScope(Type type = HOME, std::string job = std::string(), uint num = 0) : + type(type), + job(job), + num(num), + page(0), + field("number"), + order_desc(true) + {} + + // whether this scope wants status information about the given job or run + bool wantsStatus(std::string ajob, uint anum = 0) const { + if(type == HOME || type == ALL) return true; + if(type == JOB) return ajob == job; + if(type == RUN) return ajob == job && anum == num; + return false; + } + + Type type; + std::string job; + uint num = 0; + // sorting + uint page = 0; + std::string field; + bool order_desc; +}; + +#endif // LAMINAR_MONITORSCOPE_H_ + diff --git a/src/resources/js/app.js b/src/resources/js/app.js index 7ea8667..f7a0a31 100644 --- a/src/resources/js/app.js +++ b/src/resources/js/app.js @@ -22,30 +22,30 @@ const timeScale = function(max){ ? { scale:function(v){return Math.round(v/60)/10}, label:'Minutes' } : { scale:function(v){return v;}, label:'Seconds' }; } - -const WebsocketHandler = function() { - function setupWebsocket(path, next) { - let ws = new WebSocket(document.head.baseURI.replace(/^http/,'ws') + path.substr(1)); - ws.onmessage = function(msg) { +const ServerEventHandler = function() { + function setupEventSource(path, query, next) { + const es = new EventSource(document.head.baseURI + path.substr(1) + query); + es.path = path; // save for later in case we need to add query params + es.onmessage = function(msg) { msg = JSON.parse(msg.data); - // "status" is the first message the websocket always delivers. + // "status" is the first message the server always delivers. // Use this to confirm the navigation. The component is not // created until next() is called, so creating a reference // for other message types must be deferred. There are some extra - // subtle checks here. If this websocket already has a component, + // subtle checks here. If this eventsource already has a component, // then this is not the first time the status message has been // received. If the frontend requests an update, the status message // should not be handled here, but treated the same as any other // message. An exception is if the connection has been lost - in // that case we should treat this as a "first-time" status message. - // this.comp.ws is used as a proxy for this. - if (msg.type === 'status' && (!this.comp || !this.comp.ws)) { + // this.comp.es is used as a proxy for this. + if (msg.type === 'status' && (!this.comp || !this.comp.es)) { next(comp => { // Set up bidirectional reference // 1. needed to reference the component for other msg types this.comp = comp; // 2. needed to close the ws on navigation away - comp.ws = this; + comp.es = this; // Update html and nav titles document.title = comp.$root.title = msg.title; // Calculate clock offset (used by ProgressUpdater) @@ -59,47 +59,35 @@ const WebsocketHandler = function() { if (!this.comp) return console.error("Page component was undefined"); else { + this.comp.$root.connected = true; this.comp.$root.showNotify(msg.type, msg.data); if(typeof this.comp[msg.type] === 'function') this.comp[msg.type](msg.data); } } - }; - ws.onclose = function(ev) { - // if this.comp isn't set, this connection has never been used - // and a re-connection isn't meaningful - if(!ev.wasClean && 'comp' in this) { - this.comp.$root.connected = false; - // remove the reference to the websocket from the component. - // This not only cleans up an unneeded reference but ensures a - // status message on reconnection is treated as "first-time" - delete this.comp.ws; - this.reconnectTimeout = setTimeout(()=>{ - var newWs = setupWebsocket(path, (fn) => { fn(this.comp); }); - // the next() callback won't happen if the server is still - // unreachable. Save the reference to the last component - // here so we can recover if/when it does return. This means - // passing this.comp in the next() callback above is redundant - // but necessary to keep the same implementation. - newWs.comp = this.comp; - }, 2000); - } } - return ws; - }; + es.onerror = function() { + this.comp.$root.connected = false; + } + return es; + } return { beforeRouteEnter(to, from, next) { - setupWebsocket(to.path, (fn) => { next(fn); }); + setupEventSource(to.path, '', (fn) => { next(fn); }); }, beforeRouteUpdate(to, from, next) { - this.ws.close(); - clearTimeout(this.ws.reconnectTimeout); - setupWebsocket(to.path, (fn) => { fn(this); next(); }); + this.es.close(); + setupEventSource(to.path, '', (fn) => { fn(this); next(); }); }, beforeRouteLeave(to, from, next) { - this.ws.close(); - clearTimeout(this.ws.reconnectTimeout); + this.es.close(); next(); + }, + methods: { + query(q) { + this.es.close(); + setupEventSource(this.es.path, '?' + Object.entries(q).map(([k,v])=>`${k}=${v}`).join('&'), (fn) => { fn(this); }); + } } }; }(); @@ -195,7 +183,7 @@ const Home = function() { return { template: '#home', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, @@ -432,7 +420,7 @@ const Jobs = function() { }; return { template: '#jobs', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, methods: { status: function(msg) { @@ -536,7 +524,7 @@ var Job = function() { var chtBt = null; return Vue.extend({ template: '#job', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, @@ -634,11 +622,11 @@ var Job = function() { }, page_next: function() { state.sort.page++; - this.ws.send(JSON.stringify(state.sort)); + this.query(state.sort) }, page_prev: function() { state.sort.page--; - this.ws.send(JSON.stringify(state.sort)); + this.query(state.sort) }, do_sort: function(field) { if(state.sort.field == field) { @@ -647,7 +635,7 @@ var Job = function() { state.sort.order = 'dsc'; state.sort.field = field; } - this.ws.send(JSON.stringify(state.sort)); + this.query(state.sort) } } }); @@ -690,7 +678,7 @@ const Run = function() { return { template: '#run', - mixins: [WebsocketHandler, Utils, ProgressUpdater], + mixins: [ServerEventHandler, Utils, ProgressUpdater], data: function() { return state; }, diff --git a/src/rpc.cpp b/src/rpc.cpp index c7a9a3e..e68b696 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -18,8 +18,7 @@ /// #include "rpc.h" #include "laminar.capnp.h" - -#include "interface.h" +#include "laminar.h" #include "log.h" namespace { @@ -38,16 +37,16 @@ LaminarCi::JobResult fromRunState(RunState state) { } // This is the implementation of the Laminar Cap'n Proto RPC interface. // As such, it implements the pure virtual interface generated from -// laminar.capnp with calls to the LaminarInterface -class RpcImpl : public LaminarCi::Server, public LaminarWaiter { +// laminar.capnp with calls to the primary Laminar class +class RpcImpl : public LaminarCi::Server { public: - RpcImpl(LaminarInterface& l) : + RpcImpl(Laminar& l) : LaminarCi::Server(), laminar(l) { } - ~RpcImpl() override { + virtual ~RpcImpl() { } // Queue a job, without waiting for it to start @@ -83,10 +82,9 @@ public: LLOG(INFO, "RPC run", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); if(Run* r = run.get()) { - runWaiters[r].emplace_back(kj::newPromiseAndFulfiller()); - return runWaiters[r].back().promise.then([context,run](RunState state) mutable { + return r->whenFinished().then([context,r](RunState state) mutable { context.getResults().setResult(fromRunState(state)); - context.getResults().setBuildNum(run->build); + context.getResults().setBuildNum(r->build); }); } else { context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); @@ -164,18 +162,11 @@ private: return res; } - // Implements LaminarWaiter::complete - void complete(const Run* r) override { - for(kj::PromiseFulfillerPair& w : runWaiters[r]) - w.fulfiller->fulfill(RunState(r->result)); - runWaiters.erase(r); - } -private: - LaminarInterface& laminar; + Laminar& laminar; std::unordered_map>> runWaiters; }; -Rpc::Rpc(LaminarInterface& li) : +Rpc::Rpc(Laminar& li) : rpcInterface(kj::heap(li)) {} diff --git a/src/rpc.h b/src/rpc.h index 823980e..bb2d096 100644 --- a/src/rpc.h +++ b/src/rpc.h @@ -23,11 +23,11 @@ #include #include -struct LaminarInterface; +struct Laminar; class Rpc { public: - Rpc(LaminarInterface &li); + Rpc(Laminar&li); kj::Promise accept(kj::Own&& connection); capnp::Capability::Client rpcInterface; diff --git a/src/run.cpp b/src/run.cpp index 4862652..73b917f 100644 --- a/src/run.cpp +++ b/src/run.cpp @@ -51,7 +51,8 @@ Run::Run(std::string name, ParamMap pm, kj::Path&& rootPath) : params(kj::mv(pm)), queuedAt(time(nullptr)), rootPath(kj::mv(rootPath)), - started(kj::newPromiseAndFulfiller()) + started(kj::newPromiseAndFulfiller()), + finished(kj::newPromiseAndFulfiller()) { for(auto it = params.begin(); it != params.end();) { if(it->first[0] == '=') { @@ -261,4 +262,6 @@ void Run::reaped(int status) { else if(status != 0) result = RunState::FAILED; // otherwise preserve earlier status + + finished.fulfiller->fulfill(RunState(result)); } diff --git a/src/run.h b/src/run.h index 79955d8..5fde79e 100644 --- a/src/run.h +++ b/src/run.h @@ -74,6 +74,7 @@ public: std::string reason() const; kj::Promise&& whenStarted() { return kj::mv(started.promise); } + kj::Promise&& whenFinished() { return kj::mv(finished.promise); } std::shared_ptr node; RunState result; @@ -108,6 +109,7 @@ private: std::list env; std::string reasonMsg; kj::PromiseFulfillerPair started; + kj::PromiseFulfillerPair finished; }; diff --git a/src/server.cpp b/src/server.cpp index f98ef4a..0944633 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -17,10 +17,10 @@ /// along with Laminar. If not, see /// #include "server.h" -#include "interface.h" #include "log.h" #include "rpc.h" #include "http.h" +#include "laminar.h" #include #include @@ -35,41 +35,11 @@ // a multiple of sizeof(struct signalfd_siginfo) == 128 #define PROC_IO_BUFSIZE 4096 -Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, - kj::StringPtr httpBindAddress) : - laminarInterface(li), - ioContext(kj::setupAsyncIo()), +Server::Server(kj::AsyncIoContext& io) : + ioContext(io), listeners(kj::heap(*this)), - childTasks(*this), - httpReady(kj::newPromiseAndFulfiller()), - http(kj::heap(li)), - rpc(kj::heap(li)) + childTasks(*this) { - // RPC task - if(rpcBindAddress.startsWith("unix:")) - unlink(rpcBindAddress.slice(strlen("unix:")).cStr()); - listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress) - .then([this](kj::Own&& addr) { - return acceptRpcClient(addr->listen()); - })); - - // HTTP task - if(httpBindAddress.startsWith("unix:")) - unlink(httpBindAddress.slice(strlen("unix:")).cStr()); - listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress) - .then([this](kj::Own&& addr) { - // TODO: a better way? Currently used only for testing - httpReady.fulfiller->fulfill(); - return http->startServer(ioContext.lowLevelProvider->getTimer(), addr->listen()); - })); - - // handle watched paths - { - inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); - pathWatch = readDescriptor(inotify_fd, [this](const char*, size_t){ - laminarInterface.notifyConfigChanged(); - }); - } } Server::~Server() { @@ -89,15 +59,12 @@ void Server::start() { // Shutdown sequence: // 1. stop accepting new connections listeners = nullptr; - // 2. abort current jobs. Most of the time this isn't necessary since - // systemd stop or other kill mechanism will send SIGTERM to the whole - // process group. - laminarInterface.abortAll(); - // 3. wait for all children to close + // 2. wait for all children to close childTasks.onEmpty().wait(ioContext.waitScope); - // 4. run the loop once more to send any pending output to websocket clients + // TODO not sure the comments below are true + // 3. run the loop once more to send any pending output to http clients ioContext.waitScope.poll(); - // 5. return: websockets will be destructed when class is deleted + // 4. return: http connections will be destructed when class is deleted } void Server::stop() { @@ -126,16 +93,52 @@ kj::Promise Server::onChildExit(kj::Maybe &pid) { return ioContext.unixEventPort.onChildExit(pid); } -void Server::addWatchPath(const char* dpath) { - inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); +Server::PathWatcher& Server::watchPaths(std::function fn) +{ + struct PathWatcherImpl : public PathWatcher { + PathWatcher& addPath(const char* path) override { + inotify_add_watch(fd, path, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); + return *this; + } + int fd; + }; + auto pwi = kj::heap(); + PathWatcher* pw = pwi.get(); + + pwi->fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); + listeners->add(readDescriptor(pwi->fd, [fn](const char*, size_t){ + fn(); + }).attach(kj::mv(pwi))); + return *pw; +} + +void Server::listenRpc(Rpc &rpc, kj::StringPtr rpcBindAddress) +{ + if(rpcBindAddress.startsWith("unix:")) + unlink(rpcBindAddress.slice(strlen("unix:")).cStr()); + listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress) + .then([this,&rpc](kj::Own&& addr) { + return acceptRpcClient(rpc, addr->listen()); + })); + +} + +void Server::listenHttp(Http &http, kj::StringPtr httpBindAddress) +{ + if(httpBindAddress.startsWith("unix:")) + unlink(httpBindAddress.slice(strlen("unix:")).cStr()); + listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress) + .then([this,&http](kj::Own&& addr) { + return http.startServer(ioContext.lowLevelProvider->getTimer(), addr->listen()); + })); } -kj::Promise Server::acceptRpcClient(kj::Own&& listener) { +kj::Promise Server::acceptRpcClient(Rpc& rpc, kj::Own&& listener) { kj::ConnectionReceiver& cr = *listener.get(); return cr.accept().then(kj::mvCapture(kj::mv(listener), - [this](kj::Own&& listener, kj::Own&& connection) { - childTasks.add(rpc->accept(kj::mv(connection))); - return acceptRpcClient(kj::mv(listener)); + [this, &rpc](kj::Own&& listener, kj::Own&& connection) { + addTask(rpc.accept(kj::mv(connection))); + return acceptRpcClient(rpc, kj::mv(listener)); })); } diff --git a/src/server.h b/src/server.h index ccb4a8a..048f388 100644 --- a/src/server.h +++ b/src/server.h @@ -25,19 +25,14 @@ #include #include -struct LaminarInterface; +struct Laminar; struct Http; struct Rpc; -// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces -// and manages the program's asynchronous event loop +// This class manages the program's asynchronous event loop class Server final : public kj::TaskSet::ErrorHandler { public: - // Initializes the server with a LaminarInterface to handle requests from - // HTTP/Websocket or RPC clients and bind addresses for each of those - // interfaces. See the documentation for kj::AsyncIoProvider::getNetwork - // for a description of the address format - Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress); + Server(kj::AsyncIoContext& ioContext); ~Server(); void start(); void stop(); @@ -52,32 +47,28 @@ public: // get a promise which resolves when a child process exits kj::Promise onChildExit(kj::Maybe& pid); - // add a path to be watched for changes - void addWatchPath(const char* dpath); + + struct PathWatcher { + virtual PathWatcher& addPath(const char* path) = 0; + }; + + PathWatcher& watchPaths(std::function); + + void listenRpc(Rpc& rpc, kj::StringPtr rpcBindAddress); + void listenHttp(Http& http, kj::StringPtr httpBindAddress); private: - kj::Promise acceptRpcClient(kj::Own&& listener); + kj::Promise acceptRpcClient(Rpc& rpc, kj::Own&& listener); kj::Promise handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function cb); void taskFailed(kj::Exception&& exception) override; private: int efd_quit; - LaminarInterface& laminarInterface; - kj::AsyncIoContext ioContext; + kj::AsyncIoContext& ioContext; kj::Own listeners; kj::TaskSet childTasks; kj::Maybe> reapWatch; - int inotify_fd; - kj::Maybe> pathWatch; - - // TODO: restructure so this isn't necessary - friend class ServerTest; - kj::PromiseFulfillerPair httpReady; - - // TODO: WIP - kj::Own http; - kj::Own rpc; }; #endif // LAMINAR_SERVER_H_ diff --git a/test/eventsource.h b/test/eventsource.h new file mode 100644 index 0000000..c124381 --- /dev/null +++ b/test/eventsource.h @@ -0,0 +1,71 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_EVENTSOURCE_H_ +#define LAMINAR_EVENTSOURCE_H_ + +#include +#include +#include +#include + +class EventSource { +public: + EventSource(kj::AsyncIoContext& ctx, const char* httpConnectAddr, const char* path) : + networkAddress(ctx.provider->getNetwork().parseAddress(httpConnectAddr).wait(ctx.waitScope)), + httpClient(kj::newHttpClient(ctx.lowLevelProvider->getTimer(), headerTable, *networkAddress)), + headerTable(), + headers(headerTable), + buffer(kj::heapArrayBuilder(BUFFER_SIZE)) + { + headers.add("Accept", "text/event-stream"); + auto resp = httpClient->request(kj::HttpMethod::GET, path, headers).response.wait(ctx.waitScope); + promise = waitForMessages(resp.body.get(), 0).attach(kj::mv(resp)); + } + + const std::vector& messages() { + return receivedMessages; + } + +private: + kj::Own networkAddress; + kj::Own httpClient; + kj::HttpHeaderTable headerTable; + kj::HttpHeaders headers; + kj::ArrayBuilder buffer; + kj::Maybe> promise; + std::vector receivedMessages; + + kj::Promise waitForMessages(kj::AsyncInputStream* stream, ulong offset) { + return stream->read(buffer.asPtr().begin() + offset, 1, BUFFER_SIZE).then([=](size_t s) { + ulong end = offset + s; + buffer.asPtr().begin()[end] = '\0'; + if(strcmp(&buffer.asPtr().begin()[end - 2], "\n\n") == 0) { + rapidjson::Document d; + d.Parse(buffer.begin() + strlen("data: ")); + receivedMessages.emplace_back(kj::mv(d)); + end = 0; + } + return waitForMessages(stream, end); + }); + } + + static const int BUFFER_SIZE = 1024; +}; + +#endif // LAMINAR_EVENTSOURCE_H_ diff --git a/test/laminar-fixture.h b/test/laminar-fixture.h new file mode 100644 index 0000000..015e47c --- /dev/null +++ b/test/laminar-fixture.h @@ -0,0 +1,82 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef LAMINAR_FIXTURE_H_ +#define LAMINAR_FIXTURE_H_ + +#include +#include +#include "laminar.capnp.h" +#include "eventsource.h" +#include "tempdir.h" +#include "laminar.h" +#include "server.h" + +class LaminarFixture : public ::testing::Test { +public: + LaminarFixture() { + bind_rpc = std::string("unix:/") + tmp.path.toString(true).cStr() + "/rpc.sock"; + bind_http = std::string("unix:/") + tmp.path.toString(true).cStr() + "/http.sock"; + home = tmp.path.toString(true).cStr(); + tmp.fs->openSubdir(kj::Path{"cfg", "jobs"}, kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT); + settings.home = home.c_str(); + settings.bind_rpc = bind_rpc.c_str(); + settings.bind_http = bind_http.c_str(); + settings.archive_url = "/test-archive/"; + server = new Server(ioContext); + laminar = new Laminar(*server, settings); + } + ~LaminarFixture() noexcept(true) { + delete server; + delete laminar; + } + + kj::Own eventSource(const char* path) { + return kj::heap(ioContext, bind_http.c_str(), path); + } + + void defineJob(const char* name, const char* scriptContent) { + KJ_IF_MAYBE(f, tmp.fs->tryOpenFile(kj::Path{"cfg", "jobs", std::string(name) + ".run"}, + kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT | kj::WriteMode::EXECUTABLE)) { + (*f)->writeAll(std::string("#!/bin/sh\n") + scriptContent + "\n"); + } + } + + LaminarCi::Client client() { + if(!rpc) { + auto stream = ioContext.provider->getNetwork().parseAddress(bind_rpc).wait(ioContext.waitScope)->connect().wait(ioContext.waitScope); + auto net = kj::heap(*stream, capnp::rpc::twoparty::Side::CLIENT); + rpc = kj::heap>(*net, nullptr).attach(kj::mv(net), kj::mv(stream)); + } + static capnp::word scratch[4]; + memset(scratch, 0, sizeof(scratch)); + auto hostId = capnp::MallocMessageBuilder(scratch).getRoot(); + hostId.setSide(capnp::rpc::twoparty::Side::SERVER); + return rpc->bootstrap(hostId).castAs(); + } + + kj::Own> rpc; + TempDir tmp; + std::string home, bind_rpc, bind_http; + Settings settings; + Server* server; + Laminar* laminar; + static kj::AsyncIoContext ioContext; +}; + +#endif // LAMINAR_FIXTURE_H_ diff --git a/test/laminar-functional.cpp b/test/laminar-functional.cpp new file mode 100644 index 0000000..6afac07 --- /dev/null +++ b/test/laminar-functional.cpp @@ -0,0 +1,75 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include +#include "laminar-fixture.h" + +// TODO: consider handling this differently +kj::AsyncIoContext LaminarFixture::ioContext = kj::setupAsyncIo(); + +TEST_F(LaminarFixture, EmptyStatusMessageStructure) { + auto es = eventSource("/"); + ioContext.waitScope.poll(); + ASSERT_EQ(1, es->messages().size()); + + auto json = es->messages().front().GetObject(); + EXPECT_STREQ("status", json["type"].GetString()); + EXPECT_STREQ("Laminar", json["title"].GetString()); + EXPECT_LT(time(nullptr) - json["time"].GetInt64(), 1); + + auto data = json["data"].GetObject(); + EXPECT_TRUE(data.HasMember("recent")); + EXPECT_TRUE(data.HasMember("running")); + EXPECT_TRUE(data.HasMember("queued")); + EXPECT_TRUE(data.HasMember("executorsTotal")); + EXPECT_TRUE(data.HasMember("executorsBusy")); + EXPECT_TRUE(data.HasMember("buildsPerDay")); + EXPECT_TRUE(data.HasMember("buildsPerJob")); + EXPECT_TRUE(data.HasMember("timePerJob")); + EXPECT_TRUE(data.HasMember("resultChanged")); + EXPECT_TRUE(data.HasMember("lowPassRates")); + EXPECT_TRUE(data.HasMember("buildTimeChanges")); + EXPECT_TRUE(data.HasMember("buildTimeDist")); +} + +TEST_F(LaminarFixture, JobNotifyHomePage) { + defineJob("foo", "true"); + auto es = eventSource("/"); + + auto req = client().runRequest(); + req.setJobName("foo"); + ASSERT_EQ(LaminarCi::JobResult::SUCCESS, req.send().wait(ioContext.waitScope).getResult()); + + // wait for job completed + ioContext.waitScope.poll(); + + ASSERT_EQ(4, es->messages().size()); + + auto job_queued = es->messages().at(1).GetObject(); + EXPECT_STREQ("job_queued", job_queued["type"].GetString()); + EXPECT_STREQ("foo", job_queued["data"]["name"].GetString()); + + auto job_started = es->messages().at(2).GetObject(); + EXPECT_STREQ("job_started", job_started["type"].GetString()); + EXPECT_STREQ("foo", job_started["data"]["name"].GetString()); + + auto job_completed = es->messages().at(3).GetObject(); + EXPECT_STREQ("job_completed", job_completed["type"].GetString()); + EXPECT_STREQ("foo", job_completed["data"]["name"].GetString()); +} + diff --git a/test/main.cpp b/test/main.cpp new file mode 100644 index 0000000..e783abb --- /dev/null +++ b/test/main.cpp @@ -0,0 +1,28 @@ +/// +/// Copyright 2019 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include +#include + +// gtest main supplied in order to call captureChildExit +int main(int argc, char **argv) { + kj::UnixEventPort::captureChildExit(); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/test-laminar.cpp b/test/test-laminar.cpp deleted file mode 100644 index 216d150..0000000 --- a/test/test-laminar.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/// -/// Copyright 2018 Oliver Giles -/// -/// This file is part of Laminar -/// -/// Laminar is free software: you can redistribute it and/or modify -/// it under the terms of the GNU General Public License as published by -/// the Free Software Foundation, either version 3 of the License, or -/// (at your option) any later version. -/// -/// Laminar is distributed in the hope that it will be useful, -/// but WITHOUT ANY WARRANTY; without even the implied warranty of -/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -/// GNU General Public License for more details. -/// -/// You should have received a copy of the GNU General Public License -/// along with Laminar. If not, see -/// -#include -#include -#include -#include "laminar.h" - -class TestLaminarClient : public LaminarClient { -public: - virtual void sendMessage(std::string p) { payload = p; } - std::string payload; -}; - -class LaminarTest : public testing::Test { -protected: - LaminarTest() : - testing::Test(), - laminar("/tmp") - {} - Laminar laminar; -}; - -TEST_F(LaminarTest, StatusMessageContainsTime) { - TestLaminarClient testClient; - laminar.sendStatus(&testClient); - rapidjson::Document d; - d.Parse(testClient.payload.c_str()); - ASSERT_TRUE(d.IsObject()); - ASSERT_TRUE(d.HasMember("time")); - EXPECT_GE(1, d["time"].GetInt() - time(nullptr)); -} diff --git a/test/test-server.cpp b/test/test-server.cpp deleted file mode 100644 index c5d365c..0000000 --- a/test/test-server.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/// -/// Copyright 2018 Oliver Giles -/// -/// This file is part of Laminar -/// -/// Laminar is free software: you can redistribute it and/or modify -/// it under the terms of the GNU General Public License as published by -/// the Free Software Foundation, either version 3 of the License, or -/// (at your option) any later version. -/// -/// Laminar is distributed in the hope that it will be useful, -/// but WITHOUT ANY WARRANTY; without even the implied warranty of -/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -/// GNU General Public License for more details. -/// -/// You should have received a copy of the GNU General Public License -/// along with Laminar. If not, see -/// -#include -#include -#include -#include -#include "server.h" -#include "log.h" -#include "interface.h" -#include "laminar.capnp.h" -#include "tempdir.h" -#include "rpc.h" - -class MockLaminar : public LaminarInterface { -public: - LaminarClient* client = nullptr; - ~MockLaminar() {} - virtual void registerClient(LaminarClient* c) override { - ASSERT_EQ(nullptr, client); - client = c; - EXPECT_CALL(*this, sendStatus(client)).Times(testing::Exactly(1)); - } - - virtual void deregisterClient(LaminarClient* c) override { - ASSERT_EQ(client, c); - client = nullptr; - } - - // MOCK_METHOD does not seem to work with return values whose destructors have noexcept(false) - kj::Maybe> getArtefact(std::string path) override { return nullptr; } - - MOCK_METHOD2(queueJob, std::shared_ptr(std::string name, ParamMap params)); - MOCK_METHOD1(registerWaiter, void(LaminarWaiter* waiter)); - MOCK_METHOD1(deregisterWaiter, void(LaminarWaiter* waiter)); - MOCK_METHOD1(latestRun, uint(std::string)); - MOCK_METHOD4(handleLogRequest, bool(std::string, uint, std::string&, bool&)); - - MOCK_METHOD1(sendStatus, void(LaminarClient* client)); - MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value)); - MOCK_METHOD0(listQueuedJobs, const std::list>&()); - MOCK_METHOD0(listRunningJobs, const RunSet&()); - MOCK_METHOD0(listKnownJobs, std::list()); - - MOCK_METHOD0(getCustomCss, std::string()); - MOCK_METHOD2(handleBadgeRequest, bool(std::string, std::string&)); - MOCK_METHOD2(abort, bool(std::string, uint)); - MOCK_METHOD0(abortAll, void()); - MOCK_METHOD0(notifyConfigChanged, void()); -}; - -class ServerTest : public ::testing::Test { -protected: - void SetUp() override { - EXPECT_CALL(mockLaminar, registerWaiter(testing::_)); - EXPECT_CALL(mockLaminar, deregisterWaiter(testing::_)); - server = new Server(mockLaminar, "unix:"+std::string(tempDir.path.append("rpc.sock").toString(true).cStr()), "127.0.0.1:8080"); - } - void TearDown() override { - delete server; - } - - LaminarCi::Client client() const { - return server->rpc->rpcInterface.castAs(); - } - kj::WaitScope& ws() const { - return server->ioContext.waitScope; - } - void waitForHttpReady() { - server->httpReady.promise.wait(server->ioContext.waitScope); - } - - kj::Network& network() { return server->ioContext.provider->getNetwork(); } - TempDir tempDir; - MockLaminar mockLaminar; - Server* server; -}; - -TEST_F(ServerTest, RpcQueue) { - auto req = client().queueRequest(); - req.setJobName("foo"); - EXPECT_CALL(mockLaminar, queueJob("foo", ParamMap())).Times(testing::Exactly(1)); - req.send().wait(ws()); -} - -// Tests that agressively closed websockets are properly removed -// and will not be attempted to be contacted again -TEST_F(ServerTest, HttpWebsocketRST) { - waitForHttpReady(); - - // TODO: generalize - constexpr const char* WS = - "GET / HTTP/1.1\r\n" - "Host: localhost:8080\r\n" - "Connection: Upgrade\r\n" - "Upgrade: websocket\r\n" - "Sec-WebSocket-Key: GTFmrUCM9N6B32LdDE3Rzw==\r\n" - "Sec-WebSocket-Version: 13\r\n\r\n"; - - static char buffer[256]; - network().parseAddress("localhost:8080").then([this](kj::Own&& addr){ - return addr->connect().attach(kj::mv(addr)).then([this](kj::Own&& stream){ - kj::AsyncIoStream* s = stream.get(); - return s->write(WS, strlen(WS)).then([this,s](){ - // Read the websocket header response, ensure the client has been registered - return s->tryRead(buffer, 64, 256).then([this,s](size_t sz){ - EXPECT_LE(64, sz); - EXPECT_NE(nullptr, mockLaminar.client); - // agressively abort the connection - struct linger so_linger; - so_linger.l_onoff = 1; - so_linger.l_linger = 0; - s->setsockopt(SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); - return kj::Promise(kj::READY_NOW); - }); - }).attach(kj::mv(stream)); - }); - }).wait(ws()); - ws().poll(); - // Expect that the client has been cleared. If it has not, Laminar could - // try to write to the closed file descriptor, causing an exception - EXPECT_EQ(nullptr, mockLaminar.client); -} diff --git a/test/test-conf.cpp b/test/unit-conf.cpp similarity index 100% rename from test/test-conf.cpp rename to test/unit-conf.cpp diff --git a/test/test-database.cpp b/test/unit-database.cpp similarity index 100% rename from test/test-database.cpp rename to test/unit-database.cpp diff --git a/test/test-run.cpp b/test/unit-run.cpp similarity index 100% rename from test/test-run.cpp rename to test/unit-run.cpp