]> git.lizzy.rs Git - dragonnet.git/commitdiff
Rework multithreading responsibilities and disconnect process
authorElias Fleckenstein <eliasfleckenstein@web.de>
Sat, 12 Feb 2022 22:12:32 +0000 (23:12 +0100)
committerElias Fleckenstein <eliasfleckenstein@web.de>
Sat, 12 Feb 2022 22:12:32 +0000 (23:12 +0100)
listen.c
listen.h
peer.c
peer.h
recv.c
recv.h
recv_thread.c
send.c
send.h

index b6239dc98d99cab51cd53eab62580ea7ceb51090..5571806ed9893f75b403312f8807b9ce6e186553 100644 (file)
--- a/listen.c
+++ b/listen.c
 static bool dragonnet_peer_init_accepted(DragonnetPeer *p, int sock,
                struct sockaddr_in6 addr, DragonnetListener *l)
 {
-       pthread_rwlock_init(&p->mu, NULL);
-       pthread_rwlock_wrlock(&p->mu);
+       pthread_mutex_init(&p->mtx, NULL);
 
-       pthread_rwlock_rdlock(&l->mu);
        p->sock = sock;
        p->laddr = l->laddr;
        p->raddr = dragonnet_addr_parse_sock(addr);
+       p->on_disconnect = l->on_disconnect;
+       p->on_recv = l->on_recv;
        p->on_recv_type = l->on_recv_type;
-       pthread_rwlock_unlock(&l->mu);
 
-       pthread_rwlock_unlock(&p->mu);
        return true;
 }
 
