mirror of
				https://github.com/ohwgiles/laminar.git
				synced 2025-06-13 12:54:29 +00:00 
			
		
		
		
	Implement websocket communication with kj-http
Now that capnp/kj provides http and websocket functions, replace the excellent websocketpp library with the kj functions. This removes a dependency and allows for more consistent idiomatic code. Thanks websocketpp, it was great to have you along! This should enable parts of the refactor described in #49
This commit is contained in:
		
							parent
							
								
									0b15939f90
								
							
						
					
					
						commit
						4c2aa2680f
					
				| @ -86,7 +86,7 @@ generate_compressed_bins(${CMAKE_BINARY_DIR} js/vue-router.min.js js/vue.min.js | |||||||
| add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp | add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp | ||||||
|     src/conf.cpp src/resources.cpp src/run.cpp laminar.capnp.c++ ${COMPRESSED_BINS}) |     src/conf.cpp src/resources.cpp src/run.cpp laminar.capnp.c++ ${COMPRESSED_BINS}) | ||||||
| # TODO: some alternative to boost::filesystem? | # TODO: some alternative to boost::filesystem? | ||||||
| target_link_libraries(laminard capnp-rpc capnp kj-async kj pthread boost_filesystem boost_system sqlite3 z) | target_link_libraries(laminard capnp-rpc capnp kj-http kj-async kj pthread boost_filesystem boost_system sqlite3 z) | ||||||
| 
 | 
 | ||||||
| ## Client | ## Client | ||||||
| add_executable(laminarc src/client.cpp laminar.capnp.c++) | add_executable(laminarc src/client.cpp laminar.capnp.c++) | ||||||
| @ -98,7 +98,7 @@ if(BUILD_TESTS) | |||||||
|     find_package(GTest REQUIRED) |     find_package(GTest REQUIRED) | ||||||
|     include_directories(${GTEST_INCLUDE_DIRS} src) |     include_directories(${GTEST_INCLUDE_DIRS} src) | ||||||
|     add_executable(laminar-tests src/conf.cpp src/database.cpp src/laminar.cpp src/run.cpp src/server.cpp laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) |     add_executable(laminar-tests src/conf.cpp src/database.cpp src/laminar.cpp src/run.cpp src/server.cpp laminar.capnp.c++ src/resources.cpp ${COMPRESSED_BINS} test/test-conf.cpp test/test-database.cpp test/test-laminar.cpp test/test-run.cpp test/test-server.cpp) | ||||||
|     target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-async kj pthread boost_filesystem boost_system sqlite3 z) |     target_link_libraries(laminar-tests ${GTEST_BOTH_LIBRARIES} gmock capnp-rpc capnp kj-http kj-async kj pthread boost_filesystem boost_system sqlite3 z) | ||||||
| endif() | endif() | ||||||
| 
 | 
 | ||||||
