replace websockets with sse and refactor

Large refactor that more closely aligns the codebase to the kj async
style, more clearly exposes an interface for functional testing and
removes cruft. There is a slight increase in coupling between the
Laminar and Http/Rpc classes, but this was always an issue, just until
now more obscured by the arbitrary pure virtual LaminarInterface class
(which has been removed in this change) and the previous lumping
together of all the async stuff in the Server class (which is now
more spread around the code according to function).

This change replaces the use of Websockets with Server Side Events
(SSE). They are simpler and more suitable for the publish-style messages
used by Laminar, and typically require less configuration of the
reverse proxy HTTP server.

Use of gmock is also removed, which eases testing in certain envs.

Resolves #90.
feature/contexts
Oliver Giles 5 years ago
parent 4a07e24da3
commit 39ca7e86cf

@ -85,7 +85,7 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js
js/ansi_up.js js/Chart.min.js css/bootstrap.min.css)
# (see resources.cpp where these are fetched)
set(LAMINARD_SOURCES
set(LAMINARD_CORE_SOURCES
src/database.cpp
src/server.cpp
src/laminar.cpp
@ -94,10 +94,12 @@ set(LAMINARD_SOURCES
src/resources.cpp
src/rpc.cpp
src/run.cpp
laminar.capnp.c++
index_html_size.h
)
## Server
add_executable(laminard ${LAMINARD_SOURCES} src/main.cpp laminar.capnp.c++ ${COMPRESSED_BINS} index_html_size.h)
add_executable(laminard ${LAMINARD_CORE_SOURCES} src/main.cpp ${COMPRESSED_BINS})
target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z)
## Client
@ -109,8 +111,8 @@ set(BUILD_TESTS FALSE CACHE BOOL "Build tests")
if(BUILD_TESTS)
find_package(GTest REQUIRED)
include_directories(${GTEST_INCLUDE_DIRS} src)
add_executable(laminar-tests ${LAMINARD_SOURCES} laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp)
target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z)
add_executable(laminar-tests ${LAMINARD_CORE_SOURCES} ${COMPRESSED_BINS} test/main.cpp test/laminar-functional.cpp test/unit-conf.cpp test/unit-database.cpp test/unit-run.cpp)
target_link_libraries(laminar-tests ${GTEST_LIBRARY} capnp-rpc capnp kj-http kj-async kj pthread sqlite3 z)
endif()
set(SYSTEMD_UNITDIR lib/systemd/system CACHE PATH "Path to systemd unit files")

