"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module: the task API plus the
# four task bookkeeping hooks.
__all__ = (
    'Task',
    'create_task',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    'gather',
    'shield',
    'ensure_future',
    'run_coroutine_threadsafe',
    'current_task',
    'all_tasks',
    '_register_task',
    '_unregister_task',
    '_enter_task',
    '_leave_task',
)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
# Source of the numeric suffix used in default task names.
# An itertools.count() iterator is used rather than a hand-rolled "+= 1"
# counter because the latter is not thread safe (see bpo-11866).
_task_name_counter = itertools.count(start=1).__next__
32
33
def current_task(loop=None):
    """Return the task currently executing on *loop*, or None.

    When *loop* is None the running event loop is looked up first.
    """
    target = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target)
39
40
def all_tasks(loop=None):
    """Return the set of not-yet-done tasks that belong to *loop*.

    When *loop* is None the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks is a WeakSet that another thread may mutate while it is
    # being iterated, so it is snapshotted into a list before filtering.
    # Building that list iterates the set as well, hence the retry on
    # RuntimeError (unlikely in practice); after 1000 failures the error
    # is re-raised. See issues 34970 and 36607.
    for attempt in itertools.count(1):
        try:
            snapshot = list(_all_tasks)
            break
        except RuntimeError:
            if attempt >= 1000:
                raise
    unfinished = set()
    for task in snapshot:
        if futures._get_loop(task) is loop and not task.done():
            unfinished.add(task)
    return unfinished
62
63
def _all_tasks_compat(loop=None):
    # Unlike all_tasks(), this returns *every* Task of the loop, finished
    # ones included. It backs the deprecated "Task.all_tasks()" method.
    if loop is None:
        loop = events.get_event_loop()
    # The WeakSet (_all_tasks) can be modified from another thread during
    # iteration, so copy it to a list first and filter the copy. Copying
    # iterates too, so RuntimeError is tolerated and the copy retried; the
    # 1000th consecutive failure is propagated. See issues 34970 and 36607.
    failures = 0
    snapshot = None
    while snapshot is None:
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                raise
    return {task for task in snapshot if futures._get_loop(task) is loop}
86
87
def _set_task_name(task, name):
    """Give *task* the given *name* if it supports naming.

    Nothing happens when *name* is None or when *task* has no
    ``set_name`` attribute.
    """
    if name is None:
        return
    try:
        rename = task.set_name
    except AttributeError:
        return
    rename(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # Invariant that holds for as long as a Task is not done:
    #
    # - either _fut_waiter is None and _step() is scheduled,
    # - or _fut_waiter is some Future and _step() is *not* scheduled.
    #
    # _wakeup() is the only way to go from the second state to the first.
    # Whenever _fut_waiter is not None, _wakeup() must be one of its
    # callbacks.

    # When False, no message is logged if the task is destroyed while it
    # is still pending.
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError if *coro* is not a coroutine. Without an
        explicit *name* the task is called ``Task-<n>``.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # The error is raised only after Future.__init__() because
            # __del__ needs the attributes set there; the flag below keeps
            # __del__ from reporting this task as pending.
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._name = (f'Task-{_task_name_counter()}' if name is None
                      else str(name))

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        if self._state == futures._PENDING and self._log_destroy_pending:
            report = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                report['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(report)
        super().__del__()

    def __class_getitem__(cls, type):
        return cls

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the wrapped coroutine object."""
        return self._coro

    def get_name(self):
        """Return the name of the task."""
        return self._name

    def set_name(self, value):
        """Set the name of the task to ``str(value)``."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: unsupported on tasks."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: unsupported on tasks."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        For a coroutine that is not done, the result is the stack at
        the point where it is suspended. For one that completed
        successfully or was cancelled, the result is an empty list.
        For one terminated by an exception, the result is the list of
        traceback frames.

        Frames are always ordered from oldest to newest.

        The optional limit is the maximum number of frames to return;
        all available frames are returned by default. Its meaning
        depends on what is returned: for a stack the newest frames are
        kept, for a traceback the oldest ones. (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        The output resembles that of the traceback module and covers
        the frames retrieved by get_stack(), to which the limit
        argument is passed. The file argument is the I/O stream the
        output is written to; by default that is sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        A CancelledError is arranged to be thrown into the wrapped
        coroutine on the next cycle through the event loop. The
        coroutine can then clean up, or even deny the request, using
        try/except/finally.

        Unlike Future.cancel, this is no guarantee that the task will
        be cancelled: the exception might be caught and acted upon,
        which delays or completely prevents cancellation. The task may
        also return a value or raise a different exception.

        Right after this method is called, Task.cancelled() does not
        return True (unless the task was already cancelled). A task is
        marked as cancelled when the wrapped coroutine terminates with
        a CancelledError exception (even if cancel() was not called).
        """
        self._log_traceback = False
        if self.done():
            return False
        waiter = self._fut_waiter
        if waiter is not None and waiter.cancel(msg=msg):
            # self._fut_waiter is left in place: it may be a Task that
            # catches and ignores the cancellation, in which case it
            # has to be cancelled again later.
            return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Resume the coroutine with coro.throw(exc) or coro.send(None).
        try:
            if exc is not None:
                yielded = coro.throw(exc)
            else:
                # `send` is called directly because coroutines have
                # neither `__iter__` nor `__next__`.
                yielded = coro.send(None)
        except StopIteration as stop:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(stop.value)
        except exceptions.CancelledError as cancelled:
            # Keep the original exception so it can be chained later.
            self._cancelled_exc = cancelled
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as fatal:
            super().set_exception(fatal)
            raise
        except BaseException as failure:
            super().set_exception(failure)
        else:
            # The coroutine yielded. Either wait on the yielded future,
            # reschedule right away, or build an error that is thrown
            # into the coroutine on the next step.
            new_exc = None
            blocking = getattr(yielded, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(yielded) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{yielded!r} attached to a different loop')
                elif not blocking:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {yielded!r}')
                elif yielded is self:
                    new_exc = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                else:
                    yielded._asyncio_future_blocking = False
                    yielded.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = yielded
                    if self._must_cancel:
                        if self._fut_waiter.cancel(
                                msg=self._cancel_message):
                            self._must_cancel = False
            elif yielded is None:
                # Bare yield relinquishes control for one event loop
                # iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(yielded):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {yielded!r}')
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {yielded!r}')

            if new_exc is not None:
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()
        except BaseException as error:
            # This may also be a cancellation.
            self.__step(error)
        else:
            # The value of `future.result()` is deliberately not passed
            # on: `Future.__iter__` and `Future.__await__` don't need it.
            # Calling `_step(value, None)` instead of `_step()` would
            # make the Python eval loop use `.send(value)` rather than
            # `__next__()`, which is slower for futures that return
            # non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
341
342
# Keep a handle on the pure-Python implementation before Task may be
# rebound below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    _CTask = _asyncio.Task
    Task = _CTask
353
354
def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    The task is created on the running event loop and, when *name* is
    given, named accordingly. Return the Task object.
    """
    running_loop = events.get_running_loop()
    new_task = running_loop.create_task(coro)
    _set_task_name(new_task, name)
    return new_task
364
365
# wait() and as_completed() similar to those in PEP 3148.
# The return_when constants are the ones defined by concurrent.futures.

(FIRST_COMPLETED,
 FIRST_EXCEPTION,
 ALL_COMPLETED) = (concurrent.futures.FIRST_COMPLETED,
                   concurrent.futures.FIRST_EXCEPTION,
                   concurrent.futures.ALL_COMPLETED)
371
372
373async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
374 """Wait for the Futures and coroutines given by fs to complete.
375
376 The fs iterable must not be empty.
377
378 Coroutines will be wrapped in Tasks.
379
380 Returns two sets of Future: (done, pending).
381
382 Usage:
383
384 done, pending = await asyncio.wait(fs)
385
386 Note: This does not raise TimeoutError! Futures that aren't done
387 when the timeout occurs are returned in the second set.
388 """
389 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
390 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
391 if not fs:
392 raise ValueError('Set of coroutines/Futures is empty.')
393 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
394 raise ValueError(f'Invalid return_when value: {return_when}')
395
396 if loop is None:
397 loop = events.get_running_loop()
398 else:
399 warnings.warn("The loop argument is deprecated since Python 3.8, "
400 "and scheduled for removal in Python 3.10.",
401 DeprecationWarning, stacklevel=2)
402
403 fs = set(fs)
404
405 if any(coroutines.iscoroutine(f) for f in fs):
406 warnings.warn("The explicit passing of coroutine objects to "
407 "asyncio.wait() is deprecated since Python 3.8, and "
408 "scheduled for removal in Python 3.11.",
409 DeprecationWarning, stacklevel=2)
410
411 fs = {ensure_future(f, loop=loop) for f in fs}
412
413 return await _wait(fs, timeout, return_when, loop)
414
415
def _release_waiter(waiter, *args):
    """Complete *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored, so the
    function can be used as a done callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
419
420
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine. On timeout the task is
    cancelled and TimeoutError is raised. Wrap the awaitable in
    shield() to avoid the task cancellation.

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut, loop=loop)
        try:
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        raise exceptions.TimeoutError()

    waiter = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, waiter)
    on_done = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        # Block until either the future completes or the timer fires.
        try:
            await waiter
        except exceptions.CancelledError:
            if fut.done():
                return fut.result()
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        fut.remove_done_callback(on_done)
        # The task must not be running any more once wait_for()
        # returns. See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        # If cancelling the task failed with some exception, that
        # exception is re-raised by result() below.
        # See https://bugs.python.org/issue40607
        try:
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        raise exceptions.TimeoutError()
    finally:
        timer.cancel()
