1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

Initial commit

This commit is contained in:
Oliver Giles 2015-09-13 22:25:26 +02:00
commit a2701dcfd9
25 changed files with 2610 additions and 0 deletions

92
CMakeLists.txt Normal file
View File

@ -0,0 +1,92 @@
###
### Copyright 2015 Oliver Giles
###
### This file is part of Laminar
###
### Laminar is free software: you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation, either version 3 of the License, or
### (at your option) any later version.
###
### Laminar is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
### GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License
### along with Laminar. If not, see <http://www.gnu.org/licenses/>
###
project(laminar)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_INCLUDE_CURRENT_DIR ON)
add_definitions("-std=c++11 -Wall -Wextra -Wno-unused-parameter -Wno-sign-compare")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Werror -DDEBUG")
# This macro takes a list of files, gzips them and converts the output into
# object files so they can be linked directly into the application.
# ld generates symbols based on the string argument given to its executable,
# so it is significant from which directory it is called. BASEDIR will be
# removed from the beginning of paths to the remaining arguments
macro(generate_compressed_bins BASEDIR)
foreach(FILE ${ARGN})
set(COMPRESSED_FILE "${FILE}.z")
set(OUTPUT_FILE "${FILE}.o")
get_filename_component(DIR ${FILE} DIRECTORY)
if(DIR)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${DIR})
endif()
add_custom_command(OUTPUT ${COMPRESSED_FILE}
COMMAND gzip < ${BASEDIR}/${FILE} > ${COMPRESSED_FILE}
DEPENDS ${BASEDIR}/${FILE}
)
add_custom_command(OUTPUT ${OUTPUT_FILE}
COMMAND ld -r -b binary -o ${OUTPUT_FILE} ${COMPRESSED_FILE}
DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/${COMPRESSED_FILE}
)
list(APPEND COMPRESSED_BINS ${OUTPUT_FILE})
endforeach()
endmacro()
# Generates Cap'n Proto interface from definition file
add_custom_command(OUTPUT laminar.capnp.c++ laminar.capnp.h
COMMAND capnp compile -oc++:${CMAKE_BINARY_DIR}
--src-prefix=${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/src/laminar.capnp
DEPENDS src/laminar.capnp)
# Zip and compile statically served resources
generate_compressed_bins(${CMAKE_SOURCE_DIR}/src/resources index.html js/app.js
tpl/home.html tpl/job.html tpl/run.html tpl/log.html tpl/browse.html)
# Download 3rd-party frontend JS libs...
file(DOWNLOAD https://ajax.googleapis.com/ajax/libs/angularjs/1.3.14/angular.min.js
js/angular.min.js EXPECTED_MD5 b1137641dbb512a60e83d673f7e2d98f)
file(DOWNLOAD https://ajax.googleapis.com/ajax/libs/angularjs/1.3.14/angular-route.min.js
js/angular-route.min.js EXPECTED_MD5 28ef7d7b4349ae0dce602748185ef32a)
file(DOWNLOAD https://ajax.googleapis.com/ajax/libs/angularjs/1.3.14/angular-sanitize.min.js
js/angular-sanitize.min.js EXPECTED_MD5 0854eae86bcdf5f92b1ab2b458d8d054)
file(DOWNLOAD https://raw.githubusercontent.com/drudru/ansi_up/v1.3.0/ansi_up.js
js/ansi_up.js EXPECTED_MD5 158566dc1ff8f2804de972f7e841e2f6)
file(DOWNLOAD https://cdnjs.cloudflare.com/ajax/libs/Chart.js/1.0.2/Chart.min.js
js/Chart.min.js EXPECTED_MD5 0d3004601c1a855a3d203502549528a7)
file(DOWNLOAD https://raw.githubusercontent.com/tomsouthall/Chart.HorizontalBar.js/v1.04/Chart.HorizontalBar.js
js/Chart.HorizontalBar.js EXPECTED_MD5 95070a38e69bc56534e1b2086d985270)
file(DOWNLOAD https://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css
css/bootstrap.min.css EXPECTED_MD5 5d5357cb3704e1f43a1f5bfed2aebf42)
# ...and compile them
generate_compressed_bins(${CMAKE_BINARY_DIR} js/angular.min.js js/angular-route.min.js
js/angular-sanitize.min.js js/ansi_up.js js/Chart.min.js js/Chart.HorizontalBar.js
css/bootstrap.min.css)
# (see resources.cpp where these are fetched)
## Server
add_executable(laminard src/database.cpp src/main.cpp src/server.cpp src/laminar.cpp
src/resources.cpp src/run.cpp src/node.cpp laminar.capnp.c++ ${COMPRESSED_BINS})
# TODO: some alternative to boost::filesystem?
target_link_libraries(laminard capnp capnp-rpc kj-async kj boost_filesystem boost_system sqlite3 iniparser)
## Client
add_executable(laminarc src/client.cpp laminar.capnp.c++)
target_link_libraries(laminarc capnp capnp-rpc kj-async kj)

5
README.md Normal file
View File

@ -0,0 +1,5 @@
## laminar
Lightweight, linuxy Continuous Integration service.
Alpha. Docs coming.

165
src/client.cpp Normal file
View File

@ -0,0 +1,165 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "laminar.capnp.h"
#include <capnp/ez-rpc.h>
#include <kj/vector.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#define EFAILED 55
template<typename T>
static int setParams(int argc, char** argv, T& request) {
int n = 0;
for(int i = 0; i < argc; ++i) {
if(strchr(argv[i], '=') == NULL)
break;
n++;
}
int argsConsumed = n;
char* job = getenv("lJobName");
char* num = getenv("lBuildNum");
char* reason = getenv("LAMINAR_REASON");
if(job && num) n+=2;
else if(reason) n++;
if(n == 0) return argsConsumed;
auto params = request.initParams(n);
for(int i = 0; i < argsConsumed; ++i) {
char* name = argv[i];
char* val = strchr(name, '=');
*val++ = '\0';
params[i].setName(name);
params[i].setValue(val);
}
if(job && num) {
params[argsConsumed].setName("=parentJob");
params[argsConsumed].setValue(job);
params[argsConsumed+1].setName("=parentBuild");
params[argsConsumed+1].setValue(num);
} else if(reason) {
params[argsConsumed].setName("=reason");
params[argsConsumed].setValue(reason);
}
return argsConsumed;
}
int main(int argc, char** argv) {
// TODO: pass this through an enviroment variable set by laminard
const char* address = "unix:\0laminar";
if(argc < 2) {
fprintf(stderr, "Usage: %s <command> [parameters...]\n", argv[0]);
return EINVAL;
}
capnp::EzRpcClient client(address);
LaminarCi::Client laminar = client.getMain<LaminarCi>();
auto& waitScope = client.getWaitScope();
if(strcmp(argv[1], "trigger") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s trigger <jobName>\n", argv[0]);
return EINVAL;
}
kj::Vector<capnp::RemotePromise<LaminarCi::TriggerResults>> promises;
int jobNameIndex = 2;
// make a request for each job specified on the commandline
do {
auto req = laminar.triggerRequest();
req.setJobName(argv[jobNameIndex]);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
promises.add(req.send());
jobNameIndex += n + 1;
} while(jobNameIndex < argc);
// pend on the promises
for(auto& p : promises) {
if(p.wait(waitScope).getResult() != LaminarCi::MethodResult::SUCCESS) {
fprintf(stderr, "Failed to queue job '%s'\n", argv[2]);
return ENOENT;
}
}
} else if(strcmp(argv[1], "start") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s start <jobName>\n", argv[0]);
return EINVAL;
}
kj::Vector<capnp::RemotePromise<LaminarCi::StartResults>> promises;
int jobNameIndex = 2;
// make a request for each job specified on the commandline
do {
auto req = laminar.startRequest();
req.setJobName(argv[jobNameIndex]);
int n = setParams(argc - jobNameIndex - 1, &argv[jobNameIndex + 1], req);
promises.add(req.send());
jobNameIndex += n + 1;
} while(jobNameIndex < argc);
// pend on the promises
for(auto& p : promises) {
if(p.wait(waitScope).getResult() != LaminarCi::JobResult::SUCCESS) {
return EFAILED;
}
}
} else if(strcmp(argv[1], "set") == 0) {
if(argc < 3) {
fprintf(stderr, "Usage %s set param=value\n", argv[0]);
return EINVAL;
}
auto req = laminar.setRequest();
char* eq = strchr(argv[2], '=');
char* job = getenv("lJobName");
char* num = getenv("lBuildNum");
if(job && num && eq) {
char* name = argv[2];
*eq++ = '\0';
char* val = eq;
req.setJobName(job);
req.setBuildNum(atoi(num));
req.getParam().setName(name);
req.getParam().setValue(val);
req.send().wait(waitScope);
} else {
fprintf(stderr, "Missing lJobName and lBuildNum or param is not in the format key=value\n");
return EINVAL;
}
} else if(strcmp(argv[1], "wait") == 0) {
auto req = laminar.pendRequest();
req.setJobName(argv[2]);
req.setBuildNum(atoi(argv[3]));
auto response = req.send().wait(waitScope);
if(response.getResult() != LaminarCi::JobResult::SUCCESS)
return EFAILED;
} else {
fprintf(stderr, "Unknown comand %s\n", argv[1]);
return EINVAL;
}
return 0;
}

