X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=listen.c;h=b65ca8baea151ab2093a9c1d0bccbd93f8c2dd06;hb=84c6a0ddf3ef03e22786a4f5bb780dbf875a6919;hp=440f889f944f0aecd8f12ae15aeccf4b5ae843b2;hpb=7639851f75c977923e636044af1c44508424b75e;p=dragonnet.git diff --git a/listen.c b/listen.c index 440f889..b65ca8b 100644 --- a/listen.c +++ b/listen.c @@ -1,30 +1,44 @@ +#define _GNU_SOURCE #include +#include +#include +#include +#include #include +#include #include #include #include #include -#include "listen.h" - // ---- // Peer // ---- -static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, - DragonnetListener *l) +static bool dragonnet_peer_init_accepted(DragonnetPeer *p, int sock, + struct sockaddr_in6 addr, DragonnetListener *l) { - DragonnetPeer *p = malloc(sizeof *p); - p->mu = malloc(sizeof *p->mu); - pthread_rwlock_init(p->mu, NULL); - pthread_rwlock_wrlock(p->mu); + pthread_mutex_init(&p->mtx, NULL); p->sock = sock; p->laddr = l->laddr; p->raddr = dragonnet_addr_parse_sock(addr); + p->on_disconnect = l->on_disconnect; + p->on_recv = l->on_recv; + p->on_recv_type = l->on_recv_type; + + return true; +} - if (p != NULL) - pthread_rwlock_unlock(p->mu); +static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, + DragonnetListener *l) +{ + DragonnetPeer *p = malloc(sizeof *p); + if (!dragonnet_peer_init_accepted(p, sock, addr, l)) { + pthread_mutex_destroy(&p->mtx); + free(p); + return NULL; + } return p; } @@ -33,18 +47,20 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, // Listener // -------- -DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(DragonnetPeer *p)) +DragonnetListener *dragonnet_listener_new(char *addr) { DragonnetListener *l = malloc(sizeof *l); - l->mu = malloc(sizeof *l->mu); - pthread_rwlock_init(l->mu, NULL); - pthread_rwlock_wrlock(l->mu); + l->active = true; l->sock = socket(AF_INET6, SOCK_STREAM, 0); - l->on_connect = on_connect; - - int flag = 1; - if (setsockopt(l->sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag) < 0) { + l->on_connect = NULL; + l->on_disconnect = NULL; + l->on_recv = NULL; + l->on_recv_type = calloc(sizeof *l->on_recv_type, dragonnet_num_types); + + int so_reuseaddr = 1; + if (setsockopt(l->sock, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr, + sizeof so_reuseaddr) < 0) { perror("setsockopt"); dragonnet_listener_delete(l); return NULL; @@ -65,26 +81,25 @@ DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(Dragonn return NULL; } - pthread_rwlock_unlock(l->mu); return l; } -void dragonnet_listener_run(DragonnetListener *l) +static void *listener_main(void *g_listener) { - pthread_rwlock_wrlock(l->mu); - - assert(l->state == DRAGONNET_LISTENER_CREATED); - l->state++; +#ifdef __GLIBC__ + pthread_setname_np(pthread_self(), "listen"); +#endif - pthread_rwlock_unlock(l->mu); + DragonnetListener *l = (DragonnetListener *) g_listener; - while (l->state == DRAGONNET_LISTENER_ACTIVE) { + while (l->active) { struct sockaddr_in6 clt_addr; socklen_t clt_addrlen = sizeof clt_addr; int clt_sock = accept(l->sock, (struct sockaddr *) &clt_addr, &clt_addrlen); if (clt_sock < 0) { - perror("accept"); + if (errno != EINTR) + perror("accept"); continue; } @@ -92,24 +107,32 @@ void dragonnet_listener_run(DragonnetListener *l) if (p == NULL) continue; - if (l->on_connect != NULL) - l->on_connect(p); + void (*on_connect)(DragonnetPeer *) = l->on_connect; + + if (on_connect != NULL) + on_connect(p); + + dragonnet_peer_run(p); } + + return NULL; } -void dragonnet_listener_close(DragonnetListener *l) +void dragonnet_listener_run(DragonnetListener *l) { - pthread_rwlock_wrlock(l->mu); + pthread_create(&l->accept_thread, NULL, &listener_main, l); +} - assert(l->state == DRAGONNET_LISTENER_ACTIVE); - close(l->sock); - l->state++; +void dragonnet_listener_close(DragonnetListener *l) +{ + l->active = false; - pthread_rwlock_unlock(l->mu); + pthread_kill(l->accept_thread, SIGINT); + pthread_join(l->accept_thread, NULL); } void dragonnet_listener_delete(DragonnetListener *l) { - pthread_rwlock_destroy(l->mu); + free(l->on_recv_type); free(l); }