removed single threaded test server

pull/2/head
Falk Werner 4 years ago
parent 8a03f16aa5
commit 4d277701cb

@ -209,7 +209,6 @@ alltests = executable('alltests',
'test/webfuse/utils/timeout_watcher.cc', 'test/webfuse/utils/timeout_watcher.cc',
'test/webfuse/utils/path.c', 'test/webfuse/utils/path.c',
'test/webfuse/utils/static_filesystem.c', 'test/webfuse/utils/static_filesystem.c',
'test/webfuse/utils/ws_server.cc',
'test/webfuse/utils/threaded_ws_server.cc', 'test/webfuse/utils/threaded_ws_server.cc',
'test/webfuse/mocks/fake_invokation_context.cc', 'test/webfuse/mocks/fake_invokation_context.cc',
'test/webfuse/mocks/mock_authenticator.cc', 'test/webfuse/mocks/mock_authenticator.cc',

@ -2,6 +2,7 @@
#include "webfuse/adapter/client.h" #include "webfuse/adapter/client.h"
#include "webfuse/adapter/credentials.h" #include "webfuse/adapter/credentials.h"
#include "webfuse/core/protocol_names.h"
#include "webfuse/utils/threaded_ws_server.h" #include "webfuse/utils/threaded_ws_server.h"
using webfuse_test::ThreadedWsServer; using webfuse_test::ThreadedWsServer;
@ -94,8 +95,6 @@ void callback2(
TEST(client, general_usage) TEST(client, general_usage)
{ {
ThreadedWsServer server(54321);
context ctx; context ctx;
ctx.state = connection_state::connecting; ctx.state = connection_state::connecting;
@ -112,7 +111,7 @@ TEST(client, general_usage)
TEST(client, connect) TEST(client, connect)
{ {
ThreadedWsServer server; ThreadedWsServer server(WF_PROTOCOL_NAME_PROVIDER_SERVER);
context ctx; context ctx;
ctx.state = connection_state::connecting; ctx.state = connection_state::connecting;

@ -3,20 +3,25 @@
#include <webfuse/provider/client_protocol.h> #include <webfuse/provider/client_protocol.h>
#include <webfuse/provider/client_config.h> #include <webfuse/provider/client_config.h>
#include "webfuse/utils/ws_server.hpp" #include "webfuse/utils/threaded_ws_server.h"
#include "webfuse/mocks/mock_provider_client.hpp" #include "webfuse/mocks/mock_provider_client.hpp"
#include "webfuse/core/protocol_names.h"
#include "webfuse/utils/timeout_watcher.hpp"
#include <cstring> #include <cstring>
#include <thread> #include <thread>
#include <atomic> #include <atomic>
using webfuse_test::WebsocketServer; using webfuse_test::ThreadedWsServer;
using webfuse_test::MockProviderClient; using webfuse_test::MockProviderClient;
using webfuse_test::IProviderClient; using webfuse_test::IProviderClient;
using webfuse_test::TimeoutWatcher;
using testing::_; using testing::_;
using testing::AtMost; using testing::AtMost;
using testing::Invoke; using testing::Invoke;
#define DEFAULT_TIMEOUT (std::chrono::milliseconds(5 * 1000))
namespace namespace
{ {
@ -27,29 +32,42 @@ class ClientProtocolFixture
public: public:
explicit ClientProtocolFixture(IProviderClient& client, bool enableAuthentication = false) explicit ClientProtocolFixture(IProviderClient& client, bool enableAuthentication = false)
{ {
server = new ThreadedWsServer(WF_PROTOCOL_NAME_ADAPTER_SERVER);
config = wfp_client_config_create(); config = wfp_client_config_create();
client.AttachTo(config, enableAuthentication); client.AttachTo(config, enableAuthentication);
protocol = wfp_client_protocol_create(config); protocol = wfp_client_protocol_create(config);
struct lws_protocols client_protocol; memset(protocols, 0, sizeof(struct lws_protocols) * 2);
memset(&client_protocol, 0, sizeof(struct lws_protocols)); wfp_client_protocol_init_lws(protocol, protocols);
wfp_client_protocol_init_lws(protocol, &client_protocol);
memset(&info, 0, sizeof(struct lws_context_creation_info));
info.port = CONTEXT_PORT_NO_LISTEN;
info.protocols = protocols;
info.uid = -1;
info.gid = -1;
server = new WebsocketServer(54321, &client_protocol, 1); context = lws_create_context(&info);
} }
~ClientProtocolFixture() ~ClientProtocolFixture()
{ {
delete server; lws_context_destroy(context);
wfp_client_protocol_dispose(protocol); wfp_client_protocol_dispose(protocol);
wfp_client_config_dispose(config); wfp_client_config_dispose(config);
delete server;
} }
void Connect() void Connect()
{ {
wfp_client_protocol_connect(protocol, server->getContext(), "ws://localhost:54321/"); TimeoutWatcher watcher(DEFAULT_TIMEOUT);
server->waitForConnection();
wfp_client_protocol_connect(protocol, context, server->GetUrl().c_str());
while (!server->IsConnected())
{
watcher.check();
lws_service(context, 0);
}
} }
void Disconnect() void Disconnect()
@ -59,19 +77,28 @@ public:
void SendToClient(json_t * request) void SendToClient(json_t * request)
{ {
server->sendMessage(request); server->SendMessage(request);
} }
json_t * ReceiveMessageFromClient() json_t * ReceiveMessageFromClient()
{ {
return server->receiveMessage(); TimeoutWatcher watcher(DEFAULT_TIMEOUT);
json_t * result = server->ReceiveMessage();
while (nullptr == result)
{
watcher.check();
lws_service(context, 0);
result = server->ReceiveMessage();
}
return result;
} }
void AwaitAuthentication( void AwaitAuthentication(
std::string const & expected_username, std::string const & expected_username,
std::string const & expected_password) std::string const & expected_password)
{ {
json_t * request = server->receiveMessage(); json_t * request = ReceiveMessageFromClient();
ASSERT_TRUE(json_is_object(request)); ASSERT_TRUE(json_is_object(request));
json_t * method = json_object_get(request, "method"); json_t * method = json_object_get(request, "method");
@ -103,14 +130,14 @@ public:
json_t * response = json_object(); json_t * response = json_object();
json_object_set_new(response, "result", json_object()); json_object_set_new(response, "result", json_object());
json_object_set(response, "id", id); json_object_set(response, "id", id);
server->sendMessage(response); SendToClient(response);
json_decref(request); json_decref(request);
} }
void AwaitAddFilesystem(std::string& filesystemName) void AwaitAddFilesystem(std::string& filesystemName)
{ {
json_t * addFilesystemRequest = server->receiveMessage(); json_t * addFilesystemRequest = ReceiveMessageFromClient();
ASSERT_NE(nullptr, addFilesystemRequest); ASSERT_NE(nullptr, addFilesystemRequest);
ASSERT_TRUE(json_is_object(addFilesystemRequest)); ASSERT_TRUE(json_is_object(addFilesystemRequest));
@ -135,15 +162,19 @@ public:
json_object_set_new(response, "result", result); json_object_set_new(response, "result", result);
json_object_set(response, "id", id); json_object_set(response, "id", id);
server->sendMessage(response); SendToClient(response);
json_decref(addFilesystemRequest); json_decref(addFilesystemRequest);
} }
private: private:
WebsocketServer * server; ThreadedWsServer * server;
wfp_client_config * config; wfp_client_config * config;
wfp_client_protocol * protocol; wfp_client_protocol * protocol;
struct lws_context_creation_info info;
struct lws_protocols protocols[2];
struct lws_context * context;
}; };
void GetCredentials(wfp_credentials * credentials) void GetCredentials(wfp_credentials * credentials)

@ -1,13 +1,12 @@
#include "webfuse/utils/threaded_ws_server.h" #include "webfuse/utils/threaded_ws_server.h"
#include "webfuse/core/protocol_names.h"
#include "webfuse/core/lws_log.h" #include "webfuse/core/lws_log.h"
#include <libwebsockets.h> #include <libwebsockets.h>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable>
#include <chrono> #include <chrono>
#include <sstream> #include <sstream>
#include <queue>
#define TIMEOUT (std::chrono::milliseconds(10 * 1000)) #define TIMEOUT (std::chrono::milliseconds(10 * 1000))
@ -21,6 +20,8 @@ public:
virtual ~IServer() = default; virtual ~IServer() = default;
virtual void OnConnected(lws * wsi) = 0; virtual void OnConnected(lws * wsi) = 0;
virtual void OnConnectionClosed(lws * wsi) = 0; virtual void OnConnectionClosed(lws * wsi) = 0;
virtual void OnMessageReceived(struct lws * wsi, char const * data, size_t length) = 0;
virtual void OnWritable(struct lws * wsi) = 0;
}; };
} }
@ -33,15 +34,20 @@ class ThreadedWsServer::Private : IServer
Private(Private const &) = delete; Private(Private const &) = delete;
Private & operator=(Private const &) = delete; Private & operator=(Private const &) = delete;
public: public:
explicit Private(int port); Private(std::string const & protocol, int port);
~Private(); ~Private();
void WaitForConnection(); bool IsConnected();
std::string GetUrl() const; std::string GetUrl() const;
void SendMessage(json_t * message);
json_t * ReceiveMessage();
void OnConnected(lws * wsi) override; void OnConnected(lws * wsi) override;
void OnConnectionClosed(lws * wsi) override; void OnConnectionClosed(lws * wsi) override;
void OnMessageReceived(struct lws * wsi, char const * data, size_t length) override;
void OnWritable(struct lws * wsi) override;
private: private:
static void run(Private * self); static void run(Private * self);
std::string protocol_;
int port_; int port_;
bool is_connected; bool is_connected;
bool is_shutdown_requested; bool is_shutdown_requested;
@ -51,7 +57,8 @@ private:
lws_context_creation_info info; lws_context_creation_info info;
std::thread context; std::thread context;
std::mutex mutex; std::mutex mutex;
std::condition_variable convar; std::queue<std::string> writeQueue;
std::queue<std::string> recvQueue;
}; };
} }
@ -80,6 +87,15 @@ static int wf_test_utils_threaded_ws_server_callback(
case LWS_CALLBACK_CLOSED: case LWS_CALLBACK_CLOSED:
server->OnConnectionClosed(wsi); server->OnConnectionClosed(wsi);
break; break;
case LWS_CALLBACK_RECEIVE:
{
auto * data = reinterpret_cast<char const *>(in);
server->OnMessageReceived(wsi, data, len);
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
server->OnWritable(wsi);
break;
default: default:
break; break;
} }
@ -94,8 +110,8 @@ static int wf_test_utils_threaded_ws_server_callback(
namespace webfuse_test namespace webfuse_test
{ {
ThreadedWsServer::ThreadedWsServer(int port) ThreadedWsServer::ThreadedWsServer(std::string const & protocol, int port)
: d(new Private(port)) : d(new Private(protocol, port))
{ {
} }
@ -105,9 +121,19 @@ ThreadedWsServer::~ThreadedWsServer()
delete d; delete d;
} }
void ThreadedWsServer::WaitForConnection() bool ThreadedWsServer::IsConnected()
{
return d->IsConnected();
}
void ThreadedWsServer::SendMessage(json_t * message)
{
d->SendMessage(message);
}
json_t * ThreadedWsServer::ReceiveMessage()
{ {
d->WaitForConnection(); return d->ReceiveMessage();
} }
std::string ThreadedWsServer::GetUrl() const std::string ThreadedWsServer::GetUrl() const
@ -116,8 +142,9 @@ std::string ThreadedWsServer::GetUrl() const
} }
ThreadedWsServer::Private::Private(int port) ThreadedWsServer::Private::Private(std::string const & protocol, int port)
: port_(port) : protocol_(protocol)
, port_(port)
, is_connected(false) , is_connected(false)
, is_shutdown_requested(false) , is_shutdown_requested(false)
, wsi_(nullptr) , wsi_(nullptr)
@ -126,7 +153,7 @@ ThreadedWsServer::Private::Private(int port)
IServer * server = this; IServer * server = this;
memset(ws_protocols, 0, sizeof(struct lws_protocols) * 2 ); memset(ws_protocols, 0, sizeof(struct lws_protocols) * 2 );
ws_protocols[0].name = WF_PROTOCOL_NAME_PROVIDER_SERVER; ws_protocols[0].name = protocol_.c_str();
ws_protocols[0].callback = &wf_test_utils_threaded_ws_server_callback; ws_protocols[0].callback = &wf_test_utils_threaded_ws_server_callback;
ws_protocols[0].per_session_data_size = 0; ws_protocols[0].per_session_data_size = 0;
ws_protocols[0].user = reinterpret_cast<void*>(server); ws_protocols[0].user = reinterpret_cast<void*>(server);
@ -173,17 +200,10 @@ void ThreadedWsServer::Private::run(Private * self)
} }
} }
void ThreadedWsServer::Private::WaitForConnection() bool ThreadedWsServer::Private::IsConnected()
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
while (!is_connected) return is_connected;
{
auto status = convar.wait_for(lock, TIMEOUT);
if (std::cv_status::timeout == status)
{
throw std::runtime_error("timeout");
}
}
} }
void ThreadedWsServer::Private::OnConnected(lws * wsi) void ThreadedWsServer::Private::OnConnected(lws * wsi)
@ -191,7 +211,6 @@ void ThreadedWsServer::Private::OnConnected(lws * wsi)
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
is_connected = true; is_connected = true;
wsi_ = wsi; wsi_ = wsi;
convar.notify_all();
} }
void ThreadedWsServer::Private::OnConnectionClosed(lws * wsi) void ThreadedWsServer::Private::OnConnectionClosed(lws * wsi)
@ -200,9 +219,83 @@ void ThreadedWsServer::Private::OnConnectionClosed(lws * wsi)
if (wsi == wsi_) if (wsi == wsi_)
{ {
is_connected = false; is_connected = false;
wsi_ = wsi; wsi_ = nullptr;
convar.notify_all(); }
}
void ThreadedWsServer::Private::OnWritable(struct lws * wsi)
{
bool notify = false;
{
std::unique_lock<std::mutex> lock(mutex);
if (!writeQueue.empty())
{
std::string const & message = writeQueue.front();
unsigned char * data = new unsigned char[LWS_PRE + message.size()];
memcpy(&data[LWS_PRE], message.c_str(), message.size());
lws_write(wsi, &data[LWS_PRE], message.size(), LWS_WRITE_TEXT);
delete[] data;
writeQueue.pop();
notify = !writeQueue.empty();
}
}
if (notify)
{
lws_callback_on_writable(wsi);
}
}
void ThreadedWsServer::Private::SendMessage(json_t * message)
{
lws * wsi = nullptr;
{
std::unique_lock<std::mutex> lock(mutex);
if (nullptr != wsi_)
{
char* message_text = json_dumps(message, JSON_COMPACT);
writeQueue.push(message_text);
json_decref(message);
free(message_text);
wsi = wsi_;
}
}
if (nullptr != wsi)
{
lws_callback_on_writable(wsi_);
}
}
void ThreadedWsServer::Private::OnMessageReceived(struct lws * wsi, char const * data, size_t length)
{
std::unique_lock<std::mutex> lock(mutex);
if (wsi == wsi_)
{
recvQueue.push(std::string(data, length));
}
}
json_t * ThreadedWsServer::Private::ReceiveMessage()
{
std::unique_lock<std::mutex> lock(mutex);
json_t * result = nullptr;
if (!recvQueue.empty())
{
std::string const & message_text = recvQueue.front();
result = json_loads(message_text.c_str(), JSON_DECODE_ANY, nullptr);
recvQueue.pop();
} }
return result;
} }
std::string ThreadedWsServer::Private::GetUrl() const std::string ThreadedWsServer::Private::GetUrl() const

