From 4eea967b5d8b8466bb12810dd638fb9382781406 Mon Sep 17 00:00:00 2001 From: Falk Werner Date: Tue, 7 Jul 2020 16:48:46 +0200 Subject: [PATCH 1/2] added test to show error reading large files --- test/webfuse/test_server.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/webfuse/test_server.cc b/test/webfuse/test_server.cc index 78b86c3..9f56b18 100644 --- a/test/webfuse/test_server.cc +++ b/test/webfuse/test_server.cc @@ -382,11 +382,11 @@ TEST(server, read_large_file) MockInvokationHander handler; EXPECT_CALL(handler, Invoke(StrEq("lookup"), _)).Times(AnyNumber()); EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "a.file"))).Times(1) - .WillOnce(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 102400}")); + .WillOnce(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 1024000}")); EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber()) .WillRepeatedly(Return("{\"mode\": 420, \"type\": \"dir\"}")); EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(2))).Times(AnyNumber()) - .WillRepeatedly(Return("{\"mode\": 420, \"type\": \"file\", \"size\": 102400}")); + .WillRepeatedly(Return("{\"mode\": 420, \"type\": \"file\", \"size\": 1024000}")); EXPECT_CALL(handler, Invoke(StrEq("open"), Open(2))).Times(1) .WillOnce(Return("{\"handle\": 42}")); EXPECT_CALL(handler, Invoke(StrEq("read"), _)).Times(AnyNumber()) @@ -394,7 +394,9 @@ TEST(server, read_large_file) int offset = json_integer_value(json_array_get(params, 3)); int length = json_integer_value(json_array_get(params, 4)); - int remaining = (offset < 102400) ? 102400 - offset : 0; + std::cout << "offset: " << offset << ", length: " << length << std::endl; + + int remaining = (offset < 1024000) ? 1024000 - offset : 0; int count = (length < remaining) ? length : remaining; std::string data = std::string(count, '*'); From 8e6df14aab1ae3f9f2b5afa2507b16afc0c1eac6 Mon Sep 17 00:00:00 2001 From: Falk Werner Date: Tue, 7 Jul 2020 22:22:23 +0200 Subject: [PATCH 2/2] fix: allow parallel jsonrpc proxy requests (issue #85) --- lib/webfuse/impl/jsonrpc/proxy.c | 97 ++------- lib/webfuse/impl/jsonrpc/proxy_intern.h | 14 +- .../impl/jsonrpc/proxy_request_manager.c | 189 ++++++++++++++++++ .../impl/jsonrpc/proxy_request_manager.h | 47 +++++ meson.build | 1 + test/webfuse/jsonrpc/test_proxy.cc | 27 +-- test/webfuse/test_server.cc | 2 - 7 files changed, 258 insertions(+), 119 deletions(-) create mode 100644 lib/webfuse/impl/jsonrpc/proxy_request_manager.c create mode 100644 lib/webfuse/impl/jsonrpc/proxy_request_manager.h diff --git a/lib/webfuse/impl/jsonrpc/proxy.c b/lib/webfuse/impl/jsonrpc/proxy.c index 2870e2d..c3d1235 100644 --- a/lib/webfuse/impl/jsonrpc/proxy.c +++ b/lib/webfuse/impl/jsonrpc/proxy.c @@ -1,9 +1,9 @@ #include "webfuse/impl/jsonrpc/proxy_intern.h" +#include "webfuse/impl/jsonrpc/proxy_request_manager.h" #include "webfuse/impl/jsonrpc/response_intern.h" #include "webfuse/impl/jsonrpc/error.h" #include "webfuse/status.h" -#include "webfuse/impl/timer/timer.h" #include #include @@ -28,25 +28,6 @@ void wf_impl_jsonrpc_proxy_dispose( free(proxy); } -static void wf_impl_jsonrpc_proxy_on_timeout( - struct wf_timer * timer, void * proxy_ptr) -{ - struct wf_jsonrpc_proxy * proxy = proxy_ptr; - - if (proxy->request.is_pending) - { - wf_jsonrpc_proxy_finished_fn * finished = proxy->request.finished; - void * user_data = proxy->request.user_data; - - proxy->request.is_pending = false; - proxy->request.id = 0; - proxy->request.user_data = NULL; - proxy->request.finished = NULL; - wf_impl_timer_cancel(timer); - - wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD_TIMEOUT, "Timeout"); - } -} static json_t * wf_impl_jsonrpc_request_create( char const * method, @@ -106,31 +87,16 @@ void wf_impl_jsonrpc_proxy_init( void * user_data) { proxy->send = send; - proxy->timeout = timeout; proxy->user_data = user_data; - proxy->request.is_pending = false; - proxy->request.timer = wf_impl_timer_create(timeout_manager, - &wf_impl_jsonrpc_proxy_on_timeout, proxy); + + proxy->request_manager = wf_impl_jsonrpc_proxy_request_manager_create( + timeout_manager, timeout); } void wf_impl_jsonrpc_proxy_cleanup( struct wf_jsonrpc_proxy * proxy) { - if (proxy->request.is_pending) - { - void * user_data = proxy->request.user_data; - wf_jsonrpc_proxy_finished_fn * finished = proxy->request.finished; - - proxy->request.is_pending = false; - proxy->request.finished = NULL; - proxy->request.user_data = NULL; - proxy->request.id = 0; - wf_impl_timer_cancel(proxy->request.timer); - - wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD, "Bad: cancelled pending request during shutdown"); - } - - wf_impl_timer_dispose(proxy->request.timer); + wf_impl_jsonrpc_proxy_request_manager_dispose(proxy->request_manager); } void wf_impl_jsonrpc_proxy_vinvoke( @@ -141,36 +107,20 @@ void wf_impl_jsonrpc_proxy_vinvoke( char const * param_info, va_list args) { - if (!proxy->request.is_pending) + int id = wf_impl_jsonrpc_proxy_request_manager_add_request( + proxy->request_manager, finished, user_data); + + json_t * request = wf_impl_jsonrpc_request_create(method_name, id, param_info, args); + bool const is_send = ((NULL != request) && (proxy->send(request, proxy->user_data))); + if (!is_send) { - proxy->request.is_pending = true; - proxy->request.finished = finished; - proxy->request.user_data = user_data; - proxy->request.id = 42; - wf_impl_timer_start(proxy->request.timer, proxy->timeout); - - json_t * request = wf_impl_jsonrpc_request_create(method_name, proxy->request.id, param_info, args); - - bool const is_send = ((NULL != request) && (proxy->send(request, proxy->user_data))); - if (!is_send) - { - proxy->request.is_pending = false; - proxy->request.finished = NULL; - proxy->request.user_data = NULL; - proxy->request.id = 0; - wf_impl_timer_cancel(proxy->request.timer); - - wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD, "Bad: requenst is not sent"); - } - - if (NULL != request) - { - json_decref(request); - } + wf_impl_jsonrpc_proxy_request_manager_cancel_request( + proxy->request_manager, id, WF_BAD, "Bad: failed to send request"); } - else + + if (NULL != request) { - wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD_BUSY, "Busy"); + json_decref(request); } } @@ -197,19 +147,8 @@ void wf_impl_jsonrpc_proxy_onresult( struct wf_jsonrpc_response response; wf_impl_jsonrpc_response_init(&response, message); - if ((proxy->request.is_pending) && (response.id == proxy->request.id)) - { - wf_jsonrpc_proxy_finished_fn * finished = proxy->request.finished; - void * user_data = proxy->request.user_data; - - proxy->request.is_pending = false; - proxy->request.id = 0; - proxy->request.user_data = NULL; - proxy->request.finished = NULL; - wf_impl_timer_cancel(proxy->request.timer); - - finished(user_data, response.result, response.error); - } + wf_impl_jsonrpc_proxy_request_manager_finish_request( + proxy->request_manager, &response); wf_impl_jsonrpc_response_cleanup(&response); } diff --git a/lib/webfuse/impl/jsonrpc/proxy_intern.h b/lib/webfuse/impl/jsonrpc/proxy_intern.h index 86a773a..d260798 100644 --- a/lib/webfuse/impl/jsonrpc/proxy_intern.h +++ b/lib/webfuse/impl/jsonrpc/proxy_intern.h @@ -10,21 +10,11 @@ extern "C" { #endif -struct wf_timer; - -struct wf_jsonrpc_request -{ - bool is_pending; - wf_jsonrpc_proxy_finished_fn * finished; - void * user_data; - int id; - struct wf_timer * timer; -}; +struct wf_jsonrpc_proxy_request_manager; struct wf_jsonrpc_proxy { - struct wf_jsonrpc_request request; - int timeout; + struct wf_jsonrpc_proxy_request_manager * request_manager; wf_jsonrpc_send_fn * send; void * user_data; }; diff --git a/lib/webfuse/impl/jsonrpc/proxy_request_manager.c b/lib/webfuse/impl/jsonrpc/proxy_request_manager.c new file mode 100644 index 0000000..d576a20 --- /dev/null +++ b/lib/webfuse/impl/jsonrpc/proxy_request_manager.c @@ -0,0 +1,189 @@ +#include "webfuse/impl/jsonrpc/proxy_request_manager.h" +#include "webfuse/status.h" +#include "webfuse/impl/timer/timer.h" +#include "webfuse/impl/jsonrpc/response_intern.h" +#include "webfuse/impl/jsonrpc/error.h" + +#include +#include + +struct wf_timer; + +struct wf_jsonrpc_proxy_request +{ + struct wf_jsonrpc_proxy_request_manager * manager; + int id; + wf_jsonrpc_proxy_finished_fn * finished; + void * user_data; + struct wf_timer * timer; + struct wf_jsonrpc_proxy_request * next; +}; + +struct wf_jsonrpc_proxy_request_manager +{ + struct wf_timer_manager * timer_manager; + int timeout; + int id; + struct wf_jsonrpc_proxy_request * requests; +}; + +static void +wf_impl_jsonrpc_proxy_request_on_timeout( + struct wf_timer * timer, + void * user_data) +{ + struct wf_jsonrpc_proxy_request * request = user_data; + + wf_impl_jsonrpc_proxy_request_manager_cancel_request( + request->manager, + request->id, + WF_BAD_TIMEOUT, + "Timeout"); +} + +static int +wf_impl_jsonrpc_proxy_request_manager_next_id( + struct wf_jsonrpc_proxy_request_manager * manager) +{ + if (manager->id < INT_MAX) + { + manager->id++; + } + else + { + manager->id = 1; + } + + return manager->id; +} + + +struct wf_jsonrpc_proxy_request_manager * +wf_impl_jsonrpc_proxy_request_manager_create( + struct wf_timer_manager * timer_manager, + int timeout) +{ + struct wf_jsonrpc_proxy_request_manager * manager = malloc(sizeof(struct wf_jsonrpc_proxy_request_manager)); + manager->id = 1; + manager->timer_manager = timer_manager; + manager->timeout = timeout; + manager->requests = NULL; + + return manager; +} + +void +wf_impl_jsonrpc_proxy_request_manager_dispose( + struct wf_jsonrpc_proxy_request_manager * manager) +{ + struct wf_jsonrpc_proxy_request * request = manager->requests; + while (NULL != request) + { + struct wf_jsonrpc_proxy_request * next = request->next; + + wf_impl_jsonrpc_propate_error( + request->finished, request->user_data, + WF_BAD, "Bad: cancelled pending request during shutdown"); + wf_impl_timer_cancel(request->timer); + wf_impl_timer_dispose(request->timer); + free(request); + request = next; + } + + free(manager); +} + +int +wf_impl_jsonrpc_proxy_request_manager_add_request( + struct wf_jsonrpc_proxy_request_manager * manager, + wf_jsonrpc_proxy_finished_fn * finished, + void * user_data) +{ + struct wf_jsonrpc_proxy_request * request = malloc(sizeof(struct wf_jsonrpc_proxy_request)); + request->finished = finished; + request->user_data = user_data; + request->manager = manager; + request->id = wf_impl_jsonrpc_proxy_request_manager_next_id(manager); + request->timer = wf_impl_timer_create(manager->timer_manager, + &wf_impl_jsonrpc_proxy_request_on_timeout ,request); + wf_impl_timer_start(request->timer, manager->timeout); + + request->next = manager->requests; + manager->requests = request; + + return request->id; +} + +void +wf_impl_jsonrpc_proxy_request_manager_cancel_request( + struct wf_jsonrpc_proxy_request_manager * manager, + int id, + int error_code, + char const * error_message) +{ + struct wf_jsonrpc_proxy_request * prev = NULL; + struct wf_jsonrpc_proxy_request * request = manager->requests; + while (request != NULL) + { + struct wf_jsonrpc_proxy_request * next = request->next; + if (id == request->id) + { + wf_impl_jsonrpc_propate_error( + request->finished, request->user_data, + error_code, error_message); + wf_impl_timer_cancel(request->timer); + wf_impl_timer_dispose(request->timer); + free(request); + + if (NULL != prev) + { + prev->next = next; + } + else + { + manager->requests = next; + } + break; + } + + prev = request; + request = next; + } +} + +void +wf_impl_jsonrpc_proxy_request_manager_finish_request( + struct wf_jsonrpc_proxy_request_manager * manager, + struct wf_jsonrpc_response * response) +{ + struct wf_jsonrpc_proxy_request * prev = NULL; + struct wf_jsonrpc_proxy_request * request = manager->requests; + while (request != NULL) + { + struct wf_jsonrpc_proxy_request * next = request->next; + if (response->id == request->id) + { + wf_jsonrpc_proxy_finished_fn * finished = request->finished; + void * user_data = request->user_data; + + wf_impl_timer_cancel(request->timer); + wf_impl_timer_dispose(request->timer); + free(request); + + if (NULL != prev) + { + prev->next = next; + } + else + { + manager->requests = next; + } + + finished(user_data, response->result, response->error); + break; + } + + prev = request; + request = next; + } +} diff --git a/lib/webfuse/impl/jsonrpc/proxy_request_manager.h b/lib/webfuse/impl/jsonrpc/proxy_request_manager.h new file mode 100644 index 0000000..1760756 --- /dev/null +++ b/lib/webfuse/impl/jsonrpc/proxy_request_manager.h @@ -0,0 +1,47 @@ +#ifndef WF_IMPL_JSONRPC_PROXY_REQUEST_MANAGER_H +#define WF_IMPL_JSONRPC_PROXY_REQUEST_MANAGER_H + +#include "webfuse/impl/jsonrpc/proxy_finished_fn.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +struct wf_jsonrpc_proxy_request_manager; +struct wf_jsonrpc_response; +struct wf_timer_manager; + +extern struct wf_jsonrpc_proxy_request_manager * +wf_impl_jsonrpc_proxy_request_manager_create( + struct wf_timer_manager * timer_manager, + int timeout); + +extern void +wf_impl_jsonrpc_proxy_request_manager_dispose( + struct wf_jsonrpc_proxy_request_manager * manager); + +extern int +wf_impl_jsonrpc_proxy_request_manager_add_request( + struct wf_jsonrpc_proxy_request_manager * manager, + wf_jsonrpc_proxy_finished_fn * finished, + void * user_data); + +extern void +wf_impl_jsonrpc_proxy_request_manager_cancel_request( + struct wf_jsonrpc_proxy_request_manager * manager, + int id, + int error_code, + char const * error_message); + +extern void +wf_impl_jsonrpc_proxy_request_manager_finish_request( + struct wf_jsonrpc_proxy_request_manager * manager, + struct wf_jsonrpc_response * response); + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/meson.build b/meson.build index 3ff6f23..604d268 100644 --- a/meson.build +++ b/meson.build @@ -29,6 +29,7 @@ webfuse_static = static_library('webfuse', 'lib/webfuse/impl/timer/timepoint.c', 'lib/webfuse/impl/timer/timer.c', 'lib/webfuse/impl/jsonrpc/proxy.c', + 'lib/webfuse/impl/jsonrpc/proxy_request_manager.c', 'lib/webfuse/impl/jsonrpc/proxy_variadic.c', 'lib/webfuse/impl/jsonrpc/server.c', 'lib/webfuse/impl/jsonrpc/method.c', diff --git a/test/webfuse/jsonrpc/test_proxy.cc b/test/webfuse/jsonrpc/test_proxy.cc index c23ef2e..f5368d0 100644 --- a/test/webfuse/jsonrpc/test_proxy.cc +++ b/test/webfuse/jsonrpc/test_proxy.cc @@ -174,7 +174,7 @@ TEST(wf_jsonrpc_proxy, invoke_calls_finish_if_send_fails) wf_impl_timer_manager_dispose(timer_manager); } -TEST(wf_jsonrpc_proxy, invoke_fails_if_another_request_is_pending) +TEST(wf_jsonrpc_proxy, invoke_if_another_request_is_pending) { struct wf_timer_manager * timer_manager = wf_impl_timer_manager_create(); @@ -195,9 +195,6 @@ TEST(wf_jsonrpc_proxy, invoke_fails_if_another_request_is_pending) ASSERT_FALSE(finished_context.is_called); - ASSERT_TRUE(finished_context2.is_called); - ASSERT_EQ(WF_BAD_BUSY, jsonrpc_get_status(finished_context2.error)); - wf_impl_jsonrpc_proxy_dispose(proxy); wf_impl_timer_manager_dispose(timer_manager); } @@ -387,28 +384,6 @@ TEST(wf_jsonrpc_proxy, notify_dont_send_invalid_request) wf_impl_timer_manager_dispose(timer_manager); } -TEST(wf_jsonrpc_proxy, swallow_timeout_if_no_request_pending) -{ - MockTimer timer_api; - - wf_timer_on_timer_fn * on_timer = nullptr; - void * timer_context = nullptr; - EXPECT_CALL(timer_api, wf_impl_timer_create(_, _, _)) - .Times(1) - .WillOnce(DoAll(SaveArg<1>(&on_timer), SaveArg<2>(&timer_context), Return(nullptr))); - EXPECT_CALL(timer_api, wf_impl_timer_dispose(_)).Times(1); - - SendContext send_context; - void * send_data = reinterpret_cast(&send_context); - struct wf_jsonrpc_proxy * proxy = wf_impl_jsonrpc_proxy_create(nullptr, 1, &jsonrpc_send, send_data); - - on_timer(nullptr, timer_context); - ASSERT_FALSE(send_context.is_called); - - - wf_impl_jsonrpc_proxy_dispose(proxy); -} - TEST(wf_jsonrpc_proxy, on_result_swallow_if_no_request_pending) { struct wf_timer_manager * timer_manager = wf_impl_timer_manager_create(); diff --git a/test/webfuse/test_server.cc b/test/webfuse/test_server.cc index 9f56b18..85aba28 100644 --- a/test/webfuse/test_server.cc +++ b/test/webfuse/test_server.cc @@ -394,8 +394,6 @@ TEST(server, read_large_file) int offset = json_integer_value(json_array_get(params, 3)); int length = json_integer_value(json_array_get(params, 4)); - std::cout << "offset: " << offset << ", length: " << length << std::endl; - int remaining = (offset < 1024000) ? 1024000 - offset : 0; int count = (length < remaining) ? length : remaining;