diff --git a/CMakeLists.txt b/CMakeLists.txt index 98cdf76..c3aa7fe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,7 +86,7 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp src/conf.cpp src/resources.cpp src/run.cpp laminar.capnp.c++ ${COMPRESSED_BINS}) # TODO: some alternative to boost::filesystem? -target_link_libraries(laminard capnp-rpc capnp kj-async kj pthread boost_filesystem boost_system sqlite3 z) +target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread boost_filesystem boost_system sqlite3 z) ## Client add_executable(laminarc src/client.cpp laminar.capnp.c++) @@ -98,7 +98,7 @@ if(BUILD_TESTS) find_package(GTest REQUIRED) include_directories(${GTEST_INCLUDE_DIRS} src) add_executable(laminar-tests src/conf.cpp src/database.cpp src/laminar.cpp src/run.cpp src/server.cpp laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) - target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-async kj pthread boost_filesystem boost_system sqlite3 z) + target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread boost_filesystem boost_system sqlite3 z) endif() set(SYSTEMD_UNITDIR /lib/systemd/system CACHE PATH "Path to systemd unit files") diff --git a/README.md b/README.md index 01ae085..aa133a7 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ See [the website](http://laminar.ohwg.net) and the [documentation](http://lamina ## Building from source -First install development packages for `capnproto (git)`, `rapidjson`, `websocketpp`, `sqlite` and `boost-filesystem` from your distribution's repository or other source. Then: +First install development packages for `capnproto (git)`, `rapidjson`, `sqlite` and `boost-filesystem` from your distribution's repository or other source. Then: ```bash git clone https://github.com/ohwgiles/laminar.git diff --git a/docker-build-centos.sh b/docker-build-centos.sh index 4b9d83c..ee71fab 100755 --- a/docker-build-centos.sh +++ b/docker-build-centos.sh @@ -17,28 +17,21 @@ docker run --rm -i -v $SOURCE_DIR:/root/rpmbuild/SOURCES/laminar-$VERSION:ro -v mkdir /build cd /build -wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/3079784bfaf3ba05edacfc63d6d494b76a85a3a5.tar.gz -wget -O websocketpp.tar.gz https://github.com/zaphoyd/websocketpp/archive/0.7.0.tar.gz +wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/06a7136708955d91f8ddc1fa3d54e620eacba13e.tar.gz wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz md5sum -c < #include -#include -#include - #include #include #include @@ -42,25 +39,6 @@ // a multiple of sizeof(struct signalfd_siginfo) == 128 #define PROC_IO_BUFSIZE 4096 -// Configuration struct for the websocketpp template library. -struct wsconfig : public websocketpp::config::core { -// static const websocketpp::log::level elog_level = -// websocketpp::log::elevel::info; - -// static const websocketpp::log::level alog_level = -// websocketpp::log::alevel::access_core | -// websocketpp::log::alevel::message_payload ; - - static const websocketpp::log::level elog_level = - websocketpp::log::elevel::none; - - static const websocketpp::log::level alog_level = - websocketpp::log::alevel::none; - - typedef struct { LaminarClient* lc; } connection_base; -}; -typedef websocketpp::server websocket; - namespace { // Used for returning run state to RPC clients @@ -200,125 +178,162 @@ private: std::unordered_map>> runWaiters; }; - -// This is the implementation of the HTTP/Websocket interface. It exposes +// This is the implementation of the HTTP/Websocket interface. It creates // websocket connections as LaminarClients and registers them with the // LaminarInterface so that status messages will be delivered to the client. // On opening a websocket connection, it delivers a status snapshot message // (see LaminarInterface::sendStatus) -class Server::HttpImpl { +class HttpImpl : public kj::HttpService { public: - HttpImpl(LaminarInterface& l) : - laminar(l) - { - // debug logging - // wss.set_access_channels(websocketpp::log::alevel::all); - // wss.set_error_channels(websocketpp::log::elevel::all); + HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : + laminar(laminar), + responseHeaders(tbl) + {} + virtual ~HttpImpl() {} - // Handle incoming websocket message - wss.set_message_handler([this](websocketpp::connection_hdl hdl, websocket::message_ptr msg){ - websocket::connection_ptr c = wss.get_con_from_hdl(hdl); - std::string payload = msg->get_payload(); - rapidjson::Document d; - d.ParseInsitu(const_cast(payload.data())); - if(d.HasMember("page") && d["page"].IsInt()) { - int page = d["page"].GetInt(); - c->lc->scope.page = page; - laminar.sendStatus(c->lc); +private: + // Implements LaminarClient and holds the Websocket connection object. + // Automatically destructed when the promise created in request() resolves + // or is cancelled + class WebsocketClient : public LaminarClient { + public: + WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) : + laminar(laminar), + ws(kj::mv(ws)) + {} + ~WebsocketClient() override { + laminar.deregisterClient(this); + } + virtual void sendMessage(std::string payload) override { + messages.push_back(kj::mv(payload)); + // sendMessage might be called several times before the event loop + // gets a chance to act on the fulfiller. So store the payload here + // where it can be fetched later and don't pass the payload with the + // fulfiller because subsequent calls to fulfill() are ignored. + fulfiller->fulfill(); + } + LaminarInterface& laminar; + kj::Own ws; + std::list messages; + kj::Own> fulfiller; + }; + + kj::Promise handleWebsocket(WebsocketClient& lc) { + auto paf = kj::newPromiseAndFulfiller(); + lc.fulfiller = kj::mv(paf.fulfiller); + return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) -> kj::Promise { + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(str, kj::String) { + rapidjson::Document d; + d.ParseInsitu(const_cast(str.cStr())); + if(d.HasMember("page") && d["page"].IsInt()) { + int page = d["page"].GetInt(); + lc.scope.page = page; + laminar.sendStatus(&lc); + // freeze this promise. sendStatus will cause the other half of + // exclusiveJoin below to proceed and this branch will be cancelled + return kj::NEVER_DONE; + } + } + KJ_CASE_ONEOF(close, kj::WebSocket::Close) { + // clean socket shutdown + return lc.ws->close(close.code, close.reason).then([]{return false;}); + } + KJ_CASE_ONEOF_DEFAULT {} } + // unhandled/unknown message + return lc.ws->disconnect().then([]{return false;}); + }).exclusiveJoin(kj::mv(paf.promise).then([&lc]{ + kj::Promise p = kj::READY_NOW; + for(std::string& s : lc.messages) { + p = p.then([&s,&lc]{ + kj::String str = kj::str(s); + return lc.ws->send(str).attach(kj::mv(str)); + }); + } + return p.then([]{return true;}); + //return lc.ws->send(str).attach(kj::mv(str)).then([]{ return true;}); + })).then([this,&lc](bool cont){ + return cont ? handleWebsocket(lc) : kj::READY_NOW; }); + } - // Handle plain HTTP requests by delivering the binary resource - wss.set_http_handler([this](websocketpp::connection_hdl hdl){ - websocket::connection_ptr c = wss.get_con_from_hdl(hdl); + kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) { + // convert the requested URL to a MonitorScope + if(resource.substr(0, 5) == "/jobs") { + if(resource.length() == 5) { + lc.scope.type = MonitorScope::ALL; + } else { + resource = resource.substr(5); + size_t split = resource.find('/',1); + std::string job = resource.substr(1,split-1); + if(!job.empty()) { + lc.scope.job = job; + lc.scope.type = MonitorScope::JOB; + } + if(split != std::string::npos) { + size_t split2 = resource.find('/', split+1); + std::string run = resource.substr(split+1, split2-split); + if(!run.empty()) { + lc.scope.num = static_cast(atoi(run.c_str())); + lc.scope.type = MonitorScope::RUN; + } + if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { + lc.scope.type = MonitorScope::LOG; + } + } + } + } + laminar.registerClient(&lc); + kj::Promise connection = handleWebsocket(lc); + // registerClient can happen after a successful websocket handshake. + // However, the connection might not be closed gracefully, so the + // corresponding deregister operation happens in the WebsocketClient + // destructor rather than the close handler or a then() clause + laminar.sendStatus(&lc); + return connection; + } + + virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, Response& response) override + { + std::string resource = url.cStr(); + if(headers.isWebSocket()) { + responseHeaders.clear(); + kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders)); + return websocketUpgraded(*lc, resource).attach(kj::mv(lc)); + } else { + // handle regular HTTP request const char* start, *end, *content_type; - std::string resource = c->get_resource(); + responseHeaders.clear(); if(resource.compare(0, strlen("/archive/"), "/archive/") == 0) { std::string file(resource.substr(strlen("/archive/"))); std::string content; if(laminar.getArtefact(file, content)) { - c->set_status(websocketpp::http::status_code::ok); - c->append_header("Content-Transfer-Encoding", "binary"); - c->set_body(content); - } else { - c->set_status(websocketpp::http::status_code::not_found); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, content.size()); + return stream->write(content.data(), content.size()).attach(kj::mv(stream)); } } else if(resource.compare("/custom/style.css") == 0) { - c->set_status(websocketpp::http::status_code::ok); - c->append_header("Content-Type", "text/css; charset=utf-8"); - c->append_header("Content-Transfer-Encoding", "binary"); - c->set_body(laminar.getCustomCss()); + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + std::string css = laminar.getCustomCss(); + auto stream = response.send(200, "OK", responseHeaders, css.size()); + return stream->write(css.data(), css.size()).attach(kj::mv(stream)); } else if(resources.handleRequest(resource, &start, &end, &content_type)) { - c->set_status(websocketpp::http::status_code::ok); - c->append_header("Content-Type", content_type); - c->append_header("Content-Encoding", "gzip"); - c->append_header("Content-Transfer-Encoding", "binary"); - std::string response(start,end); - c->set_body(response); - } else { - // 404 - c->set_status(websocketpp::http::status_code::not_found); + responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); + responseHeaders.add("Content-Encoding", "gzip"); + responseHeaders.add("Content-Transfer-Encoding", "binary"); + auto stream = response.send(200, "OK", responseHeaders, end-start); + return stream->write(start, end-start).attach(kj::mv(stream)); } - }); - - // Handle new websocket connection. Parse the URL to determine - // the client's scope of interest, register the client for update - // messages, and call sendStatus. - wss.set_open_handler([this](websocketpp::connection_hdl hdl){ - websocket::connection_ptr c = wss.get_con_from_hdl(hdl); - std::string res = c->get_resource(); - if(res.substr(0, 5) == "/jobs") { - if(res.length() == 5) { - c->lc->scope.type = MonitorScope::ALL; - } else { - res = res.substr(5); - size_t split = res.find('/',1); - std::string job = res.substr(1,split-1); - if(!job.empty()) { - c->lc->scope.job = job; - c->lc->scope.type = MonitorScope::JOB; - } - if(split != std::string::npos) { - size_t split2 = res.find('/', split+1); - std::string run = res.substr(split+1, split2-split); - if(!run.empty()) { - c->lc->scope.num = static_cast(atoi(run.c_str())); - c->lc->scope.type = MonitorScope::RUN; - } - if(split2 != std::string::npos && res.compare(split2, 4, "/log") == 0) { - c->lc->scope.type = MonitorScope::LOG; - } - } - } - } - // registerClient can happen after a successful websocket handshake. - // However, the connection might not be closed gracefully, so the - // corresponding deregister operation happens in the connection - // destructor rather than the close handler - laminar.registerClient(c->lc); - laminar.sendStatus(c->lc); - }); + return response.sendError(404, "Not Found", responseHeaders); + } } - // Return a new connection object linked with the context defined below. - // This is a bit untidy, it would be better to make them a single object, - // but I didn't yet figure it out - websocket::connection_ptr newConnection(LaminarClient* lc) { - websocket::connection_ptr c = wss.get_connection(); - c->lc = lc; - return c; - } - - void connectionDestroyed(LaminarClient* lc) { - // This will be called for all connections, not just websockets, so - // the laminar instance should silently ignore unknown clients - laminar.deregisterClient(lc); - } - -private: - Resources resources; LaminarInterface& laminar; - websocket wss; + Resources resources; + kj::HttpHeaders responseHeaders; }; // Context for an RPC connection @@ -336,77 +351,14 @@ struct RpcConnection { capnp::RpcSystem rpcSystem; }; -// Context for a WebsocketConnection (implements LaminarClient) -// This object maps read and write handlers between the websocketpp library -// and the corresponding kj async methods -struct Server::WebsocketConnection : public LaminarClient { - WebsocketConnection(kj::Own&& stream, Server::HttpImpl& http) : - http(http), - stream(kj::mv(stream)), - cn(http.newConnection(this)), - writePaf(kj::newPromiseAndFulfiller()) - { - cn->set_write_handler([this](websocketpp::connection_hdl, const char* s, size_t sz) { - outputBuffer.append(std::string(s, sz)); - writePaf.fulfiller->fulfill(); - return std::error_code(); - }); - cn->start(); - } - - virtual ~WebsocketConnection() noexcept(true) override { - // Removes the connection from the list of registered clients. Must be - // here rather than in the websocket closing handshake because connections - // might be unexpectedly/aggressively closed and any references must be - // removed. - http.connectionDestroyed(this); - } - - kj::Promise pend() { - return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){ - cn->read_some(ibuf, sz); - if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { - return kj::Promise(kj::READY_NOW); - } - return pend(); - }); - } - - kj::Promise writeTask() { - return writePaf.promise.then([this]() { - std::string payload; - // clear the outputBuffer for more context, and take a chunk - // to send now - payload.swap(outputBuffer); - writePaf = kj::newPromiseAndFulfiller(); - KJ_ASSERT(!payload.empty()); - return stream->write(payload.data(), payload.size()).then([this](){ - if(cn->get_state() == websocketpp::session::state::closed) { - return kj::Promise(kj::READY_NOW); - } - return writeTask(); - }).attach(kj::mv(payload)); - }); - } - - void sendMessage(std::string payload) override { - cn->send(payload, websocketpp::frame::opcode::text); - } - - HttpImpl& http; - kj::Own stream; - websocket::connection_ptr cn; - std::string outputBuffer; - kj::PromiseFulfillerPair writePaf; - char ibuf[131072]; -}; - Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress) : rpcInterface(kj::heap(li)), laminarInterface(li), - httpInterface(kj::heap(li)), ioContext(kj::setupAsyncIo()), + headerTable(), + httpService(kj::heap(li, headerTable)), + httpServer(kj::heap(ioContext.provider->getTimer(), headerTable, *httpService)), listeners(kj::heap(*this)), childTasks(*this), httpConnections(*this), @@ -427,7 +379,8 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, .then([this](kj::Own&& addr) { // TODO: a better way? Currently used only for testing httpReady.fulfiller->fulfill(); - return acceptHttpClient(addr->listen()); + kj::Own listener = addr->listen(); + return httpServer->listenHttp(*listener).attach(kj::mv(listener)); })); // handle SIGCHLD @@ -507,17 +460,6 @@ void Server::addWatchPath(const char* dpath) { inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); } -kj::Promise Server::acceptHttpClient(kj::Own&& listener) { - kj::ConnectionReceiver& cr = *listener.get(); - return cr.accept().then(kj::mvCapture(kj::mv(listener), - [this](kj::Own&& listener, kj::Own&& connection) { - auto conn = kj::heap(kj::mv(connection), *httpInterface); - // delete the connection when either the read or write task completes - httpConnections.add(conn->pend().exclusiveJoin(conn->writeTask()).attach(kj::mv(conn))); - return acceptHttpClient(kj::mv(listener)); - })); -} - kj::Promise Server::acceptRpcClient(kj::Own&& listener) { kj::ConnectionReceiver& cr = *listener.get(); return cr.accept().then(kj::mvCapture(kj::mv(listener), diff --git a/src/server.h b/src/server.h index 7df256a..6d647ae 100644 --- a/src/server.h +++ b/src/server.h @@ -20,6 +20,7 @@ #define LAMINAR_SERVER_H_ #include +#include #include #include #include @@ -50,21 +51,19 @@ public: void addWatchPath(const char* dpath); private: - kj::Promise acceptHttpClient(kj::Own&& listener); kj::Promise acceptRpcClient(kj::Own&& listener); kj::Promise handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function cb); void taskFailed(kj::Exception&& exception) override; private: - struct WebsocketConnection; - class HttpImpl; - int efd_quit; capnp::Capability::Client rpcInterface; LaminarInterface& laminarInterface; - kj::Own httpInterface; kj::AsyncIoContext ioContext; + kj::HttpHeaderTable headerTable; + kj::Own httpService; + kj::Own httpServer; kj::Own listeners; kj::TaskSet childTasks; kj::TaskSet httpConnections;