diff --git a/src/server.cpp b/src/server.cpp index b84d11d..b1b352d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -218,10 +218,9 @@ private: kj::Own> fulfiller; }; - kj::Promise handleWebsocket(WebsocketClient& lc) { - auto paf = kj::newPromiseAndFulfiller(); - lc.fulfiller = kj::mv(paf.fulfiller); - return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) -> kj::Promise { + kj::Promise websocketRead(WebsocketClient& lc) + { + return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { KJ_SWITCH_ONEOF(message) { KJ_CASE_ONEOF(str, kj::String) { rapidjson::Document d; @@ -230,31 +229,36 @@ private: int page = d["page"].GetInt(); lc.scope.page = page; laminar.sendStatus(&lc); - // freeze this promise. sendStatus will cause the other half of - // exclusiveJoin below to proceed and this branch will be cancelled - return kj::NEVER_DONE; + return websocketRead(lc); } } KJ_CASE_ONEOF(close, kj::WebSocket::Close) { // clean socket shutdown - return lc.ws->close(close.code, close.reason).then([]{return false;}); + return lc.ws->close(close.code, close.reason); } KJ_CASE_ONEOF_DEFAULT {} } // unhandled/unknown message - return lc.ws->disconnect().then([]{return false;}); - }).exclusiveJoin(kj::mv(paf.promise).then([&lc]{ + return lc.ws->disconnect(); + }); + } + + kj::Promise websocketWrite(WebsocketClient& lc) + { + auto paf = kj::newPromiseAndFulfiller(); + lc.fulfiller = kj::mv(paf.fulfiller); + return paf.promise.then([this,&lc]{ kj::Promise p = kj::READY_NOW; - for(std::string& s : lc.messages) { + std::list messages = kj::mv(lc.messages); + for(std::string& s : messages) { p = p.then([&s,&lc]{ kj::String str = kj::str(s); return lc.ws->send(str).attach(kj::mv(str)); }); } - return p.then([]{return true;}); - //return lc.ws->send(str).attach(kj::mv(str)).then([]{ return true;}); - })).then([this,&lc](bool cont){ - return cont ? handleWebsocket(lc) : kj::READY_NOW; + return p.attach(kj::mv(messages)).then([this,&lc]{ + return websocketWrite(lc); + }); }); } @@ -285,7 +289,7 @@ private: } } laminar.registerClient(&lc); - kj::Promise connection = handleWebsocket(lc); + kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); // registerClient can happen after a successful websocket handshake. // However, the connection might not be closed gracefully, so the // corresponding deregister operation happens in the WebsocketClient