1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

resolves #41: strange behaviour of limits

On a configuration change, update existing Nodes in Laminar's NodeMap
rather than replacing it with a new map.
This commit is contained in:
Oliver Giles 2018-04-20 14:18:10 +03:00
parent c937362961
commit 083f136186
4 changed files with 42 additions and 32 deletions

View File

@ -314,9 +314,9 @@ void Laminar::sendStatus(LaminarClient* client) {
int execTotal = 0; int execTotal = 0;
int execBusy = 0; int execBusy = 0;
for(const auto& it : nodes) { for(const auto& it : nodes) {
const Node& node = it.second; const std::shared_ptr<Node>& node = it.second;
execTotal += node.numExecutors; execTotal += node->numExecutors;
execBusy += node.busyExecutors; execBusy += node->busyExecutors;
} }
j.set("executorsTotal", execTotal); j.set("executorsTotal", execTotal);
j.set("executorsBusy", execBusy); j.set("executorsBusy", execBusy);
@ -375,7 +375,7 @@ bool Laminar::loadConfiguration() {
if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS")) if(const char* ndirs = getenv("LAMINAR_KEEP_RUNDIRS"))
numKeepRunDirs = static_cast<uint>(atoi(ndirs)); numKeepRunDirs = static_cast<uint>(atoi(ndirs));
NodeMap nm; std::set<std::string> knownNodes;
fs::path nodeCfg = fs::path(homeDir)/"cfg"/"nodes"; fs::path nodeCfg = fs::path(homeDir)/"cfg"/"nodes";
@ -386,9 +386,11 @@ bool Laminar::loadConfiguration() {
StringMap conf = parseConfFile(it->path().string().c_str()); StringMap conf = parseConfFile(it->path().string().c_str());
Node node; std::string nodeName = it->path().stem().string();
node.name = it->path().stem().string(); auto existingNode = nodes.find(nodeName);
node.numExecutors = conf.get<int>("EXECUTORS", 6); std::shared_ptr<Node> node = existingNode == nodes.end() ? nodes.emplace(nodeName, new Node).first->second : existingNode->second;
node->name = nodeName;
node->numExecutors = conf.get<int>("EXECUTORS", 6);
std::string tags = conf.get<std::string>("TAGS"); std::string tags = conf.get<std::string>("TAGS");
if(!tags.empty()) { if(!tags.empty()) {
@ -397,23 +399,30 @@ bool Laminar::loadConfiguration() {
std::string tag; std::string tag;
while(std::getline(iss, tag, ',')) while(std::getline(iss, tag, ','))
tagList.insert(tag); tagList.insert(tag);
node.tags = tagList; node->tags = tagList;
} }
nm.emplace(node.name, std::move(node)); knownNodes.insert(nodeName);
} }
} }
if(nm.empty()) { // remove any nodes whose config files disappeared.
// if there are no known nodes, take care not to remove and re-add the default node
for(auto it = nodes.begin(); it != nodes.end();) {
if((it->first == "" && knownNodes.size() == 0) || knownNodes.find(it->first) != knownNodes.end())
it++;
else
it = nodes.erase(it);
}
// add a default node // add a default node
Node node; if(nodes.empty()) {
node.name = ""; std::shared_ptr<Node> node(new Node);
node.numExecutors = 6; node->name = "";
nm.emplace("", std::move(node)); node->numExecutors = 6;
nodes.emplace("", node);
} }
nodes = nm;
fs::path jobsDir = fs::path(homeDir)/"cfg"/"jobs"; fs::path jobsDir = fs::path(homeDir)/"cfg"/"jobs";
if(fs::is_directory(jobsDir)) { if(fs::is_directory(jobsDir)) {
for(fs::directory_iterator it(jobsDir); it != fs::directory_iterator(); ++it) { for(fs::directory_iterator it(jobsDir); it != fs::directory_iterator(); ++it) {
@ -571,9 +580,9 @@ void Laminar::assignNewJobs() {
while(it != queuedJobs.end()) { while(it != queuedJobs.end()) {
bool assigned = false; bool assigned = false;
for(auto& sn : nodes) { for(auto& sn : nodes) {
Node& node = sn.second; std::shared_ptr<Node> node = sn.second;
std::shared_ptr<Run> run = *it; std::shared_ptr<Run> run = *it;
if(nodeCanQueue(node, *run)) { if(nodeCanQueue(*node.get(), *run)) {
fs::path cfgDir = fs::path(homeDir)/"cfg"; fs::path cfgDir = fs::path(homeDir)/"cfg";
boost::system::error_code err; boost::system::error_code err;
@ -621,8 +630,8 @@ void Laminar::assignNewJobs() {
if(fs::exists(cfgDir/"before")) if(fs::exists(cfgDir/"before"))
run->addScript((cfgDir/"before").string()); run->addScript((cfgDir/"before").string());
// per-node before-run script // per-node before-run script
if(fs::exists(cfgDir/"nodes"/node.name+".before")) if(fs::exists(cfgDir/"nodes"/node->name+".before"))
run->addScript((cfgDir/"nodes"/node.name+".before").string()); run->addScript((cfgDir/"nodes"/node->name+".before").string());
// job before-run script // job before-run script
if(fs::exists(cfgDir/"jobs"/run->name+".before")) if(fs::exists(cfgDir/"jobs"/run->name+".before"))
run->addScript((cfgDir/"jobs"/run->name+".before").string()); run->addScript((cfgDir/"jobs"/run->name+".before").string());
@ -632,8 +641,8 @@ void Laminar::assignNewJobs() {
if(fs::exists(cfgDir/"jobs"/run->name+".after")) if(fs::exists(cfgDir/"jobs"/run->name+".after"))
run->addScript((cfgDir/"jobs"/run->name+".after").string()); run->addScript((cfgDir/"jobs"/run->name+".after").string());
// per-node after-run script // per-node after-run script
if(fs::exists(cfgDir/"nodes"/node.name+".after")) if(fs::exists(cfgDir/"nodes"/node->name+".after"))
run->addScript((cfgDir/"nodes"/node.name+".after").string()); run->addScript((cfgDir/"nodes"/node->name+".after").string());
// global after-run script // global after-run script
if(fs::exists(cfgDir/"after")) if(fs::exists(cfgDir/"after"))
run->addScript((cfgDir/"after").string()); run->addScript((cfgDir/"after").string());
@ -641,14 +650,14 @@ void Laminar::assignNewJobs() {
// add environment files // add environment files
if(fs::exists(cfgDir/"env")) if(fs::exists(cfgDir/"env"))
run->addEnv((cfgDir/"env").string()); run->addEnv((cfgDir/"env").string());
if(fs::exists(cfgDir/"nodes"/node.name+".env")) if(fs::exists(cfgDir/"nodes"/node->name+".env"))
run->addEnv((cfgDir/"nodes"/node.name+".env").string()); run->addEnv((cfgDir/"nodes"/node->name+".env").string());
if(fs::exists(cfgDir/"jobs"/run->name+".env")) if(fs::exists(cfgDir/"jobs"/run->name+".env"))
run->addEnv((cfgDir/"jobs"/run->name+".env").string()); run->addEnv((cfgDir/"jobs"/run->name+".env").string());
// start the job // start the job
node.busyExecutors++; node->busyExecutors++;
run->node = &node; run->node = node;
run->startedAt = time(nullptr); run->startedAt = time(nullptr);
run->laminarHome = homeDir; run->laminarHome = homeDir;
run->build = buildNum; run->build = buildNum;
@ -661,7 +670,7 @@ void Laminar::assignNewJobs() {
// update next build number // update next build number
buildNums[run->name] = buildNum; buildNums[run->name] = buildNum;
LLOG(INFO, "Queued job to node", run->name, run->build, node.name); LLOG(INFO, "Queued job to node", run->name, run->build, node->name);
// notify clients // notify clients
Json j; Json j;
@ -713,7 +722,7 @@ void Laminar::assignNewJobs() {
} }
void Laminar::runFinished(Run * r) { void Laminar::runFinished(Run * r) {
Node* node = r->node; std::shared_ptr<Node> node = r->node;
node->busyExecutors--; node->busyExecutors--;
LLOG(INFO, "Run completed", r->name, to_string(r->result)); LLOG(INFO, "Run completed", r->name, to_string(r->result));

View File

@ -27,7 +27,7 @@
#include <unordered_map> #include <unordered_map>
// Node name to node object map // Node name to node object map
typedef std::unordered_map<std::string,Node> NodeMap; typedef std::unordered_map<std::string, std::shared_ptr<Node>> NodeMap;
struct Server; struct Server;
class Json; class Json;

View File

@ -25,6 +25,7 @@
#include <functional> #include <functional>
#include <ostream> #include <ostream>
#include <unordered_map> #include <unordered_map>
#include <memory>
enum class RunState { enum class RunState {
UNKNOWN, UNKNOWN,
@ -75,7 +76,7 @@ public:
std::string reason() const; std::string reason() const;
std::function<void(Run*)> notifyCompletion; std::function<void(Run*)> notifyCompletion;
Node* node; std::shared_ptr<Node> node;
RunState result; RunState result;
RunState lastResult; RunState lastResult;
std::string laminarHome; std::string laminarHome;

View File

@ -25,7 +25,7 @@
class RunTest : public ::testing::Test { class RunTest : public ::testing::Test {
protected: protected:
void SetUp() override { void SetUp() override {
run.node = &node; run.node = node;
} }
void wait() { void wait() {
int state = -1; int state = -1;
@ -55,7 +55,7 @@ protected:
} }
class Run run; class Run run;
Node node; std::shared_ptr<Node> node = std::shared_ptr<Node>(new Node);
}; };
TEST_F(RunTest, WorkingDirectory) { TEST_F(RunTest, WorkingDirectory) {