From b09c6949eb495dc6be2237fe7cf920828176f3c0 Mon Sep 17 00:00:00 2001 From: Payden Sutherland Date: Fri, 16 Nov 2012 22:26:44 -0500 Subject: [PATCH] Add method for fragmented messages. And use this method for UNIX socket piping. --- wsclient.c | 212 ++++++++++++++++++++++++++++++++++++++++++++--------- wsclient.h | 4 + 2 files changed, 181 insertions(+), 35 deletions(-) diff --git a/wsclient.c b/wsclient.c index 0cdf1f9..efe163f 100644 --- a/wsclient.c +++ b/wsclient.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -94,16 +95,18 @@ void libwsclient_close(wsclient *client) { char data[6]; int i = 0, n, mask_int; struct timeval tv; - gettimeofday(&tv); + gettimeofday(&tv, NULL); srand(tv.tv_sec * tv.tv_usec); mask_int = rand(); memcpy(data+2, &mask_int, 4); data[0] = 0x88; data[1] = 0x80; + pthread_mutex_lock(&client->send_lock); do { n = send(client->sockfd, data, 6, 0); i += n; } while(i < 6 && n > 0); + pthread_mutex_unlock(&client->send_lock); if(n < 0) { if(client->onerror) { err = libwsclient_new_error(WS_DO_CLOSE_SEND_ERR); @@ -126,7 +129,7 @@ void libwsclient_handle_control_frame(wsclient *c, wsclient_frame *ctl_frame) { char mask[4]; int mask_int; struct timeval tv; - gettimeofday(&tv); + gettimeofday(&tv, NULL); srand(tv.tv_sec * tv.tv_usec); mask_int = rand(); memcpy(mask, &mask_int, 4); @@ -140,10 +143,12 @@ void libwsclient_handle_control_frame(wsclient *c, wsclient_frame *ctl_frame) { *(ctl_frame->rawdata + ctl_frame->payload_offset + i) ^= (mask[i % 4] & 0xff); //mask payload *(ctl_frame->rawdata + 1) |= 0x80; //turn mask bit on i = 0; + pthread_mutex_lock(&c->send_lock); while(i < ctl_frame->payload_offset + ctl_frame->payload_len && n >= 0) { n = send(c->sockfd, ctl_frame->rawdata + i, ctl_frame->payload_offset + ctl_frame->payload_len - i, 0); i += n; } + pthread_mutex_unlock(&c->send_lock); if(n < 0) { if(c->onerror) { err = libwsclient_new_error(WS_HANDLE_CTL_FRAME_SEND_ERR); @@ -392,9 +397,10 @@ void *libwsclient_helper_socket_thread(void *ptr) { wsclient *c = ptr; struct sockaddr_un remote; socklen_t len; - int remote_sock, n, payload_idx, payload_sz; + int remote_sock, n, n2, flags; char recv_buf[HELPER_RECV_BUF_SIZE]; - char *payload = NULL, *payload_tmp = NULL; + char secondary_buf[HELPER_RECV_BUF_SIZE]; + for(;;) { //TODO: some way to cleanly break this loop @@ -402,35 +408,41 @@ void *libwsclient_helper_socket_thread(void *ptr) { if((remote_sock = accept(c->helper_sock, (struct sockaddr *)&remote, &len)) == -1) { continue; } - payload_idx = 0; - payload = (char *)malloc(HELPER_RECV_BUF_SIZE); - payload_sz = HELPER_RECV_BUF_SIZE; - memset(payload, 0, payload_sz); - do { + pthread_mutex_lock(&c->send_lock); + + n2 = 0; + n = 1; + flags = WS_FRAGMENT_START; + while(1) { memset(recv_buf, 0, HELPER_RECV_BUF_SIZE); n = recv(remote_sock, recv_buf, HELPER_RECV_BUF_SIZE - 1, 0); + if(n == 0 && n2 != 0) { + libwsclient_send_fragment(c, secondary_buf, n2, flags | WS_FRAGMENT_FIN); + break; + } + if(n == 0 && n2 == 0) { + fprintf(stderr, "Never received any data from UNIX socket, not sending anything.\n"); + break; + } + if(n != 0 && n2 != 0) { + libwsclient_send_fragment(c, secondary_buf, n2, flags); + flags &= ~WS_FRAGMENT_START; + } if(n > 0) { - if(n + payload_idx >= payload_sz) { - payload_tmp = payload; - payload_sz += HELPER_RECV_BUF_SIZE; - payload = (char *)realloc(payload, payload_sz); - if(!payload) { - payload = payload_tmp; - fprintf(stderr, "Unable to realloc, data sent will be truncated.\n"); - break; - } - memset(payload + payload_idx, 0, payload_sz - payload_idx); - } - memcpy(payload + payload_idx, recv_buf, n); - payload_idx += n; + memset(secondary_buf, 0, HELPER_RECV_BUF_SIZE); + memcpy(secondary_buf, recv_buf, n); + n2 = n; + } + if(n < 0) { + fprintf(stderr, "Error occured\n"); + perror("recv"); + break; } - } while(n > 0); + } + pthread_mutex_unlock(&c->send_lock); close(remote_sock); - libwsclient_send(c, payload); - free(payload); - payload = NULL; } return NULL; } @@ -448,6 +460,10 @@ wsclient *libwsclient_new(const char *URI) { fprintf(stderr, "Unable to init mutex in libwsclient_new.\n"); exit(WS_EXIT_PTHREAD_MUTEX_INIT); } + if(pthread_mutex_init(&client->send_lock, NULL) != 0) { + fprintf(stderr, "Unable to init send lock in libwsclient_new.\n"); + exit(WS_EXIT_PTHREAD_MUTEX_INIT); + } pthread_mutex_lock(&client->lock); client->URI = (char *)malloc(strlen(URI)+1); if(!client->URI) { @@ -767,6 +783,132 @@ wsclient_error *libwsclient_new_error(int errcode) { return err; } +int libwsclient_send_fragment(wsclient *client, char *strdata, int len, int flags) { + wsclient_error *err = NULL; + struct timeval tv; + unsigned char mask[4]; + unsigned int mask_int; + unsigned long long payload_len; + unsigned char finNopcode; + unsigned int payload_len_small; + unsigned int payload_offset = 6; + unsigned int len_size; + unsigned long long be_payload_len; + unsigned int sent = 0; + int i, sockfd; + unsigned int frame_size; + char *data = NULL; + + + sockfd = client->sockfd; + + + if(client->flags & CLIENT_SENT_CLOSE_FRAME) { + if(client->onerror) { + err = libwsclient_new_error(WS_SEND_AFTER_CLOSE_FRAME_ERR); + client->onerror(client, err); + free(err); + err = NULL; + } + return 0; + } + if(client->flags & CLIENT_CONNECTING) { + if(client->onerror) { + err = libwsclient_new_error(WS_SEND_DURING_CONNECT_ERR); + client->onerror(client, err); + free(err); + err = NULL; + } + return 0; + } + + if(strdata == NULL) { + if(client->onerror) { + err = libwsclient_new_error(WS_SEND_NULL_DATA_ERR); + client->onerror(client, err); + free(err); + err = NULL; + } + return 0; + } + + gettimeofday(&tv, NULL); + srand(tv.tv_usec * tv.tv_sec); + mask_int = rand(); + memcpy(mask, &mask_int, 4); + payload_len = len; + if(payload_len <= 125) { + frame_size = 6 + payload_len; + payload_len_small = payload_len; + + } else if(payload_len > 125 && payload_len <= 0xffff) { + frame_size = 8 + payload_len; + payload_len_small = 126; + payload_offset += 2; + } else if(payload_len > 0xffff && payload_len <= 0xffffffffffffffffLL) { + frame_size = 14 + payload_len; + payload_len_small = 127; + payload_offset += 8; + } else { + if(client->onerror) { + err = libwsclient_new_error(WS_SEND_DATA_TOO_LARGE_ERR); + client->onerror(client, err); + free(err); + err = NULL; + } + return 0; + } + data = (char *)malloc(frame_size); + + memset(data, 0, frame_size); + *data = flags & 0xff; + *(data+1) = payload_len_small | 0x80; //payload length with mask bit on + if(payload_len_small == 126) { + payload_len &= 0xffff; + len_size = 2; + for(i = 0; i < len_size; i++) { + *(data+2+i) = *((char *)&payload_len+(len_size-i-1)); + //memcpy(data+2+i, (void *)&payload_len+(len_size-i-1), 1); + } + } + if(payload_len_small == 127) { + payload_len &= 0xffffffffffffffffLL; + len_size = 8; + for(i = 0; i < len_size; i++) { + *(data+2+i) = *((char *)&payload_len+(len_size-i-1)); + //memcpy(data+2+i, (void *)&payload_len+(len_size-i-1), 1); + } + } + for(i=0;i<4;i++) + *(data+(payload_offset-4)+i) = mask[i] & 0xff; + + memcpy(data+payload_offset, strdata, len); + for(i=0;i 0) { + i = send(sockfd, data+sent, frame_size - sent, 0); + sent += i; + } + + + if(i < 0) { + if(client->onerror) { + err = libwsclient_new_error(WS_SEND_SEND_ERR); + client->onerror(client, err); + free(err); + err = NULL; + } + } + + free(data); + return sent; +} + int libwsclient_send(wsclient *client, char *strdata) { wsclient_error *err = NULL; struct timeval tv; @@ -779,10 +921,12 @@ int libwsclient_send(wsclient *client, char *strdata) { unsigned int len_size; unsigned long long be_payload_len; unsigned int sent = 0; - int i; + int i, sockfd; unsigned int frame_size; char *data; - pthread_mutex_lock(&client->lock); + + sockfd = client->sockfd; + if(client->flags & CLIENT_SENT_CLOSE_FRAME) { if(client->onerror) { err = libwsclient_new_error(WS_SEND_AFTER_CLOSE_FRAME_ERR); @@ -790,7 +934,6 @@ int libwsclient_send(wsclient *client, char *strdata) { free(err); err = NULL; } - pthread_mutex_unlock(&client->lock); return 0; } if(client->flags & CLIENT_CONNECTING) { @@ -800,11 +943,9 @@ int libwsclient_send(wsclient *client, char *strdata) { free(err); err = NULL; } - pthread_mutex_unlock(&client->lock); return 0; } - int sockfd = client->sockfd; - pthread_mutex_unlock(&client->lock); + sockfd = client->sockfd; if(strdata == NULL) { if(client->onerror) { err = libwsclient_new_error(WS_SEND_NULL_DATA_ERR); @@ -812,11 +953,10 @@ int libwsclient_send(wsclient *client, char *strdata) { free(err); err = NULL; } - return -1; + return 0; } - - gettimeofday(&tv); + gettimeofday(&tv, NULL); srand(tv.tv_usec * tv.tv_sec); mask_int = rand(); memcpy(mask, &mask_int, 4); @@ -872,10 +1012,12 @@ int libwsclient_send(wsclient *client, char *strdata) { sent = 0; i = 0; + pthread_mutex_lock(&client->send_lock); while(sent < frame_size && i >= 0) { i = send(sockfd, data+sent, frame_size - sent, 0); sent += i; } + pthread_mutex_unlock(&client->send_lock); if(i < 0) { if(client->onerror) { diff --git a/wsclient.h b/wsclient.h index 3bb3bd9..74692f5 100644 --- a/wsclient.h +++ b/wsclient.h @@ -21,6 +21,9 @@ #define REQUEST_VALID_STATUS (1 << 2) #define REQUEST_VALID_ACCEPT (1 << 3) +#define WS_FRAGMENT_START (1 << 0) +#define WS_FRAGMENT_FIN (1 << 7) + #define WS_EXIT_MALLOC -1 #define WS_EXIT_PTHREAD_MUTEX_INIT -2 #define WS_EXIT_PTHREAD_CREATE -3 @@ -81,6 +84,7 @@ typedef struct _wsclient { pthread_t handshake_thread; pthread_t run_thread; pthread_mutex_t lock; + pthread_mutex_t send_lock; char *URI; int sockfd; int flags;