/// /// Copyright 2015-2019 Oliver Giles /// /// This file is part of Laminar /// /// Laminar is free software: you can redistribute it and/or modify /// it under the terms of the GNU General Public License as published by /// the Free Software Foundation, either version 3 of the License, or /// (at your option) any later version. /// /// Laminar is distributed in the hope that it will be useful, /// but WITHOUT ANY WARRANTY; without even the implied warranty of /// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the /// GNU General Public License for more details. /// /// You should have received a copy of the GNU General Public License /// along with Laminar. If not, see /// #include "http.h" #include "resources.h" #include "monitorscope.h" #include "log.h" #include "laminar.h" // Helper class which wraps another class with calls to // adding and removing a pointer to itself from a passed // std::set reference. Used to keep track of currently // connected clients template struct WithSetRef : public T { WithSetRef(std::set& set, Args&& ...args) : T(std::forward(args)...), _set(set) { _set.insert(this); } ~WithSetRef() { _set.erase(this); } private: std::set& _set; }; struct EventPeer { MonitorScope scope; std::list pendingOutput; kj::Own> fulfiller; }; struct LogWatcher { std::string job; uint run; std::list pendingOutput; kj::Own> fulfiller; }; kj::Maybe fromUrl(std::string resource, char* query) { MonitorScope scope; if(query) { char *sk; for(char* k = strtok_r(query, "&", &sk); k; k = strtok_r(nullptr, "&", &sk)) { if(char* v = strchr(k, '=')) { *v++ = '\0'; if(strcmp(k, "page") == 0) scope.page = atoi(v); else if(strcmp(k, "field") == 0) scope.field = v; else if(strcmp(k, "order") == 0) scope.order_desc = (strcmp(v, "dsc") == 0); } } } if(resource == "/") { scope.type = MonitorScope::HOME; return kj::mv(scope); } if(resource == "/jobs" || resource == "/wallboard") { scope.type = MonitorScope::ALL; return kj::mv(scope); } if(resource.substr(0, 5) != "/jobs") return nullptr; resource = resource.substr(5); size_t split = resource.find('/',1); std::string job = resource.substr(1,split-1); if(job.empty()) return nullptr; scope.job = job; scope.type = MonitorScope::JOB; if(split == std::string::npos) return kj::mv(scope); size_t split2 = resource.find('/', split+1); std::string run = resource.substr(split+1, split2-split); if(run.empty()) return nullptr; scope.num = static_cast(atoi(run.c_str())); scope.type = MonitorScope::RUN; return kj::mv(scope); } // Parses the url of the form /log/NAME/NUMBER, filling in the passed // references and returning true if successful. /log/NAME/latest is // also allowed, in which case the num reference is set to 0 bool Http::parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { if(url.startsWith("/log/")) { kj::StringPtr path = url.slice(5); KJ_IF_MAYBE(sep, path.findFirst('/')) { name = path.slice(0, *sep).begin(); kj::StringPtr tail = path.slice(*sep+1); num = static_cast(atoi(tail.begin())); name.erase(*sep); if(tail == "latest") num = laminar.latestRun(name); if(num > 0) return true; } } return false; } kj::Promise Http::cleanupPeers(kj::Timer& timer) { return timer.afterDelay(15 * kj::SECONDS).then([&]{ for(EventPeer* p : eventPeers) { // Even single threaded, if load causes this timeout to be serviced // before writeEvents has created a fulfiller, or if an exception // caused the destruction of the promise but attach(peer) hasn't yet // removed it from the eventPeers list, we will see a null fulfiller // here if(p->fulfiller) { // an empty SSE message is a colon followed by two newlines p->pendingOutput.push_back(":\n\n"); p->fulfiller->fulfill(); } } return cleanupPeers(timer); }).eagerlyEvaluate(nullptr); } kj::Promise writeEvents(EventPeer* peer, kj::AsyncOutputStream* stream) { auto paf = kj::newPromiseAndFulfiller(); peer->fulfiller = kj::mv(paf.fulfiller); return paf.promise.then([=]{ kj::Promise p = kj::READY_NOW; std::list chunks = kj::mv(peer->pendingOutput); for(std::string& s : chunks) { p = p.then([=,&s]{ return stream->write(s.data(), s.size()); }); } return p.attach(kj::mv(chunks)).then([=]{ return writeEvents(peer, stream); }); }); } kj::Promise writeLogChunk(LogWatcher* client, kj::AsyncOutputStream* stream) { auto paf = kj::newPromiseAndFulfiller(); client->fulfiller = kj::mv(paf.fulfiller); return paf.promise.then([=](bool done){ kj::Promise p = kj::READY_NOW; std::list chunks = kj::mv(client->pendingOutput); for(std::string& s : chunks) { p = p.then([=,&s]{ return stream->write(s.data(), s.size()); }); } return p.attach(kj::mv(chunks)).then([=]{ return done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); }); }); } kj::Promise Http::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders &headers, kj::AsyncInputStream &requestBody, HttpService::Response &response) { const char* start, *end, *content_type; std::string badge; // for log requests std::string name; uint num; kj::HttpHeaders responseHeaders(*headerTable); responseHeaders.clear(); bool is_sse = false; char* queryString = nullptr; // Clients usually expect that http servers will ignore unknown query parameters, // and expect to use this feature to work around browser limitations like there // being no way to programatically force a resource to be reloaded from the server // (without "Cache-Control: no-store", which is overkill). See issue #89. // So first parse any query parameters we *are* interested in, then simply remove // them from the URL, to make comparisions easier. KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { const_cast(url.begin())[*queryIdx] = '\0'; queryString = const_cast(url.begin() + *queryIdx + 1); url = url.begin(); } KJ_IF_MAYBE(accept, headers.get(ACCEPT)) { is_sse = (*accept == "text/event-stream"); } if(is_sse) { KJ_IF_MAYBE(s, fromUrl(url.cStr(), queryString)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/event-stream"); // Disables nginx reverse-proxy's buffering. Necessary for streamed events. responseHeaders.add("X-Accel-Buffering", "no"); auto peer = kj::heap>(eventPeers); peer->scope = *s; std::string st = "data: " + laminar.getStatus(peer->scope) + "\n\n"; auto stream = response.send(200, "OK", responseHeaders); return stream->write(st.data(), st.size()).attach(kj::mv(st)).then([=,s=stream.get(),p=peer.get()]{ return writeEvents(p,s); }).attach(kj::mv(stream)).attach(kj::mv(peer)); } } else if(url.startsWith("/archive/")) { KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { auto array = (*file)->mmap(0, (*file)->stat().size); responseHeaders.add("Content-Transfer-Encoding", "binary"); auto stream = response.send(200, "OK", responseHeaders, array.size()); return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); } } else if(parseLogEndpoint(url, name, num)) { bool complete; std::string output; if(laminar.handleLogRequest(name, num, output, complete)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); responseHeaders.add("Content-Transfer-Encoding", "binary"); // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. responseHeaders.add("X-Accel-Buffering", "no"); auto stream = response.send(200, "OK", responseHeaders, nullptr); auto s = stream.get(); auto lw = kj::heap>(logWatchers); lw->job = name; lw->run = num; auto promise = writeLogChunk(lw.get(), stream.get()).attach(kj::mv(stream)).attach(kj::mv(lw)); return s->write(output.data(), output.size()).attach(kj::mv(output)).then([p=kj::mv(promise),complete]() mutable { if(complete) return kj::Promise(kj::READY_NOW); return kj::mv(p); }); } } else if(resources->handleRequest(url.cStr(), &start, &end, &content_type)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); responseHeaders.add("Content-Encoding", "gzip"); responseHeaders.add("Content-Transfer-Encoding", "binary"); auto stream = response.send(200, "OK", responseHeaders, end-start); return stream->write(start, end-start).attach(kj::mv(stream)); } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); responseHeaders.add("Cache-Control", "no-cache"); auto stream = response.send(200, "OK", responseHeaders, badge.size()); return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); } return response.sendError(404, "Not Found", responseHeaders); } Http::Http(Laminar &li) : laminar(li), resources(kj::heap()) { kj::HttpHeaderTable::Builder builder; ACCEPT = builder.add("Accept"); headerTable = builder.build(); } Http::~Http() { LASSERT(logWatchers.size() == 0); LASSERT(eventPeers.size() == 0); } kj::Promise Http::startServer(kj::Timer& timer, kj::Own&& listener) { kj::Own server = kj::heap(timer, *headerTable, *this); return server->listenHttp(*listener).attach(cleanupPeers(timer)).attach(kj::mv(listener)).attach(kj::mv(server)); } void Http::notifyEvent(const char *data, std::string job) { for(EventPeer* c : eventPeers) { if(c->scope.wantsStatus(job)) { c->pendingOutput.push_back("data: " + std::string(data) + "\n\n"); c->fulfiller->fulfill(); } } } void Http::notifyLog(std::string job, uint run, std::string log_chunk, bool eot) { for(LogWatcher* lw : logWatchers) { if(lw->job == job && lw->run == run) { lw->pendingOutput.push_back(log_chunk); lw->fulfiller->fulfill(kj::mv(eot)); } } } void Http::setHtmlTemplate(std::string tmpl) { resources->setHtmlTemplate(tmpl); }