+#define _GNU_SOURCE
#include <assert.h>
-#include <dragontype/number.h>
+#include <dragonnet/peer.h>
+#include <dragonnet/recv.h>
+#include <dragonnet/recv_thread.h>
+#include <errno.h>
+#include <features.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
#include <unistd.h>
-#include "peer.h"
-#include "recv_thread.h"
-
void *dragonnet_peer_recv_thread(void *g_peer)
{
- DragonnetPeer *p = (DragonnetPeer *) g_peer;
+#ifdef __GLIBC__
+ pthread_setname_np(pthread_self(), "recv");
+#endif
- pthread_rwlock_wrlock(&p->mu);
- assert(p->state == DRAGONNET_PEER_CREATED);
- p->state++;
- pthread_rwlock_unlock(&p->mu);
+ DragonnetPeer *p = (DragonnetPeer *) g_peer;
while (true) {
- u16 msg;
+ DragonnetTypeId type_id;
- // Copy socket fd so that shutdown doesn't block
- pthread_rwlock_rdlock(&p->mu);
- int sock = p->sock;
- pthread_rwlock_unlock(&p->mu);
-
- ssize_t len = recv(sock, &msg, sizeof msg, MSG_WAITALL);
+ bool reset = false;
+ ssize_t len = recv(p->sock, &type_id, sizeof type_id, MSG_WAITALL);
if (len < 0) {
- perror("recv");
- dragonnet_peer_delete(p);
- return NULL;
+ if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
+ reset = true;
+ } else {
+ perror("recv");
+ exit(EXIT_FAILURE);
+ }
}
// Connection closed
- if (len == 0) {
- pthread_rwlock_wrlock(&p->mu);
+ if (len == 0 || reset) {
+ if (p->on_disconnect)
+ p->on_disconnect(p);
close(p->sock);
- p->sock = -1;
- p->state++;
- pthread_rwlock_unlock(&p->mu);
+ pthread_mutex_destroy(&p->mtx);
+ free(p);
return NULL;
}
- // Deserialization
+ type_id = be16toh(type_id);
+
+ if (type_id >= dragonnet_num_types) {
+ fprintf(stderr, "warning: received invalid type id %d\n", type_id);
+ continue;
+ }
+
+ DragonnetType type = dragonnet_types[type_id];
+
+ unsigned char buf[type.siz];
+ memset(buf, 0, type.siz);
+
+ if (!type.deserialize(p, buf)) {
+ if (type.free != NULL)
+ type.free(buf);
+
+ fprintf(stderr, "warning: failed to deserialize package of type %d\n", type_id);
+
+ continue;
+ }
+
+ bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv;
+ void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id];
+
+ if (on_recv != NULL && !on_recv(p, type_id, buf))
+ on_recv_type = NULL;
+
+ if (on_recv_type != NULL)
+ on_recv_type(p, buf);
+
+ if (type.free != NULL)
+ type.free(buf);
}
}