@@ -33,7 +31,8 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr,
 {
        DragonnetPeer *p = malloc(sizeof *p);
        if (!dragonnet_peer_init_accepted(p, sock, addr, l)) {
-               dragonnet_peer_delete(p);
+               pthread_mutex_destroy(&p->mtx);
+               free(p);
                return NULL;
        }
 
@@ -44,15 +43,16 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr,
 // Listener
 // --------
 
-DragonnetListener *dragonnet_listener_new(char *addr,
-               void (*on_connect)(DragonnetPeer *p))
+DragonnetListener *dragonnet_listener_new(char *addr)
 {
        DragonnetListener *l = malloc(sizeof *l);
        pthread_rwlock_init(&l->mu, NULL);
-       pthread_rwlock_wrlock(&l->mu);
 
+       l->active = true;
        l->sock = socket(AF_INET6, SOCK_STREAM, 0);
-       l->on_connect = on_connect;
+       l->on_connect = NULL;
+       l->on_disconnect = NULL;
+       l->on_recv = NULL;
        l->on_recv_type = calloc(sizeof *l->on_recv_type, dragonnet_num_types);
 
        int so_reuseaddr = 1;
@@ -78,35 +78,14 @@ DragonnetListener *dragonnet_listener_new(char *addr,
                return NULL;
        }
 
-       pthread_rwlock_unlock(&l->mu);
        return l;
 }
 
-void dragonnet_listener_set_recv_hook(DragonnetListener *l, DragonnetTypeId type_id,
-               void (*on_recv)(struct dragonnet_peer *, void *))
-{
-       pthread_rwlock_rdlock(&l->mu);
-       DragonnetListenerState state = l->state;
-       pthread_rwlock_unlock(&l->mu);
-
-       if (state >= DRAGONNET_LISTENER_ACTIVE)
-               return;
-
-       pthread_rwlock_wrlock(&l->mu);
-       l->on_recv_type[type_id] = on_recv;
-       pthread_rwlock_unlock(&l->mu);
-}
-
 static void *listener_main(void *g_listener)
 {
        DragonnetListener *l = (DragonnetListener *) g_listener;
 
-       pthread_rwlock_wrlock(&l->mu);
-       assert(l->state == DRAGONNET_LISTENER_CREATED);
-       l->state++;
-       pthread_rwlock_unlock(&l->mu);
-
-       while (l->state == DRAGONNET_LISTENER_ACTIVE) {
+       while (l->active) {
                struct sockaddr_in6 clt_addr;
                socklen_t clt_addrlen = sizeof clt_addr;
 
@@ -124,14 +103,14 @@ static void *listener_main(void *g_listener)
                if (p == NULL)
                        continue;
 
-               dragonnet_peer_run(p);
-
                pthread_rwlock_rdlock(&l->mu);
                void (*on_connect)(DragonnetPeer *) = l->on_connect;
                pthread_rwlock_unlock(&l->mu);
 
                if (on_connect != NULL)
                        on_connect(p);
+
+               dragonnet_peer_run(p);
        }
 
        return NULL;
@@ -144,18 +123,9 @@ void dragonnet_listener_run(DragonnetListener *l)
 
 void dragonnet_listener_close(DragonnetListener *l)
 {
-       pthread_rwlock_wrlock(&l->mu);
-
-       pthread_t accept_thread = l->accept_thread;
-       assert(l->state == DRAGONNET_LISTENER_ACTIVE);
-       close(l->sock);
-       l->sock = -1;
-       l->state++;
-
-       pthread_rwlock_unlock(&l->mu);
-
-       pthread_cancel(accept_thread);
-       pthread_join(accept_thread, NULL);
+       l->active = false;
+       pthread_cancel(l->accept_thread);
+       pthread_join(l->accept_thread, NULL);
 }
 
 void dragonnet_listener_delete(DragonnetListener *l)
index 2c2b0520b89e768ff19c9496ed870b3e35d2f220..ea76162207e76b4e09aae8434c8d72e3cd365b74 100644 (file)
--- a/listen.h
+++ b/listen.h
@@ -4,28 +4,21 @@
 #include <dragonnet/peer.h>
 #include <stdbool.h>
 
-typedef enum {
-       DRAGONNET_LISTENER_CREATED,
-       DRAGONNET_LISTENER_ACTIVE,
-       DRAGONNET_LISTENER_CLOSED
-} DragonnetListenerState;
-
 typedef struct {
        int sock;
        DragonnetAddr laddr;
-       DragonnetListenerState state;
        pthread_t accept_thread;
+       bool active;
 
        void (*on_connect)(DragonnetPeer *);
+       void (*on_disconnect)(DragonnetPeer *);
+       bool (*on_recv)(DragonnetPeer *, DragonnetTypeId, void *);
        void (**on_recv_type)(DragonnetPeer *, void *);
 
        pthread_rwlock_t mu;
 } DragonnetListener;
 
-DragonnetListener *dragonnet_listener_new(char *addr,
-               void (*on_connect)(DragonnetPeer *p));
-void dragonnet_listener_set_recv_hook(DragonnetListener *l, DragonnetTypeId type_id,
-               void (*on_recv)(struct dragonnet_peer *, void *));
+DragonnetListener *dragonnet_listener_new(char *addr);
 void dragonnet_listener_run(DragonnetListener *l);
 void dragonnet_listener_close(DragonnetListener *l);
 void dragonnet_listener_delete(DragonnetListener *l);
diff --git a/peer.c b/peer.c
index 5cd481af089367eb491b9183dd0b93ed565f1297..75eb6abe18876f5059965771e67613a3730a6d4a 100644 (file)
--- a/peer.c
+++ b/peer.c
@@ -8,11 +8,12 @@
 
 static bool dragonnet_peer_init(DragonnetPeer *p, char *addr)
 {
-       pthread_rwlock_init(&p->mu, NULL);
-       pthread_rwlock_wrlock(&p->mu);
+       pthread_mutex_init(&p->mtx, NULL);
 
        p->sock = socket(AF_INET6, SOCK_STREAM, 0);
        p->raddr = dragonnet_addr_parse_str(addr);
+       p->on_disconnect = NULL;
+       p->on_recv = NULL;
        p->on_recv_type = calloc(sizeof *p->on_recv_type, dragonnet_num_types);
 
        struct sockaddr_in6 sock_addr = dragonnet_addr_sock(p->raddr);
@@ -31,8 +32,6 @@ static bool dragonnet_peer_init(DragonnetPeer *p, char *addr)
        }
 
        p->laddr = dragonnet_addr_parse_sock(sock_name);
-
-       pthread_rwlock_unlock(&p->mu);
        return true;
 }
 
@@ -40,53 +39,20 @@ DragonnetPeer *dragonnet_connect(char *addr)
 {
        DragonnetPeer *p = malloc(sizeof *p);
        if (!dragonnet_peer_init(p, addr)) {
-               dragonnet_peer_delete(p);
+               pthread_mutex_destroy(&p->mtx);
+               free(p);
                return NULL;
        }
 
        return p;
 }
 
-void dragonnet_peer_set_recv_hook(DragonnetPeer *p, DragonnetTypeId type_id,
-               void (*on_recv)(struct dragonnet_peer *, void *))
-{
-       pthread_rwlock_rdlock(&p->mu);
-       DragonnetPeerState state = p->state;
-       pthread_rwlock_unlock(&p->mu);
-
-       if (state >= DRAGONNET_PEER_ACTIVE)
-               return;
-
-       pthread_rwlock_wrlock(&p->mu);
-       p->on_recv_type[type_id] = on_recv;
-       pthread_rwlock_unlock(&p->mu);
-}
-
 void dragonnet_peer_run(DragonnetPeer *p)
 {
-       pthread_rwlock_wrlock(&p->mu);
        pthread_create(&p->recv_thread, NULL, &dragonnet_peer_recv_thread, p);
-       pthread_rwlock_unlock(&p->mu);
-
-       while (p->state < DRAGONNET_PEER_ACTIVE);
-}
-
-void dragonnet_peer_close(DragonnetPeer *p)
-{
-       pthread_rwlock_wrlock(&p->mu);
-
-       pthread_t recv_thread = p->recv_thread;
-       if (p->state == DRAGONNET_PEER_ACTIVE)
-               shutdown(p->sock, SHUT_RDWR);
-
-       pthread_rwlock_unlock(&p->mu);
-
-       pthread_cancel(recv_thread);
-       pthread_join(recv_thread, NULL);
 }
 
-void dragonnet_peer_delete(DragonnetPeer *p)
+void dragonnet_peer_shutdown(DragonnetPeer *p)
 {
-       pthread_rwlock_destroy(&p->mu);
-       free(p);
+       shutdown(p->sock, SHUT_RDWR);
 }
diff --git a/peer.h b/peer.h
index e2c957a630e92fe993946a1df5229359e08704ee..8b1a65a68e3a59998901d29938c9bd6adc31c23b 100644 (file)
--- a/peer.h
+++ b/peer.h
@@ -6,31 +6,23 @@
 #include <stdbool.h>
 #include <stdint.h>
 
-typedef enum {
-       DRAGONNET_PEER_CREATED,
-       DRAGONNET_PEER_ACTIVE,
-       DRAGONNET_PEER_CLOSED
-} DragonnetPeerState;
-
 typedef uint16_t DragonnetTypeId;
 
 typedef struct dragonnet_peer {
        int sock;
        DragonnetAddr laddr, raddr;
-       DragonnetPeerState state;
        pthread_t recv_thread;
+       pthread_mutex_t mtx;
 
+       void (*on_disconnect)(struct dragonnet_peer *);
        bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *);
        void (**on_recv_type)(struct dragonnet_peer *, void *);
 
-       pthread_rwlock_t mu;
+       void *extra;
 } DragonnetPeer;
 
 DragonnetPeer *dragonnet_connect(char *addr);
-void dragonnet_peer_set_recv_hook(DragonnetPeer *p, DragonnetTypeId type_id,
-               void (*on_recv)(struct dragonnet_peer *, void *));
 void dragonnet_peer_run(DragonnetPeer *p);
-void dragonnet_peer_close(DragonnetPeer *p);
-void dragonnet_peer_delete(DragonnetPeer *p);
+void dragonnet_peer_shutdown(DragonnetPeer *p);
 
 #endif
diff --git a/recv.c b/recv.c
index d69b687cf6cc4357345722a7ebe3cb0fad8837a5..7f586c9f532ace46c7a2abf1c947eeaa700f3c8e 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -4,27 +4,13 @@
 #include <string.h>
 #include <unistd.h>
 
-void dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n)
+bool dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n)
 {
-       pthread_rwlock_rdlock(&p->mu);
-       int sock = p->sock;
-       pthread_rwlock_unlock(&p->mu);
-
-       ssize_t len = recv(sock, buf, n, MSG_WAITALL);
+       ssize_t len = recv(p->sock, buf, n, MSG_WAITALL);
        if (len < 0) {
                perror("recv");
-               dragonnet_peer_delete(p);
-               return;
+               exit(EXIT_FAILURE);
        }
 
-       // Connection closed
-       if (len == 0) {
-               pthread_rwlock_wrlock(&p->mu);
-
-               close(p->sock);
-               p->sock = -1;
-               p->state++;
-
-               pthread_rwlock_unlock(&p->mu);
-       }
+       return len != 0;
 }