496
497
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures. Returns the pair
    (done, pending) of sets.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        if remaining > 0 and return_when != FIRST_COMPLETED:
            # Futures are still outstanding and the first completion is
            # not enough: only a failure under FIRST_EXCEPTION ends the
            # wait early.
            failed = (return_when == FIRST_EXCEPTION and
                      not f.cancelled() and
                      f.exception() is not None)
            if not failed:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done = {f for f in fs if f.done()}
    pending = {f for f in fs if not f.done()}
    return done, pending
540
541
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    released = loop.create_future()
    on_done = functools.partial(_release_waiter, released)
    fut.add_done_callback(on_done)

    try:
        fut.cancel()
        # Awaiting *fut* directly is avoided so that _cancel_and_wait
        # itself stays reliably cancellable.
        await released
    finally:
        fut.remove_done_callback(on_done)
556
557
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or exceptions!)
    of the original Futures (or coroutines), in the order in which and
    as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)

    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        for unfinished in todo:
            unfinished.remove_done_callback(_on_completion)
            # Queue a dummy value for _wait_for_one().
            finished.put_nowait(None)
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        finished.put_nowait(f)
        if timeout_handle is not None and not todo:
            timeout_handle.cancel()

    async def _wait_for_one():
        item = await finished.get()
        if item is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return item.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One coroutine is handed out per scheduled future.
    for _ in range(len(todo)):
        yield _wait_for_one()
