static bool dragonnet_peer_init_accepted(DragonnetPeer *p, int sock,
struct sockaddr_in6 addr, DragonnetListener *l)
{
- pthread_rwlock_init(&p->mu, NULL);
- pthread_rwlock_wrlock(&p->mu);
+ pthread_mutex_init(&p->mtx, NULL);
- pthread_rwlock_rdlock(&l->mu);
p->sock = sock;
p->laddr = l->laddr;
p->raddr = dragonnet_addr_parse_sock(addr);
+ p->on_disconnect = l->on_disconnect;
+ p->on_recv = l->on_recv;
p->on_recv_type = l->on_recv_type;
- pthread_rwlock_unlock(&l->mu);
- pthread_rwlock_unlock(&p->mu);
return true;
}
{
DragonnetPeer *p = malloc(sizeof *p);
if (!dragonnet_peer_init_accepted(p, sock, addr, l)) {
- dragonnet_peer_delete(p);
+ pthread_mutex_destroy(&p->mtx);
+ free(p);
return NULL;
}
// Listener
// --------
-DragonnetListener *dragonnet_listener_new(char *addr,
- void (*on_connect)(DragonnetPeer *p))
+DragonnetListener *dragonnet_listener_new(char *addr)
{
DragonnetListener *l = malloc(sizeof *l);
pthread_rwlock_init(&l->mu, NULL);
- pthread_rwlock_wrlock(&l->mu);
+ l->active = true;
l->sock = socket(AF_INET6, SOCK_STREAM, 0);
- l->on_connect = on_connect;
+ l->on_connect = NULL;
+ l->on_disconnect = NULL;
+ l->on_recv = NULL;
l->on_recv_type = calloc(sizeof *l->on_recv_type, dragonnet_num_types);
int so_reuseaddr = 1;
return NULL;
}
- pthread_rwlock_unlock(&l->mu);
return l;
}
-void dragonnet_listener_set_recv_hook(DragonnetListener *l, DragonnetTypeId type_id,
- void (*on_recv)(struct dragonnet_peer *, void *))
-{
- pthread_rwlock_rdlock(&l->mu);
- DragonnetListenerState state = l->state;
- pthread_rwlock_unlock(&l->mu);
-
- if (state >= DRAGONNET_LISTENER_ACTIVE)
- return;
-
- pthread_rwlock_wrlock(&l->mu);
- l->on_recv_type[type_id] = on_recv;
- pthread_rwlock_unlock(&l->mu);
-}
-
static void *listener_main(void *g_listener)
{
DragonnetListener *l = (DragonnetListener *) g_listener;
- pthread_rwlock_wrlock(&l->mu);
- assert(l->state == DRAGONNET_LISTENER_CREATED);
- l->state++;
- pthread_rwlock_unlock(&l->mu);
-
- while (l->state == DRAGONNET_LISTENER_ACTIVE) {
+ while (l->active) {
struct sockaddr_in6 clt_addr;
socklen_t clt_addrlen = sizeof clt_addr;
if (p == NULL)
continue;
- dragonnet_peer_run(p);
-
pthread_rwlock_rdlock(&l->mu);
void (*on_connect)(DragonnetPeer *) = l->on_connect;
pthread_rwlock_unlock(&l->mu);
if (on_connect != NULL)
on_connect(p);
+
+ dragonnet_peer_run(p);
}
return NULL;
void dragonnet_listener_close(DragonnetListener *l)
{
- pthread_rwlock_wrlock(&l->mu);
-
- pthread_t accept_thread = l->accept_thread;
- assert(l->state == DRAGONNET_LISTENER_ACTIVE);
- close(l->sock);
- l->sock = -1;
- l->state++;
-
- pthread_rwlock_unlock(&l->mu);
-
- pthread_cancel(accept_thread);
- pthread_join(accept_thread, NULL);
+ l->active = false;
+ pthread_cancel(l->accept_thread);
+ pthread_join(l->accept_thread, NULL);
}
void dragonnet_listener_delete(DragonnetListener *l)
#include <dragonnet/peer.h>
#include <stdbool.h>
-typedef enum {
- DRAGONNET_LISTENER_CREATED,
- DRAGONNET_LISTENER_ACTIVE,
- DRAGONNET_LISTENER_CLOSED
-} DragonnetListenerState;
-
typedef struct {
int sock;
DragonnetAddr laddr;
- DragonnetListenerState state;
pthread_t accept_thread;
+ bool active;
void (*on_connect)(DragonnetPeer *);
+ void (*on_disconnect)(DragonnetPeer *);
+ bool (*on_recv)(DragonnetPeer *, DragonnetTypeId, void *);
void (**on_recv_type)(DragonnetPeer *, void *);
pthread_rwlock_t mu;
} DragonnetListener;
-DragonnetListener *dragonnet_listener_new(char *addr,
- void (*on_connect)(DragonnetPeer *p));
-void dragonnet_listener_set_recv_hook(DragonnetListener *l, DragonnetTypeId type_id,
- void (*on_recv)(struct dragonnet_peer *, void *));
+DragonnetListener *dragonnet_listener_new(char *addr);
void dragonnet_listener_run(DragonnetListener *l);
void dragonnet_listener_close(DragonnetListener *l);
void dragonnet_listener_delete(DragonnetListener *l);
static bool dragonnet_peer_init(DragonnetPeer *p, char *addr)
{
- pthread_rwlock_init(&p->mu, NULL);
- pthread_rwlock_wrlock(&p->mu);
+ pthread_mutex_init(&p->mtx, NULL);
p->sock = socket(AF_INET6, SOCK_STREAM, 0);
p->raddr = dragonnet_addr_parse_str(addr);
+ p->on_disconnect = NULL;
+ p->on_recv = NULL;
p->on_recv_type = calloc(sizeof *p->on_recv_type, dragonnet_num_types);
struct sockaddr_in6 sock_addr = dragonnet_addr_sock(p->raddr);
}
p->laddr = dragonnet_addr_parse_sock(sock_name);
-
- pthread_rwlock_unlock(&p->mu);
return true;
}
{
DragonnetPeer *p = malloc(sizeof *p);
if (!dragonnet_peer_init(p, addr)) {
- dragonnet_peer_delete(p);
+ pthread_mutex_destroy(&p->mtx);
+ free(p);
return NULL;
}
return p;
}
-void dragonnet_peer_set_recv_hook(DragonnetPeer *p, DragonnetTypeId type_id,
- void (*on_recv)(struct dragonnet_peer *, void *))
-{
- pthread_rwlock_rdlock(&p->mu);
- DragonnetPeerState state = p->state;
- pthread_rwlock_unlock(&p->mu);
-
- if (state >= DRAGONNET_PEER_ACTIVE)
- return;
-
- pthread_rwlock_wrlock(&p->mu);
- p->on_recv_type[type_id] = on_recv;
- pthread_rwlock_unlock(&p->mu);
-}
-
void dragonnet_peer_run(DragonnetPeer *p)
{
- pthread_rwlock_wrlock(&p->mu);
pthread_create(&p->recv_thread, NULL, &dragonnet_peer_recv_thread, p);
- pthread_rwlock_unlock(&p->mu);
-
- while (p->state < DRAGONNET_PEER_ACTIVE);
-}
-
-void dragonnet_peer_close(DragonnetPeer *p)
-{
- pthread_rwlock_wrlock(&p->mu);
-
- pthread_t recv_thread = p->recv_thread;
- if (p->state == DRAGONNET_PEER_ACTIVE)
- shutdown(p->sock, SHUT_RDWR);
-
- pthread_rwlock_unlock(&p->mu);
-
- pthread_cancel(recv_thread);
- pthread_join(recv_thread, NULL);
}
-void dragonnet_peer_delete(DragonnetPeer *p)
+void dragonnet_peer_shutdown(DragonnetPeer *p)
{
- pthread_rwlock_destroy(&p->mu);
- free(p);
+ shutdown(p->sock, SHUT_RDWR);
}
#include <stdbool.h>
#include <stdint.h>
-typedef enum {
- DRAGONNET_PEER_CREATED,
- DRAGONNET_PEER_ACTIVE,
- DRAGONNET_PEER_CLOSED
-} DragonnetPeerState;
-
typedef uint16_t DragonnetTypeId;
typedef struct dragonnet_peer {
int sock;
DragonnetAddr laddr, raddr;
- DragonnetPeerState state;
pthread_t recv_thread;
+ pthread_mutex_t mtx;
+ void (*on_disconnect)(struct dragonnet_peer *);
bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *);
void (**on_recv_type)(struct dragonnet_peer *, void *);
- pthread_rwlock_t mu;
+ void *extra;
} DragonnetPeer;
DragonnetPeer *dragonnet_connect(char *addr);
-void dragonnet_peer_set_recv_hook(DragonnetPeer *p, DragonnetTypeId type_id,
- void (*on_recv)(struct dragonnet_peer *, void *));
void dragonnet_peer_run(DragonnetPeer *p);
-void dragonnet_peer_close(DragonnetPeer *p);
-void dragonnet_peer_delete(DragonnetPeer *p);
+void dragonnet_peer_shutdown(DragonnetPeer *p);
#endif
#include <string.h>
#include <unistd.h>
-void dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n)
+bool dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n)
{
- pthread_rwlock_rdlock(&p->mu);
- int sock = p->sock;
- pthread_rwlock_unlock(&p->mu);
-
- ssize_t len = recv(sock, buf, n, MSG_WAITALL);
+ ssize_t len = recv(p->sock, buf, n, MSG_WAITALL);
if (len < 0) {
perror("recv");
- dragonnet_peer_delete(p);
- return;
+ exit(EXIT_FAILURE);
}
- // Connection closed
- if (len == 0) {
- pthread_rwlock_wrlock(&p->mu);
-
- close(p->sock);
- p->sock = -1;
- p->state++;
-
- pthread_rwlock_unlock(&p->mu);
- }
+ return len != 0;
}
typedef struct {
size_t siz;
- void (*deserialize)(DragonnetPeer *, void *);
+ bool (*deserialize)(DragonnetPeer *, void *);
+ void (*free)(void *);
} DragonnetType;
extern DragonnetTypeId dragonnet_num_types;
extern DragonnetType dragonnet_types[];
-void dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n);
+bool dragonnet_recv_raw(DragonnetPeer *p, void *buf, size_t n);
#endif
#include <dragonnet/peer.h>
#include <dragonnet/recv.h>
#include <dragonnet/recv_thread.h>
+#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
#include <unistd.h>
void *dragonnet_peer_recv_thread(void *g_peer)
{
DragonnetPeer *p = (DragonnetPeer *) g_peer;
- pthread_rwlock_wrlock(&p->mu);
- assert(p->state == DRAGONNET_PEER_CREATED);
- p->state++;
- pthread_rwlock_unlock(&p->mu);
-
while (true) {
DragonnetTypeId type_id;
- // Copy socket fd so that shutdown doesn't block
- pthread_rwlock_rdlock(&p->mu);
- int sock = p->sock;
- pthread_rwlock_unlock(&p->mu);
+ bool reset = false;
- ssize_t len = recv(sock, &type_id, sizeof type_id, MSG_WAITALL);
+ ssize_t len = recv(p->sock, &type_id, sizeof type_id, MSG_WAITALL);
if (len < 0) {
- perror("recv");
- dragonnet_peer_delete(p);
- return NULL;
+ if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
+ reset = true;
+ } else {
+ perror("recv");
+ exit(EXIT_FAILURE);
+ }
}
// Connection closed
- if (len == 0) {
- pthread_rwlock_wrlock(&p->mu);
+ if (len == 0 || reset) {
+ if (p->on_disconnect)
+ p->on_disconnect(p);
close(p->sock);
- p->sock = -1;
- p->state++;
- pthread_rwlock_unlock(&p->mu);
+ pthread_mutex_destroy(&p->mtx);
+ free(p);
return NULL;
}
type_id = be16toh(type_id);
+
+ if (type_id >= dragonnet_num_types) {
+ fprintf(stderr, "warning: received invalid type id %d\n", type_id);
+ continue;
+ }
+
DragonnetType type = dragonnet_types[type_id];
+
unsigned char buf[type.siz];
- type.deserialize(p, buf);
+ memset(buf, 0, type.siz);
+
+ if (!type.deserialize(p, buf)) {
+ if (type.free != NULL)
+ type.free(buf);
+
+ fprintf(stderr, "warning: failed to deserialize package of type %d\n", type_id);
+
+ continue;
+ }
- pthread_rwlock_rdlock(&p->mu);
bool (*on_recv)(struct dragonnet_peer *, DragonnetTypeId, void *) = p->on_recv;
void (*on_recv_type)(DragonnetPeer *, void *) = p->on_recv_type[type_id];
- pthread_rwlock_unlock(&p->mu);
if (on_recv != NULL && !on_recv(p, type_id, buf))
on_recv_type = NULL;
if (on_recv_type != NULL)
on_recv_type(p, buf);
+
+ if (type.free != NULL)
+ type.free(buf);
}
}
#include <stdlib.h>
#include <string.h>
-void dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n)
+bool dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n)
{
- pthread_rwlock_rdlock(&p->mu);
- int sock = p->sock;
- pthread_rwlock_unlock(&p->mu);
+ ssize_t len = send(p->sock, buf, n, MSG_NOSIGNAL | (submit ? 0 : MSG_MORE));
- ssize_t len = send(sock, buf, n, MSG_NOSIGNAL | (submit ? 0 : MSG_MORE));
if (len < 0) {
- if (errno == EPIPE) {
- dragonnet_peer_close(p);
- return;
+ if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT) {
+ shutdown(p->sock, SHUT_RDWR);
+ pthread_mutex_unlock(&p->mtx);
+ return false;
}
perror("send");
- dragonnet_peer_delete(p);
+ exit(EXIT_FAILURE);
}
+
+ if (submit)
+ pthread_mutex_unlock(&p->mtx);
+
+ return true;
}
#include <dragonnet/peer.h>
#include <stdbool.h>
-void dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n);
+bool dragonnet_send_raw(DragonnetPeer *p, bool submit, const void *buf, size_t n);
#endif