1
0
mirror of https://github.com/payden/libwsclient synced 2024-10-27 17:54:01 +00:00

Add method for fragmented messages.

And use this method for UNIX socket piping.
This commit is contained in:
Payden Sutherland 2012-11-16 22:26:44 -05:00
parent 89bc677a40
commit b09c6949eb
2 changed files with 181 additions and 35 deletions

View File

@ -7,6 +7,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h> #include <sys/un.h>
#include <pthread.h> #include <pthread.h>
@ -94,16 +95,18 @@ void libwsclient_close(wsclient *client) {
char data[6]; char data[6];
int i = 0, n, mask_int; int i = 0, n, mask_int;
struct timeval tv; struct timeval tv;
gettimeofday(&tv); gettimeofday(&tv, NULL);
srand(tv.tv_sec * tv.tv_usec); srand(tv.tv_sec * tv.tv_usec);
mask_int = rand(); mask_int = rand();
memcpy(data+2, &mask_int, 4); memcpy(data+2, &mask_int, 4);
data[0] = 0x88; data[0] = 0x88;
data[1] = 0x80; data[1] = 0x80;
pthread_mutex_lock(&client->send_lock);
do { do {
n = send(client->sockfd, data, 6, 0); n = send(client->sockfd, data, 6, 0);
i += n; i += n;
} while(i < 6 && n > 0); } while(i < 6 && n > 0);
pthread_mutex_unlock(&client->send_lock);
if(n < 0) { if(n < 0) {
if(client->onerror) { if(client->onerror) {
err = libwsclient_new_error(WS_DO_CLOSE_SEND_ERR); err = libwsclient_new_error(WS_DO_CLOSE_SEND_ERR);
@ -126,7 +129,7 @@ void libwsclient_handle_control_frame(wsclient *c, wsclient_frame *ctl_frame) {
char mask[4]; char mask[4];
int mask_int; int mask_int;
struct timeval tv; struct timeval tv;
gettimeofday(&tv); gettimeofday(&tv, NULL);
srand(tv.tv_sec * tv.tv_usec); srand(tv.tv_sec * tv.tv_usec);
mask_int = rand(); mask_int = rand();
memcpy(mask, &mask_int, 4); memcpy(mask, &mask_int, 4);
@ -140,10 +143,12 @@ void libwsclient_handle_control_frame(wsclient *c, wsclient_frame *ctl_frame) {
*(ctl_frame->rawdata + ctl_frame->payload_offset + i) ^= (mask[i % 4] & 0xff); //mask payload *(ctl_frame->rawdata + ctl_frame->payload_offset + i) ^= (mask[i % 4] & 0xff); //mask payload
*(ctl_frame->rawdata + 1) |= 0x80; //turn mask bit on *(ctl_frame->rawdata + 1) |= 0x80; //turn mask bit on
i = 0; i = 0;
pthread_mutex_lock(&c->send_lock);
while(i < ctl_frame->payload_offset + ctl_frame->payload_len && n >= 0) { while(i < ctl_frame->payload_offset + ctl_frame->payload_len && n >= 0) {
n = send(c->sockfd, ctl_frame->rawdata + i, ctl_frame->payload_offset + ctl_frame->payload_len - i, 0); n = send(c->sockfd, ctl_frame->rawdata + i, ctl_frame->payload_offset + ctl_frame->payload_len - i, 0);
i += n; i += n;
} }
pthread_mutex_unlock(&c->send_lock);
if(n < 0) { if(n < 0) {
if(c->onerror) { if(c->onerror) {
err = libwsclient_new_error(WS_HANDLE_CTL_FRAME_SEND_ERR); err = libwsclient_new_error(WS_HANDLE_CTL_FRAME_SEND_ERR);
@ -392,9 +397,10 @@ void *libwsclient_helper_socket_thread(void *ptr) {
wsclient *c = ptr; wsclient *c = ptr;
struct sockaddr_un remote; struct sockaddr_un remote;
socklen_t len; socklen_t len;
int remote_sock, n, payload_idx, payload_sz; int remote_sock, n, n2, flags;
char recv_buf[HELPER_RECV_BUF_SIZE]; char recv_buf[HELPER_RECV_BUF_SIZE];
char *payload = NULL, *payload_tmp = NULL; char secondary_buf[HELPER_RECV_BUF_SIZE];
for(;;) { //TODO: some way to cleanly break this loop for(;;) { //TODO: some way to cleanly break this loop
@ -402,35 +408,41 @@ void *libwsclient_helper_socket_thread(void *ptr) {
if((remote_sock = accept(c->helper_sock, (struct sockaddr *)&remote, &len)) == -1) { if((remote_sock = accept(c->helper_sock, (struct sockaddr *)&remote, &len)) == -1) {
continue; continue;
} }
payload_idx = 0;
payload = (char *)malloc(HELPER_RECV_BUF_SIZE);
payload_sz = HELPER_RECV_BUF_SIZE;
memset(payload, 0, payload_sz);
do { pthread_mutex_lock(&c->send_lock);
n2 = 0;
n = 1;
flags = WS_FRAGMENT_START;
while(1) {
memset(recv_buf, 0, HELPER_RECV_BUF_SIZE); memset(recv_buf, 0, HELPER_RECV_BUF_SIZE);
n = recv(remote_sock, recv_buf, HELPER_RECV_BUF_SIZE - 1, 0); n = recv(remote_sock, recv_buf, HELPER_RECV_BUF_SIZE - 1, 0);
if(n == 0 && n2 != 0) {
libwsclient_send_fragment(c, secondary_buf, n2, flags | WS_FRAGMENT_FIN);
break;
}
if(n == 0 && n2 == 0) {
fprintf(stderr, "Never received any data from UNIX socket, not sending anything.\n");
break;
}
if(n != 0 && n2 != 0) {
libwsclient_send_fragment(c, secondary_buf, n2, flags);
flags &= ~WS_FRAGMENT_START;
}
if(n > 0) { if(n > 0) {
if(n + payload_idx >= payload_sz) { memset(secondary_buf, 0, HELPER_RECV_BUF_SIZE);
payload_tmp = payload; memcpy(secondary_buf, recv_buf, n);
payload_sz += HELPER_RECV_BUF_SIZE; n2 = n;
payload = (char *)realloc(payload, payload_sz); }
if(!payload) { if(n < 0) {
payload = payload_tmp; fprintf(stderr, "Error occured\n");
fprintf(stderr, "Unable to realloc, data sent will be truncated.\n"); perror("recv");
break; break;
}
memset(payload + payload_idx, 0, payload_sz - payload_idx);
}
memcpy(payload + payload_idx, recv_buf, n);
payload_idx += n;
} }
} while(n > 0); }
pthread_mutex_unlock(&c->send_lock);
close(remote_sock); close(remote_sock);
libwsclient_send(c, payload);
free(payload);
payload = NULL;
} }
return NULL; return NULL;
} }
@ -448,6 +460,10 @@ wsclient *libwsclient_new(const char *URI) {
fprintf(stderr, "Unable to init mutex in libwsclient_new.\n"); fprintf(stderr, "Unable to init mutex in libwsclient_new.\n");
exit(WS_EXIT_PTHREAD_MUTEX_INIT); exit(WS_EXIT_PTHREAD_MUTEX_INIT);
} }
if(pthread_mutex_init(&client->send_lock, NULL) != 0) {
fprintf(stderr, "Unable to init send lock in libwsclient_new.\n");
exit(WS_EXIT_PTHREAD_MUTEX_INIT);
}
pthread_mutex_lock(&client->lock); pthread_mutex_lock(&client->lock);
client->URI = (char *)malloc(strlen(URI)+1); client->URI = (char *)malloc(strlen(URI)+1);
if(!client->URI) { if(!client->URI) {
@ -767,6 +783,132 @@ wsclient_error *libwsclient_new_error(int errcode) {
return err; return err;
} }
int libwsclient_send_fragment(wsclient *client, char *strdata, int len, int flags) {
wsclient_error *err = NULL;
struct timeval tv;
unsigned char mask[4];
unsigned int mask_int;
unsigned long long payload_len;
unsigned char finNopcode;
unsigned int payload_len_small;
unsigned int payload_offset = 6;
unsigned int len_size;
unsigned long long be_payload_len;
unsigned int sent = 0;
int i, sockfd;
unsigned int frame_size;
char *data = NULL;
sockfd = client->sockfd;
if(client->flags & CLIENT_SENT_CLOSE_FRAME) {
if(client->onerror) {
err = libwsclient_new_error(WS_SEND_AFTER_CLOSE_FRAME_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return 0;
}
if(client->flags & CLIENT_CONNECTING) {
if(client->onerror) {
err = libwsclient_new_error(WS_SEND_DURING_CONNECT_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return 0;
}
if(strdata == NULL) {
if(client->onerror) {
err = libwsclient_new_error(WS_SEND_NULL_DATA_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return 0;
}
gettimeofday(&tv, NULL);
srand(tv.tv_usec * tv.tv_sec);
mask_int = rand();
memcpy(mask, &mask_int, 4);
payload_len = len;
if(payload_len <= 125) {
frame_size = 6 + payload_len;
payload_len_small = payload_len;
} else if(payload_len > 125 && payload_len <= 0xffff) {
frame_size = 8 + payload_len;
payload_len_small = 126;
payload_offset += 2;
} else if(payload_len > 0xffff && payload_len <= 0xffffffffffffffffLL) {
frame_size = 14 + payload_len;
payload_len_small = 127;
payload_offset += 8;
} else {
if(client->onerror) {
err = libwsclient_new_error(WS_SEND_DATA_TOO_LARGE_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
return 0;
}
data = (char *)malloc(frame_size);
memset(data, 0, frame_size);
*data = flags & 0xff;
*(data+1) = payload_len_small | 0x80; //payload length with mask bit on
if(payload_len_small == 126) {
payload_len &= 0xffff;
len_size = 2;
for(i = 0; i < len_size; i++) {
*(data+2+i) = *((char *)&payload_len+(len_size-i-1));
//memcpy(data+2+i, (void *)&payload_len+(len_size-i-1), 1);
}
}
if(payload_len_small == 127) {
payload_len &= 0xffffffffffffffffLL;
len_size = 8;
for(i = 0; i < len_size; i++) {
*(data+2+i) = *((char *)&payload_len+(len_size-i-1));
//memcpy(data+2+i, (void *)&payload_len+(len_size-i-1), 1);
}
}
for(i=0;i<4;i++)
*(data+(payload_offset-4)+i) = mask[i] & 0xff;
memcpy(data+payload_offset, strdata, len);
for(i=0;i<len;i++)
*(data+payload_offset+i) ^= mask[i % 4] & 0xff;
sent = 0;
i = 1;
//we don't need the send lock here. It *should* have already been aquired before sending fragmented message
//and will be released after last fragment sent.
while(sent < frame_size && i > 0) {
i = send(sockfd, data+sent, frame_size - sent, 0);
sent += i;
}
if(i < 0) {
if(client->onerror) {
err = libwsclient_new_error(WS_SEND_SEND_ERR);
client->onerror(client, err);
free(err);
err = NULL;
}
}
free(data);
return sent;
}
int libwsclient_send(wsclient *client, char *strdata) { int libwsclient_send(wsclient *client, char *strdata) {
wsclient_error *err = NULL; wsclient_error *err = NULL;
struct timeval tv; struct timeval tv;
@ -779,10 +921,12 @@ int libwsclient_send(wsclient *client, char *strdata) {
unsigned int len_size; unsigned int len_size;
unsigned long long be_payload_len; unsigned long long be_payload_len;
unsigned int sent = 0; unsigned int sent = 0;
int i; int i, sockfd;
unsigned int frame_size; unsigned int frame_size;
char *data; char *data;
pthread_mutex_lock(&client->lock);
sockfd = client->sockfd;
if(client->flags & CLIENT_SENT_CLOSE_FRAME) { if(client->flags & CLIENT_SENT_CLOSE_FRAME) {
if(client->onerror) { if(client->onerror) {
err = libwsclient_new_error(WS_SEND_AFTER_CLOSE_FRAME_ERR); err = libwsclient_new_error(WS_SEND_AFTER_CLOSE_FRAME_ERR);
@ -790,7 +934,6 @@ int libwsclient_send(wsclient *client, char *strdata) {
free(err); free(err);
err = NULL; err = NULL;
} }
pthread_mutex_unlock(&client->lock);
return 0; return 0;
} }
if(client->flags & CLIENT_CONNECTING) { if(client->flags & CLIENT_CONNECTING) {
@ -800,11 +943,9 @@ int libwsclient_send(wsclient *client, char *strdata) {
free(err); free(err);
err = NULL; err = NULL;
} }
pthread_mutex_unlock(&client->lock);
return 0; return 0;
} }
int sockfd = client->sockfd; sockfd = client->sockfd;
pthread_mutex_unlock(&client->lock);
if(strdata == NULL) { if(strdata == NULL) {
if(client->onerror) { if(client->onerror) {
err = libwsclient_new_error(WS_SEND_NULL_DATA_ERR); err = libwsclient_new_error(WS_SEND_NULL_DATA_ERR);
@ -812,11 +953,10 @@ int libwsclient_send(wsclient *client, char *strdata) {
free(err); free(err);
err = NULL; err = NULL;
} }
return -1; return 0;
} }
gettimeofday(&tv, NULL);
gettimeofday(&tv);
srand(tv.tv_usec * tv.tv_sec); srand(tv.tv_usec * tv.tv_sec);
mask_int = rand(); mask_int = rand();
memcpy(mask, &mask_int, 4); memcpy(mask, &mask_int, 4);
@ -872,10 +1012,12 @@ int libwsclient_send(wsclient *client, char *strdata) {
sent = 0; sent = 0;
i = 0; i = 0;
pthread_mutex_lock(&client->send_lock);
while(sent < frame_size && i >= 0) { while(sent < frame_size && i >= 0) {
i = send(sockfd, data+sent, frame_size - sent, 0); i = send(sockfd, data+sent, frame_size - sent, 0);
sent += i; sent += i;
} }
pthread_mutex_unlock(&client->send_lock);
if(i < 0) { if(i < 0) {
if(client->onerror) { if(client->onerror) {

View File

@ -21,6 +21,9 @@
#define REQUEST_VALID_STATUS (1 << 2) #define REQUEST_VALID_STATUS (1 << 2)
#define REQUEST_VALID_ACCEPT (1 << 3) #define REQUEST_VALID_ACCEPT (1 << 3)
#define WS_FRAGMENT_START (1 << 0)
#define WS_FRAGMENT_FIN (1 << 7)
#define WS_EXIT_MALLOC -1 #define WS_EXIT_MALLOC -1
#define WS_EXIT_PTHREAD_MUTEX_INIT -2 #define WS_EXIT_PTHREAD_MUTEX_INIT -2
#define WS_EXIT_PTHREAD_CREATE -3 #define WS_EXIT_PTHREAD_CREATE -3
@ -81,6 +84,7 @@ typedef struct _wsclient {
pthread_t handshake_thread; pthread_t handshake_thread;
pthread_t run_thread; pthread_t run_thread;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_mutex_t send_lock;
char *URI; char *URI;
int sockfd; int sockfd;
int flags; int flags;