619
620
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    Private helper for 'asyncio.sleep()', used when the 'delay' is
    set to 0. Rather than creating a Future object it relies on a
    bare 'yield' expression, which Task.__step knows how to handle.
    """
    yield None
631
632
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed. A
    delay of zero or less only yields to the event loop once.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    wakeup = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Never leave the timer behind, e.g. when the sleep is cancelled.
        timer.cancel()
654
655
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly. ValueError is
    raised when that Future belongs to a loop other than the *loop*
    argument, and TypeError when the argument is neither a Future, a
    coroutine nor an awaitable.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if inspect.isawaitable(coro_or_future):
        # Turn the awaitable into a coroutine and go through the
        # coroutine branch above.
        wrapped = _wrap_awaitable(coro_or_future)
        return ensure_future(wrapped, loop=loop)

    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                    'required')
678
679
@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns awaitable (an object with __await__) into a coroutine, which
    ensure_future() will later wrap in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome

# Tag the wrapper with the marker imported from .coroutines.
_wrap_awaitable._is_coroutine = _is_coroutine
690
691
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels all the children and
    behaves more like Task.cancel(), which doesn't immediately mark
    itself as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self, msg=None):
        """Cancel every child; return True if any child was cancelled."""
        if self.done():
            return False
        # Every child must be asked, so collect the answers first
        # instead of stopping at the first success.
        outcomes = [child.cancel(msg=msg) for child in self._children]
        if not any(outcomes):
            return False
        # If any child tasks were actually cancelled, the cancellation
        # request must be propagated regardless of the
        # *return_exceptions* argument. See issue 32684.
        self._cancel_requested = True
        return True
718
719
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop. When every task
    finishes successfully, the result of the returned future is the
    list of results, in the order of the original sequence (not
    necessarily the order in which results arrive). With
    *return_exceptions* set to True, exceptions in the tasks are
    treated like successful results and gathered in the result list;
    otherwise the first raised exception is immediately propagated to
    the returned future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled. If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case. (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already completed future.
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        empty = loop.create_future()
        empty.set_result([])
        return empty

    def _done_callback(fut):
        nonlocal finished_count
        finished_count += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            # 'fut.cancelled()' is tested first because
            # 'fut.exception()' *raises* CancelledError for a cancelled
            # future instead of returning it.
            if fut.cancelled():
                outer.set_exception(fut._make_cancelled_error())
                return
            error = fut.exception()
            if error is not None:
                outer.set_exception(error)
                return

        if finished_count != unique_count:
            return

        # All futures are done; build the list of results and set it
        # on the 'outer' future.
        results = []
        for child in children:
            if child.cancelled():
                # Checked first for the same reason as above. The
                # exception is stored in 'results' rather than raised,
                # so __context__ is not set; this also keeps
                # '_make_cancelled_error()' to at most one call.
                res = exceptions.CancelledError(
                    '' if child._cancel_message is None else
                    child._cancel_message)
            else:
                res = child.exception()
                if res is None:
                    res = child.result()
            results.append(res)

        if outer._cancel_requested:
            # If gather is being cancelled the cancellation must be
            # propagated regardless of *return_exceptions* argument.
            # See issue 32684. The error is built from the last child
            # visited by the loop above.
            outer.set_exception(child._make_cancelled_error())
        else:
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    unique_count = 0
    finished_count = 0
    for arg in coros_or_futures:
        if arg in arg_to_fut:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]
        else:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, so 'fut' is a new Future
                # created specifically for 'arg'. The caller cannot
                # control it, so the "destroy pending task" warning is
                # disabled.
                fut._log_destroy_pending = False

            unique_count += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
844
845
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled. From the POV of
    something(), the cancellation did not happen. But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError. Note: If something() is cancelled by other means
    this will still cancel shield().

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        # Mirror the outcome of the inner future onto the outer one.
        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    def _outer_done_callback(_outer):
        # Once the outer future is done, stop watching an inner future
        # that is still running.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
907
908
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result. Raises
    TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as error:
            # Store the failure on the returned future as well, unless
            # that future was cancelled in the meantime.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(error)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
930
931
# Every task that is still alive, held weakly.
_all_tasks = weakref.WeakSet()

# The task currently active in each running event loop, as a mapping
# {EventLoop: Task}.
_current_tasks = dict()
938
939
def _register_task(task):
    """Register a new task in asyncio as executed by loop.

    The task is added to the module-level _all_tasks collection.
    """
    _all_tasks.add(task)
943
944
def _enter_task(loop, task):
    """Record *task* as the task currently running on *loop*.

    Raises RuntimeError if another task is already recorded for it.
    """
    running = _current_tasks.get(loop)
    if running is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {running!r} is being executed.")
    _current_tasks[loop] = task
951
952
def _leave_task(loop, task):
    """Clear the record of *task* being the current task of *loop*.

    Raises RuntimeError if *task* is not the task recorded for it.
    """
    running = _current_tasks.get(loop)
    if running is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {running!r}.")
    del _current_tasks[loop]
959
960
def _unregister_task(task):
    """Unregister a task.

    The task is discarded from the module-level _all_tasks collection;
    a task that is not in it is ignored.
    """
    _all_tasks.discard(task)
964
965
# Keep the pure-Python hooks reachable under _py_* names; the import
# below may rebind the plain names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # The import succeeded: expose what it provided under _c_* names too.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task