blob: bed4da52fd4d98b9413be0f92541d41f86cead46 [file] [log] [blame]
Olivier Deprezf4ef2d02021-04-20 13:36:24 +02001"""A Future class similar to the one in PEP 3148."""
2
3__all__ = (
4 'Future', 'wrap_future', 'isfuture',
5)
6
7import concurrent.futures
8import contextvars
9import logging
10import sys
11
12from . import base_futures
13from . import events
14from . import exceptions
15from . import format_helpers
16
17
18isfuture = base_futures.isfuture
19
20
21_PENDING = base_futures._PENDING
22_CANCELLED = base_futures._CANCELLED
23_FINISHED = base_futures._FINISHED
24
25
26STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
27
28
29class Future:
30 """This class is *almost* compatible with concurrent.futures.Future.
31
32 Differences:
33
34 - This class is not thread-safe.
35
36 - result() and exception() do not take a timeout argument and
37 raise an exception when the future isn't done yet.
38
39 - Callbacks registered with add_done_callback() are always called
40 via the event loop's call_soon().
41
42 - This class is not compatible with the wait() and as_completed()
43 methods in the concurrent.futures package.
44
45 (In Python 3.4 or later we may be able to unify the implementations.)
46 """
47
48 # Class variables serving as defaults for instance variables.
49 _state = _PENDING
50 _result = None
51 _exception = None
52 _loop = None
53 _source_traceback = None
54 _cancel_message = None
55 # A saved CancelledError for later chaining as an exception context.
56 _cancelled_exc = None
57
58 # This field is used for a dual purpose:
59 # - Its presence is a marker to declare that a class implements
60 # the Future protocol (i.e. is intended to be duck-type compatible).
61 # The value must also be not-None, to enable a subclass to declare
62 # that it is not compatible by setting this to None.
63 # - It is set by __iter__() below so that Task._step() can tell
64 # the difference between
65 # `await Future()` or`yield from Future()` (correct) vs.
66 # `yield Future()` (incorrect).
67 _asyncio_future_blocking = False
68
69 __log_traceback = False
70
71 def __init__(self, *, loop=None):
72 """Initialize the future.
73
74 The optional event_loop argument allows explicitly setting the event
75 loop object used by the future. If it's not provided, the future uses
76 the default event loop.
77 """
78 if loop is None:
79 self._loop = events.get_event_loop()
80 else:
81 self._loop = loop
82 self._callbacks = []
83 if self._loop.get_debug():
84 self._source_traceback = format_helpers.extract_stack(
85 sys._getframe(1))
86
87 _repr_info = base_futures._future_repr_info
88
89 def __repr__(self):
90 return '<{} {}>'.format(self.__class__.__name__,
91 ' '.join(self._repr_info()))
92
93 def __del__(self):
94 if not self.__log_traceback:
95 # set_exception() was not called, or result() or exception()
96 # has consumed the exception
97 return
98 exc = self._exception
99 context = {
100 'message':
101 f'{self.__class__.__name__} exception was never retrieved',
102 'exception': exc,
103 'future': self,
104 }
105 if self._source_traceback:
106 context['source_traceback'] = self._source_traceback
107 self._loop.call_exception_handler(context)
108
109 def __class_getitem__(cls, type):
110 return cls
111
112 @property
113 def _log_traceback(self):
114 return self.__log_traceback
115
116 @_log_traceback.setter
117 def _log_traceback(self, val):
118 if bool(val):
119 raise ValueError('_log_traceback can only be set to False')
120 self.__log_traceback = False
121
122 def get_loop(self):
123 """Return the event loop the Future is bound to."""
124 loop = self._loop
125 if loop is None:
126 raise RuntimeError("Future object is not initialized.")
127 return loop
128
129 def _make_cancelled_error(self):
130 """Create the CancelledError to raise if the Future is cancelled.
131
132 This should only be called once when handling a cancellation since
133 it erases the saved context exception value.
134 """
135 if self._cancel_message is None:
136 exc = exceptions.CancelledError()
137 else:
138 exc = exceptions.CancelledError(self._cancel_message)
139 exc.__context__ = self._cancelled_exc
140 # Remove the reference since we don't need this anymore.
141 self._cancelled_exc = None
142 return exc
143
144 def cancel(self, msg=None):
145 """Cancel the future and schedule callbacks.
146
147 If the future is already done or cancelled, return False. Otherwise,
148 change the future's state to cancelled, schedule the callbacks and
149 return True.
150 """
151 self.__log_traceback = False
152 if self._state != _PENDING:
153 return False
154 self._state = _CANCELLED
155 self._cancel_message = msg
156 self.__schedule_callbacks()
157 return True
158
159 def __schedule_callbacks(self):
160 """Internal: Ask the event loop to call all callbacks.
161
162 The callbacks are scheduled to be called as soon as possible. Also
163 clears the callback list.
164 """
165 callbacks = self._callbacks[:]
166 if not callbacks:
167 return
168
169 self._callbacks[:] = []
170 for callback, ctx in callbacks:
171 self._loop.call_soon(callback, self, context=ctx)
172
173 def cancelled(self):
174 """Return True if the future was cancelled."""
175 return self._state == _CANCELLED
176
177 # Don't implement running(); see http://bugs.python.org/issue18699
178
179 def done(self):
180 """Return True if the future is done.
181
182 Done means either that a result / exception are available, or that the
183 future was cancelled.
184 """
185 return self._state != _PENDING
186
187 def result(self):
188 """Return the result this future represents.
189
190 If the future has been cancelled, raises CancelledError. If the
191 future's result isn't yet available, raises InvalidStateError. If
192 the future is done and has an exception set, this exception is raised.
193 """
194 if self._state == _CANCELLED:
195 exc = self._make_cancelled_error()
196 raise exc
197 if self._state != _FINISHED:
198 raise exceptions.InvalidStateError('Result is not ready.')
199 self.__log_traceback = False
200 if self._exception is not None:
201 raise self._exception
202 return self._result
203
204 def exception(self):
205 """Return the exception that was set on this future.
206
207 The exception (or None if no exception was set) is returned only if
208 the future is done. If the future has been cancelled, raises
209 CancelledError. If the future isn't done yet, raises
210 InvalidStateError.
211 """
212 if self._state == _CANCELLED:
213 exc = self._make_cancelled_error()
214 raise exc
215 if self._state != _FINISHED:
216 raise exceptions.InvalidStateError('Exception is not set.')
217 self.__log_traceback = False
218 return self._exception
219
220 def add_done_callback(self, fn, *, context=None):
221 """Add a callback to be run when the future becomes done.
222
223 The callback is called with a single argument - the future object. If
224 the future is already done when this is called, the callback is
225 scheduled with call_soon.
226 """
227 if self._state != _PENDING:
228 self._loop.call_soon(fn, self, context=context)
229 else:
230 if context is None:
231 context = contextvars.copy_context()
232 self._callbacks.append((fn, context))
233
234 # New method not in PEP 3148.
235
236 def remove_done_callback(self, fn):
237 """Remove all instances of a callback from the "call when done" list.
238
239 Returns the number of callbacks removed.
240 """
241 filtered_callbacks = [(f, ctx)
242 for (f, ctx) in self._callbacks
243 if f != fn]
244 removed_count = len(self._callbacks) - len(filtered_callbacks)
245 if removed_count:
246 self._callbacks[:] = filtered_callbacks
247 return removed_count
248
249 # So-called internal methods (note: no set_running_or_notify_cancel()).
250
251 def set_result(self, result):
252 """Mark the future done and set its result.
253
254 If the future is already done when this method is called, raises
255 InvalidStateError.
256 """
257 if self._state != _PENDING:
258 raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
259 self._result = result
260 self._state = _FINISHED
261 self.__schedule_callbacks()
262
263 def set_exception(self, exception):
264 """Mark the future done and set an exception.
265
266 If the future is already done when this method is called, raises
267 InvalidStateError.
268 """
269 if self._state != _PENDING:
270 raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
271 if isinstance(exception, type):
272 exception = exception()
273 if type(exception) is StopIteration:
274 raise TypeError("StopIteration interacts badly with generators "
275 "and cannot be raised into a Future")
276 self._exception = exception
277 self._state = _FINISHED
278 self.__schedule_callbacks()
279 self.__log_traceback = True
280
281 def __await__(self):
282 if not self.done():
283 self._asyncio_future_blocking = True
284 yield self # This tells Task to wait for completion.
285 if not self.done():
286 raise RuntimeError("await wasn't used with future")
287 return self.result() # May raise too.
288
289 __iter__ = __await__ # make compatible with 'yield from'.
290
291
292# Needed for testing purposes.
293_PyFuture = Future
294
295
296def _get_loop(fut):
297 # Tries to call Future.get_loop() if it's available.
298 # Otherwise fallbacks to using the old '_loop' property.
299 try:
300 get_loop = fut.get_loop
301 except AttributeError:
302 pass
303 else:
304 return get_loop()
305 return fut._loop
306
307
308def _set_result_unless_cancelled(fut, result):
309 """Helper setting the result only if the future was not cancelled."""
310 if fut.cancelled():
311 return
312 fut.set_result(result)
313
314
315def _convert_future_exc(exc):
316 exc_class = type(exc)
317 if exc_class is concurrent.futures.CancelledError:
318 return exceptions.CancelledError(*exc.args)
319 elif exc_class is concurrent.futures.TimeoutError:
320 return exceptions.TimeoutError(*exc.args)
321 elif exc_class is concurrent.futures.InvalidStateError:
322 return exceptions.InvalidStateError(*exc.args)
323 else:
324 return exc
325
326
327def _set_concurrent_future_state(concurrent, source):
328 """Copy state from a future to a concurrent.futures.Future."""
329 assert source.done()
330 if source.cancelled():
331 concurrent.cancel()
332 if not concurrent.set_running_or_notify_cancel():
333 return
334 exception = source.exception()
335 if exception is not None:
336 concurrent.set_exception(_convert_future_exc(exception))
337 else:
338 result = source.result()
339 concurrent.set_result(result)
340
341
342def _copy_future_state(source, dest):
343 """Internal helper to copy state from another Future.
344
345 The other Future may be a concurrent.futures.Future.
346 """
347 assert source.done()
348 if dest.cancelled():
349 return
350 assert not dest.done()
351 if source.cancelled():
352 dest.cancel()
353 else:
354 exception = source.exception()
355 if exception is not None:
356 dest.set_exception(_convert_future_exc(exception))
357 else:
358 result = source.result()
359 dest.set_result(result)
360
361
362def _chain_future(source, destination):
363 """Chain two futures so that when one completes, so does the other.
364
365 The result (or exception) of source will be copied to destination.
366 If destination is cancelled, source gets cancelled too.
367 Compatible with both asyncio.Future and concurrent.futures.Future.
368 """
369 if not isfuture(source) and not isinstance(source,
370 concurrent.futures.Future):
371 raise TypeError('A future is required for source argument')
372 if not isfuture(destination) and not isinstance(destination,
373 concurrent.futures.Future):
374 raise TypeError('A future is required for destination argument')
375 source_loop = _get_loop(source) if isfuture(source) else None
376 dest_loop = _get_loop(destination) if isfuture(destination) else None
377
378 def _set_state(future, other):
379 if isfuture(future):
380 _copy_future_state(other, future)
381 else:
382 _set_concurrent_future_state(future, other)
383
384 def _call_check_cancel(destination):
385 if destination.cancelled():
386 if source_loop is None or source_loop is dest_loop:
387 source.cancel()
388 else:
389 source_loop.call_soon_threadsafe(source.cancel)
390
391 def _call_set_state(source):
392 if (destination.cancelled() and
393 dest_loop is not None and dest_loop.is_closed()):
394 return
395 if dest_loop is None or dest_loop is source_loop:
396 _set_state(destination, source)
397 else:
398 dest_loop.call_soon_threadsafe(_set_state, destination, source)
399
400 destination.add_done_callback(_call_check_cancel)
401 source.add_done_callback(_call_set_state)
402
403
404def wrap_future(future, *, loop=None):
405 """Wrap concurrent.futures.Future object."""
406 if isfuture(future):
407 return future
408 assert isinstance(future, concurrent.futures.Future), \
409 f'concurrent.futures.Future is expected, got {future!r}'
410 if loop is None:
411 loop = events.get_event_loop()
412 new_future = loop.create_future()
413 _chain_future(future, new_future)
414 return new_future
415
416
417try:
418 import _asyncio
419except ImportError:
420 pass
421else:
422 # _CFuture is needed for tests.
423 Future = _CFuture = _asyncio.Future