]> git.lizzy.rs Git - dragonnet.git/blob - recv_thread.c
b9e344e8f476afee1268efc8872669462f8531bd
[dragonnet.git] / recv_thread.c
1 #include <assert.h>
2 #include <dragonnet/peer.h>
3 #include <dragonnet/recv.h>
4 #include <dragonnet/recv_thread.h>
5 #include <pthread.h>
6 #include <stdbool.h>
7 #include <stdio.h>
8 #include <unistd.h>
9
10 void *dragonnet_peer_recv_thread(void *g_peer)
11 {
12         DragonnetPeer *p = (DragonnetPeer *) g_peer;
13
14         pthread_rwlock_wrlock(&p->mu);
15         assert(p->state == DRAGONNET_PEER_CREATED);
16         p->state++;
17         pthread_rwlock_unlock(&p->mu);
18
19         while (true) {
20                 DragonnetTypeId type_id;
21
22                 // Copy socket fd so that shutdown doesn't block
23                 pthread_rwlock_rdlock(&p->mu);
24                 int sock = p->sock;
25                 pthread_rwlock_unlock(&p->mu);
26
27                 ssize_t len = recv(sock, &type_id, sizeof type_id, MSG_WAITALL);
28                 if (len < 0) {
29                         perror("recv");
30                         dragonnet_peer_delete(p);
31                         return NULL;
32                 }
33
34                 // Connection closed
35                 if (len == 0) {
36                         pthread_rwlock_wrlock(&p->mu);
37
38                         close(p->sock);
39                         p->sock = -1;
40                         p->state++;
41
42                         pthread_rwlock_unlock(&p->mu);
43                         return NULL;
44                 }
45
46                 type_id = be16toh(type_id);
47                 DragonnetType type = dragonnet_types[type_id];
48                 unsigned char buf[type.siz];
49                 type.deserialize(p, buf);
50
51                 pthread_rwlock_rdlock(&p->mu);
52                 bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv;
53                 void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id];
54                 pthread_rwlock_unlock(&p->mu);
55
56                 if (on_recv != NULL && !on_recv(p, type_id, buf))
57                         on_recv_type = NULL;
58
59                 if (on_recv_type != NULL)
60                         on_recv_type(p, buf);
61         }
62 }