-#include <stdio.h>
-#include <stdlib.h>
+#include <sched.h>
#include "queue.h"
-Queue *queue_create()
+void queue_ini(Queue *queue)
{
- Queue *queue = malloc(sizeof(Queue));
- queue->cancel = false;
- queue->list = list_create(NULL);
- pthread_cond_init(&queue->cv, NULL);
+ list_ini(&queue->lst);
+ queue->cnl = queue->fin = 0;
+ pthread_cond_init(&queue->cnd, NULL);
pthread_mutex_init(&queue->mtx, NULL);
- return queue;
}
-void queue_delete(Queue *queue)
+void queue_del(Queue *queue)
{
- pthread_cond_destroy(&queue->cv);
+ pthread_cond_destroy(&queue->cnd);
pthread_mutex_destroy(&queue->mtx);
- list_clear(&queue->list);
- free(queue);
}
-void queue_enqueue(Queue *queue, void *elem)
+void queue_clr(Queue *queue, Iterator iter, void *arg, Transformer trans)
{
- pthread_mutex_lock(&queue->mtx);
- list_put(&queue->list, elem, NULL);
- pthread_cond_signal(&queue->cv);
- pthread_mutex_unlock(&queue->mtx);
+ list_clr(&queue->lst, iter, arg, trans);
}
-void *dequeue(Queue *queue)
+bool queue_enq(Queue *queue, void *dat)
{
- return queue_dequeue_callback(queue, NULL);
+ bool success = false;
+
+ pthread_mutex_lock(&queue->mtx);
+ if (! queue->fin) {
+ success = true;
+ list_apd(&queue->lst, dat);
+ pthread_cond_signal(&queue->cnd);
+ }
+ pthread_mutex_unlock(&queue->mtx);
+
+ return success;
}
-void *queue_dequeue_callback(Queue *queue, void (*callback)(void *elem))
+void *queue_deq(Queue *queue, Transformer trans)
{
- void *elem = NULL;
-
- while (! queue->cancel && ! elem) {
- pthread_mutex_lock(&queue->mtx);
+ void *dat = NULL;
- ListPair **lptr = &queue->list.first;
- if (*lptr) {
- elem = (*lptr)->key;
- ListPair *next = (*lptr)->next;
- free(*lptr);
- *lptr = next;
+ pthread_mutex_lock(&queue->mtx);
+ while (! queue->cnl && ! dat) {
+ ListNode **node = &queue->lst.fst;
+ if (*node) {
+ dat = (*node)->dat;
+ list_nrm(&queue->lst, node);
- if (callback)
- callback(elem);
+ if (trans)
+ dat = trans(dat);
} else {
- pthread_cond_wait(&queue->cv, &queue->mtx);
+ pthread_cond_wait(&queue->cnd, &queue->mtx);
}
-
- pthread_mutex_unlock(&queue->mtx);
}
+ pthread_mutex_unlock(&queue->mtx);
- return elem;
+ return dat;
}
-void queue_cancel(Queue *queue)
+void queue_cnl(Queue *queue)
{
- queue->cancel = true;
+ pthread_mutex_lock(&queue->mtx);
+ queue->cnl = 1;
+ pthread_cond_broadcast(&queue->cnd);
+ pthread_mutex_unlock(&queue->mtx);
+}
+void queue_fin(Queue *queue)
+{
pthread_mutex_lock(&queue->mtx);
- pthread_cond_broadcast(&queue->cv);
+ queue->fin = 1;
pthread_mutex_unlock(&queue->mtx);
+
+ for (;;) {
+ pthread_mutex_lock(&queue->mtx);
+ ListNode *node = queue->lst.fst;
+ pthread_mutex_unlock(&queue->mtx);
+
+ if (node)
+ sched_yield();
+ else
+ break;
+ }
}