75
src/database.cpp Normal file
View File

@ -0,0 +1,75 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "database.h"
#include <sqlite3.h>
Database::Database(const char *path) {
sqlite3_open(path, &hdl);
}
Database::~Database() {
sqlite3_close(hdl);
}
Database::Statement::Statement(sqlite3 *db, const char *query) {
sqlite3_prepare_v2(db, query, -1, &stmt, NULL);
}
Database::Statement::~Statement() {
sqlite3_finalize(stmt);
}
bool Database::Statement::exec() {
return sqlite3_step(stmt) == SQLITE_OK;
}
void Database::Statement::bindValue(int i, int e) {
sqlite3_bind_int(stmt, i, e);
}
void Database::Statement::bindValue(int i, const char* e) {
sqlite3_bind_text(stmt, i, e, -1, NULL);
}
void Database::Statement::bindValue(int i, std::string e) {
sqlite3_bind_text(stmt, i, e.c_str(), e.length(), NULL);
}
template<> std::string Database::Statement::fetchColumn(int col) {
return (char*)sqlite3_column_text(stmt, col);
}
template<> const char* Database::Statement::fetchColumn(int col) {
return (char*)sqlite3_column_text(stmt, col);
}
template<> int Database::Statement::fetchColumn(int col) {
return sqlite3_column_int(stmt, col);
}
template<> time_t Database::Statement::fetchColumn(int col) {
return sqlite3_column_int64(stmt, col);
}
bool Database::Statement::row() {
return sqlite3_step(stmt) == SQLITE_ROW;
}

142
src/database.h Normal file
View File

@ -0,0 +1,142 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef _LAMINAR_DATABASE_H_
#define _LAMINAR_DATABASE_H_
#include <string>
#include <functional>
struct sqlite3;
struct sqlite3_stmt;
// This is a small sqlite wrapper using some clever template action
// to somewhat reduce verbosity. Usage:
// db.stmt("SELECT result WHERE name = ?")
// .bind(name)
// .fetch([](int result) {
// // function called for each retrieved row
// doSomething(result);
// });
class Database {
public:
Database(const char* path);
~Database();
private:
// Represents a database statement. Call Database::stmt() to get
// one, then call bind(), fetch() or exec() on the returned object
class Statement {
private:
// Internal template helper that defines the type
// in the variadic type array Args at offset N
template<int N, typename T, typename...Args>
struct typeindex : typeindex<N-1, Args...> {};
template<typename T, typename...Args>
struct typeindex<0, T, Args...> { typedef T type; };
public:
Statement(sqlite3* db, const char* query);
~Statement();
// Bind several parameters in a single call. They are bound
// by index in the order passed into this function
template<typename...Args>
Statement& bind(Args...args) {
return bindRecursive<Args...>(1, args...);
}
// Fetch columns. Supply a callback that will be executed for
// each row in the resultset, with arguments matching the
// expected column types
template<typename...Args>
void fetch(typename typeindex<0, std::function<void(Args...)>>::type callback) {
FetchMarshaller<Args...> fm(this, callback);
}
// execute without fetching any parameters. Intended for
// non-SELECT statements;
bool exec();
private:
// Internal template helper used to unpack arguments into
// the fetch callback.
template<int...N> struct rng { };
// Internal template helper to generate a rng<> object:
// genrng<4>::type is rng<0,1,2,3>
template<int J, int...N>
struct genrng : genrng<J-1, J-1, N...> {};
template<int...N>
struct genrng<0, N...> { typedef rng<N...> type; };
template<typename...Args>
struct FetchMarshaller {
FetchMarshaller(Statement* st, std::function<void(Args...)> cb){
marshal(st, cb, typename genrng<sizeof...(Args)>::type());
}
template<int...N>
void marshal(Statement* st, std::function<void(Args...)> cb, rng<N...>) {
while(st->row()) {
cb(st->fetchColumn<typename typeindex<N, Args...>::type>(N)...);
}
}
};
template<typename...Args>
friend class FetchMarshaller;
bool row();
template<typename T, typename...Args>
Statement& bindRecursive(int i, T v, Args...args) {
bindValue(i, v); // specialization must exist for T
return bindRecursive(i + 1, args...);
}
// template terminating condition
Statement& bindRecursive(int) {
return *this;
}
// Bind value specializations
void bindValue(int i, int e);
void bindValue(int i, const char* e);
void bindValue(int i, std::string e);
// Declaration for fetch column interface,
// intentionally missing definition
template<typename T>
T fetchColumn(int col);
sqlite3_stmt* stmt;
};
public:
Statement stmt(const char* q) {
return Statement(hdl, q);
}
// shorthand
bool exec(const char* q) { return Statement(hdl, q).exec(); }
private:
sqlite3* hdl;
};
// specialization declarations, defined in source file
template<> std::string Database::Statement::fetchColumn(int col);
template<> const char* Database::Statement::fetchColumn(int col);
template<> int Database::Statement::fetchColumn(int col);
template<> time_t Database::Statement::fetchColumn(int col);
#endif // _LAMINAR_DATABASE_H_

113
src/interface.h Normal file
View File

@ -0,0 +1,113 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef INTERFACE_H
#define INTERFACE_H
#include "run.h"
#include <kj/async.h>
#include <string>
#include <memory>
#include <unordered_map>
typedef std::unordered_map<std::string, std::string> ParamMap;
// Simple struct to define which information a frontend client is interested
// in, both in initial request phase and real-time updates. It corresponds
// loosely to frontend URLs
struct MonitorScope {
enum Type {
HOME, // home page: recent builds and statistics
ALL, // browse jobs
JOB, // a specific job page
RUN, // a specific run page
LOG // a run's log page
};
MonitorScope(Type type = HOME, std::string job = std::string(), int num = 0) :
type(type),
job(job),
num(num)
{}
// whether this scope wants status information about the given job or run
bool wantsStatus(std::string ajob, int anum = 0) const {
if(type == HOME || type == ALL) return true;
if(type == JOB) return ajob == job;
if(type == RUN) return ajob == job && anum == num;
return false;
}
bool wantsLog(std::string ajob, int anum) const {
return type == LOG && ajob == job && anum == num;
}
Type type;
std::string job;
int num = 0;
};
// Represents a (websocket) client that wants to be notified about events
// matching the supplied scope. Pass instances of this to LaminarInterface
// registerClient and deregisterClient
struct LaminarClient {
virtual void sendMessage(std::string payload) = 0;
MonitorScope scope;
};
// The interface connecting the network layer to the application business
// logic. These methods fulfil the requirements of both the HTTP/Websocket
// and RPC interfaces.
struct LaminarInterface {
// Queues a job, returns immediately. Return value will be nullptr if
// the supplied name is not a known job.
virtual std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap()) = 0;
// Returns a promise that will wait for a run matching the given name
// and build number to complete. The promise will resolve to the result
// of the run. If no such run exists, the status will be RunState::UNKNOWN
virtual kj::Promise<RunState> waitForRun(std::string name, int buildNum) = 0;
// Specialization of above for an existing Run object (for example returned
// from queueJob). Returned promise will never resolve to RunState::UNKNOWN
virtual kj::Promise<RunState> waitForRun(const Run*) = 0;
// Register a client (but don't give up ownership). The client will be
// notified with a JSON message of any events matching its scope
// (see LaminarClient and MonitorScope above)
virtual void registerClient(LaminarClient* client) = 0;
// Call this before destroying a client so that Laminar doesn't try
// to call LaminarClient::sendMessage on invalid data
virtual void deregisterClient(LaminarClient* client) = 0;
// Synchronously send a snapshot of the current status to the given
// client (as governed by the client's MonitorScope). This is called on
// initial websocket connect.
virtual void sendStatus(LaminarClient* client) = 0;
// Implements the laminar client interface allowing the setting of
// arbitrary parameters on a run (usually itself) to be available in
// the environment of subsequent scripts.
virtual bool setParam(std::string job, int buildNum, std::string param, std::string value) = 0;
};
#endif // INTERFACE_H

28
src/laminar.capnp Normal file
View File

@ -0,0 +1,28 @@
@0xc2cbd510f16dab57;
interface LaminarCi {
trigger @0 (jobName :Text, params :List(JobParam)) -> (result :MethodResult);
start @1 (jobName :Text, params :List(JobParam)) -> (result :JobResult);
pend @2 (jobName :Text, buildNum :UInt32) -> (result :JobResult);
set @3 (jobName :Text, buildNum :UInt32, param :JobParam) -> (result :MethodResult);
struct JobParam {
name @0 :Text;
value @1 :Text;
}
enum MethodResult {
failed @0;
success @1;
}
enum JobResult {
unknown @0;
failed @1;
aborted @2;
success @3;
}
}

535
src/laminar.cpp Normal file
View File

