1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

fix race in http log output

Creating of LogWatcher makes it automatically receive new log
chunks as it is part of logWatchers. But we cannot be sure that
stream->write().then() will have already completed, so there is
a chance that a null fulfiller will be caused, causing a crash.
We cannot defer the creation of the LogWatcher until after
stream->write() because in the meantime we may lose new messages,
so call writeLogChunk to make sure we have a fulfiller before
entering stream->write().

Also, pending chunks of log output were std::move()'d to the first
interested client in the loop, they need to be copied if there
is more than one client.
This commit is contained in:
Oliver Giles 2021-03-19 21:04:38 +13:00
parent c7c586167c
commit b16991b17a

View File

@ -221,9 +221,6 @@ kj::Promise<void> Http::request(kj::HttpMethod method, kj::StringPtr url, const
return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
} }
} else if(parseLogEndpoint(url, name, num)) { } else if(parseLogEndpoint(url, name, num)) {
auto lw = kj::heap<WithSetRef<LogWatcher>>(logWatchers);
lw->job = name;
lw->run = num;
bool complete; bool complete;
std::string output; std::string output;
if(laminar.handleLogRequest(name, num, output, complete)) { if(laminar.handleLogRequest(name, num, output, complete)) {
@ -232,11 +229,16 @@ kj::Promise<void> Http::request(kj::HttpMethod method, kj::StringPtr url, const
// Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output.
responseHeaders.add("X-Accel-Buffering", "no"); responseHeaders.add("X-Accel-Buffering", "no");
auto stream = response.send(200, "OK", responseHeaders, nullptr); auto stream = response.send(200, "OK", responseHeaders, nullptr);
return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=lw.get()]{ auto s = stream.get();
auto lw = kj::heap<WithSetRef<LogWatcher>>(logWatchers);
lw->job = name;
lw->run = num;
auto promise = writeLogChunk(lw.get(), stream.get()).attach(kj::mv(stream)).attach(kj::mv(lw));
return s->write(output.data(), output.size()).attach(kj::mv(output)).then([p=kj::mv(promise),complete]() mutable {
if(complete) if(complete)
return kj::Promise<void>(kj::READY_NOW); return kj::Promise<void>(kj::READY_NOW);
return writeLogChunk(c, s); return kj::mv(p);
}).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(lw)); });
} }
} else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) { } else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
@ -288,7 +290,7 @@ void Http::notifyLog(std::string job, uint run, std::string log_chunk, bool eot)
{ {
for(LogWatcher* lw : logWatchers) { for(LogWatcher* lw : logWatchers) {
if(lw->job == job && lw->run == run) { if(lw->job == job && lw->run == run) {
lw->pendingOutput.push_back(kj::mv(log_chunk)); lw->pendingOutput.push_back(log_chunk);
lw->fulfiller->fulfill(kj::mv(eot)); lw->fulfiller->fulfill(kj::mv(eot));
} }
} }