1
0
mirror of https://github.com/falk-werner/webfuse synced 2024-10-27 20:34:10 +00:00
falk-werner_webfuse/src/webfuse/ws/server.cpp

280 lines
7.2 KiB
C++
Raw Normal View History

2022-11-13 21:22:35 +00:00
#include "webfuse/ws/server.hpp"
2022-11-19 21:57:32 +00:00
2022-11-13 21:22:35 +00:00
#include <libwebsockets.h>
2022-11-19 21:57:32 +00:00
#include <cinttypes>
2022-11-13 21:22:35 +00:00
#include <cstring>
#include <iostream>
#include <thread>
#include <atomic>
2022-11-19 21:57:32 +00:00
#include <mutex>
#include <future>
#include <chrono>
2022-11-14 18:02:46 +00:00
#include <stdexcept>
2022-11-13 21:22:35 +00:00
2022-11-19 21:57:32 +00:00
#include <queue>
#include <string>
#include <unordered_map>
namespace
{
struct user_data
{
struct lws * connection = nullptr;
std::string current_message;
2022-11-19 21:57:32 +00:00
std::mutex mut;
uint32_t id = 0;
std::queue<webfuse::messagewriter> requests;
std::unordered_map<uint32_t, std::promise<webfuse::messagereader>> pending_responses;
2022-11-19 21:57:32 +00:00
};
}
2022-11-13 21:22:35 +00:00
extern "C"
{
static int ws_server_callback(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
2022-11-19 21:57:32 +00:00
auto const * protocol = lws_get_protocol(wsi);
if (nullptr == protocol) { return 0; }
if (&ws_server_callback != protocol->callback) { return 0; }
auto * data = reinterpret_cast<user_data *>(protocol->user);
int result = 0;
2022-11-13 21:22:35 +00:00
switch(reason)
{
case LWS_CALLBACK_ESTABLISHED:
2022-11-19 21:57:32 +00:00
if (nullptr == data->connection)
{
data->connection = wsi;
}
else
{
result = -1;
}
2022-11-13 21:22:35 +00:00
break;
case LWS_CALLBACK_CLOSED:
2022-11-19 21:57:32 +00:00
if (wsi == data->connection)
{
data->connection = nullptr;
}
2022-11-13 21:22:35 +00:00
break;
case LWS_CALLBACK_RECEIVE:
{
auto * fragment = reinterpret_cast<char*>(in);
data->current_message.append(fragment, len);
if (lws_is_final_fragment(wsi))
{
try
{
webfuse::messagereader reader(data->current_message);
uint32_t id = reader.read_u32();
uint8_t message_type = reader.read_u8();
std::lock_guard lock(data->mut);
auto it = data->pending_responses.find(id);
if (it != data->pending_responses.end())
{
it->second.set_value(std::move(reader));
data->pending_responses.erase(it);
}
else
{
// ToDo: log request not found
std::cout << "warning: request not found: id=" << id << std::endl;
for(auto const & entry: data->pending_responses)
{
std::cout << "\t" << entry.first << std::endl;
}
}
}
catch(...)
{
// ToDo: log invalid message
std::cout << "warning: invalid message" << std::endl;
}
}
}
2022-11-13 21:22:35 +00:00
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
2022-11-19 21:57:32 +00:00
{
webfuse::messagewriter writer(webfuse::request_type::unknown);
2022-11-19 21:57:32 +00:00
bool has_msg = false;
bool has_more = false;
{
std::lock_guard lock(data->mut);
has_msg = !(data->requests.empty());
if (has_msg)
{
has_msg = true;
writer = std::move(data->requests.front());
2022-11-19 21:57:32 +00:00
data->requests.pop();
has_more = !(data->requests.empty());
}
}
if (has_msg)
{
size_t size;
unsigned char * raw_data = writer.get_data(size);
2022-11-19 21:57:32 +00:00
int const rc = lws_write(data->connection, raw_data, size, LWS_WRITE_BINARY);
}
}
2022-11-13 21:22:35 +00:00
break;
default:
break;
}
2022-11-19 21:57:32 +00:00
return result;
2022-11-13 21:22:35 +00:00
}
}
namespace webfuse
{
class ws_server::detail
{
detail(detail const &) = delete;
detail& operator=(detail const &) = delete;
detail(detail &&) = delete;
detail& operator=(detail &&) = delete;
public:
detail(ws_config const & config)
: shutdown_requested(false)
2022-11-13 21:22:35 +00:00
{
memset(reinterpret_cast<void*>(protocols), 0, sizeof(protocols));
protocols[0].name = "webfuse2";
protocols[0].callback = &ws_server_callback;
protocols[0].per_session_data_size = 0;
2022-11-19 21:57:32 +00:00
protocols[0].user = reinterpret_cast<void*>(&data);
2022-11-13 21:22:35 +00:00
memset(reinterpret_cast<void*>(&info), 0, sizeof(info));
info.port = config.port;
info.protocols = protocols;
info.vhost_name = "localhost";
info.options = LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE | LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
context = lws_create_context(&info);
lws_vhost * const vhost = lws_create_vhost(context, &info);
// port = lws_get_vhost_port(vhost);
thread = std::thread([this]() {
while (!shutdown_requested)
{
2022-11-19 21:57:32 +00:00
{
std::lock_guard lock(data.mut);
if (!data.requests.empty())
{
if (nullptr != data.connection)
{
lws_callback_on_writable(data.connection);
}
else
{
data.requests = std::move(std::queue<webfuse::messagewriter>());
2022-11-19 21:57:32 +00:00
data.pending_responses.clear();
}
}
}
lws_service(context, 0);
}
});
2022-11-13 21:22:35 +00:00
}
~detail()
{
shutdown_requested = true;
lws_cancel_service(context);
thread.join();
2022-11-13 21:22:35 +00:00
lws_context_destroy(context);
}
uint32_t next_id()
{
data.id++;
if (0 == data.id)
{
data.id = 1;
}
return data.id;
}
std::thread thread;
std::atomic<bool> shutdown_requested;
2022-11-13 21:22:35 +00:00
lws_protocols protocols[2];
lws_context_creation_info info;
lws_context * context;
2022-11-19 21:57:32 +00:00
user_data data;
2022-11-13 21:22:35 +00:00
};
ws_server::ws_server(ws_config const & config)
: d(new detail(config))
{
}
ws_server::~ws_server()
{
delete d;
}
ws_server::ws_server(ws_server && other)
{
this->d = other.d;
other.d = nullptr;
}
ws_server& ws_server::operator=(ws_server && other)
{
if (this != &other)
{
delete this->d;
this->d = other.d;
other.d = nullptr;
}
return *this;
}
messagereader ws_server::perform(messagewriter writer)
2022-11-14 18:02:46 +00:00
{
std::future<messagereader> f;
2022-11-14 18:02:46 +00:00
{
std::promise<messagereader> p;
2022-11-19 21:57:32 +00:00
f = p.get_future();
std::lock_guard lock(d->data.mut);
uint32_t id = d->next_id();
writer.set_id(id);
d->data.requests.emplace(std::move(writer));
d->data.pending_responses.emplace(id, std::move(p));
2022-11-14 18:02:46 +00:00
}
2022-11-19 21:57:32 +00:00
lws_cancel_service(d->context);
if(std::future_status::timeout == f.wait_for(std::chrono::seconds(10)))
2022-11-14 18:02:46 +00:00
{
2022-11-19 21:57:32 +00:00
throw std::runtime_error("timeout");
2022-11-14 18:02:46 +00:00
}
2022-11-19 21:57:32 +00:00
return std::move(f.get());
2022-11-14 18:02:46 +00:00
}
2022-11-13 21:22:35 +00:00
}