@ -0,0 +1,535 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "laminar.h"
#include "server.h"
#include <sys/wait.h>
#include <iniparser.h>
#include <kj/debug.h>
#include <boost/filesystem.hpp>
namespace fs = boost::filesystem;
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
namespace {
// rapidjson::Writer with a StringBuffer is used a lot in Laminar for
// preparing JSON messages to send to Websocket clients. A small wrapper
// class here reduces verbosity later for this common use case.
class Json : public rapidjson::Writer<rapidjson::StringBuffer> {
public:
Json() : rapidjson::Writer<rapidjson::StringBuffer>(buf) { StartObject(); }
template<typename T>
Json& set(const char* key, T value);
Json& startObject(const char* key) { String(key); StartObject(); return *this; }
Json& startArray(const char* key) { String(key); StartArray(); return *this; }
const char* str() { EndObject(); return buf.GetString(); }
private:
rapidjson::StringBuffer buf;
};
template<> Json& Json::set(const char* key, const char* value) { String(key); String(value); return *this; }
template<> Json& Json::set(const char* key, std::string value) { String(key); String(value.c_str()); return *this; }
template<> Json& Json::set(const char* key, int value) { String(key); Int(value); return *this; }
template<> Json& Json::set(const char* key, time_t value) { String(key); Int64(value); return *this; }
}
namespace {
// Default values when none were supplied in $LAMINAR_CONF_FILE (/etc/laminar.conf)
constexpr const char* INTADDR_RPC_DEFAULT = "unix:\0laminar";
constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080";
constexpr const char* BASE_CFG_DIR = "/home/og/dev/laminar/cfg";
}
typedef std::string str;
Laminar::Laminar(const char* configFile) {
// read params from config file
conf = iniparser_load(configFile);
KJ_REQUIRE(conf != nullptr, "Could not parse", configFile);
homeDir = iniparser_getstring(conf, ":LAMINAR_HOME", "/var/lib/laminar");
db = new Database((fs::path(homeDir)/"laminar.sqlite").string().c_str());
// Prepare database for first use
// TODO: error handling
db->exec("CREATE TABLE IF NOT EXISTS builds("
"name TEXT, number INT UNSIGNED, node TEXT, queuedAt INT, startedAt INT, completedAt INT, result INT, output TEXT, parentJob TEXT, parentBuild INT, reason TEXT)");
// retrieve the last build numbers
db->stmt("SELECT name, MAX(number) FROM builds GROUP BY name")
.fetch<str,int>([this](str name, int build){
buildNums[name] = build;
});
// This is only a separate function because I imagined that it would
// be nice to reload some configuration during runtime without restarting
// the server completely. Currently not called from anywhere else
// TODO: implement that
loadConfiguration();
}
void Laminar::registerClient(LaminarClient* client) {
clients.insert(client);
}
void Laminar::deregisterClient(LaminarClient* client) {
clients.erase(client);
}
bool Laminar::setParam(std::string job, int buildNum, std::string param, std::string value) {
auto it = activeJobs.get<1>().find(std::make_tuple(job, buildNum));
if(it == activeJobs.get<1>().end())
return false;
std::shared_ptr<Run> run = *it;
run->params[param] = value;
return true;
}
void Laminar::sendStatus(LaminarClient* client) {
if(client->scope.type == MonitorScope::LOG) {
// If the requested job is currently in progress
auto it = activeJobs.get<1>().find(std::make_tuple(client->scope.job, client->scope.num));
if(it != activeJobs.get<1>().end()) {
client->sendMessage((*it)->log.c_str());
} else { // it must be finished, fetch it from the database
db->stmt("SELECT output FROM builds WHERE name = ? AND number = ?")
.bind(client->scope.job, client->scope.num)
.fetch<const char*>([=](const char* log) {
client->sendMessage(log);
});
}
} else if(client->scope.type == MonitorScope::RUN) {
Json j;
j.set("type", "status");
j.startObject("data");
db->stmt("SELECT startedAt, result, reason FROM builds WHERE name = ? AND number = ?")
.bind(client->scope.job, client->scope.num)
.fetch<time_t, int, std::string>([&](time_t started, int result, std::string reason) {
j.set("started", started);
j.set("result", to_string(RunState(result)));
j.set("reason", reason);
});
j.EndObject();
client->sendMessage(j.str());
} else if(client->scope.type == MonitorScope::JOB) {
Json j;
j.set("type", "status");
j.startObject("data");
j.startArray("recent");
db->stmt("SELECT * FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 5")
.bind(client->scope.job)
.fetch<str,int,str,time_t,time_t,time_t,int>([&](str name,int build,str node,time_t,time_t started,time_t completed,int result){
j.StartObject();
j.set("name", name)
.set("number", build)
.set("node", node)
.set("duration", completed - started)
.set("started", started)
.set("result", to_string(RunState(result)))
.EndObject();
});
j.EndArray();
j.startArray("running");
auto p = activeJobs.get<4>().equal_range(client->scope.job);
for(auto it = p.first; it != p.second; ++it) {
const std::shared_ptr<Run> run = *it;
j.StartObject();
j.set("name", run->name);
j.set("number", run->build);
j.set("node", run->node->name);
j.set("started", run->startedAt);
j.EndObject();
}
j.EndArray();
j.startArray("queued");
for(const std::shared_ptr<Run> run : queuedJobs) {
if (run->name == client->scope.job) {
j.StartObject();
j.set("name", run->name);
j.EndObject();
}
}
j.EndArray();
j.EndObject();
client->sendMessage(j.str());
} else if(client->scope.type == MonitorScope::ALL) {
Json j;
j.set("type", "status");
j.startObject("data");
j.startArray("jobs");
db->stmt("SELECT name FROM builds GROUP BY name")
.fetch<str>([&](str name){
j.StartObject();
j.set("name", name)
.EndObject();
});
j.EndArray();
j.EndObject();
client->sendMessage(j.str());
} else { // Home page
Json j;
j.set("type", "status");
j.startObject("data");
j.startArray("recent");
db->stmt("SELECT * FROM builds ORDER BY completedAt DESC LIMIT 5")
.fetch<str,int,str,time_t,time_t,time_t,int>([&](str name,int build,str node,time_t,time_t started,time_t completed,int result){
j.StartObject();
j.set("name", name)
.set("number", build)
.set("node", node)
.set("duration", completed - started)
.set("started", started)
.set("result", to_string(RunState(result)))
.EndObject();
});
j.EndArray();
j.startArray("running");
for(const std::shared_ptr<Run> run : activeJobs.get<3>()) {
j.StartObject();
j.set("name", run->name);
j.set("number", run->build);
j.set("node", run->node->name);
j.set("started", run->startedAt);
j.EndObject();
}
j.EndArray();
j.startArray("queued");
for(const std::shared_ptr<Run> run : queuedJobs) {
j.StartObject();
j.set("name", run->name);
j.EndObject();
}
j.EndArray();
int execTotal = 0;
int execBusy = 0;
for(const auto& it : nodes) {
const Node& node = it.second;
execTotal += node.numExecutors;
execBusy += node.busyExecutors;
}
j.set("executorsTotal", execTotal);
j.set("executorsBusy", execBusy);
j.startArray("buildsPerDay");
for(int i = 6; i >= 0; --i) {
j.StartObject();
db->stmt("SELECT result, COUNT(*) FROM builds WHERE completedAt > ? AND completedAt < ? GROUP by result")
.bind(86400*(time(0)/86400 - i), 86400*(time(0)/86400 - (i-1)))
.fetch<int,int>([&](int result, int num){
j.set(to_string(RunState(result)).c_str(), num);
});
j.EndObject();
}
j.EndArray();
j.startObject("buildsPerJob");
db->stmt("SELECT name, COUNT(*) FROM builds WHERE completedAt > ? GROUP BY name")
.bind(time(0) - 86400)
.fetch<str, int>([&](str job, int count){
j.set(job.c_str(), count);
});
j.EndObject();
j.EndObject();
client->sendMessage(j.str());
}
}
Laminar::~Laminar() {
iniparser_freedict(conf);
delete db;
delete srv;
}
void Laminar::run() {
const char* listen_rpc = iniparser_getstring(conf, ":LAMINAR_BIND_RPC", INTADDR_RPC_DEFAULT);
const char* listen_http = iniparser_getstring(conf, ":LAMINAR_BIND_HTTP", INTADDR_HTTP_DEFAULT);
srv = new Server(*this, listen_rpc, listen_http);
srv->start();
}
void Laminar::stop() {
clients.clear();
srv->stop();
}
bool Laminar::loadConfiguration() {
NodeMap nm;
fs::directory_iterator dit(fs::path(homeDir)/"cfg"/"nodes");
for(fs::directory_entry& it : dit) {
if(!fs::is_directory(it.status()))
continue;
fs::directory_entry config(it.path()/"config");
if(!fs::is_regular_file(config.status()))
continue;
dictionary* ini = iniparser_load(config.path().string().c_str());
if(!ini) {
KJ_LOG(ERROR, "Could not parse node config", config.path().string());
continue;
}
int executors = iniparser_getint(ini, ":EXECUTORS", 6);
Node node;
node.name = it.path().filename().string();
node.numExecutors = executors;
nm.emplace(node.name, std::move(node));
iniparser_freedict(ini);
}
if(nm.empty()) {
// add a default node
Node node;
node.name = "default";
node.numExecutors = 6;
nm.emplace("default", std::move(node));
}
nodes = nm;
return true;
}
std::shared_ptr<Run> Laminar::queueJob(std::string name, ParamMap params) {
if(!fs::exists(fs::path(homeDir)/"cfg"/"jobs"/name))
return nullptr;
// attempt to create a workspace for this job if it doesn't exist
if(!fs::exists(fs::path(homeDir)/"run"/name/"workspace")) {
if(!fs::create_directories(fs::path(homeDir)/"run"/name/"workspace")) {
KJ_LOG(ERROR, "Could not create job workspace", name);
return nullptr;
}
}
std::shared_ptr<Run> run = std::make_shared<Run>();
run->name = name;
run->queuedAt = time(0);
for(auto it = params.begin(); it != params.end();) {
if(it->first[0] == '=') {
if(it->first == "=parentJob") {
run->parentName = it->second;
} else if(it->first == "=parentBuild") {
run->parentBuild = atoi(it->second.c_str());
} else if(it->first == "=reason") {
run->reasonMsg = it->second;
} else {
KJ_LOG(ERROR, "Unknown internal job parameter", it->first);
}
it = params.erase(it);
} else
++it;
}
run->params = params;
queuedJobs.push_back(run);
// notify clients
Json j;
j.set("type", "job_queued")
.startObject("data")
.set("name", name)
.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(name))
c->sendMessage(msg);
}
assignNewJobs();
return run;
}
kj::Promise<RunState> Laminar::waitForRun(std::string name, int buildNum) {
auto it = activeJobs.get<1>().find(std::make_tuple(name, buildNum));
if(it == activeJobs.get<1>().end())
return RunState::UNKNOWN;
return waitForRun(it->get());
}
kj::Promise<RunState> Laminar::waitForRun(const Run* run) {
waiters[run].emplace_back(Waiter{});
return waiters[run].back().takePromise();
}
bool Laminar::stepRun(std::shared_ptr<Run> run) {
bool complete = run->step();
if(!complete) {
srv->addProcess(run->fd, [=](char* b,size_t n){
std::string s(b,n);
run->log += s;
for(LaminarClient* c : clients) {
if(c->scope.wantsLog(run->name, run->build))
c->sendMessage(s);
}
}, [this,run](){ reapAdvance();});
}
return complete;
}
void Laminar::reapAdvance() {
int ret = 0;
// TODO: If we pass WNOHANG here for better asynchronicity, how do
// we re-schedule a poll to wait for finished child processes?
pid_t pid = waitpid(-1, &ret, 0);
// TODO: handle signalled child processes
if(pid > 0) {
KJ_LOG(INFO, "Reaping", pid);
auto it = activeJobs.get<0>().find(pid);
std::shared_ptr<Run> run = *it;
bool completed = true;
activeJobs.get<0>().modify(it, [&](std::shared_ptr<Run> run){
close(run->fd);
run->reaped(ret);
completed = stepRun(run);
});
if(completed)
run->complete();
}
assignNewJobs();
}
void Laminar::assignNewJobs() {
auto it = queuedJobs.begin();
while(it != queuedJobs.end()) {
bool assigned = false;
for(auto& sn : nodes) {
Node& node = sn.second;
std::shared_ptr<Run> run = *it;
if(node.queue(*run)) {
node.busyExecutors++;
run->node = &node;
run->startedAt = time(0);
run->build = ++buildNums[run->name];
run->laminarHome = homeDir;
fs::path wd = fs::path(homeDir)/"run"/run->name/std::to_string(run->build);
if(!fs::is_directory(wd) && !fs::create_directory(wd)) {
KJ_LOG(ERROR, "Could not create working directory", wd.string());
break;
}
run->wd = wd.string();
// add scripts
fs::path cfgDir = fs::path(homeDir)/"cfg";
// global before-run script
if(fs::exists(cfgDir/"before"))
run->addScript((cfgDir/"before").string());
// per-node before-run script
if(fs::exists(cfgDir/"nodes"/node.name/"before"))
run->addScript((cfgDir/"before").string());
// job before-run script
if(fs::exists(cfgDir/"jobs"/run->name/"before"))
run->addScript((cfgDir/"jobs"/run->name/"before").string());
// main run script. must exist.
run->addScript((cfgDir/"jobs"/run->name/"run").string());
// job after-run script
if(fs::exists(cfgDir/"jobs"/run->name/"after"))
run->addScript((cfgDir/"jobs"/run->name/"after").string());
// per-node after-run script
if(fs::exists(cfgDir/"nodes"/node.name/"after"))
run->addScript((cfgDir/"nodes"/node.name/"after").string());
// global after-run script
if(fs::exists(cfgDir/"after"))
run->addScript((cfgDir/"after").string());
KJ_LOG(INFO, "Queued job to node", run->name, run->build, node.name);
// notify clients
Json j;
j.set("type", "job_started")
.startObject("data")
.set("queueIndex", std::distance(it,queuedJobs.begin()))
.set("name", run->name)
.set("started", run->startedAt)
.set("number", run->build)
.set("reason", run->reason());
db->stmt("SELECT completedAt - startedAt FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1")
.bind(run->name)
.fetch<int>([&](int etc){
j.set("etc", time(0) + etc);
});
j.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(run->name, run->build))
c->sendMessage(msg);
}
// setup run completion handler
run->notifyCompletion = [&](const Run* r) {
node.busyExecutors--;
KJ_LOG(INFO, "Run completed", r->name, to_string(r->result));
time_t completedAt = time(0);
db->stmt("INSERT INTO builds VALUES(?,?,?,?,?,?,?,?,?,?,?)")
.bind(r->name, r->build, node.name, r->queuedAt, r->startedAt, completedAt, int(r->result),
r->log, r->parentName, r->parentBuild, r->reason())
.exec();
// notify clients
Json j;
j.set("type", "job_completed")
.startObject("data")
.set("name", r->name)
.set("number", r->build)
.set("duration", completedAt - r->startedAt)
.set("started", r->startedAt)
.set("result", to_string(r->result))
.EndObject();
const char* msg = j.str();
for(LaminarClient* c : clients) {
if(c->scope.wantsStatus(r->name, r->build))
c->sendMessage(msg);
}
// wake the waiters
for(Waiter& waiter : waiters[r])
waiter.release(r->result);
waiters.erase(r);
// will delete the job
activeJobs.get<2>().erase(r);
};
// trigger the first step of the run
if(stepRun(run)) {
// should never happen
KJ_LOG(INFO, "No steps for run");
run->complete();
}
assigned = true;
break;
}
}
if(assigned) {
activeJobs.insert(*it);
it = queuedJobs.erase(it);
} else
++it;
}
}