@ -88,13 +88,11 @@ Do not attempt to run laminar on port 80. This requires running as `root`, and L
## Running behind a reverse proxy
Laminar relies on WebSockets to provide a responsive, auto-updating display without polling. This may require extra support from your frontend webserver.
A reverse proxy is required if you want Laminar to share a port with other web services. It is also recommended to improve performance by serving artefacts directly or providing a caching layer for static assets.
For nginx, see [NGINX Reverse Proxy](https://www.nginx.com/resources/admin-guide/reverse-proxy/) and [WebSocket proxying](http://nginx.org/en/docs/http/websocket.html).
If you use [artefacts](#Archiving-artefacts), note that Laminar is not designed as a file server, and better performance will be achieved by allowing the frontend web server to serve the archive directory directly (e.g. using a `Location` directive).
For Apache, see [Apache Reverse Proxy](https://httpd.apache.org/docs/2.4/howto/reverse_proxy.html) and [mod_proxy_wstunnel](https://httpd.apache.org/docs/2.4/mod/mod_proxy_wstunnel.html).
If you use [artefacts](#Archiving-artefacts), note that Laminar is not designed as a file server, and better performance will be achieved by allowing the frontend web server to directly serve the archive directory directly (e.g. using a `Location` directive).
Laminar uses Sever Sent Events to provide a responsive, auto-updating display without polling. Most frontend webservers should handle this without any extra configuration.
If you use a reverse proxy to host Laminar at a subfolder instead of a subdomain root, the `<base href>` needs to be updated to ensure all links point to their proper targets. This can be done by setting `LAMINAR_BASE_URL` in `/etc/laminar.conf`.

@ -16,291 +16,290 @@
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "interface.h"
#include "http.h"
#include "resources.h"
#include "monitorscope.h"
#include "log.h"
#include <kj/compat/http.h>
#include <rapidjson/document.h>
// This is the implementation of the HTTP/Websocket interface. It creates
// websocket connections as LaminarClients and registers them with the
// LaminarInterface so that status messages will be delivered to the client.
// On opening a websocket connection, it delivers a status snapshot message
// (see LaminarInterface::sendStatus)
class HttpImpl : public kj::HttpService {
public:
HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) :
laminar(laminar),
responseHeaders(tbl)
{}
virtual ~HttpImpl() {}
#include "laminar.h"
// Helper class which wraps another class with calls to
// adding and removing a pointer to itself from a passed
// std::set reference. Used to keep track of currently
// connected clients
template<typename T, typename ...Args>
struct WithSetRef : public T {
WithSetRef(std::set<T*>& set, Args&& ...args) :
T(std::forward(args)...),
_set(set)
{
_set.insert(this);
}
~WithSetRef() {
_set.erase(this);
}
private:
class HttpChunkedClient : public LaminarClient {
public:
HttpChunkedClient(LaminarInterface& laminar) :
laminar(laminar)
{}
~HttpChunkedClient() override {
laminar.deregisterClient(this);
}
void sendMessage(std::string payload) override {
chunks.push_back(kj::mv(payload));
fulfiller->fulfill();
}
void notifyJobFinished() override {
done = true;
fulfiller->fulfill();
}
LaminarInterface& laminar;
std::list<std::string> chunks;
// cannot use chunks.empty() because multiple fulfill()s
// could be coalesced
bool done = false;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
std::set<T*>& _set;
};
// Implements LaminarClient and holds the Websocket connection object.
// Automatically destructed when the promise created in request() resolves
// or is cancelled
class WebsocketClient : public LaminarClient {
public:
WebsocketClient(LaminarInterface& laminar, kj::Own<kj::WebSocket>&& ws) :
laminar(laminar),
ws(kj::mv(ws))
{}
~WebsocketClient() override {
laminar.deregisterClient(this);
}
virtual void sendMessage(std::string payload) override {
messages.emplace_back(kj::mv(payload));
// sendMessage might be called several times before the event loop
// gets a chance to act on the fulfiller. So store the payload here
// where it can be fetched later and don't pass the payload with the
// fulfiller because subsequent calls to fulfill() are ignored.
fulfiller->fulfill();
}
LaminarInterface& laminar;
kj::Own<kj::WebSocket> ws;
std::list<std::string> messages;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
struct EventPeer {
MonitorScope scope;
std::list<std::string> pendingOutput;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
kj::Promise<void> websocketRead(WebsocketClient& lc)
{
return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(str, kj::String) {
rapidjson::Document d;
d.ParseInsitu(const_cast<char*>(str.cStr()));
if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) {
lc.scope.page = d["page"].GetInt();
lc.scope.field = d["field"].GetString();
lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0;
laminar.sendStatus(&lc);
return websocketRead(lc);
}
}
KJ_CASE_ONEOF(close, kj::WebSocket::Close) {
// clean socket shutdown
return lc.ws->close(close.code, close.reason);
}
KJ_CASE_ONEOF_DEFAULT {}
struct LogWatcher {
std::string job;
uint run;
std::list<std::string> pendingOutput;
kj::Own<kj::PromiseFulfiller<bool>> fulfiller;
};
kj::Maybe<MonitorScope> fromUrl(std::string resource, char* query) {
MonitorScope scope;
if(query) {
char *sk;
for(char* k = strtok_r(query, "&", &sk); k; k = strtok_r(nullptr, "&", &sk)) {
if(char* v = strchr(k, '=')) {
*v++ = '\0';
if(strcmp(k, "page") == 0)
scope.page = atoi(v);
else if(strcmp(k, "field") == 0)
scope.field = v;
else if(strcmp(k, "order") == 0)
scope.order_desc = (strcmp(v, "dsc") == 0);
}
// unhandled/unknown message
return lc.ws->disconnect();
}, [](kj::Exception&& e){
// server logs suggest early catching here avoids fatal exception later
// TODO: reproduce in unit test
LLOG(WARNING, e.getDescription());
return kj::READY_NOW;
});
}
}
kj::Promise<void> websocketWrite(WebsocketClient& lc)
{
auto paf = kj::newPromiseAndFulfiller<void>();
lc.fulfiller = kj::mv(paf.fulfiller);
return paf.promise.then([this,&lc]{
kj::Promise<void> p = kj::READY_NOW;
std::list<std::string> messages = kj::mv(lc.messages);
for(std::string& s : messages) {
p = p.then([&s,&lc]{
kj::String str = kj::str(s);
return lc.ws->send(str).attach(kj::mv(str));
});
}
return p.attach(kj::mv(messages)).then([this,&lc]{
return websocketWrite(lc);
});
});
if(resource == "/") {
scope.type = MonitorScope::HOME;
return kj::mv(scope);
}
kj::Promise<void> websocketUpgraded(WebsocketClient& lc, std::string resource) {
// convert the requested URL to a MonitorScope
if(resource.substr(0, 5) == "/jobs") {
if(resource.length() == 5) {
lc.scope.type = MonitorScope::ALL;
} else {
resource = resource.substr(5);
size_t split = resource.find('/',1);
std::string job = resource.substr(1,split-1);
if(!job.empty()) {
lc.scope.job = job;
lc.scope.type = MonitorScope::JOB;
}
if(split != std::string::npos) {
size_t split2 = resource.find('/', split+1);
std::string run = resource.substr(split+1, split2-split);
if(!run.empty()) {
lc.scope.num = static_cast<uint>(atoi(run.c_str()));
lc.scope.type = MonitorScope::RUN;
}
if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) {
lc.scope.type = MonitorScope::LOG;
}
}
}
}
laminar.registerClient(&lc);
kj::Promise<void> connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc));
// registerClient can happen after a successful websocket handshake.
// However, the connection might not be closed gracefully, so the
// corresponding deregister operation happens in the WebsocketClient
// destructor rather than the close handler or a then() clause
laminar.sendStatus(&lc);
return connection;
if(resource.substr(0, 5) != "/jobs")
return nullptr;
if(resource.length() == 5) {
scope.type = MonitorScope::ALL;
return kj::mv(scope);
}
// Parses the url of the form /log/NAME/NUMBER, filling in the passed
// references and returning true if successful. /log/NAME/latest is
// also allowed, in which case the num reference is set to 0
bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
if(url.startsWith("/log/")) {
kj::StringPtr path = url.slice(5);
KJ_IF_MAYBE(sep, path.findFirst('/')) {
name = path.slice(0, *sep).begin();
kj::StringPtr tail = path.slice(*sep+1);
num = static_cast<uint>(atoi(tail.begin()));
name.erase(*sep);
if(tail == "latest")
num = laminar.latestRun(name);
if(num > 0)
return true;
}
resource = resource.substr(5);
size_t split = resource.find('/',1);
std::string job = resource.substr(1,split-1);
if(job.empty())
return nullptr;
scope.job = job;
scope.type = MonitorScope::JOB;
if(split == std::string::npos)
return kj::mv(scope);
size_t split2 = resource.find('/', split+1);
std::string run = resource.substr(split+1, split2-split);
if(run.empty())
return nullptr;
scope.num = static_cast<uint>(atoi(run.c_str()));
scope.type = MonitorScope::RUN;
return kj::mv(scope);
}
// Parses the url of the form /log/NAME/NUMBER, filling in the passed
// references and returning true if successful. /log/NAME/latest is
// also allowed, in which case the num reference is set to 0
bool Http::parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
if(url.startsWith("/log/")) {
kj::StringPtr path = url.slice(5);
KJ_IF_MAYBE(sep, path.findFirst('/')) {
name = path.slice(0, *sep).begin();
kj::StringPtr tail = path.slice(*sep+1);
num = static_cast<uint>(atoi(tail.begin()));
name.erase(*sep);
if(tail == "latest")
num = laminar.latestRun(name);
if(num > 0)
return true;
}
return false;
}
return false;
}
kj::Promise<void> writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) {
auto paf = kj::newPromiseAndFulfiller<void>();
client->fulfiller = kj::mv(paf.fulfiller);
return paf.promise.then([=]{
kj::Promise<void> p = kj::READY_NOW;
std::list<std::string> chunks = kj::mv(client->chunks);
for(std::string& s : chunks) {
p = p.then([=,&s]{
return stream->write(s.data(), s.size());
});
}
return p.attach(kj::mv(chunks)).then([=]{
return client->done ? kj::Promise<void>(kj::READY_NOW) : writeLogChunk(client, stream);
kj::Promise<void> Http::cleanupPeers(kj::Timer& timer)
{
return timer.afterDelay(15 * kj::SECONDS).then([&]{
for(EventPeer* p : eventPeers) {
// an empty SSE message is a colon followed by two newlines
p->pendingOutput.push_back(":\n\n");
p->fulfiller->fulfill();
}
return cleanupPeers(timer);
}).eagerlyEvaluate(nullptr);
}
kj::Promise<void> writeEvents(EventPeer* peer, kj::AsyncOutputStream* stream) {
auto paf = kj::newPromiseAndFulfiller<void>();
peer->fulfiller = kj::mv(paf.fulfiller);
return paf.promise.then([=]{
kj::Promise<void> p = kj::READY_NOW;
std::list<std::string> chunks = kj::mv(peer->pendingOutput);
for(std::string& s : chunks) {
p = p.then([=,&s]{
return stream->write(s.data(), s.size());
});
}
return p.attach(kj::mv(chunks)).then([=]{
return writeEvents(peer, stream);
});
});
}
kj::Promise<void> writeLogChunk(LogWatcher* client, kj::AsyncOutputStream* stream) {
auto paf = kj::newPromiseAndFulfiller<bool>();
client->fulfiller = kj::mv(paf.fulfiller);
return paf.promise.then([=](bool done){
kj::Promise<void> p = kj::READY_NOW;
std::list<std::string> chunks = kj::mv(client->pendingOutput);
for(std::string& s : chunks) {
p = p.then([=,&s]{
return stream->write(s.data(), s.size());
});
}
return p.attach(kj::mv(chunks)).then([=]{
return done ? kj::Promise<void>(kj::READY_NOW) : writeLogChunk(client, stream);
});
});
}
kj::Promise<void> Http::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders &headers, kj::AsyncInputStream &requestBody, HttpService::Response &response)
{
const char* start, *end, *content_type;
std::string badge;
// for log requests
std::string name;
uint num;
kj::HttpHeaders responseHeaders(*headerTable);
responseHeaders.clear();
bool is_sse = false;
char* queryString = nullptr;
// Clients usually expect that http servers will ignore unknown query parameters,
// and expect to use this feature to work around browser limitations like there
// being no way to programatically force a resource to be reloaded from the server
// (without "Cache-Control: no-store", which is overkill). See issue #89.
// So first parse any query parameters we *are* interested in, then simply remove
// them from the URL, to make comparisions easier.
KJ_IF_MAYBE(queryIdx, url.findFirst('?')) {
const_cast<char*>(url.begin())[*queryIdx] = '\0';
queryString = const_cast<char*>(url.begin() + *queryIdx + 1);
url = url.begin();
}
virtual kj::Promise<void> request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override
{
if(headers.isWebSocket()) {
responseHeaders.clear();
kj::Own<WebsocketClient> lc = kj::heap<WebsocketClient>(laminar, response.acceptWebSocket(responseHeaders));
return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc));
} else {
// handle regular HTTP request
const char* start, *end, *content_type;
std::string badge;
// for log requests
std::string name;
uint num;
responseHeaders.clear();
// Clients usually expect that http servers will ignore unknown query parameters,
// and expect to use this feature to work around browser limitations like there
// being no way to programatically force a resource to be reloaded from the server
// (without "Cache-Control: no-store", which is overkill). See issue #89.
// Since we currently don't handle any query parameters at all, the easiest way
// to achieve this is unconditionally remove all query parameters from the request.
// This will need to be redone if we ever accept query parameters, which probably
// will happen as part of issue #90.
KJ_IF_MAYBE(queryIdx, url.findFirst('?')) {
const_cast<char*>(url.begin())[*queryIdx] = '\0';
url = url.begin();
}
if(url.startsWith("/archive/")) {
KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
auto array = (*file)->mmap(0, (*file)->stat().size);
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, array.size());
return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
}
} else if(parseLogEndpoint(url, name, num)) {
kj::Own<HttpChunkedClient> cc = kj::heap<HttpChunkedClient>(laminar);
cc->scope.job = name;
cc->scope.num = num;
bool complete;
std::string output;
cc->scope.type = MonitorScope::LOG;
if(laminar.handleLogRequest(name, num, output, complete)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
responseHeaders.add("Content-Transfer-Encoding", "binary");
// Disables nginx reverse-proxy's buffering. Necessary for dynamic log output.
responseHeaders.add("X-Accel-Buffering", "no");
auto stream = response.send(200, "OK", responseHeaders, nullptr);
laminar.registerClient(cc.get());
return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{
if(complete)
return kj::Promise<void>(kj::READY_NOW);
return writeLogChunk(c, s);
}).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc));
}
} else if(url == "/custom/style.css") {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
responseHeaders.add("Content-Transfer-Encoding", "binary");
std::string css = laminar.getCustomCss();
auto stream = response.send(200, "OK", responseHeaders, css.size());
return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
} else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
responseHeaders.add("Content-Encoding", "gzip");
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, end-start);
return stream->write(start, end-start).attach(kj::mv(stream));
} else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
responseHeaders.add("Cache-Control", "no-cache");
auto stream = response.send(200, "OK", responseHeaders, badge.size());
return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream));
}
return response.sendError(404, "Not Found", responseHeaders);
KJ_IF_MAYBE(accept, headers.get(ACCEPT)) {
is_sse = (*accept == "text/event-stream");
}
if(is_sse) {
KJ_IF_MAYBE(s, fromUrl(url.cStr(), queryString)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/event-stream");
auto peer = kj::heap<WithSetRef<EventPeer>>(eventPeers);
peer->scope = *s;
std::string st = "data: " + laminar.getStatus(peer->scope) + "\n\n";
auto stream = response.send(200, "OK", responseHeaders);
return stream->write(st.data(), st.size()).attach(kj::mv(st)).then([=,s=stream.get(),p=peer.get()]{
return writeEvents(p,s);
}).attach(kj::mv(stream)).attach(kj::mv(peer));
}
} else if(url.startsWith("/archive/")) {
KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
auto array = (*file)->mmap(0, (*file)->stat().size);
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, array.size());
return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
}
} else if(parseLogEndpoint(url, name, num)) {
auto lw = kj::heap<WithSetRef<LogWatcher>>(logWatchers);
lw->job = name;
lw->run = num;
bool complete;
std::string output;
if(laminar.handleLogRequest(name, num, output, complete)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
responseHeaders.add("Content-Transfer-Encoding", "binary");
// Disables nginx reverse-proxy's buffering. Necessary for dynamic log output.
responseHeaders.add("X-Accel-Buffering", "no");
auto stream = response.send(200, "OK", responseHeaders, nullptr);
return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=lw.get()]{
if(complete)
return kj::Promise<void>(kj::READY_NOW);
return writeLogChunk(c, s);
}).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(lw));
}
} else if(url == "/custom/style.css") {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
responseHeaders.add("Content-Transfer-Encoding", "binary");
std::string css = laminar.getCustomCss();
auto stream = response.send(200, "OK", responseHeaders, css.size());
return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
} else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
responseHeaders.add("Content-Encoding", "gzip");
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, end-start);
return stream->write(start, end-start).attach(kj::mv(stream));
} else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
responseHeaders.add("Cache-Control", "no-cache");
auto stream = response.send(200, "OK", responseHeaders, badge.size());
return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream));
}
return response.sendError(404, "Not Found", responseHeaders);
}
LaminarInterface& laminar;
Resources resources;
kj::HttpHeaders responseHeaders;
};
Http::Http(Laminar &li) :
laminar(li),
resources(kj::heap<Resources>())
{
kj::HttpHeaderTable::Builder builder;
ACCEPT = builder.add("Accept");
headerTable = builder.build();
}
Http::~Http()
{
KJ_ASSERT(logWatchers.size() == 0);
KJ_ASSERT(eventPeers.size() == 0);
}
Http::Http(LaminarInterface &li) :
headerTable(kj::heap<kj::HttpHeaderTable>()),
httpService(kj::heap<HttpImpl>(li, *headerTable)),
laminar(li)
kj::Promise<void> Http::startServer(kj::Timer& timer, kj::Own<kj::ConnectionReceiver>&& listener)
{
kj::Own<kj::HttpServer> server = kj::heap<kj::HttpServer>(timer, *headerTable, *this);
return server->listenHttp(*listener).attach(cleanupPeers(timer)).attach(kj::mv(listener)).attach(kj::mv(server));
}
void Http::notifyEvent(const char *type, const char *data, std::string job, uint run)
{
for(EventPeer* c : eventPeers) {
if(c->scope.wantsStatus(job, run)
// The run page also should know that another job has started
// (so maybe it can show a previously hidden "next" button).
// Hence this small hack:
// TODO obviate
|| (std::string(type)=="job_started" && c->scope.type == MonitorScope::Type::RUN && c->scope.job == job))
{
c->pendingOutput.push_back("data: " + std::string(data) + "\n\n");
c->fulfiller->fulfill();
}
}
}
kj::Promise<void> Http::startServer(kj::Timer& timer, kj::Own<kj::ConnectionReceiver>&& listener) {
auto httpServer = kj::heap<kj::HttpServer>(timer, *headerTable, *httpService);
return httpServer->listenHttp(*listener).attach(kj::mv(listener)).attach(kj::mv(httpServer));
void Http::notifyLog(std::string job, uint run, std::string log_chunk, bool eot)
{
for(LogWatcher* lw : logWatchers) {
if(lw->job == job && lw->run == run) {
lw->pendingOutput.push_back(kj::mv(log_chunk));
lw->fulfiller->fulfill(kj::mv(eot));
}
}
}

@ -19,18 +19,48 @@
#ifndef LAMINAR_HTTP_H_
#define LAMINAR_HTTP_H_
#include <kj/memory.h>
#include <kj/compat/http.h>
#include <string>
#include <set>
struct LaminarInterface;
// Definition needed for musl
typedef unsigned int uint;
typedef unsigned long ulong;
class Http {
struct Laminar;
struct Resources;
struct LogWatcher;
struct EventPeer;
class Http : public kj::HttpService {
public:
Http(LaminarInterface &li);
kj::Promise<void> startServer(kj::Timer &timer, kj::Own<kj::ConnectionReceiver>&& listener);
Http(Laminar&li);
virtual ~Http();
kj::Promise<void> startServer(kj::Timer &timer, kj::Own<kj::ConnectionReceiver> &&listener);
void notifyEvent(const char* type, const char* data, std::string job = nullptr, uint run = 0);
void notifyLog(std::string job, uint run, std::string log_chunk, bool eot);
private:
virtual kj::Promise<void> request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override;
bool parseLogEndpoint(kj::StringPtr url, std::string &name, uint &num);
// With SSE, there is no notification if a client disappears. Also, an idle
// client must be kept alive if there is no activity in their MonitorScope.
// Deal with these by sending a periodic keepalive and reaping the client if
// the write fails.
kj::Promise<void> cleanupPeers(kj::Timer &timer);
Laminar& laminar;
std::set<EventPeer*> eventPeers;
kj::Own<kj::HttpHeaderTable> headerTable;
kj::Own<kj::HttpService> httpService;
LaminarInterface& laminar;
kj::Own<Resources> resources;
std::set<LogWatcher*> logWatchers;
kj::HttpHeaderId ACCEPT;
};
#endif //LAMINAR_HTTP_H_

@ -1,175 +0,0 @@
///
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef LAMINAR_INTERFACE_H_
#define LAMINAR_INTERFACE_H_
#include "run.h"
#include <string>
#include <memory>
#include <unordered_map>
// Simple struct to define which information a frontend client is interested
// in, both in initial request phase and real-time updates. It corresponds
// loosely to frontend URLs
struct MonitorScope {
enum Type {
HOME, // home page: recent builds and statistics
ALL, // browse jobs
JOB, // a specific job page
RUN, // a specific run page
LOG // a run's log page
};
MonitorScope(Type type = HOME, std::string job = std::string(), uint num = 0) :
type(type),
job(job),
num(num),
page(0),
field("number"),
order_desc(true)
{}
// whether this scope wants status information about the given job or run
bool wantsStatus(std::string ajob, uint anum = 0) const {
if(type == HOME || type == ALL) return true;
if(type == JOB) return ajob == job;
if(type == RUN) return ajob == job && anum == num;
return false;
}
bool wantsLog(std::string ajob, uint anum) const {
return type == LOG && ajob == job && anum == num;
}
Type type;
std::string job;
uint num = 0;
// sorting
uint page = 0;
std::string field;
bool order_desc;
};
// Represents a (websocket) client that wants to be notified about events
// matching the supplied scope. Pass instances of this to LaminarInterface
// registerClient and deregisterClient
struct LaminarClient {
virtual ~LaminarClient() noexcept(false) {}
virtual void sendMessage(std::string payload) = 0;
// TODO: redesign
virtual void notifyJobFinished() {}
MonitorScope scope;
};
// Represents a (rpc) client that wants to be notified about run completion.
// Pass instances of this to LaminarInterface registerWaiter and
// deregisterWaiter
struct LaminarWaiter {
virtual ~LaminarWaiter() =default;
virtual void complete(const Run*) = 0;
};
// Represents a file mapped in memory. Used to serve artefacts
struct MappedFile {
virtual ~MappedFile() =default;
virtual const void* address() = 0;
virtual size_t size() = 0;
};
// The interface connecting the network layer to the application business
// logic. These methods fulfil the requirements of both the HTTP/Websocket
// and RPC interfaces.
struct LaminarInterface {
virtual ~LaminarInterface() {}
// Queues a job, returns immediately. Return value will be nullptr if
// the supplied name is not a known job.
virtual std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap()) = 0;
// Register a client (but don't give up ownership). The client will be
// notified with a JSON message of any events matching its scope
// (see LaminarClient and MonitorScope above)
virtual void registerClient(LaminarClient* client) = 0;
// Call this before destroying a client so that Laminar doesn't try
// to call LaminarClient::sendMessage on invalid data
virtual void deregisterClient(LaminarClient* client) = 0;
// Register a waiter (but don't give up ownership). The waiter will be
// notified with a callback of any run completion (see LaminarWaiter above)
virtual void registerWaiter(LaminarWaiter* waiter) = 0;
// Call this before destroying a waiter so that Laminar doesn't try
// to call LaminarWaiter::complete on invalid data
virtual void deregisterWaiter(LaminarWaiter* waiter) = 0;
// Return the latest known number of the named job
virtual uint latestRun(std::string job) = 0;
// Given a job name and number, return existence and (via reference params)
// its current log output and whether the job is ongoing
virtual bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) = 0;
// Synchronously send a snapshot of the current status to the given
// client (as governed by the client's MonitorScope). This is called on
// initial websocket connect.
virtual void sendStatus(LaminarClient* client) = 0;
// Implements the laminar client interface allowing the setting of
// arbitrary parameters on a run (usually itself) to be available in
// the environment of subsequent scripts.
virtual bool setParam(std::string job, uint buildNum, std::string param, std::string value) = 0;
// Gets the list of jobs currently waiting in the execution queue
virtual const std::list<std::shared_ptr<Run>>& listQueuedJobs() = 0;
// Gets the list of currently executing jobs
virtual const RunSet& listRunningJobs() = 0;
// Gets the list of known jobs - scans cfg/jobs for *.run files
virtual std::list<std::string> listKnownJobs() = 0;
// Fetches the content of an artifact given its filename relative to
// $LAMINAR_HOME/archive. Ideally, this would instead be served by a
// proper web server which handles this url.
virtual kj::Maybe<kj::Own<const kj::ReadableFile>> getArtefact(std::string path) = 0;
// Given the name of a job, populate the provided string reference with
// SVG content describing the last known state of the job. Returns false
// if the job is unknown.
virtual bool handleBadgeRequest(std::string job, std::string& badge) = 0;
// Fetches the content of $LAMINAR_HOME/custom/style.css or an empty
// string. Ideally, this would instead be served by a proper web server
// which handles this url.
virtual std::string getCustomCss() = 0;
// Aborts a single job
virtual bool abort(std::string job, uint buildNum) = 0;
// Abort all running jobs
virtual void abortAll() = 0;
// Callback to handle a configuration modification notification
virtual void notifyConfigChanged() = 0;
};
#endif // LAMINAR_INTERFACE_H_

