/// /// Copyright 2015-2019 Oliver Giles /// /// This file is part of Laminar /// /// Laminar is free software: you can redistribute it and/or modify /// it under the terms of the GNU General Public License as published by /// the Free Software Foundation, either version 3 of the License, or /// (at your option) any later version. /// /// Laminar is distributed in the hope that it will be useful, /// but WITHOUT ANY WARRANTY; without even the implied warranty of /// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the /// GNU General Public License for more details. /// /// You should have received a copy of the GNU General Public License /// along with Laminar. If not, see /// #include "server.h" #include "interface.h" #include "laminar.capnp.h" #include "resources.h" #include "log.h" #include #include #include #include #include #include #include #include #include #include #include // Size of buffer used to read from file descriptors. Should be // a multiple of sizeof(struct signalfd_siginfo) == 128 #define PROC_IO_BUFSIZE 4096 namespace { // Used for returning run state to RPC clients LaminarCi::JobResult fromRunState(RunState state) { switch(state) { case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS; case RunState::FAILED: return LaminarCi::JobResult::FAILED; case RunState::ABORTED: return LaminarCi::JobResult::ABORTED; default: return LaminarCi::JobResult::UNKNOWN; } } } // This is the implementation of the Laminar Cap'n Proto RPC interface. // As such, it implements the pure virtual interface generated from // laminar.capnp with calls to the LaminarInterface class RpcImpl : public LaminarCi::Server, public LaminarWaiter { public: RpcImpl(LaminarInterface& l) : LaminarCi::Server(), laminar(l) { laminar.registerWaiter(this); } ~RpcImpl() override { laminar.deregisterWaiter(this); } // Queue a job, without waiting for it to start kj::Promise queue(QueueContext context) override { std::string jobName = context.getParams().getJobName(); LLOG(INFO, "RPC queue", jobName); LaminarCi::MethodResult result = laminar.queueJob(jobName, params(context.getParams().getParams())) ? LaminarCi::MethodResult::SUCCESS : LaminarCi::MethodResult::FAILED; context.getResults().setResult(result); return kj::READY_NOW; } // Start a job, without waiting for it to finish kj::Promise start(StartContext context) override { std::string jobName = context.getParams().getJobName(); LLOG(INFO, "RPC start", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); if(Run* r = run.get()) { return r->whenStarted().then([context,r]() mutable { context.getResults().setResult(LaminarCi::MethodResult::SUCCESS); context.getResults().setBuildNum(r->build); }); } else { context.getResults().setResult(LaminarCi::MethodResult::FAILED); return kj::READY_NOW; } } // Start a job and wait for the result kj::Promise run(RunContext context) override { std::string jobName = context.getParams().getJobName(); LLOG(INFO, "RPC run", jobName); std::shared_ptr run = laminar.queueJob(jobName, params(context.getParams().getParams())); if(const Run* r = run.get()) { runWaiters[r].emplace_back(kj::newPromiseAndFulfiller()); return runWaiters[r].back().promise.then([context,run](RunState state) mutable { context.getResults().setResult(fromRunState(state)); context.getResults().setBuildNum(run->build); }); } else { context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); return kj::READY_NOW; } } // Set a parameter on a running build kj::Promise set(SetContext context) override { std::string jobName = context.getParams().getRun().getJob(); uint buildNum = context.getParams().getRun().getBuildNum(); LLOG(INFO, "RPC set", jobName, buildNum); LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum, context.getParams().getParam().getName(), context.getParams().getParam().getValue()) ? LaminarCi::MethodResult::SUCCESS : LaminarCi::MethodResult::FAILED; context.getResults().setResult(result); return kj::READY_NOW; } // List jobs in queue kj::Promise listQueued(ListQueuedContext context) override { const std::list>& queue = laminar.listQueuedJobs(); auto res = context.getResults().initResult(queue.size()); int i = 0; for(auto it : queue) { res.set(i++, it->name); } return kj::READY_NOW; } // List running jobs kj::Promise listRunning(ListRunningContext context) override { const RunSet& active = laminar.listRunningJobs(); auto res = context.getResults().initResult(active.size()); int i = 0; for(auto it : active) { res[i].setJob(it->name); res[i].setBuildNum(it->build); i++; } return kj::READY_NOW; } // List known jobs kj::Promise listKnown(ListKnownContext context) override { std::list known = laminar.listKnownJobs(); auto res = context.getResults().initResult(known.size()); int i = 0; for(auto it : known) { res.set(i++, it); } return kj::READY_NOW; } kj::Promise abort(AbortContext context) override { std::string jobName = context.getParams().getRun().getJob(); uint buildNum = context.getParams().getRun().getBuildNum(); LLOG(INFO, "RPC abort", jobName, buildNum); LaminarCi::MethodResult result = laminar.abort(jobName, buildNum) ? LaminarCi::MethodResult::SUCCESS : LaminarCi::MethodResult::FAILED; context.getResults().setResult(result); return kj::READY_NOW; } private: // Helper to convert an RPC parameter list to a hash map ParamMap params(const capnp::List::Reader& paramReader) { ParamMap res; for(auto p : paramReader) { res[p.getName().cStr()] = p.getValue().cStr(); } return res; } // Implements LaminarWaiter::complete void complete(const Run* r) override { for(kj::PromiseFulfillerPair& w : runWaiters[r]) w.fulfiller->fulfill(RunState(r->result)); runWaiters.erase(r); } private: LaminarInterface& laminar; std::unordered_map>> runWaiters; }; // This is the implementation of the HTTP/Websocket interface. It creates // websocket connections as LaminarClients and registers them with the // LaminarInterface so that status messages will be delivered to the client. // On opening a websocket connection, it delivers a status snapshot message // (see LaminarInterface::sendStatus) class HttpImpl : public kj::HttpService { public: HttpImpl(LaminarInterface& laminar, kj::HttpHeaderTable&tbl) : laminar(laminar), responseHeaders(tbl) {} virtual ~HttpImpl() {} private: class HttpChunkedClient : public LaminarClient { public: HttpChunkedClient(LaminarInterface& laminar) : laminar(laminar) {} ~HttpChunkedClient() override { laminar.deregisterClient(this); } void sendMessage(std::string payload) override { chunks.push_back(kj::mv(payload)); fulfiller->fulfill(); } void notifyJobFinished() override { done = true; fulfiller->fulfill(); } LaminarInterface& laminar; std::list chunks; // cannot use chunks.empty() because multiple fulfill()s // could be coalesced bool done = false; kj::Own> fulfiller; }; // Implements LaminarClient and holds the Websocket connection object. // Automatically destructed when the promise created in request() resolves // or is cancelled class WebsocketClient : public LaminarClient { public: WebsocketClient(LaminarInterface& laminar, kj::Own&& ws) : laminar(laminar), ws(kj::mv(ws)) {} ~WebsocketClient() override { laminar.deregisterClient(this); } virtual void sendMessage(std::string payload) override { messages.emplace_back(kj::mv(payload)); // sendMessage might be called several times before the event loop // gets a chance to act on the fulfiller. So store the payload here // where it can be fetched later and don't pass the payload with the // fulfiller because subsequent calls to fulfill() are ignored. fulfiller->fulfill(); } LaminarInterface& laminar; kj::Own ws; std::list messages; kj::Own> fulfiller; }; kj::Promise websocketRead(WebsocketClient& lc) { return lc.ws->receive().then([&lc,this](kj::WebSocket::Message&& message) { KJ_SWITCH_ONEOF(message) { KJ_CASE_ONEOF(str, kj::String) { rapidjson::Document d; d.ParseInsitu(const_cast(str.cStr())); if(d.HasMember("page") && d["page"].IsInt() && d.HasMember("field") && d["field"].IsString() && d.HasMember("order") && d["order"].IsString()) { lc.scope.page = d["page"].GetInt(); lc.scope.field = d["field"].GetString(); lc.scope.order_desc = strcmp(d["order"].GetString(), "dsc") == 0; laminar.sendStatus(&lc); return websocketRead(lc); } } KJ_CASE_ONEOF(close, kj::WebSocket::Close) { // clean socket shutdown return lc.ws->close(close.code, close.reason); } KJ_CASE_ONEOF_DEFAULT {} } // unhandled/unknown message return lc.ws->disconnect(); }, [](kj::Exception&& e){ // server logs suggest early catching here avoids fatal exception later // TODO: reproduce in unit test LLOG(WARNING, e.getDescription()); return kj::READY_NOW; }); } kj::Promise websocketWrite(WebsocketClient& lc) { auto paf = kj::newPromiseAndFulfiller(); lc.fulfiller = kj::mv(paf.fulfiller); return paf.promise.then([this,&lc]{ kj::Promise p = kj::READY_NOW; std::list messages = kj::mv(lc.messages); for(std::string& s : messages) { p = p.then([&s,&lc]{ kj::String str = kj::str(s); return lc.ws->send(str).attach(kj::mv(str)); }); } return p.attach(kj::mv(messages)).then([this,&lc]{ return websocketWrite(lc); }); }); } kj::Promise websocketUpgraded(WebsocketClient& lc, std::string resource) { // convert the requested URL to a MonitorScope if(resource.substr(0, 5) == "/jobs") { if(resource.length() == 5) { lc.scope.type = MonitorScope::ALL; } else { resource = resource.substr(5); size_t split = resource.find('/',1); std::string job = resource.substr(1,split-1); if(!job.empty()) { lc.scope.job = job; lc.scope.type = MonitorScope::JOB; } if(split != std::string::npos) { size_t split2 = resource.find('/', split+1); std::string run = resource.substr(split+1, split2-split); if(!run.empty()) { lc.scope.num = static_cast(atoi(run.c_str())); lc.scope.type = MonitorScope::RUN; } if(split2 != std::string::npos && resource.compare(split2, 4, "/log") == 0) { lc.scope.type = MonitorScope::LOG; } } } } laminar.registerClient(&lc); kj::Promise connection = websocketRead(lc).exclusiveJoin(websocketWrite(lc)); // registerClient can happen after a successful websocket handshake. // However, the connection might not be closed gracefully, so the // corresponding deregister operation happens in the WebsocketClient // destructor rather than the close handler or a then() clause laminar.sendStatus(&lc); return connection; } // Parses the url of the form /log/NAME/NUMBER, filling in the passed // references and returning true if successful. /log/NAME/latest is // also allowed, in which case the num reference is set to 0 bool parseLogEndpoint(kj::StringPtr url, std::string& name, uint& num) { if(url.startsWith("/log/")) { kj::StringPtr path = url.slice(5); KJ_IF_MAYBE(sep, path.findFirst('/')) { name = path.slice(0, *sep).begin(); kj::StringPtr tail = path.slice(*sep+1); num = static_cast(atoi(tail.begin())); name.erase(*sep); if(tail == "latest") num = laminar.latestRun(name); if(num > 0) return true; } } return false; } kj::Promise writeLogChunk(HttpChunkedClient* client, kj::AsyncOutputStream* stream) { auto paf = kj::newPromiseAndFulfiller(); client->fulfiller = kj::mv(paf.fulfiller); return paf.promise.then([=]{ kj::Promise p = kj::READY_NOW; std::list chunks = kj::mv(client->chunks); for(std::string& s : chunks) { p = p.then([=,&s]{ return stream->write(s.data(), s.size()); }); } return p.attach(kj::mv(chunks)).then([=]{ return client->done ? kj::Promise(kj::READY_NOW) : writeLogChunk(client, stream); }); }); } virtual kj::Promise request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& response) override { if(headers.isWebSocket()) { responseHeaders.clear(); kj::Own lc = kj::heap(laminar, response.acceptWebSocket(responseHeaders)); return websocketUpgraded(*lc, url.cStr()).attach(kj::mv(lc)); } else { // handle regular HTTP request const char* start, *end, *content_type; std::string badge; // for log requests std::string name; uint num; responseHeaders.clear(); // Clients usually expect that http servers will ignore unknown query parameters, // and expect to use this feature to work around browser limitations like there // being no way to programatically force a resource to be reloaded from the server // (without "Cache-Control: no-store", which is overkill). See issue #89. // Since we currently don't handle any query parameters at all, the easiest way // to achieve this is unconditionally remove all query parameters from the request. // This will need to be redone if we ever accept query parameters, which probably // will happen as part of issue #90. KJ_IF_MAYBE(queryIdx, url.findFirst('?')) { const_cast(url.begin())[*queryIdx] = '\0'; url = url.begin(); } if(url.startsWith("/archive/")) { KJ_IF_MAYBE(file, laminar.getArtefact(url.slice(strlen("/archive/")))) { auto array = (*file)->mmap(0, (*file)->stat().size); responseHeaders.add("Content-Transfer-Encoding", "binary"); auto stream = response.send(200, "OK", responseHeaders, array.size()); return stream->write(array.begin(), array.size()).attach(kj::mv(array)).attach(kj::mv(file)).attach(kj::mv(stream)); } } else if(parseLogEndpoint(url, name, num)) { kj::Own cc = kj::heap(laminar); cc->scope.job = name; cc->scope.num = num; bool complete; std::string output; cc->scope.type = MonitorScope::LOG; if(laminar.handleLogRequest(name, num, output, complete)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/plain; charset=utf-8"); responseHeaders.add("Content-Transfer-Encoding", "binary"); // Disables nginx reverse-proxy's buffering. Necessary for dynamic log output. responseHeaders.add("X-Accel-Buffering", "no"); auto stream = response.send(200, "OK", responseHeaders, nullptr); laminar.registerClient(cc.get()); return stream->write(output.data(), output.size()).then([=,s=stream.get(),c=cc.get()]{ if(complete) return kj::Promise(kj::READY_NOW); return writeLogChunk(c, s); }).attach(kj::mv(output)).attach(kj::mv(stream)).attach(kj::mv(cc)); } } else if(url == "/custom/style.css") { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "text/css; charset=utf-8"); responseHeaders.add("Content-Transfer-Encoding", "binary"); std::string css = laminar.getCustomCss(); auto stream = response.send(200, "OK", responseHeaders, css.size()); return stream->write(css.data(), css.size()).attach(kj::mv(css)).attach(kj::mv(stream)); } else if(resources.handleRequest(url.cStr(), &start, &end, &content_type)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, content_type); responseHeaders.add("Content-Encoding", "gzip"); responseHeaders.add("Content-Transfer-Encoding", "binary"); auto stream = response.send(200, "OK", responseHeaders, end-start); return stream->write(start, end-start).attach(kj::mv(stream)); } else if(url.startsWith("/badge/") && url.endsWith(".svg") && laminar.handleBadgeRequest(std::string(url.begin()+7, url.size()-11), badge)) { responseHeaders.set(kj::HttpHeaderId::CONTENT_TYPE, "image/svg+xml"); responseHeaders.add("Cache-Control", "no-cache"); auto stream = response.send(200, "OK", responseHeaders, badge.size()); return stream->write(badge.data(), badge.size()).attach(kj::mv(badge)).attach(kj::mv(stream)); } return response.sendError(404, "Not Found", responseHeaders); } } LaminarInterface& laminar; Resources resources; kj::HttpHeaders responseHeaders; }; // Context for an RPC connection struct RpcConnection { RpcConnection(kj::Own&& stream, capnp::Capability::Client bootstrap, capnp::ReaderOptions readerOpts) : stream(kj::mv(stream)), network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts), rpcSystem(capnp::makeRpcServer(network, bootstrap)) { } kj::Own stream; capnp::TwoPartyVatNetwork network; capnp::RpcSystem rpcSystem; }; Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress) : rpcInterface(kj::heap(li)), laminarInterface(li), ioContext(kj::setupAsyncIo()), headerTable(), httpService(kj::heap(li, headerTable)), httpServer(kj::heap(ioContext.provider->getTimer(), headerTable, *httpService)), listeners(kj::heap(*this)), childTasks(*this), httpConnections(*this), httpReady(kj::newPromiseAndFulfiller()) { // RPC task if(rpcBindAddress.startsWith("unix:")) unlink(rpcBindAddress.slice(strlen("unix:")).cStr()); listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress) .then([this](kj::Own&& addr) { return acceptRpcClient(addr->listen()); })); // HTTP task if(httpBindAddress.startsWith("unix:")) unlink(httpBindAddress.slice(strlen("unix:")).cStr()); listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress) .then([this](kj::Own&& addr) { // TODO: a better way? Currently used only for testing httpReady.fulfiller->fulfill(); kj::Own listener = addr->listen(); return httpServer->listenHttp(*listener).attach(kj::mv(listener)); })); // handle watched paths { inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); pathWatch = readDescriptor(inotify_fd, [this](const char*, size_t){ laminarInterface.notifyConfigChanged(); }); } } Server::~Server() { } void Server::start() { // The eventfd is used to quit the server later since we need to trigger // a reaction from the event loop efd_quit = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); kj::evalLater([this](){ static uint64_t _; auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd_quit); return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent)); }).wait(ioContext.waitScope); // Execution arrives here when the eventfd is triggered (in stop()) // Shutdown sequence: // 1. stop accepting new connections listeners = nullptr; // 2. abort current jobs. Most of the time this isn't necessary since // systemd stop or other kill mechanism will send SIGTERM to the whole // process group. laminarInterface.abortAll(); // 3. wait for all children to close childTasks.onEmpty().wait(ioContext.waitScope); // 4. run the loop once more to send any pending output to websocket clients ioContext.waitScope.poll(); // 5. return: websockets will be destructed when class is deleted } void Server::stop() { // This method is expected to be called in signal context, so an eventfd // is used to get the main loop to react. See run() eventfd_write(efd_quit, 1); } kj::Promise Server::readDescriptor(int fd, std::function cb) { auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); auto buffer = kj::heapArrayBuilder(PROC_IO_BUFSIZE); return handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)); } void Server::addTask(kj::Promise&& task) { childTasks.add(kj::mv(task)); } kj::Promise Server::addTimeout(int seconds, std::function cb) { return ioContext.lowLevelProvider->getTimer().afterDelay(seconds * kj::SECONDS).then([cb](){ cb(); }).eagerlyEvaluate(nullptr); } kj::Promise Server::onChildExit(kj::Maybe &pid) { return ioContext.unixEventPort.onChildExit(pid); } void Server::addWatchPath(const char* dpath) { inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); } kj::Promise Server::acceptRpcClient(kj::Own&& listener) { kj::ConnectionReceiver& cr = *listener.get(); return cr.accept().then(kj::mvCapture(kj::mv(listener), [this](kj::Own&& listener, kj::Own&& connection) { auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions()); childTasks.add(server->network.onDisconnect().attach(kj::mv(server))); return acceptRpcClient(kj::mv(listener)); })); } // returns a promise which will read a chunk of data from the file descriptor // wrapped by stream and invoke the provided callback with the read data. // Repeats until ::read returns <= 0 kj::Promise Server::handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function cb) { return stream->tryRead(buffer, 1, PROC_IO_BUFSIZE).then([this,stream,buffer,cb](size_t sz) { if(sz > 0) { cb(buffer, sz); return handleFdRead(stream, kj::mv(buffer), cb); } return kj::Promise(kj::READY_NOW); }); } void Server::taskFailed(kj::Exception &&exception) { //kj::throwFatalException(kj::mv(exception)); // prettier fprintf(stderr, "fatal: %s\n", exception.getDescription().cStr()); exit(EXIT_FAILURE); }