]> git.lizzy.rs Git - dragonnet.git/blobdiff - recv_thread.c
Set thread names using GNU extension
[dragonnet.git] / recv_thread.c
index bff48ebb8b6443cd7867db987173137c1f6f8486..aefb33548d340b64a7e3b770858ddcf2ce2d3a26 100644 (file)
@@ -1,50 +1,83 @@
+#define _GNU_SOURCE
 #include <assert.h>
-#include <dragontype/number.h>
+#include <dragonnet/peer.h>
+#include <dragonnet/recv.h>
+#include <dragonnet/recv_thread.h>
+#include <errno.h>
+#include <features.h>
 #include <pthread.h>
 #include <stdbool.h>
 #include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
 #include <unistd.h>
 
-#include "peer.h"
-#include "recv_thread.h"
-
 void *dragonnet_peer_recv_thread(void *g_peer)
 {
-       DragonnetPeer *p = (DragonnetPeer *) g_peer;
+#ifdef __GLIBC__
+       pthread_setname_np(pthread_self(), "recv");
+#endif
 
-       pthread_rwlock_wrlock(&p->mu);
-       assert(p->state == DRAGONNET_PEER_CREATED);
-       p->state++;
-       pthread_rwlock_unlock(&p->mu);
+       DragonnetPeer *p = (DragonnetPeer *) g_peer;
 
        while (true) {
-               u16 msg;
+               DragonnetTypeId type_id;
 
-               // Copy socket fd so that shutdown doesn't block
-               pthread_rwlock_rdlock(&p->mu);
-               int sock = p->sock;
-               pthread_rwlock_unlock(&p->mu);
-
-               ssize_t len = recv(sock, &msg, sizeof msg, MSG_WAITALL);
+               bool reset = false;
 
+               ssize_t len = recv(p->sock, &type_id, sizeof type_id, MSG_WAITALL);
                if (len < 0) {
-                       perror("recv");
-                       dragonnet_peer_delete(p);
-                       return NULL;
+                       if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
+                               reset = true;
+                       } else {
+                               perror("recv");
+                               exit(EXIT_FAILURE);
+                       }
                }
 
                // Connection closed
-               if (len == 0) {
-                       pthread_rwlock_wrlock(&p->mu);
+               if (len == 0 || reset) {
+                       if (p->on_disconnect)
+                               p->on_disconnect(p);
 
                        close(p->sock);
-                       p->sock = -1;
-                       p->state++;
 
-                       pthread_rwlock_unlock(&p->mu);
+                       pthread_mutex_destroy(&p->mtx);
+                       free(p);
                        return NULL;
                }
 
-               // Deserialization
+               type_id = be16toh(type_id);
+
+               if (type_id >= dragonnet_num_types) {
+                       fprintf(stderr, "warning: received invalid type id %d\n", type_id);
+                       continue;
+               }
+
+               DragonnetType type = dragonnet_types[type_id];
+
+               unsigned char buf[type.siz];
+               memset(buf, 0, type.siz);
+
+               if (!type.deserialize(p, buf)) {
+                       if (type.free != NULL)
+                               type.free(buf);
+
+                       fprintf(stderr, "warning: failed to deserialize package of type %d\n", type_id);
+
+                       continue;
+               }
+
+               bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv;
+               void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id];
+
+               if (on_recv != NULL && !on_recv(p, type_id, buf))
+                       on_recv_type = NULL;
+
+               if (on_recv_type != NULL)
+                       on_recv_type(p, buf);
+
+               if (type.free != NULL)
+                       type.free(buf);
        }
 }