From cec4721e52726476269605a10868fc45446ee684 Mon Sep 17 00:00:00 2001
From: Oliver Giles
Date: Fri, 15 Feb 2019 19:05:44 +0200
Subject: [PATCH] resolves #79: serve logs over plain chunked http

---
 UserManual.md           |   6 +++
 src/interface.h         |  11 ++++-
 src/laminar.cpp         |  51 +++++++++++++++++++-
 src/laminar.h           |   4 +-
 src/resources/js/app.js |  46 ++++++++++--------
 src/server.cpp          | 100 ++++++++++++++++++++++++++++++++++++----
 test/test-server.cpp    |   3 ++
 7 files changed, 190 insertions(+), 31 deletions(-)

diff --git a/UserManual.md b/UserManual.md
index 855c2ae..79580ff 100644
--- a/UserManual.md
+++ b/UserManual.md
@@ -205,6 +205,12 @@ This can be securely and flexibly combined with remote triggering using `ssh`. T
 
 Consider using [webhook](https://github.com/adnanh/webhook) or a similar application to call `laminarc`.
 
+## Viewing job logs
+
+A job's console output can be viewed on the Web UI at http://localhost:8080/jobs/$NAME/$NUMBER.
+
+Additionally, the raw log output may be fetched over a plain HTTP request to http://localhost:8080/log/$NAME/$NUMBER. The response will be chunked, allowing this mechanism to also be used for in-progress jobs. Furthermore, the special endpoint http://localhost:8080/log/$NAME/latest will redirect to the most recent log output. Be aware that the use of this endpoint may be subject to races when new jobs start.
+
 ---
 
 # Job chains
diff --git a/src/interface.h b/src/interface.h
index 2fdae7a..25387ea 100644
--- a/src/interface.h
+++ b/src/interface.h
@@ -1,5 +1,5 @@
 ///
-/// Copyright 2015 Oliver Giles
+/// Copyright 2015-2019 Oliver Giles
 ///
 /// This file is part of Laminar
 ///
@@ -73,6 +73,8 @@ struct MonitorScope {
 struct LaminarClient {
     virtual ~LaminarClient() noexcept(false) {}
     virtual void sendMessage(std::string payload) = 0;
+    // TODO: redesign
+    virtual void notifyJobFinished() {}
     MonitorScope scope;
 };
 
@@ -118,6 +120,13 @@ struct LaminarInterface {
     // to call LaminarWaiter::complete on invalid data
     virtual void deregisterWaiter(LaminarWaiter* waiter) = 0;
 
+    // Return the latest known number of the named job
+    virtual uint latestRun(std::string job) = 0;
+
+    // Given a job name and number, return existence and (via reference params)
+    // its current log output and whether the job is ongoing
+    virtual bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) = 0;
+
     // Synchronously send a snapshot of the current status to the given
     // client (as governed by the client's MonitorScope). This is called on
     // initial websocket connect.
diff --git a/src/laminar.cpp b/src/laminar.cpp
index 9de858a..a7d3c73 100644
--- a/src/laminar.cpp
+++ b/src/laminar.cpp
@@ -1,5 +1,5 @@
 ///
-/// Copyright 2015-2018 Oliver Giles
+/// Copyright 2015-2019 Oliver Giles
 ///
 /// This file is part of Laminar
 ///
@@ -122,6 +122,53 @@ void Laminar::deregisterWaiter(LaminarWaiter *waiter) {
     waiters.erase(waiter);
 }
 
+uint Laminar::latestRun(std::string job) {
+    auto it = activeJobs.byJobName().equal_range(job);
+    if(it.first == it.second) {
+        uint result = 0;
+        db->stmt("SELECT MAX(buildNum) FROM builds WHERE name = ?")
+         .bind(job)
+         .fetch<uint>([&](uint x){
+            result = x;
+        });
+        return result;
+    } else {
+        return (*--it.second)->build;
+    }
+}
+
+// TODO: reunify with sendStatus. The difference is that this method is capable of
+// returning "not found" to the caller, and sendStatus isn't
+bool Laminar::handleLogRequest(std::string name, uint num, std::string& output, bool& complete) {
+    if(Run* run = activeRun(name, num)) {
+        output = run->log;
+        complete = false;
+        return true;
+    } else { // it must be finished, fetch it from the database
+        const char* stmt = num == 0 ? "SELECT output, outputLen FROM builds WHERE name = ? ORDER BY number DESC LIMIT 1" : "SELECT output, outputLen FROM builds WHERE name = ? AND number = ?";
+        db->stmt(stmt)
+         .bind(name, num)
+         .fetch<str,int>([&](str maybeZipped, unsigned long sz) {
+            str log(sz,'\0');
+            if(sz >= COMPRESS_LOG_MIN_SIZE) {
+                int res = ::uncompress((uint8_t*) log.data(), &sz,
+                     (const uint8_t*) maybeZipped.data(), maybeZipped.size());
+                if(res == Z_OK)
+                    std::swap(output, log);
+                else
+                    LLOG(ERROR, "Failed to uncompress log", res);
+            } else {
+                std::swap(output, maybeZipped);
+            }
+        });
+        if(output.size()) {
+            complete = true;
+            return true;
+        }
+    }
+    return false;
+}
+
 bool Laminar::setParam(std::string job, uint buildNum, std::string param, std::string value) {
     if(Run* run = activeRun(job, buildNum)) {
         run->params[param] = value;
@@ -187,6 +234,7 @@ void Laminar::sendStatus(LaminarClient* client) {
                 client->sendMessage(maybeZipped);
             }
         });
+            client->notifyJobFinished();
         }
         return;
     }
@@ -785,6 +833,7 @@ void Laminar::runFinished(Run * r) {
     for(LaminarClient* c : clients) {
         if(c->scope.wantsStatus(r->name, r->build))
             c->sendMessage(msg);
+        c->notifyJobFinished();
     }
 
     // notify the waiters
diff --git a/src/laminar.h b/src/laminar.h
index d5c2c63..8780d22 100644
--- a/src/laminar.h
+++ b/src/laminar.h
@@ -1,5 +1,5 @@
 ///
-/// Copyright 2015-2018 Oliver Giles
+/// Copyright 2015-2019 Oliver Giles
 ///
 /// This file is part of Laminar
 ///
@@ -53,6 +53,8 @@ public:
     void deregisterClient(LaminarClient* client) override;
     void registerWaiter(LaminarWaiter* waiter) override;
     void deregisterWaiter(LaminarWaiter* waiter) override;
+    uint latestRun(std::string job) override;
+    bool handleLogRequest(std::string name, uint num, std::string& output, bool& complete) override;
     void sendStatus(LaminarClient* client) override;
     bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
 
diff --git a/src/resources/js/app.js b/src/resources/js/app.js
index 3b97645..99e07f7 100644
--- a/src/resources/js/app.js
+++ b/src/resources/js/app.js
@@ -665,15 +665,29 @@ const Run = function() {
     autoscroll: false
  };
  var firstLog = false;
-  var logHandler = function(vm, d) {
-    state.log += ansi_up.ansi_to_html(d.replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/\033\[\{([^:]+):(\d+)\033\\/g, (m,$1,$2)=>{return ''+$1+':#'+$2+'';}));
-    vm.$forceUpdate();
-    if (!firstLog) {
-      firstLog = true;
-    } else if (state.autoscroll) {
-      window.scrollTo(0, document.body.scrollHeight);
-    }
-  };
+  const logFetcher = (vm, name, num) => {
+    const abort = new AbortController();
+    fetch('/log/'+name+'/'+num, {signal:abort.signal}).then(res => {
+      const reader = res.body.pipeThrough(new TextDecoderStream).getReader();
+      let total = 0;
+      return function pump() {
+        return reader.read().then(({done, value}) => {
+          if (done)
+            return;
+          state.log += ansi_up.ansi_to_html(value.replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/\033\[\{([^:]+):(\d+)\033\\/g, (m,$1,$2)=>{return ''+$1+':#'+$2+'';}));
+          vm.$forceUpdate();
+          if (!firstLog) {
+            firstLog = true;
+          } else if (state.autoscroll) {
+            window.scrollTo(0, document.body.scrollHeight);
+          }
+          return pump();
+        });
+      }();
+    }).catch(e => {});
+    return abort;
+  }
+
 
  return {
    template: '#run',
@@ -704,24 +718,18 @@
    beforeRouteEnter(to, from, next) {
      next(vm => {
        state.log = '';
-        vm.logws = wsp(to.path + '/log');
-        vm.logws.onmessage = function(msg) {
-          logHandler(vm, msg.data);
-        }
+        vm.logstream = logFetcher(vm, to.params.name, to.params.number);
      });
    },
    beforeRouteUpdate(to, from, next) {
      var vm = this;
-      vm.logws.close();
+      vm.logstream.abort();
      state.log = '';
-      vm.logws = wsp(to.path + '/log');
-      vm.logws.onmessage = function(msg) {
-        logHandler(vm, msg.data);
-      }
+      vm.logstream = logFetcher(vm, to.params.name, to.params.number);
      next();
    },
    beforeRouteLeave(to, from, next) {
-      this.logws.close();
+      this.logstream.abort();
      next();
    }
  };
diff --git a/src/server.cpp b/src/server.cpp
index 65667a1..7e61747 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -1,5 +1,5 @@
 ///
-/// Copyright 2015-2018 Oliver Giles
+/// Copyright 2015-2019 Oliver Giles
 ///
 /// This file is part of Laminar
 ///
@@ -210,6 +210,30 @@ public:
    virtual ~HttpImpl() {}
 
 private:
+    class HttpChunkedClient : public LaminarClient {
+    public:
+        HttpChunkedClient(LaminarInterface& laminar) :
+            laminar(laminar)
+        {}
+        ~HttpChunkedClient() override {
+            laminar.deregisterClient(this);
+        }
+        void sendMessage(std::string payload) override {
+            chunks.push_back(kj::mv(payload));
+            fulfiller->fulfill();
+        }
+        void notifyJobFinished() override {
+            done = true;
+            fulfiller->fulfill();
+        }
+        LaminarInterface& laminar;
+        std::list<std::string> chunks;
+        // cannot use chunks.empty() because multiple fulfill()s
+        // could be coalesced
+        bool done = false;
+        kj::Own<kj::PromiseFulfiller<void>> fulfiller;
+    };
+
    // Implements LaminarClient and holds the Websocket connection object.
    // Automatically destructed when the promise created in request() resolves
    // or is cancelled
@@ -223,7 +247,7 @@
            laminar.deregisterClient(this);
        }
        virtual void sendMessage(std::string payload) override {
-            messages.push_back(kj::mv(payload));
+            messages.emplace_back(kj::mv(payload));
            // sendMessage might be called several times before the event loop
            // gets a chance to act on the fulfiller. So store the payload here
            // where it can be fetched later and don't pass the payload with the
@@ -322,39 +346,97 @@
        return connection;
    }
 
+    // Parses the url of the form /log/NAME/NUMBER, filling in the passed
+    // references and returning true if successful. /log/NAME/latest is
+    // also allowed, in which case the num reference is set to 0
+    bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) {
+        if(url.startsWith("/log/")) {
+            kj::StringPtr path = url.slice(5);
+            KJ_IF_MAYBE(sep, path.findFirst('/')) {
+                name = path.slice(0, *sep).begin();
+                kj::StringPtr tail = path.slice(*sep+1);
+                num = static_cast<uint>(atoi(tail.begin()));
+                if(num > 0 || tail == "latest") {
+                    name.erase(*sep);
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    kj::Promise<void> writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) {
+        auto paf = kj::newPromiseAndFulfiller<void>();
+        client->fulfiller = kj::mv(paf.fulfiller);
+        return paf.promise.then([=]{
+            kj::Promise<void> p = kj::READY_NOW;
+            std::list<std::string> chunks = kj::mv(client->chunks);
+            for(std::string& s : chunks) {
+                p = p.then([=,&s]{
+                    return stream->write(s.data(), s.size());
+                });
+            }
+            return p.attach(kj::mv(chunks)).then([=]{
+                return client->done ? kj::Promise<void>(kj::READY_NOW) : writeLogChunk(client, stream);
+            });
+        });
+    }
+
    virtual kj::Promise<void> request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
                                      kj::AsyncInputStream& requestBody, Response& response) override
    {
-        std::string resource = url.cStr();
        if(headers.isWebSocket()) {
            responseHeaders.clear();
            kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders));
-            return websocketUpgraded(*lc, resource).attach(kj::mv(lc));
+            return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc));
        } else {
            // handle regular HTTP request
            const char* start, *end, *content_type;
            std::string badge;
+            // for log requests
+            std::string name;
+            uint num;
            responseHeaders.clear();
-            if(resource.compare(0, strlen("/archive/"), "/archive/") == 0) {
-                KJ_IF_MAYBE(file, laminar.getArtefact(resource.substr(strlen("/archive/")))) {
+            if(url.startsWith("/archive/")) {
+                KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) {
                    auto array = (*file)->mmap(0, (*file)->stat().size);
                    responseHeaders.add("Content-Transfer-Encoding", "binary");
                    auto stream = response.send(200, "OK", responseHeaders, array.size());
                    return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream));
                }
-            } else if(resource.compare("/custom/style.css") == 0) {
+            } else if(parseLogEndpoint(url, name, num)) {
+                kj::Own<HttpChunkedClient> cc = kj::heap<HttpChunkedClient>(laminar);
+                cc->scope.job = name;
+                if(num == 0)
+                    num = laminar.latestRun(name);
+                cc->scope.num = num;
+                bool complete;
+                std::string output;
+                cc->scope.type = MonitorScope::LOG;
+                if(laminar.handleLogRequest(name, num, output, complete)) {
+                    responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8");
+                    responseHeaders.add("Content-Transfer-Encoding", "binary");
+                    auto stream = response.send(200, "OK", responseHeaders, nullptr);
+                    laminar.registerClient(cc.get());
+                    return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{
+                        if(complete)
+                            return kj::Promise<void>(kj::READY_NOW);
+                        return writeLogChunk(c, s);
+                    }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc));
+                }
+            } else if(url == "/custom/style.css") {
                responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8");
                responseHeaders.add("Content-Transfer-Encoding", "binary");
                std::string css = laminar.getCustomCss();
                auto stream = response.send(200, "OK", responseHeaders, css.size());
                return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream));
-            } else if(resources.handleRequest(resource, &start, &end, &content_type)) {
+            } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) {
                responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type);
                responseHeaders.add("Content-Encoding", "gzip");
                responseHeaders.add("Content-Transfer-Encoding", "binary");
                auto stream = response.send(200, "OK", responseHeaders, end-start);
                return stream->write(start, end-start).attach(kj::mv(stream));
-            } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(resource.substr(7,resource.length()-11), badge)) {
+            } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(url.slice(7, url.size()-11).begin(), badge)) {
                responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml");
                responseHeaders.add("Cache-Control", "no-cache");
                auto stream = response.send(200, "OK", responseHeaders, badge.size());
diff --git a/test/test-server.cpp b/test/test-server.cpp
index 81b5a78..74601b1 100644
--- a/test/test-server.cpp
+++ b/test/test-server.cpp
@@ -47,6 +47,9 @@ public:
    MOCK_METHOD2(queueJob, std::shared_ptr<Run>(std::string name, ParamMap params));
    MOCK_METHOD1(registerWaiter, void(LaminarWaiter* waiter));
    MOCK_METHOD1(deregisterWaiter, void(LaminarWaiter* waiter));
+    MOCK_METHOD1(latestRun, uint(std::string));
+    MOCK_METHOD4(handleLogRequest, bool(std::string, uint, std::string&, bool&));
+
    MOCK_METHOD1(sendStatus, void(LaminarClient* client));
    MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value));
    MOCK_METHOD0(listQueuedJobs, const std::list<std::shared_ptr<Run>>&());
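
Note, not part of the patch: the sketch below shows one way the new chunked /log
endpoint could be consumed outside the Web UI, using the same fetch/ReadableStream
pattern as the logFetcher added above. It assumes a runtime with global fetch and
TextDecoderStream (a modern browser or Node 18+), laminar's default port 8080 as
documented in UserManual.md, and an illustrative job name "hello"; the helper name
tailLog is likewise hypothetical. From a shell, `curl -sN http://localhost:8080/log/hello/latest`
should give the same streamed output.

    // tail-log.js - stream a job's console output as it is produced
    async function tailLog(name, num) {
        const res = await fetch(`http://localhost:8080/log/${name}/${num}`);
        // decode the chunked text/plain body incrementally
        const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
        for (;;) {
            const { done, value } = await reader.read();
            if (done)
                break;                    // the server ends the stream when the run completes
            process.stdout.write(value);  // raw ANSI log text, chunk by chunk
        }
    }

    tailLog('hello', 'latest');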