From 078e0e988271e168d95bc23c3210d0611d6673cf Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Fri, 6 Jul 2018 12:32:20 +0300 Subject: [PATCH] improve websocket handling The previous implementation meant that messages could get lost if both sending and receiving were scheduled to be processed in the same event loop cycle. This commit separates the two channels more clearly, while still allowing the close event in the receive side to cancel the whole pipeline Part of #49 refactor --- src/server.cpp | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index b84d11d..b1b352d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -218,10 +218,9 @@ private: kj::Own> fulfiller; }; - kj::Promise handleWebsocket(WebsocketClient& lc) { - auto paf = kj::newPromiseAndFulfiller(); - lc.fulfiller = kj::mv(paf.fulfiller); - return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) -> kj::Promise { + kj::Promise websocketRead(WebsocketClient& lc) + { + return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { KJ_SWITCH_ONEOF(message) { KJ_CASE_ONEOF(str, kj::String) { rapidjson::Document d; @@ -230,31 +229,36 @@ private: int page = d["page"].GetInt(); lc.scope.page = page; laminar.sendStatus(&lc); - // freeze this promise. sendStatus will cause the other half of - // exclusiveJoin below to proceed and this branch will be cancelled - return kj::NEVER_DONE; + return websocketRead(lc); } } KJ_CASE_ONEOF(close, kj::WebSocket::Close) { // clean socket shutdown - return lc.ws->close(close.code, close.reason).then([]{return false;}); + return lc.ws->close(close.code, close.reason); } KJ_CASE_ONEOF_DEFAULT {} } // unhandled/unknown message - return lc.ws->disconnect().then([]{return false;}); - }).exclusiveJoin(kj::mv(paf.promise).then([&lc]{ + return lc.ws->disconnect(); + }); + } + + kj::Promise websocketWrite(WebsocketClient& lc) + { + auto paf = kj::newPromiseAndFulfiller(); + lc.fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([this,&lc]{ kj::Promise p = kj::READY_NOW; - for(std::string& s : lc.messages) { + std::list messages = kj::mv(lc.messages); + for(std::string& s : messages) { p = p.then([&s,&lc]{ kj::String str = kj::str(s); return lc.ws->send(str).attach(kj::mv(str)); }); } - return p.then([]{return true;}); - //return lc.ws->send(str).attach(kj::mv(str)).then([]{ return true;}); - })).then([this,&lc](bool cont){ - return cont ? handleWebsocket(lc) : kj::READY_NOW; + return p.attach(kj::mv(messages)).then([this,&lc]{ + return websocketWrite(lc); + }); }); } @@ -285,7 +289,7 @@ private: } } laminar.registerClient(&lc); - kj::Promise connection = handleWebsocket(lc); + kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); // registerClient can happen after a successful websocket handshake. // However, the connection might not be closed gracefully, so the // corresponding deregister operation happens in the WebsocketClient