diff --git a/recv.h b/recv.h
index a9e0fb9ff841a2f07d970641290830db10f188da..adb39aa5b6314c222c624a29cfaab7a21d99e9f8 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -5,12 +5,13 @@
 
 typedef struct {
        size_t siz;
-       void (*deserialize)(DragonnetPeer *, void *);
+       bool (*deserialize)(DragonnetPeer *, void *);
+       void (*free)(void *);
 } DragonnetType;
 
 extern DragonnetTypeId dragonnet_num_types;
 extern DragonnetType dragonnet_types[];
 
-void dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n);
+bool dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n);
 
 #endif
index b9e344e8f476afee1268efc8872669462f8531bd..f76dc73bbf1639114cad5b4eaa44d68a8440a188 100644 (file)
@@ -2,61 +2,76 @@
 #include <dragonnet/peer.h>
 #include <dragonnet/recv.h>
 #include <dragonnet/recv_thread.h>
+#include <errno.h>
 #include <pthread.h>
 #include <stdbool.h>
 #include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
 #include <unistd.h>
 
 void *dragonnet_peer_recv_thread(void *g_peer)
 {
        DragonnetPeer *p = (DragonnetPeer *) g_peer;
 
-       pthread_rwlock_wrlock(&p->mu);
-       assert(p->state == DRAGONNET_PEER_CREATED);
-       p->state++;
-       pthread_rwlock_unlock(&p->mu);
-
        while (true) {
                DragonnetTypeId type_id;
 
-               // Copy socket fd so that shutdown doesn't block
-               pthread_rwlock_rdlock(&p->mu);
-               int sock = p->sock;
-               pthread_rwlock_unlock(&p->mu);
+               bool reset = false;
 
-               ssize_t len = recv(sock, &type_id, sizeof type_id, MSG_WAITALL);
+               ssize_t len = recv(p->sock, &type_id, sizeof type_id, MSG_WAITALL);
                if (len < 0) {
-                       perror("recv");
-                       dragonnet_peer_delete(p);
-                       return NULL;
+                       if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
+                               reset = true;
+                       } else {
+                               perror("recv");
+                               exit(EXIT_FAILURE);
+                       }
                }
 
                // Connection closed
-               if (len == 0) {
-                       pthread_rwlock_wrlock(&p->mu);
+               if (len == 0 || reset) {
+                       if (p->on_disconnect)
+                               p->on_disconnect(p);
 
                        close(p->sock);
-                       p->sock = -1;
-                       p->state++;
 
-                       pthread_rwlock_unlock(&p->mu);
+                       pthread_mutex_destroy(&p->mtx);
+                       free(p);
                        return NULL;
                }
 
                type_id = be16toh(type_id);
+
+               if (type_id >= dragonnet_num_types) {
+                       fprintf(stderr, "warning: received invalid type id %d\n", type_id);
+                       continue;
+               }
+
                DragonnetType type = dragonnet_types[type_id];
+
                unsigned char buf[type.siz];
-               type.deserialize(p, buf);
+               memset(buf, 0, type.siz);
+
+               if (!type.deserialize(p, buf)) {
+                       if (type.free != NULL)
+                               type.free(buf);
+
+                       fprintf(stderr, "warning: failed to deserialize package of type %d\n", type_id);
+
+                       continue;
+               }
 
-               pthread_rwlock_rdlock(&p->mu);
                bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv;
                void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id];
-               pthread_rwlock_unlock(&p->mu);
 
                if (on_recv != NULL && !on_recv(p, type_id, buf))
                        on_recv_type = NULL;
 
                if (on_recv_type != NULL)
                        on_recv_type(p, buf);
+
+               if (type.free != NULL)
+                       type.free(buf);
        }
 }
