blob: cd3f7c6a56789152052a2d00143b852bca384e0e [file] [log] [blame]
Olivier Deprezf4ef2d02021-04-20 13:36:24 +02001__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2
3import collections
4import heapq
5import warnings
6
7from . import events
8from . import locks
9
10
11class QueueEmpty(Exception):
12 """Raised when Queue.get_nowait() is called on an empty Queue."""
13 pass
14
15
16class QueueFull(Exception):
17 """Raised when the Queue.put_nowait() method is called on a full Queue."""
18 pass
19
20
21class Queue:
22 """A queue, useful for coordinating producer and consumer coroutines.
23
24 If maxsize is less than or equal to zero, the queue size is infinite. If it
25 is an integer greater than 0, then "await put()" will block when the
26 queue reaches maxsize, until an item is removed by get().
27
28 Unlike the standard library Queue, you can reliably know this Queue's size
29 with qsize(), since your single-threaded asyncio application won't be
30 interrupted between calling qsize() and doing an operation on the Queue.
31 """
32
33 def __init__(self, maxsize=0, *, loop=None):
34 if loop is None:
35 self._loop = events.get_event_loop()
36 else:
37 self._loop = loop
38 warnings.warn("The loop argument is deprecated since Python 3.8, "
39 "and scheduled for removal in Python 3.10.",
40 DeprecationWarning, stacklevel=2)
41 self._maxsize = maxsize
42
43 # Futures.
44 self._getters = collections.deque()
45 # Futures.
46 self._putters = collections.deque()
47 self._unfinished_tasks = 0
48 self._finished = locks.Event(loop=loop)
49 self._finished.set()
50 self._init(maxsize)
51
52 # These three are overridable in subclasses.
53
54 def _init(self, maxsize):
55 self._queue = collections.deque()
56
57 def _get(self):
58 return self._queue.popleft()
59
60 def _put(self, item):
61 self._queue.append(item)
62
63 # End of the overridable methods.
64
65 def _wakeup_next(self, waiters):
66 # Wake up the next waiter (if any) that isn't cancelled.
67 while waiters:
68 waiter = waiters.popleft()
69 if not waiter.done():
70 waiter.set_result(None)
71 break
72
73 def __repr__(self):
74 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
75
76 def __str__(self):
77 return f'<{type(self).__name__} {self._format()}>'
78
79 def __class_getitem__(cls, type):
80 return cls
81
82 def _format(self):
83 result = f'maxsize={self._maxsize!r}'
84 if getattr(self, '_queue', None):
85 result += f' _queue={list(self._queue)!r}'
86 if self._getters:
87 result += f' _getters[{len(self._getters)}]'
88 if self._putters:
89 result += f' _putters[{len(self._putters)}]'
90 if self._unfinished_tasks:
91 result += f' tasks={self._unfinished_tasks}'
92 return result
93
94 def qsize(self):
95 """Number of items in the queue."""
96 return len(self._queue)
97
98 @property
99 def maxsize(self):
100 """Number of items allowed in the queue."""
101 return self._maxsize
102
103 def empty(self):
104 """Return True if the queue is empty, False otherwise."""
105 return not self._queue
106
107 def full(self):
108 """Return True if there are maxsize items in the queue.
109
110 Note: if the Queue was initialized with maxsize=0 (the default),
111 then full() is never True.
112 """
113 if self._maxsize <= 0:
114 return False
115 else:
116 return self.qsize() >= self._maxsize
117
118 async def put(self, item):
119 """Put an item into the queue.
120
121 Put an item into the queue. If the queue is full, wait until a free
122 slot is available before adding item.
123 """
124 while self.full():
125 putter = self._loop.create_future()
126 self._putters.append(putter)
127 try:
128 await putter
129 except:
130 putter.cancel() # Just in case putter is not done yet.
131 try:
132 # Clean self._putters from canceled putters.
133 self._putters.remove(putter)
134 except ValueError:
135 # The putter could be removed from self._putters by a
136 # previous get_nowait call.
137 pass
138 if not self.full() and not putter.cancelled():
139 # We were woken up by get_nowait(), but can't take
140 # the call. Wake up the next in line.
141 self._wakeup_next(self._putters)
142 raise
143 return self.put_nowait(item)
144
145 def put_nowait(self, item):
146 """Put an item into the queue without blocking.
147
148 If no free slot is immediately available, raise QueueFull.
149 """
150 if self.full():
151 raise QueueFull
152 self._put(item)
153 self._unfinished_tasks += 1
154 self._finished.clear()
155 self._wakeup_next(self._getters)
156
157 async def get(self):
158 """Remove and return an item from the queue.
159
160 If queue is empty, wait until an item is available.
161 """
162 while self.empty():
163 getter = self._loop.create_future()
164 self._getters.append(getter)
165 try:
166 await getter
167 except:
168 getter.cancel() # Just in case getter is not done yet.
169 try:
170 # Clean self._getters from canceled getters.
171 self._getters.remove(getter)
172 except ValueError:
173 # The getter could be removed from self._getters by a
174 # previous put_nowait call.
175 pass
176 if not self.empty() and not getter.cancelled():
177 # We were woken up by put_nowait(), but can't take
178 # the call. Wake up the next in line.
179 self._wakeup_next(self._getters)
180 raise
181 return self.get_nowait()
182
183 def get_nowait(self):
184 """Remove and return an item from the queue.
185
186 Return an item if one is immediately available, else raise QueueEmpty.
187 """
188 if self.empty():
189 raise QueueEmpty
190 item = self._get()
191 self._wakeup_next(self._putters)
192 return item
193
194 def task_done(self):
195 """Indicate that a formerly enqueued task is complete.
196
197 Used by queue consumers. For each get() used to fetch a task,
198 a subsequent call to task_done() tells the queue that the processing
199 on the task is complete.
200
201 If a join() is currently blocking, it will resume when all items have
202 been processed (meaning that a task_done() call was received for every
203 item that had been put() into the queue).
204
205 Raises ValueError if called more times than there were items placed in
206 the queue.
207 """
208 if self._unfinished_tasks <= 0:
209 raise ValueError('task_done() called too many times')
210 self._unfinished_tasks -= 1
211 if self._unfinished_tasks == 0:
212 self._finished.set()
213
214 async def join(self):
215 """Block until all items in the queue have been gotten and processed.
216
217 The count of unfinished tasks goes up whenever an item is added to the
218 queue. The count goes down whenever a consumer calls task_done() to
219 indicate that the item was retrieved and all work on it is complete.
220 When the count of unfinished tasks drops to zero, join() unblocks.
221 """
222 if self._unfinished_tasks > 0:
223 await self._finished.wait()
224
225
226class PriorityQueue(Queue):
227 """A subclass of Queue; retrieves entries in priority order (lowest first).
228
229 Entries are typically tuples of the form: (priority number, data).
230 """
231
232 def _init(self, maxsize):
233 self._queue = []
234
235 def _put(self, item, heappush=heapq.heappush):
236 heappush(self._queue, item)
237
238 def _get(self, heappop=heapq.heappop):
239 return heappop(self._queue)
240
241
242class LifoQueue(Queue):
243 """A subclass of Queue that retrieves most recently added entries first."""
244
245 def _init(self, maxsize):
246 self._queue = []
247
248 def _put(self, item):
249 self._queue.append(item)
250
251 def _get(self):
252 return self._queue.pop()