90
src/laminar.h Normal file
View File

@ -0,0 +1,90 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef _LAMINAR_LAMINAR_H_
#define _LAMINAR_LAMINAR_H_
#include "interface.h"
#include "run.h"
#include "node.h"
#include "database.h"
#include <unordered_map>
// Node name to node object map
typedef std::unordered_map<std::string,Node> NodeMap;
struct Server;
struct _dictionary_; // from iniparser
// The main class implementing the application's business logic.
// It owns a Server to manage the HTTP/websocket and Cap'n Proto RPC
// interfaces and communicates via the LaminarInterface methods and
// the LaminarClient objects (see interface.h)
class Laminar : public LaminarInterface {
public:
Laminar(const char* configFile);
~Laminar();
// Runs the application forever
void run();
// Call this in a signal handler to make run() return
void stop();
// Implementations of LaminarInterface
std::shared_ptr<Run> queueJob(std::string name, ParamMap params = ParamMap()) override;
kj::Promise<RunState> waitForRun(std::string name, int buildNum) override;
kj::Promise<RunState> waitForRun(const Run* run) override;
void registerClient(LaminarClient* client) override;
void deregisterClient(LaminarClient* client) override;
void sendStatus(LaminarClient* client) override;
bool setParam(std::string job, int buildNum, std::string param, std::string value) override;
private:
bool loadConfiguration();
void reapAdvance();
void assignNewJobs();
bool stepRun(std::shared_ptr<Run> run);
std::list<std::shared_ptr<Run>> queuedJobs;
// Implements the waitForRun API.
// TODO: refactor
struct Waiter {
Waiter() : paf(kj::newPromiseAndFulfiller<RunState>()) {}
void release(RunState state) {
paf.fulfiller->fulfill(RunState(state));
}
kj::Promise<RunState> takePromise() { return std::move(paf.promise); }
private:
kj::PromiseFulfillerPair<RunState> paf;
};
std::unordered_map<const Run*,std::list<Waiter>> waiters;
std::unordered_map<std::string, uint> buildNums;
RunSet activeJobs;
Database* db;
Server* srv;
_dictionary_* conf;
NodeMap nodes;
std::string homeDir;
std::set<LaminarClient*> clients;
};
#endif // _LAMINAR_LAMINAR_H_

53
src/main.cpp Normal file
View File

@ -0,0 +1,53 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "laminar.h"
#include <signal.h>
#include <kj/debug.h>
std::function<void()> sigHandler;
static void __sigHandler(int) { sigHandler(); }
int main(int argc, char** argv) {
for(int i = 1; i < argc; ++i) {
if(strcmp(argv[i], "-v") == 0) {
kj::_::Debug::setLogLevel(kj::_::Debug::Severity::INFO);
}
}
const char* configFile = getenv("LAMINAR_CONF_FILE");
if(!configFile || !*configFile)
configFile = "/etc/laminar.conf";
do {
Laminar laminar(configFile);
sigHandler = [&](){
KJ_LOG(INFO, "Received SIGINT");
laminar.stop();
};
signal(SIGINT, &__sigHandler);
signal(SIGTERM, &__sigHandler);
laminar.run();
} while(false);
KJ_DBG("end of main");
return 0;
}

25
src/node.cpp Normal file
View File

