diff --git a/lib/webfuse/impl/client_protocol.c b/lib/webfuse/impl/client_protocol.c index 646bf58..1d9370d 100644 --- a/lib/webfuse/impl/client_protocol.c +++ b/lib/webfuse/impl/client_protocol.c @@ -19,6 +19,7 @@ #include #define WF_DEFAULT_TIMEOUT (10 * 1000) +#define WF_DEFAULT_MESSAGE_SIZE (10 * 1024) struct wf_impl_client_protocol_add_filesystem_context { @@ -32,6 +33,7 @@ wf_impl_client_protocol_process( char const * data, size_t length) { + json_t * message = json_loadb(data, length, 0, NULL); if (NULL != message) { @@ -44,6 +46,35 @@ wf_impl_client_protocol_process( } } +static void +wf_impl_client_protocol_receive( + struct wf_client_protocol * protocol, + char const * data, + size_t length, + bool is_final_fragment) +{ + if (is_final_fragment) + { + if (wf_impl_buffer_is_empty(&protocol->recv_buffer)) + { + wf_impl_client_protocol_process(protocol, data, length); + } + else + { + wf_impl_buffer_append(&protocol->recv_buffer, data, length); + wf_impl_client_protocol_process(protocol, + wf_impl_buffer_data(&protocol->recv_buffer), + wf_impl_buffer_size(&protocol->recv_buffer)); + wf_impl_buffer_clear(&protocol->recv_buffer); + } + } + else + { + wf_impl_buffer_append(&protocol->recv_buffer, data, length); + } +} + + static bool wf_impl_client_protocol_send( json_t * request, @@ -145,7 +176,7 @@ static int wf_impl_client_protocol_lws_callback( protocol->callback(protocol->user_data, WF_CLIENT_DISCONNECTED, NULL); break; case LWS_CALLBACK_CLIENT_RECEIVE: - wf_impl_client_protocol_process(protocol, in, len); + wf_impl_client_protocol_receive(protocol, in, len, lws_is_final_fragment(wsi)); break; case LWS_CALLBACK_SERVER_WRITEABLE: // fall-through @@ -196,6 +227,7 @@ wf_impl_client_protocol_init( protocol->user_data = user_data; protocol->filesystem = NULL; + wf_impl_buffer_init(&protocol->recv_buffer, WF_DEFAULT_MESSAGE_SIZE); wf_impl_slist_init(&protocol->messages); protocol->timer_manager = wf_impl_timer_manager_create(); protocol->proxy = wf_impl_jsonrpc_proxy_create(protocol->timer_manager, WF_DEFAULT_TIMEOUT, &wf_impl_client_protocol_send, protocol); @@ -218,6 +250,8 @@ wf_impl_client_protocol_cleanup( wf_impl_filesystem_dispose(protocol->filesystem); protocol->filesystem = NULL; } + + wf_impl_buffer_cleanup(&protocol->recv_buffer); } void diff --git a/lib/webfuse/impl/client_protocol.h b/lib/webfuse/impl/client_protocol.h index 34d86ed..c9e2189 100644 --- a/lib/webfuse/impl/client_protocol.h +++ b/lib/webfuse/impl/client_protocol.h @@ -3,6 +3,7 @@ #include "webfuse/client_callback.h" #include "webfuse/impl/util/slist.h" +#include "webfuse/impl/util/buffer.h" #ifndef __cplusplus #include @@ -37,6 +38,7 @@ struct wf_client_protocol struct wf_timer_manager * timer_manager; struct wf_jsonrpc_proxy * proxy; struct wf_slist messages; + struct wf_buffer recv_buffer; }; extern void diff --git a/lib/webfuse/impl/server_protocol.c b/lib/webfuse/impl/server_protocol.c index 3aa8cde..48792c0 100644 --- a/lib/webfuse/impl/server_protocol.c +++ b/lib/webfuse/impl/server_protocol.c @@ -61,7 +61,7 @@ static int wf_impl_server_protocol_callback( case LWS_CALLBACK_RECEIVE: if (NULL != session) { - wf_impl_session_receive(session, in, len); + wf_impl_session_receive(session, in, len, lws_is_final_fragment(wsi)); } break; case LWS_CALLBACK_RAW_RX_FILE: diff --git a/lib/webfuse/impl/session.c b/lib/webfuse/impl/session.c index 48b2d78..5d3d078 100644 --- a/lib/webfuse/impl/session.c +++ b/lib/webfuse/impl/session.c @@ -17,6 +17,7 @@ #include #define WF_DEFAULT_TIMEOUT (10 * 1000) +#define WF_DEFAULT_MESSAGE_SIZE (8 * 1024) static bool wf_impl_session_send( json_t * request, @@ -60,6 +61,7 @@ struct wf_impl_session * wf_impl_session_create( session->mountpoint_factory = mountpoint_factory; session->rpc = wf_impl_jsonrpc_proxy_create(timer_manager, WF_DEFAULT_TIMEOUT, &wf_impl_session_send, session); wf_impl_slist_init(&session->messages); + wf_impl_buffer_init(&session->recv_buffer, WF_DEFAULT_MESSAGE_SIZE); return session; } @@ -85,6 +87,7 @@ void wf_impl_session_dispose( wf_impl_message_queue_cleanup(&session->messages); wf_impl_session_dispose_filesystems(&session->filesystems); + wf_impl_buffer_cleanup(&session->recv_buffer); free(session); } @@ -146,8 +149,7 @@ void wf_impl_session_onwritable( } } - -void wf_impl_session_receive( +static void wf_impl_session_process( struct wf_impl_session * session, char const * data, size_t length) @@ -164,9 +166,35 @@ void wf_impl_session_receive( wf_impl_jsonrpc_server_process(session->server, message, &wf_impl_session_send, session); } - json_decref(message); + json_decref(message); } +} +void wf_impl_session_receive( + struct wf_impl_session * session, + char const * data, + size_t length, + bool is_final_fragment) +{ + if (is_final_fragment) + { + if (wf_impl_buffer_is_empty(&session->recv_buffer)) + { + wf_impl_session_process(session, data, length); + } + else + { + wf_impl_buffer_append(&session->recv_buffer, data, length); + wf_impl_session_process(session, + wf_impl_buffer_data(&session->recv_buffer), + wf_impl_buffer_size(&session->recv_buffer)); + wf_impl_buffer_clear(&session->recv_buffer); + } + } + else + { + wf_impl_buffer_append(&session->recv_buffer, data, length); + } } static struct wf_impl_filesystem * wf_impl_session_get_filesystem( diff --git a/lib/webfuse/impl/session.h b/lib/webfuse/impl/session.h index 7766b6c..6453993 100644 --- a/lib/webfuse/impl/session.h +++ b/lib/webfuse/impl/session.h @@ -12,6 +12,7 @@ using std::size_t; #include "webfuse/impl/message_queue.h" #include "webfuse/impl/filesystem.h" #include "webfuse/impl/util/slist.h" +#include "webfuse/impl/util/buffer.h" #include "webfuse/impl/jsonrpc/proxy.h" #include "webfuse/impl/jsonrpc/server.h" @@ -38,6 +39,7 @@ struct wf_impl_session struct wf_jsonrpc_server * server; struct wf_jsonrpc_proxy * rpc; struct wf_slist filesystems; + struct wf_buffer recv_buffer; }; extern struct wf_impl_session * wf_impl_session_create( @@ -61,7 +63,8 @@ extern bool wf_impl_session_add_filesystem( extern void wf_impl_session_receive( struct wf_impl_session * session, char const * data, - size_t length); + size_t length, + bool is_final_fragment); extern void wf_impl_session_onwritable( struct wf_impl_session * session); diff --git a/lib/webfuse/impl/util/buffer.c b/lib/webfuse/impl/util/buffer.c new file mode 100644 index 0000000..11e4d26 --- /dev/null +++ b/lib/webfuse/impl/util/buffer.c @@ -0,0 +1,64 @@ +#include "webfuse/impl/util/buffer.h" +#include +#include + +void +wf_impl_buffer_init( + struct wf_buffer * buffer, + size_t initial_capacity) +{ + buffer->data = malloc(initial_capacity); + buffer->size = 0; + buffer->capacity = initial_capacity; +} + +void +wf_impl_buffer_cleanup( + struct wf_buffer * buffer) +{ + free(buffer->data); +} + +bool +wf_impl_buffer_is_empty( + struct wf_buffer * buffer) +{ + return (0 == buffer->size); +} + +void +wf_impl_buffer_clear( + struct wf_buffer * buffer) +{ + buffer->size = 0; +} + +void +wf_impl_buffer_append( + struct wf_buffer * buffer, + char const * data, + size_t length) +{ + while (length > (buffer->capacity - buffer->size)) + { + buffer->capacity *= 2; + buffer->data = realloc(buffer->data, buffer->capacity); + } + + memcpy(&(buffer->data[buffer->size]), data, length); + buffer->size += length; +} + +char * +wf_impl_buffer_data( + struct wf_buffer * buffer) +{ + return buffer->data; +} + +size_t +wf_impl_buffer_size( + struct wf_buffer * buffer) +{ + return buffer->size; +} diff --git a/lib/webfuse/impl/util/buffer.h b/lib/webfuse/impl/util/buffer.h new file mode 100644 index 0000000..85cfd65 --- /dev/null +++ b/lib/webfuse/impl/util/buffer.h @@ -0,0 +1,59 @@ +#ifndef WF_IMPL_UTIL_BUFFER_H +#define WF_IMPL_UTIL_BUFFER_H + +#ifndef __cplusplus +#include +#include +#else +#include +#endif + +#ifdef __cplusplus +extern "C" +{ +#endif + +struct wf_buffer +{ + char * data; + size_t size; + size_t capacity; +}; + +extern void +wf_impl_buffer_init( + struct wf_buffer * buffer, + size_t initial_capacity); + +extern void +wf_impl_buffer_cleanup( + struct wf_buffer * buffer); + +extern bool +wf_impl_buffer_is_empty( + struct wf_buffer * buffer); + +extern void +wf_impl_buffer_clear( + struct wf_buffer * buffer); + +extern void +wf_impl_buffer_append( + struct wf_buffer * buffer, + char const * data, + size_t lenght); + +extern char * +wf_impl_buffer_data( + struct wf_buffer * buffer); + +extern size_t +wf_impl_buffer_size( + struct wf_buffer * buffer); + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/meson.build b/meson.build index 6836e4e..3ff6f23 100644 --- a/meson.build +++ b/meson.build @@ -21,6 +21,7 @@ webfuse_static = static_library('webfuse', 'lib/webfuse/api.c', 'lib/webfuse/impl/util/slist.c', 'lib/webfuse/impl/util/base64.c', + 'lib/webfuse/impl/util/buffer.c', 'lib/webfuse/impl/util/lws_log.c', 'lib/webfuse/impl/util/json_util.c', 'lib/webfuse/impl/util/url.c', @@ -143,6 +144,7 @@ alltests = executable('alltests', 'test/webfuse/util/test_container_of.cc', 'test/webfuse/util/test_slist.cc', 'test/webfuse/util/test_base64.cc', + 'test/webfuse/util/test_buffer.cc', 'test/webfuse/util/test_url.cc', 'test/webfuse/test_status.cc', 'test/webfuse/test_message.cc', diff --git a/test/webfuse/test_client.cc b/test/webfuse/test_client.cc index a1ae12b..fe559e8 100644 --- a/test/webfuse/test_client.cc +++ b/test/webfuse/test_client.cc @@ -10,6 +10,8 @@ #include "webfuse/mocks/mock_invokation_handler.hpp" #include "webfuse/test_util/file.hpp" #include "webfuse/mocks/lookup_matcher.hpp" +#include "webfuse/mocks/open_matcher.hpp" +#include "webfuse/mocks/getattr_matcher.hpp" #include #include @@ -19,10 +21,13 @@ using webfuse_test::WsServer; using webfuse_test::MockInvokationHander; using webfuse_test::MockAdapterClientCallback; using webfuse_test::File; +using webfuse_test::GetAttr; +using webfuse_test::Open; using webfuse_test::Lookup; using testing::_; using testing::Invoke; using testing::AnyNumber; +using testing::AtMost; using testing::Return; using testing::Throw; using testing::StrEq; @@ -505,17 +510,9 @@ TEST(AdapterClient, LookupFile) EXPECT_CALL(handler, Invoke(StrEq("lookup"), _)).Times(AnyNumber()) .WillRepeatedly(Throw(std::runtime_error("unknown"))); EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "Hello.txt"))).Times(AnyNumber()) - .WillRepeatedly(Return( - "{" - "\"inode\": 2," - "\"mode\": 420," //0644 - "\"type\": \"file\"," - "\"size\": 42," - "\"atime\": 0," - "\"mtime\": 0," - "\"ctime\": 0" - "}" - )); + .WillRepeatedly(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 42}")); + EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber()) + .WillOnce(Return("{\"mode\": 420, \"type\": \"dir\"}")); MockAdapterClientCallback callback; EXPECT_CALL(callback, Invoke(_, _, _)).Times(AnyNumber()); @@ -542,6 +539,8 @@ TEST(AdapterClient, LookupFile) client.AddFileSystem(); ASSERT_EQ(std::future_status::ready, called.get_future().wait_for(TIMEOUT)); + std::string base_dir = client.GetDir(); + ASSERT_TRUE(File(base_dir).isDirectory()); std::string file_name = client.GetDir() + "/Hello.txt"; File file(file_name); ASSERT_TRUE(file.isFile()); @@ -549,3 +548,76 @@ TEST(AdapterClient, LookupFile) client.Disconnect(); ASSERT_EQ(std::future_status::ready, disconnected.get_future().wait_for(TIMEOUT)); } + +TEST(AdapterClient, ReadLargeFile) +{ + MockInvokationHander handler; + WsServer server(handler, WF_PROTOCOL_NAME_PROVIDER_SERVER); + EXPECT_CALL(handler, Invoke(StrEq("add_filesystem"),_)).Times(1) + .WillOnce(Return("{\"id\": \"test\"}")); + EXPECT_CALL(handler, Invoke(StrEq("lookup"), _)).Times(AnyNumber()) + .WillRepeatedly(Throw(std::runtime_error("unknown"))); + EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "a.file"))).Times(1) + .WillOnce(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 4096}")); + EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber()) + .WillRepeatedly(Return("{\"mode\": 420, \"type\": \"dir\"}")); + EXPECT_CALL(handler, Invoke(StrEq("open"), Open(2))).Times(1) + .WillOnce(Return("{\"handle\": 42}")); + EXPECT_CALL(handler, Invoke(StrEq("read"), _)).Times(AnyNumber()) + .WillRepeatedly(Invoke([](char const *, json_t * params) { + int offset = json_integer_value(json_array_get(params, 3)); + int length = json_integer_value(json_array_get(params, 4)); + + int remaining = (offset < 4096) ? 4096 - offset : 0; + int count = (length < remaining) ? length : remaining; + + std::string data = std::string(count, '*'); + + json_t * result = json_object(); + json_object_set_new(result, "data", json_string(data.c_str())); + json_object_set_new(result, "format", json_string("identity")); + json_object_set_new(result, "count", json_integer(count)); + + char * result_text = json_dumps(result, 0); + std::string result_str = result_text; + free(result_text); + json_decref(result); + + return result_str; + })); + EXPECT_CALL(handler, Invoke(StrEq("close"), _)).Times(AtMost(1)); + + MockAdapterClientCallback callback; + EXPECT_CALL(callback, Invoke(_, _, _)).Times(AnyNumber()); + + std::promise connected; + EXPECT_CALL(callback, Invoke(_, WF_CLIENT_CONNECTED, nullptr)).Times(1) + .WillOnce(Invoke([&] (wf_client *, int, void *) mutable { connected.set_value(); })); + + std::promise disconnected; + EXPECT_CALL(callback, Invoke(_, WF_CLIENT_DISCONNECTED, nullptr)).Times(1) + .WillOnce(Invoke([&] (wf_client *, int, void *) mutable { disconnected.set_value(); })); + + std::promise called; + EXPECT_CALL(callback, Invoke(_, WF_CLIENT_FILESYSTEM_ADDED, nullptr)).Times(1) + .WillOnce(Invoke([&called] (wf_client *, int, void *) mutable { + called.set_value(); + })); + + AdapterClient client(callback.GetCallbackFn(), callback.GetUserData(), server.GetUrl()); + + client.Connect(); + ASSERT_EQ(std::future_status::ready, connected.get_future().wait_for(TIMEOUT)); + + client.AddFileSystem(); + ASSERT_EQ(std::future_status::ready, called.get_future().wait_for(TIMEOUT)); + + std::string base_dir = client.GetDir(); + ASSERT_TRUE(File(base_dir).isDirectory()); + File file(base_dir + "/a.file"); + std::string contents(4096, '*'); + ASSERT_TRUE(file.hasContents(contents)); + + client.Disconnect(); + ASSERT_EQ(std::future_status::ready, disconnected.get_future().wait_for(TIMEOUT)); +} diff --git a/test/webfuse/test_server.cc b/test/webfuse/test_server.cc index 834ad85..500b380 100644 --- a/test/webfuse/test_server.cc +++ b/test/webfuse/test_server.cc @@ -324,7 +324,7 @@ TEST(server, read_large_file) EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "a.file"))).Times(1) .WillOnce(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 4096}")); EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber()) - .WillOnce(Return("{\"mode\": 420, \"type\": \"dir\"}")); + .WillRepeatedly(Return("{\"mode\": 420, \"type\": \"dir\"}")); EXPECT_CALL(handler, Invoke(StrEq("open"), Open(2))).Times(1) .WillOnce(Return("{\"handle\": 42}")); EXPECT_CALL(handler, Invoke(StrEq("read"), _)).Times(AnyNumber()) @@ -332,9 +332,6 @@ TEST(server, read_large_file) int offset = json_integer_value(json_array_get(params, 3)); int length = json_integer_value(json_array_get(params, 4)); - std::cout << "offset: " << offset << std::endl; - std::cout << "length: " << length << std::endl; - int remaining = (offset < 4096) ? 4096 - offset : 0; int count = (length < remaining) ? length : remaining; @@ -348,6 +345,7 @@ TEST(server, read_large_file) char * result_text = json_dumps(result, 0); std::string result_str = result_text; free(result_text); + json_decref(result); return result_str; })); @@ -369,7 +367,8 @@ TEST(server, read_large_file) std::string base_dir = server.GetBaseDir(); ASSERT_TRUE(File(base_dir).isDirectory()); File file(base_dir + "/test/a.file"); - ASSERT_TRUE(file.hasContents("*")); + std::string contents(4096, '*'); + ASSERT_TRUE(file.hasContents(contents)); auto disconnected = client.Disconnect(); ASSERT_TRUE(disconnected); diff --git a/test/webfuse/util/test_buffer.cc b/test/webfuse/util/test_buffer.cc new file mode 100644 index 0000000..1b20029 --- /dev/null +++ b/test/webfuse/util/test_buffer.cc @@ -0,0 +1,46 @@ +#include "webfuse/impl/util/buffer.h" +#include + +TEST(wf_buffer, append) +{ + wf_buffer buffer; + wf_impl_buffer_init(&buffer, 2); + + wf_impl_buffer_append(&buffer, "Hello", 5); + wf_impl_buffer_append(&buffer, ", " , 2); + wf_impl_buffer_append(&buffer, "World", 6); + + ASSERT_STREQ("Hello, World", wf_impl_buffer_data(&buffer)); + ASSERT_EQ(13, wf_impl_buffer_size(&buffer)); + + wf_impl_buffer_cleanup(&buffer); +} + +TEST(wf_buffer, is_empty) +{ + wf_buffer buffer; + wf_impl_buffer_init(&buffer, 20); + + ASSERT_TRUE(wf_impl_buffer_is_empty(&buffer)); + + wf_impl_buffer_append(&buffer, "Hi!", 4); + ASSERT_FALSE(wf_impl_buffer_is_empty(&buffer)); + + wf_impl_buffer_cleanup(&buffer); +} + +TEST(wf_buffer, clear) +{ + wf_buffer buffer; + wf_impl_buffer_init(&buffer, 2); + + ASSERT_TRUE(wf_impl_buffer_is_empty(&buffer)); + + wf_impl_buffer_append(&buffer, "Hi!", 4); + ASSERT_FALSE(wf_impl_buffer_is_empty(&buffer)); + + wf_impl_buffer_clear(&buffer); + ASSERT_TRUE(wf_impl_buffer_is_empty(&buffer)); + + wf_impl_buffer_cleanup(&buffer); +}