1
0
mirror of https://github.com/ohwgiles/laminar.git synced 2024-10-27 20:34:20 +00:00

resolves #29: graceful shutdown

on SIGINT/SIGTERM:
1. stop accepting new connections
2. send SIGTERM to all child tasks
3. wait for processes to end
4. drop all websockets
This commit is contained in:
Oliver Giles 2018-02-24 18:53:11 +02:00
parent 30f2203a3b
commit 9c256815e4
9 changed files with 132 additions and 63 deletions

View File

@ -125,6 +125,12 @@ struct LaminarInterface {
// string. This shouldn't be used, because the sysadmin should have
// configured a real webserver to serve these things.
virtual std::string getCustomCss() = 0;
// Abort all running jobs
virtual void abortAll() = 0;
// Callback for laminar to reap child processes.
virtual void reapChildren() = 0;
};
#endif // LAMINAR_INTERFACE_H_

View File

@ -22,7 +22,6 @@
#include "log.h"
#include <sys/wait.h>
#include <sys/signalfd.h>
#include <fstream>
#include <zlib.h>
@ -365,31 +364,10 @@ void Laminar::run() {
const char* listen_http = getenv("LAMINAR_BIND_HTTP") ?: INTADDR_HTTP_DEFAULT;
srv = new Server(*this, listen_rpc, listen_http);
// handle SIGCHLD
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_BLOCK, &mask, nullptr);
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
srv->addDescriptor(sigchld, [this](const char* buf, size_t){
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
// TODO: re-enable assertion when the cause for its triggering
// is discovered and solved
//KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
if(siginfo->ssi_signo == SIGCHLD) {
reapAdvance();
assignNewJobs();
} else {
LLOG(ERROR, "Unexpected signo", siginfo->ssi_signo);
}
});
srv->start();
}
void Laminar::stop() {
clients.clear();
srv->stop();
}
@ -523,7 +501,7 @@ void Laminar::handleRunLog(std::shared_ptr<Run> run, std::string s) {
// Reaps a zombie and steps the corresponding Run to its next state.
// Should be called on SIGCHLD
void Laminar::reapAdvance() {
void Laminar::reapChildren() {
int ret = 0;
pid_t pid;
constexpr int bufsz = 1024;
@ -548,6 +526,14 @@ void Laminar::reapAdvance() {
if(completed)
run->complete();
}
assignNewJobs();
}
void Laminar::abortAll() {
for(std::shared_ptr<Run> run : activeJobs) {
run->abort();
}
}
bool Laminar::nodeCanQueue(const Node& node, const Run& run) const {

View File

@ -57,10 +57,11 @@ public:
bool setParam(std::string job, uint buildNum, std::string param, std::string value) override;
bool getArtefact(std::string path, std::string& result) override;
std::string getCustomCss() override;
void reapChildren() override;
void abortAll() override;
private:
bool loadConfiguration();
void reapAdvance();
void assignNewJobs();
bool stepRun(std::shared_ptr<Run> run);
void handleRunLog(std::shared_ptr<Run> run, std::string log);

View File

@ -58,9 +58,6 @@ std::string Run::reason() const {
}
bool Run::step() {
if(!currentScript.path.empty() && procStatus != 0)
result = RunState::FAILED;
if(scripts.size()) {
currentScript = scripts.front();
scripts.pop();
@ -75,6 +72,9 @@ bool Run::step() {
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_UNBLOCK, &mask, nullptr);
// set pgid == pid for easy killing on abort
setpgid(0, 0);
close(pfd[0]);
dup2(pfd[1], 1);
dup2(pfd[1], 2);
@ -127,14 +127,31 @@ bool Run::step() {
return true;
}
}
void Run::addScript(std::string scriptPath, std::string scriptWorkingDir) {
scripts.push({scriptPath, scriptWorkingDir});
}
void Run::addEnv(std::string path) {
env.push_back(path);
}
void Run::abort() {
// clear all pending scripts
std::queue<Script>().swap(scripts);
kill(-pid, SIGTERM);
}
void Run::reaped(int status) {
procStatus = status;
// once state is non-success it cannot change again
if(result != RunState::SUCCESS)
return;
if(WIFSIGNALED(status) && (WTERMSIG(status) == SIGTERM || WTERMSIG(status) == SIGKILL))
result = RunState::ABORTED;
else if(status != 0)
result = RunState::FAILED;
// otherwise preserve earlier status
}
void Run::complete() {

View File

@ -65,6 +65,9 @@ public:
// adds an environment file that will be sourced before this run
void addEnv(std::string path);
// aborts this run
void abort();
// called when a process owned by this run has been reaped. The status
// may be used to set the run's job status
void reaped(int status);
@ -85,7 +88,6 @@ public:
std::string log;
pid_t pid;
int fd;
int procStatus = 0;
std::unordered_map<std::string, std::string> params;
time_t queuedAt;

View File

@ -32,6 +32,8 @@
#include <websocketpp/server.hpp>
#include <sys/eventfd.h>
#include <sys/signal.h>
#include <sys/signalfd.h>
// Size of buffer used to read from file descriptors. Should be
// a multiple of sizeof(struct signalfd_siginfo) == 128
@ -373,75 +375,103 @@ Server::Server(LaminarInterface& li, kj::StringPtr rpcBindAddress,
laminarInterface(li),
httpInterface(kj::heap<HttpImpl>(li)),
ioContext(kj::setupAsyncIo()),
tasks(*this),
listeners(kj::heap<kj::TaskSet>(*this)),
childTasks(*this),
httpConnections(*this),
httpReady(kj::newPromiseAndFulfiller<void>())
{
// RPC task
if(rpcBindAddress.startsWith("unix:"))
unlink(rpcBindAddress.slice(strlen("unix:")).cStr());
tasks.add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress)
listeners->add(ioContext.provider->getNetwork().parseAddress(rpcBindAddress)
.then([this](kj::Own<kj::NetworkAddress>&& addr) {
acceptRpcClient(addr->listen());
return acceptRpcClient(addr->listen());
}));
// HTTP task
if(httpBindAddress.startsWith("unix:"))
unlink(httpBindAddress.slice(strlen("unix:")).cStr());
tasks.add(ioContext.provider->getNetwork().parseAddress(httpBindAddress)
listeners->add(ioContext.provider->getNetwork().parseAddress(httpBindAddress)
.then([this](kj::Own<kj::NetworkAddress>&& addr) {
acceptHttpClient(addr->listen());
// TODO: a better way? Currently used only for testing
httpReady.fulfiller->fulfill();
return acceptHttpClient(addr->listen());
}));
// handle SIGCHLD
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGCHLD);
sigprocmask(SIG_BLOCK, &mask, nullptr);
int sigchld = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
auto event = ioContext.lowLevelProvider->wrapInputFd(sigchld, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
reapWatch = handleFdRead(event, buffer.asPtr().begin(), [this](const char* buf, size_t){
const struct signalfd_siginfo* siginfo = reinterpret_cast<const struct signalfd_siginfo*>(buf);
KJ_ASSERT(siginfo->ssi_signo == SIGCHLD);
laminarInterface.reapChildren();
}).attach(std::move(event)).attach(std::move(buffer));
}
Server::~Server() {
}
void Server::start() {
// this eventfd is just to allow us to quit the server at some point
// in the future by adding this event to the async loop. I couldn't see
// a simpler way...
// The eventfd is used to quit the server later since we need to trigger
// a reaction from the event loop
efd_quit = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
kj::Promise<void> quit = kj::evalLater([this](){
kj::evalLater([this](){
static uint64_t _;
auto wakeEvent = ioContext.lowLevelProvider->wrapInputFd(efd_quit);
return wakeEvent->read(&_, sizeof(uint64_t)).attach(std::move(wakeEvent));
});
quit.wait(ioContext.waitScope);
}).wait(ioContext.waitScope);
// Execution arrives here when the eventfd is triggered (in stop())
// Shutdown sequence:
// 1. stop accepting new connections
listeners = nullptr;
// 2. abort current jobs. Most of the time this isn't necessary since
// systemd stop or other kill mechanism will send SIGTERM to the whole
// process group.
laminarInterface.abortAll();
// 3. wait for all children to close
childTasks.onEmpty().wait(ioContext.waitScope);
// 4. run the loop once more to send any pending output to websocket clients
ioContext.waitScope.poll();
// 5. return: websockets will be destructed
}
void Server::stop() {
// This method is expected to be called in signal context, so an eventfd
// is used to get the main loop to react. See run()
eventfd_write(efd_quit, 1);
}
void Server::addDescriptor(int fd, std::function<void(const char*,size_t)> cb) {
auto event = this->ioContext.lowLevelProvider->wrapInputFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
auto buffer = kj::heapArrayBuilder<char>(PROC_IO_BUFSIZE);
tasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)));
childTasks.add(handleFdRead(event, buffer.asPtr().begin(), cb).attach(std::move(event)).attach(std::move(buffer)));
}
void Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
auto ptr = listener.get();
tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
kj::Promise<void> Server::acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener) {
kj::ConnectionReceiver& cr = *listener.get();
return cr.accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& connection) {
acceptHttpClient(kj::mv(listener));
auto conn = kj::heap<WebsocketConnection>(kj::mv(connection), *httpInterface);
// delete the connection when either the read or write task completes
return conn->pend().exclusiveJoin(conn->writeTask()).attach(kj::mv(conn));
}))
);
httpConnections.add(conn->pend().exclusiveJoin(conn->writeTask()).attach(kj::mv(conn)));
return acceptHttpClient(kj::mv(listener));
}));
}
void Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
auto ptr = listener.get();
tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
kj::Promise<void> Server::acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener) {
kj::ConnectionReceiver& cr = *listener.get();
return cr.accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& connection) {
acceptRpcClient(kj::mv(listener));
auto server = kj::heap<RpcConnection>(kj::mv(connection), rpcInterface, capnp::ReaderOptions());
tasks.add(server->network.onDisconnect().attach(kj::mv(server)));
}))
);
childTasks.add(server->network.onDisconnect().attach(kj::mv(server)));
return acceptRpcClient(kj::mv(listener));
}));
}
// returns a promise which will read a chunk of data from the file descriptor