@ -0,0 +1,25 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "node.h"
bool Node::queue(const Run& run) {
// later this could check if the given run is allowed to run
// on this node, for example if the run's tags match the node's tags
return busyExecutors < numExecutors;
}

41
src/node.h Normal file
View File

@ -0,0 +1,41 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef NODE_H
#define NODE_H
#include <string>
class Run;
// Represents a group of executors. Currently almost unnecessary POD
// abstraction, but may be enhanced in the future to support e.g. tags
class Node {
public:
Node() {}
std::string name;
int numExecutors;
int busyExecutors = 0;
// Attempts to queue the given run to this node. Returns true if succeeded.
bool queue(const Run& run);
};
#endif // NODE_H

67
src/resources.cpp Normal file
View File

@ -0,0 +1,67 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminaris distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "resources.h"
#include <string.h>
#define INIT_RESOURCE(route, name) \
extern const char _binary_##name##_z_start[];\
extern const char _binary_##name##_z_end[]; \
resources[route] = std::make_pair(_binary_ ## name ## _z_start, _binary_ ## name ## _z_end)
Resources::Resources()
{
// TODO: Content-type
INIT_RESOURCE("/", index_html);
INIT_RESOURCE("/js/app.js", js_app_js);
INIT_RESOURCE("/js/Chart.HorizontalBar.js", js_Chart_HorizontalBar_js);
INIT_RESOURCE("/js/ansi_up.js", js_ansi_up_js);
INIT_RESOURCE("/tpl/home.html", tpl_home_html);
INIT_RESOURCE("/tpl/job.html", tpl_job_html);
INIT_RESOURCE("/tpl/run.html", tpl_run_html);
INIT_RESOURCE("/tpl/log.html", tpl_log_html);
INIT_RESOURCE("/tpl/browse.html", tpl_browse_html);
INIT_RESOURCE("/js/angular.min.js", js_angular_min_js);
INIT_RESOURCE("/js/angular-route.min.js", js_angular_route_min_js);
INIT_RESOURCE("/js/angular-sanitize.min.js", js_angular_sanitize_min_js);
INIT_RESOURCE("/js/ansi_up.js", js_ansi_up_js);
INIT_RESOURCE("/js/Chart.min.js", js_Chart_min_js);
INIT_RESOURCE("/js/Chart.HorizontalBar.js", js_Chart_HorizontalBar_js);
INIT_RESOURCE("/css/bootstrap.min.css", css_bootstrap_min_css);
}
inline bool beginsWith(std::string haystack, const char* needle) {
return strncmp(haystack.c_str(), needle, strlen(needle)) == 0;
}
bool Resources::handleRequest(std::string path, const char **start, const char **end) {
// need to keep the list of "application links" synchronised with the angular
// application. We cannot return a 404 for any of these
auto it = beginsWith(path,"/jobs")
? resources.find("/")
: resources.find(path);
if(it != resources.end()) {
*start = it->second.first;
*end = it->second.second;
return true;
}
return false;
}

41
src/resources.h Normal file
View File

@ -0,0 +1,41 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminaris distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef _LAMINAR_RESOURCES_H_
#define _LAMINAR_RESOURCES_H_
#include <unordered_map>
#include <utility>
#include <string>
// Simple class to abstract away the mapping of HTTP requests for
// resources to their data.
class Resources {
public:
Resources();
// If a resource is known for the given path, set start and end to the
// binary data to send to the client. Function returns false if no resource
// for the given path is known (404)
bool handleRequest(std::string path, const char** start, const char** end);
private:
std::unordered_map<std::string, std::pair<const char*, const char*>> resources;
};
#endif // _LAMINAR_RESOURCES_H_

45
src/resources/index.html Normal file
View File

@ -0,0 +1,45 @@
<!doctype html>
<html ng-app="laminar">
<head>
<base href="/">
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Laminar</title>
<script src="/js/angular.min.js"></script>
<script src="/js/angular-route.min.js"></script>
<script src="/js/angular-sanitize.min.js"></script>
<script src="/js/ansi_up.js" type="text/javascript"></script>
<script src="/js/Chart.min.js"></script>
<script src="/js/Chart.HorizontalBar.js"></script>
<link href="/css/bootstrap.min.css" rel="stylesheet">
<script src="/js/app.js"></script>
<style>
body, html { height: 100%; }
.navbar { margin-bottom: 0; }
.navbar-brand { padding: 13px 15px; font-family: 'Cantarell';}
.navbar-inverse { border: 0; }
canvas {
width: 100% !important;
max-width: 800px;
height: auto !important;
}
.progress {
height: 10px;
margin-bottom: 0;
}
</style>
</head>
<body>
<nav class="navbar navbar-inverse">
<div class="container-fluid">
<a class="navbar-brand" href="/">laminar</a>
<ul class="nav navbar-nav">
<li ng-class="{active:active('/')}"><a href="/jobs">Jobs</a></li>
</ul>
</div>
</nav>
<div ng-view></div>
</body>
</html>

250
src/resources/js/app.js Normal file
View File

@ -0,0 +1,250 @@
Laminar = {
runIcon: function(result) {
return result === "success" ? '<span style="color:forestgreen">✔</span>' : '<span style="color:crimson;">✘</span>';
},
jobFormatter: function(o) {
o.duration = o.duration + "s"
o.when = (new Date(1000 * o.started)).toLocaleString();
return o;
}
};
angular.module('laminar',['ngRoute','ngSanitize'])
.config(function($routeProvider, $locationProvider, $sceProvider) {
$routeProvider
.when('/', {
templateUrl: 'tpl/home.html',
controller: 'mainController'
})
.when('/jobs', {
templateUrl: 'tpl/browse.html',
controller: 'BrowseController',
})
.when('/jobs/:name', {
templateUrl: 'tpl/job.html',
controller: 'JobController'
})
.when('/jobs/:name/:num', {
templateUrl: 'tpl/run.html',
controller: 'RunController'
})
.when('/jobs/:name/:num/log', {
templateUrl: 'tpl/log.html',
controller: 'LogController'
})
$locationProvider.html5Mode(true);
$sceProvider.enabled(false);
})
.factory('$ws',function($q,$location){
return {
statusListener: function(callbacks) {
var ws = new WebSocket("ws://" + location.host + $location.path());
ws.onmessage = function(message) {
message = JSON.parse(message.data);
callbacks[message.type](message.data);
};
},
logListener: function(callback) {
var ws = new WebSocket("ws://" + location.host + $location.path());
ws.onmessage = function(message) {
callback(message.data);
};
}
};
})
.controller('mainController', function($scope, $ws, $interval){
$scope.jobsQueued = [];
$scope.jobsRunning = [];
$scope.jobsRecent = [];
var chtUtilization, chtBuildsPerDay, chtBuildsPerJob;
var updateUtilization = function(busy) {
chtUtilization.segments[0].value += busy ? 1 : -1;
chtUtilization.segments[1].value -= busy ? 1 : -1;
chtUtilization.update();
}
$ws.statusListener({
status: function(data) {
// populate jobs
$scope.jobsQueued = data.queued;
$scope.jobsRunning = data.running;
$scope.jobsRecent = data.recent.map(Laminar.jobFormatter);
$scope.$apply();
// setup charts
chtUtilization = new Chart(document.getElementById("chartUtil").getContext("2d")).Pie(
[{value: data.executorsBusy, color:"sandybrown", label: "Busy"},
{value: data.executorsTotal, color: "steelblue", label: "Idle"}],
{animationEasing: 'easeInOutQuad'}
);
chtBuildsPerDay = new Chart(document.getElementById("chartBpd").getContext("2d")).Line({
labels: function(){
res = [];
var now = new Date();
for(var i = 6; i >= 0; --i) {
var then = new Date(now.getTime() - i*86400000);
res.push(["Sun","Mon","Tue","Wed","Thu","Fri","Sat"][then.getDay()]);
}
return res;
}(),
datasets: [{
label: "Successful Builds",
fillColor: "darkseagreen",
strokeColor: "forestgreen",
data: data.buildsPerDay.map(function(e){return e.success||0;})
},{
label: "Failed Bulids",
fillColor: "darksalmon",
strokeColor: "crimson",
data: data.buildsPerDay.map(function(e){return e.failed||0;})
}]},
{ showTooltips: false }
);
chtBuildsPerJob = new Chart(document.getElementById("chartBpj").getContext("2d")).HorizontalBar({
labels: Object.keys(data.buildsPerJob),
datasets: [{
fillColor: "steelblue",
data: Object.keys(data.buildsPerJob).map(function(e){return data.buildsPerJob[e];})
}]
},{});
},
job_queued: function(data) {
$scope.jobsQueued.splice(0,0,data);
$scope.$apply();
},
job_started: function(data) {
$scope.jobsQueued.splice($scope.jobsQueued.length - data.queueIndex - 1,1);
$scope.jobsRunning.splice(0,0,data);
$scope.$apply();
updateUtilization(true);
},
job_completed: function(data) {
if(data.result === "success")
chtBuildsPerDay.datasets[0].points[6].value++;
else
chtBuildsPerDay.datasets[1].points[6].value++;
chtBuildsPerDay.update();
for(var i = 0; i < $scope.jobsRunning.length; ++i) {
var job = $scope.jobsRunning[i];
if(job.name == data.name && job.number == data.number) {
$scope.jobsRunning.splice(i,1);
$scope.jobsRecent.splice(0,0,Laminar.jobFormatter(data));
$scope.$apply();
break;
}
}
updateUtilization(false);
for(var j = 0; j < chtBuildsPerJob.datasets[0].bars.length; ++j) {
if(chtBuildsPerJob.datasets[0].bars[j].label == job.name) {
chtBuildsPerJob.datasets[0].bars[j].value++;
chtBuildsPerJob.update();
break;
}
}
}
});
$scope.active = function(url) {
return false;
}
$scope.runIcon = Laminar.runIcon;
timeUpdater = $interval(function() {
$scope.jobsRunning.forEach(function(o){
if(o.etc) {
var d = new Date();
var p = (d.getTime()/1000 - o.started) / (o.etc - o.started);
if(p > 1.2) {
o.overtime = true;
} else if(p >= 1) {
o.progress = 99;
} else {
o.progress = 100 * p;
}
}
});
}, 1000);
$scope.$on('$destroy', function() {
$interval.cancel(timeUpdater);
});
})
.controller('BrowseController', function($scope, $ws, $interval){
$scope.jobs = [];
$ws.statusListener({
status: function(data) {
$scope.jobs = data.jobs;
$scope.$apply();
},
});
})
.controller('JobController', function($scope, $routeParams, $ws) {
$scope.name = $routeParams.name;
$scope.jobsQueued = [];
$scope.jobsRunning = [];
$scope.jobsRecent = [];
$ws.statusListener({
status: function(data) {
$scope.jobsQueued = data.queued.filter(function(e){return e.name == $routeParams.name;});
$scope.jobsRunning = data.running.filter(function(e){return e.name == $routeParams.name;});
$scope.jobsRecent = data.recent.filter(function(e){return e.name == $routeParams.name;});
$scope.$apply();
},
job_queued: function(data) {
if(data.name == $routeParams.name) {
$scope.jobsQueued.splice(0,0,data);
$scope.$apply();
}
},
job_started: function(data) {
if(data.name == $routeParams.name) {
$scope.jobsQueued.splice($scope.jobsQueued.length - 1,1);
$scope.jobsRunning.splice(0,0,data);
$scope.$apply();
}
},
job_completed: function(data) {
for(var i = 0; i < $scope.jobsRunning.length; ++i) {
var job = $scope.jobsRunning[i];
if(job.name == data.name && job.number == data.number) {
$scope.jobsRunning.splice(i,1);
$scope.jobsRecent.splice(0,0,data);
$scope.$apply();
break;
}
}
}
});
$scope.runIcon = Laminar.runIcon;
})
.controller('RunController', function($scope, $routeParams, $ws) {
$scope.name = $routeParams.name;
$scope.num = $routeParams.num;
$ws.statusListener({
status: function(data) {
$scope.job = Laminar.jobFormatter(data);
$scope.$apply();
},
job_completed: function(data) {
$scope.job = Laminar.jobFormatter(data);
$scope.$apply();
}
});
$scope.runIcon = Laminar.runIcon;
})
.controller('LogController', function($scope, $routeParams, $ws) {
$scope.name = $routeParams.name;
$scope.num = $routeParams.num;
$scope._log = ""
$ws.logListener(function(data) {
$scope._log += ansi_up.ansi_to_html(data);
$scope.$apply();
window.scrollTo(0, document.body.scrollHeight);
});
$scope.log = function() {
// TODO sanitize
return ansi_up.ansi_to_html($scope._log);
}
})
.run(function() {});

View File

@ -0,0 +1,16 @@
<div class="container-fluid">
<div class="row">
<div class="col-xs-12">
<h3>Browse jobs</h3>
<div class="form-inline form-group">
<label for="jobFilter">Filter</label>
<input id="jobFilter" ng-model="search.name">
</div>
<table class="table table-bordered">
<tr class="animate-repeat" ng-repeat="job in jobs | filter:search:strict">
<td><a href="jobs/{{job.name}}">{{job.name}}</a></td>
</tr>
</table>
</div>
</div>
</div>

View File

@ -0,0 +1,58 @@
<div class="container-fluid">
<div class="row">
<div class="col-sm-5 col-md-4 col-lg-3 dash">
<h3>Recent Builds</h3>
<table class="table table-bordered">
<tr class="animate-repeat" ng-repeat="job in jobsQueued track by $index">
<td><a href="jobs/{{job.name}}">{{job.name}}</a> <i>queued</i></td>
</tr>
<tr class="animate-repeat" ng-repeat="job in jobsRunning track by $index">
<td><a href="jobs/{{job.name}}">{{job.name}}</a> <a href="jobs/{{job.name}}/{{job.number}}">#{{job.number}}</a> <div class="progress">
<div class="progress-bar progress-bar-{{job.overtime?'warning':'info'}} progress-bar-striped {{job.etc?'':'active'}}" style="width:{{!job.etc?'100':job.progress}}%"></div>
</div>
</td>
</tr>
<tr class="animate-repeat" ng-repeat="job in jobsRecent track by $index">
<td><span ng-bind-html="runIcon(job.result)"></span> <a href="jobs/{{job.name}}">{{job.name}}</a> <a href="jobs/{{job.name}}/{{job.number}}">#{{job.number}}</a><br><small>Took {{job.duration}} at {{job.when}}</small></td>
</tr>
</table>
</div>
<div class="col-sm-7 col-md-8 col-lg-9">
<h3>Dashboard</h3>
<div class="row">
<div class="col-md-6">
<div class="panel panel-default">
<div class="panel-heading">Builds per day</div>
<div class="panel-body">
<canvas id="chartBpd"></canvas>
</div>
</div>
</div>
<div class="col-md-6">
<div class="panel panel-default">
<div class="panel-heading">Builds per job in the last 24 hours</div>
<div class="panel-body" id="chartStatus">
<canvas id="chartBpj"></canvas>
</div>
</div>
</div>
<div class="col-md-6">
<div class="panel panel-default">
<div class="panel-heading">Current executor utilization</div>
<div class="panel-body">
<canvas id="chartUtil"></canvas>
</div>
</div>
</div>
<div class="col-md-6">
<div class="panel panel-default">
<div class="panel-heading">what to put here?</div>
<div class="panel-body">
</div>
</div>
</div>
</div>
</div>
</div>
</div>

View File

@ -0,0 +1,22 @@
<div class="container-fluid">
<div class="row">
<div class="col-xs-12">
<h3>{{name}}</h3>
<table class="table table-bordered">
<tr class="animate-repeat" ng-repeat="job in jobsQueued track by $index">
<td><i>queued</i></td>
</tr>
<tr class="animate-repeat" ng-repeat="job in jobsRunning track by $index">
<td><a href="jobs/{{job.name}}/{{job.number}}">#{{job.number}}</a> progressbar?</td>
</tr>
<tr class="animate-repeat" ng-repeat="job in jobsRecent track by $index">
<td><span ng-bind-html="runIcon(job.result)"></span> <a href="jobs/{{job.name}}/{{job.number}}">#{{job.number}}</a></td>
</tr>
</table>
</div>
<div class="col-sm-7 col-md-8 col-lg-9">
</div>
</div>
</div>

View File

@ -0,0 +1,8 @@
<div class="container-fluid">
<div class="row">
<div class="col-xs-12">
<h3>Log output for {{name}} #{{num}}</h3>
<pre ng-bind-html="log()"></pre>
</div>
</div>
</div>

View File

@ -0,0 +1,13 @@
<div class="container-fluid">
<div class="row">
<div class="col-xs-12">
<dl class="dl-horizontal">
<dt style="vertical-align:bottom;"></dt><dd><h3><span ng-bind-html="runIcon(job.result)"></span> {{name}} #{{num}}</h3></dd>
<dt><a class="btn btn-default" href="jobs/{{name}}">&lt; Job</a></dt><dd><a class="btn btn-default" href="jobs/{{name}}/{{num}}/log">Log output</a></dd>
<dt></dt><dd>&nbsp;</dd>
<dt>Reason</dt><dd>{{job.reason}}</dd>
<dt>Started</dt><dd>{{job.when}}</dd>
</dl>
</div>
</div>
</div>

116
src/run.cpp Normal file
View File

@ -0,0 +1,116 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminaris distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "run.h"
#include <iostream>
#include <kj/debug.h>
#include <unistd.h>
#include <boost/filesystem.hpp>
namespace fs = boost::filesystem;
std::string to_string(const RunState& rs) {
switch(rs) {
case RunState::PENDING: return "pending";
case RunState::ABORTED: return "aborted";
case RunState::FAILED: return "failed";
case RunState::SUCCESS: return "success";
case RunState::UNKNOWN:
default:
return "unknown";
}
}
Run::Run() {
result = RunState::SUCCESS;
}
Run::~Run() {
KJ_DBG("Run destroyed");
//delete log;
}
std::string Run::reason() const {
if(!parentName.empty()) {
return std::string("Triggered by upstream ") + parentName + " #" + std::to_string(parentBuild);
}
return reasonMsg;
}
bool Run::step() {
if(!currentScript.empty() && procStatus != 0)
result = RunState::FAILED;
if(scripts.size()) {
currentScript = scripts.front();
scripts.pop();
int pfd[2];
pipe(pfd);
pid_t pid = fork();
if(pid == 0) {
close(pfd[0]);
dup2(pfd[1], 1);
dup2(pfd[1], 2);
close(pfd[1]);
std::string buildNum = std::to_string(build);
std::string PATH = (fs::path(laminarHome)/"cfg"/"scripts").string() + ":";
if(const char* p = getenv("PATH")) {
PATH.append(p);
}
chdir(wd.c_str());
setenv("PATH", PATH.c_str(), true);
setenv("lBuildNum", buildNum.c_str(), true);
setenv("lJobName", name.c_str(), true);
setenv("lWorkspace", (fs::path(laminarHome)/"run"/name/"workspace").string().c_str(), true);
for(auto& pair : params) {
setenv(pair.first.c_str(), pair.second.c_str(), false);
}
printf("[laminar] Executing %s\n", currentScript.c_str());
execl(currentScript.c_str(), currentScript.c_str(), NULL);
KJ_LOG(FATAL, "execl returned", strerror(errno));
_exit(1);
}
KJ_LOG(INFO, "Forked", currentScript, pid);
close(pfd[1]);
fd = pfd[0];
this->pid = pid;
return false;
} else {
return true;
}
}
void Run::addScript(std::string script) {
scripts.push(script);
}
void Run::reaped(int status) {
procStatus = status;
}
void Run::complete() {
notifyCompletion(this);
}

134
src/run.h Normal file
View File

@ -0,0 +1,134 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminaris distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef _LAMINAR_RUN_H_
#define _LAMINAR_RUN_H_
#include <string>
#include <queue>
#include <functional>
#include <ostream>
#include <unordered_map>
enum class RunState {
UNKNOWN,
PENDING,
ABORTED,
FAILED,
SUCCESS
};
std::string to_string(const RunState& rs);
class Node;
// Represents an execution of a job. Not much more than POD
class Run {
public:
Run();
~Run();
// copying this class would be asking for trouble...
Run(const Run&) = delete;
Run& operator=(const Run&) = delete;
// executes the next script (if any), returning true if there is nothing
// more to be done - in this case the caller should call complete()
bool step();
// call this when all scripts are done to get the notifyCompletion callback
void complete();
// adds a script to the queue of scripts to be executed by this run
void addScript(std::string script);
// called when a process owned by this run has been reaped. The status
// may be used to set the run's job status
void reaped(int status);
std::string reason() const;
std::function<void(const Run*)> notifyCompletion;
Node* node;
RunState result;
std::string laminarHome;
std::string name;
std::string wd;
std::string parentName;
int parentBuild = 0;
std::string reasonMsg;
int build = 0;
std::string log;
pid_t pid;
int fd;
int procStatus;
std::unordered_map<std::string, std::string> params;
time_t queuedAt;
time_t startedAt;
private:
std::queue<std::string> scripts;
std::string currentScript;
};
// All this below is a somewhat overengineered method of keeping track of
// currently executing builds (Run objects). This would probably scale
// very well, but it's completely gratuitous since we are not likely to
// be executing thousands of builds at the same time
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/ordered_index.hpp>
namespace bmi = boost::multi_index;
struct _same {
typedef const Run* result_type;
const Run* operator()(const std::shared_ptr<Run>& run) const {
return run.get();
}
};
struct RunSet: public boost::multi_index_container<
std::shared_ptr<Run>,
// A single Run can be fetched by...
bmi::indexed_by<
// their current running pid
bmi::hashed_unique<bmi::member<Run, pid_t, &Run::pid>>,
bmi::hashed_unique<bmi::composite_key<
std::shared_ptr<Run>,
// a combination of their job name and build number
bmi::member<Run, std::string, &Run::name>,
bmi::member<Run, int, &Run::build>
>>,
// or a pointer to a Run object.
bmi::hashed_unique<_same>,
// A group of Runs can be fetched by the time they started
bmi::ordered_non_unique<bmi::member<Run, time_t, &Run::startedAt>>,
// or by their job name
bmi::ordered_non_unique<bmi::member<Run, std::string, &Run::name>>
>
> {
// TODO: getters for each index
};
#endif // _LAMINAR_RUN_H_

414
src/server.cpp Normal file
View File

@ -0,0 +1,414 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminar is distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#include "server.h"
#include "interface.h"
#include "laminar.capnp.h"
#include "resources.h"
#include <capnp/ez-rpc.h>
#include <capnp/rpc-twoparty.h>
#include <capnp/rpc.capnp.h>
#include <kj/async-io.h>
#include <kj/debug.h>
#include <kj/threadlocal.h>
#include <websocketpp/config/core.hpp>
#include <websocketpp/server.hpp>
#include <sys/eventfd.h>
// Configuration struct for the websocketpp template library.
struct wsconfig : public websocketpp::config::core {
// static const websocketpp::log::level elog_level =
// websocketpp::log::elevel::info;
// static const websocketpp::log::level alog_level =
// websocketpp::log::alevel::access_core |
// websocketpp::log::alevel::message_payload ;
static const websocketpp::log::level elog_level =
websocketpp::log::elevel::none;
static const websocketpp::log::level alog_level =
websocketpp::log::alevel::none;
typedef struct { LaminarClient* lc; } connection_base;
};
typedef websocketpp::server<wsconfig> websocket;
namespace {
// Used for returning run state to RPC clients
LaminarCi::JobResult fromRunState(RunState state) {
switch(state) {
case RunState::SUCCESS: return LaminarCi::JobResult::SUCCESS;
default:
KJ_DBG("TODO log state", to_string(state));
return LaminarCi::JobResult::UNKNOWN;
}
}
}
// This is the implementation of the Laminar Cap'n Proto RPC interface.
// As such, it implements the pure virtual interface generated from
// laminar.capnp with calls to the LaminarInterface
class RpcImpl : public LaminarCi::Server {
public:
RpcImpl(LaminarInterface& l) :
LaminarCi::Server(),
laminar(l)
{
}
// Start a job, without waiting for it to finish
kj::Promise<void> trigger(TriggerContext context) override {
std::string jobName = context.getParams().getJobName();
KJ_LOG(INFO, "RPC trigger", jobName);
ParamMap params;
for(auto p : context.getParams().getParams()) {
params[p.getName().cStr()] = p.getValue().cStr();
}
LaminarCi::MethodResult result = laminar.queueJob(jobName, params)
? LaminarCi::MethodResult::SUCCESS
: LaminarCi::MethodResult::FAILED;
context.getResults().setResult(result);
return kj::READY_NOW;
}
// Start a job and wait for the result
kj::Promise<void> start(StartContext context) override {
std::string jobName = context.getParams().getJobName();
KJ_LOG(INFO, "RPC start", jobName);
ParamMap params;
for(auto p : context.getParams().getParams()) {
params[p.getName().cStr()] = p.getValue().cStr();
}
std::shared_ptr<Run> run = laminar.queueJob(jobName, params);
if(run.get()) {
return laminar.waitForRun(run.get()).then([context](RunState state) mutable {
context.getResults().setResult(fromRunState(state));
});
} else {
context.getResults().setResult(LaminarCi::JobResult::UNKNOWN);
return kj::READY_NOW;
}
}
// Wait for an already-running job to complete, returning the result
kj::Promise<void> pend(PendContext context) override {
std::string jobName = context.getParams().getJobName();
int buildNum = context.getParams().getBuildNum();
KJ_LOG(INFO, "RPC pend", jobName, buildNum);
kj::Promise<RunState> promise = laminar.waitForRun(jobName, buildNum);
return promise.then([context](RunState state) mutable {
context.getResults().setResult(fromRunState(state));
});
}
// Set a parameter on a running build
kj::Promise<void> set(SetContext context) override {
std::string jobName = context.getParams().getJobName();
int buildNum = context.getParams().getBuildNum();
KJ_LOG(INFO, "RPC set", jobName, buildNum);
LaminarCi::MethodResult result = laminar.setParam(jobName, buildNum,
context.getParams().getParam().getName(), context.getParams().getParam().getValue())
? LaminarCi::MethodResult::SUCCESS
: LaminarCi::MethodResult::FAILED;
context.getResults().setResult(result);
return kj::READY_NOW;
}
private:
LaminarInterface& laminar;
kj::LowLevelAsyncIoProvider* asyncio;
};
// This is the implementation of the HTTP/Websocket interface. It exposes
// websocket connections as LaminarClients and registers them with the
// LaminarInterface so that status messages will be delivered to the client.
// On opening a websocket connection, it delivers a status snapshot message
// (see LaminarInterface::sendStatus)
class Server::HttpImpl {
public:
HttpImpl(LaminarInterface& l) :
laminar(l)
{
// debug logging
// wss.set_access_channels(websocketpp::log::alevel::all);
// wss.set_error_channels(websocketpp::log::elevel::all);
// TODO: This could be used in the future to trigger actions on the
// server in response to a web client request. Currently not supported.
// wss.set_message_handler([](std::weak_ptr<void> s, websocket::message_ptr msg){
// msg->get_payload();
// });
// Handle plain HTTP requests by delivering the binary resource
wss.set_http_handler([this](websocketpp::connection_hdl hdl){
websocket::connection_ptr c = wss.get_con_from_hdl(hdl);
const char* start, *end;
if(resources.handleRequest(c->get_resource(), &start, &end)) {
c->set_status(websocketpp::http::status_code::ok);
c->append_header("Content-Encoding", "gzip");
c->set_body(std::string(start, end));
} else {
// 404
c->set_status(websocketpp::http::status_code::not_found);
}
});
// Handle new websocket connection. Parse the URL to determine
// the client's scope of interest, register the client for update
// messages, and call sendStatus.
wss.set_open_handler([this](websocketpp::connection_hdl hdl){
websocket::connection_ptr c = wss.get_con_from_hdl(hdl);
std::string res = c->get_resource();
if(res.substr(0, 5) == "/jobs") {
if(res.length() == 5) {
c->lc->scope.type = MonitorScope::ALL;
} else {
res = res.substr(5);
int split = res.find('/',1);
std::string job = res.substr(1,split-1);
if(!job.empty()) {
c->lc->scope.job = job;
c->lc->scope.type = MonitorScope::JOB;
}
if(split != std::string::npos) {
int split2 = res.find('/', split+1);
std::string run = res.substr(split+1, split2-split);
if(!run.empty()) {
c->lc->scope.num = atoi(run.c_str());
c->lc->scope.type = MonitorScope::RUN;
}
if(split2 != std::string::npos && res.compare(split2, 4, "/log") == 0) {
c->lc->scope.type = MonitorScope::LOG;
}
}
}
}
laminar.registerClient(c->lc);
laminar.sendStatus(c->lc);
});
wss.set_close_handler([this](websocketpp::connection_hdl hdl){
laminar.deregisterClient(wss.get_con_from_hdl(hdl)->lc);
});
}
// Return a new connection object linked with the context defined below.
// This is a bit untidy, it would be better to make them a single object,
// but I didn't yet figure it out
websocket::connection_ptr newConnection(LaminarClient* lc) {
websocket::connection_ptr c = wss.get_connection();
c->lc = lc;
return c;
}
private:
Resources resources;
LaminarInterface& laminar;
websocket wss;
};
// Context for an RPC connection
struct RpcConnection {
RpcConnection(kj::Own<kj::AsyncIoStream>&& stream,
capnp::Capability::Client bootstrap,
capnp::ReaderOptions readerOpts) :
stream(kj::mv(stream)),
network(*this->stream, capnp::rpc::twoparty::Side::SERVER, readerOpts),
rpcSystem(capnp::makeRpcServer(network, bootstrap))
{
}
kj::Own<kj::AsyncIoStream> stream;
capnp::TwoPartyVatNetwork network;
capnp::RpcSystem<capnp::rpc::twoparty::VatId> rpcSystem;
};
// Context for a WebsocketConnection (implements LaminarClient)
// This object is a streambuf and reimplements xsputn so that it can follow any
// write the websocketpp library makes to it with a write to the appropriate
// descriptor in the kj-async context.
struct Server::WebsocketConnection : public LaminarClient, public std::streambuf {
WebsocketConnection(kj::Own<kj::AsyncIoStream>&& stream, Server::HttpImpl& http) :
stream(kj::mv(stream)),
out(this),
cn(http.newConnection(this)),
writePaf(kj::newPromiseAndFulfiller<void>())
{
cn->register_ostream(&out);
cn->start();
}
~WebsocketConnection() noexcept(true) {
outputBuffer.clear();
writePaf.fulfiller->fulfill();
}
kj::Promise<void> pend() {
return stream->tryRead(ibuf, 1, 1024).then([this](size_t sz){
cn->read_all(ibuf, sz);
if(sz == 0 || cn->get_state() == websocketpp::session::state::closed) {
cn->eof();
return kj::Promise<void>(kj::READY_NOW);
}
return pend();
});
}
kj::Promise<void> writeTask() {
return writePaf.promise.then([this]() {
std::string payload;
// clear the outputBuffer for more context, and take a chunk
// to send now
payload.swap(outputBuffer);
writePaf = kj::newPromiseAndFulfiller<void>();
if(payload.empty()) {
return kj::Promise<void>(kj::READY_NOW);
} else {
return stream->write(payload.data(), payload.length()).then([this]{
return writeTask();
});
}
});
}
void sendMessage(std::string payload) override {
cn->send(payload, websocketpp::frame::opcode::text);
}
std::streamsize xsputn(const char* s, std::streamsize sz) override {
outputBuffer.append(std::string(s, sz));
writePaf.fulfiller->fulfill();
return sz;
}
kj::Own<kj::AsyncIoStream> stream;
std::ostream out;
websocket::connection_ptr cn;
std::string outputBuffer;
kj::PromiseFulfillerPair<void> writePaf;
// TODO: think about this size
char ibuf[1024];
};
Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
kj::StringPtr httpBindAddress) :
rpcInterface(kj::heap<RpcImpl>(li)),
httpInterface(new HttpImpl(li)),
ioContext(kj::setupAsyncIo()),
tasks(*this)
{
{ // RPC task
auto paf = kj::newPromiseAndFulfiller<uint>();
tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress, 0)
.then(kj::mvCapture(paf.fulfiller,
[this](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller,
kj::Own<kj::NetworkAddress>&& addr) {
auto listener = addr->listen();
portFulfiller->fulfill(listener->getPort());
acceptRpcClient(kj::mv(listener));
})));
}
{ // HTTP task
auto paf = kj::newPromiseAndFulfiller<uint>();
tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress, 0)
.then(kj::mvCapture(paf.fulfiller,
[this](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller,
kj::Own<kj::NetworkAddress>&& addr) {
auto listener = addr->listen();
portFulfiller->fulfill(listener->getPort());
acceptHttpClient(kj::mv(listener));
})));
}
}
Server::~Server() {
// RpcImpl is deleted through Capability::Client.
// Deal with the HTTP interface the old-fashioned way
delete httpInterface;
}
void Server::start() {
// this eventfd is just to allow us to quit the server at some point
// in the future by adding this event to the async loop. I couldn't see
// a simpler way...
efd = eventfd(0,0);
kj::Promise<void> quit = kj::evalLater([this](){
static uint64_t _;
auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd);
return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent));
});
quit.wait(ioContext.waitScope);
}
void Server::stop() {
eventfd_write(efd, 1);
}
void Server::addProcess(int fd, std::function<void(char*,size_t)> readCb, std::function<void()> cb) {
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd);
tasks.add(handleProcessOutput(event,readCb).attach(std::move(event)).then(std::move(cb)));
}
void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
auto ptr = listener.get();
tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener,
kj::Own<kj::AsyncIoStream>&& connection) {
acceptHttpClient(kj::mv(listener));
auto conn = kj::heap<WebsocketConnection>(kj::mv(connection), *httpInterface);
return conn->pend().exclusiveJoin(conn->writeTask()).attach(std::move(conn));
}))
);
}
void Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
auto ptr = listener.get();
tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener,
kj::Own<kj::AsyncIoStream>&& connection) {
acceptRpcClient(kj::mv(listener));
auto server = kj::heap<RpcConnection>(kj::mv(connection), rpcInterface, capnp::ReaderOptions());
tasks.add(server->network.onDisconnect().attach(kj::mv(server)));
}))
);
}
// handles stdout/stderr from a child process by sending it to the provided
// callback function
kj::Promise<void> Server::handleProcessOutput(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> readCb) {
// TODO think about this size
static char* buffer = new char[1024];
return stream->tryRead(buffer, 1, 1024).then([this,stream,readCb](size_t sz) {
readCb(buffer, sz);
if(sz > 0) {
return handleProcessOutput(stream, readCb);
}
return kj::Promise<void>(kj::READY_NOW);
});
}

