diff --git a/src/interface.h b/src/interface.h index c4b0410..e430f6d 100644 --- a/src/interface.h +++ b/src/interface.h @@ -67,7 +67,6 @@ struct MonitorScope { // registerClient and deregisterClient struct LaminarClient { virtual void sendMessage(std::string payload) = 0; - virtual void close(bool now = true) = 0; MonitorScope scope; }; diff --git a/src/server.cpp b/src/server.cpp index 254a536..8a4c09b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -222,7 +222,6 @@ public: // 404 c->set_status(websocketpp::http::status_code::not_found); } - c->lc->close(false); }); // Handle new websocket connection. Parse the URL to determine @@ -262,7 +261,6 @@ public: wss.set_close_handler([this](websocketpp::connection_hdl hdl){ websocket::connection_ptr c = wss.get_con_from_hdl(hdl); laminar.deregisterClient(c->lc); - c->lc->close(); }); } @@ -297,31 +295,29 @@ struct RpcConnection { }; // Context for a WebsocketConnection (implements LaminarClient) -// This object is a streambuf and reimplements xsputn so that it can follow any -// write the websocketpp library makes to it with a write to the appropriate -// descriptor in the kj-async context. -struct Server::WebsocketConnection : public LaminarClient, public std::streambuf { +// This object maps read and write handlers between the websocketpp library +// and the corresponding kj async methods +struct Server::WebsocketConnection : public LaminarClient { WebsocketConnection(kj::Own&& stream, Server::HttpImpl& http) : stream(kj::mv(stream)), - out(this), cn(http.newConnection(this)), - writePaf(kj::newPromiseAndFulfiller()), - closeOnComplete(false) + writePaf(kj::newPromiseAndFulfiller()) { - cn->register_ostream(&out); + cn->set_write_handler([this](websocketpp::connection_hdl, const char* s, size_t sz) { + outputBuffer.append(std::string(s, sz)); + writePaf.fulfiller->fulfill(); + return std::error_code(); + }); cn->start(); } - ~WebsocketConnection() noexcept(true) { - outputBuffer.clear(); - writePaf.fulfiller->fulfill(); + virtual ~WebsocketConnection() noexcept(true) { } kj::Promise pend() { return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){ - cn->read_all(ibuf, sz); + cn->read_some(ibuf, sz); if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { - cn->eof(); return kj::Promise(kj::READY_NOW); } return pend(); @@ -335,14 +331,13 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf // to send now payload.swap(outputBuffer); writePaf = kj::newPromiseAndFulfiller(); - if(payload.empty()) { - stream->shutdownWrite(); - return kj::Promise(kj::READY_NOW); - } else { - return stream->write(payload.data(), payload.size()).then([this](){ - return closeOnComplete ? stream->shutdownWrite(), kj::Promise(kj::READY_NOW) : writeTask(); - }).attach(kj::mv(payload)); - } + KJ_ASSERT(!payload.empty()); + return stream->write(payload.data(), payload.size()).then([this](){ + if(cn->get_state() == websocketpp::session::state::closed) { + return kj::Promise(kj::READY_NOW); + } + return writeTask(); + }).attach(kj::mv(payload)); }); } @@ -350,27 +345,11 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf cn->send(payload, websocketpp::frame::opcode::text); } - void close(bool now) override { - closeOnComplete = true; - if(now) { - outputBuffer.clear(); - writePaf.fulfiller->fulfill(); - } - } - - std::streamsize xsputn(const char* s, std::streamsize sz) override { - outputBuffer.append(std::string(s, sz)); - writePaf.fulfiller->fulfill(); - return sz; - } - kj::Own stream; - std::ostream out; websocket::connection_ptr cn; std::string outputBuffer; kj::PromiseFulfillerPair writePaf; char ibuf[131072]; - bool closeOnComplete; }; Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,