mirror of
https://github.com/ohwgiles/laminar.git
synced 2024-10-27 20:34:20 +00:00
resolves #34: watch for configuration changes
Reload the configuration if a change is detected without requiring a server restart
This commit is contained in:
parent
a5d8b985f1
commit
216ecee7c5
@ -25,7 +25,7 @@ int StringMap::convert(std::string e) { return atoi(e.c_str()); }
|
||||
|
||||
StringMap parseConfFile(const char* path) {
|
||||
StringMap result;
|
||||
std::fstream f(path);
|
||||
std::ifstream f(path);
|
||||
std::string line;
|
||||
while(std::getline(f, line)) {
|
||||
if(line[0] == '#')
|
||||
|
@ -131,6 +131,9 @@ struct LaminarInterface {
|
||||
|
||||
// Callback for laminar to reap child processes.
|
||||
virtual void reapChildren() = 0;
|
||||
|
||||
// Callback to handle a configuration modification notification
|
||||
virtual void notifyConfigChanged() = 0;
|
||||
};
|
||||
|
||||
#endif // LAMINAR_INTERFACE_H_
|
||||
|
@ -364,6 +364,8 @@ void Laminar::run() {
|
||||
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
|
||||
|
||||
srv = new Server(*this, listen_rpc, listen_http);
|
||||
srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"nodes").string().c_str());
|
||||
srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"jobs").string().c_str());
|
||||
srv->start();
|
||||
}
|
||||
|
||||
@ -530,6 +532,13 @@ void Laminar::reapChildren() {
|
||||
assignNewJobs();
|
||||
}
|
||||
|
||||
void Laminar::notifyConfigChanged()
|
||||
{
|
||||
loadConfiguration();
|
||||
// config change may allow stuck jobs to dequeue
|
||||
assignNewJobs();
|
||||
}
|
||||
|
||||
void Laminar::abortAll() {
|
||||
for(std::shared_ptr<Run> run : activeJobs) {
|
||||
run->abort();
|
||||
|
@ -57,8 +57,9 @@ public:
|
||||
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
|
||||
bool getArtefact(std::string path, std::string& result) override;
|
||||
std::string getCustomCss() override;
|
||||
void reapChildren() override;
|
||||
void abortAll() override;
|
||||
void reapChildren() override;
|
||||
void notifyConfigChanged() override;
|
||||
|
||||
private:
|
||||
bool loadConfiguration();
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include <websocketpp/server.hpp>
|
||||
|
||||
#include <sys/eventfd.h>
|
||||
#include <sys/inotify.h>
|
||||
#include <sys/signal.h>
|
||||
#include <sys/signalfd.h>
|
||||
|
||||
@ -401,18 +402,30 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
|
||||
}));
|
||||
|
||||
// handle SIGCHLD
|
||||
sigset_t mask;
|
||||
sigemptyset(&mask);
|
||||
sigaddset(&mask, SIGCHLD);
|
||||
sigprocmask(SIG_BLOCK, &mask, nullptr);
|
||||
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
|
||||
auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
|
||||
reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){
|
||||
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
|
||||
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
|
||||
laminarInterface.reapChildren();
|
||||
}).attach(std::move(event)).attach(std::move(buffer));
|
||||
{
|
||||
sigset_t mask;
|
||||
sigemptyset(&mask);
|
||||
sigaddset(&mask, SIGCHLD);
|
||||
sigprocmask(SIG_BLOCK, &mask, nullptr);
|
||||
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
|
||||
auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
|
||||
reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){
|
||||
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
|
||||
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
|
||||
laminarInterface.reapChildren();
|
||||
}).attach(std::move(event)).attach(std::move(buffer));
|
||||
}
|
||||
|
||||
// handle watched paths
|
||||
{
|
||||
inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
|
||||
auto event = ioContext.lowLevelProvider->wrapInputFd(inotify_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
|
||||
pathWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char*, size_t){
|
||||
laminarInterface.notifyConfigChanged();
|
||||
}).attach(std::move(event)).attach(std::move(buffer));
|
||||
}
|
||||
}
|
||||
|
||||
Server::~Server() {
|
||||
@ -455,6 +468,10 @@ void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
|
||||
childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)));
|
||||
}
|
||||
|
||||
void Server::addWatchPath(const char* dpath) {
|
||||
inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE);
|
||||
}
|
||||
|
||||
kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
||||
kj::ConnectionReceiver& cr = *listener.get();
|
||||
return cr.accept().then(kj::mvCapture(kj::mv(listener),
|
||||
|
@ -43,6 +43,9 @@ public:
|
||||
// invoked with the read data
|
||||
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb);
|
||||
|
||||
// add a path to be watched for changes
|
||||
void addWatchPath(const char* dpath);
|
||||
|
||||
private:
|
||||
kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||
kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||
@ -63,6 +66,8 @@ private:
|
||||
kj::TaskSet childTasks;
|
||||
kj::TaskSet httpConnections;
|
||||
kj::Maybe<kj::Promise<void>> reapWatch;
|
||||
int inotify_fd;
|
||||
kj::Maybe<kj::Promise<void>> pathWatch;
|
||||
|
||||
// TODO: restructure so this isn't necessary
|
||||
friend class ServerTest;
|
||||
|
@ -64,6 +64,7 @@ public:
|
||||
MOCK_METHOD0(getCustomCss, std::string());
|
||||
MOCK_METHOD0(abortAll, void());
|
||||
MOCK_METHOD0(reapChildren, void());
|
||||
MOCK_METHOD0(notifyConfigChanged, void());
|
||||
};
|
||||
|
||||
class ServerTest : public ::testing::Test {
|
||||
|
Loading…
Reference in New Issue
Block a user