1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

resolves #34: watch for configuration changes

Reload the configuration if a change is detected
without requiring a server restart
This commit is contained in:
Oliver Giles 2018-04-06 18:04:50 +03:00
parent a5d8b985f1
commit 216ecee7c5
7 changed files with 50 additions and 14 deletions

View File

@ -25,7 +25,7 @@ int StringMap::convert(std::string e) { return atoi(e.c_str()); }
StringMap parseConfFile(const char* path) { StringMap parseConfFile(const char* path) {
StringMap result; StringMap result;
std::fstream f(path); std::ifstream f(path);
std::string line; std::string line;
while(std::getline(f, line)) { while(std::getline(f, line)) {
if(line[0] == '#') if(line[0] == '#')

View File

@ -131,6 +131,9 @@ struct LaminarInterface {
// Callback for laminar to reap child processes. // Callback for laminar to reap child processes.
virtual void reapChildren() = 0; virtual void reapChildren() = 0;
// Callback to handle a configuration modification notification
virtual void notifyConfigChanged() = 0;
}; };
#endif // LAMINAR_INTERFACE_H_ #endif // LAMINAR_INTERFACE_H_

View File

@ -364,6 +364,8 @@ void Laminar::run() {
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT; const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
srv = new Server(*this, listen_rpc, listen_http); srv = new Server(*this, listen_rpc, listen_http);
srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"nodes").string().c_str());
srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"jobs").string().c_str());
srv->start(); srv->start();
} }
@ -530,6 +532,13 @@ void Laminar::reapChildren() {
assignNewJobs(); assignNewJobs();
} }
void Laminar::notifyConfigChanged()
{
loadConfiguration();
// config change may allow stuck jobs to dequeue
assignNewJobs();
}
void Laminar::abortAll() { void Laminar::abortAll() {
for(std::shared_ptr<Run> run : activeJobs) { for(std::shared_ptr<Run> run : activeJobs) {
run->abort(); run->abort();

View File

@ -57,8 +57,9 @@ public:
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override; bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
bool getArtefact(std::string path, std::string& result) override; bool getArtefact(std::string path, std::string& result) override;
std::string getCustomCss() override; std::string getCustomCss() override;
void reapChildren() override;
void abortAll() override; void abortAll() override;
void reapChildren() override;
void notifyConfigChanged() override;
private: private:
bool loadConfiguration(); bool loadConfiguration();

View File

@ -32,6 +32,7 @@
#include <websocketpp/server.hpp> #include <websocketpp/server.hpp>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <sys/inotify.h>
#include <sys/signal.h> #include <sys/signal.h>
#include <sys/signalfd.h> #include <sys/signalfd.h>
@ -401,6 +402,7 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
})); }));
// handle SIGCHLD // handle SIGCHLD
{
sigset_t mask; sigset_t mask;
sigemptyset(&mask); sigemptyset(&mask);
sigaddset(&mask, SIGCHLD); sigaddset(&mask, SIGCHLD);
@ -413,6 +415,17 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD); KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
laminarInterface.reapChildren(); laminarInterface.reapChildren();
}).attach(std::move(event)).attach(std::move(buffer)); }).attach(std::move(event)).attach(std::move(buffer));
}
// handle watched paths
{
inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
auto event = ioContext.lowLevelProvider->wrapInputFd(inotify_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
pathWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char*, size_t){
laminarInterface.notifyConfigChanged();
}).attach(std::move(event)).attach(std::move(buffer));
}
} }
Server::~Server() { Server::~Server() {
@ -455,6 +468,10 @@ void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer))); childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)));
} }
void Server::addWatchPath(const char* dpath) {
inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE);
}
kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) { kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
kj::ConnectionReceiver& cr = *listener.get(); kj::ConnectionReceiver& cr = *listener.get();
return cr.accept().then(kj::mvCapture(kj::mv(listener), return cr.accept().then(kj::mvCapture(kj::mv(listener),

View File

@ -43,6 +43,9 @@ public:
// invoked with the read data // invoked with the read data
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb); void addDescriptor(int fd, std::function<void(const char*,size_t)> cb);
// add a path to be watched for changes
void addWatchPath(const char* dpath);
private: private:
kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener); kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener); kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
@ -63,6 +66,8 @@ private:
kj::TaskSet childTasks; kj::TaskSet childTasks;
kj::TaskSet httpConnections; kj::TaskSet httpConnections;
kj::Maybe<kj::Promise<void>> reapWatch; kj::Maybe<kj::Promise<void>> reapWatch;
int inotify_fd;
kj::Maybe<kj::Promise<void>> pathWatch;
// TODO: restructure so this isn't necessary // TODO: restructure so this isn't necessary
friend class ServerTest; friend class ServerTest;

View File

@ -64,6 +64,7 @@ public:
MOCK_METHOD0(getCustomCss, std::string()); MOCK_METHOD0(getCustomCss, std::string());
MOCK_METHOD0(abortAll, void()); MOCK_METHOD0(abortAll, void());
MOCK_METHOD0(reapChildren, void()); MOCK_METHOD0(reapChildren, void());
MOCK_METHOD0(notifyConfigChanged, void());
}; };
class ServerTest : public ::testing::Test { class ServerTest : public ::testing::Test {