fix: allow parallel jsonrpc proxy requests (issue #85)

pull/86/head
Falk Werner 4 years ago
parent 4eea967b5d
commit 8e6df14aab

@ -1,9 +1,9 @@
#include "webfuse/impl/jsonrpc/proxy_intern.h"
#include "webfuse/impl/jsonrpc/proxy_request_manager.h"
#include "webfuse/impl/jsonrpc/response_intern.h"
#include "webfuse/impl/jsonrpc/error.h"
#include "webfuse/status.h"
#include "webfuse/impl/timer/timer.h"
#include <stdlib.h>
#include <string.h>
@ -28,25 +28,6 @@ void wf_impl_jsonrpc_proxy_dispose(
free(proxy);
}
static void wf_impl_jsonrpc_proxy_on_timeout(
struct wf_timer * timer, void * proxy_ptr)
{
struct wf_jsonrpc_proxy * proxy = proxy_ptr;
if (proxy->request.is_pending)
{
wf_jsonrpc_proxy_finished_fn * finished = proxy->request.finished;
void * user_data = proxy->request.user_data;
proxy->request.is_pending = false;
proxy->request.id = 0;
proxy->request.user_data = NULL;
proxy->request.finished = NULL;
wf_impl_timer_cancel(timer);
wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD_TIMEOUT, "Timeout");
}
}
static json_t * wf_impl_jsonrpc_request_create(
char const * method,
@ -106,31 +87,16 @@ void wf_impl_jsonrpc_proxy_init(
void * user_data)
{
proxy->send = send;
proxy->timeout = timeout;
proxy->user_data = user_data;
proxy->request.is_pending = false;
proxy->request.timer = wf_impl_timer_create(timeout_manager,
&wf_impl_jsonrpc_proxy_on_timeout, proxy);
proxy->request_manager = wf_impl_jsonrpc_proxy_request_manager_create(
timeout_manager, timeout);
}
void wf_impl_jsonrpc_proxy_cleanup(
struct wf_jsonrpc_proxy * proxy)
{
if (proxy->request.is_pending)
{
void * user_data = proxy->request.user_data;
wf_jsonrpc_proxy_finished_fn * finished = proxy->request.finished;
proxy->request.is_pending = false;
proxy->request.finished = NULL;
proxy->request.user_data = NULL;
proxy->request.id = 0;
wf_impl_timer_cancel(proxy->request.timer);
wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD, "Bad: cancelled pending request during shutdown");
}
wf_impl_timer_dispose(proxy->request.timer);
wf_impl_jsonrpc_proxy_request_manager_dispose(proxy->request_manager);
}
void wf_impl_jsonrpc_proxy_vinvoke(
@ -141,36 +107,20 @@ void wf_impl_jsonrpc_proxy_vinvoke(
char const * param_info,
va_list args)
{
if (!proxy->request.is_pending)
int id = wf_impl_jsonrpc_proxy_request_manager_add_request(
proxy->request_manager, finished, user_data);
json_t * request = wf_impl_jsonrpc_request_create(method_name, id, param_info, args);
bool const is_send = ((NULL != request) && (proxy->send(request, proxy->user_data)));
if (!is_send)
{
proxy->request.is_pending = true;
proxy->request.finished = finished;
proxy->request.user_data = user_data;
proxy->request.id = 42;
wf_impl_timer_start(proxy->request.timer, proxy->timeout);
json_t * request = wf_impl_jsonrpc_request_create(method_name, proxy->request.id, param_info, args);
bool const is_send = ((NULL != request) && (proxy->send(request, proxy->user_data)));
if (!is_send)
{
proxy->request.is_pending = false;
proxy->request.finished = NULL;
proxy->request.user_data = NULL;
proxy->request.id = 0;
wf_impl_timer_cancel(proxy->request.timer);
wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD, "Bad: requenst is not sent");
}
if (NULL != request)
{
json_decref(request);
}
wf_impl_jsonrpc_proxy_request_manager_cancel_request(
proxy->request_manager, id, WF_BAD, "Bad: failed to send request");
}
else
if (NULL != request)
{
wf_impl_jsonrpc_propate_error(finished, user_data, WF_BAD_BUSY, "Busy");
json_decref(request);
}
}
@ -197,19 +147,8 @@ void wf_impl_jsonrpc_proxy_onresult(
struct wf_jsonrpc_response response;
wf_impl_jsonrpc_response_init(&response, message);
if ((proxy->request.is_pending) && (response.id == proxy->request.id))
{
wf_jsonrpc_proxy_finished_fn * finished = proxy->request.finished;
void * user_data = proxy->request.user_data;
proxy->request.is_pending = false;
proxy->request.id = 0;
proxy->request.user_data = NULL;
proxy->request.finished = NULL;
wf_impl_timer_cancel(proxy->request.timer);
finished(user_data, response.result, response.error);
}
wf_impl_jsonrpc_proxy_request_manager_finish_request(
proxy->request_manager, &response);
wf_impl_jsonrpc_response_cleanup(&response);
}

