resolves #79: serve logs over plain chunked http

pull/88/head
Oliver Giles 5 years ago
parent 5c7421c833
commit cec4721e52

@ -205,6 +205,12 @@ This can be securely and flexibly combined with remote triggering using `ssh`. T
Consider using [webhook](https://github.com/adnanh/webhook) or a similar application to call `laminarc`.
## Viewing job logs
A job's console output can be viewed on the Web UI at http://localhost:8080/jobs/$NAME/$NUMBER.
Additionally, the raw log output may be fetched over a plain HTTP request to http://localhost:8080/log/$NAME/$NUMBER. The response will be chunked, allowing this mechanism to also be used for in-progress jobs. Furthermore, the special endpoint http://localhost:8080/log/$NAME/latest will redirect to the most recent log output. Be aware that the use of this endpoint may be subject to races when new jobs start.
---
# Job chains

@ -1,5 +1,5 @@
///
/// Copyright 2015 Oliver Giles
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
@ -73,6 +73,8 @@ struct MonitorScope {
struct LaminarClient {
virtual ~LaminarClient() noexcept(false) {}
virtual void sendMessage(std::string payload) = 0;
// TODO: redesign
virtual void notifyJobFinished() {}
MonitorScope scope;
};
@ -118,6 +120,13 @@ struct LaminarInterface {
// to call LaminarWaiter::complete on invalid data
virtual void deregisterWaiter(LaminarWaiter* waiter) = 0;
// Return the latest known number of the named job
virtual uint latestRun(std::string job) = 0;
// Given a job name and number, return existence and (via reference params)
// its current log output and whether the job is ongoing
virtual bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) = 0;
// Synchronously send a snapshot of the current status to the given
// client (as governed by the client's MonitorScope). This is called on
// initial websocket connect.

@ -1,5 +1,5 @@
///
/// Copyright 2015-2018 Oliver Giles
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
@ -122,6 +122,53 @@ void Laminar::deregisterWaiter(LaminarWaiter *waiter) {
waiters.erase(waiter);
}
uint Laminar::latestRun(std::string job) {
auto it = activeJobs.byJobName().equal_range(job);
if(it.first == it.second) {
uint result = 0;
db->stmt("SELECT MAX(buildNum) FROM builds WHERE name = ?")
.bind(job)
.fetch<uint>([&](uint x){
result = x;
});
return result;
} else {
return (*--it.second)->build;
}
}
// TODO: reunify with sendStatus. The difference is that this method is capable of
// returning "not found" to the caller, and sendStatus isn't
bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) {
if(Run* run = activeRun(name, num)) {
output = run->log;
complete = false;
return true;
} else { // it must be finished, fetch it from the database
const char* stmt = num == 0 ? "SELECT output, outputLen FROM builds WHERE name = ? ORDER BY number DESC LIMIT 1" : "SELECT output, outputLen FROM builds WHERE name = ? AND number = ?" ;
db->stmt(stmt)
.bind(name, num)
.fetch<str,int>([&](str maybeZipped, unsigned long sz) {
str log(sz,'\0');
if(sz >= COMPRESS_LOG_MIN_SIZE) {
int res = ::uncompress((uint8_t*) log.data(), &sz,
(const uint8_t*) maybeZipped.data(), maybeZipped.size());
if(res == Z_OK)
std::swap(output, log);
else
LLOG(ERROR, "Failed to uncompress log", res);
} else {
std::swap(output, maybeZipped);
}
});
if(output.size()) {
complete = true;
return true;
}
}
return false;
}
bool Laminar::setParam(std::string job, uint buildNum, std::string param, std::string value) {
if(Run* run = activeRun(job, buildNum)) {
run->params[param] = value;
@ -187,6 +234,7 @@ void Laminar::sendStatus(LaminarClient* client) {
client->sendMessage(maybeZipped);
}
});
client->notifyJobFinished();
}
return;
}
@ -785,6 +833,7 @@ void Laminar::runFinished(Run * r) {
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(r->name, r->build))
c->sendMessage(msg);
c->notifyJobFinished();
}
// notify the waiters