62
src/server.h Normal file
View File

@ -0,0 +1,62 @@
///
/// Copyright 2015 Oliver Giles
///
/// This file is part of Laminar
///
/// Laminar is free software: you can redistribute it and/or modify
/// it under the terms of the GNU General Public License as published by
/// the Free Software Foundation, either version 3 of the License, or
/// (at your option) any later version.
///
/// Laminaris distributed in the hope that it will be useful,
/// but WITHOUT ANY WARRANTY; without even the implied warranty of
/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
/// GNU General Public License for more details.
///
/// You should have received a copy of the GNU General Public License
/// along with Laminar. If not, see <http://www.gnu.org/licenses/>
///
#ifndef _LAMINAR_SERVER_H_
#define _LAMINAR_SERVER_H_
#include <kj/async-io.h>
#include <capnp/message.h>
#include <capnp/capability.h>
#include <functional>
struct LaminarInterface;
// This class abstracts the HTTP/Websockets and Cap'n Proto RPC interfaces.
// It also manages the program's asynchronous event loop
class Server final : public kj::TaskSet::ErrorHandler {
public:
// Initializes the server with a LaminarInterface to handle requests from
// HTTP/Websocket or RPC clients and bind addresses for each of those
// interfaces. See the documentation for kj::AsyncIoProvider::getNetwork
// for a description of the address format
Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, kj::StringPtr httpBindAddress);
~Server();
void start();
void stop();
void addProcess(int fd, std::function<void(char*,size_t)> readCb, std::function<void()> cb);
private:
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> handleProcessOutput(kj::AsyncInputStream* stream, std::function<void(char*,size_t)> readCb);
void taskFailed(kj::Exception&& exception) override {
kj::throwFatalException(kj::mv(exception));
}
private:
int efd;
capnp::Capability::Client rpcInterface;
struct WebsocketConnection;
struct HttpImpl;
HttpImpl* httpInterface;
kj::AsyncIoContext ioContext;
kj::TaskSet tasks;
};
#endif // _LAMINAR_SERVER_H_