Merge pull request #83 from falk-werner/fix_read_large_file

Fix read large file
pull/86/head
Falk Werner 4 years ago committed by GitHub
commit 060c43f883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,6 +19,7 @@
#include <libwebsockets.h>
#define WF_DEFAULT_TIMEOUT (10 * 1000)
#define WF_DEFAULT_MESSAGE_SIZE (10 * 1024)
struct wf_impl_client_protocol_add_filesystem_context
{
@ -32,6 +33,7 @@ wf_impl_client_protocol_process(
char const * data,
size_t length)
{
json_t * message = json_loadb(data, length, 0, NULL);
if (NULL != message)
{
@ -44,6 +46,35 @@ wf_impl_client_protocol_process(
}
}
static void
wf_impl_client_protocol_receive(
struct wf_client_protocol * protocol,
char const * data,
size_t length,
bool is_final_fragment)
{
if (is_final_fragment)
{
if (wf_impl_buffer_is_empty(&protocol->recv_buffer))
{
wf_impl_client_protocol_process(protocol, data, length);
}
else
{
wf_impl_buffer_append(&protocol->recv_buffer, data, length);
wf_impl_client_protocol_process(protocol,
wf_impl_buffer_data(&protocol->recv_buffer),
wf_impl_buffer_size(&protocol->recv_buffer));
wf_impl_buffer_clear(&protocol->recv_buffer);
}
}
else
{
wf_impl_buffer_append(&protocol->recv_buffer, data, length);
}
}
static bool
wf_impl_client_protocol_send(
json_t * request,
@ -145,7 +176,7 @@ static int wf_impl_client_protocol_lws_callback(
protocol->callback(protocol->user_data, WF_CLIENT_DISCONNECTED, NULL);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
wf_impl_client_protocol_process(protocol, in, len);
wf_impl_client_protocol_receive(protocol, in, len, lws_is_final_fragment(wsi));
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
// fall-through
@ -196,6 +227,7 @@ wf_impl_client_protocol_init(
protocol->user_data = user_data;
protocol->filesystem = NULL;
wf_impl_buffer_init(&protocol->recv_buffer, WF_DEFAULT_MESSAGE_SIZE);
wf_impl_slist_init(&protocol->messages);
protocol->timer_manager = wf_impl_timer_manager_create();
protocol->proxy = wf_impl_jsonrpc_proxy_create(protocol->timer_manager, WF_DEFAULT_TIMEOUT, &wf_impl_client_protocol_send, protocol);
@ -218,6 +250,8 @@ wf_impl_client_protocol_cleanup(
wf_impl_filesystem_dispose(protocol->filesystem);
protocol->filesystem = NULL;
}
wf_impl_buffer_cleanup(&protocol->recv_buffer);
}
void

@ -3,6 +3,7 @@
#include "webfuse/client_callback.h"
#include "webfuse/impl/util/slist.h"
#include "webfuse/impl/util/buffer.h"
#ifndef __cplusplus
#include <stdbool.h>
@ -37,6 +38,7 @@ struct wf_client_protocol
struct wf_timer_manager * timer_manager;
struct wf_jsonrpc_proxy * proxy;
struct wf_slist messages;
struct wf_buffer recv_buffer;
};
extern void

@ -61,7 +61,7 @@ static int wf_impl_server_protocol_callback(
case LWS_CALLBACK_RECEIVE:
if (NULL != session)
{
wf_impl_session_receive(session, in, len);
wf_impl_session_receive(session, in, len, lws_is_final_fragment(wsi));
}
break;
case LWS_CALLBACK_RAW_RX_FILE:

@ -17,6 +17,7 @@
#include <stdlib.h>
#define WF_DEFAULT_TIMEOUT (10 * 1000)
#define WF_DEFAULT_MESSAGE_SIZE (8 * 1024)
static bool wf_impl_session_send(
json_t * request,
@ -60,6 +61,7 @@ struct wf_impl_session * wf_impl_session_create(
session->mountpoint_factory = mountpoint_factory;
session->rpc = wf_impl_jsonrpc_proxy_create(timer_manager, WF_DEFAULT_TIMEOUT, &wf_impl_session_send, session);
wf_impl_slist_init(&session->messages);
wf_impl_buffer_init(&session->recv_buffer, WF_DEFAULT_MESSAGE_SIZE);
return session;
}
@ -85,6 +87,7 @@ void wf_impl_session_dispose(
wf_impl_message_queue_cleanup(&session->messages);
wf_impl_session_dispose_filesystems(&session->filesystems);
wf_impl_buffer_cleanup(&session->recv_buffer);
free(session);
}
@ -146,8 +149,7 @@ void wf_impl_session_onwritable(
}
}
void wf_impl_session_receive(
static void wf_impl_session_process(
struct wf_impl_session * session,
char const * data,
size_t length)
@ -164,9 +166,35 @@ void wf_impl_session_receive(
wf_impl_jsonrpc_server_process(session->server, message, &wf_impl_session_send, session);
}
json_decref(message);
json_decref(message);
}
}
void wf_impl_session_receive(
struct wf_impl_session * session,
char const * data,
size_t length,
bool is_final_fragment)
{
if (is_final_fragment)
{
if (wf_impl_buffer_is_empty(&session->recv_buffer))
{
wf_impl_session_process(session, data, length);
}
else
{
wf_impl_buffer_append(&session->recv_buffer, data, length);
wf_impl_session_process(session,
wf_impl_buffer_data(&session->recv_buffer),
wf_impl_buffer_size(&session->recv_buffer));
wf_impl_buffer_clear(&session->recv_buffer);
}
}
else
{
wf_impl_buffer_append(&session->recv_buffer, data, length);
}
}
static struct wf_impl_filesystem * wf_impl_session_get_filesystem(

@ -12,6 +12,7 @@ using std::size_t;
#include "webfuse/impl/message_queue.h"
#include "webfuse/impl/filesystem.h"
#include "webfuse/impl/util/slist.h"
#include "webfuse/impl/util/buffer.h"
#include "webfuse/impl/jsonrpc/proxy.h"
#include "webfuse/impl/jsonrpc/server.h"
@ -38,6 +39,7 @@ struct wf_impl_session
struct wf_jsonrpc_server * server;
struct wf_jsonrpc_proxy * rpc;
struct wf_slist filesystems;
struct wf_buffer recv_buffer;
};
extern struct wf_impl_session * wf_impl_session_create(
@ -61,7 +63,8 @@ extern bool wf_impl_session_add_filesystem(
extern void wf_impl_session_receive(
struct wf_impl_session * session,
char const * data,
size_t length);
size_t length,
bool is_final_fragment);
extern void wf_impl_session_onwritable(
struct wf_impl_session * session);

@ -0,0 +1,64 @@
#include "webfuse/impl/util/buffer.h"
#include <stdlib.h>
#include <string.h>
void
wf_impl_buffer_init(
struct wf_buffer * buffer,
size_t initial_capacity)
{
buffer->data = malloc(initial_capacity);
buffer->size = 0;
buffer->capacity = initial_capacity;
}
void
wf_impl_buffer_cleanup(
struct wf_buffer * buffer)
{
free(buffer->data);
}
bool
wf_impl_buffer_is_empty(
struct wf_buffer * buffer)
{
return (0 == buffer->size);
}
void
wf_impl_buffer_clear(
struct wf_buffer * buffer)
{
buffer->size = 0;
}
void
wf_impl_buffer_append(
struct wf_buffer * buffer,
char const * data,
size_t length)
{
while (length > (buffer->capacity - buffer->size))
{
buffer->capacity *= 2;
buffer->data = realloc(buffer->data, buffer->capacity);
}
memcpy(&(buffer->data[buffer->size]), data, length);
buffer->size += length;
}
char *
wf_impl_buffer_data(
struct wf_buffer * buffer)
{
return buffer->data;
}
size_t
wf_impl_buffer_size(
struct wf_buffer * buffer)
{
return buffer->size;
}

@ -0,0 +1,59 @@
#ifndef WF_IMPL_UTIL_BUFFER_H
#define WF_IMPL_UTIL_BUFFER_H
#ifndef __cplusplus
#include <stddef.h>
#include <stdbool.h>
#else
#include <cstddef>
#endif
#ifdef __cplusplus
extern "C"
{
#endif
struct wf_buffer
{
char * data;
size_t size;
size_t capacity;
};
extern void
wf_impl_buffer_init(
struct wf_buffer * buffer,
size_t initial_capacity);
extern void
wf_impl_buffer_cleanup(
struct wf_buffer * buffer);
extern bool
wf_impl_buffer_is_empty(
struct wf_buffer * buffer);
extern void
wf_impl_buffer_clear(
struct wf_buffer * buffer);
extern void
wf_impl_buffer_append(
struct wf_buffer * buffer,
char const * data,
size_t lenght);
extern char *
wf_impl_buffer_data(
struct wf_buffer * buffer);
extern size_t
wf_impl_buffer_size(
struct wf_buffer * buffer);
#ifdef __cplusplus
}
#endif
#endif

@ -21,6 +21,7 @@ webfuse_static = static_library('webfuse',
'lib/webfuse/api.c',
'lib/webfuse/impl/util/slist.c',
'lib/webfuse/impl/util/base64.c',
'lib/webfuse/impl/util/buffer.c',
'lib/webfuse/impl/util/lws_log.c',
'lib/webfuse/impl/util/json_util.c',
'lib/webfuse/impl/util/url.c',
@ -143,6 +144,7 @@ alltests = executable('alltests',
'test/webfuse/util/test_container_of.cc',
'test/webfuse/util/test_slist.cc',
'test/webfuse/util/test_base64.cc',
'test/webfuse/util/test_buffer.cc',
'test/webfuse/util/test_url.cc',
'test/webfuse/test_status.cc',
'test/webfuse/test_message.cc',

@ -10,6 +10,8 @@
#include "webfuse/mocks/mock_invokation_handler.hpp"
#include "webfuse/test_util/file.hpp"
#include "webfuse/mocks/lookup_matcher.hpp"
#include "webfuse/mocks/open_matcher.hpp"
#include "webfuse/mocks/getattr_matcher.hpp"
#include <future>
#include <chrono>
@ -19,10 +21,13 @@ using webfuse_test::WsServer;
using webfuse_test::MockInvokationHander;
using webfuse_test::MockAdapterClientCallback;
using webfuse_test::File;
using webfuse_test::GetAttr;
using webfuse_test::Open;
using webfuse_test::Lookup;
using testing::_;
using testing::Invoke;
using testing::AnyNumber;
using testing::AtMost;
using testing::Return;
using testing::Throw;
using testing::StrEq;
@ -505,17 +510,9 @@ TEST(AdapterClient, LookupFile)
EXPECT_CALL(handler, Invoke(StrEq("lookup"), _)).Times(AnyNumber())
.WillRepeatedly(Throw(std::runtime_error("unknown")));
EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "Hello.txt"))).Times(AnyNumber())
.WillRepeatedly(Return(
"{"
"\"inode\": 2,"
"\"mode\": 420," //0644
"\"type\": \"file\","
"\"size\": 42,"
"\"atime\": 0,"
"\"mtime\": 0,"
"\"ctime\": 0"
"}"
));
.WillRepeatedly(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 42}"));
EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber())
.WillOnce(Return("{\"mode\": 420, \"type\": \"dir\"}"));
MockAdapterClientCallback callback;
EXPECT_CALL(callback, Invoke(_, _, _)).Times(AnyNumber());
@ -542,6 +539,8 @@ TEST(AdapterClient, LookupFile)
client.AddFileSystem();
ASSERT_EQ(std::future_status::ready, called.get_future().wait_for(TIMEOUT));
std::string base_dir = client.GetDir();
ASSERT_TRUE(File(base_dir).isDirectory());
std::string file_name = client.GetDir() + "/Hello.txt";
File file(file_name);
ASSERT_TRUE(file.isFile());
@ -549,3 +548,76 @@ TEST(AdapterClient, LookupFile)
client.Disconnect();
ASSERT_EQ(std::future_status::ready, disconnected.get_future().wait_for(TIMEOUT));
}
TEST(AdapterClient, ReadLargeFile)
{
MockInvokationHander handler;
WsServer server(handler, WF_PROTOCOL_NAME_PROVIDER_SERVER);
EXPECT_CALL(handler, Invoke(StrEq("add_filesystem"),_)).Times(1)
.WillOnce(Return("{\"id\": \"test\"}"));
EXPECT_CALL(handler, Invoke(StrEq("lookup"), _)).Times(AnyNumber())
.WillRepeatedly(Throw(std::runtime_error("unknown")));
EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "a.file"))).Times(1)
.WillOnce(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 4096}"));
EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber())
.WillRepeatedly(Return("{\"mode\": 420, \"type\": \"dir\"}"));
EXPECT_CALL(handler, Invoke(StrEq("open"), Open(2))).Times(1)
.WillOnce(Return("{\"handle\": 42}"));
EXPECT_CALL(handler, Invoke(StrEq("read"), _)).Times(AnyNumber())
.WillRepeatedly(Invoke([](char const *, json_t * params) {
int offset = json_integer_value(json_array_get(params, 3));
int length = json_integer_value(json_array_get(params, 4));
int remaining = (offset < 4096) ? 4096 - offset : 0;
int count = (length < remaining) ? length : remaining;
std::string data = std::string(count, '*');
json_t * result = json_object();
json_object_set_new(result, "data", json_string(data.c_str()));
json_object_set_new(result, "format", json_string("identity"));
json_object_set_new(result, "count", json_integer(count));
char * result_text = json_dumps(result, 0);
std::string result_str = result_text;
free(result_text);
json_decref(result);
return result_str;
}));
EXPECT_CALL(handler, Invoke(StrEq("close"), _)).Times(AtMost(1));
MockAdapterClientCallback callback;
EXPECT_CALL(callback, Invoke(_, _, _)).Times(AnyNumber());
std::promise<void> connected;
EXPECT_CALL(callback, Invoke(_, WF_CLIENT_CONNECTED, nullptr)).Times(1)
.WillOnce(Invoke([&] (wf_client *, int, void *) mutable { connected.set_value(); }));
std::promise<void> disconnected;
EXPECT_CALL(callback, Invoke(_, WF_CLIENT_DISCONNECTED, nullptr)).Times(1)
.WillOnce(Invoke([&] (wf_client *, int, void *) mutable { disconnected.set_value(); }));
std::promise<void> called;
EXPECT_CALL(callback, Invoke(_, WF_CLIENT_FILESYSTEM_ADDED, nullptr)).Times(1)
.WillOnce(Invoke([&called] (wf_client *, int, void *) mutable {
called.set_value();
}));
AdapterClient client(callback.GetCallbackFn(), callback.GetUserData(), server.GetUrl());
client.Connect();
ASSERT_EQ(std::future_status::ready, connected.get_future().wait_for(TIMEOUT));
client.AddFileSystem();
ASSERT_EQ(std::future_status::ready, called.get_future().wait_for(TIMEOUT));
std::string base_dir = client.GetDir();
ASSERT_TRUE(File(base_dir).isDirectory());
File file(base_dir + "/a.file");
std::string contents(4096, '*');
ASSERT_TRUE(file.hasContents(contents));
client.Disconnect();
ASSERT_EQ(std::future_status::ready, disconnected.get_future().wait_for(TIMEOUT));
}

