]> git.lizzy.rs Git - plan9front.git/blob - sys/lib/python/Queue.py
turn ptrdiff_t into a 64 bit type
[plan9front.git] / sys / lib / python / Queue.py
1 """A multi-producer, multi-consumer queue."""
2
3 from time import time as _time
4 from collections import deque
5
6 __all__ = ['Empty', 'Full', 'Queue']
7
8 class Empty(Exception):
9     "Exception raised by Queue.get(block=0)/get_nowait()."
10     pass
11
12 class Full(Exception):
13     "Exception raised by Queue.put(block=0)/put_nowait()."
14     pass
15
16 class Queue:
17     """Create a queue object with a given maximum size.
18
19     If maxsize is <= 0, the queue size is infinite.
20     """
21     def __init__(self, maxsize=0):
22         try:
23             import threading
24         except ImportError:
25             import dummy_threading as threading
26         self._init(maxsize)
27         # mutex must be held whenever the queue is mutating.  All methods
28         # that acquire mutex must release it before returning.  mutex
29         # is shared between the three conditions, so acquiring and
30         # releasing the conditions also acquires and releases mutex.
31         self.mutex = threading.Lock()
32         # Notify not_empty whenever an item is added to the queue; a
33         # thread waiting to get is notified then.
34         self.not_empty = threading.Condition(self.mutex)
35         # Notify not_full whenever an item is removed from the queue;
36         # a thread waiting to put is notified then.
37         self.not_full = threading.Condition(self.mutex)
38         # Notify all_tasks_done whenever the number of unfinished tasks
39         # drops to zero; thread waiting to join() is notified to resume
40         self.all_tasks_done = threading.Condition(self.mutex)
41         self.unfinished_tasks = 0
42
43     def task_done(self):
44         """Indicate that a formerly enqueued task is complete.
45
46         Used by Queue consumer threads.  For each get() used to fetch a task,
47         a subsequent call to task_done() tells the queue that the processing
48         on the task is complete.
49
50         If a join() is currently blocking, it will resume when all items
51         have been processed (meaning that a task_done() call was received
52         for every item that had been put() into the queue).
53
54         Raises a ValueError if called more times than there were items
55         placed in the queue.
56         """
57         self.all_tasks_done.acquire()
58         try:
59             unfinished = self.unfinished_tasks - 1
60             if unfinished <= 0:
61                 if unfinished < 0:
62                     raise ValueError('task_done() called too many times')
63                 self.all_tasks_done.notifyAll()
64             self.unfinished_tasks = unfinished
65         finally:
66             self.all_tasks_done.release()
67
68     def join(self):
69         """Blocks until all items in the Queue have been gotten and processed.
70
71         The count of unfinished tasks goes up whenever an item is added to the
72         queue. The count goes down whenever a consumer thread calls task_done()
73         to indicate the item was retrieved and all work on it is complete.
74
75         When the count of unfinished tasks drops to zero, join() unblocks.
76         """
77         self.all_tasks_done.acquire()
78         try:
79             while self.unfinished_tasks:
80                 self.all_tasks_done.wait()
81         finally:
82             self.all_tasks_done.release()
83
84     def qsize(self):
85         """Return the approximate size of the queue (not reliable!)."""
86         self.mutex.acquire()
87         n = self._qsize()
88         self.mutex.release()
89         return n
90
91     def empty(self):
92         """Return True if the queue is empty, False otherwise (not reliable!)."""
93         self.mutex.acquire()
94         n = self._empty()
95         self.mutex.release()
96         return n
97
98     def full(self):
99         """Return True if the queue is full, False otherwise (not reliable!)."""
100         self.mutex.acquire()
101         n = self._full()
102         self.mutex.release()
103         return n
104
105     def put(self, item, block=True, timeout=None):
106         """Put an item into the queue.
107
108         If optional args 'block' is true and 'timeout' is None (the default),
109         block if necessary until a free slot is available. If 'timeout' is
110         a positive number, it blocks at most 'timeout' seconds and raises
111         the Full exception if no free slot was available within that time.
112         Otherwise ('block' is false), put an item on the queue if a free slot
113         is immediately available, else raise the Full exception ('timeout'
114         is ignored in that case).
115         """
116         self.not_full.acquire()
117         try:
118             if not block:
119                 if self._full():
120                     raise Full
121             elif timeout is None:
122                 while self._full():
123                     self.not_full.wait()
124             else:
125                 if timeout < 0:
126                     raise ValueError("'timeout' must be a positive number")
127                 endtime = _time() + timeout
128                 while self._full():
129                     remaining = endtime - _time()
130                     if remaining <= 0.0:
131                         raise Full
132                     self.not_full.wait(remaining)
133             self._put(item)
134             self.unfinished_tasks += 1
135             self.not_empty.notify()
136         finally:
137             self.not_full.release()
138
139     def put_nowait(self, item):
140         """Put an item into the queue without blocking.
141
142         Only enqueue the item if a free slot is immediately available.
143         Otherwise raise the Full exception.
144         """
145         return self.put(item, False)
146
147     def get(self, block=True, timeout=None):
148         """Remove and return an item from the queue.
149
150         If optional args 'block' is true and 'timeout' is None (the default),
151         block if necessary until an item is available. If 'timeout' is
152         a positive number, it blocks at most 'timeout' seconds and raises
153         the Empty exception if no item was available within that time.
154         Otherwise ('block' is false), return an item if one is immediately
155         available, else raise the Empty exception ('timeout' is ignored
156         in that case).
157         """
158         self.not_empty.acquire()
159         try:
160             if not block:
161                 if self._empty():
162                     raise Empty
163             elif timeout is None:
164                 while self._empty():
165                     self.not_empty.wait()
166             else:
167                 if timeout < 0:
168                     raise ValueError("'timeout' must be a positive number")
169                 endtime = _time() + timeout
170                 while self._empty():
171                     remaining = endtime - _time()
172                     if remaining <= 0.0:
173                         raise Empty
174                     self.not_empty.wait(remaining)
175             item = self._get()
176             self.not_full.notify()
177             return item
178         finally:
179             self.not_empty.release()
180
181     def get_nowait(self):
182         """Remove and return an item from the queue without blocking.
183
184         Only get an item if one is immediately available. Otherwise
185         raise the Empty exception.
186         """
187         return self.get(False)
188
189     # Override these methods to implement other queue organizations
190     # (e.g. stack or priority queue).
191     # These will only be called with appropriate locks held
192
193     # Initialize the queue representation
194     def _init(self, maxsize):
195         self.maxsize = maxsize
196         self.queue = deque()
197
198     def _qsize(self):
199         return len(self.queue)
200
201     # Check whether the queue is empty
202     def _empty(self):
203         return not self.queue
204
205     # Check whether the queue is full
206     def _full(self):
207         return self.maxsize > 0 and len(self.queue) == self.maxsize
208
209     # Put a new item in the queue
210     def _put(self, item):
211         self.queue.append(item)
212
213     # Get an item from the queue
214     def _get(self):
215         return self.queue.popleft()