diff --git a/send.c b/send.c
index 4e93593298bf342a32e13321578f674e6e2b6299..ce3736670abcd0b2324397d5ba522692a48ca65c 100644 (file)
--- a/send.c
+++ b/send.c
@@ -4,20 +4,23 @@
 #include <stdlib.h>
 #include <string.h>
 
-void dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n)
+bool dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n)
 {
-       pthread_rwlock_rdlock(&p->mu);
-       int sock = p->sock;
-       pthread_rwlock_unlock(&p->mu);
+       ssize_t len = send(p->sock, buf, n, MSG_NOSIGNAL | (submit ? 0 : MSG_MORE));
 
-       ssize_t len = send(sock, buf, n, MSG_NOSIGNAL | (submit ? 0 : MSG_MORE));
        if (len < 0) {
-               if (errno == EPIPE) {
-                       dragonnet_peer_close(p);
-                       return;
+               if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
+                       shutdown(p->sock, SHUT_RDWR);
+                       pthread_mutex_unlock(&p->mtx);
+                       return false;
                }
 
                perror("send");
-               dragonnet_peer_delete(p);
+               exit(EXIT_FAILURE);
        }
+
+       if (submit)
+               pthread_mutex_unlock(&p->mtx);
+
+       return true;
 }
diff --git a/send.h b/send.h
index 55bb71ac22008e6fbb2ab3dababf0c087bd6e9ef..206a198f054af900cdeed0ac171d24e25c09ef47 100644 (file)
--- a/send.h
+++ b/send.h
@@ -4,6 +4,6 @@
 #include <dragonnet/peer.h>
 #include <stdbool.h>
 
-void dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n);
+bool dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n);
 
 #endif