From 216ecee7c5b4caf18071054ab72437b790d236f2 Mon Sep 17 00:00:00 2001 From: Oliver Giles Date: Fri, 6 Apr 2018 18:04:50 +0300 Subject: [PATCH] resolves #34: watch for configuration changes Reload the configuration if a change is detected without requiring a server restart --- src/conf.cpp | 2 +- src/interface.h | 3 +++ src/laminar.cpp | 9 +++++++++ src/laminar.h | 3 ++- src/server.cpp | 41 +++++++++++++++++++++++++++++------------ src/server.h | 5 +++++ test/test-server.cpp | 1 + 7 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/conf.cpp b/src/conf.cpp index ca791c3..4d87ac7 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -25,7 +25,7 @@ int StringMap::convert(std::string e) { return atoi(e.c_str()); } StringMap parseConfFile(const char* path) { StringMap result; - std::fstream f(path); + std::ifstream f(path); std::string line; while(std::getline(f, line)) { if(line[0] == '#') diff --git a/src/interface.h b/src/interface.h index 4061233..d3a48b8 100644 --- a/src/interface.h +++ b/src/interface.h @@ -131,6 +131,9 @@ struct LaminarInterface { // Callback for laminar to reap child processes. virtual void reapChildren() = 0; + + // Callback to handle a configuration modification notification + virtual void notifyConfigChanged() = 0; }; #endif // LAMINAR_INTERFACE_H_ diff --git a/src/laminar.cpp b/src/laminar.cpp index 67fe822..0643921 100644 --- a/src/laminar.cpp +++ b/src/laminar.cpp @@ -364,6 +364,8 @@ void Laminar::run() { const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT; srv = new Server(*this, listen_rpc, listen_http); + srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"nodes").string().c_str()); + srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"jobs").string().c_str()); srv->start(); } @@ -530,6 +532,13 @@ void Laminar::reapChildren() { assignNewJobs(); } +void Laminar::notifyConfigChanged() +{ + loadConfiguration(); + // config change may allow stuck jobs to dequeue + assignNewJobs(); +} + void Laminar::abortAll() { for(std::shared_ptr run : activeJobs) { run->abort(); diff --git a/src/laminar.h b/src/laminar.h index 44015df..e8735d3 100644 --- a/src/laminar.h +++ b/src/laminar.h @@ -57,8 +57,9 @@ public: bool setParam(std::string job, uint buildNum, std::string param, std::string value) override; bool getArtefact(std::string path, std::string& result) override; std::string getCustomCss() override; - void reapChildren() override; void abortAll() override; + void reapChildren() override; + void notifyConfigChanged() override; private: bool loadConfiguration(); diff --git a/src/server.cpp b/src/server.cpp index 878da20..5dbc74f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -401,18 +402,30 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress, })); // handle SIGCHLD - sigset_t mask; - sigemptyset(&mask); - sigaddset(&mask, SIGCHLD); - sigprocmask(SIG_BLOCK, &mask, nullptr); - int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); - auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); - auto buffer = kj::heapArrayBuilder(PROC_IO_BUFSIZE); - reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){ - const struct signalfd_siginfo* siginfo = reinterpret_cast(buf); - KJ_ASSERT(siginfo->ssi_signo == SIGCHLD); - laminarInterface.reapChildren(); - }).attach(std::move(event)).attach(std::move(buffer)); + { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCHLD); + sigprocmask(SIG_BLOCK, &mask, nullptr); + int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); + auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); + auto buffer = kj::heapArrayBuilder(PROC_IO_BUFSIZE); + reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){ + const struct signalfd_siginfo* siginfo = reinterpret_cast(buf); + KJ_ASSERT(siginfo->ssi_signo == SIGCHLD); + laminarInterface.reapChildren(); + }).attach(std::move(event)).attach(std::move(buffer)); + } + + // handle watched paths + { + inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); + auto event = ioContext.lowLevelProvider->wrapInputFd(inotify_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); + auto buffer = kj::heapArrayBuilder(PROC_IO_BUFSIZE); + pathWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char*, size_t){ + laminarInterface.notifyConfigChanged(); + }).attach(std::move(event)).attach(std::move(buffer)); + } } Server::~Server() { @@ -455,6 +468,10 @@ void Server::addDescriptor(int fd, std::function cb) { childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer))); } +void Server::addWatchPath(const char* dpath) { + inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE); +} + kj::Promise Server::acceptHttpClient(kj::Own&& listener) { kj::ConnectionReceiver& cr = *listener.get(); return cr.accept().then(kj::mvCapture(kj::mv(listener), diff --git a/src/server.h b/src/server.h index 43b8f95..acdd76b 100644 --- a/src/server.h +++ b/src/server.h @@ -43,6 +43,9 @@ public: // invoked with the read data void addDescriptor(int fd, std::function cb); + // add a path to be watched for changes + void addWatchPath(const char* dpath); + private: kj::Promise acceptHttpClient(kj::Own&& listener); kj::Promise acceptRpcClient(kj::Own&& listener); @@ -63,6 +66,8 @@ private: kj::TaskSet childTasks; kj::TaskSet httpConnections; kj::Maybe> reapWatch; + int inotify_fd; + kj::Maybe> pathWatch; // TODO: restructure so this isn't necessary friend class ServerTest; diff --git a/test/test-server.cpp b/test/test-server.cpp index 12ee297..555b594 100644 --- a/test/test-server.cpp +++ b/test/test-server.cpp @@ -64,6 +64,7 @@ public: MOCK_METHOD0(getCustomCss, std::string()); MOCK_METHOD0(abortAll, void()); MOCK_METHOD0(reapChildren, void()); + MOCK_METHOD0(notifyConfigChanged, void()); }; class ServerTest : public ::testing::Test {