1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

server cleanup, fix logic error in early promise resolution

This commit is contained in:
Oliver Giles 2015-09-21 22:30:50 +02:00
parent cf9bee07db
commit f923762c7e
2 changed files with 39 additions and 36 deletions

View File

@ -69,6 +69,7 @@ struct MonitorScope {
// registerClient and deregisterClient // registerClient and deregisterClient
struct LaminarClient { struct LaminarClient {
virtual void sendMessage(std::string payload) = 0; virtual void sendMessage(std::string payload) = 0;
virtual void close() = 0;
MonitorScope scope; MonitorScope scope;
}; };

View File

@ -174,6 +174,7 @@ public:
std::string content; std::string content;
if(laminar.getArtefact(file, content)) { if(laminar.getArtefact(file, content)) {
c->set_status(websocketpp::http::status_code::ok); c->set_status(websocketpp::http::status_code::ok);
c->append_header("Content-Transfer-Encoding", "binary");
c->set_body(content); c->set_body(content);
} else { } else {
c->set_status(websocketpp::http::status_code::not_found); c->set_status(websocketpp::http::status_code::not_found);
@ -181,11 +182,14 @@ public:
} else if(resources.handleRequest(resource, &start, &end)) { } else if(resources.handleRequest(resource, &start, &end)) {
c->set_status(websocketpp::http::status_code::ok); c->set_status(websocketpp::http::status_code::ok);
c->append_header("Content-Encoding", "gzip"); c->append_header("Content-Encoding", "gzip");
c->set_body(std::string(start, end)); c->append_header("Content-Transfer-Encoding", "binary");
std::string response(start,end);
c->set_body(response);
} else { } else {
// 404 // 404
c->set_status(websocketpp::http::status_code::not_found); c->set_status(websocketpp::http::status_code::not_found);
} }
c->lc->close();
}); });
// Handle new websocket connection. Parse the URL to determine // Handle new websocket connection. Parse the URL to determine
@ -223,7 +227,9 @@ public:
}); });
wss.set_close_handler([this](websocketpp::connection_hdl hdl){ wss.set_close_handler([this](websocketpp::connection_hdl hdl){
laminar.deregisterClient(wss.get_con_from_hdl(hdl)->lc); websocket::connection_ptr c = wss.get_con_from_hdl(hdl);
laminar.deregisterClient(c->lc);
c->lc->close();
}); });
} }
@ -266,7 +272,8 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
stream(kj::mv(stream)), stream(kj::mv(stream)),
out(this), out(this),
cn(http.newConnection(this)), cn(http.newConnection(this)),
writePaf(kj::newPromiseAndFulfiller<void>()) writePaf(kj::newPromiseAndFulfiller<void>()),
closeOnComplete(false)
{ {
cn->register_ostream(&out); cn->register_ostream(&out);
cn->start(); cn->start();
@ -278,7 +285,7 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
} }
kj::Promise<void> pend() { kj::Promise<void> pend() {
return stream->tryRead(ibuf, 1, 1024).then([this](size_t sz){ return stream->tryRead(ibuf, 1, sizeof(ibuf)).then([this](size_t sz){
cn->read_all(ibuf, sz); cn->read_all(ibuf, sz);
if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) {
cn->eof(); cn->eof();
@ -298,9 +305,9 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
if(payload.empty()) { if(payload.empty()) {
return kj::Promise<void>(kj::READY_NOW); return kj::Promise<void>(kj::READY_NOW);
} else { } else {
return stream->write(payload.data(), payload.length()).then([this]{ return stream->write(payload.data(), payload.size()).then([this](){
return writeTask(); return closeOnComplete ? kj::Promise<void>(kj::READY_NOW) : writeTask();
}); }).attach(kj::mv(payload));
} }
}); });
} }
@ -309,6 +316,12 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
cn->send(payload, websocketpp::frame::opcode::text); cn->send(payload, websocketpp::frame::opcode::text);
} }
void close() override {
closeOnComplete = true;
outputBuffer.clear();
writePaf.fulfiller->fulfill();
}
std::streamsize xsputn(const char* s, std::streamsize sz) override { std::streamsize xsputn(const char* s, std::streamsize sz) override {
outputBuffer.append(std::string(s, sz)); outputBuffer.append(std::string(s, sz));
writePaf.fulfiller->fulfill(); writePaf.fulfiller->fulfill();
@ -320,8 +333,8 @@ struct Server::WebsocketConnection : public LaminarClient, public std::streambuf
websocket::connection_ptr cn; websocket::connection_ptr cn;
std::string outputBuffer; std::string outputBuffer;
kj::PromiseFulfillerPair<void> writePaf; kj::PromiseFulfillerPair<void> writePaf;
// TODO: think about this size char ibuf[131072];
char ibuf[1024]; bool closeOnComplete;
}; };
Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
@ -331,30 +344,17 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
ioContext(kj::setupAsyncIo()), ioContext(kj::setupAsyncIo()),
tasks(*this) tasks(*this)
{ {
// RPC task
{ // RPC task
auto paf = kj::newPromiseAndFulfiller<uint>();
tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress, 0) tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress, 0)
.then(kj::mvCapture(paf.fulfiller, .then([this](kj::Own<kj::NetworkAddress>&& addr) {
[this](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller, acceptRpcClient(addr->listen());
kj::Own<kj::NetworkAddress>&& addr) { }));
auto listener = addr->listen();
portFulfiller->fulfill(listener->getPort());
acceptRpcClient(kj::mv(listener));
})));
}
{ // HTTP task // HTTP task
auto paf = kj::newPromiseAndFulfiller<uint>();
tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress, 0) tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress, 0)
.then(kj::mvCapture(paf.fulfiller, .then([this](kj::Own<kj::NetworkAddress>&& addr) {
[this](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller, acceptHttpClient(addr->listen());
kj::Own<kj::NetworkAddress>&& addr) { }));
auto listener = addr->listen();
portFulfiller->fulfill(listener->getPort());
acceptHttpClient(kj::mv(listener));
})));
}
} }
Server::~Server() { Server::~Server() {
@ -392,7 +392,10 @@ void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
kj::Own<kj::AsyncIoStream>&& connection) { kj::Own<kj::AsyncIoStream>&& connection) {
acceptHttpClient(kj::mv(listener)); acceptHttpClient(kj::mv(listener));
auto conn = kj::heap<WebsocketConnection>(kj::mv(connection), *httpInterface); auto conn = kj::heap<WebsocketConnection>(kj::mv(connection), *httpInterface);
return conn->pend().exclusiveJoin(conn->writeTask()).attach(std::move(conn)); auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(std::move(conn->pend()));
promises.add(std::move(conn->writeTask()));
return kj::joinPromises(promises.finish()).attach(std::move(conn));
})) }))
); );
} }
@ -412,9 +415,8 @@ void Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
// handles stdout/stderr from a child process by sending it to the provided // handles stdout/stderr from a child process by sending it to the provided
// callback function // callback function
kj::Promise<void> Server::handleProcessOutput(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> readCb) { kj::Promise<void> Server::handleProcessOutput(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> readCb) {
// TODO think about this size static char* buffer = new char[131072];
static char* buffer = new char[1024]; return stream->tryRead(buffer, 1, sizeof(buffer)).then([this,stream,readCb](size_t sz) {
return stream->tryRead(buffer, 1, 1024).then([this,stream,readCb](size_t sz) {
readCb(buffer, sz); readCb(buffer, sz);
if(sz > 0) { if(sz > 0) {
return handleProcessOutput(stream, readCb); return handleProcessOutput(stream, readCb);