| set(SYSTEMD_UNITDIR /lib/systemd/system CACHE PATH "Path to systemd unit files") | set(SYSTEMD_UNITDIR /lib/systemd/system CACHE PATH "Path to systemd unit files") | ||||||
|  | |||||||
| @ -10,7 +10,7 @@ See [the website](http://laminar.ohwg.net) and the [documentation](http://lamina | |||||||
| 
 | 
 | ||||||
| ## Building from source | ## Building from source | ||||||
| 
 | 
 | ||||||
| First install development packages for `capnproto (git)`, `rapidjson`, `websocketpp`, `sqlite` and `boost-filesystem` from your distribution's repository or other source. Then: | First install development packages for `capnproto (git)`, `rapidjson`, `sqlite` and `boost-filesystem` from your distribution's repository or other source. Then: | ||||||
| 
 | 
 | ||||||
| ```bash | ```bash | ||||||
| git clone https://github.com/ohwgiles/laminar.git | git clone https://github.com/ohwgiles/laminar.git | ||||||
|  | |||||||
| @ -17,28 +17,21 @@ docker run --rm -i -v $SOURCE_DIR:/root/rpmbuild/SOURCES/laminar-$VERSION:ro -v | |||||||
| mkdir /build | mkdir /build | ||||||
| cd /build | cd /build | ||||||
| 
 | 
 | ||||||
| wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/3079784bfaf3ba05edacfc63d6d494b76a85a3a5.tar.gz | wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/06a7136708955d91f8ddc1fa3d54e620eacba13e.tar.gz | ||||||
| wget -O websocketpp.tar.gz https://github.com/zaphoyd/websocketpp/archive/0.7.0.tar.gz |  | ||||||
| wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz | wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz | ||||||
| md5sum -c <<EOF | md5sum -c <<EOF | ||||||
| c5c04c1892a381e30bd032a6bceef111  capnproto.tar.gz | a24b4d6e671d97c8a2aacd0dd4677726  capnproto.tar.gz | ||||||
| 5027c20cde76fdaef83a74acfcf98e23  websocketpp.tar.gz |  | ||||||
| badd12c511e081fec6c89c43a7027bce  rapidjson.tar.gz | badd12c511e081fec6c89c43a7027bce  rapidjson.tar.gz | ||||||
| EOF | EOF | ||||||
| 
 | 
 | ||||||
| tar xzf capnproto.tar.gz | tar xzf capnproto.tar.gz | ||||||
| tar xzf websocketpp.tar.gz |  | ||||||
| tar xzf rapidjson.tar.gz | tar xzf rapidjson.tar.gz | ||||||
| 
 | 
 | ||||||
| cd /build/capnproto-3079784bfaf3ba05edacfc63d6d494b76a85a3a5/c++/ | cd /build/capnproto-06a7136708955d91f8ddc1fa3d54e620eacba13e/c++/ | ||||||
| cmake3 -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=off . | cmake3 -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=off . | ||||||
| make -j4 | make -j4 | ||||||
| make install | make install | ||||||
| 
 | 
 | ||||||
| cd /build/websocketpp-0.7.0/ |  | ||||||
| cmake3 . |  | ||||||
| make install |  | ||||||
| 
 |  | ||||||
| cd /build/rapidjson-1.1.0/ | cd /build/rapidjson-1.1.0/ | ||||||
| cmake3 -DRAPIDJSON_BUILD_EXAMPLES=off . | cmake3 -DRAPIDJSON_BUILD_EXAMPLES=off . | ||||||
| make install | make install | ||||||
|  | |||||||
| @ -17,28 +17,21 @@ docker run --rm -i -v $SOURCE_DIR:/laminar:ro -v $OUTPUT_DIR:/output $DOCKER_TAG | |||||||
| mkdir /build | mkdir /build | ||||||
| cd /build | cd /build | ||||||
| 
 | 
 | ||||||
| wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/3079784bfaf3ba05edacfc63d6d494b76a85a3a5.tar.gz | wget -O capnproto.tar.gz https://github.com/capnproto/capnproto/archive/06a7136708955d91f8ddc1fa3d54e620eacba13e.tar.gz | ||||||
| wget -O websocketpp.tar.gz https://github.com/zaphoyd/websocketpp/archive/0.7.0.tar.gz |  | ||||||
| wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz | wget -O rapidjson.tar.gz https://github.com/miloyip/rapidjson/archive/v1.1.0.tar.gz | ||||||
| md5sum -c <<EOF | md5sum -c <<EOF | ||||||
| c5c04c1892a381e30bd032a6bceef111  capnproto.tar.gz | a24b4d6e671d97c8a2aacd0dd4677726  capnproto.tar.gz | ||||||
| 5027c20cde76fdaef83a74acfcf98e23  websocketpp.tar.gz |  | ||||||
| badd12c511e081fec6c89c43a7027bce  rapidjson.tar.gz | badd12c511e081fec6c89c43a7027bce  rapidjson.tar.gz | ||||||
| EOF | EOF | ||||||
| 
 | 
 | ||||||
| tar xzf capnproto.tar.gz | tar xzf capnproto.tar.gz | ||||||
| tar xzf websocketpp.tar.gz |  | ||||||
| tar xzf rapidjson.tar.gz | tar xzf rapidjson.tar.gz | ||||||
| 
 | 
 | ||||||
| cd /build/capnproto-3079784bfaf3ba05edacfc63d6d494b76a85a3a5/c++/ | cd /build/capnproto-06a7136708955d91f8ddc1fa3d54e620eacba13e/c++/ | ||||||
| cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=off . | cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=off . | ||||||
| make -j4 | make -j4 | ||||||
| make install | make install | ||||||
| 
 | 
 | ||||||
| cd /build/websocketpp-0.7.0/ |  | ||||||
| cmake . |  | ||||||
| make install |  | ||||||
| 
 |  | ||||||
| cd /build/rapidjson-1.1.0/ | cd /build/rapidjson-1.1.0/ | ||||||
| cmake -DRAPIDJSON_BUILD_EXAMPLES=off . | cmake -DRAPIDJSON_BUILD_EXAMPLES=off . | ||||||
| make install | make install | ||||||
|  | |||||||
| @ -67,7 +67,7 @@ struct MonitorScope { | |||||||
| // matching the supplied scope. Pass instances of this to LaminarInterface
 | // matching the supplied scope. Pass instances of this to LaminarInterface
 | ||||||
| // registerClient and deregisterClient
 | // registerClient and deregisterClient
 | ||||||
| struct LaminarClient { | struct LaminarClient { | ||||||
|     virtual ~LaminarClient() =default; |     virtual ~LaminarClient() noexcept(false) {} | ||||||
|     virtual void sendMessage(std::string payload) = 0; |     virtual void sendMessage(std::string payload) = 0; | ||||||
|     MonitorScope scope; |     MonitorScope scope; | ||||||
| }; | }; | ||||||
|  | |||||||
							
								
								
									
										338
									
								
								src/server.cpp
									
									
									
									
									
								
							
							
						
						
									
										338
									
								
								src/server.cpp
									
									
									
									
									
								
							| @ -28,9 +28,6 @@ | |||||||
| #include <kj/async-io.h> | #include <kj/async-io.h> | ||||||
| #include <kj/threadlocal.h> | #include <kj/threadlocal.h> | ||||||
| 
 | 
 | ||||||
| #include <websocketpp/config/core.hpp> |  | ||||||
| #include <websocketpp/server.hpp> |  | ||||||
| 
 |  | ||||||
| #include <sys/eventfd.h> | #include <sys/eventfd.h> | ||||||
| #include <sys/inotify.h> | #include <sys/inotify.h> | ||||||
| #include <sys/signal.h> | #include <sys/signal.h> | ||||||
| @ -42,25 +39,6 @@ | |||||||
| // a multiple of sizeof(struct signalfd_siginfo) == 128
 | // a multiple of sizeof(struct signalfd_siginfo) == 128
 | ||||||
| #define PROC_IO_BUFSIZE 4096 | #define PROC_IO_BUFSIZE 4096 | ||||||
| 
 | 
 | ||||||
| // Configuration struct for the websocketpp template library.
 |  | ||||||
| struct wsconfig : public websocketpp::config::core { |  | ||||||
| //    static const websocketpp::log::level elog_level =
 |  | ||||||
| //        websocketpp::log::elevel::info;
 |  | ||||||
| 
 |  | ||||||
| //    static const websocketpp::log::level alog_level =
 |  | ||||||
| //        websocketpp::log::alevel::access_core |
 |  | ||||||
| //        websocketpp::log::alevel::message_payload ;
 |  | ||||||
| 
 |  | ||||||
|     static const websocketpp::log::level elog_level = |  | ||||||
|         websocketpp::log::elevel::none; |  | ||||||
| 
 |  | ||||||
|     static const websocketpp::log::level alog_level = |  | ||||||
|         websocketpp::log::alevel::none; |  | ||||||
| 
 |  | ||||||
|     typedef struct { LaminarClient* lc; } connection_base; |  | ||||||
| }; |  | ||||||
| typedef websocketpp::server<wsconfig> websocket; |  | ||||||
| 
 |  | ||||||
| namespace { | namespace { | ||||||
| 
 | 
 | ||||||
| // Used for returning run state to RPC clients
 | // Used for returning run state to RPC clients
 | ||||||
| @ -200,125 +178,162 @@ private: | |||||||
|     std::unordered_map<const Run*, std::list<kj::PromiseFulfillerPair<RunState>>> runWaiters; |     std::unordered_map<const Run*, std::list<kj::PromiseFulfillerPair<RunState>>> runWaiters; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| 
 | // This is the implementation of the HTTP/Websocket interface. It creates
 | ||||||
| // This is the implementation of the HTTP/Websocket interface. It exposes
 |  | ||||||
| // websocket connections as LaminarClients and registers them with the
 | // websocket connections as LaminarClients and registers them with the
 | ||||||
| // LaminarInterface so that status messages will be delivered to the client.
 | // LaminarInterface so that status messages will be delivered to the client.
 | ||||||
| // On opening a websocket connection, it delivers a status snapshot message
 | // On opening a websocket connection, it delivers a status snapshot message
 | ||||||
| // (see LaminarInterface::sendStatus)
 | // (see LaminarInterface::sendStatus)
 | ||||||
| class Server::HttpImpl { | class HttpImpl : public kj::HttpService { | ||||||
| public: | public: | ||||||
|     HttpImpl(LaminarInterface& l) : |     HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : | ||||||
|         laminar(l) |         laminar(laminar), | ||||||
|     { |         responseHeaders(tbl) | ||||||
|         // debug logging
 |     {} | ||||||
|         // wss.set_access_channels(websocketpp::log::alevel::all);
 |     virtual ~HttpImpl() {} | ||||||
|         // wss.set_error_channels(websocketpp::log::elevel::all);
 |  | ||||||
| 
 | 
 | ||||||
|         // Handle incoming websocket message
 | private: | ||||||
|         wss.set_message_handler([this](websocketpp::connection_hdl hdl, websocket::message_ptr msg){ |     // Implements LaminarClient and holds the Websocket connection object.
 | ||||||
|             websocket::connection_ptr c = wss.get_con_from_hdl(hdl); |     // Automatically destructed when the promise created in request() resolves
 | ||||||
|             std::string payload = msg->get_payload(); |     // or is cancelled
 | ||||||
|             rapidjson::Document d; |     class WebsocketClient : public LaminarClient { | ||||||
|             d.ParseInsitu(const_cast<char*>(payload.data())); |     public: | ||||||
|             if(d.HasMember("page") && d["page"].IsInt()) { |         WebsocketClient(LaminarInterface& laminar, kj::Own<kj::WebSocket>&& ws) : | ||||||
|                 int page = d["page"].GetInt(); |             laminar(laminar), | ||||||
|                 c->lc->scope.page = page; |             ws(kj::mv(ws)) | ||||||
|                 laminar.sendStatus(c->lc); |         {} | ||||||
|  |         ~WebsocketClient() override { | ||||||
|  |             laminar.deregisterClient(this); | ||||||
|  |         } | ||||||
|  |         virtual void sendMessage(std::string payload) override { | ||||||
|  |             messages.push_back(kj::mv(payload)); | ||||||
|  |             // sendMessage might be called several times before the event loop
 | ||||||
|  |             // gets a chance to act on the fulfiller. So store the payload here
 | ||||||
|  |             // where it can be fetched later and don't pass the payload with the
 | ||||||
|  |             // fulfiller because subsequent calls to fulfill() are ignored.
 | ||||||
|  |             fulfiller->fulfill(); | ||||||
|  |         } | ||||||
|  |         LaminarInterface& laminar; | ||||||
|  |         kj::Own<kj::WebSocket> ws; | ||||||
|  |         std::list<std::string> messages; | ||||||
|  |         kj::Own<kj::PromiseFulfiller<void>> fulfiller; | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     kj::Promise<void> handleWebsocket(WebsocketClient& lc) { | ||||||
|  |         auto paf = kj::newPromiseAndFulfiller<void>(); | ||||||
|  |         lc.fulfiller = kj::mv(paf.fulfiller); | ||||||
|  |         return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) -> kj::Promise<bool> { | ||||||
|  |             KJ_SWITCH_ONEOF(message) { | ||||||
|  |                 KJ_CASE_ONEOF(str, kj::String) { | ||||||
|  |                     rapidjson::Document d; | ||||||
|  |                     d.ParseInsitu(const_cast<char*>(str.cStr())); | ||||||
|  |                     if(d.HasMember("page") && d["page"].IsInt()) { | ||||||
|  |                         int page = d["page"].GetInt(); | ||||||
|  |                         lc.scope.page = page; | ||||||
|  |                         laminar.sendStatus(&lc); | ||||||
|  |                         // freeze this promise. sendStatus will cause the other half of
 | ||||||
|  |                         // exclusiveJoin below to proceed and this branch will be cancelled
 | ||||||
|  |                         return kj::NEVER_DONE; | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 KJ_CASE_ONEOF(close, kj::WebSocket::Close) { | ||||||
|  |                     // clean socket shutdown
 | ||||||
|  |                     return lc.ws->close(close.code, close.reason).then([]{return false;}); | ||||||
|  |                 } | ||||||
|  |                 KJ_CASE_ONEOF_DEFAULT {} | ||||||
|             } |             } | ||||||
|  |             // unhandled/unknown message
 | ||||||
|  |             return lc.ws->disconnect().then([]{return false;}); | ||||||
|  |         }).exclusiveJoin(kj::mv(paf.promise).then([&lc]{ | ||||||
|  |             kj::Promise<void> p = kj::READY_NOW; | ||||||
|  |             for(std::string& s : lc.messages) { | ||||||
|  |                 p = p.then([&s,&lc]{ | ||||||
|  |                     kj::String str = kj::str(s); | ||||||
|  |                     return lc.ws->send(str).attach(kj::mv(str)); | ||||||
|  |                 }); | ||||||
|  |             } | ||||||
|  |             return p.then([]{return true;}); | ||||||
|  |             //return lc.ws->send(str).attach(kj::mv(str)).then([]{ return true;});
 | ||||||
|  |         })).then([this,&lc](bool cont){ | ||||||
|  |             return cont ? handleWebsocket(lc) : kj::READY_NOW; | ||||||
|         }); |         }); | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|         // Handle plain HTTP requests by delivering the binary resource
 |     kj::Promise<void> websocketUpgraded(WebsocketClient& lc, std::string resource) { | ||||||
|         wss.set_http_handler([this](websocketpp::connection_hdl hdl){ |         // convert the requested URL to a MonitorScope
 | ||||||
|             websocket::connection_ptr c = wss.get_con_from_hdl(hdl); |         if(resource.substr(0, 5) == "/jobs") { | ||||||
|  |             if(resource.length() == 5) { | ||||||
|  |                 lc.scope.type = MonitorScope::ALL; | ||||||
|  |             } else { | ||||||
|  |                 resource = resource.substr(5); | ||||||
|  |                 size_t split = resource.find('/',1); | ||||||
|  |                 std::string job = resource.substr(1,split-1); | ||||||
|  |                 if(!job.empty()) { | ||||||
|  |                     lc.scope.job = job; | ||||||
|  |                     lc.scope.type = MonitorScope::JOB; | ||||||
|  |                 } | ||||||
|  |                 if(split != std::string::npos) { | ||||||
|  |                     size_t split2 = resource.find('/', split+1); | ||||||
|  |                     std::string run = resource.substr(split+1, split2-split); | ||||||
|  |                     if(!run.empty()) { | ||||||
|  |                         lc.scope.num = static_cast<uint>(atoi(run.c_str())); | ||||||
|  |                         lc.scope.type = MonitorScope::RUN; | ||||||
|  |                     } | ||||||
|  |                     if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { | ||||||
|  |                         lc.scope.type = MonitorScope::LOG; | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         laminar.registerClient(&lc); | ||||||
|  |         kj::Promise<void> connection = handleWebsocket(lc); | ||||||
|  |         // registerClient can happen after a successful websocket handshake.
 | ||||||
|  |         // However, the connection might not be closed gracefully, so the
 | ||||||
|  |         // corresponding deregister operation happens in the WebsocketClient
 | ||||||
|  |         // destructor rather than the close handler or a then() clause
 | ||||||
|  |         laminar.sendStatus(&lc); | ||||||
|  |         return connection; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     virtual kj::Promise<void> request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, | ||||||
|  |             kj::AsyncInputStream& requestBody, Response& response) override | ||||||
|  |     { | ||||||
|  |         std::string resource = url.cStr(); | ||||||
|  |         if(headers.isWebSocket()) { | ||||||
|  |             responseHeaders.clear(); | ||||||
|  |             kj::Own<WebsocketClient> lc = kj::heap<WebsocketClient>(laminar, response.acceptWebSocket(responseHeaders)); | ||||||
|  |             return websocketUpgraded(*lc, resource).attach(kj::mv(lc)); | ||||||
|  |         } else { | ||||||
|  |             // handle regular HTTP request
 | ||||||
|             const char* start, *end, *content_type; |             const char* start, *end, *content_type; | ||||||
|             std::string resource = c->get_resource(); |             responseHeaders.clear(); | ||||||
|             if(resource.compare(0, strlen("/archive/"), "/archive/") == 0) { |             if(resource.compare(0, strlen("/archive/"), "/archive/") == 0) { | ||||||
|                 std::string file(resource.substr(strlen("/archive/"))); |                 std::string file(resource.substr(strlen("/archive/"))); | ||||||
|                 std::string content; |                 std::string content; | ||||||
|                 if(laminar.getArtefact(file, content)) { |                 if(laminar.getArtefact(file, content)) { | ||||||
|                     c->set_status(websocketpp::http::status_code::ok); |                     responseHeaders.add("Content-Transfer-Encoding", "binary"); | ||||||
|                     c->append_header("Content-Transfer-Encoding", "binary"); |                     auto stream = response.send(200, "OK", responseHeaders, content.size()); | ||||||
|                     c->set_body(content); |                     return stream->write(content.data(), content.size()).attach(kj::mv(stream)); | ||||||
|                 } else { |  | ||||||
|                     c->set_status(websocketpp::http::status_code::not_found); |  | ||||||
|                 } |                 } | ||||||
|             } else if(resource.compare("/custom/style.css") == 0) { |             } else if(resource.compare("/custom/style.css") == 0) { | ||||||
|                 c->set_status(websocketpp::http::status_code::ok); |                 responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); | ||||||
|                 c->append_header("Content-Type", "text/css; charset=utf-8"); |                 responseHeaders.add("Content-Transfer-Encoding", "binary"); | ||||||
|                 c->append_header("Content-Transfer-Encoding", "binary"); |                 std::string css = laminar.getCustomCss(); | ||||||
|                 c->set_body(laminar.getCustomCss()); |                 auto stream = response.send(200, "OK", responseHeaders, css.size()); | ||||||
|  |                 return stream->write(css.data(), css.size()).attach(kj::mv(stream)); | ||||||
|             } else if(resources.handleRequest(resource, &start, &end, &content_type)) { |             } else if(resources.handleRequest(resource, &start, &end, &content_type)) { | ||||||
|                 c->set_status(websocketpp::http::status_code::ok); |                 responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); | ||||||
|                 c->append_header("Content-Type", content_type); |                 responseHeaders.add("Content-Encoding", "gzip"); | ||||||
|                 c->append_header("Content-Encoding", "gzip"); |                 responseHeaders.add("Content-Transfer-Encoding", "binary"); | ||||||
|                 c->append_header("Content-Transfer-Encoding", "binary"); |                 auto stream = response.send(200, "OK", responseHeaders, end-start); | ||||||
|                 std::string response(start,end); |                 return stream->write(start, end-start).attach(kj::mv(stream)); | ||||||
|                 c->set_body(response); |  | ||||||
|             } else { |  | ||||||
|                 // 404
 |  | ||||||
|                 c->set_status(websocketpp::http::status_code::not_found); |  | ||||||
|             } |             } | ||||||
|         }); |             return response.sendError(404, "Not Found", responseHeaders); | ||||||
| 
 |         } | ||||||
|         // Handle new websocket connection. Parse the URL to determine
 |  | ||||||
|         // the client's scope of interest, register the client for update
 |  | ||||||
|         // messages, and call sendStatus.
 |  | ||||||
|         wss.set_open_handler([this](websocketpp::connection_hdl hdl){ |  | ||||||
|             websocket::connection_ptr c = wss.get_con_from_hdl(hdl); |  | ||||||
|             std::string res = c->get_resource(); |  | ||||||
|             if(res.substr(0, 5) == "/jobs") { |  | ||||||
|                 if(res.length() == 5) { |  | ||||||
|                     c->lc->scope.type = MonitorScope::ALL; |  | ||||||
|                 } else { |  | ||||||
|                     res = res.substr(5); |  | ||||||
|                     size_t split = res.find('/',1); |  | ||||||
|                     std::string job = res.substr(1,split-1); |  | ||||||
|                     if(!job.empty()) { |  | ||||||
|                         c->lc->scope.job = job; |  | ||||||
|                         c->lc->scope.type = MonitorScope::JOB; |  | ||||||
|                     } |  | ||||||
|                     if(split != std::string::npos) { |  | ||||||
|                         size_t split2 = res.find('/', split+1); |  | ||||||
|                         std::string run = res.substr(split+1, split2-split); |  | ||||||
|                         if(!run.empty()) { |  | ||||||
|                             c->lc->scope.num = static_cast<uint>(atoi(run.c_str())); |  | ||||||
|                             c->lc->scope.type = MonitorScope::RUN; |  | ||||||
|                         } |  | ||||||
|                         if(split2 != std::string::npos && res.compare(split2, 4, "/log") == 0) { |  | ||||||
|                             c->lc->scope.type = MonitorScope::LOG; |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|             // registerClient can happen after a successful websocket handshake.
 |  | ||||||
|             // However, the connection might not be closed gracefully, so the
 |  | ||||||
|             // corresponding deregister operation happens in the connection
 |  | ||||||
|             // destructor rather than the close handler
 |  | ||||||
|             laminar.registerClient(c->lc); |  | ||||||
|             laminar.sendStatus(c->lc); |  | ||||||
|         }); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Return a new connection object linked with the context defined below.
 |  | ||||||
|     // This is a bit untidy, it would be better to make them a single object,
 |  | ||||||
|     // but I didn't yet figure it out
 |  | ||||||
|     websocket::connection_ptr newConnection(LaminarClient* lc) { |  | ||||||
|         websocket::connection_ptr c = wss.get_connection(); |  | ||||||
|         c->lc = lc; |  | ||||||
|         return c; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     void connectionDestroyed(LaminarClient* lc) { |  | ||||||
|         // This will be called for all connections, not just websockets, so
 |  | ||||||
|         // the laminar instance should silently ignore unknown clients
 |  | ||||||
|         laminar.deregisterClient(lc); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
| private: |  | ||||||
|     Resources resources; |  | ||||||
|     LaminarInterface& laminar; |     LaminarInterface& laminar; | ||||||
|     websocket wss; |     Resources resources; | ||||||
|  |     kj::HttpHeaders responseHeaders; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| // Context for an RPC connection
 | // Context for an RPC connection
 | ||||||
| @ -336,77 +351,14 @@ struct RpcConnection { | |||||||
|     capnp::RpcSystem<capnp::rpc::twoparty::VatId> rpcSystem; |     capnp::RpcSystem<capnp::rpc::twoparty::VatId> rpcSystem; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| // Context for a WebsocketConnection (implements LaminarClient)
 |  | ||||||
| // This object maps read and write handlers between the websocketpp library
 |  | ||||||
| // and the corresponding kj async methods
 |  | ||||||
| struct Server::WebsocketConnection : public LaminarClient { |  | ||||||
|     WebsocketConnection(kj::Own<kj::AsyncIoStream>&& stream, Server::HttpImpl& http) : |  | ||||||
|         http(http), |  | ||||||
|         stream(kj::mv(stream)), |  | ||||||
|         cn(http.newConnection(this)), |  | ||||||
|         writePaf(kj::newPromiseAndFulfiller<void>()) |  | ||||||
|     { |  | ||||||
|         cn->set_write_handler([this](websocketpp::connection_hdl, const char* s, size_t sz) { |  | ||||||
|             outputBuffer.append(std::string(s, sz)); |  | ||||||
|             writePaf.fulfiller->fulfill(); |  | ||||||
|             return std::error_code(); |  | ||||||
|         }); |  | ||||||
|         cn->start(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     virtual ~WebsocketConnection() noexcept(true) override { |  | ||||||
|         // Removes the connection from the list of registered clients. Must be
 |  | ||||||
|         // here rather than in the websocket closing handshake because connections
 |  | ||||||
|         // might be unexpectedly/aggressively closed and any references must be
 |  | ||||||
|         // removed.
 |  | ||||||
|         http.connectionDestroyed(this); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     kj::Promise<void> pend() { |  | ||||||
|         return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){ |  | ||||||
|             cn->read_some(ibuf, sz); |  | ||||||
|             if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { |  | ||||||
|                 return kj::Promise<void>(kj::READY_NOW); |  | ||||||
|             } |  | ||||||
|             return pend(); |  | ||||||
|         }); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     kj::Promise<void> writeTask() { |  | ||||||
|         return writePaf.promise.then([this]() { |  | ||||||
|             std::string payload; |  | ||||||
|             // clear the outputBuffer for more context, and take a chunk
 |  | ||||||
|             // to send now
 |  | ||||||
|             payload.swap(outputBuffer); |  | ||||||
|             writePaf = kj::newPromiseAndFulfiller<void>(); |  | ||||||
|             KJ_ASSERT(!payload.empty()); |  | ||||||
|             return stream->write(payload.data(), payload.size()).then([this](){ |  | ||||||
|                 if(cn->get_state() == websocketpp::session::state::closed) { |  | ||||||
|                     return kj::Promise<void>(kj::READY_NOW); |  | ||||||
|                 } |  | ||||||
|                 return writeTask(); |  | ||||||
|             }).attach(kj::mv(payload)); |  | ||||||
|         }); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     void sendMessage(std::string payload) override { |  | ||||||
|         cn->send(payload, websocketpp::frame::opcode::text); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     HttpImpl& http; |  | ||||||
|     kj::Own<kj::AsyncIoStream> stream; |  | ||||||
|     websocket::connection_ptr cn; |  | ||||||
|     std::string outputBuffer; |  | ||||||
|     kj::PromiseFulfillerPair<void> writePaf; |  | ||||||
|     char ibuf[131072]; |  | ||||||
| }; |  | ||||||
| 
 |  | ||||||
| Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, | Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, | ||||||
|                kj::StringPtr httpBindAddress) : |                kj::StringPtr httpBindAddress) : | ||||||
|     rpcInterface(kj::heap<RpcImpl>(li)), |     rpcInterface(kj::heap<RpcImpl>(li)), | ||||||
|     laminarInterface(li), |     laminarInterface(li), | ||||||
|     httpInterface(kj::heap<HttpImpl>(li)), |  | ||||||
|     ioContext(kj::setupAsyncIo()), |     ioContext(kj::setupAsyncIo()), | ||||||
|  |     headerTable(), | ||||||
|  |     httpService(kj::heap<HttpImpl>(li, headerTable)), | ||||||
|  |     httpServer(kj::heap<kj::HttpServer>(ioContext.provider->getTimer(), headerTable, *httpService)), | ||||||
|     listeners(kj::heap<kj::TaskSet>(*this)), |     listeners(kj::heap<kj::TaskSet>(*this)), | ||||||
|     childTasks(*this), |     childTasks(*this), | ||||||
|     httpConnections(*this), |     httpConnections(*this), | ||||||
| @ -427,7 +379,8 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, | |||||||
|               .then([this](kj::Own<kj::NetworkAddress>&& addr) { |               .then([this](kj::Own<kj::NetworkAddress>&& addr) { | ||||||
|         // TODO: a better way? Currently used only for testing
 |         // TODO: a better way? Currently used only for testing
 | ||||||
|         httpReady.fulfiller->fulfill(); |         httpReady.fulfiller->fulfill(); | ||||||
|         return acceptHttpClient(addr->listen()); |         kj::Own<kj::ConnectionReceiver> listener = addr->listen(); | ||||||
|  |         return httpServer->listenHttp(*listener).attach(kj::mv(listener)); | ||||||
|     })); |     })); | ||||||
| 
 | 
 | ||||||
|     // handle SIGCHLD
 |     // handle SIGCHLD
 | ||||||
| @ -507,17 +460,6 @@ void Server::addWatchPath(const char* dpath) { | |||||||
|     inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); |     inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) { |  | ||||||
|     kj::ConnectionReceiver& cr = *listener.get(); |  | ||||||
|     return cr.accept().then(kj::mvCapture(kj::mv(listener), |  | ||||||
|         [this](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& connection) { |  | ||||||
|             auto conn = kj::heap<WebsocketConnection>(kj::mv(connection), *httpInterface); |  | ||||||
|             // delete the connection when either the read or write task completes
 |  | ||||||
|             httpConnections.add(conn->pend().exclusiveJoin(conn->writeTask()).attach(kj::mv(conn))); |  | ||||||
|             return acceptHttpClient(kj::mv(listener)); |  | ||||||
|         })); |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| kj::Promise<void> Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) { | kj::Promise<void> Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) { | ||||||
|     kj::ConnectionReceiver& cr = *listener.get(); |     kj::ConnectionReceiver& cr = *listener.get(); | ||||||
|     return cr.accept().then(kj::mvCapture(kj::mv(listener), |     return cr.accept().then(kj::mvCapture(kj::mv(listener), | ||||||
|  | |||||||
| @ -20,6 +20,7 @@ | |||||||
| #define LAMINAR_SERVER_H_ | #define LAMINAR_SERVER_H_ | ||||||
| 
 | 
 | ||||||
| #include <kj/async-io.h> | #include <kj/async-io.h> | ||||||
|  | #include <kj/compat/http.h> | ||||||
| #include <capnp/message.h> | #include <capnp/message.h> | ||||||
| #include <capnp/capability.h> | #include <capnp/capability.h> | ||||||
| #include <functional> | #include <functional> | ||||||
| @ -50,21 +51,19 @@ public: | |||||||
|     void addWatchPath(const char* dpath); |     void addWatchPath(const char* dpath); | ||||||
| 
 | 
 | ||||||
| private: | private: | ||||||
|     kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener); |  | ||||||
|     kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener); |     kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener); | ||||||
|     kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function<void(const char*,size_t)> cb); |     kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function<void(const char*,size_t)> cb); | ||||||
| 
 | 
 | ||||||
|     void taskFailed(kj::Exception&& exception) override; |     void taskFailed(kj::Exception&& exception) override; | ||||||
| 
 | 
 | ||||||
| private: | private: | ||||||
|     struct WebsocketConnection; |  | ||||||
|     class HttpImpl; |  | ||||||
| 
 |  | ||||||
|     int efd_quit; |     int efd_quit; | ||||||
|     capnp::Capability::Client rpcInterface; |     capnp::Capability::Client rpcInterface; | ||||||
|     LaminarInterface& laminarInterface; |     LaminarInterface& laminarInterface; | ||||||
|     kj::Own<HttpImpl> httpInterface; |  | ||||||
|     kj::AsyncIoContext ioContext; |     kj::AsyncIoContext ioContext; | ||||||
|  |     kj::HttpHeaderTable headerTable; | ||||||
|  |     kj::Own<kj::HttpService> httpService; | ||||||
|  |     kj::Own<kj::HttpServer> httpServer; | ||||||
|     kj::Own<kj::TaskSet> listeners; |     kj::Own<kj::TaskSet> listeners; | ||||||
|     kj::TaskSet childTasks; |     kj::TaskSet childTasks; | ||||||
|     kj::TaskSet httpConnections; |     kj::TaskSet httpConnections; | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user