diff --git a/src/laminar.cpp b/src/laminar.cpp index 1501234..56df60a 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -371,7 +371,7 @@ void Laminar::run() { sigaddset(&mask, SIGCHLD); sigprocmask(SIG_BLOCK, &mask, NULL); int sigchld = signalfd(-1, &mask, 0); - srv->addDescriptor(sigchld, [this](char* buf, size_t sz){ + srv->addDescriptor(sigchld, [this](const char* buf, size_t sz){ struct signalfd_siginfo* siginfo = (struct signalfd_siginfo*) buf; // TODO: re-enable assertion when the cause for its triggering // is discovered and solved @@ -505,7 +505,7 @@ std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { bool Laminar::stepRun(std::shared_ptr run) { bool complete = run->step(); if(!complete) { - srv->addDescriptor(run->fd, [=](char* b,size_t n){ + srv->addDescriptor(run->fd, [=](const char* b,size_t n){ std::string s(b,n); run->log += s; for(LaminarClient* c : clients) { diff --git a/src/server.cpp b/src/server.cpp index 4f88aa0..03295ac 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -420,7 +420,7 @@ void Server::stop() { eventfd_write(efd_quit, 1); } -void Server::addDescriptor(int fd, std::function cb) { +void Server::addDescriptor(int fd, std::function cb) { auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); tasks.add(handleFdRead(event, cb).attach(std::move(event))); } @@ -455,13 +455,14 @@ void Server::acceptRpcClient(kj::Own&& listener) { // returns a promise which will read a chunk of data from the file descriptor // wrapped by stream and invoke the provided callback with the read data. // Repeats until ::read returns <= 0 -kj::Promise Server::handleFdRead(kj::AsyncInputStream* stream, std::function cb) { - static char* buffer = new char[PROC_IO_BUFSIZE]; - return stream->tryRead(buffer, 1, PROC_IO_BUFSIZE).then([this,stream,cb](size_t sz) { +kj::Promise Server::handleFdRead(kj::AsyncInputStream* stream, std::function cb) { + std::string buffer; + buffer.reserve(PROC_IO_BUFSIZE); + return stream->tryRead((void*)buffer.data(), 1, PROC_IO_BUFSIZE).then(kj::mvCapture(kj::mv(buffer), [this,stream,cb](std::string&& buffer, size_t sz) { if(sz > 0) { - cb(buffer, sz); + cb(buffer.data(), sz); return handleFdRead(stream, cb); } return kj::Promise(kj::READY_NOW); - }); + })).attach(kj::mv(buffer)); } diff --git a/src/server.h b/src/server.h index c15ca30..8485282 100644 --- a/src/server.h +++ b/src/server.h @@ -41,12 +41,12 @@ public: // add a file descriptor to be monitored for output. The callback will be // invoked with the read data - void addDescriptor(int fd, std::function cb); + void addDescriptor(int fd, std::function cb); private: void acceptHttpClient(kj::Own&& listener); void acceptRpcClient(kj::Own&& listener); - kj::Promise handleFdRead(kj::AsyncInputStream* stream, std::function cb); + kj::Promise handleFdRead(kj::AsyncInputStream* stream, std::function cb); void taskFailed(kj::Exception&& exception) override { kj::throwFatalException(kj::mv(exception));