From: Elias Fleckenstein Date: Sun, 13 Feb 2022 16:31:45 +0000 (+0100) Subject: Implement queue waiting X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=04c93844e4e4185d91950f0f5473bc88a84b5461;p=dragonstd.git Implement queue waiting --- diff --git a/queue.c b/queue.c index 04aaccd..37dce73 100644 --- a/queue.c +++ b/queue.c @@ -1,16 +1,20 @@ +#include #include #include "queue.h" Queue *queue_create() { Queue *queue = malloc(sizeof(Queue)); + queue->cancel = false; queue->list = list_create(NULL); + pthread_cond_init(&queue->cv, NULL); pthread_mutex_init(&queue->mtx, NULL); return queue; } void queue_delete(Queue *queue) { + pthread_cond_destroy(&queue->cv); pthread_mutex_destroy(&queue->mtx); list_clear(&queue->list); free(queue); @@ -20,6 +24,7 @@ void queue_enqueue(Queue *queue, void *elem) { pthread_mutex_lock(&queue->mtx); list_put(&queue->list, elem, NULL); + pthread_cond_signal(&queue->cv); pthread_mutex_unlock(&queue->mtx); } @@ -30,18 +35,35 @@ void *dequeue(Queue *queue) void *queue_dequeue_callback(Queue *queue, void (*callback)(void *elem)) { - pthread_mutex_lock(&queue->mtx); void *elem = NULL; - ListPair **lptr = &queue->list.first; - if (*lptr) { - elem = (*lptr)->key; - ListPair *next = (*lptr)->next; - free(*lptr); - *lptr = next; - - if (callback) - callback(elem); + + while (! queue->cancel && ! elem) { + pthread_mutex_lock(&queue->mtx); + + ListPair **lptr = &queue->list.first; + if (*lptr) { + elem = (*lptr)->key; + ListPair *next = (*lptr)->next; + free(*lptr); + *lptr = next; + + if (callback) + callback(elem); + } else { + pthread_cond_wait(&queue->cv, &queue->mtx); + } + + pthread_mutex_unlock(&queue->mtx); } - pthread_mutex_unlock(&queue->mtx); + return elem; } + +void queue_cancel(Queue *queue) +{ + queue->cancel = true; + + pthread_mutex_lock(&queue->mtx); + pthread_cond_broadcast(&queue->cv); + pthread_mutex_unlock(&queue->mtx); +} diff --git a/queue.h b/queue.h index 48829cb..80b1e54 100644 --- a/queue.h +++ b/queue.h @@ -2,11 +2,15 @@ #define _DRAGONSTD_QUEUE_H_ #include +#include +#include #include "list.h" typedef struct { + atomic_bool cancel; List list; + pthread_cond_t cv; pthread_mutex_t mtx; } Queue; @@ -15,5 +19,6 @@ void queue_delete(Queue *queue); void queue_enqueue(Queue *queue, void *elem); void *queue_dequeue(Queue *queue); void *queue_dequeue_callback(Queue *queue, void (*callback)(void *elem)); +void queue_cancel(Queue *queue); #endif