From f923762c7ef05e200aea0b966d6bab8d426d5695 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Mon, 21 Sep 2015 22:30:50 +0200 Subject: [PATCH] server cleanup, fix logic error in early promise resolution --- src/interface.h | 1 + src/server.cpp | 74 +++++++++++++++++++++++++------------------------ 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/src/interface.h b/src/interface.h index 07d8572..057603b 100644 --- a/src/interface.h +++ b/src/interface.h @@ -69,6 +69,7 @@ struct MonitorScope { // registerClient and deregisterClient struct LaminarClient { virtual void sendMessage(std::string payload) = 0; + virtual void close() = 0; MonitorScope scope; }; diff --git a/src/server.cpp b/src/server.cpp index 644d089..dafe911 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -174,6 +174,7 @@ public: std::string content; if(laminar.getArtefact(file, content)) { c->set_status(websocketpp::http::status_code::ok); + c->append_header("Content-Transfer-Encoding", "binary"); c->set_body(content); } else { c->set_status(websocketpp::http::status_code::not_found); @@ -181,11 +182,14 @@ public: } else if(resources.handleRequest(resource, &start, &end)) { c->set_status(websocketpp::http::status_code::ok); c->append_header("Content-Encoding", "gzip"); - c->set_body(std::string(start, end)); + c->append_header("Content-Transfer-Encoding", "binary"); + std::string response(start,end); + c->set_body(response); } else { // 404 c->set_status(websocketpp::http::status_code::not_found); } + c->lc->close(); }); // Handle new websocket connection. Parse the URL to determine @@ -223,7 +227,9 @@ public: }); wss.set_close_handler([this](websocketpp::connection_hdl hdl){ - laminar.deregisterClient(wss.get_con_from_hdl(hdl)->lc); + websocket::connection_ptr c = wss.get_con_from_hdl(hdl); + laminar.deregisterClient(c->lc); + c->lc->close(); }); } @@ -266,7 +272,8 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf stream(kj::mv(stream)), out(this), cn(http.newConnection(this)), - writePaf(kj::newPromiseAndFulfiller()) + writePaf(kj::newPromiseAndFulfiller()), + closeOnComplete(false) { cn->register_ostream(&out); cn->start(); @@ -278,7 +285,7 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf } kj::Promise pend() { - return stream->tryRead(ibuf, 1, 1024).then([this](size_t sz){ + return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){ cn->read_all(ibuf, sz); if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { cn->eof(); @@ -298,9 +305,9 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf if(payload.empty()) { return kj::Promise(kj::READY_NOW); } else { - return stream->write(payload.data(), payload.length()).then([this]{ - return writeTask(); - }); + return stream->write(payload.data(), payload.size()).then([this](){ + return closeOnComplete ? kj::Promise(kj::READY_NOW) : writeTask(); + }).attach(kj::mv(payload)); } }); } @@ -309,6 +316,12 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf cn->send(payload, websocketpp::frame::opcode::text); } + void close() override { + closeOnComplete = true; + outputBuffer.clear(); + writePaf.fulfiller->fulfill(); + } + std::streamsize xsputn(const char* s, std::streamsize sz) override { outputBuffer.append(std::string(s, sz)); writePaf.fulfiller->fulfill(); @@ -320,8 +333,8 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf websocket::connection_ptr cn; std::string outputBuffer; kj::PromiseFulfillerPair writePaf; - // TODO: think about this size - char ibuf[1024]; + char ibuf[131072]; + bool closeOnComplete; }; Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, @@ -331,30 +344,17 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, ioContext(kj::setupAsyncIo()), tasks(*this) { + // RPC task + tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress, 0) + .then([this](kj::Own&& addr) { + acceptRpcClient(addr->listen()); + })); - { // RPC task - auto paf = kj::newPromiseAndFulfiller(); - tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress, 0) - .then(kj::mvCapture(paf.fulfiller, - [this](kj::Own>&& portFulfiller, - kj::Own&& addr) { - auto listener = addr->listen(); - portFulfiller->fulfill(listener->getPort()); - acceptRpcClient(kj::mv(listener)); - }))); - } - - { // HTTP task - auto paf = kj::newPromiseAndFulfiller(); - tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress, 0) - .then(kj::mvCapture(paf.fulfiller, - [this](kj::Own>&& portFulfiller, - kj::Own&& addr) { - auto listener = addr->listen(); - portFulfiller->fulfill(listener->getPort()); - acceptHttpClient(kj::mv(listener)); - }))); - } + // HTTP task + tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress, 0) + .then([this](kj::Own&& addr) { + acceptHttpClient(addr->listen()); + })); } Server::~Server() { @@ -392,7 +392,10 @@ void Server::acceptHttpClient(kj::Own&& listener) { kj::Own&& connection) { acceptHttpClient(kj::mv(listener)); auto conn = kj::heap(kj::mv(connection), *httpInterface); - return conn->pend().exclusiveJoin(conn->writeTask()).attach(std::move(conn)); + auto promises = kj::heapArrayBuilder>(2); + promises.add(std::move(conn->pend())); + promises.add(std::move(conn->writeTask())); + return kj::joinPromises(promises.finish()).attach(std::move(conn)); })) ); } @@ -412,9 +415,8 @@ void Server::acceptRpcClient(kj::Own&& listener) { // handles stdout/stderr from a child process by sending it to the provided // callback function kj::Promise Server::handleProcessOutput(kj::AsyncInputStream* stream, std::function readCb) { - // TODO think about this size - static char* buffer = new char[1024]; - return stream->tryRead(buffer, 1, 1024).then([this,stream,readCb](size_t sz) { + static char* buffer = new char[131072]; + return stream->tryRead(buffer, 1, sizeof(buffer)).then([this,stream,readCb](size_t sz) { readCb(buffer, sz); if(sz > 0) { return handleProcessOutput(stream, readCb);