View File

@ -44,8 +44,8 @@ public:
void addDescriptor(int fd, std::function<void(const char*,size_t)> cb);
private:
void acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
void acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> acceptHttpClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> acceptRpcClient(kj::Own<kj::ConnectionReceiver>&& listener);
kj::Promise<void> handleFdRead(kj::AsyncInputStream* stream, char* buffer, std::function<void(const char*,size_t)> cb);
void taskFailed(kj::Exception&& exception) override;
@ -59,7 +59,10 @@ private:
LaminarInterface& laminarInterface;
kj::Own<HttpImpl> httpInterface;
kj::AsyncIoContext ioContext;
kj::TaskSet tasks;
kj::Own<kj::TaskSet> listeners;
kj::TaskSet childTasks;
kj::TaskSet httpConnections;
kj::Maybe<kj::Promise<void>> reapWatch;
// TODO: restructure so this isn't necessary
friend class ServerTest;

View File

@ -27,12 +27,14 @@ protected:
void SetUp() override {
run.node = &node;
}
void runAll() {
while(!run.step()) {
void wait() {
int state = -1;
waitpid(run.pid, &state, 0);
run.reaped(state);
}
void runAll() {
while(!run.step())
wait();
}
std::string readAllOutput() {
std::string res;
@ -96,3 +98,23 @@ TEST_F(RunTest, ParamsToEnv) {
StringMap map = parseFromString(readAllOutput());
EXPECT_EQ("bar", map["foo"]);
}
TEST_F(RunTest, Abort) {
run.addScript("/usr/bin/yes");
run.step();
usleep(200); // TODO fix
run.abort();
wait();
EXPECT_EQ(RunState::ABORTED, run.result);
}
TEST_F(RunTest, AbortAfterFailed) {
run.addScript("/bin/false");
runAll();
run.addScript("/usr/bin/yes");
run.step();
usleep(200); // TODO fix
run.abort();
wait();
EXPECT_EQ(RunState::FAILED, run.result);
}

View File

@ -62,6 +62,8 @@ public:
MOCK_METHOD4(setParam, bool(std::string job, uint buildNum, std::string param, std::string value));
MOCK_METHOD2(getArtefact, bool(std::string path, std::string& result));
MOCK_METHOD0(getCustomCss, std::string());
MOCK_METHOD0(abortAll, void());
MOCK_METHOD0(reapChildren, void());
};
class ServerTest : public ::testing::Test {