@ -27,6 +27,7 @@ using testing::_;
using testing::AnyNumber;
using testing::AtMost;
using testing::Return;
using testing::Invoke;
namespace
{
@ -315,6 +316,65 @@ TEST(server, read)
ASSERT_TRUE(disconnected);
}
TEST(server, read_large_file)
{
Server server;
MockInvokationHander handler;
EXPECT_CALL(handler, Invoke(StrEq("lookup"), _)).Times(AnyNumber());
EXPECT_CALL(handler, Invoke(StrEq("lookup"), Lookup(1, "a.file"))).Times(1)
.WillOnce(Return("{\"inode\": 2, \"mode\": 420, \"type\": \"file\", \"size\": 4096}"));
EXPECT_CALL(handler, Invoke(StrEq("getattr"), GetAttr(1))).Times(AnyNumber())
.WillRepeatedly(Return("{\"mode\": 420, \"type\": \"dir\"}"));
EXPECT_CALL(handler, Invoke(StrEq("open"), Open(2))).Times(1)
.WillOnce(Return("{\"handle\": 42}"));
EXPECT_CALL(handler, Invoke(StrEq("read"), _)).Times(AnyNumber())
.WillRepeatedly(Invoke([](char const *, json_t * params) {
int offset = json_integer_value(json_array_get(params, 3));
int length = json_integer_value(json_array_get(params, 4));
int remaining = (offset < 4096) ? 4096 - offset : 0;
int count = (length < remaining) ? length : remaining;
std::string data = std::string(count, '*');
json_t * result = json_object();
json_object_set_new(result, "data", json_string(data.c_str()));
json_object_set_new(result, "format", json_string("identity"));
json_object_set_new(result, "count", json_integer(count));
char * result_text = json_dumps(result, 0);
std::string result_str = result_text;
free(result_text);
json_decref(result);
return result_str;
}));
EXPECT_CALL(handler, Invoke(StrEq("close"), _)).Times(AtMost(1));
WsClient client(handler, WF_PROTOCOL_NAME_PROVIDER_CLIENT);
auto connected = client.Connect(server.GetPort(), WF_PROTOCOL_NAME_ADAPTER_SERVER);
ASSERT_TRUE(connected);
std::string response_text = client.Invoke("{\"method\": \"add_filesystem\", \"params\": [\"test\"], \"id\": 42}");
json_t * response = json_loads(response_text.c_str(), 0, nullptr);
ASSERT_TRUE(json_is_object(response));
json_t * result = json_object_get(response, "result");
ASSERT_TRUE(json_is_object(result));
json_t * id = json_object_get(response, "id");
ASSERT_EQ(42, json_integer_value(id));
json_decref(response);
std::string base_dir = server.GetBaseDir();
ASSERT_TRUE(File(base_dir).isDirectory());
File file(base_dir + "/test/a.file");
std::string contents(4096, '*');
ASSERT_TRUE(file.hasContents(contents));
auto disconnected = client.Disconnect();
ASSERT_TRUE(disconnected);
}
TEST(server, readdir)
{
Server server;

@ -0,0 +1,46 @@
#include "webfuse/impl/util/buffer.h"
#include <gtest/gtest.h>
TEST(wf_buffer, append)
{
wf_buffer buffer;
wf_impl_buffer_init(&buffer, 2);
wf_impl_buffer_append(&buffer, "Hello", 5);
wf_impl_buffer_append(&buffer, ", " , 2);
wf_impl_buffer_append(&buffer, "World", 6);
ASSERT_STREQ("Hello, World", wf_impl_buffer_data(&buffer));
ASSERT_EQ(13, wf_impl_buffer_size(&buffer));
wf_impl_buffer_cleanup(&buffer);
}
TEST(wf_buffer, is_empty)
{
wf_buffer buffer;
wf_impl_buffer_init(&buffer, 20);
ASSERT_TRUE(wf_impl_buffer_is_empty(&buffer));
wf_impl_buffer_append(&buffer, "Hi!", 4);
ASSERT_FALSE(wf_impl_buffer_is_empty(&buffer));
wf_impl_buffer_cleanup(&buffer);
}
TEST(wf_buffer, clear)
{
wf_buffer buffer;
wf_impl_buffer_init(&buffer, 2);
ASSERT_TRUE(wf_impl_buffer_is_empty(&buffer));
wf_impl_buffer_append(&buffer, "Hi!", 4);
ASSERT_FALSE(wf_impl_buffer_is_empty(&buffer));
wf_impl_buffer_clear(&buffer);
ASSERT_TRUE(wf_impl_buffer_is_empty(&buffer));
wf_impl_buffer_cleanup(&buffer);
}
Loading…
Cancel
Save