From a2701dcfd9ff1535e9fe22d74f4fa59dd6d8678c Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Sun, 13 Sep 2015 22:25:26 +0200 Subject: [PATCH] Initial commit --- CMakeLists.txt | 92 ++++++ README.md | 5 + src/client.cpp | 165 +++++++++++ src/database.cpp | 75 +++++ src/database.h | 142 +++++++++ src/interface.h | 113 +++++++ src/laminar.capnp | 28 ++ src/laminar.cpp | 535 ++++++++++++++++++++++++++++++++++ src/laminar.h | 90 ++++++ src/main.cpp | 53 ++++ src/node.cpp | 25 ++ src/node.h | 41 +++ src/resources.cpp | 67 +++++ src/resources.h | 41 +++ src/resources/index.html | 45 +++ src/resources/js/app.js | 250 ++++++++++++++++ src/resources/tpl/browse.html | 16 + src/resources/tpl/home.html | 58 ++++ src/resources/tpl/job.html | 22 ++ src/resources/tpl/log.html | 8 + src/resources/tpl/run.html | 13 + src/run.cpp | 116 ++++++++ src/run.h | 134 +++++++++ src/server.cpp | 414 ++++++++++++++++++++++++++ src/server.h | 62 ++++ 25 files changed, 2610 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 README.md create mode 100644 src/client.cpp create mode 100644 src/database.cpp create mode 100644 src/database.h create mode 100644 src/interface.h create mode 100644 src/laminar.capnp create mode 100644 src/laminar.cpp create mode 100644 src/laminar.h create mode 100644 src/main.cpp create mode 100644 src/node.cpp create mode 100644 src/node.h create mode 100644 src/resources.cpp create mode 100644 src/resources.h create mode 100644 src/resources/index.html create mode 100644 src/resources/js/app.js create mode 100644 src/resources/tpl/browse.html create mode 100644 src/resources/tpl/home.html create mode 100644 src/resources/tpl/job.html create mode 100644 src/resources/tpl/log.html create mode 100644 src/resources/tpl/run.html create mode 100644 src/run.cpp create mode 100644 src/run.h create mode 100644 src/server.cpp create mode 100644 src/server.h diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..a30a07d --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,92 @@ +### +### Copyright 2015 Oliver Giles +### +### This file is part of Laminar +### +### Laminar is free software: you can redistribute it and/or modify +### it under the terms of the GNU General Public License as published by +### the Free Software Foundation, either version 3 of the License, or +### (at your option) any later version. +### +### Laminar is distributed in the hope that it will be useful, +### but WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +### GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with Laminar. If not, see +### +project(laminar) +cmake_minimum_required(VERSION 2.8) + +set(CMAKE_INCLUDE_CURRENT_DIR ON) + +add_definitions("-std=c++11 -Wall -Wextra -Wno-unused-parameter -Wno-sign-compare") +set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Werror -DDEBUG") + +# This macro takes a list of files, gzips them and converts the output into +# object files so they can be linked directly into the application. +# ld generates symbols based on the string argument given to its executable, +# so it is significant from which directory it is called. BASEDIR will be +# removed from the beginning of paths to the remaining arguments +macro(generate_compressed_bins BASEDIR) + foreach(FILE ${ARGN}) + set(COMPRESSED_FILE "${FILE}.z") + set(OUTPUT_FILE "${FILE}.o") + get_filename_component(DIR ${FILE} DIRECTORY) + if(DIR) + file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${DIR}) + endif() + add_custom_command(OUTPUT ${COMPRESSED_FILE} + COMMAND gzip < ${BASEDIR}/${FILE} > ${COMPRESSED_FILE} + DEPENDS ${BASEDIR}/${FILE} + ) + add_custom_command(OUTPUT ${OUTPUT_FILE} + COMMAND ld -r -b binary -o ${OUTPUT_FILE} ${COMPRESSED_FILE} + DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/${COMPRESSED_FILE} + ) + list(APPEND COMPRESSED_BINS ${OUTPUT_FILE}) + endforeach() +endmacro() + +# Generates Cap'n Proto interface from definition file +add_custom_command(OUTPUT laminar.capnp.c++ laminar.capnp.h + COMMAND capnp compile -oc++:${CMAKE_BINARY_DIR} + --src-prefix=${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/src/laminar.capnp + DEPENDS src/laminar.capnp) + +# Zip and compile statically served resources +generate_compressed_bins(${CMAKE_SOURCE_DIR}/src/resources index.html js/app.js + tpl/home.html tpl/job.html tpl/run.html tpl/log.html tpl/browse.html) +# Download 3rd-party frontend JS libs... +file(DOWNLOAD https://ajax.googleapis.com/ajax/libs/angularjs/1.3.14/angular.min.js + js/angular.min.js EXPECTED_MD5 b1137641dbb512a60e83d673f7e2d98f) +file(DOWNLOAD https://ajax.googleapis.com/ajax/libs/angularjs/1.3.14/angular-route.min.js + js/angular-route.min.js EXPECTED_MD5 28ef7d7b4349ae0dce602748185ef32a) +file(DOWNLOAD https://ajax.googleapis.com/ajax/libs/angularjs/1.3.14/angular-sanitize.min.js + js/angular-sanitize.min.js EXPECTED_MD5 0854eae86bcdf5f92b1ab2b458d8d054) +file(DOWNLOAD https://raw.githubusercontent.com/drudru/ansi_up/v1.3.0/ansi_up.js + js/ansi_up.js EXPECTED_MD5 158566dc1ff8f2804de972f7e841e2f6) +file(DOWNLOAD https://cdnjs.cloudflare.com/ajax/libs/Chart.js/1.0.2/Chart.min.js + js/Chart.min.js EXPECTED_MD5 0d3004601c1a855a3d203502549528a7) +file(DOWNLOAD https://raw.githubusercontent.com/tomsouthall/Chart.HorizontalBar.js/v1.04/Chart.HorizontalBar.js + js/Chart.HorizontalBar.js EXPECTED_MD5 95070a38e69bc56534e1b2086d985270) +file(DOWNLOAD https://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css + css/bootstrap.min.css EXPECTED_MD5 5d5357cb3704e1f43a1f5bfed2aebf42) +# ...and compile them +generate_compressed_bins(${CMAKE_BINARY_DIR} js/angular.min.js js/angular-route.min.js + js/angular-sanitize.min.js js/ansi_up.js js/Chart.min.js js/Chart.HorizontalBar.js + css/bootstrap.min.css) +# (see resources.cpp where these are fetched) + +## Server +add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp + src/resources.cpp src/run.cpp src/node.cpp laminar.capnp.c++ ${COMPRESSED_BINS}) +# TODO: some alternative to boost::filesystem? +target_link_libraries(laminard capnp capnp-rpc kj-async kj boost_filesystem boost_system sqlite3 iniparser) + +## Client +add_executable(laminarc src/client.cpp laminar.capnp.c++) +target_link_libraries(laminarc capnp capnp-rpc kj-async kj) + + diff --git a/README.md b/README.md new file mode 100644 index 0000000..06064f4 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +## laminar + +Lightweight, linuxy Continuous Integration service. + +Alpha. Docs coming. diff --git a/src/client.cpp b/src/client.cpp new file mode 100644 index 0000000..abcd540 --- /dev/null +++ b/src/client.cpp @@ -0,0 +1,165 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "laminar.capnp.h" + +#include +#include + +#include +#include +#include + +#define EFAILED 55 + +template +static int setParams(int argc, char** argv, T& request) { + int n = 0; + for(int i = 0; i < argc; ++i) { + if(strchr(argv[i], '=') == NULL) + break; + n++; + } + + int argsConsumed = n; + + char* job = getenv("lJobName"); + char* num = getenv("lBuildNum"); + char* reason = getenv("LAMINAR_REASON"); + + if(job && num) n+=2; + else if(reason) n++; + + if(n == 0) return argsConsumed; + + auto params = request.initParams(n); + + for(int i = 0; i < argsConsumed; ++i) { + char* name = argv[i]; + char* val = strchr(name, '='); + *val++ = '\0'; + params[i].setName(name); + params[i].setValue(val); + } + + if(job && num) { + params[argsConsumed].setName("=parentJob"); + params[argsConsumed].setValue(job); + params[argsConsumed+1].setName("=parentBuild"); + params[argsConsumed+1].setValue(num); + } else if(reason) { + params[argsConsumed].setName("=reason"); + params[argsConsumed].setValue(reason); + } + + return argsConsumed; +} + +int main(int argc, char** argv) { + // TODO: pass this through an enviroment variable set by laminard + const char* address = "unix:\0laminar"; + + if(argc < 2) { + fprintf(stderr, "Usage: %s [parameters...]\n", argv[0]); + return EINVAL; + } + + capnp::EzRpcClient client(address); + LaminarCi::Client laminar = client.getMain(); + + auto& waitScope = client.getWaitScope(); + + if(strcmp(argv[1], "trigger") == 0) { + if(argc < 3) { + fprintf(stderr, "Usage %s trigger \n", argv[0]); + return EINVAL; + } + kj::Vector> promises; + int jobNameIndex = 2; + // make a request for each job specified on the commandline + do { + auto req = laminar.triggerRequest(); + req.setJobName(argv[jobNameIndex]); + int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req); + promises.add(req.send()); + jobNameIndex += n + 1; + } while(jobNameIndex < argc); + // pend on the promises + for(auto& p : promises) { + if(p.wait(waitScope).getResult() != LaminarCi::MethodResult::SUCCESS) { + fprintf(stderr, "Failed to queue job '%s'\n", argv[2]); + return ENOENT; + } + } + } else if(strcmp(argv[1], "start") == 0) { + if(argc < 3) { + fprintf(stderr, "Usage %s start \n", argv[0]); + return EINVAL; + } + kj::Vector> promises; + int jobNameIndex = 2; + // make a request for each job specified on the commandline + do { + auto req = laminar.startRequest(); + req.setJobName(argv[jobNameIndex]); + int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req); + promises.add(req.send()); + jobNameIndex += n + 1; + } while(jobNameIndex < argc); + // pend on the promises + for(auto& p : promises) { + if(p.wait(waitScope).getResult() != LaminarCi::JobResult::SUCCESS) { + return EFAILED; + } + } + } else if(strcmp(argv[1], "set") == 0) { + if(argc < 3) { + fprintf(stderr, "Usage %s set param=value\n", argv[0]); + return EINVAL; + } + auto req = laminar.setRequest(); + char* eq = strchr(argv[2], '='); + char* job = getenv("lJobName"); + char* num = getenv("lBuildNum"); + if(job && num && eq) { + char* name = argv[2]; + *eq++ = '\0'; + char* val = eq; + req.setJobName(job); + req.setBuildNum(atoi(num)); + req.getParam().setName(name); + req.getParam().setValue(val); + req.send().wait(waitScope); + } else { + fprintf(stderr, "Missing lJobName and lBuildNum or param is not in the format key=value\n"); + return EINVAL; + } + } else if(strcmp(argv[1], "wait") == 0) { + auto req = laminar.pendRequest(); + req.setJobName(argv[2]); + req.setBuildNum(atoi(argv[3])); + auto response = req.send().wait(waitScope); + if(response.getResult() != LaminarCi::JobResult::SUCCESS) + return EFAILED; + } else { + fprintf(stderr, "Unknown comand %s\n", argv[1]); + return EINVAL; + } + + return 0; +} diff --git a/src/database.cpp b/src/database.cpp new file mode 100644 index 0000000..053e3db --- /dev/null +++ b/src/database.cpp @@ -0,0 +1,75 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "database.h" + +#include + +Database::Database(const char *path) { + sqlite3_open(path, &hdl); +} + +Database::~Database() { + sqlite3_close(hdl); +} + +Database::Statement::Statement(sqlite3 *db, const char *query) { + sqlite3_prepare_v2(db, query, -1, &stmt, NULL); +} + +Database::Statement::~Statement() { + sqlite3_finalize(stmt); +} + + +bool Database::Statement::exec() { + return sqlite3_step(stmt) == SQLITE_OK; +} + +void Database::Statement::bindValue(int i, int e) { + sqlite3_bind_int(stmt, i, e); +} + +void Database::Statement::bindValue(int i, const char* e) { + sqlite3_bind_text(stmt, i, e, -1, NULL); + +} + +void Database::Statement::bindValue(int i, std::string e) { + sqlite3_bind_text(stmt, i, e.c_str(), e.length(), NULL); +} + +template<> std::string Database::Statement::fetchColumn(int col) { + return (char*)sqlite3_column_text(stmt, col); +} + +template<> const char* Database::Statement::fetchColumn(int col) { + return (char*)sqlite3_column_text(stmt, col); +} + +template<> int Database::Statement::fetchColumn(int col) { + return sqlite3_column_int(stmt, col); +} + +template<> time_t Database::Statement::fetchColumn(int col) { + return sqlite3_column_int64(stmt, col); +} + +bool Database::Statement::row() { + return sqlite3_step(stmt) == SQLITE_ROW; +} diff --git a/src/database.h b/src/database.h new file mode 100644 index 0000000..a4c0c99 --- /dev/null +++ b/src/database.h @@ -0,0 +1,142 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef _LAMINAR_DATABASE_H_ +#define _LAMINAR_DATABASE_H_ + +#include +#include + +struct sqlite3; +struct sqlite3_stmt; + +// This is a small sqlite wrapper using some clever template action +// to somewhat reduce verbosity. Usage: +// db.stmt("SELECT result WHERE name = ?") +// .bind(name) +// .fetch([](int result) { +// // function called for each retrieved row +// doSomething(result); +// }); +class Database { +public: + Database(const char* path); + ~Database(); + +private: + // Represents a database statement. Call Database::stmt() to get + // one, then call bind(), fetch() or exec() on the returned object + class Statement { + private: + // Internal template helper that defines the type + // in the variadic type array Args at offset N + template + struct typeindex : typeindex {}; + template + struct typeindex<0, T, Args...> { typedef T type; }; + + public: + Statement(sqlite3* db, const char* query); + ~Statement(); + + // Bind several parameters in a single call. They are bound + // by index in the order passed into this function + template + Statement& bind(Args...args) { + return bindRecursive(1, args...); + } + // Fetch columns. Supply a callback that will be executed for + // each row in the resultset, with arguments matching the + // expected column types + template + void fetch(typename typeindex<0, std::function>::type callback) { + FetchMarshaller fm(this, callback); + } + // execute without fetching any parameters. Intended for + // non-SELECT statements; + bool exec(); + + private: + // Internal template helper used to unpack arguments into + // the fetch callback. + template struct rng { }; + // Internal template helper to generate a rng<> object: + // genrng<4>::type is rng<0,1,2,3> + template + struct genrng : genrng {}; + template + struct genrng<0, N...> { typedef rng type; }; + + template + struct FetchMarshaller { + FetchMarshaller(Statement* st, std::function cb){ + marshal(st, cb, typename genrng::type()); + } + template + void marshal(Statement* st, std::function cb, rng) { + while(st->row()) { + cb(st->fetchColumn::type>(N)...); + } + } + }; + template + friend class FetchMarshaller; + + bool row(); + + template + Statement& bindRecursive(int i, T v, Args...args) { + bindValue(i, v); // specialization must exist for T + return bindRecursive(i + 1, args...); + } + // template terminating condition + Statement& bindRecursive(int) { + return *this; + } + + // Bind value specializations + void bindValue(int i, int e); + void bindValue(int i, const char* e); + void bindValue(int i, std::string e); + + // Declaration for fetch column interface, + // intentionally missing definition + template + T fetchColumn(int col); + + sqlite3_stmt* stmt; + }; + +public: + Statement stmt(const char* q) { + return Statement(hdl, q); + } + // shorthand + bool exec(const char* q) { return Statement(hdl, q).exec(); } +private: + + sqlite3* hdl; +}; + +// specialization declarations, defined in source file +template<> std::string Database::Statement::fetchColumn(int col); +template<> const char* Database::Statement::fetchColumn(int col); +template<> int Database::Statement::fetchColumn(int col); +template<> time_t Database::Statement::fetchColumn(int col); + +#endif // _LAMINAR_DATABASE_H_ diff --git a/src/interface.h b/src/interface.h new file mode 100644 index 0000000..584686f --- /dev/null +++ b/src/interface.h @@ -0,0 +1,113 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef INTERFACE_H +#define INTERFACE_H + +#include "run.h" + +#include + +#include +#include +#include + +typedef std::unordered_map ParamMap; + +// Simple struct to define which information a frontend client is interested +// in, both in initial request phase and real-time updates. It corresponds +// loosely to frontend URLs +struct MonitorScope { + enum Type { + HOME, // home page: recent builds and statistics + ALL, // browse jobs + JOB, // a specific job page + RUN, // a specific run page + LOG // a run's log page + }; + + MonitorScope(Type type = HOME, std::string job = std::string(), int num = 0) : + type(type), + job(job), + num(num) + {} + + // whether this scope wants status information about the given job or run + bool wantsStatus(std::string ajob, int anum = 0) const { + if(type == HOME || type == ALL) return true; + if(type == JOB) return ajob == job; + if(type == RUN) return ajob == job && anum == num; + return false; + } + + bool wantsLog(std::string ajob, int anum) const { + return type == LOG && ajob == job && anum == num; + } + + Type type; + std::string job; + int num = 0; +}; + +// Represents a (websocket) client that wants to be notified about events +// matching the supplied scope. Pass instances of this to LaminarInterface +// registerClient and deregisterClient +struct LaminarClient { + virtual void sendMessage(std::string payload) = 0; + MonitorScope scope; +}; + +// The interface connecting the network layer to the application business +// logic. These methods fulfil the requirements of both the HTTP/Websocket +// and RPC interfaces. +struct LaminarInterface { + // Queues a job, returns immediately. Return value will be nullptr if + // the supplied name is not a known job. + virtual std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) = 0; + + // Returns a promise that will wait for a run matching the given name + // and build number to complete. The promise will resolve to the result + // of the run. If no such run exists, the status will be RunState::UNKNOWN + virtual kj::Promise waitForRun(std::string name, int buildNum) = 0; + + // Specialization of above for an existing Run object (for example returned + // from queueJob). Returned promise will never resolve to RunState::UNKNOWN + virtual kj::Promise waitForRun(const Run*) = 0; + + // Register a client (but don't give up ownership). The client will be + // notified with a JSON message of any events matching its scope + // (see LaminarClient and MonitorScope above) + virtual void registerClient(LaminarClient* client) = 0; + + // Call this before destroying a client so that Laminar doesn't try + // to call LaminarClient::sendMessage on invalid data + virtual void deregisterClient(LaminarClient* client) = 0; + + // Synchronously send a snapshot of the current status to the given + // client (as governed by the client's MonitorScope). This is called on + // initial websocket connect. + virtual void sendStatus(LaminarClient* client) = 0; + + // Implements the laminar client interface allowing the setting of + // arbitrary parameters on a run (usually itself) to be available in + // the environment of subsequent scripts. + virtual bool setParam(std::string job, int buildNum, std::string param, std::string value) = 0; +}; + +#endif // INTERFACE_H + diff --git a/src/laminar.capnp b/src/laminar.capnp new file mode 100644 index 0000000..25ffdc7 --- /dev/null +++ b/src/laminar.capnp @@ -0,0 +1,28 @@ +@0xc2cbd510f16dab57; + +interface LaminarCi { + + trigger @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult); + start @1 (jobName :Text, params :List(JobParam)) -> (result :JobResult); + pend @2 (jobName :Text, buildNum :UInt32) -> (result :JobResult); + set @3 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult); + + struct JobParam { + name @0 :Text; + value @1 :Text; + } + + enum MethodResult { + failed @0; + success @1; + } + + enum JobResult { + unknown @0; + failed @1; + aborted @2; + success @3; + } + +} + diff --git a/src/laminar.cpp b/src/laminar.cpp new file mode 100644 index 0000000..196fbce --- /dev/null +++ b/src/laminar.cpp @@ -0,0 +1,535 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "laminar.h" +#include "server.h" + +#include +#include +#include + +#include +namespace fs = boost::filesystem; + +#include +#include + +namespace { + +// rapidjson::Writer with a StringBuffer is used a lot in Laminar for +// preparing JSON messages to send to Websocket clients. A small wrapper +// class here reduces verbosity later for this common use case. +class Json : public rapidjson::Writer { +public: + Json() : rapidjson::Writer(buf) { StartObject(); } + template + Json& set(const char* key, T value); + Json& startObject(const char* key) { String(key); StartObject(); return *this; } + Json& startArray(const char* key) { String(key); StartArray(); return *this; } + const char* str() { EndObject(); return buf.GetString(); } +private: + rapidjson::StringBuffer buf; +}; +template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; } +template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; } +template<> Json& Json::set(const char* key, int value) { String(key); Int(value); return *this; } +template<> Json& Json::set(const char* key, time_t value) { String(key); Int64(value); return *this; } + +} + +namespace { +// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf) +constexpr const char* INTADDR_RPC_DEFAULT = "unix:\0laminar"; +constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080"; +constexpr const char* BASE_CFG_DIR = "/home/og/dev/laminar/cfg"; +} + +typedef std::string str; + +Laminar::Laminar(const char* configFile) { + // read params from config file + conf = iniparser_load(configFile); + KJ_REQUIRE(conf != nullptr, "Could not parse", configFile); + homeDir = iniparser_getstring(conf, ":LAMINAR_HOME", "/var/lib/laminar"); + + db = new Database((fs::path(homeDir)/"laminar.sqlite").string().c_str()); + // Prepare database for first use + // TODO: error handling + db->exec("CREATE TABLE IF NOT EXISTS builds(" + "name TEXT, number INT UNSIGNED, node TEXT, queuedAt INT, startedAt INT, completedAt INT, result INT, output TEXT, parentJob TEXT, parentBuild INT, reason TEXT)"); + + // retrieve the last build numbers + db->stmt("SELECT name, MAX(number) FROM builds GROUP BY name") + .fetch([this](str name, int build){ + buildNums[name] = build; + }); + + // This is only a separate function because I imagined that it would + // be nice to reload some configuration during runtime without restarting + // the server completely. Currently not called from anywhere else + // TODO: implement that + loadConfiguration(); +} + +void Laminar::registerClient(LaminarClient* client) { + clients.insert(client); +} + +void Laminar::deregisterClient(LaminarClient* client) { + clients.erase(client); +} + +bool Laminar::setParam(std::string job, int buildNum, std::string param, std::string value) { + auto it = activeJobs.get<1>().find(std::make_tuple(job, buildNum)); + if(it == activeJobs.get<1>().end()) + return false; + std::shared_ptr run = *it; + run->params[param] = value; + return true; +} + +void Laminar::sendStatus(LaminarClient* client) { + if(client->scope.type == MonitorScope::LOG) { + // If the requested job is currently in progress + auto it = activeJobs.get<1>().find(std::make_tuple(client->scope.job, client->scope.num)); + if(it != activeJobs.get<1>().end()) { + client->sendMessage((*it)->log.c_str()); + } else { // it must be finished, fetch it from the database + db->stmt("SELECT output FROM builds WHERE name = ? AND number = ?") + .bind(client->scope.job, client->scope.num) + .fetch([=](const char* log) { + client->sendMessage(log); + }); + } + } else if(client->scope.type == MonitorScope::RUN) { + Json j; + j.set("type", "status"); + j.startObject("data"); + db->stmt("SELECT startedAt, result, reason FROM builds WHERE name = ? AND number = ?") + .bind(client->scope.job, client->scope.num) + .fetch([&](time_t started, int result, std::string reason) { + j.set("started", started); + j.set("result", to_string(RunState(result))); + j.set("reason", reason); + }); + j.EndObject(); + client->sendMessage(j.str()); + } else if(client->scope.type == MonitorScope::JOB) { + Json j; + j.set("type", "status"); + j.startObject("data"); + j.startArray("recent"); + db->stmt("SELECT * FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 5") + .bind(client->scope.job) + .fetch([&](str name,int build,str node,time_t,time_t started,time_t completed,int result){ + j.StartObject(); + j.set("name", name) + .set("number", build) + .set("node", node) + .set("duration", completed - started) + .set("started", started) + .set("result", to_string(RunState(result))) + .EndObject(); + }); + j.EndArray(); + j.startArray("running"); + auto p = activeJobs.get<4>().equal_range(client->scope.job); + for(auto it = p.first; it != p.second; ++it) { + const std::shared_ptr run = *it; + j.StartObject(); + j.set("name", run->name); + j.set("number", run->build); + j.set("node", run->node->name); + j.set("started", run->startedAt); + j.EndObject(); + } + j.EndArray(); + j.startArray("queued"); + for(const std::shared_ptr run : queuedJobs) { + if (run->name == client->scope.job) { + j.StartObject(); + j.set("name", run->name); + j.EndObject(); + } + } + j.EndArray(); + j.EndObject(); + client->sendMessage(j.str()); + + } else if(client->scope.type == MonitorScope::ALL) { + Json j; + j.set("type", "status"); + j.startObject("data"); + j.startArray("jobs"); + db->stmt("SELECT name FROM builds GROUP BY name") + .fetch([&](str name){ + j.StartObject(); + j.set("name", name) + .EndObject(); + }); + j.EndArray(); + j.EndObject(); + client->sendMessage(j.str()); + } else { // Home page + Json j; + j.set("type", "status"); + j.startObject("data"); + j.startArray("recent"); + db->stmt("SELECT * FROM builds ORDER BY completedAt DESC LIMIT 5") + .fetch([&](str name,int build,str node,time_t,time_t started,time_t completed,int result){ + j.StartObject(); + j.set("name", name) + .set("number", build) + .set("node", node) + .set("duration", completed - started) + .set("started", started) + .set("result", to_string(RunState(result))) + .EndObject(); + }); + j.EndArray(); + j.startArray("running"); + for(const std::shared_ptr run : activeJobs.get<3>()) { + j.StartObject(); + j.set("name", run->name); + j.set("number", run->build); + j.set("node", run->node->name); + j.set("started", run->startedAt); + j.EndObject(); + } + j.EndArray(); + j.startArray("queued"); + for(const std::shared_ptr run : queuedJobs) { + j.StartObject(); + j.set("name", run->name); + j.EndObject(); + } + j.EndArray(); + int execTotal = 0; + int execBusy = 0; + for(const auto& it : nodes) { + const Node& node = it.second; + execTotal += node.numExecutors; + execBusy += node.busyExecutors; + } + j.set("executorsTotal", execTotal); + j.set("executorsBusy", execBusy); + j.startArray("buildsPerDay"); + for(int i = 6; i >= 0; --i) { + j.StartObject(); + db->stmt("SELECT result, COUNT(*) FROM builds WHERE completedAt > ? AND completedAt < ? GROUP by result") + .bind(86400*(time(0)/86400 - i), 86400*(time(0)/86400 - (i-1))) + .fetch([&](int result, int num){ + j.set(to_string(RunState(result)).c_str(), num); + }); + j.EndObject(); + } + j.EndArray(); + j.startObject("buildsPerJob"); + db->stmt("SELECT name, COUNT(*) FROM builds WHERE completedAt > ? GROUP BY name") + .bind(time(0) - 86400) + .fetch([&](str job, int count){ + j.set(job.c_str(), count); + }); + j.EndObject(); + + j.EndObject(); + client->sendMessage(j.str()); + } +} + +Laminar::~Laminar() { + iniparser_freedict(conf); + delete db; + delete srv; +} + +void Laminar::run() { + const char* listen_rpc = iniparser_getstring(conf, ":LAMINAR_BIND_RPC", INTADDR_RPC_DEFAULT); + const char* listen_http = iniparser_getstring(conf, ":LAMINAR_BIND_HTTP", INTADDR_HTTP_DEFAULT); + + srv = new Server(*this, listen_rpc, listen_http); + + srv->start(); +} + +void Laminar::stop() { + clients.clear(); + srv->stop(); +} + +bool Laminar::loadConfiguration() { + NodeMap nm; + + fs::directory_iterator dit(fs::path(homeDir)/"cfg"/"nodes"); + for(fs::directory_entry& it : dit) { + if(!fs::is_directory(it.status())) + continue; + + fs::directory_entry config(it.path()/"config"); + if(!fs::is_regular_file(config.status())) + continue; + + dictionary* ini = iniparser_load(config.path().string().c_str()); + if(!ini) { + KJ_LOG(ERROR, "Could not parse node config", config.path().string()); + continue; + } + + int executors = iniparser_getint(ini, ":EXECUTORS", 6); + + Node node; + node.name = it.path().filename().string(); + node.numExecutors = executors; + nm.emplace(node.name, std::move(node)); + + iniparser_freedict(ini); + + } + + if(nm.empty()) { + // add a default node + Node node; + node.name = "default"; + node.numExecutors = 6; + nm.emplace("default", std::move(node)); + } + + nodes = nm; + + return true; +} + +std::shared_ptr Laminar::queueJob(std::string name, ParamMap params) { + if(!fs::exists(fs::path(homeDir)/"cfg"/"jobs"/name)) + return nullptr; + + // attempt to create a workspace for this job if it doesn't exist + if(!fs::exists(fs::path(homeDir)/"run"/name/"workspace")) { + if(!fs::create_directories(fs::path(homeDir)/"run"/name/"workspace")) { + KJ_LOG(ERROR, "Could not create job workspace", name); + return nullptr; + } + } + + std::shared_ptr run = std::make_shared(); + run->name = name; + run->queuedAt = time(0); + for(auto it = params.begin(); it != params.end();) { + if(it->first[0] == '=') { + if(it->first == "=parentJob") { + run->parentName = it->second; + } else if(it->first == "=parentBuild") { + run->parentBuild = atoi(it->second.c_str()); + } else if(it->first == "=reason") { + run->reasonMsg = it->second; + } else { + KJ_LOG(ERROR, "Unknown internal job parameter", it->first); + } + it = params.erase(it); + } else + ++it; + } + run->params = params; + queuedJobs.push_back(run); + + // notify clients + Json j; + j.set("type", "job_queued") + .startObject("data") + .set("name", name) + .EndObject(); + const char* msg = j.str(); + for(LaminarClient* c : clients) { + if(c->scope.wantsStatus(name)) + c->sendMessage(msg); + } + + assignNewJobs(); + return run; +} + +kj::Promise Laminar::waitForRun(std::string name, int buildNum) { + auto it = activeJobs.get<1>().find(std::make_tuple(name, buildNum)); + if(it == activeJobs.get<1>().end()) + return RunState::UNKNOWN; + return waitForRun(it->get()); +} + +kj::Promise Laminar::waitForRun(const Run* run) { + waiters[run].emplace_back(Waiter{}); + return waiters[run].back().takePromise(); +} + +bool Laminar::stepRun(std::shared_ptr run) { + bool complete = run->step(); + if(!complete) { + srv->addProcess(run->fd, [=](char* b,size_t n){ + std::string s(b,n); + run->log += s; + for(LaminarClient* c : clients) { + if(c->scope.wantsLog(run->name, run->build)) + c->sendMessage(s); + } + }, [this,run](){ reapAdvance();}); + } + return complete; +} + +void Laminar::reapAdvance() { + int ret = 0; + // TODO: If we pass WNOHANG here for better asynchronicity, how do + // we re-schedule a poll to wait for finished child processes? + pid_t pid = waitpid(-1, &ret, 0); + // TODO: handle signalled child processes + if(pid > 0) { + KJ_LOG(INFO, "Reaping", pid); + auto it = activeJobs.get<0>().find(pid); + std::shared_ptr run = *it; + bool completed = true; + activeJobs.get<0>().modify(it, [&](std::shared_ptr run){ + close(run->fd); + run->reaped(ret); + completed = stepRun(run); + }); + if(completed) + run->complete(); + } + assignNewJobs(); +} + +void Laminar::assignNewJobs() { + auto it = queuedJobs.begin(); + while(it != queuedJobs.end()) { + bool assigned = false; + for(auto& sn : nodes) { + Node& node = sn.second; + std::shared_ptr run = *it; + if(node.queue(*run)) { + node.busyExecutors++; + run->node = &node; + run->startedAt = time(0); + run->build = ++buildNums[run->name]; + run->laminarHome = homeDir; + + fs::path wd = fs::path(homeDir)/"run"/run->name/std::to_string(run->build); + if(!fs::is_directory(wd) && !fs::create_directory(wd)) { + KJ_LOG(ERROR, "Could not create working directory", wd.string()); + break; + } + run->wd = wd.string(); + + // add scripts + fs::path cfgDir = fs::path(homeDir)/"cfg"; + // global before-run script + if(fs::exists(cfgDir/"before")) + run->addScript((cfgDir/"before").string()); + // per-node before-run script + if(fs::exists(cfgDir/"nodes"/node.name/"before")) + run->addScript((cfgDir/"before").string()); + // job before-run script + if(fs::exists(cfgDir/"jobs"/run->name/"before")) + run->addScript((cfgDir/"jobs"/run->name/"before").string()); + // main run script. must exist. + run->addScript((cfgDir/"jobs"/run->name/"run").string()); + // job after-run script + if(fs::exists(cfgDir/"jobs"/run->name/"after")) + run->addScript((cfgDir/"jobs"/run->name/"after").string()); + // per-node after-run script + if(fs::exists(cfgDir/"nodes"/node.name/"after")) + run->addScript((cfgDir/"nodes"/node.name/"after").string()); + // global after-run script + if(fs::exists(cfgDir/"after")) + run->addScript((cfgDir/"after").string()); + + KJ_LOG(INFO, "Queued job to node", run->name, run->build, node.name); + + // notify clients + Json j; + j.set("type", "job_started") + .startObject("data") + .set("queueIndex", std::distance(it,queuedJobs.begin())) + .set("name", run->name) + .set("started", run->startedAt) + .set("number", run->build) + .set("reason", run->reason()); + db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1") + .bind(run->name) + .fetch([&](int etc){ + j.set("etc", time(0) + etc); + }); + j.EndObject(); + const char* msg = j.str(); + for(LaminarClient* c : clients) { + if(c->scope.wantsStatus(run->name, run->build)) + c->sendMessage(msg); + } + + // setup run completion handler + run->notifyCompletion = [&](const Run* r) { + node.busyExecutors--; + KJ_LOG(INFO, "Run completed", r->name, to_string(r->result)); + time_t completedAt = time(0); + db->stmt("INSERT INTO builds VALUES(?,?,?,?,?,?,?,?,?,?,?)") + .bind(r->name, r->build, node.name, r->queuedAt, r->startedAt, completedAt, int(r->result), + r->log, r->parentName, r->parentBuild, r->reason()) + .exec(); + + // notify clients + Json j; + j.set("type", "job_completed") + .startObject("data") + .set("name", r->name) + .set("number", r->build) + .set("duration", completedAt - r->startedAt) + .set("started", r->startedAt) + .set("result", to_string(r->result)) + .EndObject(); + const char* msg = j.str(); + for(LaminarClient* c : clients) { + if(c->scope.wantsStatus(r->name, r->build)) + c->sendMessage(msg); + } + + // wake the waiters + for(Waiter& waiter : waiters[r]) + waiter.release(r->result); + waiters.erase(r); + + // will delete the job + activeJobs.get<2>().erase(r); + }; + + // trigger the first step of the run + if(stepRun(run)) { + // should never happen + KJ_LOG(INFO, "No steps for run"); + run->complete(); + } + + assigned = true; + break; + } + } + if(assigned) { + activeJobs.insert(*it); + it = queuedJobs.erase(it); + } else + ++it; + + } +} diff --git a/src/laminar.h b/src/laminar.h new file mode 100644 index 0000000..a0e175b --- /dev/null +++ b/src/laminar.h @@ -0,0 +1,90 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef _LAMINAR_LAMINAR_H_ +#define _LAMINAR_LAMINAR_H_ + +#include "interface.h" +#include "run.h" +#include "node.h" +#include "database.h" + +#include + +// Node name to node object map +typedef std::unordered_map NodeMap; + +struct Server; +struct _dictionary_; // from iniparser + +// The main class implementing the application's business logic. +// It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC +// interfaces and communicates via the LaminarInterface methods and +// the LaminarClient objects (see interface.h) +class Laminar : public LaminarInterface { +public: + Laminar(const char* configFile); + ~Laminar(); + + // Runs the application forever + void run(); + // Call this in a signal handler to make run() return + void stop(); + + // Implementations of LaminarInterface + std::shared_ptr queueJob(std::string name, ParamMap params = ParamMap()) override; + kj::Promise waitForRun(std::string name, int buildNum) override; + kj::Promise waitForRun(const Run* run) override; + void registerClient(LaminarClient* client) override; + void deregisterClient(LaminarClient* client) override; + void sendStatus(LaminarClient* client) override; + bool setParam(std::string job, int buildNum, std::string param, std::string value) override; + +private: + bool loadConfiguration(); + void reapAdvance(); + void assignNewJobs(); + bool stepRun(std::shared_ptr run); + + std::list> queuedJobs; + + // Implements the waitForRun API. + // TODO: refactor + struct Waiter { + Waiter() : paf(kj::newPromiseAndFulfiller()) {} + void release(RunState state) { + paf.fulfiller->fulfill(RunState(state)); + } + kj::Promise takePromise() { return std::move(paf.promise); } + private: + kj::PromiseFulfillerPair paf; + }; + std::unordered_map> waiters; + + std::unordered_map buildNums; + + RunSet activeJobs; + Database* db; + Server* srv; + _dictionary_* conf; + NodeMap nodes; + std::string homeDir; + std::set clients; +}; + +#endif // _LAMINAR_LAMINAR_H_ diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..0d0e427 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,53 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "laminar.h" + +#include +#include + +std::function sigHandler; +static void __sigHandler(int) { sigHandler(); } + +int main(int argc, char** argv) { + for(int i = 1; i < argc; ++i) { + if(strcmp(argv[i], "-v") == 0) { + kj::_::Debug::setLogLevel(kj::_::Debug::Severity::INFO); + } + } + + const char* configFile = getenv("LAMINAR_CONF_FILE"); + if(!configFile || !*configFile) + configFile = "/etc/laminar.conf"; + + do { + Laminar laminar(configFile); + sigHandler = [&](){ + KJ_LOG(INFO, "Received SIGINT"); + laminar.stop(); + }; + signal(SIGINT, &__sigHandler); + signal(SIGTERM, &__sigHandler); + + laminar.run(); + } while(false); + + KJ_DBG("end of main"); + return 0; +} + diff --git a/src/node.cpp b/src/node.cpp new file mode 100644 index 0000000..4b9e1fa --- /dev/null +++ b/src/node.cpp @@ -0,0 +1,25 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "node.h" + +bool Node::queue(const Run& run) { + // later this could check if the given run is allowed to run + // on this node, for example if the run's tags match the node's tags + return busyExecutors < numExecutors; +} diff --git a/src/node.h b/src/node.h new file mode 100644 index 0000000..a78ffc7 --- /dev/null +++ b/src/node.h @@ -0,0 +1,41 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef NODE_H +#define NODE_H + +#include + +class Run; + +// Represents a group of executors. Currently almost unnecessary POD +// abstraction, but may be enhanced in the future to support e.g. tags +class Node { +public: + Node() {} + + std::string name; + int numExecutors; + int busyExecutors = 0; + + // Attempts to queue the given run to this node. Returns true if succeeded. + bool queue(const Run& run); +}; + + +#endif // NODE_H diff --git a/src/resources.cpp b/src/resources.cpp new file mode 100644 index 0000000..54d91b2 --- /dev/null +++ b/src/resources.cpp @@ -0,0 +1,67 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminaris distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "resources.h" +#include + +#define INIT_RESOURCE(route, name) \ + extern const char _binary_##name##_z_start[];\ + extern const char _binary_##name##_z_end[]; \ + resources[route] = std::make_pair(_binary_ ## name ## _z_start, _binary_ ## name ## _z_end) + +Resources::Resources() +{ + // TODO: Content-type + INIT_RESOURCE("/", index_html); + INIT_RESOURCE("/js/app.js", js_app_js); + INIT_RESOURCE("/js/Chart.HorizontalBar.js", js_Chart_HorizontalBar_js); + INIT_RESOURCE("/js/ansi_up.js", js_ansi_up_js); + INIT_RESOURCE("/tpl/home.html", tpl_home_html); + INIT_RESOURCE("/tpl/job.html", tpl_job_html); + INIT_RESOURCE("/tpl/run.html", tpl_run_html); + INIT_RESOURCE("/tpl/log.html", tpl_log_html); + INIT_RESOURCE("/tpl/browse.html", tpl_browse_html); + INIT_RESOURCE("/js/angular.min.js", js_angular_min_js); + INIT_RESOURCE("/js/angular-route.min.js", js_angular_route_min_js); + INIT_RESOURCE("/js/angular-sanitize.min.js", js_angular_sanitize_min_js); + INIT_RESOURCE("/js/ansi_up.js", js_ansi_up_js); + INIT_RESOURCE("/js/Chart.min.js", js_Chart_min_js); + INIT_RESOURCE("/js/Chart.HorizontalBar.js", js_Chart_HorizontalBar_js); + INIT_RESOURCE("/css/bootstrap.min.css", css_bootstrap_min_css); +} + +inline bool beginsWith(std::string haystack, const char* needle) { + return strncmp(haystack.c_str(), needle, strlen(needle)) == 0; +} + +bool Resources::handleRequest(std::string path, const char **start, const char **end) { + // need to keep the list of "application links" synchronised with the angular + // application. We cannot return a 404 for any of these + auto it = beginsWith(path,"/jobs") + ? resources.find("/") + : resources.find(path); + + if(it != resources.end()) { + *start = it->second.first; + *end = it->second.second; + return true; + } + + return false; +} + diff --git a/src/resources.h b/src/resources.h new file mode 100644 index 0000000..bee58a6 --- /dev/null +++ b/src/resources.h @@ -0,0 +1,41 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminaris distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef _LAMINAR_RESOURCES_H_ +#define _LAMINAR_RESOURCES_H_ + +#include +#include +#include + +// Simple class to abstract away the mapping of HTTP requests for +// resources to their data. +class Resources { +public: + Resources(); + + // If a resource is known for the given path, set start and end to the + // binary data to send to the client. Function returns false if no resource + // for the given path is known (404) + bool handleRequest(std::string path, const char** start, const char** end); + +private: + std::unordered_map> resources; +}; + +#endif // _LAMINAR_RESOURCES_H_ diff --git a/src/resources/index.html b/src/resources/index.html new file mode 100644 index 0000000..f92f161 --- /dev/null +++ b/src/resources/index.html @@ -0,0 +1,45 @@ + + + + + + + + Laminar + + + + + + + + + + + + +
+ + + diff --git a/src/resources/js/app.js b/src/resources/js/app.js new file mode 100644 index 0000000..841eef4 --- /dev/null +++ b/src/resources/js/app.js @@ -0,0 +1,250 @@ +Laminar = { + runIcon: function(result) { + return result === "success" ? '' : ''; + }, + jobFormatter: function(o) { + o.duration = o.duration + "s" + o.when = (new Date(1000 * o.started)).toLocaleString(); + return o; + } +}; +angular.module('laminar',['ngRoute','ngSanitize']) +.config(function($routeProvider, $locationProvider, $sceProvider) { + $routeProvider + .when('/', { + templateUrl: 'tpl/home.html', + controller: 'mainController' + }) + .when('/jobs', { + templateUrl: 'tpl/browse.html', + controller: 'BrowseController', + }) + .when('/jobs/:name', { + templateUrl: 'tpl/job.html', + controller: 'JobController' + }) + .when('/jobs/:name/:num', { + templateUrl: 'tpl/run.html', + controller: 'RunController' + }) + .when('/jobs/:name/:num/log', { + templateUrl: 'tpl/log.html', + controller: 'LogController' + }) + $locationProvider.html5Mode(true); + $sceProvider.enabled(false); +}) +.factory('$ws',function($q,$location){ + return { + statusListener: function(callbacks) { + var ws = new WebSocket("ws://" + location.host + $location.path()); + ws.onmessage = function(message) { + message = JSON.parse(message.data); + callbacks[message.type](message.data); + }; + }, + logListener: function(callback) { + var ws = new WebSocket("ws://" + location.host + $location.path()); + ws.onmessage = function(message) { + callback(message.data); + }; + } + }; +}) +.controller('mainController', function($scope, $ws, $interval){ + $scope.jobsQueued = []; + $scope.jobsRunning = []; + $scope.jobsRecent = []; + + var chtUtilization, chtBuildsPerDay, chtBuildsPerJob; + + var updateUtilization = function(busy) { + chtUtilization.segments[0].value += busy ? 1 : -1; + chtUtilization.segments[1].value -= busy ? 1 : -1; + chtUtilization.update(); + } + + $ws.statusListener({ + status: function(data) { + // populate jobs + $scope.jobsQueued = data.queued; + $scope.jobsRunning = data.running; + $scope.jobsRecent = data.recent.map(Laminar.jobFormatter); + $scope.$apply(); + + // setup charts + chtUtilization = new Chart(document.getElementById("chartUtil").getContext("2d")).Pie( + [{value: data.executorsBusy, color:"sandybrown", label: "Busy"}, + {value: data.executorsTotal, color: "steelblue", label: "Idle"}], + {animationEasing: 'easeInOutQuad'} + ); + chtBuildsPerDay = new Chart(document.getElementById("chartBpd").getContext("2d")).Line({ + labels: function(){ + res = []; + var now = new Date(); + for(var i = 6; i >= 0; --i) { + var then = new Date(now.getTime() - i*86400000); + res.push(["Sun","Mon","Tue","Wed","Thu","Fri","Sat"][then.getDay()]); + } + return res; + }(), + datasets: [{ + label: "Successful Builds", + fillColor: "darkseagreen", + strokeColor: "forestgreen", + data: data.buildsPerDay.map(function(e){return e.success||0;}) + },{ + label: "Failed Bulids", + fillColor: "darksalmon", + strokeColor: "crimson", + data: data.buildsPerDay.map(function(e){return e.failed||0;}) + }]}, + { showTooltips: false } + ); + chtBuildsPerJob = new Chart(document.getElementById("chartBpj").getContext("2d")).HorizontalBar({ + labels: Object.keys(data.buildsPerJob), + datasets: [{ + fillColor: "steelblue", + data: Object.keys(data.buildsPerJob).map(function(e){return data.buildsPerJob[e];}) + }] + },{}); + }, + job_queued: function(data) { + $scope.jobsQueued.splice(0,0,data); + $scope.$apply(); + }, + job_started: function(data) { + $scope.jobsQueued.splice($scope.jobsQueued.length - data.queueIndex - 1,1); + $scope.jobsRunning.splice(0,0,data); + $scope.$apply(); + updateUtilization(true); + }, + job_completed: function(data) { + if(data.result === "success") + chtBuildsPerDay.datasets[0].points[6].value++; + else + chtBuildsPerDay.datasets[1].points[6].value++; + chtBuildsPerDay.update(); + + for(var i = 0; i < $scope.jobsRunning.length; ++i) { + var job = $scope.jobsRunning[i]; + if(job.name == data.name && job.number == data.number) { + $scope.jobsRunning.splice(i,1); + $scope.jobsRecent.splice(0,0,Laminar.jobFormatter(data)); + $scope.$apply(); + + break; + } + } + updateUtilization(false); + for(var j = 0; j < chtBuildsPerJob.datasets[0].bars.length; ++j) { + if(chtBuildsPerJob.datasets[0].bars[j].label == job.name) { + chtBuildsPerJob.datasets[0].bars[j].value++; + chtBuildsPerJob.update(); + break; + } + } + } + }); + $scope.active = function(url) { + return false; + } + $scope.runIcon = Laminar.runIcon; + timeUpdater = $interval(function() { + $scope.jobsRunning.forEach(function(o){ + if(o.etc) { + var d = new Date(); + var p = (d.getTime()/1000 - o.started) / (o.etc - o.started); + if(p > 1.2) { + o.overtime = true; + } else if(p >= 1) { + o.progress = 99; + } else { + o.progress = 100 * p; + } + } + }); + }, 1000); + $scope.$on('$destroy', function() { + $interval.cancel(timeUpdater); + }); +}) +.controller('BrowseController', function($scope, $ws, $interval){ + $scope.jobs = []; + $ws.statusListener({ + status: function(data) { + $scope.jobs = data.jobs; + $scope.$apply(); + }, + }); +}) +.controller('JobController', function($scope, $routeParams, $ws) { + $scope.name = $routeParams.name; + $scope.jobsQueued = []; + $scope.jobsRunning = []; + $scope.jobsRecent = []; + $ws.statusListener({ + status: function(data) { + $scope.jobsQueued = data.queued.filter(function(e){return e.name == $routeParams.name;}); + $scope.jobsRunning = data.running.filter(function(e){return e.name == $routeParams.name;}); + $scope.jobsRecent = data.recent.filter(function(e){return e.name == $routeParams.name;}); + $scope.$apply(); + }, + job_queued: function(data) { + if(data.name == $routeParams.name) { + $scope.jobsQueued.splice(0,0,data); + $scope.$apply(); + } + }, + job_started: function(data) { + if(data.name == $routeParams.name) { + $scope.jobsQueued.splice($scope.jobsQueued.length - 1,1); + $scope.jobsRunning.splice(0,0,data); + $scope.$apply(); + } + }, + job_completed: function(data) { + for(var i = 0; i < $scope.jobsRunning.length; ++i) { + var job = $scope.jobsRunning[i]; + if(job.name == data.name && job.number == data.number) { + $scope.jobsRunning.splice(i,1); + $scope.jobsRecent.splice(0,0,data); + $scope.$apply(); + break; + } + } + } + }); + $scope.runIcon = Laminar.runIcon; +}) +.controller('RunController', function($scope, $routeParams, $ws) { + $scope.name = $routeParams.name; + $scope.num = $routeParams.num; + $ws.statusListener({ + status: function(data) { + $scope.job = Laminar.jobFormatter(data); + $scope.$apply(); + }, + job_completed: function(data) { + $scope.job = Laminar.jobFormatter(data); + $scope.$apply(); + } + }); + $scope.runIcon = Laminar.runIcon; +}) +.controller('LogController', function($scope, $routeParams, $ws) { + $scope.name = $routeParams.name; + $scope.num = $routeParams.num; + $scope._log = "" + $ws.logListener(function(data) { + $scope._log += ansi_up.ansi_to_html(data); + $scope.$apply(); + window.scrollTo(0, document.body.scrollHeight); + }); + $scope.log = function() { + // TODO sanitize + return ansi_up.ansi_to_html($scope._log); + } + +}) +.run(function() {}); diff --git a/src/resources/tpl/browse.html b/src/resources/tpl/browse.html new file mode 100644 index 0000000..381436a --- /dev/null +++ b/src/resources/tpl/browse.html @@ -0,0 +1,16 @@ +
+
+
+

Browse jobs

+
+ + +
+ + + + +
{{job.name}}
+
+
+
\ No newline at end of file diff --git a/src/resources/tpl/home.html b/src/resources/tpl/home.html new file mode 100644 index 0000000..644db97 --- /dev/null +++ b/src/resources/tpl/home.html @@ -0,0 +1,58 @@ +
+
+
+

Recent Builds

+ + + + + + + + + + +
{{job.name}} queued
{{job.name}} #{{job.number}}
+
+
+
{{job.name}} #{{job.number}}
Took {{job.duration}} at {{job.when}}
+
+
+

Dashboard

+
+
+
+
Builds per day
+
+ +
+
+
+
+
+
Builds per job in the last 24 hours
+
+ +
+
+
+
+
+
Current executor utilization
+
+ +
+
+
+
+
+
what to put here?
+
+
+
+
+
+
+
+
+ diff --git a/src/resources/tpl/job.html b/src/resources/tpl/job.html new file mode 100644 index 0000000..b129848 --- /dev/null +++ b/src/resources/tpl/job.html @@ -0,0 +1,22 @@ +
+
+
+

{{name}}

+ + + + + + + + + + +
queued
#{{job.number}} progressbar?
#{{job.number}}
+
+
+ +
+
+
+ diff --git a/src/resources/tpl/log.html b/src/resources/tpl/log.html new file mode 100644 index 0000000..0a0b67c --- /dev/null +++ b/src/resources/tpl/log.html @@ -0,0 +1,8 @@ +
+
+
+

Log output for {{name}} #{{num}}

+

+
+
+
diff --git a/src/resources/tpl/run.html b/src/resources/tpl/run.html new file mode 100644 index 0000000..f876f04 --- /dev/null +++ b/src/resources/tpl/run.html @@ -0,0 +1,13 @@ +
+
+
+
+

{{name}} #{{num}}

+
< Job
Log output
+
 
+
Reason
{{job.reason}}
+
Started
{{job.when}}
+
+
+
+
diff --git a/src/run.cpp b/src/run.cpp new file mode 100644 index 0000000..ec426d4 --- /dev/null +++ b/src/run.cpp @@ -0,0 +1,116 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminaris distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "run.h" + +#include +#include + +#include + +#include +namespace fs = boost::filesystem; + + +std::string to_string(const RunState& rs) { + switch(rs) { + case RunState::PENDING: return "pending"; + case RunState::ABORTED: return "aborted"; + case RunState::FAILED: return "failed"; + case RunState::SUCCESS: return "success"; + case RunState::UNKNOWN: + default: + return "unknown"; + } +} + + +Run::Run() { + result = RunState::SUCCESS; + +} + +Run::~Run() { + KJ_DBG("Run destroyed"); + //delete log; +} +std::string Run::reason() const { + if(!parentName.empty()) { + return std::string("Triggered by upstream ") + parentName + " #" + std::to_string(parentBuild); + } + return reasonMsg; +} + +bool Run::step() { + if(!currentScript.empty() && procStatus != 0) + result = RunState::FAILED; + + if(scripts.size()) { + currentScript = scripts.front(); + scripts.pop(); + + int pfd[2]; + pipe(pfd); + pid_t pid = fork(); + if(pid == 0) { + close(pfd[0]); + dup2(pfd[1], 1); + dup2(pfd[1], 2); + close(pfd[1]); + std::string buildNum = std::to_string(build); + + std::string PATH = (fs::path(laminarHome)/"cfg"/"scripts").string() + ":"; + if(const char* p = getenv("PATH")) { + PATH.append(p); + } + + chdir(wd.c_str()); + + setenv("PATH", PATH.c_str(), true); + setenv("lBuildNum", buildNum.c_str(), true); + setenv("lJobName", name.c_str(), true); + setenv("lWorkspace", (fs::path(laminarHome)/"run"/name/"workspace").string().c_str(), true); + for(auto& pair : params) { + setenv(pair.first.c_str(), pair.second.c_str(), false); + } + printf("[laminar] Executing %s\n", currentScript.c_str()); + execl(currentScript.c_str(), currentScript.c_str(), NULL); + KJ_LOG(FATAL, "execl returned", strerror(errno)); + _exit(1); + } + + KJ_LOG(INFO, "Forked", currentScript, pid); + close(pfd[1]); + fd = pfd[0]; + this->pid = pid; + + return false; + } else { + return true; + } +} +void Run::addScript(std::string script) { + scripts.push(script); +} +void Run::reaped(int status) { + procStatus = status; +} + +void Run::complete() { + notifyCompletion(this); +} diff --git a/src/run.h b/src/run.h new file mode 100644 index 0000000..7ba48db --- /dev/null +++ b/src/run.h @@ -0,0 +1,134 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminaris distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef _LAMINAR_RUN_H_ +#define _LAMINAR_RUN_H_ + +#include +#include +#include +#include +#include + +enum class RunState { + UNKNOWN, + PENDING, + ABORTED, + FAILED, + SUCCESS +}; + +std::string to_string(const RunState& rs); + +class Node; + +// Represents an execution of a job. Not much more than POD +class Run { +public: + Run(); + ~Run(); + + // copying this class would be asking for trouble... + Run(const Run&) = delete; + Run& operator=(const Run&) = delete; + + // executes the next script (if any), returning true if there is nothing + // more to be done - in this case the caller should call complete() + bool step(); + + // call this when all scripts are done to get the notifyCompletion callback + void complete(); + + // adds a script to the queue of scripts to be executed by this run + void addScript(std::string script); + + // called when a process owned by this run has been reaped. The status + // may be used to set the run's job status + void reaped(int status); + + std::string reason() const; + + std::function notifyCompletion; + Node* node; + RunState result; + std::string laminarHome; + std::string name; + std::string wd; + std::string parentName; + int parentBuild = 0; + std::string reasonMsg; + int build = 0; + std::string log; + pid_t pid; + int fd; + int procStatus; + std::unordered_map params; + + time_t queuedAt; + time_t startedAt; +private: + std::queue scripts; + std::string currentScript; +}; + + +// All this below is a somewhat overengineered method of keeping track of +// currently executing builds (Run objects). This would probably scale +// very well, but it's completely gratuitous since we are not likely to +// be executing thousands of builds at the same time +#include +#include +#include +#include +#include +#include +#include + +namespace bmi = boost::multi_index; + +struct _same { + typedef const Run* result_type; + const Run* operator()(const std::shared_ptr& run) const { + return run.get(); + } +}; + +struct RunSet: public boost::multi_index_container< + std::shared_ptr, + // A single Run can be fetched by... + bmi::indexed_by< + // their current running pid + bmi::hashed_unique>, + bmi::hashed_unique, + // a combination of their job name and build number + bmi::member, + bmi::member + >>, + // or a pointer to a Run object. + bmi::hashed_unique<_same>, + // A group of Runs can be fetched by the time they started + bmi::ordered_non_unique>, + // or by their job name + bmi::ordered_non_unique> + > +> { + // TODO: getters for each index +}; + +#endif // _LAMINAR_RUN_H_ diff --git a/src/server.cpp b/src/server.cpp new file mode 100644 index 0000000..7fe5f76 --- /dev/null +++ b/src/server.cpp @@ -0,0 +1,414 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminar is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#include "server.h" +#include "interface.h" +#include "laminar.capnp.h" +#include "resources.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +// Configuration struct for the websocketpp template library. +struct wsconfig : public websocketpp::config::core { +// static const websocketpp::log::level elog_level = +// websocketpp::log::elevel::info; + +// static const websocketpp::log::level alog_level = +// websocketpp::log::alevel::access_core | +// websocketpp::log::alevel::message_payload ; + + static const websocketpp::log::level elog_level = + websocketpp::log::elevel::none; + + static const websocketpp::log::level alog_level = + websocketpp::log::alevel::none; + + typedef struct { LaminarClient* lc; } connection_base; +}; +typedef websocketpp::server websocket; + +namespace { + +// Used for returning run state to RPC clients +LaminarCi::JobResult fromRunState(RunState state) { + switch(state) { + case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS; + default: + KJ_DBG("TODO log state", to_string(state)); + return LaminarCi::JobResult::UNKNOWN; + } +} + +} + +// This is the implementation of the Laminar Cap'n Proto RPC interface. +// As such, it implements the pure virtual interface generated from +// laminar.capnp with calls to the LaminarInterface +class RpcImpl : public LaminarCi::Server { +public: + RpcImpl(LaminarInterface& l) : + LaminarCi::Server(), + laminar(l) + { + } + + // Start a job, without waiting for it to finish + kj::Promise trigger(TriggerContext context) override { + std::string jobName = context.getParams().getJobName(); + KJ_LOG(INFO, "RPC trigger", jobName); + ParamMap params; + for(auto p : context.getParams().getParams()) { + params[p.getName().cStr()] = p.getValue().cStr(); + } + LaminarCi::MethodResult result = laminar.queueJob(jobName, params) + ? LaminarCi::MethodResult::SUCCESS + : LaminarCi::MethodResult::FAILED; + context.getResults().setResult(result); + return kj::READY_NOW; + } + + // Start a job and wait for the result + kj::Promise start(StartContext context) override { + std::string jobName = context.getParams().getJobName(); + KJ_LOG(INFO, "RPC start", jobName); + ParamMap params; + for(auto p : context.getParams().getParams()) { + params[p.getName().cStr()] = p.getValue().cStr(); + } + std::shared_ptr run = laminar.queueJob(jobName, params); + if(run.get()) { + return laminar.waitForRun(run.get()).then([context](RunState state) mutable { + context.getResults().setResult(fromRunState(state)); + }); + } else { + context.getResults().setResult(LaminarCi::JobResult::UNKNOWN); + return kj::READY_NOW; + } + } + + // Wait for an already-running job to complete, returning the result + kj::Promise pend(PendContext context) override { + std::string jobName = context.getParams().getJobName(); + int buildNum = context.getParams().getBuildNum(); + KJ_LOG(INFO, "RPC pend", jobName, buildNum); + + kj::Promise promise = laminar.waitForRun(jobName, buildNum); + + return promise.then([context](RunState state) mutable { + context.getResults().setResult(fromRunState(state)); + }); + } + + // Set a parameter on a running build + kj::Promise set(SetContext context) override { + std::string jobName = context.getParams().getJobName(); + int buildNum = context.getParams().getBuildNum(); + KJ_LOG(INFO, "RPC set", jobName, buildNum); + + LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum, + context.getParams().getParam().getName(), context.getParams().getParam().getValue()) + ? LaminarCi::MethodResult::SUCCESS + : LaminarCi::MethodResult::FAILED; + context.getResults().setResult(result); + return kj::READY_NOW; + } + +private: + LaminarInterface& laminar; + kj::LowLevelAsyncIoProvider* asyncio; +}; + + +// This is the implementation of the HTTP/Websocket interface. It exposes +// websocket connections as LaminarClients and registers them with the +// LaminarInterface so that status messages will be delivered to the client. +// On opening a websocket connection, it delivers a status snapshot message +// (see LaminarInterface::sendStatus) +class Server::HttpImpl { +public: + HttpImpl(LaminarInterface& l) : + laminar(l) + { + // debug logging + // wss.set_access_channels(websocketpp::log::alevel::all); + // wss.set_error_channels(websocketpp::log::elevel::all); + + // TODO: This could be used in the future to trigger actions on the + // server in response to a web client request. Currently not supported. + // wss.set_message_handler([](std::weak_ptr s, websocket::message_ptr msg){ + // msg->get_payload(); + // }); + + // Handle plain HTTP requests by delivering the binary resource + wss.set_http_handler([this](websocketpp::connection_hdl hdl){ + websocket::connection_ptr c = wss.get_con_from_hdl(hdl); + const char* start, *end; + if(resources.handleRequest(c->get_resource(), &start, &end)) { + c->set_status(websocketpp::http::status_code::ok); + c->append_header("Content-Encoding", "gzip"); + c->set_body(std::string(start, end)); + } else { + // 404 + c->set_status(websocketpp::http::status_code::not_found); + } + }); + + // Handle new websocket connection. Parse the URL to determine + // the client's scope of interest, register the client for update + // messages, and call sendStatus. + wss.set_open_handler([this](websocketpp::connection_hdl hdl){ + websocket::connection_ptr c = wss.get_con_from_hdl(hdl); + std::string res = c->get_resource(); + if(res.substr(0, 5) == "/jobs") { + if(res.length() == 5) { + c->lc->scope.type = MonitorScope::ALL; + } else { + res = res.substr(5); + int split = res.find('/',1); + std::string job = res.substr(1,split-1); + if(!job.empty()) { + c->lc->scope.job = job; + c->lc->scope.type = MonitorScope::JOB; + } + if(split != std::string::npos) { + int split2 = res.find('/', split+1); + std::string run = res.substr(split+1, split2-split); + if(!run.empty()) { + c->lc->scope.num = atoi(run.c_str()); + c->lc->scope.type = MonitorScope::RUN; + } + if(split2 != std::string::npos && res.compare(split2, 4, "/log") == 0) { + c->lc->scope.type = MonitorScope::LOG; + } + } + } + } + laminar.registerClient(c->lc); + laminar.sendStatus(c->lc); + }); + + wss.set_close_handler([this](websocketpp::connection_hdl hdl){ + laminar.deregisterClient(wss.get_con_from_hdl(hdl)->lc); + }); + } + + // Return a new connection object linked with the context defined below. + // This is a bit untidy, it would be better to make them a single object, + // but I didn't yet figure it out + websocket::connection_ptr newConnection(LaminarClient* lc) { + websocket::connection_ptr c = wss.get_connection(); + c->lc = lc; + return c; + } + +private: + Resources resources; + LaminarInterface& laminar; + websocket wss; +}; + +// Context for an RPC connection +struct RpcConnection { + RpcConnection(kj::Own&& stream, + capnp::Capability::Client bootstrap, + capnp::ReaderOptions readerOpts) : + stream(kj::mv(stream)), + network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts), + rpcSystem(capnp::makeRpcServer(network, bootstrap)) + { + } + kj::Own stream; + capnp::TwoPartyVatNetwork network; + capnp::RpcSystem rpcSystem; +}; + +// Context for a WebsocketConnection (implements LaminarClient) +// This object is a streambuf and reimplements xsputn so that it can follow any +// write the websocketpp library makes to it with a write to the appropriate +// descriptor in the kj-async context. +struct Server::WebsocketConnection : public LaminarClient, public std::streambuf { + WebsocketConnection(kj::Own&& stream, Server::HttpImpl& http) : + stream(kj::mv(stream)), + out(this), + cn(http.newConnection(this)), + writePaf(kj::newPromiseAndFulfiller()) + { + cn->register_ostream(&out); + cn->start(); + } + + ~WebsocketConnection() noexcept(true) { + outputBuffer.clear(); + writePaf.fulfiller->fulfill(); + } + + kj::Promise pend() { + return stream->tryRead(ibuf, 1, 1024).then([this](size_t sz){ + cn->read_all(ibuf, sz); + if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) { + cn->eof(); + return kj::Promise(kj::READY_NOW); + } + return pend(); + }); + } + + kj::Promise writeTask() { + return writePaf.promise.then([this]() { + std::string payload; + // clear the outputBuffer for more context, and take a chunk + // to send now + payload.swap(outputBuffer); + writePaf = kj::newPromiseAndFulfiller(); + if(payload.empty()) { + return kj::Promise(kj::READY_NOW); + } else { + return stream->write(payload.data(), payload.length()).then([this]{ + return writeTask(); + }); + } + }); + } + + void sendMessage(std::string payload) override { + cn->send(payload, websocketpp::frame::opcode::text); + } + + std::streamsize xsputn(const char* s, std::streamsize sz) override { + outputBuffer.append(std::string(s, sz)); + writePaf.fulfiller->fulfill(); + return sz; + } + + kj::Own stream; + std::ostream out; + websocket::connection_ptr cn; + std::string outputBuffer; + kj::PromiseFulfillerPair writePaf; + // TODO: think about this size + char ibuf[1024]; +}; + +Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, + kj::StringPtr httpBindAddress) : + rpcInterface(kj::heap(li)), + httpInterface(new HttpImpl(li)), + ioContext(kj::setupAsyncIo()), + tasks(*this) +{ + + { // RPC task + auto paf = kj::newPromiseAndFulfiller(); + tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress, 0) + .then(kj::mvCapture(paf.fulfiller, + [this](kj::Own>&& portFulfiller, + kj::Own&& addr) { + auto listener = addr->listen(); + portFulfiller->fulfill(listener->getPort()); + acceptRpcClient(kj::mv(listener)); + }))); + } + + { // HTTP task + auto paf = kj::newPromiseAndFulfiller(); + tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress, 0) + .then(kj::mvCapture(paf.fulfiller, + [this](kj::Own>&& portFulfiller, + kj::Own&& addr) { + auto listener = addr->listen(); + portFulfiller->fulfill(listener->getPort()); + acceptHttpClient(kj::mv(listener)); + }))); + } +} + +Server::~Server() { + // RpcImpl is deleted through Capability::Client. + // Deal with the HTTP interface the old-fashioned way + delete httpInterface; +} + +void Server::start() { + // this eventfd is just to allow us to quit the server at some point + // in the future by adding this event to the async loop. I couldn't see + // a simpler way... + efd = eventfd(0,0); + kj::Promise quit = kj::evalLater([this](){ + static uint64_t _; + auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd); + return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent)); + }); + quit.wait(ioContext.waitScope); +} + +void Server::stop() { + eventfd_write(efd, 1); +} + +void Server::addProcess(int fd, std::function readCb, std::function cb) { + auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd); + tasks.add(handleProcessOutput(event,readCb).attach(std::move(event)).then(std::move(cb))); +} + +void Server::acceptHttpClient(kj::Own&& listener) { + auto ptr = listener.get(); + tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener), + [this](kj::Own&& listener, + kj::Own&& connection) { + acceptHttpClient(kj::mv(listener)); + auto conn = kj::heap(kj::mv(connection), *httpInterface); + return conn->pend().exclusiveJoin(conn->writeTask()).attach(std::move(conn)); + })) + ); +} + +void Server::acceptRpcClient(kj::Own&& listener) { + auto ptr = listener.get(); + tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener), + [this](kj::Own&& listener, + kj::Own&& connection) { + acceptRpcClient(kj::mv(listener)); + auto server = kj::heap(kj::mv(connection), rpcInterface, capnp::ReaderOptions()); + tasks.add(server->network.onDisconnect().attach(kj::mv(server))); + })) + ); +} + +// handles stdout/stderr from a child process by sending it to the provided +// callback function +kj::Promise Server::handleProcessOutput(kj::AsyncInputStream* stream, std::function readCb) { + // TODO think about this size + static char* buffer = new char[1024]; + return stream->tryRead(buffer, 1, 1024).then([this,stream,readCb](size_t sz) { + readCb(buffer, sz); + if(sz > 0) { + return handleProcessOutput(stream, readCb); + } + return kj::Promise(kj::READY_NOW); + }); +} diff --git a/src/server.h b/src/server.h new file mode 100644 index 0000000..1abfe56 --- /dev/null +++ b/src/server.h @@ -0,0 +1,62 @@ +/// +/// Copyright 2015 Oliver Giles +/// +/// This file is part of Laminar +/// +/// Laminar is free software: you can redistribute it and/or modify +/// it under the terms of the GNU General Public License as published by +/// the Free Software Foundation, either version 3 of the License, or +/// (at your option) any later version. +/// +/// Laminaris distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +/// GNU General Public License for more details. +/// +/// You should have received a copy of the GNU General Public License +/// along with Laminar. If not, see +/// +#ifndef _LAMINAR_SERVER_H_ +#define _LAMINAR_SERVER_H_ + +#include +#include +#include +#include + +struct LaminarInterface; + +// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces. +// It also manages the program's asynchronous event loop +class Server final : public kj::TaskSet::ErrorHandler { +public: + // Initializes the server with a LaminarInterface to handle requests from + // HTTP/Websocket or RPC clients and bind addresses for each of those + // interfaces. See the documentation for kj::AsyncIoProvider::getNetwork + // for a description of the address format + Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress); + ~Server(); + void start(); + void stop(); + void addProcess(int fd, std::function readCb, std::function cb); + +private: + void acceptHttpClient(kj::Own&& listener); + void acceptRpcClient(kj::Own&& listener); + kj::Promise handleProcessOutput(kj::AsyncInputStream* stream, std::function readCb); + + void taskFailed(kj::Exception&& exception) override { + kj::throwFatalException(kj::mv(exception)); + } + +private: + int efd; + capnp::Capability::Client rpcInterface; + struct WebsocketConnection; + struct HttpImpl; + HttpImpl* httpInterface; + kj::AsyncIoContext ioContext; + kj::TaskSet tasks; +}; + +#endif // _LAMINAR_SERVER_H_