@ -20,6 +20,8 @@
#include "server.h"
#include "conf.h"
#include "log.h"
#include "http.h"
#include "rpc.h"
#include <sys/wait.h>
#include <sys/mman.h>
@ -35,7 +37,7 @@
#include <rapidjson/writer.h>
// rapidjson::Writer with a StringBuffer is used a lot in Laminar for
// preparing JSON messages to send to Websocket clients. A small wrapper
// preparing JSON messages to send to HTTP clients. A small wrapper
// class here reduces verbosity later for this common use case.
class Json : public rapidjson::Writer<rapidjson::StringBuffer> {
public:
@ -52,13 +54,6 @@ template<> Json& Json::set(const char* key, double value) { String(key); Double(
template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; }
template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; }
namespace {
// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf)
constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar";
constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080";
constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/";
}
// short syntax helpers for kj::Path
template<typename T>
inline kj::Path operator/(const kj::Path& p, const T& ext) {
@ -71,18 +66,19 @@ inline kj::Path operator/(const std::string& p, const T& ext) {
typedef std::string str;
Laminar::Laminar(const char *home) :
homePath(kj::Path::parse(&home[1])),
fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY))
Laminar::Laminar(Server &server, Settings settings) :
settings(settings),
srv(server),
homePath(kj::Path::parse(&settings.home[1])),
fsHome(kj::newDiskFilesystem()->getRoot().openSubdir(homePath, kj::WriteMode::MODIFY)),
http(kj::heap<Http>(*this)),
rpc(kj::heap<Rpc>(*this))
{
LASSERT(home[0] == '/');
LASSERT(settings.home[0] == '/');
archiveUrl = ARCHIVE_URL_DEFAULT;
if(char* envArchive = getenv("LAMINAR_ARCHIVE_URL")) {
archiveUrl = envArchive;
if(archiveUrl.back() != '/')
archiveUrl.append("/");
}
archiveUrl = settings.archive_url;
if(archiveUrl.back() != '/')
archiveUrl.append("/");
numKeepRunDirs = 0;
@ -103,29 +99,22 @@ Laminar::Laminar(const char *home) :
buildNums[name] = build;
});
srv = nullptr;
srv.watchPaths([this]{
LLOG(INFO, "Reloading configuration");
loadConfiguration();
// config change may allow stuck jobs to dequeue
assignNewJobs();
}).addPath((homePath/"cfg"/"nodes").toString(true).cStr())
.addPath((homePath/"cfg"/"jobs").toString(true).cStr());
srv.listenRpc(*rpc, settings.bind_rpc);
srv.listenHttp(*http, settings.bind_http);
// Load configuration, may be called again in response to an inotify event
// that the configuration files have been modified
loadConfiguration();
}
void Laminar::registerClient(LaminarClient* client) {
clients.insert(client);
}
void Laminar::deregisterClient(LaminarClient* client) {
clients.erase(client);
}
void Laminar::registerWaiter(LaminarWaiter *waiter) {
waiters.insert(waiter);
}
void Laminar::deregisterWaiter(LaminarWaiter *waiter) {
waiters.erase(waiter);
}
uint Laminar::latestRun(std::string job) {
auto it = activeJobs.byJobName().equal_range(job);
if(it.first == it.second) {
@ -141,8 +130,6 @@ uint Laminar::latestRun(std::string job) {
}
}
// TODO: reunify with sendStatus. The difference is that this method is capable of
// returning "not found" to the caller, and sendStatus isn't
bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) {
if(Run* run = activeRun(name, num)) {
output = run->log;
@ -216,40 +203,15 @@ void Laminar::populateArtifacts(Json &j, std::string job, uint num) const {
}
}
void Laminar::sendStatus(LaminarClient* client) {
if(client->scope.type == MonitorScope::LOG) {
// If the requested job is currently in progress
if(const Run* run = activeRun(client->scope.job, client->scope.num)) {
client->sendMessage(run->log.c_str());
} else { // it must be finished, fetch it from the database
db->stmt("SELECT output, outputLen FROM builds WHERE name = ? AND number = ?")
.bind(client->scope.job, client->scope.num)
.fetch<str,int>([=](str maybeZipped, unsigned long sz) {
str log(sz+1,'\0');
if(sz >= COMPRESS_LOG_MIN_SIZE) {
int res = ::uncompress((uint8_t*) log.data(), &sz,
(const uint8_t*) maybeZipped.data(), maybeZipped.size());
if(res == Z_OK)
client->sendMessage(log);
else
LLOG(ERROR, "Failed to uncompress log");
} else {
client->sendMessage(maybeZipped);
}
});
client->notifyJobFinished();
}
return;
}
std::string Laminar::getStatus(MonitorScope scope) {
Json j;
j.set("type", "status");
j.set("title", getenv("LAMINAR_TITLE") ?: "Laminar");
j.set("time", time(nullptr));
j.startObject("data");
if(client->scope.type == MonitorScope::RUN) {
if(scope.type == MonitorScope::RUN) {
db->stmt("SELECT queuedAt,startedAt,completedAt,result,reason,parentJob,parentBuild FROM builds WHERE name = ? AND number = ?")
.bind(client->scope.job, client->scope.num)
.bind(scope.job, scope.num)
.fetch<time_t, time_t, time_t, int, std::string, std::string, uint>([&](time_t queued, time_t started, time_t completed, int result, std::string reason, std::string parentJob, uint parentBuild) {
j.set("queued", started-queued);
j.set("started", started);
@ -258,7 +220,7 @@ void Laminar::sendStatus(LaminarClient* client) {
j.set("reason", reason);
j.startObject("upstream").set("name", parentJob).set("num", parentBuild).EndObject(2);
});
if(const Run* run = activeRun(client->scope.job, client->scope.num)) {
if(const Run* run = activeRun(scope.job, scope.num)) {
j.set("queued", run->startedAt - run->queuedAt);
j.set("started", run->startedAt);
j.set("result", to_string(RunState::RUNNING));
@ -270,30 +232,30 @@ void Laminar::sendStatus(LaminarClient* client) {
j.set("etc", run->startedAt + lastRuntime);
});
}
j.set("latestNum", int(buildNums[client->scope.job]));
j.set("latestNum", int(buildNums[scope.job]));
j.startArray("artifacts");
populateArtifacts(j, client->scope.job, client->scope.num);
populateArtifacts(j, scope.job, scope.num);
j.EndArray();
} else if(client->scope.type == MonitorScope::JOB) {
} else if(scope.type == MonitorScope::JOB) {
const uint runsPerPage = 10;
j.startArray("recent");
// ORDER BY param cannot be bound
std::string order_by;
std::string direction = client->scope.order_desc ? "DESC" : "ASC";
if(client->scope.field == "number")
std::string direction = scope.order_desc ? "DESC" : "ASC";
if(scope.field == "number")
order_by = "number " + direction;
else if(client->scope.field == "result")
else if(scope.field == "result")
order_by = "result " + direction + ", number DESC";
else if(client->scope.field == "started")
else if(scope.field == "started")
order_by = "startedAt " + direction + ", number DESC";
else if(client->scope.field == "duration")
else if(scope.field == "duration")
order_by = "(completedAt-startedAt) " + direction + ", number DESC";
else
order_by = "number DESC";
std::string stmt = "SELECT number,startedAt,completedAt,result,reason FROM builds WHERE name = ? ORDER BY "
+ order_by + " LIMIT ?,?";
db->stmt(stmt.c_str())
.bind(client->scope.job, client->scope.page * runsPerPage, runsPerPage)
.bind(scope.job, scope.page * runsPerPage, runsPerPage)
.fetch<uint,time_t,time_t,int,str>([&](uint build,time_t started,time_t completed,int result,str reason){
j.StartObject();
j.set("number", build)
@ -305,18 +267,18 @@ void Laminar::sendStatus(LaminarClient* client) {
});
j.EndArray();
db->stmt("SELECT COUNT(*),AVG(completedAt-startedAt) FROM builds WHERE name = ?")
.bind(client->scope.job)
.bind(scope.job)
.fetch<uint,uint>([&](uint nRuns, uint averageRuntime){
j.set("averageRuntime", averageRuntime);
j.set("pages", (nRuns-1) / runsPerPage + 1);
j.startObject("sort");
j.set("page", client->scope.page)
.set("field", client->scope.field)
.set("order", client->scope.order_desc ? "dsc" : "asc")
j.set("page", scope.page)
.set("field", scope.field)
.set("order", scope.order_desc ? "dsc" : "asc")
.EndObject();
});
j.startArray("running");
auto p = activeJobs.byJobName().equal_range(client->scope.job);
auto p = activeJobs.byJobName().equal_range(scope.job);
for(auto it = p.first; it != p.second; ++it) {
const std::shared_ptr<Run> run = *it;
j.StartObject();
@ -330,27 +292,27 @@ void Laminar::sendStatus(LaminarClient* client) {
j.EndArray();
int nQueued = 0;
for(const auto& run : queuedJobs) {
if (run->name == client->scope.job) {
if (run->name == scope.job) {
nQueued++;
}
}
j.set("nQueued", nQueued);
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result = ? ORDER BY completedAt DESC LIMIT 1")
.bind(client->scope.job, int(RunState::SUCCESS))
.bind(scope.job, int(RunState::SUCCESS))
.fetch<int,time_t>([&](int build, time_t started){
j.startObject("lastSuccess");
j.set("number", build).set("started", started);
j.EndObject();
});
db->stmt("SELECT number,startedAt FROM builds WHERE name = ? AND result <> ? ORDER BY completedAt DESC LIMIT 1")
.bind(client->scope.job, int(RunState::SUCCESS))
.bind(scope.job, int(RunState::SUCCESS))
.fetch<int,time_t>([&](int build, time_t started){
j.startObject("lastFailed");
j.set("number", build).set("started", started);
j.EndObject();
});
} else if(client->scope.type == MonitorScope::ALL) {
} else if(scope.type == MonitorScope::ALL) {
j.startArray("jobs");
db->stmt("SELECT name,number,startedAt,completedAt,result FROM builds b JOIN (SELECT name n,MAX(number) l FROM builds GROUP BY n) q ON b.name = q.n AND b.number = q.l")
.fetch<str,uint,time_t,time_t,int>([&](str name,uint number, time_t started, time_t completed, int result){
@ -512,31 +474,16 @@ void Laminar::sendStatus(LaminarClient* client) {
}
j.EndObject();
client->sendMessage(j.str());
return j.str();
}
Laminar::~Laminar() noexcept try {
delete db;
delete srv;
} catch (std::exception& e) {
LLOG(ERROR, e.what());
return;
}
void Laminar::run() {
const char* listen_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT;
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
srv = new Server(*this, listen_rpc, listen_http);
srv->addWatchPath((homePath/"cfg"/"nodes").toString(true).cStr());
srv->addWatchPath((homePath/"cfg"/"jobs").toString(true).cStr());
srv->start();
}
void Laminar::stop() {
srv->stop();
}
bool Laminar::loadConfiguration() {
if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS"))
numKeepRunDirs = static_cast<uint>(atoi(ndirs));
@ -627,24 +574,12 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
.startObject("data")
.set("name", name)
.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(name))
c->sendMessage(msg);
}
http->notifyEvent("job_queued", j.str(), name.c_str());
assignNewJobs();
return run;
}
void Laminar::notifyConfigChanged()
{
LLOG(INFO, "Reloading configuration");
loadConfiguration();
// config change may allow stuck jobs to dequeue
assignNewJobs();
}
bool Laminar::abort(std::string job, uint buildNum) {
if(Run* run = activeRun(job, buildNum)) {
run->abort(true);
@ -700,11 +635,11 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
runFinished(run.get());
});
if(run->timeout > 0) {
exec = exec.attach(srv->addTimeout(run->timeout, [r=run.get()](){
exec = exec.attach(srv.addTimeout(run->timeout, [r=run.get()](){
r->abort(true);
}));
}
srv->addTask(kj::mv(exec));
srv.addTask(kj::mv(exec));
LLOG(INFO, "Started job on node", run->name, run->build, node->name);
// update next build number
@ -731,16 +666,7 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
}
j.EndArray();
j.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(run->name, run->build)
// The run page also should know that another job has started
// (so maybe it can show a previously hidden "next" button).
// Hence this small hack:
|| (c->scope.type == MonitorScope::Type::RUN && c->scope.job == run->name))
c->sendMessage(msg);
}
http->notifyEvent("job_started", j.str(), run->name.c_str(), run->build);
return true;
}
}
@ -765,17 +691,14 @@ kj::Promise<void> Laminar::handleRunStep(Run* run) {
return kj::READY_NOW;
}
kj::Promise<int> exited = srv->onChildExit(run->current_pid);
kj::Promise<int> exited = srv.onChildExit(run->current_pid);
// promise is fulfilled when the process is reaped. But first we wait for all
// output from the pipe (Run::output_fd) to be consumed.
return srv->readDescriptor(run->output_fd, [this,run](const char*b,size_t n){
return srv.readDescriptor(run->output_fd, [this,run](const char*b,size_t n){
// handle log output
std::string s(b, n);
run->log += s;
for(LaminarClient* c : clients) {
if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s);
}
http->notifyLog(run->name, run->build, s, false);
}).then([p = std::move(exited)]() mutable {
// wait until the process is reaped
return kj::mv(p);
@ -832,18 +755,8 @@ void Laminar::runFinished(Run * r) {
populateArtifacts(j, r->name, r->build);
j.EndArray();
j.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(r->name, r->build))
c->sendMessage(msg);
c->notifyJobFinished();
}
// notify the waiters
for(LaminarWaiter* w : waiters) {
w->complete(r);
}
http->notifyEvent("job_completed", j.str(), r->name, r->build);
http->notifyLog(r->name, r->build, "", true);
// erase reference to run from activeJobs. Since runFinished is called in a
// lambda whose context contains a shared_ptr<Run>, the run won't be deleted
// until the context is destroyed at the end of the lambda execution.

@ -19,13 +19,14 @@
#ifndef LAMINAR_LAMINAR_H_
#define LAMINAR_LAMINAR_H_
#include "interface.h"
#include "run.h"
#include "monitorscope.h"
#include "node.h"
#include "database.h"
#include <unordered_map>
#include <kj/filesystem.h>
#include <kj/async-io.h>
// Node name to node object map
typedef std::unordered_map<std::string, std::shared_ptr<Node>> NodeMap;
@ -33,40 +34,72 @@ typedef std::unordered_map<std::string, std::shared_ptr<Node>> NodeMap;
struct Server;
class Json;
class Http;
class Rpc;
struct Settings {
const char* home;
const char* bind_rpc;
const char* bind_http;
const char* archive_url;
};
// The main class implementing the application's business logic.
// It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC
// interfaces and communicates via the LaminarInterface methods and
// the LaminarClient objects (see interface.h)
class Laminar final : public LaminarInterface {
class Laminar final {
public:
Laminar(const char* homePath);
~Laminar() noexcept override;
// Runs the application forever
void run();
// Call this in a signal handler to make run() return
void stop();
// Implementations of LaminarInterface
std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap()) override;
void registerClient(LaminarClient* client) override;
void deregisterClient(LaminarClient* client) override;
void registerWaiter(LaminarWaiter* waiter) override;
void deregisterWaiter(LaminarWaiter* waiter) override;
uint latestRun(std::string job) override;
bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) override;
void sendStatus(LaminarClient* client) override;
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
const std::list<std::shared_ptr<Run>>& listQueuedJobs() override;
const RunSet& listRunningJobs() override;
std::list<std::string> listKnownJobs() override;
kj::Maybe<kj::Own<const kj::ReadableFile>> getArtefact(std::string path) override;
bool handleBadgeRequest(std::string job, std::string& badge) override;
std::string getCustomCss() override;
bool abort(std::string job, uint buildNum) override;
void abortAll() override;
void notifyConfigChanged() override;
Laminar(Server& server, Settings settings);
~Laminar() noexcept;
// Queues a job, returns immediately. Return value will be nullptr if
// the supplied name is not a known job.
std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap());
// Return the latest known number of the named job
uint latestRun(std::string job);
// Given a job name and number, return existence and (via reference params)
// its current log output and whether the job is ongoing
bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete);
// Given a relevant scope, returns a JSON string describing the current
// server status. Content differs depending on the page viewed by the user,
// which should be provided as part of the scope.
std::string getStatus(MonitorScope scope);
// Implements the laminarc function of setting arbitrary parameters on a run,
// (typically the current run) which will be made available in the environment
// of subsequent scripts.
bool setParam(std::string job, uint buildNum, std::string param, std::string value);
// Gets the list of jobs currently waiting in the execution queue
const std::list<std::shared_ptr<Run>>& listQueuedJobs();
// Gets the list of currently executing jobs
const RunSet& listRunningJobs();
// Gets the list of known jobs - scans cfg/jobs for *.run files
std::list<std::string> listKnownJobs();
// Fetches the content of an artifact given its filename relative to
// $LAMINAR_HOME/archive. Ideally, this would instead be served by a
// proper web server which handles this url.
kj::Maybe<kj::Own<const kj::ReadableFile>> getArtefact(std::string path);
// Given the name of a job, populate the provided string reference with
// SVG content describing the last known state of the job. Returns false
// if the job is unknown.
bool handleBadgeRequest(std::string job, std::string& badge);
// Fetches the content of $LAMINAR_HOME/custom/style.css or an empty
// string. Ideally, this would instead be served by a proper web server
// which handles this url.
std::string getCustomCss();
// Aborts a single job
bool abort(std::string job, uint buildNum);
// Abort all running jobs
void abortAll();
private:
bool loadConfiguration();
@ -89,16 +122,18 @@ private:
std::unordered_map<std::string, std::set<std::string>> jobTags;
Settings settings;
RunSet activeJobs;
Database* db;
Server* srv;
Server& srv;
NodeMap nodes;
kj::Path homePath;
kj::Own<const kj::Directory> fsHome;
std::set<LaminarClient*> clients;
std::set<LaminarWaiter*> waiters;
uint numKeepRunDirs;
std::string archiveUrl;
kj::Own<Http> http;
kj::Own<Rpc> rpc;
};
#endif // LAMINAR_LAMINAR_H_

@ -17,17 +17,31 @@
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "laminar.h"
#include "server.h"
#include "log.h"
#include <signal.h>
#include <kj/async-unix.h>
#include <kj/filesystem.h>
static Laminar* laminar;
static Server* server;
static void laminar_quit(int) {
laminar->stop();
// Abort current jobs. Most of the time this isn't necessary since
// systemd stop or other kill mechanism will send SIGTERM to the whole
// process group.
laminar->abortAll();
server->stop();
}
namespace {
constexpr const char* INTADDR_RPC_DEFAULT = "unix-abstract:laminar";
constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080";
constexpr const char* ARCHIVE_URL_DEFAULT = "/archive/";
}
int main(int argc, char** argv) {
for(int i = 1; i < argc; ++i) {
if(strcmp(argv[i], "-v") == 0) {
@ -35,15 +49,27 @@ int main(int argc, char** argv) {
}
}
laminar = new Laminar(getenv("LAMINAR_HOME") ?: "/var/lib/laminar");
auto ioContext = kj::setupAsyncIo();
Settings settings;
// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf)
settings.home = getenv("LAMINAR_HOME") ?: "/var/lib/laminar";
settings.bind_rpc = getenv("LAMINAR_BIND_RPC") ?: INTADDR_RPC_DEFAULT;
settings.bind_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
settings.archive_url = getenv("LAMINAR_ARCHIVE_URL") ?: ARCHIVE_URL_DEFAULT;
server = new Server(ioContext);
laminar = new Laminar(*server, settings);
kj::UnixEventPort::captureChildExit();
signal(SIGINT, &laminar_quit);
signal(SIGTERM, &laminar_quit);
laminar->run();
server->start();
delete laminar;
delete server;
LLOG(INFO, "Clean exit");
return 0;

@ -0,0 +1,62 @@
///
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef LAMINAR_MONITORSCOPE_H_
#define LAMINAR_MONITORSCOPE_H_
#include <string>
// Simple struct to define which information a frontend client is interested
// in, both in initial request phase and real-time updates. It corresponds
// loosely to frontend URLs
struct MonitorScope {
enum Type {
HOME, // home page: recent builds and statistics
ALL, // browse jobs
JOB, // a specific job page
RUN, // a specific run page
};
MonitorScope(Type type = HOME, std::string job = std::string(), uint num = 0) :
type(type),
job(job),
num(num),
page(0),
field("number"),
order_desc(true)
{}
// whether this scope wants status information about the given job or run
bool wantsStatus(std::string ajob, uint anum = 0) const {
if(type == HOME || type == ALL) return true;
if(type == JOB) return ajob == job;
if(type == RUN) return ajob == job && anum == num;
return false;
}
Type type;
std::string job;
uint num = 0;
// sorting
uint page = 0;
std::string field;
bool order_desc;
};
#endif // LAMINAR_MONITORSCOPE_H_

@ -22,30 +22,30 @@ const timeScale = function(max){
? { scale:function(v){return Math.round(v/60)/10}, label:'Minutes' }
: { scale:function(v){return v;}, label:'Seconds' };
}
const WebsocketHandler = function() {
function setupWebsocket(path, next) {
let ws = new WebSocket(document.head.baseURI.replace(/^http/,'ws') + path.substr(1));
ws.onmessage = function(msg) {
const ServerEventHandler = function() {
function setupEventSource(path, query, next) {
const es = new EventSource(document.head.baseURI + path.substr(1) + query);
es.path = path; // save for later in case we need to add query params
es.onmessage = function(msg) {
msg = JSON.parse(msg.data);
// "status" is the first message the websocket always delivers.
// "status" is the first message the server always delivers.
// Use this to confirm the navigation. The component is not
// created until next() is called, so creating a reference
// for other message types must be deferred. There are some extra
// subtle checks here. If this websocket already has a component,
// subtle checks here. If this eventsource already has a component,
// then this is not the first time the status message has been
// received. If the frontend requests an update, the status message
// should not be handled here, but treated the same as any other
// message. An exception is if the connection has been lost - in
// that case we should treat this as a "first-time" status message.
// this.comp.ws is used as a proxy for this.
if (msg.type === 'status' && (!this.comp || !this.comp.ws)) {
// this.comp.es is used as a proxy for this.
if (msg.type === 'status' && (!this.comp || !this.comp.es)) {
next(comp => {
// Set up bidirectional reference
// 1. needed to reference the component for other msg types
this.comp = comp;
// 2. needed to close the ws on navigation away
comp.ws = this;
comp.es = this;
// Update html and nav titles
document.title = comp.$root.title = msg.title;
// Calculate clock offset (used by ProgressUpdater)
@ -59,47 +59,35 @@ const WebsocketHandler = function() {
if (!this.comp)
return console.error("Page component was undefined");
else {
this.comp.$root.connected = true;
this.comp.$root.showNotify(msg.type, msg.data);
if(typeof this.comp[msg.type] === 'function')
this.comp[msg.type](msg.data);
}
}
};
ws.onclose = function(ev) {
// if this.comp isn't set, this connection has never been used
// and a re-connection isn't meaningful
if(!ev.wasClean && 'comp' in this) {
this.comp.$root.connected = false;
// remove the reference to the websocket from the component.
// This not only cleans up an unneeded reference but ensures a
// status message on reconnection is treated as "first-time"
delete this.comp.ws;
this.reconnectTimeout = setTimeout(()=>{
var newWs = setupWebsocket(path, (fn) => { fn(this.comp); });
// the next() callback won't happen if the server is still
// unreachable. Save the reference to the last component
// here so we can recover if/when it does return. This means
// passing this.comp in the next() callback above is redundant
// but necessary to keep the same implementation.
newWs.comp = this.comp;
}, 2000);
}
}
return ws;
};
es.onerror = function() {
this.comp.$root.connected = false;
}
return es;
}
return {
beforeRouteEnter(to, from, next) {
setupWebsocket(to.path, (fn) => { next(fn); });
setupEventSource(to.path, '', (fn) => { next(fn); });
},
beforeRouteUpdate(to, from, next) {
this.ws.close();
clearTimeout(this.ws.reconnectTimeout);
setupWebsocket(to.path, (fn) => { fn(this); next(); });
this.es.close();
setupEventSource(to.path, '', (fn) => { fn(this); next(); });
},
beforeRouteLeave(to, from, next) {
this.ws.close();
clearTimeout(this.ws.reconnectTimeout);
this.es.close();
next();
},
methods: {
query(q) {
this.es.close();
setupEventSource(this.es.path, '?' + Object.entries(q).map(([k,v])=>`${k}=${v}`).join('&'), (fn) => { fn(this); });
}
}
};
}();
@ -195,7 +183,7 @@ const Home = function() {
return {
template: '#home',
mixins: [WebsocketHandler, Utils, ProgressUpdater],
mixins: [ServerEventHandler, Utils, ProgressUpdater],
data: function() {
return state;
},
@ -432,7 +420,7 @@ const Jobs = function() {
};
return {
template: '#jobs',
mixins: [WebsocketHandler, Utils, ProgressUpdater],
mixins: [ServerEventHandler, Utils, ProgressUpdater],
data: function() { return state; },
methods: {
status: function(msg) {
@ -536,7 +524,7 @@ var Job = function() {
var chtBt = null;
return Vue.extend({
template: '#job',
mixins: [WebsocketHandler, Utils, ProgressUpdater],
mixins: [ServerEventHandler, Utils, ProgressUpdater],
data: function() {
return state;
},
@ -634,11 +622,11 @@ var Job = function() {
},
page_next: function() {
state.sort.page++;
this.ws.send(JSON.stringify(state.sort));
this.query(state.sort)
},
page_prev: function() {
state.sort.page--;
this.ws.send(JSON.stringify(state.sort));
this.query(state.sort)
},
do_sort: function(field) {
if(state.sort.field == field) {
@ -647,7 +635,7 @@ var Job = function() {
state.sort.order = 'dsc';
state.sort.field = field;
}
this.ws.send(JSON.stringify(state.sort));
this.query(state.sort)
}
}
});
@ -690,7 +678,7 @@ const Run = function() {
return {
template: '#run',
mixins: [WebsocketHandler, Utils, ProgressUpdater],
mixins: [ServerEventHandler, Utils, ProgressUpdater],
data: function() {
return state;
},

@ -18,8 +18,7 @@
///
#include "rpc.h"
#include "laminar.capnp.h"
#include "interface.h"
#include "laminar.h"
#include "log.h"
namespace {
@ -38,16 +37,16 @@ LaminarCi::JobResult fromRunState(RunState state) {
}
// This is the implementation of the Laminar Cap'n Proto RPC interface.
// As such, it implements the pure virtual interface generated from
// laminar.capnp with calls to the LaminarInterface
class RpcImpl : public LaminarCi::Server, public LaminarWaiter {
// laminar.capnp with calls to the primary Laminar class
class RpcImpl : public LaminarCi::Server {
public:
RpcImpl(LaminarInterface& l) :
RpcImpl(Laminar& l) :
LaminarCi::Server(),
laminar(l)
{
}
~RpcImpl() override {
virtual ~RpcImpl() {
}
// Queue a job, without waiting for it to start
@ -83,10 +82,9 @@ public:
LLOG(INFO, "RPC run", jobName);
std::shared_ptr<Run> run = laminar.queueJob(jobName, params(context.getParams().getParams()));
if(Run* r = run.get()) {
runWaiters[r].emplace_back(kj::newPromiseAndFulfiller<RunState>());
return runWaiters[r].back().promise.then([context,run](RunState state) mutable {
return r->whenFinished().then([context,r](RunState state) mutable {
context.getResults().setResult(fromRunState(state));
context.getResults().setBuildNum(run->build);
context.getResults().setBuildNum(r->build);
});
} else {
context.getResults().setResult(LaminarCi::JobResult::UNKNOWN);
@ -164,18 +162,11 @@ private:
return res;
}
// Implements LaminarWaiter::complete
void complete(const Run* r) override {
for(kj::PromiseFulfillerPair<RunState>& w : runWaiters[r])
w.fulfiller->fulfill(RunState(r->result));
runWaiters.erase(r);
}
private:
LaminarInterface& laminar;
Laminar& laminar;
std::unordered_map<const Run*, std::list<kj::PromiseFulfillerPair<RunState>>> runWaiters;
};
Rpc::Rpc(LaminarInterface& li) :
Rpc::Rpc(Laminar& li) :
rpcInterface(kj::heap<RpcImpl>(li))
{}

@ -23,11 +23,11 @@
#include <capnp/rpc-twoparty.h>
#include <capnp/rpc.capnp.h>
struct LaminarInterface;
struct Laminar;
class Rpc {
public:
Rpc(LaminarInterface &li);
Rpc(Laminar&li);
kj::Promise<void> accept(kj::Own<kj::AsyncIoStream>&& connection);
capnp::Capability::Client rpcInterface;

@ -51,7 +51,8 @@ Run::Run(std::string name, ParamMap pm, kj::Path&& rootPath) :
params(kj::mv(pm)),
queuedAt(time(nullptr)),
rootPath(kj::mv(rootPath)),
started(kj::newPromiseAndFulfiller<void>())
started(kj::newPromiseAndFulfiller<void>()),
finished(kj::newPromiseAndFulfiller<RunState>())
{
for(auto it = params.begin(); it != params.end();) {
if(it->first[0] == '=') {
@ -261,4 +262,6 @@ void Run::reaped(int status) {
else if(status != 0)
result = RunState::FAILED;
// otherwise preserve earlier status
finished.fulfiller->fulfill(RunState(result));
}

@ -74,6 +74,7 @@ public:
std::string reason() const;
kj::Promise<void>&& whenStarted() { return kj::mv(started.promise); }
kj::Promise<RunState>&& whenFinished() { return kj::mv(finished.promise); }
std::shared_ptr<Node> node;
RunState result;
@ -108,6 +109,7 @@ private:
std::list<kj::Path> env;
std::string reasonMsg;
kj::PromiseFulfillerPair<void> started;
kj::PromiseFulfillerPair<RunState> finished;
};

@ -17,10 +17,10 @@
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "server.h"
#include "interface.h"
#include "log.h"
#include "rpc.h"
#include "http.h"
#include "laminar.h"
#include <kj/async-io.h>
#include <kj/async-unix.h>
@ -35,41 +35,11 @@
// a multiple of sizeof(struct signalfd_siginfo) == 128
#define PROC_IO_BUFSIZE 4096
Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
kj::StringPtr httpBindAddress) :
laminarInterface(li),
ioContext(kj::setupAsyncIo()),
Server::Server(kj::AsyncIoContext& io) :
ioContext(io),
listeners(kj::heap<kj::TaskSet>(*this)),
childTasks(*this),
httpReady(kj::newPromiseAndFulfiller<void>()),
http(kj::heap<Http>(li)),
rpc(kj::heap<Rpc>(li))
childTasks(*this)
{
// RPC task
if(rpcBindAddress.startsWith("unix:"))
unlink(rpcBindAddress.slice(strlen("unix:")).cStr());
listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress)
.then([this](kj::Own<kj::NetworkAddress>&& addr) {
return acceptRpcClient(addr->listen());
}));
// HTTP task
if(httpBindAddress.startsWith("unix:"))
unlink(httpBindAddress.slice(strlen("unix:")).cStr());
listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress)
.then([this](kj::Own<kj::NetworkAddress>&& addr) {
// TODO: a better way? Currently used only for testing
httpReady.fulfiller->fulfill();
return http->startServer(ioContext.lowLevelProvider->getTimer(), addr->listen());
}));
// handle watched paths
{
inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
pathWatch = readDescriptor(inotify_fd, [this](const char*, size_t){
laminarInterface.notifyConfigChanged();
});
}
}
Server::~Server() {
@ -89,15 +59,12 @@ void Server::start() {
// Shutdown sequence:
// 1. stop accepting new connections
listeners = nullptr;
// 2. abort current jobs. Most of the time this isn't necessary since
// systemd stop or other kill mechanism will send SIGTERM to the whole
// process group.
laminarInterface.abortAll();
// 3. wait for all children to close
// 2. wait for all children to close
childTasks.onEmpty().wait(ioContext.waitScope);
// 4. run the loop once more to send any pending output to websocket clients
// TODO not sure the comments below are true
// 3. run the loop once more to send any pending output to http clients
ioContext.waitScope.poll();
// 5. return: websockets will be destructed when class is deleted
// 4. return: http connections will be destructed when class is deleted
}
void Server::stop() {
@ -126,16 +93,52 @@ kj::Promise<int> Server::onChildExit(kj::Maybe<pid_t> &pid) {
return ioContext.unixEventPort.onChildExit(pid);
}
void Server::addWatchPath(const char* dpath) {
inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE);
Server::PathWatcher& Server::watchPaths(std::function<void()> fn)
{
struct PathWatcherImpl : public PathWatcher {
PathWatcher& addPath(const char* path) override {
inotify_add_watch(fd, path, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE);
return *this;
}
int fd;
};
auto pwi = kj::heap<PathWatcherImpl>();
PathWatcher* pw = pwi.get();
pwi->fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
listeners->add(readDescriptor(pwi->fd, [fn](const char*, size_t){
fn();
}).attach(kj::mv(pwi)));
return *pw;
}
void Server::listenRpc(Rpc &rpc, kj::StringPtr rpcBindAddress)
{
if(rpcBindAddress.startsWith("unix:"))
unlink(rpcBindAddress.slice(strlen("unix:")).cStr());
listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress)
.then([this,&rpc](kj::Own<kj::NetworkAddress>&& addr) {
return acceptRpcClient(rpc, addr->listen());
}));
}
void Server::listenHttp(Http &http, kj::StringPtr httpBindAddress)
{
if(httpBindAddress.startsWith("unix:"))
unlink(httpBindAddress.slice(strlen("unix:")).cStr());
listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress)
.then([this,&http](kj::Own<kj::NetworkAddress>&& addr) {
return http.startServer(ioContext.lowLevelProvider->getTimer(), addr->listen());
}));
}
kj::Promise<void> Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
kj::Promise<void> Server::acceptRpcClient(Rpc& rpc, kj::Own<kj::ConnectionReceiver>&& listener) {
kj::ConnectionReceiver& cr = *listener.get();
return cr.accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& connection) {
childTasks.add(rpc->accept(kj::mv(connection)));
return acceptRpcClient(kj::mv(listener));
[this, &rpc](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& connection) {
addTask(rpc.accept(kj::mv(connection)));
return acceptRpcClient(rpc, kj::mv(listener));
}));
}

@ -25,19 +25,14 @@
#include <capnp/capability.h>
#include <functional>
struct LaminarInterface;
struct Laminar;
struct Http;
struct Rpc;
// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces
// and manages the program's asynchronous event loop
// This class manages the program's asynchronous event loop
class Server final : public kj::TaskSet::ErrorHandler {
public:
// Initializes the server with a LaminarInterface to handle requests from
// HTTP/Websocket or RPC clients and bind addresses for each of those
// interfaces. See the documentation for kj::AsyncIoProvider::getNetwork
// for a description of the address format
Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress);
Server(kj::AsyncIoContext& ioContext);
~Server();
void start();
void stop();
@ -52,32 +47,28 @@ public:
// get a promise which resolves when a child process exits
kj::Promise<int> onChildExit(kj::Maybe<pid_t>& pid);
// add a path to be watched for changes
void addWatchPath(const char* dpath);
struct PathWatcher {
virtual PathWatcher& addPath(const char* path) = 0;
};
PathWatcher& watchPaths(std::function<void()>);
void listenRpc(Rpc& rpc, kj::StringPtr rpcBindAddress);
void listenHttp(Http& http, kj::StringPtr httpBindAddress);
private:
kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> acceptRpcClient(Rpc& rpc, kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function<void(const char*,size_t)> cb);
void taskFailed(kj::Exception&& exception) override;
private:
int efd_quit;
LaminarInterface& laminarInterface;
kj::AsyncIoContext ioContext;
kj::AsyncIoContext& ioContext;
kj::Own<kj::TaskSet> listeners;
kj::TaskSet childTasks;
kj::Maybe<kj::Promise<void>> reapWatch;
int inotify_fd;
kj::Maybe<kj::Promise<void>> pathWatch;
// TODO: restructure so this isn't necessary
friend class ServerTest;
kj::PromiseFulfillerPair<void> httpReady;
// TODO: WIP
kj::Own<Http> http;
kj::Own<Rpc> rpc;
};
#endif // LAMINAR_SERVER_H_

@ -0,0 +1,71 @@
///
/// Copyright 2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef LAMINAR_EVENTSOURCE_H_
#define LAMINAR_EVENTSOURCE_H_
#include <kj/async-io.h>
#include <kj/compat/http.h>
#include <rapidjson/document.h>
#include <vector>
class EventSource {
public:
EventSource(kj::AsyncIoContext& ctx, const char* httpConnectAddr, const char* path) :
networkAddress(ctx.provider->getNetwork().parseAddress(httpConnectAddr).wait(ctx.waitScope)),
httpClient(kj::newHttpClient(ctx.lowLevelProvider->getTimer(), headerTable, *networkAddress)),
headerTable(),
headers(headerTable),
buffer(kj::heapArrayBuilder<char>(BUFFER_SIZE))
{
headers.add("Accept", "text/event-stream");
auto resp = httpClient->request(kj::HttpMethod::GET, path, headers).response.wait(ctx.waitScope);
promise = waitForMessages(resp.body.get(), 0).attach(kj::mv(resp));
}
const std::vector<rapidjson::Document>& messages() {
return receivedMessages;
}
private:
kj::Own<kj::NetworkAddress> networkAddress;
kj::Own<kj::HttpClient> httpClient;
kj::HttpHeaderTable headerTable;
kj::HttpHeaders headers;
kj::ArrayBuilder<char> buffer;
kj::Maybe<kj::Promise<void>> promise;
std::vector<rapidjson::Document> receivedMessages;
kj::Promise<void> waitForMessages(kj::AsyncInputStream* stream, ulong offset) {
return stream->read(buffer.asPtr().begin() + offset, 1, BUFFER_SIZE).then([=](size_t s) {
ulong end = offset + s;
buffer.asPtr().begin()[end] = '\0';
if(strcmp(&buffer.asPtr().begin()[end - 2], "\n\n") == 0) {
rapidjson::Document d;
d.Parse(buffer.begin() + strlen("data: "));
receivedMessages.emplace_back(kj::mv(d));
end = 0;
}
return waitForMessages(stream, end);
});
}
static const int BUFFER_SIZE = 1024;
};
#endif // LAMINAR_EVENTSOURCE_H_

@ -0,0 +1,82 @@
///
/// Copyright 2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef LAMINAR_FIXTURE_H_
#define LAMINAR_FIXTURE_H_
#include <capnp/rpc-twoparty.h>
#include <gtest/gtest.h>
#include "laminar.capnp.h"
#include "eventsource.h"
#include "tempdir.h"
#include "laminar.h"
#include "server.h"
class LaminarFixture : public ::testing::Test {
public:
LaminarFixture() {
bind_rpc = std::string("unix:/") + tmp.path.toString(true).cStr() + "/rpc.sock";
bind_http = std::string("unix:/") + tmp.path.toString(true).cStr() + "/http.sock";
home = tmp.path.toString(true).cStr();
tmp.fs->openSubdir(kj::Path{"cfg", "jobs"}, kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT);
settings.home = home.c_str();
settings.bind_rpc = bind_rpc.c_str();
settings.bind_http = bind_http.c_str();
settings.archive_url = "/test-archive/";
server = new Server(ioContext);
laminar = new Laminar(*server, settings);
}
~LaminarFixture() noexcept(true) {
delete server;
delete laminar;
}
kj::Own<EventSource> eventSource(const char* path) {
return kj::heap<EventSource>(ioContext, bind_http.c_str(), path);
}
void defineJob(const char* name, const char* scriptContent) {
KJ_IF_MAYBE(f, tmp.fs->tryOpenFile(kj::Path{"cfg", "jobs", std::string(name) + ".run"},
kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT | kj::WriteMode::EXECUTABLE)) {
(*f)->writeAll(std::string("#!/bin/sh\n") + scriptContent + "\n");
}
}
LaminarCi::Client client() {
if(!rpc) {
auto stream = ioContext.provider->getNetwork().parseAddress(bind_rpc).wait(ioContext.waitScope)->connect().wait(ioContext.waitScope);
auto net = kj::heap<capnp::TwoPartyVatNetwork>(*stream, capnp::rpc::twoparty::Side::CLIENT);
rpc = kj::heap<capnp::RpcSystem<capnp::rpc::twoparty::VatId>>(*net, nullptr).attach(kj::mv(net), kj::mv(stream));
}
static capnp::word scratch[4];
memset(scratch, 0, sizeof(scratch));
auto hostId = capnp::MallocMessageBuilder(scratch).getRoot<capnp::rpc::twoparty::VatId>();
hostId.setSide(capnp::rpc::twoparty::Side::SERVER);
return rpc->bootstrap(hostId).castAs<LaminarCi>();
}
kj::Own<capnp::RpcSystem<capnp::rpc::twoparty::VatId>> rpc;
TempDir tmp;
std::string home, bind_rpc, bind_http;
Settings settings;
Server* server;
Laminar* laminar;
static kj::AsyncIoContext ioContext;
};
#endif // LAMINAR_FIXTURE_H_

@ -0,0 +1,75 @@
///
/// Copyright 2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include <kj/async-unix.h>
#include "laminar-fixture.h"
// TODO: consider handling this differently
kj::AsyncIoContext LaminarFixture::ioContext = kj::setupAsyncIo();
TEST_F(LaminarFixture, EmptyStatusMessageStructure) {
auto es = eventSource("/");
ioContext.waitScope.poll();
ASSERT_EQ(1, es->messages().size());
auto json = es->messages().front().GetObject();
EXPECT_STREQ("status", json["type"].GetString());
EXPECT_STREQ("Laminar", json["title"].GetString());
EXPECT_LT(time(nullptr) - json["time"].GetInt64(), 1);
auto data = json["data"].GetObject();
EXPECT_TRUE(data.HasMember("recent"));
EXPECT_TRUE(data.HasMember("running"));
EXPECT_TRUE(data.HasMember("queued"));
EXPECT_TRUE(data.HasMember("executorsTotal"));
EXPECT_TRUE(data.HasMember("executorsBusy"));
EXPECT_TRUE(data.HasMember("buildsPerDay"));
EXPECT_TRUE(data.HasMember("buildsPerJob"));
EXPECT_TRUE(data.HasMember("timePerJob"));
EXPECT_TRUE(data.HasMember("resultChanged"));
EXPECT_TRUE(data.HasMember("lowPassRates"));
EXPECT_TRUE(data.HasMember("buildTimeChanges"));
EXPECT_TRUE(data.HasMember("buildTimeDist"));
}
TEST_F(LaminarFixture, JobNotifyHomePage) {
defineJob("foo", "true");
auto es = eventSource("/");
auto req = client().runRequest();
req.setJobName("foo");
ASSERT_EQ(LaminarCi::JobResult::SUCCESS, req.send().wait(ioContext.waitScope).getResult());
// wait for job completed
ioContext.waitScope.poll();
ASSERT_EQ(4, es->messages().size());
auto job_queued = es->messages().at(1).GetObject();
EXPECT_STREQ("job_queued", job_queued["type"].GetString());
EXPECT_STREQ("foo", job_queued["data"]["name"].GetString());
auto job_started = es->messages().at(2).GetObject();
EXPECT_STREQ("job_started", job_started["type"].GetString());
EXPECT_STREQ("foo", job_started["data"]["name"].GetString());
auto job_completed = es->messages().at(3).GetObject();
EXPECT_STREQ("job_completed", job_completed["type"].GetString());
EXPECT_STREQ("foo", job_completed["data"]["name"].GetString());
}

@ -0,0 +1,28 @@
///
/// Copyright 2019 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include <kj/async-unix.h>
#include <gtest/gtest.h>
// gtest main supplied in order to call captureChildExit
int main(int argc, char **argv) {
kj::UnixEventPort::captureChildExit();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1,47 +0,0 @@
///
/// Copyright 2018 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include <gtest/gtest.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/document.h>
#include "laminar.h"
class TestLaminarClient : public LaminarClient {
public:
virtual void sendMessage(std::string p) { payload = p; }
std::string payload;
};
class LaminarTest : public testing::Test {
protected:
LaminarTest() :
testing::Test(),
laminar("/tmp")
{}
Laminar laminar;
};
TEST_F(LaminarTest, StatusMessageContainsTime) {
TestLaminarClient testClient;
laminar.sendStatus(&testClient);
rapidjson::Document d;
d.Parse(testClient.payload.c_str());
ASSERT_TRUE(d.IsObject());
ASSERT_TRUE(d.HasMember("time"));
EXPECT_GE(1, d["time"].GetInt() - time(nullptr));
}

@ -1,138 +0,0 @@
///
/// Copyright 2018 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <thread>
#include <sys/socket.h>
#include "server.h"
#include "log.h"
#include "interface.h"
#include "laminar.capnp.h"
#include "tempdir.h"
#include "rpc.h"
class MockLaminar : public LaminarInterface {
public:
LaminarClient* client = nullptr;
~MockLaminar() {}
virtual void registerClient(LaminarClient* c) override {
ASSERT_EQ(nullptr, client);
client = c;
EXPECT_CALL(*this, sendStatus(client)).Times(testing::Exactly(1));
}
virtual void deregisterClient(LaminarClient* c) override {
ASSERT_EQ(client, c);
client = nullptr;
}
// MOCK_METHOD does not seem to work with return values whose destructors have noexcept(false)
kj::Maybe<kj::Own<const kj::ReadableFile>> getArtefact(std::string path) override { return nullptr; }
MOCK_METHOD2(queueJob, std::shared_ptr<Run>(std::string name, ParamMap params));
MOCK_METHOD1(registerWaiter, void(LaminarWaiter* waiter));
MOCK_METHOD1(deregisterWaiter, void(LaminarWaiter* waiter));
MOCK_METHOD1(latestRun, uint(std::string));
MOCK_METHOD4(handleLogRequest, bool(std::string, uint, std::string&, bool&));
MOCK_METHOD1(sendStatus, void(LaminarClient* client));
MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value));
MOCK_METHOD0(listQueuedJobs, const std::list<std::shared_ptr<Run>>&());
MOCK_METHOD0(listRunningJobs, const RunSet&());
MOCK_METHOD0(listKnownJobs, std::list<std::string>());
MOCK_METHOD0(getCustomCss, std::string());
MOCK_METHOD2(handleBadgeRequest, bool(std::string, std::string&));
MOCK_METHOD2(abort, bool(std::string, uint));
MOCK_METHOD0(abortAll, void());
MOCK_METHOD0(notifyConfigChanged, void());
};
class ServerTest : public ::testing::Test {
protected:
void SetUp() override {
EXPECT_CALL(mockLaminar, registerWaiter(testing::_));
EXPECT_CALL(mockLaminar, deregisterWaiter(testing::_));
server = new Server(mockLaminar, "unix:"+std::string(tempDir.path.append("rpc.sock").toString(true).cStr()), "127.0.0.1:8080");
}
void TearDown() override {
delete server;
}
LaminarCi::Client client() const {
return server->rpc->rpcInterface.castAs<LaminarCi>();
}
kj::WaitScope& ws() const {
return server->ioContext.waitScope;
}
void waitForHttpReady() {
server->httpReady.promise.wait(server->ioContext.waitScope);
}
kj::Network& network() { return server->ioContext.provider->getNetwork(); }
TempDir tempDir;
MockLaminar mockLaminar;
Server* server;
};
TEST_F(ServerTest, RpcQueue) {
auto req = client().queueRequest();
req.setJobName("foo");
EXPECT_CALL(mockLaminar, queueJob("foo", ParamMap())).Times(testing::Exactly(1));
req.send().wait(ws());
}
// Tests that agressively closed websockets are properly removed
// and will not be attempted to be contacted again
TEST_F(ServerTest, HttpWebsocketRST) {
waitForHttpReady();
// TODO: generalize
constexpr const char* WS =
"GET / HTTP/1.1\r\n"
"Host: localhost:8080\r\n"
"Connection: Upgrade\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Key: GTFmrUCM9N6B32LdDE3Rzw==\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n";
static char buffer[256];
network().parseAddress("localhost:8080").then([this](kj::Own<kj::NetworkAddress>&& addr){
return addr->connect().attach(kj::mv(addr)).then([this](kj::Own<kj::AsyncIoStream>&& stream){
kj::AsyncIoStream* s = stream.get();
return s->write(WS, strlen(WS)).then([this,s](){
// Read the websocket header response, ensure the client has been registered
return s->tryRead(buffer, 64, 256).then([this,s](size_t sz){
EXPECT_LE(64, sz);
EXPECT_NE(nullptr, mockLaminar.client);
// agressively abort the connection
struct linger so_linger;
so_linger.l_onoff = 1;
so_linger.l_linger = 0;
s->setsockopt(SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
return kj::Promise<void>(kj::READY_NOW);
});
}).attach(kj::mv(stream));
});
}).wait(ws());
ws().poll();
// Expect that the client has been cleared. If it has not, Laminar could
// try to write to the closed file descriptor, causing an exception
EXPECT_EQ(nullptr, mockLaminar.client);
}
Loading…
Cancel
Save