1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

server: minor websocket client handler refactor

Now explicitly closed connections on the client side are
closed with a clean error code. This is motivated by upcoming
work to detect broken websockets and automatically reconnect
This commit is contained in:
Oliver Giles 2017-12-16 18:21:33 +02:00
parent eb5b900849
commit 9e1a65ccee
2 changed files with 18 additions and 40 deletions

View File

@ -67,7 +67,6 @@ struct MonitorScope {
// registerClient and deregisterClient // registerClient and deregisterClient
struct LaminarClient { struct LaminarClient {
virtual void sendMessage(std::string payload) = 0; virtual void sendMessage(std::string payload) = 0;
virtual void close(bool now = true) = 0;
MonitorScope scope; MonitorScope scope;
}; };

View File

@ -222,7 +222,6 @@ public:
// 404 // 404
c->set_status(websocketpp::http::status_code::not_found); c->set_status(websocketpp::http::status_code::not_found);
} }
c->lc->close(false);
}); });
// Handle new websocket connection. Parse the URL to determine // Handle new websocket connection. Parse the URL to determine
@ -262,7 +261,6 @@ public:
wss.set_close_handler([this](websocketpp::connection_hdl hdl){ wss.set_close_handler([this](websocketpp::connection_hdl hdl){
websocket::connection_ptr c = wss.get_con_from_hdl(hdl); websocket::connection_ptr c = wss.get_con_from_hdl(hdl);
laminar.deregisterClient(c->lc); laminar.deregisterClient(c->lc);
c->lc->close();
}); });
} }
@ -297,31 +295,29 @@ struct RpcConnection {
}; };
// Context for a WebsocketConnection (implements LaminarClient) // Context for a WebsocketConnection (implements LaminarClient)
// This object is a streambuf and reimplements xsputn so that it can follow any // This object maps read and write handlers between the websocketpp library
// write the websocketpp library makes to it with a write to the appropriate // and the corresponding kj async methods
// descriptor in the kj-async context. struct Server::WebsocketConnection : public LaminarClient {
struct Server::WebsocketConnection : public LaminarClient, public std::streambuf {
WebsocketConnection(kj::Own<kj::AsyncIoStream>&& stream, Server::HttpImpl& http) : WebsocketConnection(kj::Own<kj::AsyncIoStream>&& stream, Server::HttpImpl& http) :
stream(kj::mv(stream)), stream(kj::mv(stream)),
out(this),
cn(http.newConnection(this)), cn(http.newConnection(this)),
writePaf(kj::newPromiseAndFulfiller<void>()), writePaf(kj::newPromiseAndFulfiller<void>())
closeOnComplete(false)
{ {
cn->register_ostream(&out); cn->set_write_handler([this](websocketpp::connection_hdl, const char* s, size_t sz) {
outputBuffer.append(std::string(s, sz));
writePaf.fulfiller->fulfill();
return std::error_code();
});
cn->start(); cn->start();
} }
~WebsocketConnection() noexcept(true) { virtual ~WebsocketConnection() noexcept(true) {
outputBuffer.clear();
writePaf.fulfiller->fulfill();
} }
kj::Promise<void> pend() { kj::Promise<void> pend() {
return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){ return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){
cn->read_all(ibuf, sz); cn->read_some(ibuf, sz);
if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) {
cn->eof();
return kj::Promise<void>(kj::READY_NOW); return kj::Promise<void>(kj::READY_NOW);
} }
return pend(); return pend();
@ -335,14 +331,13 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
// to send now // to send now
payload.swap(outputBuffer); payload.swap(outputBuffer);
writePaf = kj::newPromiseAndFulfiller<void>(); writePaf = kj::newPromiseAndFulfiller<void>();
if(payload.empty()) { KJ_ASSERT(!payload.empty());
stream->shutdownWrite(); return stream->write(payload.data(), payload.size()).then([this](){
return kj::Promise<void>(kj::READY_NOW); if(cn->get_state() == websocketpp::session::state::closed) {
} else { return kj::Promise<void>(kj::READY_NOW);
return stream->write(payload.data(), payload.size()).then([this](){ }
return closeOnComplete ? stream->shutdownWrite(), kj::Promise<void>(kj::READY_NOW) : writeTask(); return writeTask();
}).attach(kj::mv(payload)); }).attach(kj::mv(payload));
}
}); });
} }
@ -350,27 +345,11 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
cn->send(payload, websocketpp::frame::opcode::text); cn->send(payload, websocketpp::frame::opcode::text);
} }
void close(bool now) override {
closeOnComplete = true;
if(now) {
outputBuffer.clear();
writePaf.fulfiller->fulfill();
}
}
std::streamsize xsputn(const char* s, std::streamsize sz) override {
outputBuffer.append(std::string(s, sz));
writePaf.fulfiller->fulfill();
return sz;
}
kj::Own<kj::AsyncIoStream> stream; kj::Own<kj::AsyncIoStream> stream;
std::ostream out;
websocket::connection_ptr cn; websocket::connection_ptr cn;
std::string outputBuffer; std::string outputBuffer;
kj::PromiseFulfillerPair<void> writePaf; kj::PromiseFulfillerPair<void> writePaf;
char ibuf[131072]; char ibuf[131072];
bool closeOnComplete;
}; };
Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,