p->laddr = l->laddr;
p->raddr = dragonnet_addr_parse_sock(addr);
+ if (setsockopt(p->sock, SOL_SOCKET, SO_RCVTIMEO, &dragonnet_timeout,
+ sizeof dragonnet_timeout) < 0) {
+ perror("setsockopt");
+ dragonnet_peer_delete(p);
+ return NULL;
+ }
+
+ if (setsockopt(p->sock, SOL_SOCKET, SO_SNDTIMEO, &dragonnet_timeout,
+ sizeof dragonnet_timeout) < 0) {
+ perror("setsockopt");
+ dragonnet_peer_delete(p);
+ return NULL;
+ }
+
pthread_rwlock_unlock(p->mu);
return p;
}
// Listener
// --------
-DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(DragonnetPeer *p))
+DragonnetListener *dragonnet_listener_new(char *addr,
+ void (*on_connect)(DragonnetPeer *p))
{
DragonnetListener *l = malloc(sizeof *l);
l->mu = malloc(sizeof *l->mu);
l->sock = socket(AF_INET6, SOCK_STREAM, 0);
l->on_connect = on_connect;
- int flag = 1;
- if (setsockopt(l->sock, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag) < 0) {
+ int so_reuseaddr = 1;
+ if (setsockopt(l->sock, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr,
+ sizeof so_reuseaddr) < 0) {
perror("setsockopt");
dragonnet_listener_delete(l);
return NULL;
assert(l->state == DRAGONNET_LISTENER_ACTIVE);
close(l->sock);
+ l->sock = 0;
l->state++;
pthread_rwlock_unlock(l->mu);
pthread_rwlock_t *mu;
} DragonnetListener;
-DragonnetListener *dragonnet_listener_new(char *addr, void (*on_connect)(DragonnetPeer *p));
+DragonnetListener *dragonnet_listener_new(char *addr,
+ void (*on_connect)(DragonnetPeer *p));
void dragonnet_listener_run(DragonnetListener *l);
void dragonnet_listener_close(DragonnetListener *l);
void dragonnet_listener_delete(DragonnetListener *l);
#include <stdlib.h>
#include "peer.h"
+#include "recv_thread.h"
+
+const struct timeval dragonnet_timeout = {
+ .tv_sec = 30,
+ .tv_usec = 0
+};
DragonnetPeer *dragonnet_connect(char *addr)
{
p->sock = socket(AF_INET6, SOCK_STREAM, 0);
p->raddr = dragonnet_addr_parse_str(addr);
+ if (setsockopt(p->sock, SOL_SOCKET, SO_RCVTIMEO, &dragonnet_timeout,
+ sizeof dragonnet_timeout) < 0) {
+ perror("setsockopt");
+ dragonnet_peer_delete(p);
+ return NULL;
+ }
+
+ if (setsockopt(p->sock, SOL_SOCKET, SO_SNDTIMEO, &dragonnet_timeout,
+ sizeof dragonnet_timeout) < 0) {
+ perror("setsockopt");
+ dragonnet_peer_delete(p);
+ return NULL;
+ }
+
struct sockaddr_in6 sock_addr = dragonnet_addr_sock(p->raddr);
- if (connect(p->sock, (const struct sockaddr *) &sock_addr, sizeof sock_addr) < 0) {
+ if (connect(p->sock, (const struct sockaddr *) &sock_addr,
+ sizeof sock_addr) < 0) {
perror("connect");
dragonnet_peer_delete(p);
return NULL;
return p;
}
+void dragonnet_peer_run(DragonnetPeer *p)
+{
+ pthread_t recv_thread;
+ pthread_create(&recv_thread, NULL, &dragonnet_peer_recv_thread, p);
+ pthread_join(recv_thread, NULL);
+}
+
void dragonnet_peer_close(DragonnetPeer *p)
{
pthread_rwlock_wrlock(p->mu);
- assert(p->state == DRAGONNET_PEER_ACTIVE);
- shutdown(p->sock, SHUT_RDWR);
- p->state++;
+ if (p->state == DRAGONNET_PEER_ACTIVE) {
+ shutdown(p->sock, SHUT_RDWR);
+ p->state++;
+ }
pthread_rwlock_unlock(p->mu);
}
#include "addr.h"
+const extern struct timeval dragonnet_timeout;
+
typedef enum {
DRAGONNET_PEER_CREATED,
DRAGONNET_PEER_ACTIVE,
} DragonnetPeer;
DragonnetPeer *dragonnet_connect(char *addr);
+void dragonnet_peer_run(DragonnetPeer *p);
void dragonnet_peer_close(DragonnetPeer *p);
void dragonnet_peer_delete(DragonnetPeer *p);
--- /dev/null
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include "peer.h"
+#include "recv_thread.h"
+
+void *dragonnet_peer_recv_thread(void *g_peer)
+{
+ DragonnetPeer *p = (DragonnetPeer *) g_peer;
+
+ pthread_rwlock_wrlock(p->mu);
+ assert(p->state == DRAGONNET_PEER_CREATED);
+ p->state++;
+ pthread_rwlock_unlock(p->mu);
+
+ while (true) {
+ uint16_t msg;
+
+ pthread_rwlock_rdlock(p->mu);
+ ssize_t len = recv(p->sock, &msg, sizeof msg, MSG_WAITALL);
+ pthread_rwlock_unlock(p->mu);
+
+ if (len < 0 && errno != EWOULDBLOCK) {
+ perror("recv");
+ dragonnet_peer_delete(p);
+ return NULL;
+ }
+
+ // connection closed
+ if ((len >= 0 && len != sizeof msg) || errno == EWOULDBLOCK) {
+ pthread_rwlock_wrlock(p->mu);
+
+ close(p->sock);
+ p->sock = 0;
+ p->state++;
+
+ pthread_rwlock_unlock(p->mu);
+ return NULL;
+ }
+
+ // deserialization
+ }
+}
--- /dev/null
+#ifndef _DRAGONNET_RECV_THREAD_H_
+#define _DRAGONNET_RECV_THREAD_H
+
+void *dragonnet_peer_recv_thread(void *g_peer);
+
+#endif