Queue *queue_create()
{
Queue *queue = malloc(sizeof(Queue));
+ queue->finish = false;
queue->cancel = false;
queue->list = list_create(NULL);
pthread_cond_init(&queue->cv, NULL);
void queue_enqueue(Queue *queue, void *elem)
{
pthread_mutex_lock(&queue->mtx);
- list_put(&queue->list, elem, NULL);
- pthread_cond_signal(&queue->cv);
+ if (! queue->finish) {
+ list_put(&queue->list, elem, NULL);
+ pthread_cond_signal(&queue->cv);
+ }
pthread_mutex_unlock(&queue->mtx);
}
-void *dequeue(Queue *queue)
+void *queue_dequeue(Queue *queue)
{
return queue_dequeue_callback(queue, NULL);
}
pthread_cond_broadcast(&queue->cv);
pthread_mutex_unlock(&queue->mtx);
}
+
+void queue_finish(Queue *queue)
+{
+ pthread_mutex_lock(&queue->mtx);
+ queue->finish = true;
+ pthread_mutex_unlock(&queue->mtx);
+
+ while (true) {
+ pthread_mutex_lock(&queue->mtx);
+ ListPair *first = queue->list.first;
+ pthread_mutex_unlock(&queue->mtx);
+
+ if (first)
+ sched_yield();
+ else
+ break;
+ }
+}
typedef struct
{
atomic_bool cancel;
+ bool finish;
List list;
pthread_cond_t cv;
pthread_mutex_t mtx;
void queue_enqueue(Queue *queue, void *elem);
void *queue_dequeue(Queue *queue);
void *queue_dequeue_callback(Queue *queue, void (*callback)(void *elem));
-void queue_cancel(Queue *queue);
+void queue_cancel(Queue *queue); // disallow dequeing
+void queue_finish(Queue *queue); // disallow enqueing, wait until consumption finished
#endif