From 4d277701cb14ddbede1e1c7ca3311b6c3160cbc1 Mon Sep 17 00:00:00 2001 From: Falk Werner Date: Fri, 12 Jun 2020 16:48:15 +0200 Subject: [PATCH] removed single threaded test server --- meson.build | 1 - test/webfuse/tests/adapter/test_client.cc | 5 +- .../tests/provider/test_client_protocol.cc | 65 +++-- test/webfuse/utils/threaded_ws_server.cc | 141 ++++++++-- test/webfuse/utils/threaded_ws_server.h | 6 +- test/webfuse/utils/ws_server.cc | 266 ------------------ test/webfuse/utils/ws_server.hpp | 29 -- 7 files changed, 171 insertions(+), 342 deletions(-) delete mode 100644 test/webfuse/utils/ws_server.cc delete mode 100644 test/webfuse/utils/ws_server.hpp diff --git a/meson.build b/meson.build index 8d9ffe1..b95bf0a 100644 --- a/meson.build +++ b/meson.build @@ -209,7 +209,6 @@ alltests = executable('alltests', 'test/webfuse/utils/timeout_watcher.cc', 'test/webfuse/utils/path.c', 'test/webfuse/utils/static_filesystem.c', - 'test/webfuse/utils/ws_server.cc', 'test/webfuse/utils/threaded_ws_server.cc', 'test/webfuse/mocks/fake_invokation_context.cc', 'test/webfuse/mocks/mock_authenticator.cc', diff --git a/test/webfuse/tests/adapter/test_client.cc b/test/webfuse/tests/adapter/test_client.cc index 11d30e6..086f5a0 100644 --- a/test/webfuse/tests/adapter/test_client.cc +++ b/test/webfuse/tests/adapter/test_client.cc @@ -2,6 +2,7 @@ #include "webfuse/adapter/client.h" #include "webfuse/adapter/credentials.h" +#include "webfuse/core/protocol_names.h" #include "webfuse/utils/threaded_ws_server.h" using webfuse_test::ThreadedWsServer; @@ -94,8 +95,6 @@ void callback2( TEST(client, general_usage) { - ThreadedWsServer server(54321); - context ctx; ctx.state = connection_state::connecting; @@ -112,7 +111,7 @@ TEST(client, general_usage) TEST(client, connect) { - ThreadedWsServer server; + ThreadedWsServer server(WF_PROTOCOL_NAME_PROVIDER_SERVER); context ctx; ctx.state = connection_state::connecting; diff --git a/test/webfuse/tests/provider/test_client_protocol.cc b/test/webfuse/tests/provider/test_client_protocol.cc index 2035b0f..5aa1301 100644 --- a/test/webfuse/tests/provider/test_client_protocol.cc +++ b/test/webfuse/tests/provider/test_client_protocol.cc @@ -3,20 +3,25 @@ #include #include -#include "webfuse/utils/ws_server.hpp" +#include "webfuse/utils/threaded_ws_server.h" #include "webfuse/mocks/mock_provider_client.hpp" +#include "webfuse/core/protocol_names.h" +#include "webfuse/utils/timeout_watcher.hpp" #include #include #include -using webfuse_test::WebsocketServer; +using webfuse_test::ThreadedWsServer; using webfuse_test::MockProviderClient; using webfuse_test::IProviderClient; +using webfuse_test::TimeoutWatcher; using testing::_; using testing::AtMost; using testing::Invoke; +#define DEFAULT_TIMEOUT (std::chrono::milliseconds(5 * 1000)) + namespace { @@ -27,29 +32,42 @@ class ClientProtocolFixture public: explicit ClientProtocolFixture(IProviderClient& client, bool enableAuthentication = false) { + server = new ThreadedWsServer(WF_PROTOCOL_NAME_ADAPTER_SERVER); + config = wfp_client_config_create(); client.AttachTo(config, enableAuthentication); - protocol = wfp_client_protocol_create(config); - struct lws_protocols client_protocol; - memset(&client_protocol, 0, sizeof(struct lws_protocols)); - wfp_client_protocol_init_lws(protocol, &client_protocol); + memset(protocols, 0, sizeof(struct lws_protocols) * 2); + wfp_client_protocol_init_lws(protocol, protocols); + + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + info.uid = -1; + info.gid = -1; - server = new WebsocketServer(54321, &client_protocol, 1); + context = lws_create_context(&info); } ~ClientProtocolFixture() { - delete server; + lws_context_destroy(context); wfp_client_protocol_dispose(protocol); wfp_client_config_dispose(config); + delete server; } void Connect() { - wfp_client_protocol_connect(protocol, server->getContext(), "ws://localhost:54321/"); - server->waitForConnection(); + TimeoutWatcher watcher(DEFAULT_TIMEOUT); + + wfp_client_protocol_connect(protocol, context, server->GetUrl().c_str()); + while (!server->IsConnected()) + { + watcher.check(); + lws_service(context, 0); + } } void Disconnect() @@ -59,19 +77,28 @@ public: void SendToClient(json_t * request) { - server->sendMessage(request); + server->SendMessage(request); } json_t * ReceiveMessageFromClient() { - return server->receiveMessage(); + TimeoutWatcher watcher(DEFAULT_TIMEOUT); + json_t * result = server->ReceiveMessage(); + while (nullptr == result) + { + watcher.check(); + lws_service(context, 0); + result = server->ReceiveMessage(); + } + + return result; } void AwaitAuthentication( std::string const & expected_username, std::string const & expected_password) { - json_t * request = server->receiveMessage(); + json_t * request = ReceiveMessageFromClient(); ASSERT_TRUE(json_is_object(request)); json_t * method = json_object_get(request, "method"); @@ -103,14 +130,14 @@ public: json_t * response = json_object(); json_object_set_new(response, "result", json_object()); json_object_set(response, "id", id); - server->sendMessage(response); + SendToClient(response); json_decref(request); } void AwaitAddFilesystem(std::string& filesystemName) { - json_t * addFilesystemRequest = server->receiveMessage(); + json_t * addFilesystemRequest = ReceiveMessageFromClient(); ASSERT_NE(nullptr, addFilesystemRequest); ASSERT_TRUE(json_is_object(addFilesystemRequest)); @@ -135,15 +162,19 @@ public: json_object_set_new(response, "result", result); json_object_set(response, "id", id); - server->sendMessage(response); + SendToClient(response); json_decref(addFilesystemRequest); } private: - WebsocketServer * server; + ThreadedWsServer * server; wfp_client_config * config; wfp_client_protocol * protocol; + struct lws_context_creation_info info; + struct lws_protocols protocols[2]; + struct lws_context * context; + }; void GetCredentials(wfp_credentials * credentials) diff --git a/test/webfuse/utils/threaded_ws_server.cc b/test/webfuse/utils/threaded_ws_server.cc index 6d1df1b..a0ea1ae 100644 --- a/test/webfuse/utils/threaded_ws_server.cc +++ b/test/webfuse/utils/threaded_ws_server.cc @@ -1,13 +1,12 @@ #include "webfuse/utils/threaded_ws_server.h" -#include "webfuse/core/protocol_names.h" #include "webfuse/core/lws_log.h" #include #include #include -#include #include #include +#include #define TIMEOUT (std::chrono::milliseconds(10 * 1000)) @@ -21,6 +20,8 @@ public: virtual ~IServer() = default; virtual void OnConnected(lws * wsi) = 0; virtual void OnConnectionClosed(lws * wsi) = 0; + virtual void OnMessageReceived(struct lws * wsi, char const * data, size_t length) = 0; + virtual void OnWritable(struct lws * wsi) = 0; }; } @@ -33,15 +34,20 @@ class ThreadedWsServer::Private : IServer Private(Private const &) = delete; Private & operator=(Private const &) = delete; public: - explicit Private(int port); + Private(std::string const & protocol, int port); ~Private(); - void WaitForConnection(); + bool IsConnected(); std::string GetUrl() const; + void SendMessage(json_t * message); + json_t * ReceiveMessage(); void OnConnected(lws * wsi) override; void OnConnectionClosed(lws * wsi) override; + void OnMessageReceived(struct lws * wsi, char const * data, size_t length) override; + void OnWritable(struct lws * wsi) override; private: static void run(Private * self); + std::string protocol_; int port_; bool is_connected; bool is_shutdown_requested; @@ -51,7 +57,8 @@ private: lws_context_creation_info info; std::thread context; std::mutex mutex; - std::condition_variable convar; + std::queue writeQueue; + std::queue recvQueue; }; } @@ -80,6 +87,15 @@ static int wf_test_utils_threaded_ws_server_callback( case LWS_CALLBACK_CLOSED: server->OnConnectionClosed(wsi); break; + case LWS_CALLBACK_RECEIVE: + { + auto * data = reinterpret_cast(in); + server->OnMessageReceived(wsi, data, len); + } + break; + case LWS_CALLBACK_SERVER_WRITEABLE: + server->OnWritable(wsi); + break; default: break; } @@ -94,8 +110,8 @@ static int wf_test_utils_threaded_ws_server_callback( namespace webfuse_test { -ThreadedWsServer::ThreadedWsServer(int port) -: d(new Private(port)) +ThreadedWsServer::ThreadedWsServer(std::string const & protocol, int port) +: d(new Private(protocol, port)) { } @@ -105,9 +121,19 @@ ThreadedWsServer::~ThreadedWsServer() delete d; } -void ThreadedWsServer::WaitForConnection() +bool ThreadedWsServer::IsConnected() +{ + return d->IsConnected(); +} + +void ThreadedWsServer::SendMessage(json_t * message) +{ + d->SendMessage(message); +} + +json_t * ThreadedWsServer::ReceiveMessage() { - d->WaitForConnection(); + return d->ReceiveMessage(); } std::string ThreadedWsServer::GetUrl() const @@ -116,8 +142,9 @@ std::string ThreadedWsServer::GetUrl() const } -ThreadedWsServer::Private::Private(int port) -: port_(port) +ThreadedWsServer::Private::Private(std::string const & protocol, int port) +: protocol_(protocol) +, port_(port) , is_connected(false) , is_shutdown_requested(false) , wsi_(nullptr) @@ -126,7 +153,7 @@ ThreadedWsServer::Private::Private(int port) IServer * server = this; memset(ws_protocols, 0, sizeof(struct lws_protocols) * 2 ); - ws_protocols[0].name = WF_PROTOCOL_NAME_PROVIDER_SERVER; + ws_protocols[0].name = protocol_.c_str(); ws_protocols[0].callback = &wf_test_utils_threaded_ws_server_callback; ws_protocols[0].per_session_data_size = 0; ws_protocols[0].user = reinterpret_cast(server); @@ -173,17 +200,10 @@ void ThreadedWsServer::Private::run(Private * self) } } -void ThreadedWsServer::Private::WaitForConnection() +bool ThreadedWsServer::Private::IsConnected() { std::unique_lock lock(mutex); - while (!is_connected) - { - auto status = convar.wait_for(lock, TIMEOUT); - if (std::cv_status::timeout == status) - { - throw std::runtime_error("timeout"); - } - } + return is_connected; } void ThreadedWsServer::Private::OnConnected(lws * wsi) @@ -191,7 +211,6 @@ void ThreadedWsServer::Private::OnConnected(lws * wsi) std::unique_lock lock(mutex); is_connected = true; wsi_ = wsi; - convar.notify_all(); } void ThreadedWsServer::Private::OnConnectionClosed(lws * wsi) @@ -200,9 +219,83 @@ void ThreadedWsServer::Private::OnConnectionClosed(lws * wsi) if (wsi == wsi_) { is_connected = false; - wsi_ = wsi; - convar.notify_all(); + wsi_ = nullptr; + } +} + +void ThreadedWsServer::Private::OnWritable(struct lws * wsi) +{ + bool notify = false; + + { + std::unique_lock lock(mutex); + + if (!writeQueue.empty()) + { + std::string const & message = writeQueue.front(); + + unsigned char * data = new unsigned char[LWS_PRE + message.size()]; + memcpy(&data[LWS_PRE], message.c_str(), message.size()); + lws_write(wsi, &data[LWS_PRE], message.size(), LWS_WRITE_TEXT); + delete[] data; + + writeQueue.pop(); + notify = !writeQueue.empty(); + } + } + + if (notify) + { + lws_callback_on_writable(wsi); + } +} + + +void ThreadedWsServer::Private::SendMessage(json_t * message) +{ + lws * wsi = nullptr; + + { + std::unique_lock lock(mutex); + + if (nullptr != wsi_) + { + char* message_text = json_dumps(message, JSON_COMPACT); + writeQueue.push(message_text); + json_decref(message); + free(message_text); + wsi = wsi_; + } + } + + if (nullptr != wsi) + { + lws_callback_on_writable(wsi_); + } +} + +void ThreadedWsServer::Private::OnMessageReceived(struct lws * wsi, char const * data, size_t length) +{ + std::unique_lock lock(mutex); + if (wsi == wsi_) + { + recvQueue.push(std::string(data, length)); + } +} + +json_t * ThreadedWsServer::Private::ReceiveMessage() +{ + std::unique_lock lock(mutex); + + json_t * result = nullptr; + if (!recvQueue.empty()) + { + std::string const & message_text = recvQueue.front(); + result = json_loads(message_text.c_str(), JSON_DECODE_ANY, nullptr); + recvQueue.pop(); } + + return result; } std::string ThreadedWsServer::Private::GetUrl() const diff --git a/test/webfuse/utils/threaded_ws_server.h b/test/webfuse/utils/threaded_ws_server.h index 127dcf4..fe6467e 100644 --- a/test/webfuse/utils/threaded_ws_server.h +++ b/test/webfuse/utils/threaded_ws_server.h @@ -13,10 +13,12 @@ class ThreadedWsServer ThreadedWsServer(ThreadedWsServer const &) = delete; ThreadedWsServer & operator=(ThreadedWsServer const &) = delete; public: - explicit ThreadedWsServer(int port = 0); + ThreadedWsServer(std::string const & protocol, int port = 0); ~ThreadedWsServer(); - void WaitForConnection(); + bool IsConnected(); std::string GetUrl() const; + void SendMessage(json_t * message); + json_t * ReceiveMessage(); private: class Private; Private * d; diff --git a/test/webfuse/utils/ws_server.cc b/test/webfuse/utils/ws_server.cc deleted file mode 100644 index 48e2588..0000000 --- a/test/webfuse/utils/ws_server.cc +++ /dev/null @@ -1,266 +0,0 @@ -#include "webfuse/utils/ws_server.hpp" -#include "webfuse/utils/timeout_watcher.hpp" - -#include "webfuse/core/util.h" -#include "webfuse/core/protocol_names.h" -#include -#include -#include -#include -#include - -using webfuse_test::TimeoutWatcher; - -#define DEFAULT_TIMEOUT (std::chrono::milliseconds(5 * 1000)) - -namespace -{ - -class IServer -{ -public: - virtual ~IServer() = default; - virtual void onConnectionEstablished(struct lws * wsi) = 0; - virtual void onConnectionClosed(struct lws * wsi) = 0; - virtual void onMessageReceived(struct lws * wsi, char const * data, size_t length) = 0; - virtual void onWritable(struct lws * wsi) = 0; -}; - -} - -extern "C" -{ - -static int wf_test_utils_ws_server_callback( - struct lws * wsi, - enum lws_callback_reasons reason, - void * WF_UNUSED_PARAM(user), - void * in, - size_t len) -{ - struct lws_protocols const * ws_protocol = lws_get_protocol(wsi); - if (NULL == ws_protocol) - { - return 0; - } - - auto * server = reinterpret_cast(ws_protocol->user); - switch(reason) - { - case LWS_CALLBACK_ESTABLISHED: - server->onConnectionEstablished(wsi); - break; - case LWS_CALLBACK_CLOSED: - server->onConnectionClosed(wsi); - break; - case LWS_CALLBACK_RECEIVE: - { - auto * data = reinterpret_cast(in); - server->onMessageReceived(wsi, data, len); - } - break; - case LWS_CALLBACK_SERVER_WRITEABLE: - server->onWritable(wsi); - break; - default: - break; - } - - return 0; -} - -} - -namespace webfuse_test -{ - -class WebsocketServer::Private: public IServer -{ -public: - Private(int port, struct lws_protocols * additionalProtocols, size_t additionalProtocolsCount) - : client_wsi(nullptr) - { - ws_protocols = new struct lws_protocols[2 + additionalProtocolsCount]; - memset(ws_protocols, 0, sizeof(struct lws_protocols) * (2 + additionalProtocolsCount)); - - ws_protocols[0].name = WF_PROTOCOL_NAME_ADAPTER_SERVER; - ws_protocols[0].callback = &wf_test_utils_ws_server_callback; - ws_protocols[0].per_session_data_size = 0; - ws_protocols[0].user = reinterpret_cast(this); - - if (0 < additionalProtocolsCount) - { - memcpy(&ws_protocols[additionalProtocolsCount], additionalProtocols, sizeof(struct lws_protocols) * additionalProtocolsCount); - } - - memset(&info, 0, sizeof(struct lws_context_creation_info)); - info.port = port; - info.mounts = NULL; - info.protocols =ws_protocols; - info.vhost_name = "localhost"; - info.ws_ping_pong_interval = 10; - info.options = LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; - - context = lws_create_context(&info); - - } - - virtual ~Private() - { - lws_context_destroy(context); - delete[] ws_protocols; - } - - struct lws_context * getContext() - { - return context; - } - - void waitForConnection() - { - TimeoutWatcher watcher(DEFAULT_TIMEOUT); - - while (nullptr == client_wsi) - { - watcher.check(); - lws_service(context, 100); - } - } - - void sendMessage(json_t * message) - { - char* message_text = json_dumps(message, JSON_COMPACT); - writeQueue.push(message_text); - json_decref(message); - free(message_text); - - if (nullptr != client_wsi) - { - lws_callback_on_writable(client_wsi); - - TimeoutWatcher watcher(DEFAULT_TIMEOUT); - while (!writeQueue.empty()) - { - watcher.check(); - lws_service(context, 100); - } - } - } - - json_t * receiveMessage() - { - TimeoutWatcher watcher(DEFAULT_TIMEOUT); - - while (recvQueue.empty()) - { - watcher.check(); - lws_service(context, 100); - } - - std::string const & message_text = recvQueue.front(); - json_t * message = json_loads(message_text.c_str(), JSON_DECODE_ANY, nullptr); - recvQueue.pop(); - - return message; - } - - void onConnectionEstablished(struct lws * wsi) override - { - client_wsi = wsi; - } - - void onConnectionClosed(struct lws * wsi) override - { - if (wsi == client_wsi) - { - client_wsi = nullptr; - } - } - - void onMessageReceived(struct lws * wsi, char const * data, size_t length) override - { - if (wsi == client_wsi) - { - recvQueue.push(std::string(data, length)); - } - } - - void onWritable(struct lws * wsi) override - { - if (!writeQueue.empty()) - { - std::string const & message = writeQueue.front(); - - unsigned char * data = new unsigned char[LWS_PRE + message.size()]; - memcpy(&data[LWS_PRE], message.c_str(), message.size()); - lws_write(wsi, &data[LWS_PRE], message.size(), LWS_WRITE_TEXT); - delete[] data; - - writeQueue.pop(); - if (!writeQueue.empty()) - { - lws_callback_on_writable(wsi); - } - } - } - - -private: - void send(std::string const & message) - { - if (nullptr != client_wsi) - { - writeQueue.push(message); - lws_callback_on_writable(client_wsi); - } - } - - struct lws * client_wsi; - - struct lws_protocols * ws_protocols; - struct lws_context_creation_info info; - struct lws_context * context; - std::queue writeQueue; - std::queue recvQueue; - -}; - -WebsocketServer::WebsocketServer(int port) -: d(new Private(port, nullptr, 0)) -{ - -} - -WebsocketServer::WebsocketServer(int port, struct lws_protocols * additionalProtocols, std::size_t additionalProtocolsCount) -: d(new Private(port, additionalProtocols, additionalProtocolsCount)) -{ - -} - -WebsocketServer::~WebsocketServer() -{ - delete d; -} - -struct lws_context * WebsocketServer::getContext() -{ - return d->getContext(); -} - -void WebsocketServer::waitForConnection() -{ - d->waitForConnection(); -} - -void WebsocketServer::sendMessage(json_t * message) -{ - d->sendMessage(message); -} - -json_t * WebsocketServer::receiveMessage() -{ - return d->receiveMessage(); -} - - -} \ No newline at end of file diff --git a/test/webfuse/utils/ws_server.hpp b/test/webfuse/utils/ws_server.hpp deleted file mode 100644 index e9d825a..0000000 --- a/test/webfuse/utils/ws_server.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef WF_TEST_UTILS_WS_SERVER_HPP -#define WF_TEST_UTILS_WS_SERVER_HPP - -#include -#include - -namespace webfuse_test -{ - -class WebsocketServer -{ - WebsocketServer(WebsocketServer const &) = delete; - WebsocketServer & operator=(WebsocketServer const &) = delete; -public: - explicit WebsocketServer(int port); - WebsocketServer(int port, struct lws_protocols * additionalProtocols, std::size_t additionalProtocolsCount); - ~WebsocketServer(); - struct lws_context * getContext(); - void waitForConnection(); - void sendMessage(json_t * message); - json_t * receiveMessage(); -private: - class Private; - Private * d; -}; - -} - -#endif