@ -257,14 +257,13 @@ public:
}
}
}
// registerClient can happen after a successful websocket handshake.
// However, the connection might not be closed gracefully, so the
// corresponding deregister operation happens in the connection
// destructor rather than the close handler
laminar . registerClient ( c - > lc ) ;
laminar . sendStatus ( c - > lc ) ;
} ) ;
wss . set_close_handler ( [ this ] ( websocketpp : : connection_hdl hdl ) {
websocket : : connection_ptr c = wss . get_con_from_hdl ( hdl ) ;
laminar . deregisterClient ( c - > lc ) ;
} ) ;
}
// Return a new connection object linked with the context defined below.
@ -276,6 +275,12 @@ public:
return c ;
}
void connectionDestroyed ( LaminarClient * lc ) {
// This will be called for all connections, not just websockets, so
// the laminar instance should silently ignore unknown clients
laminar . deregisterClient ( lc ) ;
}
private :
Resources resources ;
LaminarInterface & laminar ;
@ -302,6 +307,7 @@ struct RpcConnection {
// and the corresponding kj async methods
struct Server : : WebsocketConnection : public LaminarClient {
WebsocketConnection ( kj : : Own < kj : : AsyncIoStream > & & stream , Server : : HttpImpl & http ) :
http ( http ) ,
stream ( kj : : mv ( stream ) ) ,
cn ( http . newConnection ( this ) ) ,
writePaf ( kj : : newPromiseAndFulfiller < void > ( ) )
@ -314,7 +320,13 @@ struct Server::WebsocketConnection : public LaminarClient {
cn - > start ( ) ;
}
virtual ~ WebsocketConnection ( ) noexcept ( true ) override { }
virtual ~ WebsocketConnection ( ) noexcept ( true ) override {
// Removes the connection from the list of registered clients. Must be
// here rather than in the websocket closing handshake because connections
// might be unexpectedly/aggressively closed and any references must be
// removed.
http . connectionDestroyed ( this ) ;
}
kj : : Promise < void > pend ( ) {
return stream - > tryRead ( ibuf , 1 , sizeof ( ibuf ) ) . then ( [ this ] ( size_t sz ) {
@ -347,6 +359,7 @@ struct Server::WebsocketConnection : public LaminarClient {
cn - > send ( payload , websocketpp : : frame : : opcode : : text ) ;
}
HttpImpl & http ;
kj : : Own < kj : : AsyncIoStream > stream ;
websocket : : connection_ptr cn ;
std : : string outputBuffer ;
@ -357,9 +370,11 @@ struct Server::WebsocketConnection : public LaminarClient {
Server : : Server ( LaminarInterface & li , kj : : StringPtr rpcBindAddress ,
kj : : StringPtr httpBindAddress ) :
rpcInterface ( kj : : heap < RpcImpl > ( li ) ) ,
httpInterface ( new HttpImpl ( li ) ) ,
laminarInterface ( li ) ,
httpInterface ( kj : : heap < HttpImpl > ( li ) ) ,
ioContext ( kj : : setupAsyncIo ( ) ) ,
tasks ( * this )
tasks ( * this ) ,
httpReady ( kj : : newPromiseAndFulfiller < void > ( ) )
{
// RPC task
if ( rpcBindAddress . startsWith ( " unix: " ) )
@ -375,13 +390,12 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
tasks . add ( ioContext . provider - > getNetwork ( ) . parseAddress ( httpBindAddress )
. then ( [ this ] ( kj : : Own < kj : : NetworkAddress > & & addr ) {
acceptHttpClient ( addr - > listen ( ) ) ;
// TODO: a better way? Currently used only for testing
httpReady . fulfiller - > fulfill ( ) ;
} ) ) ;
}
Server : : ~ Server ( ) {
// RpcImpl is deleted through Capability::Client.
// Deal with the HTTP interface the old-fashioned way
delete httpInterface ;
}
void Server : : start ( ) {
@ -410,14 +424,11 @@ void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
void Server : : acceptHttpClient ( kj : : Own < kj : : ConnectionReceiver > & & listener ) {
auto ptr = listener . get ( ) ;
tasks . add ( ptr - > accept ( ) . then ( kj : : mvCapture ( kj : : mv ( listener ) ,
[ this ] ( kj : : Own < kj : : ConnectionReceiver > & & listener ,
kj : : Own < kj : : AsyncIoStream > & & connection ) {
[ this ] ( kj : : Own < kj : : ConnectionReceiver > & & listener , kj : : Own < kj : : AsyncIoStream > & & connection ) {
acceptHttpClient ( kj : : mv ( listener ) ) ;
auto conn = kj : : heap < WebsocketConnection > ( kj : : mv ( connection ) , * httpInterface ) ;
auto promises = kj : : heapArrayBuilder < kj : : Promise < void > > ( 2 ) ;
promises . add ( conn - > pend ( ) ) ;
promises . add ( conn - > writeTask ( ) ) ;
return kj : : joinPromises ( promises . finish ( ) ) . attach ( std : : move ( conn ) ) ;
// delete the connection when either the read or write task completes
return conn - > pend ( ) . exclusiveJoin ( conn - > writeTask ( ) ) . attach ( kj : : mv ( conn ) ) ;
} ) )
) ;
}
@ -425,8 +436,7 @@ void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
void Server : : acceptRpcClient ( kj : : Own < kj : : ConnectionReceiver > & & listener ) {
auto ptr = listener . get ( ) ;
tasks . add ( ptr - > accept ( ) . then ( kj : : mvCapture ( kj : : mv ( listener ) ,
[ this ] ( kj : : Own < kj : : ConnectionReceiver > & & listener ,
kj : : Own < kj : : AsyncIoStream > & & connection ) {
[ this ] ( kj : : Own < kj : : ConnectionReceiver > & & listener , kj : : Own < kj : : AsyncIoStream > & & connection ) {
acceptRpcClient ( kj : : mv ( listener ) ) ;
auto server = kj : : heap < RpcConnection > ( kj : : mv ( connection ) , rpcInterface , capnp : : ReaderOptions ( ) ) ;
tasks . add ( server - > network . onDisconnect ( ) . attach ( kj : : mv ( server ) ) ) ;
@ -446,3 +456,10 @@ kj::Promise<void> Server::handleFdRead(kj::AsyncInputStream* stream, char* buffe
return kj : : Promise < void > ( kj : : READY_NOW ) ;
} ) ;
}
void Server : : taskFailed ( kj : : Exception & & exception ) {
// An unexpected http connection close can cause an exception, so don't re-throw.
// TODO: consider re-throwing selected exceptions
LLOG ( INFO , exception ) ;
//kj::throwFatalException(kj::mv(exception));
}