]> git.lizzy.rs Git - dragonnet.git/blob - recv_thread.c
Rework multithreading responsibilities and disconnect process
[dragonnet.git] / recv_thread.c
1 #include <assert.h>
2 #include <dragonnet/peer.h>
3 #include <dragonnet/recv.h>
4 #include <dragonnet/recv_thread.h>
5 #include <errno.h>
6 #include <pthread.h>
7 #include <stdbool.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <unistd.h>
12
13 void *dragonnet_peer_recv_thread(void *g_peer)
14 {
15         DragonnetPeer *p = (DragonnetPeer *) g_peer;
16
17         while (true) {
18                 DragonnetTypeId type_id;
19
20                 bool reset = false;
21
22                 ssize_t len = recv(p->sock, &type_id, sizeof type_id, MSG_WAITALL);
23                 if (len < 0) {
24                         if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
25                                 reset = true;
26                         } else {
27                                 perror("recv");
28                                 exit(EXIT_FAILURE);
29                         }
30                 }
31
32                 // Connection closed
33                 if (len == 0 || reset) {
34                         if (p->on_disconnect)
35                                 p->on_disconnect(p);
36
37                         close(p->sock);
38
39                         pthread_mutex_destroy(&p->mtx);
40                         free(p);
41                         return NULL;
42                 }
43
44                 type_id = be16toh(type_id);
45
46                 if (type_id >= dragonnet_num_types) {
47                         fprintf(stderr, "warning: received invalid type id %d\n", type_id);
48                         continue;
49                 }
50
51                 DragonnetType type = dragonnet_types[type_id];
52
53                 unsigned char buf[type.siz];
54                 memset(buf, 0, type.siz);
55
56                 if (!type.deserialize(p, buf)) {
57                         if (type.free != NULL)
58                                 type.free(buf);
59
60                         fprintf(stderr, "warning: failed to deserialize package of type %d\n", type_id);
61
62                         continue;
63                 }
64
65                 bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv;
66                 void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id];
67
68                 if (on_recv != NULL && !on_recv(p, type_id, buf))
69                         on_recv_type = NULL;
70
71                 if (on_recv_type != NULL)
72                         on_recv_type(p, buf);
73
74                 if (type.free != NULL)
75                         type.free(buf);
76         }
77 }