mirror of
https://github.com/ohwgiles/laminar.git
synced 2024-10-27 20:34:20 +00:00
server: allocate a single buffer per file descriptor
This commit is contained in:
parent
b70e501d6d
commit
272176a6a5
@ -422,7 +422,8 @@ void Server::stop() {
|
|||||||
|
|
||||||
void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
|
void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
|
||||||
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||||
tasks.add(handleFdRead(event, cb).attach(std::move(event)));
|
std::vector<char> buffer(PROC_IO_BUFSIZE);
|
||||||
|
tasks.add(handleFdRead(event, kj::mv(buffer), cb).attach(std::move(event)));
|
||||||
}
|
}
|
||||||
|
|
||||||
void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
||||||
@ -455,13 +456,12 @@ void Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
|||||||
// returns a promise which will read a chunk of data from the file descriptor
|
// returns a promise which will read a chunk of data from the file descriptor
|
||||||
// wrapped by stream and invoke the provided callback with the read data.
|
// wrapped by stream and invoke the provided callback with the read data.
|
||||||
// Repeats until ::read returns <= 0
|
// Repeats until ::read returns <= 0
|
||||||
kj::Promise<void> Server::handleFdRead(kj::AsyncInputStream* stream, std::function<void(const char*,size_t)> cb) {
|
kj::Promise<void> Server::handleFdRead(kj::AsyncInputStream* stream, std::vector<char>&& buffer, std::function<void(const char*,size_t)> cb) {
|
||||||
std::vector<char> buffer(PROC_IO_BUFSIZE);
|
|
||||||
return stream->tryRead((void*)buffer.data(), 1, PROC_IO_BUFSIZE).then(kj::mvCapture(kj::mv(buffer), [this,stream,cb](std::vector<char>&& buffer, size_t sz) {
|
return stream->tryRead((void*)buffer.data(), 1, PROC_IO_BUFSIZE).then(kj::mvCapture(kj::mv(buffer), [this,stream,cb](std::vector<char>&& buffer, size_t sz) {
|
||||||
if(sz > 0) {
|
if(sz > 0) {
|
||||||
cb(buffer.data(), sz);
|
cb(buffer.data(), sz);
|
||||||
return handleFdRead(stream, cb);
|
return handleFdRead(stream, kj::mv(buffer), cb);
|
||||||
}
|
}
|
||||||
return kj::Promise<void>(kj::READY_NOW);
|
return kj::Promise<void>(kj::READY_NOW).attach(kj::mv(buffer));
|
||||||
})).attach(kj::mv(buffer));
|
}));
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
#include <capnp/message.h>
|
#include <capnp/message.h>
|
||||||
#include <capnp/capability.h>
|
#include <capnp/capability.h>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
struct LaminarInterface;
|
struct LaminarInterface;
|
||||||
|
|
||||||
@ -46,7 +47,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||||
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||||
kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, std::function<void(const char*,size_t)> cb);
|
kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, std::vector<char>&& buffer, std::function<void(const char*,size_t)> cb);
|
||||||
|
|
||||||
void taskFailed(kj::Exception&& exception) override {
|
void taskFailed(kj::Exception&& exception) override {
|
||||||
kj::throwFatalException(kj::mv(exception));
|
kj::throwFatalException(kj::mv(exception));
|
||||||
|
Loading…
Reference in New Issue
Block a user