From: Elias Fleckenstein Date: Sat, 12 Feb 2022 22:12:32 +0000 (+0100) Subject: Rework multithreading responsibilities and disconnect process X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=b2fd5e955c0f910fdd1abf89626cfe324efbfe90;p=dragonnet.git Rework multithreading responsibilities and disconnect process --- diff --git a/listen.c b/listen.c index b6239dc..5571806 100644 --- a/listen.c +++ b/listen.c @@ -14,17 +14,15 @@ static bool dragonnet_peer_init_accepted(DragonnetPeer *p, int sock, struct sockaddr_in6 addr, DragonnetListener *l) { - pthread_rwlock_init(&p->mu, NULL); - pthread_rwlock_wrlock(&p->mu); + pthread_mutex_init(&p->mtx, NULL); - pthread_rwlock_rdlock(&l->mu); p->sock = sock; p->laddr = l->laddr; p->raddr = dragonnet_addr_parse_sock(addr); + p->on_disconnect = l->on_disconnect; + p->on_recv = l->on_recv; p->on_recv_type = l->on_recv_type; - pthread_rwlock_unlock(&l->mu); - pthread_rwlock_unlock(&p->mu); return true; } @@ -33,7 +31,8 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, { DragonnetPeer *p = malloc(sizeof *p); if (!dragonnet_peer_init_accepted(p, sock, addr, l)) { - dragonnet_peer_delete(p); + pthread_mutex_destroy(&p->mtx); + free(p); return NULL; } @@ -44,15 +43,16 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, // Listener // -------- -DragonnetListener *dragonnet_listener_new(char *addr, - void (*on_connect)(DragonnetPeer *p)) +DragonnetListener *dragonnet_listener_new(char *addr) { DragonnetListener *l = malloc(sizeof *l); pthread_rwlock_init(&l->mu, NULL); - pthread_rwlock_wrlock(&l->mu); + l->active = true; l->sock = socket(AF_INET6, SOCK_STREAM, 0); - l->on_connect = on_connect; + l->on_connect = NULL; + l->on_disconnect = NULL; + l->on_recv = NULL; l->on_recv_type = calloc(sizeof *l->on_recv_type, dragonnet_num_types); int so_reuseaddr = 1; @@ -78,35 +78,14 @@ DragonnetListener *dragonnet_listener_new(char *addr, return NULL; } - pthread_rwlock_unlock(&l->mu); return l; } -void dragonnet_listener_set_recv_hook(DragonnetListener *l, DragonnetTypeId type_id, - void (*on_recv)(struct dragonnet_peer *, void *)) -{ - pthread_rwlock_rdlock(&l->mu); - DragonnetListenerState state = l->state; - pthread_rwlock_unlock(&l->mu); - - if (state >= DRAGONNET_LISTENER_ACTIVE) - return; - - pthread_rwlock_wrlock(&l->mu); - l->on_recv_type[type_id] = on_recv; - pthread_rwlock_unlock(&l->mu); -} - static void *listener_main(void *g_listener) { DragonnetListener *l = (DragonnetListener *) g_listener; - pthread_rwlock_wrlock(&l->mu); - assert(l->state == DRAGONNET_LISTENER_CREATED); - l->state++; - pthread_rwlock_unlock(&l->mu); - - while (l->state == DRAGONNET_LISTENER_ACTIVE) { + while (l->active) { struct sockaddr_in6 clt_addr; socklen_t clt_addrlen = sizeof clt_addr; @@ -124,14 +103,14 @@ static void *listener_main(void *g_listener) if (p == NULL) continue; - dragonnet_peer_run(p); - pthread_rwlock_rdlock(&l->mu); void (*on_connect)(DragonnetPeer *) = l->on_connect; pthread_rwlock_unlock(&l->mu); if (on_connect != NULL) on_connect(p); + + dragonnet_peer_run(p); } return NULL; @@ -144,18 +123,9 @@ void dragonnet_listener_run(DragonnetListener *l) void dragonnet_listener_close(DragonnetListener *l) { - pthread_rwlock_wrlock(&l->mu); - - pthread_t accept_thread = l->accept_thread; - assert(l->state == DRAGONNET_LISTENER_ACTIVE); - close(l->sock); - l->sock = -1; - l->state++; - - pthread_rwlock_unlock(&l->mu); - - pthread_cancel(accept_thread); - pthread_join(accept_thread, NULL); + l->active = false; + pthread_cancel(l->accept_thread); + pthread_join(l->accept_thread, NULL); } void dragonnet_listener_delete(DragonnetListener *l) diff --git a/listen.h b/listen.h index 2c2b052..ea76162 100644 --- a/listen.h +++ b/listen.h @@ -4,28 +4,21 @@ #include #include -typedef enum { - DRAGONNET_LISTENER_CREATED, - DRAGONNET_LISTENER_ACTIVE, - DRAGONNET_LISTENER_CLOSED -} DragonnetListenerState; - typedef struct { int sock; DragonnetAddr laddr; - DragonnetListenerState state; pthread_t accept_thread; + bool active; void (*on_connect)(DragonnetPeer *); + void (*on_disconnect)(DragonnetPeer *); + bool (*on_recv)(DragonnetPeer *, DragonnetTypeId, void *); void (**on_recv_type)(DragonnetPeer *, void *); pthread_rwlock_t mu; } DragonnetListener; -DragonnetListener *dragonnet_listener_new(char *addr, - void (*on_connect)(DragonnetPeer *p)); -void dragonnet_listener_set_recv_hook(DragonnetListener *l, DragonnetTypeId type_id, - void (*on_recv)(struct dragonnet_peer *, void *)); +DragonnetListener *dragonnet_listener_new(char *addr); void dragonnet_listener_run(DragonnetListener *l); void dragonnet_listener_close(DragonnetListener *l); void dragonnet_listener_delete(DragonnetListener *l); diff --git a/peer.c b/peer.c index 5cd481a..75eb6ab 100644 --- a/peer.c +++ b/peer.c @@ -8,11 +8,12 @@ static bool dragonnet_peer_init(DragonnetPeer *p, char *addr) { - pthread_rwlock_init(&p->mu, NULL); - pthread_rwlock_wrlock(&p->mu); + pthread_mutex_init(&p->mtx, NULL); p->sock = socket(AF_INET6, SOCK_STREAM, 0); p->raddr = dragonnet_addr_parse_str(addr); + p->on_disconnect = NULL; + p->on_recv = NULL; p->on_recv_type = calloc(sizeof *p->on_recv_type, dragonnet_num_types); struct sockaddr_in6 sock_addr = dragonnet_addr_sock(p->raddr); @@ -31,8 +32,6 @@ static bool dragonnet_peer_init(DragonnetPeer *p, char *addr) } p->laddr = dragonnet_addr_parse_sock(sock_name); - - pthread_rwlock_unlock(&p->mu); return true; } @@ -40,53 +39,20 @@ DragonnetPeer *dragonnet_connect(char *addr) { DragonnetPeer *p = malloc(sizeof *p); if (!dragonnet_peer_init(p, addr)) { - dragonnet_peer_delete(p); + pthread_mutex_destroy(&p->mtx); + free(p); return NULL; } return p; } -void dragonnet_peer_set_recv_hook(DragonnetPeer *p, DragonnetTypeId type_id, - void (*on_recv)(struct dragonnet_peer *, void *)) -{ - pthread_rwlock_rdlock(&p->mu); - DragonnetPeerState state = p->state; - pthread_rwlock_unlock(&p->mu); - - if (state >= DRAGONNET_PEER_ACTIVE) - return; - - pthread_rwlock_wrlock(&p->mu); - p->on_recv_type[type_id] = on_recv; - pthread_rwlock_unlock(&p->mu); -} - void dragonnet_peer_run(DragonnetPeer *p) { - pthread_rwlock_wrlock(&p->mu); pthread_create(&p->recv_thread, NULL, &dragonnet_peer_recv_thread, p); - pthread_rwlock_unlock(&p->mu); - - while (p->state < DRAGONNET_PEER_ACTIVE); -} - -void dragonnet_peer_close(DragonnetPeer *p) -{ - pthread_rwlock_wrlock(&p->mu); - - pthread_t recv_thread = p->recv_thread; - if (p->state == DRAGONNET_PEER_ACTIVE) - shutdown(p->sock, SHUT_RDWR); - - pthread_rwlock_unlock(&p->mu); - - pthread_cancel(recv_thread); - pthread_join(recv_thread, NULL); } -void dragonnet_peer_delete(DragonnetPeer *p) +void dragonnet_peer_shutdown(DragonnetPeer *p) { - pthread_rwlock_destroy(&p->mu); - free(p); + shutdown(p->sock, SHUT_RDWR); } diff --git a/peer.h b/peer.h index e2c957a..8b1a65a 100644 --- a/peer.h +++ b/peer.h @@ -6,31 +6,23 @@ #include #include -typedef enum { - DRAGONNET_PEER_CREATED, - DRAGONNET_PEER_ACTIVE, - DRAGONNET_PEER_CLOSED -} DragonnetPeerState; - typedef uint16_t DragonnetTypeId; typedef struct dragonnet_peer { int sock; DragonnetAddr laddr, raddr; - DragonnetPeerState state; pthread_t recv_thread; + pthread_mutex_t mtx; + void (*on_disconnect)(struct dragonnet_peer *); bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *); void (**on_recv_type)(struct dragonnet_peer *, void *); - pthread_rwlock_t mu; + void *extra; } DragonnetPeer; DragonnetPeer *dragonnet_connect(char *addr); -void dragonnet_peer_set_recv_hook(DragonnetPeer *p, DragonnetTypeId type_id, - void (*on_recv)(struct dragonnet_peer *, void *)); void dragonnet_peer_run(DragonnetPeer *p); -void dragonnet_peer_close(DragonnetPeer *p); -void dragonnet_peer_delete(DragonnetPeer *p); +void dragonnet_peer_shutdown(DragonnetPeer *p); #endif diff --git a/recv.c b/recv.c index d69b687..7f586c9 100644 --- a/recv.c +++ b/recv.c @@ -4,27 +4,13 @@ #include #include -void dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n) +bool dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n) { - pthread_rwlock_rdlock(&p->mu); - int sock = p->sock; - pthread_rwlock_unlock(&p->mu); - - ssize_t len = recv(sock, buf, n, MSG_WAITALL); + ssize_t len = recv(p->sock, buf, n, MSG_WAITALL); if (len < 0) { perror("recv"); - dragonnet_peer_delete(p); - return; + exit(EXIT_FAILURE); } - // Connection closed - if (len == 0) { - pthread_rwlock_wrlock(&p->mu); - - close(p->sock); - p->sock = -1; - p->state++; - - pthread_rwlock_unlock(&p->mu); - } + return len != 0; } diff --git a/recv.h b/recv.h index a9e0fb9..adb39aa 100644 --- a/recv.h +++ b/recv.h @@ -5,12 +5,13 @@ typedef struct { size_t siz; - void (*deserialize)(DragonnetPeer *, void *); + bool (*deserialize)(DragonnetPeer *, void *); + void (*free)(void *); } DragonnetType; extern DragonnetTypeId dragonnet_num_types; extern DragonnetType dragonnet_types[]; -void dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n); +bool dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n); #endif diff --git a/recv_thread.c b/recv_thread.c index b9e344e..f76dc73 100644 --- a/recv_thread.c +++ b/recv_thread.c @@ -2,61 +2,76 @@ #include #include #include +#include #include #include #include +#include +#include #include void *dragonnet_peer_recv_thread(void *g_peer) { DragonnetPeer *p = (DragonnetPeer *) g_peer; - pthread_rwlock_wrlock(&p->mu); - assert(p->state == DRAGONNET_PEER_CREATED); - p->state++; - pthread_rwlock_unlock(&p->mu); - while (true) { DragonnetTypeId type_id; - // Copy socket fd so that shutdown doesn't block - pthread_rwlock_rdlock(&p->mu); - int sock = p->sock; - pthread_rwlock_unlock(&p->mu); + bool reset = false; - ssize_t len = recv(sock, &type_id, sizeof type_id, MSG_WAITALL); + ssize_t len = recv(p->sock, &type_id, sizeof type_id, MSG_WAITALL); if (len < 0) { - perror("recv"); - dragonnet_peer_delete(p); - return NULL; + if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) { + reset = true; + } else { + perror("recv"); + exit(EXIT_FAILURE); + } } // Connection closed - if (len == 0) { - pthread_rwlock_wrlock(&p->mu); + if (len == 0 || reset) { + if (p->on_disconnect) + p->on_disconnect(p); close(p->sock); - p->sock = -1; - p->state++; - pthread_rwlock_unlock(&p->mu); + pthread_mutex_destroy(&p->mtx); + free(p); return NULL; } type_id = be16toh(type_id); + + if (type_id >= dragonnet_num_types) { + fprintf(stderr, "warning: received invalid type id %d\n", type_id); + continue; + } + DragonnetType type = dragonnet_types[type_id]; + unsigned char buf[type.siz]; - type.deserialize(p, buf); + memset(buf, 0, type.siz); + + if (!type.deserialize(p, buf)) { + if (type.free != NULL) + type.free(buf); + + fprintf(stderr, "warning: failed to deserialize package of type %d\n", type_id); + + continue; + } - pthread_rwlock_rdlock(&p->mu); bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv; void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id]; - pthread_rwlock_unlock(&p->mu); if (on_recv != NULL && !on_recv(p, type_id, buf)) on_recv_type = NULL; if (on_recv_type != NULL) on_recv_type(p, buf); + + if (type.free != NULL) + type.free(buf); } } diff --git a/send.c b/send.c index 4e93593..ce37366 100644 --- a/send.c +++ b/send.c @@ -4,20 +4,23 @@ #include #include -void dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n) +bool dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n) { - pthread_rwlock_rdlock(&p->mu); - int sock = p->sock; - pthread_rwlock_unlock(&p->mu); + ssize_t len = send(p->sock, buf, n, MSG_NOSIGNAL | (submit ? 0 : MSG_MORE)); - ssize_t len = send(sock, buf, n, MSG_NOSIGNAL | (submit ? 0 : MSG_MORE)); if (len < 0) { - if (errno == EPIPE) { - dragonnet_peer_close(p); - return; + if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) { + shutdown(p->sock, SHUT_RDWR); + pthread_mutex_unlock(&p->mtx); + return false; } perror("send"); - dragonnet_peer_delete(p); + exit(EXIT_FAILURE); } + + if (submit) + pthread_mutex_unlock(&p->mtx); + + return true; } diff --git a/send.h b/send.h index 55bb71a..206a198 100644 --- a/send.h +++ b/send.h @@ -4,6 +4,6 @@ #include #include -void dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n); +bool dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n); #endif