@ -13,10 +13,12 @@ class ThreadedWsServer
ThreadedWsServer(ThreadedWsServer const &) = delete; ThreadedWsServer(ThreadedWsServer const &) = delete;
ThreadedWsServer & operator=(ThreadedWsServer const &) = delete; ThreadedWsServer & operator=(ThreadedWsServer const &) = delete;
public: public:
explicit ThreadedWsServer(int port = 0); ThreadedWsServer(std::string const & protocol, int port = 0);
~ThreadedWsServer(); ~ThreadedWsServer();
void WaitForConnection(); bool IsConnected();
std::string GetUrl() const; std::string GetUrl() const;
void SendMessage(json_t * message);
json_t * ReceiveMessage();
private: private:
class Private; class Private;
Private * d; Private * d;

@ -1,266 +0,0 @@
#include "webfuse/utils/ws_server.hpp"
#include "webfuse/utils/timeout_watcher.hpp"
#include "webfuse/core/util.h"
#include "webfuse/core/protocol_names.h"
#include <libwebsockets.h>
#include <cstring>
#include <vector>
#include <queue>
#include <string>
using webfuse_test::TimeoutWatcher;
#define DEFAULT_TIMEOUT (std::chrono::milliseconds(5 * 1000))
namespace
{
class IServer
{
public:
virtual ~IServer() = default;
virtual void onConnectionEstablished(struct lws * wsi) = 0;
virtual void onConnectionClosed(struct lws * wsi) = 0;
virtual void onMessageReceived(struct lws * wsi, char const * data, size_t length) = 0;
virtual void onWritable(struct lws * wsi) = 0;
};
}
extern "C"
{
static int wf_test_utils_ws_server_callback(
struct lws * wsi,
enum lws_callback_reasons reason,
void * WF_UNUSED_PARAM(user),
void * in,
size_t len)
{
struct lws_protocols const * ws_protocol = lws_get_protocol(wsi);
if (NULL == ws_protocol)
{
return 0;
}
auto * server = reinterpret_cast<IServer*>(ws_protocol->user);
switch(reason)
{
case LWS_CALLBACK_ESTABLISHED:
server->onConnectionEstablished(wsi);
break;
case LWS_CALLBACK_CLOSED:
server->onConnectionClosed(wsi);
break;
case LWS_CALLBACK_RECEIVE:
{
auto * data = reinterpret_cast<char const *>(in);
server->onMessageReceived(wsi, data, len);
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
server->onWritable(wsi);
break;
default:
break;
}
return 0;
}
}
namespace webfuse_test
{
class WebsocketServer::Private: public IServer
{
public:
Private(int port, struct lws_protocols * additionalProtocols, size_t additionalProtocolsCount)
: client_wsi(nullptr)
{
ws_protocols = new struct lws_protocols[2 + additionalProtocolsCount];
memset(ws_protocols, 0, sizeof(struct lws_protocols) * (2 + additionalProtocolsCount));
ws_protocols[0].name = WF_PROTOCOL_NAME_ADAPTER_SERVER;
ws_protocols[0].callback = &wf_test_utils_ws_server_callback;
ws_protocols[0].per_session_data_size = 0;
ws_protocols[0].user = reinterpret_cast<void*>(this);
if (0 < additionalProtocolsCount)
{
memcpy(&ws_protocols[additionalProtocolsCount], additionalProtocols, sizeof(struct lws_protocols) * additionalProtocolsCount);
}
memset(&info, 0, sizeof(struct lws_context_creation_info));
info.port = port;
info.mounts = NULL;
info.protocols =ws_protocols;
info.vhost_name = "localhost";
info.ws_ping_pong_interval = 10;
info.options = LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;
context = lws_create_context(&info);
}
virtual ~Private()
{
lws_context_destroy(context);
delete[] ws_protocols;
}
struct lws_context * getContext()
{
return context;
}
void waitForConnection()
{
TimeoutWatcher watcher(DEFAULT_TIMEOUT);
while (nullptr == client_wsi)
{
watcher.check();
lws_service(context, 100);
}
}
void sendMessage(json_t * message)
{
char* message_text = json_dumps(message, JSON_COMPACT);
writeQueue.push(message_text);
json_decref(message);
free(message_text);
if (nullptr != client_wsi)
{
lws_callback_on_writable(client_wsi);
TimeoutWatcher watcher(DEFAULT_TIMEOUT);
while (!writeQueue.empty())
{
watcher.check();
lws_service(context, 100);
}
}
}
json_t * receiveMessage()
{
TimeoutWatcher watcher(DEFAULT_TIMEOUT);
while (recvQueue.empty())
{
watcher.check();
lws_service(context, 100);
}
std::string const & message_text = recvQueue.front();
json_t * message = json_loads(message_text.c_str(), JSON_DECODE_ANY, nullptr);
recvQueue.pop();
return message;
}
void onConnectionEstablished(struct lws * wsi) override
{
client_wsi = wsi;
}
void onConnectionClosed(struct lws * wsi) override
{
if (wsi == client_wsi)
{
client_wsi = nullptr;
}
}
void onMessageReceived(struct lws * wsi, char const * data, size_t length) override
{
if (wsi == client_wsi)
{
recvQueue.push(std::string(data, length));
}
}
void onWritable(struct lws * wsi) override
{
if (!writeQueue.empty())
{
std::string const & message = writeQueue.front();
unsigned char * data = new unsigned char[LWS_PRE + message.size()];
memcpy(&data[LWS_PRE], message.c_str(), message.size());
lws_write(wsi, &data[LWS_PRE], message.size(), LWS_WRITE_TEXT);
delete[] data;
writeQueue.pop();
if (!writeQueue.empty())
{
lws_callback_on_writable(wsi);
}
}
}
private:
void send(std::string const & message)
{
if (nullptr != client_wsi)
{
writeQueue.push(message);
lws_callback_on_writable(client_wsi);
}
}
struct lws * client_wsi;
struct lws_protocols * ws_protocols;
struct lws_context_creation_info info;
struct lws_context * context;
std::queue<std::string> writeQueue;
std::queue<std::string> recvQueue;
};
WebsocketServer::WebsocketServer(int port)
: d(new Private(port, nullptr, 0))
{
}
WebsocketServer::WebsocketServer(int port, struct lws_protocols * additionalProtocols, std::size_t additionalProtocolsCount)
: d(new Private(port, additionalProtocols, additionalProtocolsCount))
{
}
WebsocketServer::~WebsocketServer()
{
delete d;
}
struct lws_context * WebsocketServer::getContext()
{
return d->getContext();
}
void WebsocketServer::waitForConnection()
{
d->waitForConnection();
}
void WebsocketServer::sendMessage(json_t * message)
{
d->sendMessage(message);
}
json_t * WebsocketServer::receiveMessage()
{
return d->receiveMessage();
}
}

@ -1,29 +0,0 @@
#ifndef WF_TEST_UTILS_WS_SERVER_HPP
#define WF_TEST_UTILS_WS_SERVER_HPP
#include <libwebsockets.h>
#include <jansson.h>
namespace webfuse_test
{
class WebsocketServer
{
WebsocketServer(WebsocketServer const &) = delete;
WebsocketServer & operator=(WebsocketServer const &) = delete;
public:
explicit WebsocketServer(int port);
WebsocketServer(int port, struct lws_protocols * additionalProtocols, std::size_t additionalProtocolsCount);
~WebsocketServer();
struct lws_context * getContext();
void waitForConnection();
void sendMessage(json_t * message);
json_t * receiveMessage();
private:
class Private;
Private * d;
};
}
#endif
Loading…
Cancel
Save