mirror of
https://github.com/ohwgiles/laminar.git
synced 2024-10-27 20:34:20 +00:00
resolves #79: serve logs over plain chunked http
This commit is contained in:
parent
5c7421c833
commit
cec4721e52
@ -205,6 +205,12 @@ This can be securely and flexibly combined with remote triggering using `ssh`. T
|
||||
|
||||
Consider using [webhook](https://github.com/adnanh/webhook) or a similar application to call `laminarc`.
|
||||
|
||||
## Viewing job logs
|
||||
|
||||
A job's console output can be viewed on the Web UI at http://localhost:8080/jobs/$NAME/$NUMBER.
|
||||
|
||||
Additionally, the raw log output may be fetched over a plain HTTP request to http://localhost:8080/log/$NAME/$NUMBER. The response will be chunked, allowing this mechanism to also be used for in-progress jobs. Furthermore, the special endpoint http://localhost:8080/log/$NAME/latest will redirect to the most recent log output. Be aware that the use of this endpoint may be subject to races when new jobs start.
|
||||
|
||||
---
|
||||
|
||||
# Job chains
|
||||
|
@ -1,5 +1,5 @@
|
||||
///
|
||||
/// Copyright 2015 Oliver Giles
|
||||
/// Copyright 2015-2019 Oliver Giles
|
||||
///
|
||||
/// This file is part of Laminar
|
||||
///
|
||||
@ -73,6 +73,8 @@ struct MonitorScope {
|
||||
struct LaminarClient {
|
||||
virtual ~LaminarClient() noexcept(false) {}
|
||||
virtual void sendMessage(std::string payload) = 0;
|
||||
// TODO: redesign
|
||||
virtual void notifyJobFinished() {}
|
||||
MonitorScope scope;
|
||||
};
|
||||
|
||||
@ -118,6 +120,13 @@ struct LaminarInterface {
|
||||
// to call LaminarWaiter::complete on invalid data
|
||||
virtual void deregisterWaiter(LaminarWaiter* waiter) = 0;
|
||||
|
||||
// Return the latest known number of the named job
|
||||
virtual uint latestRun(std::string job) = 0;
|
||||
|
||||
// Given a job name and number, return existence and (via reference params)
|
||||
// its current log output and whether the job is ongoing
|
||||
virtual bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) = 0;
|
||||
|
||||
// Synchronously send a snapshot of the current status to the given
|
||||
// client (as governed by the client's MonitorScope). This is called on
|
||||
// initial websocket connect.
|
||||
|
@ -1,5 +1,5 @@
|
||||
///
|
||||
/// Copyright 2015-2018 Oliver Giles
|
||||
/// Copyright 2015-2019 Oliver Giles
|
||||
///
|
||||
/// This file is part of Laminar
|
||||
///
|
||||
@ -122,6 +122,53 @@ void Laminar::deregisterWaiter(LaminarWaiter *waiter) {
|
||||
waiters.erase(waiter);
|
||||
}
|
||||
|
||||
uint Laminar::latestRun(std::string job) {
|
||||
auto it = activeJobs.byJobName().equal_range(job);
|
||||
if(it.first == it.second) {
|
||||
uint result = 0;
|
||||
db->stmt("SELECT MAX(buildNum) FROM builds WHERE name = ?")
|
||||
.bind(job)
|
||||
.fetch<uint>([&](uint x){
|
||||
result = x;
|
||||
});
|
||||
return result;
|
||||
} else {
|
||||
return (*--it.second)->build;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: reunify with sendStatus. The difference is that this method is capable of
|
||||
// returning "not found" to the caller, and sendStatus isn't
|
||||
bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) {
|
||||
if(Run* run = activeRun(name, num)) {
|
||||
output = run->log;
|
||||
complete = false;
|
||||
return true;
|
||||
} else { // it must be finished, fetch it from the database
|
||||
const char* stmt = num == 0 ? "SELECT output, outputLen FROM builds WHERE name = ? ORDER BY number DESC LIMIT 1" : "SELECT output, outputLen FROM builds WHERE name = ? AND number = ?" ;
|
||||
db->stmt(stmt)
|
||||
.bind(name, num)
|
||||
.fetch<str,int>([&](str maybeZipped, unsigned long sz) {
|
||||
str log(sz,'\0');
|
||||
if(sz >= COMPRESS_LOG_MIN_SIZE) {
|
||||
int res = ::uncompress((uint8_t*) log.data(), &sz,
|
||||
(const uint8_t*) maybeZipped.data(), maybeZipped.size());
|
||||
if(res == Z_OK)
|
||||
std::swap(output, log);
|
||||
else
|
||||
LLOG(ERROR, "Failed to uncompress log", res);
|
||||
} else {
|
||||
std::swap(output, maybeZipped);
|
||||
}
|
||||
});
|
||||
if(output.size()) {
|
||||
complete = true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool Laminar::setParam(std::string job, uint buildNum, std::string param, std::string value) {
|
||||
if(Run* run = activeRun(job, buildNum)) {
|
||||
run->params[param] = value;
|
||||
@ -187,6 +234,7 @@ void Laminar::sendStatus(LaminarClient* client) {
|
||||
client->sendMessage(maybeZipped);
|
||||
}
|
||||
});
|
||||
client->notifyJobFinished();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -785,6 +833,7 @@ void Laminar::runFinished(Run * r) {
|
||||
for(LaminarClient* c : clients) {
|
||||
if(c->scope.wantsStatus(r->name, r->build))
|
||||
c->sendMessage(msg);
|
||||
c->notifyJobFinished();
|
||||
}
|
||||
|
||||
// notify the waiters
|
||||
|
@ -1,5 +1,5 @@
|
||||
///
|
||||
/// Copyright 2015-2018 Oliver Giles
|
||||
/// Copyright 2015-2019 Oliver Giles
|
||||
///
|
||||
/// This file is part of Laminar
|
||||
///
|
||||
@ -53,6 +53,8 @@ public:
|
||||
void deregisterClient(LaminarClient* client) override;
|
||||
void registerWaiter(LaminarWaiter* waiter) override;
|
||||
void deregisterWaiter(LaminarWaiter* waiter) override;
|
||||
uint latestRun(std::string job) override;
|
||||
bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) override;
|
||||
|
||||
void sendStatus(LaminarClient* client) override;
|
||||
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
|
||||
|
@ -665,15 +665,29 @@ const Run = function() {
|
||||
autoscroll: false
|
||||
};
|
||||
var firstLog = false;
|
||||
var logHandler = function(vm, d) {
|
||||
state.log += ansi_up.ansi_to_html(d.replace(/</g,'<').replace(/>/g,'>').replace(/\033\[\{([^:]+):(\d+)\033\\/g, (m,$1,$2)=>{return '<a href="/jobs/'+$1+'" onclick="return vroute(this);">'+$1+'</a>:<a href="/jobs/'+$1+'/'+$2+'" onclick="return vroute(this);">#'+$2+'</a>';}));
|
||||
const logFetcher = (vm, name, num) => {
|
||||
const abort = new AbortController();
|
||||
fetch('/log/'+name+'/'+num, {signal:abort.signal}).then(res => {
|
||||
const reader = res.body.pipeThrough(new TextDecoderStream).getReader();
|
||||
let total = 0;
|
||||
return function pump() {
|
||||
return reader.read().then(({done, value}) => {
|
||||
if (done)
|
||||
return;
|
||||
state.log += ansi_up.ansi_to_html(value.replace(/</g,'<').replace(/>/g,'>').replace(/\033\[\{([^:]+):(\d+)\033\\/g, (m,$1,$2)=>{return '<a href="/jobs/'+$1+'" onclick="return vroute(this);">'+$1+'</a>:<a href="/jobs/'+$1+'/'+$2+'" onclick="return vroute(this);">#'+$2+'</a>';}));
|
||||
vm.$forceUpdate();
|
||||
if (!firstLog) {
|
||||
firstLog = true;
|
||||
} else if (state.autoscroll) {
|
||||
window.scrollTo(0, document.body.scrollHeight);
|
||||
}
|
||||
};
|
||||
return pump();
|
||||
});
|
||||
}
|
||||
}).catch(e => {});
|
||||
return abort;
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
template: '#run',
|
||||
@ -704,24 +718,18 @@ const Run = function() {
|
||||
beforeRouteEnter(to, from, next) {
|
||||
next(vm => {
|
||||
state.log = '';
|
||||
vm.logws = wsp(to.path + '/log');
|
||||
vm.logws.onmessage = function(msg) {
|
||||
logHandler(vm, msg.data);
|
||||
}
|
||||
vm.logstream = logFetcher(vm, to.params.name, to.params.number);
|
||||
});
|
||||
},
|
||||
beforeRouteUpdate(to, from, next) {
|
||||
var vm = this;
|
||||
vm.logws.close();
|
||||
vm.logstream.abort();
|
||||
state.log = '';
|
||||
vm.logws = wsp(to.path + '/log');
|
||||
vm.logws.onmessage = function(msg) {
|
||||
logHandler(vm, msg.data);
|
||||
}
|
||||
vm.logstream = logFetcher(vm, to.params.name, to.params.number);
|
||||
next();
|
||||
},
|
||||
beforeRouteLeave(to, from, next) {
|
||||
this.logws.close();
|
||||
this.logstream.close();
|
||||
next();
|
||||
}
|
||||
};
|
||||
|
100
src/server.cpp
100
src/server.cpp
@ -1,5 +1,5 @@
|
||||
///
|
||||
/// Copyright 2015-2018 Oliver Giles
|
||||
/// Copyright 2015-2019 Oliver Giles
|
||||
///
|
||||
/// This file is part of Laminar
|
||||
///
|
||||
@ -210,6 +210,30 @@ public:
|
||||
virtual ~HttpImpl() {}
|
||||
|
||||
private:
|
||||
class HttpChunkedClient : public LaminarClient {
|
||||
public:
|
||||
HttpChunkedClient(LaminarInterface& laminar) :
|
||||
laminar(laminar)
|
||||
{}
|
||||
~HttpChunkedClient() override {
|
||||
laminar.deregisterClient(this);
|
||||
}
|
||||
void sendMessage(std::string payload) override {
|
||||
chunks.push_back(kj::mv(payload));
|
||||
fulfiller->fulfill();
|
||||
}
|
||||
void notifyJobFinished() override {
|
||||
done = true;
|
||||
fulfiller->fulfill();
|
||||
}
|
||||
LaminarInterface& laminar;
|
||||
std::list<std::string> chunks;
|
||||
// cannot use chunks.empty() because multiple fulfill()s
|
||||
// could be coalesced
|
||||
bool done = false;
|
||||
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
|
||||
};
|
||||
|
||||
// Implements LaminarClient and holds the Websocket connection object.
|
||||
// Automatically destructed when the promise created in request() resolves
|
||||
// or is cancelled
|
||||
@ -223,7 +247,7 @@ private:
|
||||
laminar.deregisterClient(this);
|
||||
}
|
||||
virtual void sendMessage(std::string payload) override {
|
||||
messages.push_back(kj::mv(payload));
|
||||
messages.emplace_back(kj::mv(payload));
|
||||
// sendMessage might be called several times before the event loop
|
||||
// gets a chance to act on the fulfiller. So store the payload here
|
||||
// where it can be fetched later and don't pass the payload with the
|
||||
@ -322,39 +346,97 @@ private:
|
||||
return connection;
|
||||
}
|
||||
|
||||
// Parses the url of the form /log/NAME/NUMBER, filling in the passed
|
||||
// references and returning true if successful. /log/NAME/latest is
|
||||
// also allowed, in which case the num reference is set to 0
|
||||
bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
|
||||
if(url.startsWith("/log/")) {
|
||||
kj::StringPtr path = url.slice(5);
|
||||
KJ_IF_MAYBE(sep, path.findFirst('/')) {
|
||||
name = path.slice(0, *sep).begin();
|
||||
kj::StringPtr tail = path.slice(*sep+1);
|
||||
num = static_cast<uint>(atoi(tail.begin()));
|
||||
if(num > 0 || tail == "latest") {
|
||||
name.erase(*sep);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
kj::Promise<void> writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) {
|
||||
auto paf = kj::newPromiseAndFulfiller<void>();
|
||||
client->fulfiller = kj::mv(paf.fulfiller);
|
||||
return paf.promise.then([=]{
|
||||
kj::Promise<void> p = kj::READY_NOW;
|
||||
std::list<std::string> chunks = kj::mv(client->chunks);
|
||||
for(std::string& s : chunks) {
|
||||
p = p.then([=,&s]{
|
||||
return stream->write(s.data(), s.size());
|
||||
});
|
||||
}
|
||||
return p.attach(kj::mv(chunks)).then([=]{
|
||||
return client->done ? kj::Promise<void>(kj::READY_NOW) : writeLogChunk(client, stream);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
virtual kj::Promise<void> request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
|
||||
kj::AsyncInputStream& requestBody, Response& response) override
|
||||
{
|
||||
std::string resource = url.cStr();
|
||||
if(headers.isWebSocket()) {
|
||||
responseHeaders.clear();
|
||||
kj::Own<WebsocketClient> lc = kj::heap<WebsocketClient>(laminar, response.acceptWebSocket(responseHeaders));
|
||||
return websocketUpgraded(*lc, resource).attach(kj::mv(lc));
|
||||
return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc));
|
||||
} else {
|
||||
// handle regular HTTP request
|
||||
const char* start, *end, *content_type;
|
||||
std::string badge;
|
||||
// for log requests
|
||||
std::string name;
|
||||
uint num;
|
||||
responseHeaders.clear();
|
||||
if(resource.compare(0, strlen("/archive/"), "/archive/") == 0) {
|
||||
KJ_IF_MAYBE(file, laminar.getArtefact(resource.substr(strlen("/archive/")))) {
|
||||
if(url.startsWith("/archive/")) {
|
||||
KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
|
||||
auto array = (*file)->mmap(0, (*file)->stat().size);
|
||||
responseHeaders.add("Content-Transfer-Encoding", "binary");
|
||||
auto stream = response.send(200, "OK", responseHeaders, array.size());
|
||||
return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
|
||||
}
|
||||
} else if(resource.compare("/custom/style.css") == 0) {
|
||||
} else if(parseLogEndpoint(url, name, num)) {
|
||||
kj::Own<HttpChunkedClient> cc = kj::heap<HttpChunkedClient>(laminar);
|
||||
cc->scope.job = name;
|
||||
if(num == 0)
|
||||
num = laminar.latestRun(name);
|
||||
cc->scope.num = num;
|
||||
bool complete;
|
||||
std::string output;
|
||||
cc->scope.type = MonitorScope::LOG;
|
||||
if(laminar.handleLogRequest(name, num, output, complete)) {
|
||||
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
|
||||
responseHeaders.add("Content-Transfer-Encoding", "binary");
|
||||
auto stream = response.send(200, "OK", responseHeaders, nullptr);
|
||||
laminar.registerClient(cc.get());
|
||||
return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{
|
||||
if(complete)
|
||||
return kj::Promise<void>(kj::READY_NOW);
|
||||
return writeLogChunk(c, s);
|
||||
}).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc));
|
||||
}
|
||||
} else if(url == "/custom/style.css") {
|
||||
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
|
||||
responseHeaders.add("Content-Transfer-Encoding", "binary");
|
||||
std::string css = laminar.getCustomCss();
|
||||
auto stream = response.send(200, "OK", responseHeaders, css.size());
|
||||
return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
|
||||
} else if(resources.handleRequest(resource, &start, &end, &content_type)) {
|
||||
} else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) {
|
||||
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
|
||||
responseHeaders.add("Content-Encoding", "gzip");
|
||||
responseHeaders.add("Content-Transfer-Encoding", "binary");
|
||||
auto stream = response.send(200, "OK", responseHeaders, end-start);
|
||||
return stream->write(start, end-start).attach(kj::mv(stream));
|
||||
} else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(resource.substr(7,resource.length()-11), badge)) {
|
||||
} else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(url.slice(7, url.size()-11).begin(), badge)) {
|
||||
responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
|
||||
responseHeaders.add("Cache-Control", "no-cache");
|
||||
auto stream = response.send(200, "OK", responseHeaders, badge.size());
|
||||
|
@ -47,6 +47,9 @@ public:
|
||||
MOCK_METHOD2(queueJob, std::shared_ptr<Run>(std::string name, ParamMap params));
|
||||
MOCK_METHOD1(registerWaiter, void(LaminarWaiter* waiter));
|
||||
MOCK_METHOD1(deregisterWaiter, void(LaminarWaiter* waiter));
|
||||
MOCK_METHOD1(latestRun, uint(std::string));
|
||||
MOCK_METHOD4(handleLogRequest, bool(std::string, uint, std::string&, bool&));
|
||||
|
||||
MOCK_METHOD1(sendStatus, void(LaminarClient* client));
|
||||
MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value));
|
||||
MOCK_METHOD0(listQueuedJobs, const std::list<std::shared_ptr<Run>>&());
|
||||
|
Loading…
Reference in New Issue
Block a user