@ -1,5 +1,5 @@
///
/// Copyright 2015-2018 Oliver Giles
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
@ -53,6 +53,8 @@ public:
void deregisterClient(LaminarClient* client) override;
void registerWaiter(LaminarWaiter* waiter) override;
void deregisterWaiter(LaminarWaiter* waiter) override;
uint latestRun(std::string job) override;
bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) override;
void sendStatus(LaminarClient* client) override;
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;

@ -665,15 +665,29 @@ const Run = function() {
autoscroll: false
};
var firstLog = false;
var logHandler = function(vm, d) {
state.log += ansi_up.ansi_to_html(d.replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/\033\[\{([^:]+):(\d+)\033\\/g, (m,$1,$2)=>{return '<a href="/jobs/'+$1+'" onclick="return vroute(this);">'+$1+'</a>:<a href="/jobs/'+$1+'/'+$2+'" onclick="return vroute(this);">#'+$2+'</a>';}));
vm.$forceUpdate();
if (!firstLog) {
firstLog = true;
} else if (state.autoscroll) {
window.scrollTo(0, document.body.scrollHeight);
}
};
const logFetcher = (vm, name, num) => {
const abort = new AbortController();
fetch('/log/'+name+'/'+num, {signal:abort.signal}).then(res => {
const reader = res.body.pipeThrough(new TextDecoderStream).getReader();
let total = 0;
return function pump() {
return reader.read().then(({done, value}) => {
if (done)
return;
state.log += ansi_up.ansi_to_html(value.replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/\033\[\{([^:]+):(\d+)\033\\/g, (m,$1,$2)=>{return '<a href="/jobs/'+$1+'" onclick="return vroute(this);">'+$1+'</a>:<a href="/jobs/'+$1+'/'+$2+'" onclick="return vroute(this);">#'+$2+'</a>';}));
vm.$forceUpdate();
if (!firstLog) {
firstLog = true;
} else if (state.autoscroll) {
window.scrollTo(0, document.body.scrollHeight);
}
return pump();
});
}
}).catch(e => {});
return abort;
}
return {
template: '#run',
@ -704,24 +718,18 @@ const Run = function() {
beforeRouteEnter(to, from, next) {
next(vm => {
state.log = '';
vm.logws = wsp(to.path + '/log');
vm.logws.onmessage = function(msg) {
logHandler(vm, msg.data);
}
vm.logstream = logFetcher(vm, to.params.name, to.params.number);
});
},
beforeRouteUpdate(to, from, next) {
var vm = this;
vm.logws.close();
vm.logstream.abort();
state.log = '';
vm.logws = wsp(to.path + '/log');
vm.logws.onmessage = function(msg) {
logHandler(vm, msg.data);
}
vm.logstream = logFetcher(vm, to.params.name, to.params.number);
next();
},
beforeRouteLeave(to, from, next) {
this.logws.close();
this.logstream.close();
next();
}
};

@ -1,5 +1,5 @@
///
/// Copyright 2015-2018 Oliver Giles
/// Copyright 2015-2019 Oliver Giles
///
/// This file is part of Laminar
///
@ -210,6 +210,30 @@ public:
virtual ~HttpImpl() {}
private:
class HttpChunkedClient : public LaminarClient {
public:
HttpChunkedClient(LaminarInterface& laminar) :
laminar(laminar)
{}
~HttpChunkedClient() override {
laminar.deregisterClient(this);
}
void sendMessage(std::string payload) override {
chunks.push_back(kj::mv(payload));
fulfiller->fulfill();
}
void notifyJobFinished() override {
done = true;
fulfiller->fulfill();
}
LaminarInterface& laminar;
std::list<std::string> chunks;
// cannot use chunks.empty() because multiple fulfill()s
// could be coalesced
bool done = false;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
// Implements LaminarClient and holds the Websocket connection object.
// Automatically destructed when the promise created in request() resolves
// or is cancelled
@ -223,7 +247,7 @@ private:
laminar.deregisterClient(this);
}
virtual void sendMessage(std::string payload) override {
messages.push_back(kj::mv(payload));
messages.emplace_back(kj::mv(payload));
// sendMessage might be called several times before the event loop
// gets a chance to act on the fulfiller. So store the payload here
// where it can be fetched later and don't pass the payload with the
@ -322,39 +346,97 @@ private:
return connection;
}
// Parses the url of the form /log/NAME/NUMBER, filling in the passed
// references and returning true if successful. /log/NAME/latest is
// also allowed, in which case the num reference is set to 0
bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
if(url.startsWith("/log/")) {
kj::StringPtr path = url.slice(5);
KJ_IF_MAYBE(sep, path.findFirst('/')) {
name = path.slice(0, *sep).begin();
kj::StringPtr tail = path.slice(*sep+1);
num = static_cast<uint>(atoi(tail.begin()));
if(num > 0 || tail == "latest") {
name.erase(*sep);
return true;
}
}
}
return false;
}
kj::Promise<void> writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) {
auto paf = kj::newPromiseAndFulfiller<void>();
client->fulfiller = kj::mv(paf.fulfiller);
return paf.promise.then([=]{
kj::Promise<void> p = kj::READY_NOW;
std::list<std::string> chunks = kj::mv(client->chunks);
for(std::string& s : chunks) {
p = p.then([=,&s]{
return stream->write(s.data(), s.size());
});
}
return p.attach(kj::mv(chunks)).then([=]{
return client->done ? kj::Promise<void>(kj::READY_NOW) : writeLogChunk(client, stream);
});
});
}
virtual kj::Promise<void> request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override
{
std::string resource = url.cStr();
if(headers.isWebSocket()) {
responseHeaders.clear();
kj::Own<WebsocketClient> lc = kj::heap<WebsocketClient>(laminar, response.acceptWebSocket(responseHeaders));
return websocketUpgraded(*lc, resource).attach(kj::mv(lc));
return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc));
} else {
// handle regular HTTP request
const char* start, *end, *content_type;
std::string badge;
// for log requests
std::string name;
uint num;
responseHeaders.clear();
if(resource.compare(0, strlen("/archive/"), "/archive/") == 0) {
KJ_IF_MAYBE(file, laminar.getArtefact(resource.substr(strlen("/archive/")))) {
if(url.startsWith("/archive/")) {
KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
auto array = (*file)->mmap(0, (*file)->stat().size);
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, array.size());
return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
}
} else if(resource.compare("/custom/style.css") == 0) {
} else if(parseLogEndpoint(url, name, num)) {
kj::Own<HttpChunkedClient> cc = kj::heap<HttpChunkedClient>(laminar);
cc->scope.job = name;
if(num == 0)
num = laminar.latestRun(name);
cc->scope.num = num;
bool complete;
std::string output;
cc->scope.type = MonitorScope::LOG;
if(laminar.handleLogRequest(name, num, output, complete)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, nullptr);
laminar.registerClient(cc.get());
return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{
if(complete)
return kj::Promise<void>(kj::READY_NOW);
return writeLogChunk(c, s);
}).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc));
}
} else if(url == "/custom/style.css") {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
responseHeaders.add("Content-Transfer-Encoding", "binary");
std::string css = laminar.getCustomCss();
auto stream = response.send(200, "OK", responseHeaders, css.size());
return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
} else if(resources.handleRequest(resource, &start, &end, &content_type)) {
} else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
responseHeaders.add("Content-Encoding", "gzip");
responseHeaders.add("Content-Transfer-Encoding", "binary");
auto stream = response.send(200, "OK", responseHeaders, end-start);
return stream->write(start, end-start).attach(kj::mv(stream));
} else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(resource.substr(7,resource.length()-11), badge)) {
} else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(url.slice(7, url.size()-11).begin(), badge)) {
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
responseHeaders.add("Cache-Control", "no-cache");
auto stream = response.send(200, "OK", responseHeaders, badge.size());

@ -47,6 +47,9 @@ public:
MOCK_METHOD2(queueJob, std::shared_ptr<Run>(std::string name, ParamMap params));
MOCK_METHOD1(registerWaiter, void(LaminarWaiter* waiter));
MOCK_METHOD1(deregisterWaiter, void(LaminarWaiter* waiter));
MOCK_METHOD1(latestRun, uint(std::string));
MOCK_METHOD4(handleLogRequest, bool(std::string, uint, std::string&, bool&));
MOCK_METHOD1(sendStatus, void(LaminarClient* client));
MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value));
MOCK_METHOD0(listQueuedJobs, const std::list<std::shared_ptr<Run>>&());

Loading…
Cancel
Save