From b16991b17ae24265b73cca134088804bc66b6919 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Fri, 19 Mar 2021 21:04:38 +1300 Subject: [PATCH] fix race in http log output Creating of LogWatcher makes it automatically receive new log chunks as it is part of logWatchers. But we cannot be sure that stream->write().then() will have already completed, so there is a chance that a null fulfiller will be caused, causing a crash. We cannot defer the creation of the LogWatcher until after stream->write() because in the meantime we may lose new messages, so call writeLogChunk to make sure we have a fulfiller before entering stream->write(). Also, pending chunks of log output were std::move()'d to the first interested client in the loop, they need to be copied if there is more than one client. --- src/http.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/http.cpp b/src/http.cpp index 0321521..cb7c584 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -221,9 +221,6 @@ kj::Promise Http::request(kj::HttpMethod method, kj::StringPtr url, const return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); } } else if(parseLogEndpoint(url, name, num)) { - auto lw = kj::heap>(logWatchers); - lw->job = name; - lw->run = num; bool complete; std::string output; if(laminar.handleLogRequest(name, num, output, complete)) { @@ -232,11 +229,16 @@ kj::Promise Http::request(kj::HttpMethod method, kj::StringPtr url, const // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. responseHeaders.add("X-Accel-Buffering", "no"); auto stream = response.send(200, "OK", responseHeaders, nullptr); - return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=lw.get()]{ + auto s = stream.get(); + auto lw = kj::heap>(logWatchers); + lw->job = name; + lw->run = num; + auto promise = writeLogChunk(lw.get(), stream.get()).attach(kj::mv(stream)).attach(kj::mv(lw)); + return s->write(output.data(), output.size()).attach(kj::mv(output)).then([p=kj::mv(promise),complete]() mutable { if(complete) return kj::Promise(kj::READY_NOW); - return writeLogChunk(c, s); - }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(lw)); + return kj::mv(p); + }); } } else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); @@ -288,7 +290,7 @@ void Http::notifyLog(std::string job, uint run, std::string log_chunk, bool eot) { for(LogWatcher* lw : logWatchers) { if(lw->job == job && lw->run == run) { - lw->pendingOutput.push_back(kj::mv(log_chunk)); + lw->pendingOutput.push_back(log_chunk); lw->fulfiller->fulfill(kj::mv(eot)); } }