@ -29,9 +29,6 @@
# include <fstream>
# include <fstream>
# include <zlib.h>
# include <zlib.h>
# include <boost/filesystem.hpp>
namespace fs = boost : : filesystem ;
# define COMPRESS_LOG_MIN_SIZE 1024
# define COMPRESS_LOG_MIN_SIZE 1024
# include <rapidjson/stringbuffer.h>
# include <rapidjson/stringbuffer.h>
@ -62,23 +59,30 @@ constexpr const char* INTADDR_HTTP_DEFAULT = "*:8080";
constexpr const char * ARCHIVE_URL_DEFAULT = " /archive " ;
constexpr const char * ARCHIVE_URL_DEFAULT = " /archive " ;
}
}
// helper for appending to boost::filesystem::path
// short syntax helpers for kj::Path
fs : : path operator + ( fs : : path p , const char * ext ) {
template < typename T >
std : : string leaf = p . leaf ( ) . string ( ) ;
inline kj : : Path operator / ( const kj : : Path & p , const T & ext ) {
leaf + = ext ;
return p . append ( ext ) ;
return p . remove_leaf ( ) / leaf ;
}
template < typename T >
inline kj : : Path operator / ( const std : : string & p , const T & ext ) {
return kj : : Path { p } / ext ;
}
}
typedef std : : string str ;
typedef std : : string str ;
Laminar : : Laminar ( ) {
Laminar : : Laminar ( const char * home ) :
homePath ( kj : : Path : : parse ( & home [ 1 ] ) ) ,
fsHome ( kj : : newDiskFilesystem ( ) - > getRoot ( ) . openSubdir ( homePath , kj : : WriteMode : : MODIFY ) )
{
KJ_ASSERT ( home [ 0 ] = = ' / ' ) ;
archiveUrl = ARCHIVE_URL_DEFAULT ;
archiveUrl = ARCHIVE_URL_DEFAULT ;
if ( char * envArchive = getenv ( " LAMINAR_ARCHIVE_URL " ) )
if ( char * envArchive = getenv ( " LAMINAR_ARCHIVE_URL " ) )
archiveUrl = envArchive ;
archiveUrl = envArchive ;
numKeepRunDirs = 0 ;
numKeepRunDirs = 0 ;
homeDir = getenv ( " LAMINAR_HOME " ) ? : " /var/lib/laminar " ;
db = new Database ( ( fs: : path ( homeDir ) / " laminar.sqlite " ) . string ( ) . c_s tr( ) ) ;
db = new Database ( ( homePath/ " laminar.sqlite " ) . toString ( true ) . cS tr( ) ) ;
// Prepare database for first use
// Prepare database for first use
// TODO: error handling
// TODO: error handling
db - > exec ( " CREATE TABLE IF NOT EXISTS builds( "
db - > exec ( " CREATE TABLE IF NOT EXISTS builds( "
@ -128,17 +132,16 @@ bool Laminar::setParam(std::string job, uint buildNum, std::string param, std::s
void Laminar : : populateArtifacts ( Json & j , std : : string job , uint num ) const {
void Laminar : : populateArtifacts ( Json & j , std : : string job , uint num ) const {
fs : : path dir ( fs : : path ( homeDir ) / " archive " / job / std : : to_string ( num ) ) ;
kj : : Path runArchive { job , std : : to_string ( num ) } ;
if ( fs : : is_directory ( dir ) ) {
KJ_IF_MAYBE ( dir , fsHome - > tryOpenSubdir ( " archive " / runArchive ) ) {
size_t prefixLen = ( fs : : path ( homeDir ) / " archive " ) . string ( ) . length ( ) ;
for ( kj : : StringPtr file : ( * dir ) - > listNames ( ) ) {
size_t scopeLen = dir . string ( ) . length ( ) ;
kj : : FsNode : : Metadata meta = ( * dir ) - > lstat ( kj : : Path { file } ) ;
for ( fs : : recursive_directory_iterator it ( dir ) ; it ! = fs : : recursive_directory_iterator ( ) ; + + it ) {
if ( meta . type ! = kj : : FsNode : : Type : : FILE )
if ( ! fs : : is_regular_file ( * it ) )
continue ;
continue ;
j . StartObject ( ) ;
j . StartObject ( ) ;
j . set ( " url " , archiveUrl + it - > path ( ) . string ( ) . substr ( prefixLen ) ) ;
j . set ( " url " , archiveUrl + ( runArchive / file ) . toString ( ) . cStr ( ) ) ;
j . set ( " filename " , it- > path ( ) . string ( ) . substr ( scopeLen + 1 ) ) ;
j . set ( " filename " , file. cStr ( ) ) ;
j . set ( " size " , fs: : file_size ( it - > path ( ) ) ) ;
j . set ( " size " , meta. size ) ;
j . EndObject ( ) ;
j . EndObject ( ) ;
}
}
}
}
@ -440,9 +443,12 @@ void Laminar::sendStatus(LaminarClient* client) {
client - > sendMessage ( j . str ( ) ) ;
client - > sendMessage ( j . str ( ) ) ;
}
}
Laminar : : ~ Laminar ( ) {
Laminar : : ~ Laminar ( ) noexcept try {
delete db ;
delete db ;
delete srv ;
delete srv ;
} catch ( std : : exception & e ) {
LLOG ( ERROR , e . what ( ) ) ;
return ;
}
}
void Laminar : : run ( ) {
void Laminar : : run ( ) {
@ -450,8 +456,8 @@ void Laminar::run() {
const char * listen_http = getenv ( " LAMINAR_BIND_HTTP " ) ? : INTADDR_HTTP_DEFAULT ;
const char * listen_http = getenv ( " LAMINAR_BIND_HTTP " ) ? : INTADDR_HTTP_DEFAULT ;
srv = new Server ( * this , listen_rpc , listen_http ) ;
srv = new Server ( * this , listen_rpc , listen_http ) ;
srv - > addWatchPath ( fs : : path ( fs : : path ( homeDir ) / " cfg " / " nodes " ) . string( ) . c_s tr( ) ) ;
srv - > addWatchPath ( ( homePath / " cfg " / " nodes " ) . toString( ) . cS tr( ) ) ;
srv - > addWatchPath ( fs : : path ( fs : : path ( homeDir ) / " cfg " / " jobs " ) . string( ) . c_s tr( ) ) ;
srv - > addWatchPath ( ( homePath / " cfg " / " jobs " ) . toString( ) . cS tr( ) ) ;
srv - > start ( ) ;
srv - > start ( ) ;
}
}
@ -465,16 +471,14 @@ bool Laminar::loadConfiguration() {
std : : set < std : : string > knownNodes ;
std : : set < std : : string > knownNodes ;
fs : : path nodeCfg = fs : : path ( homeDir ) / " cfg " / " nodes " ;
KJ_IF_MAYBE ( nodeDir , fsHome - > tryOpenSubdir ( kj : : Path { " cfg " , " nodes " } ) ) {
for ( kj : : Directory : : Entry & entry : ( * nodeDir ) - > listEntries ( ) ) {
if ( fs : : is_directory ( nodeCfg ) ) {
if ( entry . type ! = kj : : FsNode : : Type : : FILE | | ! entry . name . endsWith ( " .conf " ) )
for ( fs : : directory_iterator it ( nodeCfg ) ; it ! = fs : : directory_iterator ( ) ; + + it ) {
if ( ! fs : : is_regular_file ( it - > status ( ) ) | | it - > path ( ) . extension ( ) ! = " .conf " )
continue ;
continue ;
StringMap conf = parseConfFile ( it - > path ( ) . string ( ) . c_s tr( ) ) ;
StringMap conf = parseConfFile ( ( homePath / entry . name ) . toString ( ) . cS tr( ) ) ;
std : : string nodeName = it - > path ( ) . stem ( ) . string ( ) ;
std : : string nodeName ( entry . name . cStr ( ) , entry . name . findLast ( ' . ' ) . orDefault ( 0 ) - 1 ) ;
auto existingNode = nodes . find ( nodeName ) ;
auto existingNode = nodes . find ( nodeName ) ;
std : : shared_ptr < Node > node = existingNode = = nodes . end ( ) ? nodes . emplace ( nodeName , std : : shared_ptr < Node > ( new Node ) ) . first - > second : existingNode - > second ;
std : : shared_ptr < Node > node = existingNode = = nodes . end ( ) ? nodes . emplace ( nodeName , std : : shared_ptr < Node > ( new Node ) ) . first - > second : existingNode - > second ;
node - > name = nodeName ;
node - > name = nodeName ;
@ -511,13 +515,13 @@ bool Laminar::loadConfiguration() {
nodes . emplace ( " " , node ) ;
nodes . emplace ( " " , node ) ;
}
}
fs : : path jobsDir = fs : : path ( homeDir ) / " cfg " / " jobs " ;
KJ_IF_MAYBE ( jobsDir , fsHome - > tryOpenSubdir ( kj : : Path { " cfg " , " jobs " } ) ) {
if ( fs : : is_directory ( jobsDir ) ) {
for ( kj : : Directory : : Entry & entry : ( * jobsDir ) - > listEntries ( ) ) {
for ( fs : : directory_iterator it ( jobsDir ) ; it ! = fs : : directory_iterator ( ) ; + + it ) {
if ( entry . type ! = kj : : FsNode : : Type : : FILE | | ! entry . name . endsWith ( " .conf " ) )
if ( ! fs : : is_regular_file ( it - > status ( ) ) | | it - > path ( ) . extension ( ) ! = " .conf " )
continue ;
continue ;
StringMap conf = parseConfFile ( ( homePath / entry . name ) . toString ( ) . cStr ( ) ) ;
StringMap conf = parseConfFile ( it - > path ( ) . string ( ) . c_str ( ) ) ;
std: : string jobName ( entry . name . cStr ( ) , entry . name . findLast ( ' . ' ) . orDefault ( 0 ) - 1 ) ;
std : : string tags = conf . get < std : : string > ( " TAGS " ) ;
std : : string tags = conf . get < std : : string > ( " TAGS " ) ;
if ( ! tags . empty ( ) ) {
if ( ! tags . empty ( ) ) {
@ -526,7 +530,7 @@ bool Laminar::loadConfiguration() {
std : : string tag ;
std : : string tag ;
while ( std : : getline ( iss , tag , ' , ' ) )
while ( std : : getline ( iss , tag , ' , ' ) )
tagList . insert ( tag ) ;
tagList . insert ( tag ) ;
jobTags [ it- > path ( ) . stem ( ) . string ( ) ] = tagList ;
jobTags [ jobName ] = tagList ;
}
}
}
}
@ -536,30 +540,12 @@ bool Laminar::loadConfiguration() {
}
}
std : : shared_ptr < Run > Laminar : : queueJob ( std : : string name , ParamMap params ) {
std : : shared_ptr < Run > Laminar : : queueJob ( std : : string name , ParamMap params ) {
if ( ! fs : : exists ( fs : : path ( homeDir ) / " cfg " / " jobs " / name + " .run " ) ) {
if ( ! fs Home- > exists ( kj : : Path { " cfg " , " jobs " , name + " .run " } ) ) {
LLOG ( ERROR , " Non-existent job " , name ) ;
LLOG ( ERROR , " Non-existent job " , name ) ;
return nullptr ;
return nullptr ;
}
}
std : : shared_ptr < Run > run = std : : make_shared < Run > ( ) ;
std : : shared_ptr < Run > run = std : : make_shared < Run > ( name , kj : : mv ( params ) , homePath . clone ( ) ) ;
run - > name = name ;
run - > queuedAt = time ( nullptr ) ;
for ( auto it = params . begin ( ) ; it ! = params . end ( ) ; ) {
if ( it - > first [ 0 ] = = ' = ' ) {
if ( it - > first = = " =parentJob " ) {
run - > parentName = it - > second ;
} else if ( it - > first = = " =parentBuild " ) {
run - > parentBuild = atoi ( it - > second . c_str ( ) ) ;
} else if ( it - > first = = " =reason " ) {
run - > reasonMsg = it - > second ;
} else {
LLOG ( ERROR , " Unknown internal job parameter " , it - > first ) ;
}
it = params . erase ( it ) ;
} else
+ + it ;
}
run - > params = params ;
queuedJobs . push_back ( run ) ;
queuedJobs . push_back ( run ) ;
// notify clients
// notify clients
@ -591,7 +577,7 @@ void Laminar::abortAll() {
}
}
}
}
bool Laminar : : nodeCanQueue ( const Node & node , const Run & run ) const {
bool Laminar : : nodeCanQueue ( const Node & node , std : : string jobName ) const {
// if a node is too busy, it can't take the job
// if a node is too busy, it can't take the job
if ( node . busyExecutors > = node . numExecutors )
if ( node . busyExecutors > = node . numExecutors )
return false ;
return false ;
@ -600,7 +586,7 @@ bool Laminar::nodeCanQueue(const Node& node, const Run& run) const {
if ( node . tags . size ( ) = = 0 )
if ( node . tags . size ( ) = = 0 )
return true ;
return true ;
auto it = jobTags . find ( run. n ame) ;
auto it = jobTags . find ( jobN ame) ;
// if the job has no tags, it cannot be run on this node
// if the job has no tags, it cannot be run on this node
if ( it = = jobTags . end ( ) )
if ( it = = jobTags . end ( ) )
return false ;
return false ;
@ -617,109 +603,30 @@ bool Laminar::nodeCanQueue(const Node& node, const Run& run) const {
bool Laminar : : tryStartRun ( std : : shared_ptr < Run > run , int queueIndex ) {
bool Laminar : : tryStartRun ( std : : shared_ptr < Run > run , int queueIndex ) {
for ( auto & sn : nodes ) {
for ( auto & sn : nodes ) {
std : : shared_ptr < Node > node = sn . second ;
std : : shared_ptr < Node > node = sn . second ;
if ( nodeCanQueue ( * node . get ( ) , * run ) ) {
fs : : path cfgDir = fs : : path ( homeDir ) / " cfg " ;
boost : : system : : error_code err ;
// create a workspace for this job if it doesn't exist
fs : : path ws = fs : : path ( homeDir ) / " run " / run - > name / " workspace " ;
if ( ! fs : : exists ( ws ) ) {
if ( ! fs : : create_directories ( ws , err ) ) {
LLOG ( ERROR , " Could not create job workspace " , run - > name ) ;
break ;
}
// prepend the workspace init script
if ( fs : : exists ( cfgDir / " jobs " / run - > name + " .init " ) )
run - > addScript ( ( cfgDir / " jobs " / run - > name + " .init " ) . string ( ) , ws . string ( ) ) ;
}
uint buildNum = buildNums [ run - > name ] + 1 ;
// create the run directory
fs : : path rd = fs : : path ( homeDir ) / " run " / run - > name / std : : to_string ( buildNum ) ;
bool createWorkdir = true ;
if ( fs : : is_directory ( rd ) ) {
LLOG ( WARNING , " Working directory already exists, removing " , rd . string ( ) ) ;
fs : : remove_all ( rd , err ) ;
if ( err ) {
LLOG ( WARNING , " Failed to remove working directory " , err . message ( ) ) ;
createWorkdir = false ;
}
}
if ( createWorkdir & & ! fs : : create_directory ( rd , err ) ) {
LLOG ( ERROR , " Could not create working directory " , rd . string ( ) ) ;
break ;
}
run - > runDir = rd . string ( ) ;
// create an archive directory
fs : : path archive = fs : : path ( homeDir ) / " archive " / run - > name / std : : to_string ( buildNum ) ;
if ( fs : : is_directory ( archive ) ) {
LLOG ( WARNING , " Archive directory already exists " , archive . string ( ) ) ;
} else if ( ! fs : : create_directories ( archive ) ) {
LLOG ( ERROR , " Could not create archive directory " , archive . string ( ) ) ;
break ;
}
// add scripts
if ( nodeCanQueue ( * node . get ( ) , run - > name ) & & run - > configure ( buildNums [ run - > name ] + 1 , node , * fsHome ) ) {
// global before-run script
if ( fs : : exists ( cfgDir / " before " ) )
run - > addScript ( ( cfgDir / " before " ) . string ( ) ) ;
// per-node before-run script
if ( fs : : exists ( cfgDir / " nodes " / node - > name + " .before " ) )
run - > addScript ( ( cfgDir / " nodes " / node - > name + " .before " ) . string ( ) ) ;
// job before-run script
if ( fs : : exists ( cfgDir / " jobs " / run - > name + " .before " ) )
run - > addScript ( ( cfgDir / " jobs " / run - > name + " .before " ) . string ( ) ) ;
// main run script. must exist.
run - > addScript ( ( cfgDir / " jobs " / run - > name + " .run " ) . string ( ) ) ;
// job after-run script
if ( fs : : exists ( cfgDir / " jobs " / run - > name + " .after " ) )
run - > addScript ( ( cfgDir / " jobs " / run - > name + " .after " ) . string ( ) , true ) ;
// per-node after-run script
if ( fs : : exists ( cfgDir / " nodes " / node - > name + " .after " ) )
run - > addScript ( ( cfgDir / " nodes " / node - > name + " .after " ) . string ( ) , true ) ;
// global after-run script
if ( fs : : exists ( cfgDir / " after " ) )
run - > addScript ( ( cfgDir / " after " ) . string ( ) , true ) ;
// add environment files
if ( fs : : exists ( cfgDir / " env " ) )
run - > addEnv ( ( cfgDir / " env " ) . string ( ) ) ;
if ( fs : : exists ( cfgDir / " nodes " / node - > name + " .env " ) )
run - > addEnv ( ( cfgDir / " nodes " / node - > name + " .env " ) . string ( ) ) ;
if ( fs : : exists ( cfgDir / " jobs " / run - > name + " .env " ) )
run - > addEnv ( ( cfgDir / " jobs " / run - > name + " .env " ) . string ( ) ) ;
// add job timeout if specified
if ( fs : : exists ( cfgDir / " jobs " / run - > name + " .conf " ) ) {
int timeout = parseConfFile ( fs : : path ( cfgDir / " jobs " / run - > name + " .conf " ) . string ( ) . c_str ( ) ) . get < int > ( " TIMEOUT " , 0 ) ;
if ( timeout > 0 ) {
// A raw pointer to run is used here so as not to have a circular reference.
// The captured raw pointer is safe because if the Run is destroyed the Promise
// will be cancelled and the callback never called.
Run * r = run . get ( ) ;
r - > timeout = srv - > addTimeout ( timeout , [ r ] ( ) {
r - > abort ( true ) ;
} ) ;
}
}
// start the job
node - > busyExecutors + + ;
node - > busyExecutors + + ;
run - > node = node ;
run - > startedAt = time ( nullptr ) ;
run - > laminarHome = homeDir ;
run - > build = buildNum ;
// set the last known result if exists
// set the last known result if exists
db - > stmt ( " SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1 " )
db - > stmt ( " SELECT result FROM builds WHERE name = ? ORDER BY completedAt DESC LIMIT 1 " )
. bind ( run - > name )
. bind ( run - > name )
. fetch < int > ( [ = ] ( int result ) {
. fetch < int > ( [ = ] ( int result ) {
run - > lastResult = RunState ( result ) ;
run - > lastResult = RunState ( result ) ;
} ) ;
} ) ;
// update next build number
buildNums [ run - > name ] = buildNum ;
LLOG ( INFO , " Queued job to node " , run - > name , run - > build , node - > name ) ;
// Actually schedules the Run steps
kj : : Promise < void > exec = handleRunStep ( run . get ( ) ) . then ( [ this , r = run . get ( ) ] {
runFinished ( r ) ;
} ) ;
if ( run - > timeout > 0 ) {
exec = exec . attach ( srv - > addTimeout ( run - > timeout , [ r = run . get ( ) ] ( ) {
r - > abort ( true ) ;
} ) ) ;
}
srv - > addTask ( kj : : mv ( exec ) ) ;
LLOG ( INFO , " Started job on node " , run - > name , run - > build , node - > name ) ;
// update next build number
buildNums [ run - > name ] + + ;
// notify clients
// notify clients
Json j ;
Json j ;
@ -752,14 +659,6 @@ bool Laminar::tryStartRun(std::shared_ptr<Run> run, int queueIndex) {
c - > sendMessage ( msg ) ;
c - > sendMessage ( msg ) ;
}
}
// notify the rpc client if the start command was used
run - > started . fulfiller - > fulfill ( ) ;
// this actually spawns the first step
srv - > addTask ( handleRunStep ( run . get ( ) ) . then ( [ this , run ] {
runFinished ( run . get ( ) ) ;
} ) ) ;
return true ;
return true ;
}
}
}
}
@ -878,50 +777,21 @@ void Laminar::runFinished(Run * r) {
auto it = activeJobs . byJobName ( ) . equal_range ( r - > name ) ;
auto it = activeJobs . byJobName ( ) . equal_range ( r - > name ) ;
uint oldestActive = ( it . first = = it . second ) ? buildNums [ r - > name ] : ( * it . first ) - > build - 1 ;
uint oldestActive = ( it . first = = it . second ) ? buildNums [ r - > name ] : ( * it . first ) - > build - 1 ;
for ( int i = static_cast < int > ( oldestActive - numKeepRunDirs ) ; i > 0 ; i - - ) {
for ( int i = static_cast < int > ( oldestActive - numKeepRunDirs ) ; i > 0 ; i - - ) {
fs: : path d = fs : : path ( homeDir ) / " run " / r - > name / std : : to_string ( i ) ;
kj: : Path d { " run " , r - > name , std : : to_string ( i ) } ;
// Once the directory does not exist, it's probably not worth checking
// Once the directory does not exist, it's probably not worth checking
// any further. 99% of the time this loop should only ever have 1 iteration
// any further. 99% of the time this loop should only ever have 1 iteration
// anyway so hence this (admittedly debatable) optimization.
// anyway so hence this (admittedly debatable) optimization.
if ( ! fs : : exists ( d ) )
if ( ! fs Home- > exists ( d ) )
break ;
break ;
fs : : remove_all ( d ) ;
fs Home- > remove ( d ) ;
}
}
// in case we freed up an executor, check the queue
// in case we freed up an executor, check the queue
assignNewJobs ( ) ;
assignNewJobs ( ) ;
}
}
class MappedFileImpl : public MappedFile {
kj : : Maybe < kj : : Own < const kj : : ReadableFile > > Laminar : : getArtefact ( std : : string path ) {
public :
return fsHome - > openFile ( kj : : Path ( " archive " ) . append ( kj : : Path : : parse ( path ) ) ) ;
MappedFileImpl ( const char * path ) :
fd ( open ( path , O_RDONLY ) ) ,
sz ( 0 ) ,
ptr ( nullptr )
{
if ( fd = = - 1 ) return ;
struct stat st ;
if ( fstat ( fd , & st ) ! = 0 ) return ;
sz = st . st_size ;
ptr = mmap ( nullptr , sz , PROT_READ , MAP_SHARED , fd , 0 ) ;
if ( ptr = = MAP_FAILED )
ptr = nullptr ;
}
~ MappedFileImpl ( ) override {
if ( ptr )
munmap ( ptr , sz ) ;
if ( fd ! = - 1 )
close ( fd ) ;
}
virtual const void * address ( ) override { return ptr ; }
virtual size_t size ( ) override { return sz ; }
private :
int fd ;
size_t sz ;
void * ptr ;
} ;
kj : : Own < MappedFile > Laminar : : getArtefact ( std : : string path ) {
return kj : : heap < MappedFileImpl > ( fs : : path ( fs : : path ( homeDir ) / " archive " / path ) . c_str ( ) ) ;
}
}
bool Laminar : : handleBadgeRequest ( std : : string job , std : : string & badge ) {
bool Laminar : : handleBadgeRequest ( std : : string job , std : : string & badge ) {
@ -967,9 +837,9 @@ R"x(
}
}
std : : string Laminar : : getCustomCss ( ) {
std : : string Laminar : : getCustomCss ( ) {
MappedFileImpl cssFile ( fs : : path ( fs : : path ( homeDir ) / " custom " / " style.css " ) . c_str ( ) ) ;
KJ_IF_MAYBE ( cssFile , fsHome - > tryOpenFile ( kj : : Path { " custom " , " style.css " } ) ) {
if ( cssFile . address ( ) ) {
return ( * cssFile ) - > readAllText ( ) . cStr ( ) ;
return std : : string ( static_cast < const char * > ( cssFile . address ( ) ) , cssFile . size ( ) ) ;
} else {
return std : : string ( ) ;
}
}
return std : : string ( ) ;
}
}