mirror of
https://github.com/ohwgiles/laminar.git
synced 2024-10-27 20:34:20 +00:00
resolves #34: watch for configuration changes
Reload the configuration if a change is detected without requiring a server restart
This commit is contained in:
parent
a5d8b985f1
commit
216ecee7c5
@ -25,7 +25,7 @@ int StringMap::convert(std::string e) { return atoi(e.c_str()); }
|
|||||||
|
|
||||||
StringMap parseConfFile(const char* path) {
|
StringMap parseConfFile(const char* path) {
|
||||||
StringMap result;
|
StringMap result;
|
||||||
std::fstream f(path);
|
std::ifstream f(path);
|
||||||
std::string line;
|
std::string line;
|
||||||
while(std::getline(f, line)) {
|
while(std::getline(f, line)) {
|
||||||
if(line[0] == '#')
|
if(line[0] == '#')
|
||||||
|
@ -131,6 +131,9 @@ struct LaminarInterface {
|
|||||||
|
|
||||||
// Callback for laminar to reap child processes.
|
// Callback for laminar to reap child processes.
|
||||||
virtual void reapChildren() = 0;
|
virtual void reapChildren() = 0;
|
||||||
|
|
||||||
|
// Callback to handle a configuration modification notification
|
||||||
|
virtual void notifyConfigChanged() = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // LAMINAR_INTERFACE_H_
|
#endif // LAMINAR_INTERFACE_H_
|
||||||
|
@ -364,6 +364,8 @@ void Laminar::run() {
|
|||||||
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
|
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
|
||||||
|
|
||||||
srv = new Server(*this, listen_rpc, listen_http);
|
srv = new Server(*this, listen_rpc, listen_http);
|
||||||
|
srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"nodes").string().c_str());
|
||||||
|
srv->addWatchPath(fs::path(fs::path(homeDir)/"cfg"/"jobs").string().c_str());
|
||||||
srv->start();
|
srv->start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -530,6 +532,13 @@ void Laminar::reapChildren() {
|
|||||||
assignNewJobs();
|
assignNewJobs();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Laminar::notifyConfigChanged()
|
||||||
|
{
|
||||||
|
loadConfiguration();
|
||||||
|
// config change may allow stuck jobs to dequeue
|
||||||
|
assignNewJobs();
|
||||||
|
}
|
||||||
|
|
||||||
void Laminar::abortAll() {
|
void Laminar::abortAll() {
|
||||||
for(std::shared_ptr<Run> run : activeJobs) {
|
for(std::shared_ptr<Run> run : activeJobs) {
|
||||||
run->abort();
|
run->abort();
|
||||||
|
@ -57,8 +57,9 @@ public:
|
|||||||
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
|
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
|
||||||
bool getArtefact(std::string path, std::string& result) override;
|
bool getArtefact(std::string path, std::string& result) override;
|
||||||
std::string getCustomCss() override;
|
std::string getCustomCss() override;
|
||||||
void reapChildren() override;
|
|
||||||
void abortAll() override;
|
void abortAll() override;
|
||||||
|
void reapChildren() override;
|
||||||
|
void notifyConfigChanged() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool loadConfiguration();
|
bool loadConfiguration();
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
#include <websocketpp/server.hpp>
|
#include <websocketpp/server.hpp>
|
||||||
|
|
||||||
#include <sys/eventfd.h>
|
#include <sys/eventfd.h>
|
||||||
|
#include <sys/inotify.h>
|
||||||
#include <sys/signal.h>
|
#include <sys/signal.h>
|
||||||
#include <sys/signalfd.h>
|
#include <sys/signalfd.h>
|
||||||
|
|
||||||
@ -401,18 +402,30 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
// handle SIGCHLD
|
// handle SIGCHLD
|
||||||
sigset_t mask;
|
{
|
||||||
sigemptyset(&mask);
|
sigset_t mask;
|
||||||
sigaddset(&mask, SIGCHLD);
|
sigemptyset(&mask);
|
||||||
sigprocmask(SIG_BLOCK, &mask, nullptr);
|
sigaddset(&mask, SIGCHLD);
|
||||||
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
|
sigprocmask(SIG_BLOCK, &mask, nullptr);
|
||||||
auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
|
||||||
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
|
auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||||
reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){
|
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
|
||||||
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
|
reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){
|
||||||
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
|
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
|
||||||
laminarInterface.reapChildren();
|
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
|
||||||
}).attach(std::move(event)).attach(std::move(buffer));
|
laminarInterface.reapChildren();
|
||||||
|
}).attach(std::move(event)).attach(std::move(buffer));
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle watched paths
|
||||||
|
{
|
||||||
|
inotify_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
|
||||||
|
auto event = ioContext.lowLevelProvider->wrapInputFd(inotify_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
|
||||||
|
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
|
||||||
|
pathWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char*, size_t){
|
||||||
|
laminarInterface.notifyConfigChanged();
|
||||||
|
}).attach(std::move(event)).attach(std::move(buffer));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Server::~Server() {
|
Server::~Server() {
|
||||||
@ -455,6 +468,10 @@ void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
|
|||||||
childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)));
|
childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Server::addWatchPath(const char* dpath) {
|
||||||
|
inotify_add_watch(inotify_fd, dpath, IN_ONLYDIR | IN_CLOSE_WRITE | IN_CREATE | IN_DELETE);
|
||||||
|
}
|
||||||
|
|
||||||
kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
|
||||||
kj::ConnectionReceiver& cr = *listener.get();
|
kj::ConnectionReceiver& cr = *listener.get();
|
||||||
return cr.accept().then(kj::mvCapture(kj::mv(listener),
|
return cr.accept().then(kj::mvCapture(kj::mv(listener),
|
||||||
|
@ -43,6 +43,9 @@ public:
|
|||||||
// invoked with the read data
|
// invoked with the read data
|
||||||
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb);
|
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb);
|
||||||
|
|
||||||
|
// add a path to be watched for changes
|
||||||
|
void addWatchPath(const char* dpath);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||||
kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
|
||||||
@ -63,6 +66,8 @@ private:
|
|||||||
kj::TaskSet childTasks;
|
kj::TaskSet childTasks;
|
||||||
kj::TaskSet httpConnections;
|
kj::TaskSet httpConnections;
|
||||||
kj::Maybe<kj::Promise<void>> reapWatch;
|
kj::Maybe<kj::Promise<void>> reapWatch;
|
||||||
|
int inotify_fd;
|
||||||
|
kj::Maybe<kj::Promise<void>> pathWatch;
|
||||||
|
|
||||||
// TODO: restructure so this isn't necessary
|
// TODO: restructure so this isn't necessary
|
||||||
friend class ServerTest;
|
friend class ServerTest;
|
||||||
|
@ -64,6 +64,7 @@ public:
|
|||||||
MOCK_METHOD0(getCustomCss, std::string());
|
MOCK_METHOD0(getCustomCss, std::string());
|
||||||
MOCK_METHOD0(abortAll, void());
|
MOCK_METHOD0(abortAll, void());
|
||||||
MOCK_METHOD0(reapChildren, void());
|
MOCK_METHOD0(reapChildren, void());
|
||||||
|
MOCK_METHOD0(notifyConfigChanged, void());
|
||||||
};
|
};
|
||||||
|
|
||||||
class ServerTest : public ::testing::Test {
|
class ServerTest : public ::testing::Test {
|
||||||
|
Loading…
Reference in New Issue
Block a user