mirror of
https://github.com/ohwgiles/laminar.git
synced 2024-10-27 20:34:20 +00:00
resolves #19: laminard crashes under load
reading into a static buffer is a race condition that is only manifested under load. There's no guarantee the clause in then() will run before another task overwrites the buffer. Allocating a local string is the only correct solution
This commit is contained in:
parent
7dce535264
commit
d91816097a
@ -371,7 +371,7 @@ void Laminar::run() {
|
|||||||
sigaddset(&mask, SIGCHLD);
|
sigaddset(&mask, SIGCHLD);
|
||||||
sigprocmask(SIG_BLOCK, &mask, NULL);
|
sigprocmask(SIG_BLOCK, &mask, NULL);
|
||||||
int sigchld = signalfd(-1, &mask, 0);
|
int sigchld = signalfd(-1, &mask, 0);
|
||||||
srv->addDescriptor(sigchld, [this](char* buf, size_t sz){
|
srv->addDescriptor(sigchld, [this](const char* buf, size_t sz){
|
||||||
struct signalfd_siginfo* siginfo = (struct signalfd_siginfo*) buf;
|
struct signalfd_siginfo* siginfo = (struct signalfd_siginfo*) buf;
|
||||||
// TODO: re-enable assertion when the cause for its triggering
|
// TODO: re-enable assertion when the cause for its triggering
|
||||||
// is discovered and solved
|
// is discovered and solved
|
||||||
@ -505,7 +505,7 @@ std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
|
|||||||
bool Laminar::stepRun(std::shared_ptr<Run> run) {
|
bool Laminar::stepRun(std::shared_ptr<Run> run) {
|
||||||
bool complete = run->step();
|
bool complete = run->step();
|
||||||
if(!complete) {
|
if(!complete) {
|
||||||
srv->addDescriptor(run->fd, [=](char* b,size_t n){
|
srv->addDescriptor(run->fd, [=](const char* b,size_t n){
|
||||||
std::string s(b,n);
|
std::string s(b,n);
|
||||||
run->log += s;
|
run->log += s;
|
||||||
for(LaminarClient* c : clients) {
|
for(LaminarClient* c : clients) {
|
||||||
|
@ -420,7 +420,7 @@ void Server::stop() {
|
|||||||
eventfd_write(efd_quit, 1);
|
eventfd_write(efd_quit, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Server::addDescriptor(int fd, std::function<void(char*,size_t)> cb) {
|
void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
|
||||||
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||||
tasks.add(handleFdRead(event, cb).attach(std::move(event)));
|
tasks.add(handleFdRead(event, cb).attach(std::move(event)));
|
||||||
}
|
}
|
||||||
@ -455,13 +455,14 @@ void Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
|||||||
// returns a promise which will read a chunk of data from the file descriptor
|
// returns a promise which will read a chunk of data from the file descriptor
|
||||||
// wrapped by stream and invoke the provided callback with the read data.
|
// wrapped by stream and invoke the provided callback with the read data.
|
||||||
// Repeats until ::read returns <= 0
|
// Repeats until ::read returns <= 0
|
||||||
kj::Promise<void> Server::handleFdRead(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> cb) {
|
kj::Promise<void> Server::handleFdRead(kj::AsyncInputStream* stream, std::function<void(const char*,size_t)> cb) {
|
||||||
static char* buffer = new char[PROC_IO_BUFSIZE];
|
std::string buffer;
|
||||||
return stream->tryRead(buffer, 1, PROC_IO_BUFSIZE).then([this,stream,cb](size_t sz) {
|
buffer.reserve(PROC_IO_BUFSIZE);
|
||||||
|
return stream->tryRead((void*)buffer.data(), 1, PROC_IO_BUFSIZE).then(kj::mvCapture(kj::mv(buffer), [this,stream,cb](std::string&& buffer, size_t sz) {
|
||||||
if(sz > 0) {
|
if(sz > 0) {
|
||||||
cb(buffer, sz);
|
cb(buffer.data(), sz);
|
||||||
return handleFdRead(stream, cb);
|
return handleFdRead(stream, cb);
|
||||||
}
|
}
|
||||||
return kj::Promise<void>(kj::READY_NOW);
|
return kj::Promise<void>(kj::READY_NOW);
|
||||||
});
|
})).attach(kj::mv(buffer));
|
||||||
}
|
}
|
||||||
|
@ -41,12 +41,12 @@ public:
|
|||||||
|
|
||||||
// add a file descriptor to be monitored for output. The callback will be
|
// add a file descriptor to be monitored for output. The callback will be
|
||||||
// invoked with the read data
|
// invoked with the read data
|
||||||
void addDescriptor(int fd, std::function<void(char*,size_t)> cb);
|
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||||
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||||
kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> cb);
|
kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, std::function<void(const char*,size_t)> cb);
|
||||||
|
|
||||||
void taskFailed(kj::Exception&& exception) override {
|
void taskFailed(kj::Exception&& exception) override {
|
||||||
kj::throwFatalException(kj::mv(exception));
|
kj::throwFatalException(kj::mv(exception));
|
||||||
|
Loading…
Reference in New Issue
Block a user