]> git.lizzy.rs Git - dragonnet.git/blob - recv_thread.c
Add buffer based (de)serialization
[dragonnet.git] / recv_thread.c
1 #include <assert.h>
2 #include <dragontype/number.h>
3 #include <pthread.h>
4 #include <stdbool.h>
5 #include <stdio.h>
6 #include <unistd.h>
7
8 #include "peer.h"
9 #include "recv_thread.h"
10
11 void *dragonnet_peer_recv_thread(void *g_peer)
12 {
13         DragonnetPeer *p = (DragonnetPeer *) g_peer;
14
15         pthread_rwlock_wrlock(&p->mu);
16         assert(p->state == DRAGONNET_PEER_CREATED);
17         p->state++;
18         pthread_rwlock_unlock(&p->mu);
19
20         while (true) {
21                 u16 type;
22
23                 // Copy socket fd so that shutdown doesn't block
24                 pthread_rwlock_rdlock(&p->mu);
25                 int sock = p->sock;
26                 pthread_rwlock_unlock(&p->mu);
27
28                 ssize_t len = recv(sock, &type, sizeof type, MSG_WAITALL);
29                 if (len < 0) {
30                         perror("recv");
31                         dragonnet_peer_delete(p);
32                         return NULL;
33                 }
34
35                 // Connection closed
36                 if (len == 0) {
37                         pthread_rwlock_wrlock(&p->mu);
38
39                         close(p->sock);
40                         p->sock = -1;
41                         p->state++;
42
43                         pthread_rwlock_unlock(&p->mu);
44                         return NULL;
45                 }
46
47                 type = be16toh(type);
48
49                 pthread_rwlock_rdlock(&p->mu);
50                 void (*on_recv_type)(struct dragonnet_peer *, u16) = p->on_recv_type;
51                 pthread_rwlock_unlock(&p->mu);
52
53                 if (on_recv_type != NULL)
54                         on_recv_type(p, type);
55         }
56 }