From: HimbeerserverDE Date: Sun, 3 Oct 2021 15:42:51 +0000 (+0200) Subject: Basic timeout detection X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=275acaf16eb8d9898ac7ff517267d37dabc08441;p=dragonnet.git Basic timeout detection --- diff --git a/listen.c b/listen.c index b504924..60644dd 100644 --- a/listen.c +++ b/listen.c @@ -23,6 +23,20 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, p->laddr = l->laddr; p->raddr = dragonnet_addr_parse_sock(addr); + if (setsockopt(p->sock, SOL_SOCKET, SO_RCVTIMEO, &dragonnet_timeout, + sizeof dragonnet_timeout) < 0) { + perror("setsockopt"); + dragonnet_peer_delete(p); + return NULL; + } + + if (setsockopt(p->sock, SOL_SOCKET, SO_SNDTIMEO, &dragonnet_timeout, + sizeof dragonnet_timeout) < 0) { + perror("setsockopt"); + dragonnet_peer_delete(p); + return NULL; + } + pthread_rwlock_unlock(p->mu); return p; } @@ -31,7 +45,8 @@ static DragonnetPeer *dragonnet_peer_accept(int sock, struct sockaddr_in6 addr, // Listener // -------- -DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(DragonnetPeer *p)) +DragonnetListener *dragonnet_listener_new(char *addr, + void (*on_connect)(DragonnetPeer *p)) { DragonnetListener *l = malloc(sizeof *l); l->mu = malloc(sizeof *l->mu); @@ -41,8 +56,9 @@ DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(Dragonn l->sock = socket(AF_INET6, SOCK_STREAM, 0); l->on_connect = on_connect; - int flag = 1; - if (setsockopt(l->sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag) < 0) { + int so_reuseaddr = 1; + if (setsockopt(l->sock, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr, + sizeof so_reuseaddr) < 0) { perror("setsockopt"); dragonnet_listener_delete(l); return NULL; @@ -101,6 +117,7 @@ void dragonnet_listener_close(DragonnetListener *l) assert(l->state == DRAGONNET_LISTENER_ACTIVE); close(l->sock); + l->sock = 0; l->state++; pthread_rwlock_unlock(l->mu); diff --git a/listen.h b/listen.h index 6e322ea..f2e30b6 100644 --- a/listen.h +++ b/listen.h @@ -20,7 +20,8 @@ typedef struct { pthread_rwlock_t *mu; } DragonnetListener; -DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(DragonnetPeer *p)); +DragonnetListener *dragonnet_listener_new(char *addr, + void (*on_connect)(DragonnetPeer *p)); void dragonnet_listener_run(DragonnetListener *l); void dragonnet_listener_close(DragonnetListener *l); void dragonnet_listener_delete(DragonnetListener *l); diff --git a/peer.c b/peer.c index 9a6d57b..3dfa03a 100644 --- a/peer.c +++ b/peer.c @@ -3,6 +3,12 @@ #include #include "peer.h" +#include "recv_thread.h" + +const struct timeval dragonnet_timeout = { + .tv_sec = 30, + .tv_usec = 0 +}; DragonnetPeer *dragonnet_connect(char *addr) { @@ -14,8 +20,23 @@ DragonnetPeer *dragonnet_connect(char *addr) p->sock = socket(AF_INET6, SOCK_STREAM, 0); p->raddr = dragonnet_addr_parse_str(addr); + if (setsockopt(p->sock, SOL_SOCKET, SO_RCVTIMEO, &dragonnet_timeout, + sizeof dragonnet_timeout) < 0) { + perror("setsockopt"); + dragonnet_peer_delete(p); + return NULL; + } + + if (setsockopt(p->sock, SOL_SOCKET, SO_SNDTIMEO, &dragonnet_timeout, + sizeof dragonnet_timeout) < 0) { + perror("setsockopt"); + dragonnet_peer_delete(p); + return NULL; + } + struct sockaddr_in6 sock_addr = dragonnet_addr_sock(p->raddr); - if (connect(p->sock, (const struct sockaddr *) &sock_addr, sizeof sock_addr) < 0) { + if (connect(p->sock, (const struct sockaddr *) &sock_addr, + sizeof sock_addr) < 0) { perror("connect"); dragonnet_peer_delete(p); return NULL; @@ -36,13 +57,21 @@ DragonnetPeer *dragonnet_connect(char *addr) return p; } +void dragonnet_peer_run(DragonnetPeer *p) +{ + pthread_t recv_thread; + pthread_create(&recv_thread, NULL, &dragonnet_peer_recv_thread, p); + pthread_join(recv_thread, NULL); +} + void dragonnet_peer_close(DragonnetPeer *p) { pthread_rwlock_wrlock(p->mu); - assert(p->state == DRAGONNET_PEER_ACTIVE); - shutdown(p->sock, SHUT_RDWR); - p->state++; + if (p->state == DRAGONNET_PEER_ACTIVE) { + shutdown(p->sock, SHUT_RDWR); + p->state++; + } pthread_rwlock_unlock(p->mu); } diff --git a/peer.h b/peer.h index 4053bee..73050e1 100644 --- a/peer.h +++ b/peer.h @@ -5,6 +5,8 @@ #include "addr.h" +const extern struct timeval dragonnet_timeout; + typedef enum { DRAGONNET_PEER_CREATED, DRAGONNET_PEER_ACTIVE, @@ -20,6 +22,7 @@ typedef struct { } DragonnetPeer; DragonnetPeer *dragonnet_connect(char *addr); +void dragonnet_peer_run(DragonnetPeer *p); void dragonnet_peer_close(DragonnetPeer *p); void dragonnet_peer_delete(DragonnetPeer *p); diff --git a/recv_thread.c b/recv_thread.c new file mode 100644 index 0000000..a0ee5c6 --- /dev/null +++ b/recv_thread.c @@ -0,0 +1,47 @@ +#include +#include +#include +#include +#include +#include + +#include "peer.h" +#include "recv_thread.h" + +void *dragonnet_peer_recv_thread(void *g_peer) +{ + DragonnetPeer *p = (DragonnetPeer *) g_peer; + + pthread_rwlock_wrlock(p->mu); + assert(p->state == DRAGONNET_PEER_CREATED); + p->state++; + pthread_rwlock_unlock(p->mu); + + while (true) { + uint16_t msg; + + pthread_rwlock_rdlock(p->mu); + ssize_t len = recv(p->sock, &msg, sizeof msg, MSG_WAITALL); + pthread_rwlock_unlock(p->mu); + + if (len < 0 && errno != EWOULDBLOCK) { + perror("recv"); + dragonnet_peer_delete(p); + return NULL; + } + + // connection closed + if ((len >= 0 && len != sizeof msg) || errno == EWOULDBLOCK) { + pthread_rwlock_wrlock(p->mu); + + close(p->sock); + p->sock = 0; + p->state++; + + pthread_rwlock_unlock(p->mu); + return NULL; + } + + // deserialization + } +} diff --git a/recv_thread.h b/recv_thread.h new file mode 100644 index 0000000..ea847ba --- /dev/null +++ b/recv_thread.h @@ -0,0 +1,6 @@ +#ifndef _DRAGONNET_RECV_THREAD_H_ +#define _DRAGONNET_RECV_THREAD_H + +void *dragonnet_peer_recv_thread(void *g_peer); + +#endif