@ -10,21 +10,11 @@ extern "C"
{
#endif
struct wf_timer;
struct wf_jsonrpc_request
{
bool is_pending;
wf_jsonrpc_proxy_finished_fn * finished;
void * user_data;
int id;
struct wf_timer * timer;
};
struct wf_jsonrpc_proxy_request_manager;
struct wf_jsonrpc_proxy
{
struct wf_jsonrpc_request request;
int timeout;
struct wf_jsonrpc_proxy_request_manager * request_manager;
wf_jsonrpc_send_fn * send;
void * user_data;
};

@ -0,0 +1,189 @@
#include "webfuse/impl/jsonrpc/proxy_request_manager.h"
#include "webfuse/status.h"
#include "webfuse/impl/timer/timer.h"
#include "webfuse/impl/jsonrpc/response_intern.h"
#include "webfuse/impl/jsonrpc/error.h"
#include <stdlib.h>
#include <limits.h>
struct wf_timer;
struct wf_jsonrpc_proxy_request
{
struct wf_jsonrpc_proxy_request_manager * manager;
int id;
wf_jsonrpc_proxy_finished_fn * finished;
void * user_data;
struct wf_timer * timer;
struct wf_jsonrpc_proxy_request * next;
};
struct wf_jsonrpc_proxy_request_manager
{
struct wf_timer_manager * timer_manager;
int timeout;
int id;
struct wf_jsonrpc_proxy_request * requests;
};
static void
wf_impl_jsonrpc_proxy_request_on_timeout(
struct wf_timer * timer,
void * user_data)
{
struct wf_jsonrpc_proxy_request * request = user_data;
wf_impl_jsonrpc_proxy_request_manager_cancel_request(
request->manager,
request->id,
WF_BAD_TIMEOUT,
"Timeout");
}
static int
wf_impl_jsonrpc_proxy_request_manager_next_id(
struct wf_jsonrpc_proxy_request_manager * manager)
{
if (manager->id < INT_MAX)
{
manager->id++;
}
else
{
manager->id = 1;
}
return manager->id;
}
struct wf_jsonrpc_proxy_request_manager *
wf_impl_jsonrpc_proxy_request_manager_create(
struct wf_timer_manager * timer_manager,
int timeout)
{
struct wf_jsonrpc_proxy_request_manager * manager = malloc(sizeof(struct wf_jsonrpc_proxy_request_manager));
manager->id = 1;
manager->timer_manager = timer_manager;
manager->timeout = timeout;
manager->requests = NULL;
return manager;
}
void
wf_impl_jsonrpc_proxy_request_manager_dispose(
struct wf_jsonrpc_proxy_request_manager * manager)
{
struct wf_jsonrpc_proxy_request * request = manager->requests;
while (NULL != request)
{
struct wf_jsonrpc_proxy_request * next = request->next;
wf_impl_jsonrpc_propate_error(
request->finished, request->user_data,
WF_BAD, "Bad: cancelled pending request during shutdown");
wf_impl_timer_cancel(request->timer);
wf_impl_timer_dispose(request->timer);
free(request);
request = next;
}
free(manager);
}
int
wf_impl_jsonrpc_proxy_request_manager_add_request(
struct wf_jsonrpc_proxy_request_manager * manager,
wf_jsonrpc_proxy_finished_fn * finished,
void * user_data)
{
struct wf_jsonrpc_proxy_request * request = malloc(sizeof(struct wf_jsonrpc_proxy_request));
request->finished = finished;
request->user_data = user_data;
request->manager = manager;
request->id = wf_impl_jsonrpc_proxy_request_manager_next_id(manager);
request->timer = wf_impl_timer_create(manager->timer_manager,
&wf_impl_jsonrpc_proxy_request_on_timeout ,request);
wf_impl_timer_start(request->timer, manager->timeout);
request->next = manager->requests;
manager->requests = request;
return request->id;
}
void
wf_impl_jsonrpc_proxy_request_manager_cancel_request(
struct wf_jsonrpc_proxy_request_manager * manager,
int id,
int error_code,
char const * error_message)
{
struct wf_jsonrpc_proxy_request * prev = NULL;
struct wf_jsonrpc_proxy_request * request = manager->requests;
while (request != NULL)
{
struct wf_jsonrpc_proxy_request * next = request->next;
if (id == request->id)
{
wf_impl_jsonrpc_propate_error(
request->finished, request->user_data,
error_code, error_message);
wf_impl_timer_cancel(request->timer);
wf_impl_timer_dispose(request->timer);
free(request);
if (NULL != prev)
{
prev->next = next;
}
else
{
manager->requests = next;
}
break;
}
prev = request;
request = next;
}
}
void
wf_impl_jsonrpc_proxy_request_manager_finish_request(
struct wf_jsonrpc_proxy_request_manager * manager,
struct wf_jsonrpc_response * response)
{
struct wf_jsonrpc_proxy_request * prev = NULL;
struct wf_jsonrpc_proxy_request * request = manager->requests;
while (request != NULL)
{
struct wf_jsonrpc_proxy_request * next = request->next;
if (response->id == request->id)
{
wf_jsonrpc_proxy_finished_fn * finished = request->finished;
void * user_data = request->user_data;
wf_impl_timer_cancel(request->timer);
wf_impl_timer_dispose(request->timer);
free(request);
if (NULL != prev)
{
prev->next = next;
}
else
{
manager->requests = next;
}
finished(user_data, response->result, response->error);
break;
}
prev = request;
request = next;
}
}

@ -0,0 +1,47 @@
#ifndef WF_IMPL_JSONRPC_PROXY_REQUEST_MANAGER_H
#define WF_IMPL_JSONRPC_PROXY_REQUEST_MANAGER_H
#include "webfuse/impl/jsonrpc/proxy_finished_fn.h"
#ifdef __cplusplus
extern "C"
{
#endif
struct wf_jsonrpc_proxy_request_manager;
struct wf_jsonrpc_response;
struct wf_timer_manager;
extern struct wf_jsonrpc_proxy_request_manager *
wf_impl_jsonrpc_proxy_request_manager_create(
struct wf_timer_manager * timer_manager,
int timeout);
extern void
wf_impl_jsonrpc_proxy_request_manager_dispose(
struct wf_jsonrpc_proxy_request_manager * manager);
extern int
wf_impl_jsonrpc_proxy_request_manager_add_request(
struct wf_jsonrpc_proxy_request_manager * manager,
wf_jsonrpc_proxy_finished_fn * finished,
void * user_data);
extern void
wf_impl_jsonrpc_proxy_request_manager_cancel_request(
struct wf_jsonrpc_proxy_request_manager * manager,
int id,
int error_code,
char const * error_message);
extern void
wf_impl_jsonrpc_proxy_request_manager_finish_request(
struct wf_jsonrpc_proxy_request_manager * manager,
struct wf_jsonrpc_response * response);
#ifdef __cplusplus
}
#endif
#endif

@ -29,6 +29,7 @@ webfuse_static = static_library('webfuse',
'lib/webfuse/impl/timer/timepoint.c',
'lib/webfuse/impl/timer/timer.c',
'lib/webfuse/impl/jsonrpc/proxy.c',
'lib/webfuse/impl/jsonrpc/proxy_request_manager.c',
'lib/webfuse/impl/jsonrpc/proxy_variadic.c',
'lib/webfuse/impl/jsonrpc/server.c',
'lib/webfuse/impl/jsonrpc/method.c',

@ -174,7 +174,7 @@ TEST(wf_jsonrpc_proxy, invoke_calls_finish_if_send_fails)
wf_impl_timer_manager_dispose(timer_manager);
}
TEST(wf_jsonrpc_proxy, invoke_fails_if_another_request_is_pending)
TEST(wf_jsonrpc_proxy, invoke_if_another_request_is_pending)
{
struct wf_timer_manager * timer_manager = wf_impl_timer_manager_create();
@ -195,9 +195,6 @@ TEST(wf_jsonrpc_proxy, invoke_fails_if_another_request_is_pending)
ASSERT_FALSE(finished_context.is_called);
ASSERT_TRUE(finished_context2.is_called);
ASSERT_EQ(WF_BAD_BUSY, jsonrpc_get_status(finished_context2.error));
wf_impl_jsonrpc_proxy_dispose(proxy);
wf_impl_timer_manager_dispose(timer_manager);
}
@ -387,28 +384,6 @@ TEST(wf_jsonrpc_proxy, notify_dont_send_invalid_request)
wf_impl_timer_manager_dispose(timer_manager);
}
TEST(wf_jsonrpc_proxy, swallow_timeout_if_no_request_pending)
{
MockTimer timer_api;
wf_timer_on_timer_fn * on_timer = nullptr;
void * timer_context = nullptr;
EXPECT_CALL(timer_api, wf_impl_timer_create(_, _, _))
.Times(1)
.WillOnce(DoAll(SaveArg<1>(&on_timer), SaveArg<2>(&timer_context), Return(nullptr)));
EXPECT_CALL(timer_api, wf_impl_timer_dispose(_)).Times(1);
SendContext send_context;
void * send_data = reinterpret_cast<void*>(&send_context);
struct wf_jsonrpc_proxy * proxy = wf_impl_jsonrpc_proxy_create(nullptr, 1, &jsonrpc_send, send_data);
on_timer(nullptr, timer_context);
ASSERT_FALSE(send_context.is_called);
wf_impl_jsonrpc_proxy_dispose(proxy);
}
TEST(wf_jsonrpc_proxy, on_result_swallow_if_no_request_pending)
{
struct wf_timer_manager * timer_manager = wf_impl_timer_manager_create();

@ -394,8 +394,6 @@ TEST(server, read_large_file)
int offset = json_integer_value(json_array_get(params, 3));
int length = json_integer_value(json_array_get(params, 4));
std::cout << "offset: " << offset << ", length: " << length << std::endl;
int remaining = (offset < 1024000) ? 1024000 - offset : 0;
int count = (length < remaining) ? length : remaining;

Loading…
Cancel
Save