blob: b495a5f6ccc017eed3859be17843acbd7c857fca [file] [log] [blame]
Olivier Deprezf4ef2d02021-04-20 13:36:24 +02001# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
6
7__all__ = (
8 'Mock',
9 'MagicMock',
10 'patch',
11 'sentinel',
12 'DEFAULT',
13 'ANY',
14 'call',
15 'create_autospec',
16 'AsyncMock',
17 'FILTER_DIR',
18 'NonCallableMock',
19 'NonCallableMagicMock',
20 'mock_open',
21 'PropertyMock',
22 'seal',
23)
24
25
26import asyncio
27import contextlib
28import io
29import inspect
30import pprint
31import sys
32import builtins
33from asyncio import iscoroutinefunction
34from types import CodeType, ModuleType, MethodType
35from unittest.util import safe_repr
36from functools import wraps, partial
37
38
39_builtins = {name for name in dir(builtins) if not name.startswith('_')}
40
41FILTER_DIR = True
42
43# Workaround for issue #12370
44# Without this, the __class__ properties wouldn't be set correctly
45_safe_super = super
46
47def _is_async_obj(obj):
48 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
49 return False
50 if hasattr(obj, '__func__'):
51 obj = getattr(obj, '__func__')
52 return iscoroutinefunction(obj) or inspect.isawaitable(obj)
53
54
55def _is_async_func(func):
56 if getattr(func, '__code__', None):
57 return iscoroutinefunction(func)
58 else:
59 return False
60
61
62def _is_instance_mock(obj):
63 # can't use isinstance on Mock objects because they override __class__
64 # The base class for all mocks is NonCallableMock
65 return issubclass(type(obj), NonCallableMock)
66
67
68def _is_exception(obj):
69 return (
70 isinstance(obj, BaseException) or
71 isinstance(obj, type) and issubclass(obj, BaseException)
72 )
73
74
75def _extract_mock(obj):
76 # Autospecced functions will return a FunctionType with "mock" attribute
77 # which is the actual mock object that needs to be used.
78 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
79 return obj.mock
80 else:
81 return obj
82
83
84def _get_signature_object(func, as_instance, eat_self):
85 """
86 Given an arbitrary, possibly callable object, try to create a suitable
87 signature object.
88 Return a (reduced func, signature) tuple, or None.
89 """
90 if isinstance(func, type) and not as_instance:
91 # If it's a type and should be modelled as a type, use __init__.
92 func = func.__init__
93 # Skip the `self` argument in __init__
94 eat_self = True
95 elif not isinstance(func, FunctionTypes):
96 # If we really want to model an instance of the passed type,
97 # __call__ should be looked up, not __init__.
98 try:
99 func = func.__call__
100 except AttributeError:
101 return None
102 if eat_self:
103 sig_func = partial(func, None)
104 else:
105 sig_func = func
106 try:
107 return func, inspect.signature(sig_func)
108 except ValueError:
109 # Certain callable types are not supported by inspect.signature()
110 return None
111
112
113def _check_signature(func, mock, skipfirst, instance=False):
114 sig = _get_signature_object(func, instance, skipfirst)
115 if sig is None:
116 return
117 func, sig = sig
118 def checksig(self, /, *args, **kwargs):
119 sig.bind(*args, **kwargs)
120 _copy_func_details(func, checksig)
121 type(mock)._mock_check_sig = checksig
122 type(mock).__signature__ = sig
123
124
125def _copy_func_details(func, funcopy):
126 # we explicitly don't copy func.__dict__ into this copy as it would
127 # expose original attributes that should be mocked
128 for attribute in (
129 '__name__', '__doc__', '__text_signature__',
130 '__module__', '__defaults__', '__kwdefaults__',
131 ):
132 try:
133 setattr(funcopy, attribute, getattr(func, attribute))
134 except AttributeError:
135 pass
136
137
138def _callable(obj):
139 if isinstance(obj, type):
140 return True
141 if isinstance(obj, (staticmethod, classmethod, MethodType)):
142 return _callable(obj.__func__)
143 if getattr(obj, '__call__', None) is not None:
144 return True
145 return False
146
147
148def _is_list(obj):
149 # checks for list or tuples
150 # XXXX badly named!
151 return type(obj) in (list, tuple)
152
153
154def _instance_callable(obj):
155 """Given an object, return True if the object is callable.
156 For classes, return True if instances would be callable."""
157 if not isinstance(obj, type):
158 # already an instance
159 return getattr(obj, '__call__', None) is not None
160
161 # *could* be broken by a class overriding __mro__ or __dict__ via
162 # a metaclass
163 for base in (obj,) + obj.__mro__:
164 if base.__dict__.get('__call__') is not None:
165 return True
166 return False
167
168
169def _set_signature(mock, original, instance=False):
170 # creates a function with signature (*args, **kwargs) that delegates to a
171 # mock. It still does signature checking by calling a lambda with the same
172 # signature as the original.
173
174 skipfirst = isinstance(original, type)
175 result = _get_signature_object(original, instance, skipfirst)
176 if result is None:
177 return mock
178 func, sig = result
179 def checksig(*args, **kwargs):
180 sig.bind(*args, **kwargs)
181 _copy_func_details(func, checksig)
182
183 name = original.__name__
184 if not name.isidentifier():
185 name = 'funcopy'
186 context = {'_checksig_': checksig, 'mock': mock}
187 src = """def %s(*args, **kwargs):
188 _checksig_(*args, **kwargs)
189 return mock(*args, **kwargs)""" % name
190 exec (src, context)
191 funcopy = context[name]
192 _setup_func(funcopy, mock, sig)
193 return funcopy
194
195
196def _setup_func(funcopy, mock, sig):
197 funcopy.mock = mock
198
199 def assert_called_with(*args, **kwargs):
200 return mock.assert_called_with(*args, **kwargs)
201 def assert_called(*args, **kwargs):
202 return mock.assert_called(*args, **kwargs)
203 def assert_not_called(*args, **kwargs):
204 return mock.assert_not_called(*args, **kwargs)
205 def assert_called_once(*args, **kwargs):
206 return mock.assert_called_once(*args, **kwargs)
207 def assert_called_once_with(*args, **kwargs):
208 return mock.assert_called_once_with(*args, **kwargs)
209 def assert_has_calls(*args, **kwargs):
210 return mock.assert_has_calls(*args, **kwargs)
211 def assert_any_call(*args, **kwargs):
212 return mock.assert_any_call(*args, **kwargs)
213 def reset_mock():
214 funcopy.method_calls = _CallList()
215 funcopy.mock_calls = _CallList()
216 mock.reset_mock()
217 ret = funcopy.return_value
218 if _is_instance_mock(ret) and not ret is mock:
219 ret.reset_mock()
220
221 funcopy.called = False
222 funcopy.call_count = 0
223 funcopy.call_args = None
224 funcopy.call_args_list = _CallList()
225 funcopy.method_calls = _CallList()
226 funcopy.mock_calls = _CallList()
227
228 funcopy.return_value = mock.return_value
229 funcopy.side_effect = mock.side_effect
230 funcopy._mock_children = mock._mock_children
231
232 funcopy.assert_called_with = assert_called_with
233 funcopy.assert_called_once_with = assert_called_once_with
234 funcopy.assert_has_calls = assert_has_calls
235 funcopy.assert_any_call = assert_any_call
236 funcopy.reset_mock = reset_mock
237 funcopy.assert_called = assert_called
238 funcopy.assert_not_called = assert_not_called
239 funcopy.assert_called_once = assert_called_once
240 funcopy.__signature__ = sig
241
242 mock._mock_delegate = funcopy
243
244
245def _setup_async_mock(mock):
246 mock._is_coroutine = asyncio.coroutines._is_coroutine
247 mock.await_count = 0
248 mock.await_args = None
249 mock.await_args_list = _CallList()
250
251 # Mock is not configured yet so the attributes are set
252 # to a function and then the corresponding mock helper function
253 # is called when the helper is accessed similar to _setup_func.
254 def wrapper(attr, /, *args, **kwargs):
255 return getattr(mock.mock, attr)(*args, **kwargs)
256
257 for attribute in ('assert_awaited',
258 'assert_awaited_once',
259 'assert_awaited_with',
260 'assert_awaited_once_with',
261 'assert_any_await',
262 'assert_has_awaits',
263 'assert_not_awaited'):
264
265 # setattr(mock, attribute, wrapper) causes late binding
266 # hence attribute will always be the last value in the loop
267 # Use partial(wrapper, attribute) to ensure the attribute is bound
268 # correctly.
269 setattr(mock, attribute, partial(wrapper, attribute))
270
271
272def _is_magic(name):
273 return '__%s__' % name[2:-2] == name
274
275
276class _SentinelObject(object):
277 "A unique, named, sentinel object."
278 def __init__(self, name):
279 self.name = name
280
281 def __repr__(self):
282 return 'sentinel.%s' % self.name
283
284 def __reduce__(self):
285 return 'sentinel.%s' % self.name
286
287
288class _Sentinel(object):
289 """Access attributes to return a named object, usable as a sentinel."""
290 def __init__(self):
291 self._sentinels = {}
292
293 def __getattr__(self, name):
294 if name == '__bases__':
295 # Without this help(unittest.mock) raises an exception
296 raise AttributeError
297 return self._sentinels.setdefault(name, _SentinelObject(name))
298
299 def __reduce__(self):
300 return 'sentinel'
301
302
303sentinel = _Sentinel()
304
305DEFAULT = sentinel.DEFAULT
306_missing = sentinel.MISSING
307_deleted = sentinel.DELETED
308
309
310_allowed_names = {
311 'return_value', '_mock_return_value', 'side_effect',
312 '_mock_side_effect', '_mock_parent', '_mock_new_parent',
313 '_mock_name', '_mock_new_name'
314}
315
316
317def _delegating_property(name):
318 _allowed_names.add(name)
319 _the_name = '_mock_' + name
320 def _get(self, name=name, _the_name=_the_name):
321 sig = self._mock_delegate
322 if sig is None:
323 return getattr(self, _the_name)
324 return getattr(sig, name)
325 def _set(self, value, name=name, _the_name=_the_name):
326 sig = self._mock_delegate
327 if sig is None:
328 self.__dict__[_the_name] = value
329 else:
330 setattr(sig, name, value)
331
332 return property(_get, _set)
333
334
335
336class _CallList(list):
337
338 def __contains__(self, value):
339 if not isinstance(value, list):
340 return list.__contains__(self, value)
341 len_value = len(value)
342 len_self = len(self)
343 if len_value > len_self:
344 return False
345
346 for i in range(0, len_self - len_value + 1):
347 sub_list = self[i:i+len_value]
348 if sub_list == value:
349 return True
350 return False
351
352 def __repr__(self):
353 return pprint.pformat(list(self))
354
355
356def _check_and_set_parent(parent, value, name, new_name):
357 value = _extract_mock(value)
358
359 if not _is_instance_mock(value):
360 return False
361 if ((value._mock_name or value._mock_new_name) or
362 (value._mock_parent is not None) or
363 (value._mock_new_parent is not None)):
364 return False
365
366 _parent = parent
367 while _parent is not None:
368 # setting a mock (value) as a child or return value of itself
369 # should not modify the mock
370 if _parent is value:
371 return False
372 _parent = _parent._mock_new_parent
373
374 if new_name:
375 value._mock_new_parent = parent
376 value._mock_new_name = new_name
377 if name:
378 value._mock_parent = parent
379 value._mock_name = name
380 return True
381
382# Internal class to identify if we wrapped an iterator object or not.
383class _MockIter(object):
384 def __init__(self, obj):
385 self.obj = iter(obj)
386 def __next__(self):
387 return next(self.obj)
388
389class Base(object):
390 _mock_return_value = DEFAULT
391 _mock_side_effect = None
392 def __init__(self, /, *args, **kwargs):
393 pass
394
395
396
397class NonCallableMock(Base):
398 """A non-callable version of `Mock`"""
399
400 def __new__(cls, /, *args, **kw):
401 # every instance has its own class
402 # so we can create magic methods on the
403 # class without stomping on other mocks
404 bases = (cls,)
405 if not issubclass(cls, AsyncMockMixin):
406 # Check if spec is an async object or function
407 bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments
408 spec_arg = bound_args.get('spec_set', bound_args.get('spec'))
409 if spec_arg and _is_async_obj(spec_arg):
410 bases = (AsyncMockMixin, cls)
411 new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
412 instance = _safe_super(NonCallableMock, cls).__new__(new)
413 return instance
414
415
416 def __init__(
417 self, spec=None, wraps=None, name=None, spec_set=None,
418 parent=None, _spec_state=None, _new_name='', _new_parent=None,
419 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
420 ):
421 if _new_parent is None:
422 _new_parent = parent
423
424 __dict__ = self.__dict__
425 __dict__['_mock_parent'] = parent
426 __dict__['_mock_name'] = name
427 __dict__['_mock_new_name'] = _new_name
428 __dict__['_mock_new_parent'] = _new_parent
429 __dict__['_mock_sealed'] = False
430
431 if spec_set is not None:
432 spec = spec_set
433 spec_set = True
434 if _eat_self is None:
435 _eat_self = parent is not None
436
437 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)
438
439 __dict__['_mock_children'] = {}
440 __dict__['_mock_wraps'] = wraps
441 __dict__['_mock_delegate'] = None
442
443 __dict__['_mock_called'] = False
444 __dict__['_mock_call_args'] = None
445 __dict__['_mock_call_count'] = 0
446 __dict__['_mock_call_args_list'] = _CallList()
447 __dict__['_mock_mock_calls'] = _CallList()
448
449 __dict__['method_calls'] = _CallList()
450 __dict__['_mock_unsafe'] = unsafe
451
452 if kwargs:
453 self.configure_mock(**kwargs)
454
455 _safe_super(NonCallableMock, self).__init__(
456 spec, wraps, name, spec_set, parent,
457 _spec_state
458 )
459
460
461 def attach_mock(self, mock, attribute):
462 """
463 Attach a mock as an attribute of this one, replacing its name and
464 parent. Calls to the attached mock will be recorded in the
465 `method_calls` and `mock_calls` attributes of this one."""
466 inner_mock = _extract_mock(mock)
467
468 inner_mock._mock_parent = None
469 inner_mock._mock_new_parent = None
470 inner_mock._mock_name = ''
471 inner_mock._mock_new_name = None
472
473 setattr(self, attribute, mock)
474
475
476 def mock_add_spec(self, spec, spec_set=False):
477 """Add a spec to a mock. `spec` can either be an object or a
478 list of strings. Only attributes on the `spec` can be fetched as
479 attributes from the mock.
480
481 If `spec_set` is True then only attributes on the spec can be set."""
482 self._mock_add_spec(spec, spec_set)
483
484
485 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
486 _eat_self=False):
487 _spec_class = None
488 _spec_signature = None
489 _spec_asyncs = []
490
491 for attr in dir(spec):
492 if iscoroutinefunction(getattr(spec, attr, None)):
493 _spec_asyncs.append(attr)
494
495 if spec is not None and not _is_list(spec):
496 if isinstance(spec, type):
497 _spec_class = spec
498 else:
499 _spec_class = type(spec)
500 res = _get_signature_object(spec,
501 _spec_as_instance, _eat_self)
502 _spec_signature = res and res[1]
503
504 spec = dir(spec)
505
506 __dict__ = self.__dict__
507 __dict__['_spec_class'] = _spec_class
508 __dict__['_spec_set'] = spec_set
509 __dict__['_spec_signature'] = _spec_signature
510 __dict__['_mock_methods'] = spec
511 __dict__['_spec_asyncs'] = _spec_asyncs
512
513 def __get_return_value(self):
514 ret = self._mock_return_value
515 if self._mock_delegate is not None:
516 ret = self._mock_delegate.return_value
517
518 if ret is DEFAULT:
519 ret = self._get_child_mock(
520 _new_parent=self, _new_name='()'
521 )
522 self.return_value = ret
523 return ret
524
525
526 def __set_return_value(self, value):
527 if self._mock_delegate is not None:
528 self._mock_delegate.return_value = value
529 else:
530 self._mock_return_value = value
531 _check_and_set_parent(self, value, None, '()')
532
533 __return_value_doc = "The value to be returned when the mock is called."
534 return_value = property(__get_return_value, __set_return_value,
535 __return_value_doc)
536
537
538 @property
539 def __class__(self):
540 if self._spec_class is None:
541 return type(self)
542 return self._spec_class
543
544 called = _delegating_property('called')
545 call_count = _delegating_property('call_count')
546 call_args = _delegating_property('call_args')
547 call_args_list = _delegating_property('call_args_list')
548 mock_calls = _delegating_property('mock_calls')
549
550
551 def __get_side_effect(self):
552 delegated = self._mock_delegate
553 if delegated is None:
554 return self._mock_side_effect
555 sf = delegated.side_effect
556 if (sf is not None and not callable(sf)
557 and not isinstance(sf, _MockIter) and not _is_exception(sf)):
558 sf = _MockIter(sf)
559 delegated.side_effect = sf
560 return sf
561
562 def __set_side_effect(self, value):
563 value = _try_iter(value)
564 delegated = self._mock_delegate
565 if delegated is None:
566 self._mock_side_effect = value
567 else:
568 delegated.side_effect = value
569
570 side_effect = property(__get_side_effect, __set_side_effect)
571
572
573 def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
574 "Restore the mock object to its initial state."
575 if visited is None:
576 visited = []
577 if id(self) in visited:
578 return
579 visited.append(id(self))
580
581 self.called = False
582 self.call_args = None
583 self.call_count = 0
584 self.mock_calls = _CallList()
585 self.call_args_list = _CallList()
586 self.method_calls = _CallList()
587
588 if return_value:
589 self._mock_return_value = DEFAULT
590 if side_effect:
591 self._mock_side_effect = None
592
593 for child in self._mock_children.values():
594 if isinstance(child, _SpecState) or child is _deleted:
595 continue
596 child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
597
598 ret = self._mock_return_value
599 if _is_instance_mock(ret) and ret is not self:
600 ret.reset_mock(visited)
601
602
603 def configure_mock(self, /, **kwargs):
604 """Set attributes on the mock through keyword arguments.
605
606 Attributes plus return values and side effects can be set on child
607 mocks using standard dot notation and unpacking a dictionary in the
608 method call:
609
610 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
611 >>> mock.configure_mock(**attrs)"""
612 for arg, val in sorted(kwargs.items(),
613 # we sort on the number of dots so that
614 # attributes are set before we set attributes on
615 # attributes
616 key=lambda entry: entry[0].count('.')):
617 args = arg.split('.')
618 final = args.pop()
619 obj = self
620 for entry in args:
621 obj = getattr(obj, entry)
622 setattr(obj, final, val)
623
624
625 def __getattr__(self, name):
626 if name in {'_mock_methods', '_mock_unsafe'}:
627 raise AttributeError(name)
628 elif self._mock_methods is not None:
629 if name not in self._mock_methods or name in _all_magics:
630 raise AttributeError("Mock object has no attribute %r" % name)
631 elif _is_magic(name):
632 raise AttributeError(name)
633 if not self._mock_unsafe:
634 if name.startswith(('assert', 'assret')):
635 raise AttributeError("Attributes cannot start with 'assert' "
636 "or 'assret'")
637
638 result = self._mock_children.get(name)
639 if result is _deleted:
640 raise AttributeError(name)
641 elif result is None:
642 wraps = None
643 if self._mock_wraps is not None:
644 # XXXX should we get the attribute without triggering code
645 # execution?
646 wraps = getattr(self._mock_wraps, name)
647
648 result = self._get_child_mock(
649 parent=self, name=name, wraps=wraps, _new_name=name,
650 _new_parent=self
651 )
652 self._mock_children[name] = result
653
654 elif isinstance(result, _SpecState):
655 result = create_autospec(
656 result.spec, result.spec_set, result.instance,
657 result.parent, result.name
658 )
659 self._mock_children[name] = result
660
661 return result
662
663
664 def _extract_mock_name(self):
665 _name_list = [self._mock_new_name]
666 _parent = self._mock_new_parent
667 last = self
668
669 dot = '.'
670 if _name_list == ['()']:
671 dot = ''
672
673 while _parent is not None:
674 last = _parent
675
676 _name_list.append(_parent._mock_new_name + dot)
677 dot = '.'
678 if _parent._mock_new_name == '()':
679 dot = ''
680
681 _parent = _parent._mock_new_parent
682
683 _name_list = list(reversed(_name_list))
684 _first = last._mock_name or 'mock'
685 if len(_name_list) > 1:
686 if _name_list[1] not in ('()', '().'):
687 _first += '.'
688 _name_list[0] = _first
689 return ''.join(_name_list)
690
691 def __repr__(self):
692 name = self._extract_mock_name()
693
694 name_string = ''
695 if name not in ('mock', 'mock.'):
696 name_string = ' name=%r' % name
697
698 spec_string = ''
699 if self._spec_class is not None:
700 spec_string = ' spec=%r'
701 if self._spec_set:
702 spec_string = ' spec_set=%r'
703 spec_string = spec_string % self._spec_class.__name__
704 return "<%s%s%s id='%s'>" % (
705 type(self).__name__,
706 name_string,
707 spec_string,
708 id(self)
709 )
710
711
712 def __dir__(self):
713 """Filter the output of `dir(mock)` to only useful members."""
714 if not FILTER_DIR:
715 return object.__dir__(self)
716
717 extras = self._mock_methods or []
718 from_type = dir(type(self))
719 from_dict = list(self.__dict__)
720 from_child_mocks = [
721 m_name for m_name, m_value in self._mock_children.items()
722 if m_value is not _deleted]
723
724 from_type = [e for e in from_type if not e.startswith('_')]
725 from_dict = [e for e in from_dict if not e.startswith('_') or
726 _is_magic(e)]
727 return sorted(set(extras + from_type + from_dict + from_child_mocks))
728
729
730 def __setattr__(self, name, value):
731 if name in _allowed_names:
732 # property setters go through here
733 return object.__setattr__(self, name, value)
734 elif (self._spec_set and self._mock_methods is not None and
735 name not in self._mock_methods and
736 name not in self.__dict__):
737 raise AttributeError("Mock object has no attribute '%s'" % name)
738 elif name in _unsupported_magics:
739 msg = 'Attempting to set unsupported magic method %r.' % name
740 raise AttributeError(msg)
741 elif name in _all_magics:
742 if self._mock_methods is not None and name not in self._mock_methods:
743 raise AttributeError("Mock object has no attribute '%s'" % name)
744
745 if not _is_instance_mock(value):
746 setattr(type(self), name, _get_method(name, value))
747 original = value
748 value = lambda *args, **kw: original(self, *args, **kw)
749 else:
750 # only set _new_name and not name so that mock_calls is tracked
751 # but not method calls
752 _check_and_set_parent(self, value, None, name)
753 setattr(type(self), name, value)
754 self._mock_children[name] = value
755 elif name == '__class__':
756 self._spec_class = value
757 return
758 else:
759 if _check_and_set_parent(self, value, name, name):
760 self._mock_children[name] = value
761
762 if self._mock_sealed and not hasattr(self, name):
763 mock_name = f'{self._extract_mock_name()}.{name}'
764 raise AttributeError(f'Cannot set {mock_name}')
765
766 return object.__setattr__(self, name, value)
767
768
769 def __delattr__(self, name):
770 if name in _all_magics and name in type(self).__dict__:
771 delattr(type(self), name)
772 if name not in self.__dict__:
773 # for magic methods that are still MagicProxy objects and
774 # not set on the instance itself
775 return
776
777 obj = self._mock_children.get(name, _missing)
778 if name in self.__dict__:
779 _safe_super(NonCallableMock, self).__delattr__(name)
780 elif obj is _deleted:
781 raise AttributeError(name)
782 if obj is not _missing:
783 del self._mock_children[name]
784 self._mock_children[name] = _deleted
785
786
787 def _format_mock_call_signature(self, args, kwargs):
788 name = self._mock_name or 'mock'
789 return _format_call_signature(name, args, kwargs)
790
791
792 def _format_mock_failure_message(self, args, kwargs, action='call'):
793 message = 'expected %s not found.\nExpected: %s\nActual: %s'
794 expected_string = self._format_mock_call_signature(args, kwargs)
795 call_args = self.call_args
796 actual_string = self._format_mock_call_signature(*call_args)
797 return message % (action, expected_string, actual_string)
798
799
800 def _get_call_signature_from_name(self, name):
801 """
802 * If call objects are asserted against a method/function like obj.meth1
803 then there could be no name for the call object to lookup. Hence just
804 return the spec_signature of the method/function being asserted against.
805 * If the name is not empty then remove () and split by '.' to get
806 list of names to iterate through the children until a potential
807 match is found. A child mock is created only during attribute access
808 so if we get a _SpecState then no attributes of the spec were accessed
809 and can be safely exited.
810 """
811 if not name:
812 return self._spec_signature
813
814 sig = None
815 names = name.replace('()', '').split('.')
816 children = self._mock_children
817
818 for name in names:
819 child = children.get(name)
820 if child is None or isinstance(child, _SpecState):
821 break
822 else:
823 # If an autospecced object is attached using attach_mock the
824 # child would be a function with mock object as attribute from
825 # which signature has to be derived.
826 child = _extract_mock(child)
827 children = child._mock_children
828 sig = child._spec_signature
829
830 return sig
831
832
833 def _call_matcher(self, _call):
834 """
835 Given a call (or simply an (args, kwargs) tuple), return a
836 comparison key suitable for matching with other calls.
837 This is a best effort method which relies on the spec's signature,
838 if available, or falls back on the arguments themselves.
839 """
840
841 if isinstance(_call, tuple) and len(_call) > 2:
842 sig = self._get_call_signature_from_name(_call[0])
843 else:
844 sig = self._spec_signature
845
846 if sig is not None:
847 if len(_call) == 2:
848 name = ''
849 args, kwargs = _call
850 else:
851 name, args, kwargs = _call
852 try:
853 bound_call = sig.bind(*args, **kwargs)
854 return call(name, bound_call.args, bound_call.kwargs)
855 except TypeError as e:
856 return e.with_traceback(None)
857 else:
858 return _call
859
860 def assert_not_called(self):
861 """assert that the mock was never called.
862 """
863 if self.call_count != 0:
864 msg = ("Expected '%s' to not have been called. Called %s times.%s"
865 % (self._mock_name or 'mock',
866 self.call_count,
867 self._calls_repr()))
868 raise AssertionError(msg)
869
870 def assert_called(self):
871 """assert that the mock was called at least once
872 """
873 if self.call_count == 0:
874 msg = ("Expected '%s' to have been called." %
875 (self._mock_name or 'mock'))
876 raise AssertionError(msg)
877
878 def assert_called_once(self):
879 """assert that the mock was called only once.
880 """
881 if not self.call_count == 1:
882 msg = ("Expected '%s' to have been called once. Called %s times.%s"
883 % (self._mock_name or 'mock',
884 self.call_count,
885 self._calls_repr()))
886 raise AssertionError(msg)
887
888 def assert_called_with(self, /, *args, **kwargs):
889 """assert that the last call was made with the specified arguments.
890
891 Raises an AssertionError if the args and keyword args passed in are
892 different to the last call to the mock."""
893 if self.call_args is None:
894 expected = self._format_mock_call_signature(args, kwargs)
895 actual = 'not called.'
896 error_message = ('expected call not found.\nExpected: %s\nActual: %s'
897 % (expected, actual))
898 raise AssertionError(error_message)
899
900 def _error_message():
901 msg = self._format_mock_failure_message(args, kwargs)
902 return msg
903 expected = self._call_matcher(_Call((args, kwargs), two=True))
904 actual = self._call_matcher(self.call_args)
905 if actual != expected:
906 cause = expected if isinstance(expected, Exception) else None
907 raise AssertionError(_error_message()) from cause
908
909
910 def assert_called_once_with(self, /, *args, **kwargs):
911 """assert that the mock was called exactly once and that that call was
912 with the specified arguments."""
913 if not self.call_count == 1:
914 msg = ("Expected '%s' to be called once. Called %s times.%s"
915 % (self._mock_name or 'mock',
916 self.call_count,
917 self._calls_repr()))
918 raise AssertionError(msg)
919 return self.assert_called_with(*args, **kwargs)
920
921
922 def assert_has_calls(self, calls, any_order=False):
923 """assert the mock has been called with the specified calls.
924 The `mock_calls` list is checked for the calls.
925
926 If `any_order` is False (the default) then the calls must be
927 sequential. There can be extra calls before or after the
928 specified calls.
929
930 If `any_order` is True then the calls can be in any order, but
931 they must all appear in `mock_calls`."""
932 expected = [self._call_matcher(c) for c in calls]
933 cause = next((e for e in expected if isinstance(e, Exception)), None)
934 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
935 if not any_order:
936 if expected not in all_calls:
937 if cause is None:
938 problem = 'Calls not found.'
939 else:
940 problem = ('Error processing expected calls.\n'
941 'Errors: {}').format(
942 [e if isinstance(e, Exception) else None
943 for e in expected])
944 raise AssertionError(
945 f'{problem}\n'
946 f'Expected: {_CallList(calls)}'
947 f'{self._calls_repr(prefix="Actual").rstrip(".")}'
948 ) from cause
949 return
950
951 all_calls = list(all_calls)
952
953 not_found = []
954 for kall in expected:
955 try:
956 all_calls.remove(kall)
957 except ValueError:
958 not_found.append(kall)
959 if not_found:
960 raise AssertionError(
961 '%r does not contain all of %r in its call list, '
962 'found %r instead' % (self._mock_name or 'mock',
963 tuple(not_found), all_calls)
964 ) from cause
965
966
967 def assert_any_call(self, /, *args, **kwargs):
968 """assert the mock has been called with the specified arguments.
969
970 The assert passes if the mock has *ever* been called, unlike
971 `assert_called_with` and `assert_called_once_with` that only pass if
972 the call is the most recent one."""
973 expected = self._call_matcher(_Call((args, kwargs), two=True))
974 cause = expected if isinstance(expected, Exception) else None
975 actual = [self._call_matcher(c) for c in self.call_args_list]
976 if cause or expected not in _AnyComparer(actual):
977 expected_string = self._format_mock_call_signature(args, kwargs)
978 raise AssertionError(
979 '%s call not found' % expected_string
980 ) from cause
981
982
983 def _get_child_mock(self, /, **kw):
984 """Create the child mocks for attributes and return value.
985 By default child mocks will be the same type as the parent.
986 Subclasses of Mock may want to override this to customize the way
987 child mocks are made.
988
989 For non-callable mocks the callable variant will be used (rather than
990 any custom subclass)."""
991 _new_name = kw.get("_new_name")
992 if _new_name in self.__dict__['_spec_asyncs']:
993 return AsyncMock(**kw)
994
995 _type = type(self)
996 if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
997 # Any asynchronous magic becomes an AsyncMock
998 klass = AsyncMock
999 elif issubclass(_type, AsyncMockMixin):
1000 if (_new_name in _all_sync_magics or
1001 self._mock_methods and _new_name in self._mock_methods):
1002 # Any synchronous method on AsyncMock becomes a MagicMock
1003 klass = MagicMock
1004 else:
1005 klass = AsyncMock
1006 elif not issubclass(_type, CallableMixin):
1007 if issubclass(_type, NonCallableMagicMock):
1008 klass = MagicMock
1009 elif issubclass(_type, NonCallableMock):
1010 klass = Mock
1011 else:
1012 klass = _type.__mro__[1]
1013
1014 if self._mock_sealed:
1015 attribute = "." + kw["name"] if "name" in kw else "()"
1016 mock_name = self._extract_mock_name() + attribute
1017 raise AttributeError(mock_name)
1018
1019 return klass(**kw)
1020
1021
1022 def _calls_repr(self, prefix="Calls"):
1023 """Renders self.mock_calls as a string.
1024
1025 Example: "\nCalls: [call(1), call(2)]."
1026
1027 If self.mock_calls is empty, an empty string is returned. The
1028 output will be truncated if very long.
1029 """
1030 if not self.mock_calls:
1031 return ""
1032 return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1033
1034
1035_MOCK_SIG = inspect.signature(NonCallableMock.__init__)
1036
1037
1038class _AnyComparer(list):
1039 """A list which checks if it contains a call which may have an
1040 argument of ANY, flipping the components of item and self from
1041 their traditional locations so that ANY is guaranteed to be on
1042 the left."""
1043 def __contains__(self, item):
1044 for _call in self:
1045 assert len(item) == len(_call)
1046 if all([
1047 expected == actual
1048 for expected, actual in zip(item, _call)
1049 ]):
1050 return True
1051 return False
1052
1053
1054def _try_iter(obj):
1055 if obj is None:
1056 return obj
1057 if _is_exception(obj):
1058 return obj
1059 if _callable(obj):
1060 return obj
1061 try:
1062 return iter(obj)
1063 except TypeError:
1064 # XXXX backwards compatibility
1065 # but this will blow up on first call - so maybe we should fail early?
1066 return obj
1067
1068
1069class CallableMixin(Base):
1070
1071 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
1072 wraps=None, name=None, spec_set=None, parent=None,
1073 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
1074 self.__dict__['_mock_return_value'] = return_value
1075 _safe_super(CallableMixin, self).__init__(
1076 spec, wraps, name, spec_set, parent,
1077 _spec_state, _new_name, _new_parent, **kwargs
1078 )
1079
1080 self.side_effect = side_effect
1081
1082
1083 def _mock_check_sig(self, /, *args, **kwargs):
1084 # stub method that can be replaced with one with a specific signature
1085 pass
1086
1087
1088 def __call__(self, /, *args, **kwargs):
1089 # can't use self in-case a function / method we are mocking uses self
1090 # in the signature
1091 self._mock_check_sig(*args, **kwargs)
1092 self._increment_mock_call(*args, **kwargs)
1093 return self._mock_call(*args, **kwargs)
1094
1095
1096 def _mock_call(self, /, *args, **kwargs):
1097 return self._execute_mock_call(*args, **kwargs)
1098
1099 def _increment_mock_call(self, /, *args, **kwargs):
1100 self.called = True
1101 self.call_count += 1
1102
1103 # handle call_args
1104 # needs to be set here so assertions on call arguments pass before
1105 # execution in the case of awaited calls
1106 _call = _Call((args, kwargs), two=True)
1107 self.call_args = _call
1108 self.call_args_list.append(_call)
1109
1110 # initial stuff for method_calls:
1111 do_method_calls = self._mock_parent is not None
1112 method_call_name = self._mock_name
1113
1114 # initial stuff for mock_calls:
1115 mock_call_name = self._mock_new_name
1116 is_a_call = mock_call_name == '()'
1117 self.mock_calls.append(_Call(('', args, kwargs)))
1118
1119 # follow up the chain of mocks:
1120 _new_parent = self._mock_new_parent
1121 while _new_parent is not None:
1122
1123 # handle method_calls:
1124 if do_method_calls:
1125 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
1126 do_method_calls = _new_parent._mock_parent is not None
1127 if do_method_calls:
1128 method_call_name = _new_parent._mock_name + '.' + method_call_name
1129
1130 # handle mock_calls:
1131 this_mock_call = _Call((mock_call_name, args, kwargs))
1132 _new_parent.mock_calls.append(this_mock_call)
1133
1134 if _new_parent._mock_new_name:
1135 if is_a_call:
1136 dot = ''
1137 else:
1138 dot = '.'
1139 is_a_call = _new_parent._mock_new_name == '()'
1140 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name
1141
1142 # follow the parental chain:
1143 _new_parent = _new_parent._mock_new_parent
1144
1145 def _execute_mock_call(self, /, *args, **kwargs):
1146 # separate from _increment_mock_call so that awaited functions are
1147 # executed separately from their call, also AsyncMock overrides this method
1148
1149 effect = self.side_effect
1150 if effect is not None:
1151 if _is_exception(effect):
1152 raise effect
1153 elif not _callable(effect):
1154 result = next(effect)
1155 if _is_exception(result):
1156 raise result
1157 else:
1158 result = effect(*args, **kwargs)
1159
1160 if result is not DEFAULT:
1161 return result
1162
1163 if self._mock_return_value is not DEFAULT:
1164 return self.return_value
1165
1166 if self._mock_wraps is not None:
1167 return self._mock_wraps(*args, **kwargs)
1168
1169 return self.return_value
1170
1171
1172
1173class Mock(CallableMixin, NonCallableMock):
1174 """
1175 Create a new `Mock` object. `Mock` takes several optional arguments
1176 that specify the behaviour of the Mock object:
1177
1178 * `spec`: This can be either a list of strings or an existing object (a
1179 class or instance) that acts as the specification for the mock object. If
1180 you pass in an object then a list of strings is formed by calling dir on
1181 the object (excluding unsupported magic attributes and methods). Accessing
1182 any attribute not in this list will raise an `AttributeError`.
1183
1184 If `spec` is an object (rather than a list of strings) then
1185 `mock.__class__` returns the class of the spec object. This allows mocks
1186 to pass `isinstance` tests.
1187
1188 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
1189 or get an attribute on the mock that isn't on the object passed as
1190 `spec_set` will raise an `AttributeError`.
1191
1192 * `side_effect`: A function to be called whenever the Mock is called. See
1193 the `side_effect` attribute. Useful for raising exceptions or
1194 dynamically changing return values. The function is called with the same
1195 arguments as the mock, and unless it returns `DEFAULT`, the return
1196 value of this function is used as the return value.
1197
1198 If `side_effect` is an iterable then each call to the mock will return
1199 the next value from the iterable. If any of the members of the iterable
1200 are exceptions they will be raised instead of returned.
1201
1202 * `return_value`: The value returned when the mock is called. By default
1203 this is a new Mock (created on first access). See the
1204 `return_value` attribute.
1205
1206 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
1207 calling the Mock will pass the call through to the wrapped object
1208 (returning the real result). Attribute access on the mock will return a
1209 Mock object that wraps the corresponding attribute of the wrapped object
1210 (so attempting to access an attribute that doesn't exist will raise an
1211 `AttributeError`).
1212
1213 If the mock has an explicit `return_value` set then calls are not passed
1214 to the wrapped object and the `return_value` is returned instead.
1215
1216 * `name`: If the mock has a name then it will be used in the repr of the
1217 mock. This can be useful for debugging. The name is propagated to child
1218 mocks.
1219
1220 Mocks can also be called with arbitrary keyword arguments. These will be
1221 used to set attributes on the mock after it is created.
1222 """
1223
1224
1225def _dot_lookup(thing, comp, import_path):
1226 try:
1227 return getattr(thing, comp)
1228 except AttributeError:
1229 __import__(import_path)
1230 return getattr(thing, comp)
1231
1232
1233def _importer(target):
1234 components = target.split('.')
1235 import_path = components.pop(0)
1236 thing = __import__(import_path)
1237
1238 for comp in components:
1239 import_path += ".%s" % comp
1240 thing = _dot_lookup(thing, comp, import_path)
1241 return thing
1242
1243
1244class _patch(object):
1245
1246 attribute_name = None
1247 _active_patches = []
1248
1249 def __init__(
1250 self, getter, attribute, new, spec, create,
1251 spec_set, autospec, new_callable, kwargs
1252 ):
1253 if new_callable is not None:
1254 if new is not DEFAULT:
1255 raise ValueError(
1256 "Cannot use 'new' and 'new_callable' together"
1257 )
1258 if autospec is not None:
1259 raise ValueError(
1260 "Cannot use 'autospec' and 'new_callable' together"
1261 )
1262
1263 self.getter = getter
1264 self.attribute = attribute
1265 self.new = new
1266 self.new_callable = new_callable
1267 self.spec = spec
1268 self.create = create
1269 self.has_local = False
1270 self.spec_set = spec_set
1271 self.autospec = autospec
1272 self.kwargs = kwargs
1273 self.additional_patchers = []
1274
1275
1276 def copy(self):
1277 patcher = _patch(
1278 self.getter, self.attribute, self.new, self.spec,
1279 self.create, self.spec_set,
1280 self.autospec, self.new_callable, self.kwargs
1281 )
1282 patcher.attribute_name = self.attribute_name
1283 patcher.additional_patchers = [
1284 p.copy() for p in self.additional_patchers
1285 ]
1286 return patcher
1287
1288
1289 def __call__(self, func):
1290 if isinstance(func, type):
1291 return self.decorate_class(func)
1292 if inspect.iscoroutinefunction(func):
1293 return self.decorate_async_callable(func)
1294 return self.decorate_callable(func)
1295
1296
1297 def decorate_class(self, klass):
1298 for attr in dir(klass):
1299 if not attr.startswith(patch.TEST_PREFIX):
1300 continue
1301
1302 attr_value = getattr(klass, attr)
1303 if not hasattr(attr_value, "__call__"):
1304 continue
1305
1306 patcher = self.copy()
1307 setattr(klass, attr, patcher(attr_value))
1308 return klass
1309
1310
1311 @contextlib.contextmanager
1312 def decoration_helper(self, patched, args, keywargs):
1313 extra_args = []
1314 with contextlib.ExitStack() as exit_stack:
1315 for patching in patched.patchings:
1316 arg = exit_stack.enter_context(patching)
1317 if patching.attribute_name is not None:
1318 keywargs.update(arg)
1319 elif patching.new is DEFAULT:
1320 extra_args.append(arg)
1321
1322 args += tuple(extra_args)
1323 yield (args, keywargs)
1324
1325
1326 def decorate_callable(self, func):
1327 # NB. Keep the method in sync with decorate_async_callable()
1328 if hasattr(func, 'patchings'):
1329 func.patchings.append(self)
1330 return func
1331
1332 @wraps(func)
1333 def patched(*args, **keywargs):
1334 with self.decoration_helper(patched,
1335 args,
1336 keywargs) as (newargs, newkeywargs):
1337 return func(*newargs, **newkeywargs)
1338
1339 patched.patchings = [self]
1340 return patched
1341
1342
1343 def decorate_async_callable(self, func):
1344 # NB. Keep the method in sync with decorate_callable()
1345 if hasattr(func, 'patchings'):
1346 func.patchings.append(self)
1347 return func
1348
1349 @wraps(func)
1350 async def patched(*args, **keywargs):
1351 with self.decoration_helper(patched,
1352 args,
1353 keywargs) as (newargs, newkeywargs):
1354 return await func(*newargs, **newkeywargs)
1355
1356 patched.patchings = [self]
1357 return patched
1358
1359
1360 def get_original(self):
1361 target = self.getter()
1362 name = self.attribute
1363
1364 original = DEFAULT
1365 local = False
1366
1367 try:
1368 original = target.__dict__[name]
1369 except (AttributeError, KeyError):
1370 original = getattr(target, name, DEFAULT)
1371 else:
1372 local = True
1373
1374 if name in _builtins and isinstance(target, ModuleType):
1375 self.create = True
1376
1377 if not self.create and original is DEFAULT:
1378 raise AttributeError(
1379 "%s does not have the attribute %r" % (target, name)
1380 )
1381 return original, local
1382
1383
1384 def __enter__(self):
1385 """Perform the patch."""
1386 new, spec, spec_set = self.new, self.spec, self.spec_set
1387 autospec, kwargs = self.autospec, self.kwargs
1388 new_callable = self.new_callable
1389 self.target = self.getter()
1390
1391 # normalise False to None
1392 if spec is False:
1393 spec = None
1394 if spec_set is False:
1395 spec_set = None
1396 if autospec is False:
1397 autospec = None
1398
1399 if spec is not None and autospec is not None:
1400 raise TypeError("Can't specify spec and autospec")
1401 if ((spec is not None or autospec is not None) and
1402 spec_set not in (True, None)):
1403 raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1404
1405 original, local = self.get_original()
1406
1407 if new is DEFAULT and autospec is None:
1408 inherit = False
1409 if spec is True:
1410 # set spec to the object we are replacing
1411 spec = original
1412 if spec_set is True:
1413 spec_set = original
1414 spec = None
1415 elif spec is not None:
1416 if spec_set is True:
1417 spec_set = spec
1418 spec = None
1419 elif spec_set is True:
1420 spec_set = original
1421
1422 if spec is not None or spec_set is not None:
1423 if original is DEFAULT:
1424 raise TypeError("Can't use 'spec' with create=True")
1425 if isinstance(original, type):
1426 # If we're patching out a class and there is a spec
1427 inherit = True
1428 if spec is None and _is_async_obj(original):
1429 Klass = AsyncMock
1430 else:
1431 Klass = MagicMock
1432 _kwargs = {}
1433 if new_callable is not None:
1434 Klass = new_callable
1435 elif spec is not None or spec_set is not None:
1436 this_spec = spec
1437 if spec_set is not None:
1438 this_spec = spec_set
1439 if _is_list(this_spec):
1440 not_callable = '__call__' not in this_spec
1441 else:
1442 not_callable = not callable(this_spec)
1443 if _is_async_obj(this_spec):
1444 Klass = AsyncMock
1445 elif not_callable:
1446 Klass = NonCallableMagicMock
1447
1448 if spec is not None:
1449 _kwargs['spec'] = spec
1450 if spec_set is not None:
1451 _kwargs['spec_set'] = spec_set
1452
1453 # add a name to mocks
1454 if (isinstance(Klass, type) and
1455 issubclass(Klass, NonCallableMock) and self.attribute):
1456 _kwargs['name'] = self.attribute
1457
1458 _kwargs.update(kwargs)
1459 new = Klass(**_kwargs)
1460
1461 if inherit and _is_instance_mock(new):
1462 # we can only tell if the instance should be callable if the
1463 # spec is not a list
1464 this_spec = spec
1465 if spec_set is not None:
1466 this_spec = spec_set
1467 if (not _is_list(this_spec) and not
1468 _instance_callable(this_spec)):
1469 Klass = NonCallableMagicMock
1470
1471 _kwargs.pop('name')
1472 new.return_value = Klass(_new_parent=new, _new_name='()',
1473 **_kwargs)
1474 elif autospec is not None:
1475 # spec is ignored, new *must* be default, spec_set is treated
1476 # as a boolean. Should we check spec is not None and that spec_set
1477 # is a bool?
1478 if new is not DEFAULT:
1479 raise TypeError(
1480 "autospec creates the mock for you. Can't specify "
1481 "autospec and new."
1482 )
1483 if original is DEFAULT:
1484 raise TypeError("Can't use 'autospec' with create=True")
1485 spec_set = bool(spec_set)
1486 if autospec is True:
1487 autospec = original
1488
1489 new = create_autospec(autospec, spec_set=spec_set,
1490 _name=self.attribute, **kwargs)
1491 elif kwargs:
1492 # can't set keyword args when we aren't creating the mock
1493 # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1494 raise TypeError("Can't pass kwargs to a mock we aren't creating")
1495
1496 new_attr = new
1497
1498 self.temp_original = original
1499 self.is_local = local
1500 self._exit_stack = contextlib.ExitStack()
1501 try:
1502 setattr(self.target, self.attribute, new_attr)
1503 if self.attribute_name is not None:
1504 extra_args = {}
1505 if self.new is DEFAULT:
1506 extra_args[self.attribute_name] = new
1507 for patching in self.additional_patchers:
1508 arg = self._exit_stack.enter_context(patching)
1509 if patching.new is DEFAULT:
1510 extra_args.update(arg)
1511 return extra_args
1512
1513 return new
1514 except:
1515 if not self.__exit__(*sys.exc_info()):
1516 raise
1517
1518 def __exit__(self, *exc_info):
1519 """Undo the patch."""
1520 if self.is_local and self.temp_original is not DEFAULT:
1521 setattr(self.target, self.attribute, self.temp_original)
1522 else:
1523 delattr(self.target, self.attribute)
1524 if not self.create and (not hasattr(self.target, self.attribute) or
1525 self.attribute in ('__doc__', '__module__',
1526 '__defaults__', '__annotations__',
1527 '__kwdefaults__')):
1528 # needed for proxy objects like django settings
1529 setattr(self.target, self.attribute, self.temp_original)
1530
1531 del self.temp_original
1532 del self.is_local
1533 del self.target
1534 exit_stack = self._exit_stack
1535 del self._exit_stack
1536 return exit_stack.__exit__(*exc_info)
1537
1538
1539 def start(self):
1540 """Activate a patch, returning any created mock."""
1541 result = self.__enter__()
1542 self._active_patches.append(self)
1543 return result
1544
1545
1546 def stop(self):
1547 """Stop an active patch."""
1548 try:
1549 self._active_patches.remove(self)
1550 except ValueError:
1551 # If the patch hasn't been started this will fail
1552 return None
1553
1554 return self.__exit__(None, None, None)
1555
1556
1557
1558def _get_target(target):
1559 try:
1560 target, attribute = target.rsplit('.', 1)
1561 except (TypeError, ValueError):
1562 raise TypeError("Need a valid target to patch. You supplied: %r" %
1563 (target,))
1564 getter = lambda: _importer(target)
1565 return getter, attribute
1566
1567
1568def _patch_object(
1569 target, attribute, new=DEFAULT, spec=None,
1570 create=False, spec_set=None, autospec=None,
1571 new_callable=None, **kwargs
1572 ):
1573 """
1574 patch the named member (`attribute`) on an object (`target`) with a mock
1575 object.
1576
1577 `patch.object` can be used as a decorator, class decorator or a context
1578 manager. Arguments `new`, `spec`, `create`, `spec_set`,
1579 `autospec` and `new_callable` have the same meaning as for `patch`. Like
1580 `patch`, `patch.object` takes arbitrary keyword arguments for configuring
1581 the mock object it creates.
1582
1583 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
1584 for choosing which methods to wrap.
1585 """
1586 if type(target) is str:
1587 raise TypeError(
1588 f"{target!r} must be the actual object to be patched, not a str"
1589 )
1590 getter = lambda: target
1591 return _patch(
1592 getter, attribute, new, spec, create,
1593 spec_set, autospec, new_callable, kwargs
1594 )
1595
1596
1597def _patch_multiple(target, spec=None, create=False, spec_set=None,
1598 autospec=None, new_callable=None, **kwargs):
1599 """Perform multiple patches in a single call. It takes the object to be
1600 patched (either as an object or a string to fetch the object by importing)
1601 and keyword arguments for the patches::
1602
1603 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1604 ...
1605
1606 Use `DEFAULT` as the value if you want `patch.multiple` to create
1607 mocks for you. In this case the created mocks are passed into a decorated
1608 function by keyword, and a dictionary is returned when `patch.multiple` is
1609 used as a context manager.
1610
1611 `patch.multiple` can be used as a decorator, class decorator or a context
1612 manager. The arguments `spec`, `spec_set`, `create`,
1613 `autospec` and `new_callable` have the same meaning as for `patch`. These
1614 arguments will be applied to *all* patches done by `patch.multiple`.
1615
1616 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1617 for choosing which methods to wrap.
1618 """
1619 if type(target) is str:
1620 getter = lambda: _importer(target)
1621 else:
1622 getter = lambda: target
1623
1624 if not kwargs:
1625 raise ValueError(
1626 'Must supply at least one keyword argument with patch.multiple'
1627 )
1628 # need to wrap in a list for python 3, where items is a view
1629 items = list(kwargs.items())
1630 attribute, new = items[0]
1631 patcher = _patch(
1632 getter, attribute, new, spec, create, spec_set,
1633 autospec, new_callable, {}
1634 )
1635 patcher.attribute_name = attribute
1636 for attribute, new in items[1:]:
1637 this_patcher = _patch(
1638 getter, attribute, new, spec, create, spec_set,
1639 autospec, new_callable, {}
1640 )
1641 this_patcher.attribute_name = attribute
1642 patcher.additional_patchers.append(this_patcher)
1643 return patcher
1644
1645
1646def patch(
1647 target, new=DEFAULT, spec=None, create=False,
1648 spec_set=None, autospec=None, new_callable=None, **kwargs
1649 ):
1650 """
1651 `patch` acts as a function decorator, class decorator or a context
1652 manager. Inside the body of the function or with statement, the `target`
1653 is patched with a `new` object. When the function/with statement exits
1654 the patch is undone.
1655
1656 If `new` is omitted, then the target is replaced with an
1657 `AsyncMock if the patched object is an async function or a
1658 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
1659 omitted, the created mock is passed in as an extra argument to the
1660 decorated function. If `patch` is used as a context manager the created
1661 mock is returned by the context manager.
1662
1663 `target` should be a string in the form `'package.module.ClassName'`. The
1664 `target` is imported and the specified object replaced with the `new`
1665 object, so the `target` must be importable from the environment you are
1666 calling `patch` from. The target is imported when the decorated function
1667 is executed, not at decoration time.
1668
1669 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
1670 if patch is creating one for you.
1671
1672 In addition you can pass `spec=True` or `spec_set=True`, which causes
1673 patch to pass in the object being mocked as the spec/spec_set object.
1674
1675 `new_callable` allows you to specify a different class, or callable object,
1676 that will be called to create the `new` object. By default `AsyncMock` is
1677 used for async functions and `MagicMock` for the rest.
1678
1679 A more powerful form of `spec` is `autospec`. If you set `autospec=True`
1680 then the mock will be created with a spec from the object being replaced.
1681 All attributes of the mock will also have the spec of the corresponding
1682 attribute of the object being replaced. Methods and functions being
1683 mocked will have their arguments checked and will raise a `TypeError` if
1684 they are called with the wrong signature. For mocks replacing a class,
1685 their return value (the 'instance') will have the same spec as the class.
1686
1687 Instead of `autospec=True` you can pass `autospec=some_object` to use an
1688 arbitrary object as the spec instead of the one being replaced.
1689
1690 By default `patch` will fail to replace attributes that don't exist. If
1691 you pass in `create=True`, and the attribute doesn't exist, patch will
1692 create the attribute for you when the patched function is called, and
1693 delete it again afterwards. This is useful for writing tests against
1694 attributes that your production code creates at runtime. It is off by
1695 default because it can be dangerous. With it switched on you can write
1696 passing tests against APIs that don't actually exist!
1697
1698 Patch can be used as a `TestCase` class decorator. It works by
1699 decorating each test method in the class. This reduces the boilerplate
1700 code when your test methods share a common patchings set. `patch` finds
1701 tests by looking for method names that start with `patch.TEST_PREFIX`.
1702 By default this is `test`, which matches the way `unittest` finds tests.
1703 You can specify an alternative prefix by setting `patch.TEST_PREFIX`.
1704
1705 Patch can be used as a context manager, with the with statement. Here the
1706 patching applies to the indented block after the with statement. If you
1707 use "as" then the patched object will be bound to the name after the
1708 "as"; very useful if `patch` is creating a mock object for you.
1709
1710 `patch` takes arbitrary keyword arguments. These will be passed to
1711 `AsyncMock` if the patched object is asynchronous, to `MagicMock`
1712 otherwise or to `new_callable` if specified.
1713
1714 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
1715 available for alternate use-cases.
1716 """
1717 getter, attribute = _get_target(target)
1718 return _patch(
1719 getter, attribute, new, spec, create,
1720 spec_set, autospec, new_callable, kwargs
1721 )
1722
1723
1724class _patch_dict(object):
1725 """
1726 Patch a dictionary, or dictionary like object, and restore the dictionary
1727 to its original state after the test.
1728
1729 `in_dict` can be a dictionary or a mapping like container. If it is a
1730 mapping then it must at least support getting, setting and deleting items
1731 plus iterating over keys.
1732
1733 `in_dict` can also be a string specifying the name of the dictionary, which
1734 will then be fetched by importing it.
1735
1736 `values` can be a dictionary of values to set in the dictionary. `values`
1737 can also be an iterable of `(key, value)` pairs.
1738
1739 If `clear` is True then the dictionary will be cleared before the new
1740 values are set.
1741
1742 `patch.dict` can also be called with arbitrary keyword arguments to set
1743 values in the dictionary::
1744
1745 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1746 ...
1747
1748 `patch.dict` can be used as a context manager, decorator or class
1749 decorator. When used as a class decorator `patch.dict` honours
1750 `patch.TEST_PREFIX` for choosing which methods to wrap.
1751 """
1752
1753 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1754 self.in_dict = in_dict
1755 # support any argument supported by dict(...) constructor
1756 self.values = dict(values)
1757 self.values.update(kwargs)
1758 self.clear = clear
1759 self._original = None
1760
1761
1762 def __call__(self, f):
1763 if isinstance(f, type):
1764 return self.decorate_class(f)
1765 @wraps(f)
1766 def _inner(*args, **kw):
1767 self._patch_dict()
1768 try:
1769 return f(*args, **kw)
1770 finally:
1771 self._unpatch_dict()
1772
1773 return _inner
1774
1775
1776 def decorate_class(self, klass):
1777 for attr in dir(klass):
1778 attr_value = getattr(klass, attr)
1779 if (attr.startswith(patch.TEST_PREFIX) and
1780 hasattr(attr_value, "__call__")):
1781 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1782 decorated = decorator(attr_value)
1783 setattr(klass, attr, decorated)
1784 return klass
1785
1786
1787 def __enter__(self):
1788 """Patch the dict."""
1789 self._patch_dict()
1790 return self.in_dict
1791
1792
1793 def _patch_dict(self):
1794 values = self.values
1795 if isinstance(self.in_dict, str):
1796 self.in_dict = _importer(self.in_dict)
1797 in_dict = self.in_dict
1798 clear = self.clear
1799
1800 try:
1801 original = in_dict.copy()
1802 except AttributeError:
1803 # dict like object with no copy method
1804 # must support iteration over keys
1805 original = {}
1806 for key in in_dict:
1807 original[key] = in_dict[key]
1808 self._original = original
1809
1810 if clear:
1811 _clear_dict(in_dict)
1812
1813 try:
1814 in_dict.update(values)
1815 except AttributeError:
1816 # dict like object with no update method
1817 for key in values:
1818 in_dict[key] = values[key]
1819
1820
1821 def _unpatch_dict(self):
1822 in_dict = self.in_dict
1823 original = self._original
1824
1825 _clear_dict(in_dict)
1826
1827 try:
1828 in_dict.update(original)
1829 except AttributeError:
1830 for key in original:
1831 in_dict[key] = original[key]
1832
1833
1834 def __exit__(self, *args):
1835 """Unpatch the dict."""
1836 if self._original is not None:
1837 self._unpatch_dict()
1838 return False
1839
1840
1841 def start(self):
1842 """Activate a patch, returning any created mock."""
1843 result = self.__enter__()
1844 _patch._active_patches.append(self)
1845 return result
1846
1847
1848 def stop(self):
1849 """Stop an active patch."""
1850 try:
1851 _patch._active_patches.remove(self)
1852 except ValueError:
1853 # If the patch hasn't been started this will fail
1854 return None
1855
1856 return self.__exit__(None, None, None)
1857
1858
1859def _clear_dict(in_dict):
1860 try:
1861 in_dict.clear()
1862 except AttributeError:
1863 keys = list(in_dict)
1864 for key in keys:
1865 del in_dict[key]
1866
1867
1868def _patch_stopall():
1869 """Stop all active patches. LIFO to unroll nested patches."""
1870 for patch in reversed(_patch._active_patches):
1871 patch.stop()
1872
1873
1874patch.object = _patch_object
1875patch.dict = _patch_dict
1876patch.multiple = _patch_multiple
1877patch.stopall = _patch_stopall
1878patch.TEST_PREFIX = 'test'
1879
1880magic_methods = (
1881 "lt le gt ge eq ne "
1882 "getitem setitem delitem "
1883 "len contains iter "
1884 "hash str sizeof "
1885 "enter exit "
1886 # we added divmod and rdivmod here instead of numerics
1887 # because there is no idivmod
1888 "divmod rdivmod neg pos abs invert "
1889 "complex int float index "
1890 "round trunc floor ceil "
1891 "bool next "
1892 "fspath "
1893 "aiter "
1894)
1895
1896numerics = (
1897 "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv"
1898)
1899inplace = ' '.join('i%s' % n for n in numerics.split())
1900right = ' '.join('r%s' % n for n in numerics.split())
1901
1902# not including __prepare__, __instancecheck__, __subclasscheck__
1903# (as they are metaclass methods)
1904# __del__ is not supported at all as it causes problems if it exists
1905
1906_non_defaults = {
1907 '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
1908 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
1909 '__getstate__', '__setstate__', '__getformat__', '__setformat__',
1910 '__repr__', '__dir__', '__subclasses__', '__format__',
1911 '__getnewargs_ex__',
1912}
1913
1914
1915def _get_method(name, func):
1916 "Turns a callable object (like a mock) into a real function"
1917 def method(self, /, *args, **kw):
1918 return func(self, *args, **kw)
1919 method.__name__ = name
1920 return method
1921
1922
1923_magics = {
1924 '__%s__' % method for method in
1925 ' '.join([magic_methods, numerics, inplace, right]).split()
1926}
1927
1928# Magic methods used for async `with` statements
1929_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
1930# Magic methods that are only used with async calls but are synchronous functions themselves
1931_sync_async_magics = {"__aiter__"}
1932_async_magics = _async_method_magics | _sync_async_magics
1933
1934_all_sync_magics = _magics | _non_defaults
1935_all_magics = _all_sync_magics | _async_magics
1936
1937_unsupported_magics = {
1938 '__getattr__', '__setattr__',
1939 '__init__', '__new__', '__prepare__',
1940 '__instancecheck__', '__subclasscheck__',
1941 '__del__'
1942}
1943
1944_calculate_return_value = {
1945 '__hash__': lambda self: object.__hash__(self),
1946 '__str__': lambda self: object.__str__(self),
1947 '__sizeof__': lambda self: object.__sizeof__(self),
1948 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
1949}
1950
1951_return_values = {
1952 '__lt__': NotImplemented,
1953 '__gt__': NotImplemented,
1954 '__le__': NotImplemented,
1955 '__ge__': NotImplemented,
1956 '__int__': 1,
1957 '__contains__': False,
1958 '__len__': 0,
1959 '__exit__': False,
1960 '__complex__': 1j,
1961 '__float__': 1.0,
1962 '__bool__': True,
1963 '__index__': 1,
1964 '__aexit__': False,
1965}
1966
1967
1968def _get_eq(self):
1969 def __eq__(other):
1970 ret_val = self.__eq__._mock_return_value
1971 if ret_val is not DEFAULT:
1972 return ret_val
1973 if self is other:
1974 return True
1975 return NotImplemented
1976 return __eq__
1977
1978def _get_ne(self):
1979 def __ne__(other):
1980 if self.__ne__._mock_return_value is not DEFAULT:
1981 return DEFAULT
1982 if self is other:
1983 return False
1984 return NotImplemented
1985 return __ne__
1986
1987def _get_iter(self):
1988 def __iter__():
1989 ret_val = self.__iter__._mock_return_value
1990 if ret_val is DEFAULT:
1991 return iter([])
1992 # if ret_val was already an iterator, then calling iter on it should
1993 # return the iterator unchanged
1994 return iter(ret_val)
1995 return __iter__
1996
1997def _get_async_iter(self):
1998 def __aiter__():
1999 ret_val = self.__aiter__._mock_return_value
2000 if ret_val is DEFAULT:
2001 return _AsyncIterator(iter([]))
2002 return _AsyncIterator(iter(ret_val))
2003 return __aiter__
2004
2005_side_effect_methods = {
2006 '__eq__': _get_eq,
2007 '__ne__': _get_ne,
2008 '__iter__': _get_iter,
2009 '__aiter__': _get_async_iter
2010}
2011
2012
2013
2014def _set_return_value(mock, method, name):
2015 fixed = _return_values.get(name, DEFAULT)
2016 if fixed is not DEFAULT:
2017 method.return_value = fixed
2018 return
2019
2020 return_calculator = _calculate_return_value.get(name)
2021 if return_calculator is not None:
2022 return_value = return_calculator(mock)
2023 method.return_value = return_value
2024 return
2025
2026 side_effector = _side_effect_methods.get(name)
2027 if side_effector is not None:
2028 method.side_effect = side_effector(mock)
2029
2030
2031
2032class MagicMixin(Base):
2033 def __init__(self, /, *args, **kw):
2034 self._mock_set_magics() # make magic work for kwargs in init
2035 _safe_super(MagicMixin, self).__init__(*args, **kw)
2036 self._mock_set_magics() # fix magic broken by upper level init
2037
2038
2039 def _mock_set_magics(self):
2040 orig_magics = _magics | _async_method_magics
2041 these_magics = orig_magics
2042
2043 if getattr(self, "_mock_methods", None) is not None:
2044 these_magics = orig_magics.intersection(self._mock_methods)
2045
2046 remove_magics = set()
2047 remove_magics = orig_magics - these_magics
2048
2049 for entry in remove_magics:
2050 if entry in type(self).__dict__:
2051 # remove unneeded magic methods
2052 delattr(self, entry)
2053
2054 # don't overwrite existing attributes if called a second time
2055 these_magics = these_magics - set(type(self).__dict__)
2056
2057 _type = type(self)
2058 for entry in these_magics:
2059 setattr(_type, entry, MagicProxy(entry, self))
2060
2061
2062
2063class NonCallableMagicMock(MagicMixin, NonCallableMock):
2064 """A version of `MagicMock` that isn't callable."""
2065 def mock_add_spec(self, spec, spec_set=False):
2066 """Add a spec to a mock. `spec` can either be an object or a
2067 list of strings. Only attributes on the `spec` can be fetched as
2068 attributes from the mock.
2069
2070 If `spec_set` is True then only attributes on the spec can be set."""
2071 self._mock_add_spec(spec, spec_set)
2072 self._mock_set_magics()
2073
2074
2075class AsyncMagicMixin(MagicMixin):
2076 def __init__(self, /, *args, **kw):
2077 self._mock_set_magics() # make magic work for kwargs in init
2078 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
2079 self._mock_set_magics() # fix magic broken by upper level init
2080
2081class MagicMock(MagicMixin, Mock):
2082 """
2083 MagicMock is a subclass of Mock with default implementations
2084 of most of the magic methods. You can use MagicMock without having to
2085 configure the magic methods yourself.
2086
2087 If you use the `spec` or `spec_set` arguments then *only* magic
2088 methods that exist in the spec will be created.
2089
2090 Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
2091 """
2092 def mock_add_spec(self, spec, spec_set=False):
2093 """Add a spec to a mock. `spec` can either be an object or a
2094 list of strings. Only attributes on the `spec` can be fetched as
2095 attributes from the mock.
2096
2097 If `spec_set` is True then only attributes on the spec can be set."""
2098 self._mock_add_spec(spec, spec_set)
2099 self._mock_set_magics()
2100
2101
2102
2103class MagicProxy(Base):
2104 def __init__(self, name, parent):
2105 self.name = name
2106 self.parent = parent
2107
2108 def create_mock(self):
2109 entry = self.name
2110 parent = self.parent
2111 m = parent._get_child_mock(name=entry, _new_name=entry,
2112 _new_parent=parent)
2113 setattr(parent, entry, m)
2114 _set_return_value(parent, m, entry)
2115 return m
2116
2117 def __get__(self, obj, _type=None):
2118 return self.create_mock()
2119
2120
2121class AsyncMockMixin(Base):
2122 await_count = _delegating_property('await_count')
2123 await_args = _delegating_property('await_args')
2124 await_args_list = _delegating_property('await_args_list')
2125
2126 def __init__(self, /, *args, **kwargs):
2127 super().__init__(*args, **kwargs)
2128 # iscoroutinefunction() checks _is_coroutine property to say if an
2129 # object is a coroutine. Without this check it looks to see if it is a
2130 # function/method, which in this case it is not (since it is an
2131 # AsyncMock).
2132 # It is set through __dict__ because when spec_set is True, this
2133 # attribute is likely undefined.
2134 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
2135 self.__dict__['_mock_await_count'] = 0
2136 self.__dict__['_mock_await_args'] = None
2137 self.__dict__['_mock_await_args_list'] = _CallList()
2138 code_mock = NonCallableMock(spec_set=CodeType)
2139 code_mock.co_flags = inspect.CO_COROUTINE
2140 self.__dict__['__code__'] = code_mock
2141
2142 async def _execute_mock_call(self, /, *args, **kwargs):
2143 # This is nearly just like super(), except for special handling
2144 # of coroutines
2145
2146 _call = _Call((args, kwargs), two=True)
2147 self.await_count += 1
2148 self.await_args = _call
2149 self.await_args_list.append(_call)
2150
2151 effect = self.side_effect
2152 if effect is not None:
2153 if _is_exception(effect):
2154 raise effect
2155 elif not _callable(effect):
2156 try:
2157 result = next(effect)
2158 except StopIteration:
2159 # It is impossible to propogate a StopIteration
2160 # through coroutines because of PEP 479
2161 raise StopAsyncIteration
2162 if _is_exception(result):
2163 raise result
2164 elif iscoroutinefunction(effect):
2165 result = await effect(*args, **kwargs)
2166 else:
2167 result = effect(*args, **kwargs)
2168
2169 if result is not DEFAULT:
2170 return result
2171
2172 if self._mock_return_value is not DEFAULT:
2173 return self.return_value
2174
2175 if self._mock_wraps is not None:
2176 if iscoroutinefunction(self._mock_wraps):
2177 return await self._mock_wraps(*args, **kwargs)
2178 return self._mock_wraps(*args, **kwargs)
2179
2180 return self.return_value
2181
2182 def assert_awaited(self):
2183 """
2184 Assert that the mock was awaited at least once.
2185 """
2186 if self.await_count == 0:
2187 msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
2188 raise AssertionError(msg)
2189
2190 def assert_awaited_once(self):
2191 """
2192 Assert that the mock was awaited exactly once.
2193 """
2194 if not self.await_count == 1:
2195 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2196 f" Awaited {self.await_count} times.")
2197 raise AssertionError(msg)
2198
2199 def assert_awaited_with(self, /, *args, **kwargs):
2200 """
2201 Assert that the last await was with the specified arguments.
2202 """
2203 if self.await_args is None:
2204 expected = self._format_mock_call_signature(args, kwargs)
2205 raise AssertionError(f'Expected await: {expected}\nNot awaited')
2206
2207 def _error_message():
2208 msg = self._format_mock_failure_message(args, kwargs, action='await')
2209 return msg
2210
2211 expected = self._call_matcher(_Call((args, kwargs), two=True))
2212 actual = self._call_matcher(self.await_args)
2213 if actual != expected:
2214 cause = expected if isinstance(expected, Exception) else None
2215 raise AssertionError(_error_message()) from cause
2216
2217 def assert_awaited_once_with(self, /, *args, **kwargs):
2218 """
2219 Assert that the mock was awaited exactly once and with the specified
2220 arguments.
2221 """
2222 if not self.await_count == 1:
2223 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2224 f" Awaited {self.await_count} times.")
2225 raise AssertionError(msg)
2226 return self.assert_awaited_with(*args, **kwargs)
2227
2228 def assert_any_await(self, /, *args, **kwargs):
2229 """
2230 Assert the mock has ever been awaited with the specified arguments.
2231 """
2232 expected = self._call_matcher(_Call((args, kwargs), two=True))
2233 cause = expected if isinstance(expected, Exception) else None
2234 actual = [self._call_matcher(c) for c in self.await_args_list]
2235 if cause or expected not in _AnyComparer(actual):
2236 expected_string = self._format_mock_call_signature(args, kwargs)
2237 raise AssertionError(
2238 '%s await not found' % expected_string
2239 ) from cause
2240
2241 def assert_has_awaits(self, calls, any_order=False):
2242 """
2243 Assert the mock has been awaited with the specified calls.
2244 The :attr:`await_args_list` list is checked for the awaits.
2245
2246 If `any_order` is False (the default) then the awaits must be
2247 sequential. There can be extra calls before or after the
2248 specified awaits.
2249
2250 If `any_order` is True then the awaits can be in any order, but
2251 they must all appear in :attr:`await_args_list`.
2252 """
2253 expected = [self._call_matcher(c) for c in calls]
2254 cause = next((e for e in expected if isinstance(e, Exception)), None)
2255 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
2256 if not any_order:
2257 if expected not in all_awaits:
2258 if cause is None:
2259 problem = 'Awaits not found.'
2260 else:
2261 problem = ('Error processing expected awaits.\n'
2262 'Errors: {}').format(
2263 [e if isinstance(e, Exception) else None
2264 for e in expected])
2265 raise AssertionError(
2266 f'{problem}\n'
2267 f'Expected: {_CallList(calls)}\n'
2268 f'Actual: {self.await_args_list}'
2269 ) from cause
2270 return
2271
2272 all_awaits = list(all_awaits)
2273
2274 not_found = []
2275 for kall in expected:
2276 try:
2277 all_awaits.remove(kall)
2278 except ValueError:
2279 not_found.append(kall)
2280 if not_found:
2281 raise AssertionError(
2282 '%r not all found in await list' % (tuple(not_found),)
2283 ) from cause
2284
2285 def assert_not_awaited(self):
2286 """
2287 Assert that the mock was never awaited.
2288 """
2289 if self.await_count != 0:
2290 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
2291 f" Awaited {self.await_count} times.")
2292 raise AssertionError(msg)
2293
2294 def reset_mock(self, /, *args, **kwargs):
2295 """
2296 See :func:`.Mock.reset_mock()`
2297 """
2298 super().reset_mock(*args, **kwargs)
2299 self.await_count = 0
2300 self.await_args = None
2301 self.await_args_list = _CallList()
2302
2303
2304class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
2305 """
2306 Enhance :class:`Mock` with features allowing to mock
2307 an async function.
2308
2309 The :class:`AsyncMock` object will behave so the object is
2310 recognized as an async function, and the result of a call is an awaitable:
2311
2312 >>> mock = AsyncMock()
2313 >>> iscoroutinefunction(mock)
2314 True
2315 >>> inspect.isawaitable(mock())
2316 True
2317
2318
2319 The result of ``mock()`` is an async function which will have the outcome
2320 of ``side_effect`` or ``return_value``:
2321
2322 - if ``side_effect`` is a function, the async function will return the
2323 result of that function,
2324 - if ``side_effect`` is an exception, the async function will raise the
2325 exception,
2326 - if ``side_effect`` is an iterable, the async function will return the
2327 next value of the iterable, however, if the sequence of result is
2328 exhausted, ``StopIteration`` is raised immediately,
2329 - if ``side_effect`` is not defined, the async function will return the
2330 value defined by ``return_value``, hence, by default, the async function
2331 returns a new :class:`AsyncMock` object.
2332
2333 If the outcome of ``side_effect`` or ``return_value`` is an async function,
2334 the mock async function obtained when the mock object is called will be this
2335 async function itself (and not an async function returning an async
2336 function).
2337
2338 The test author can also specify a wrapped object with ``wraps``. In this
2339 case, the :class:`Mock` object behavior is the same as with an
2340 :class:`.Mock` object: the wrapped object may have methods
2341 defined as async function functions.
2342
2343 Based on Martin Richard's asynctest project.
2344 """
2345
2346
2347class _ANY(object):
2348 "A helper object that compares equal to everything."
2349
2350 def __eq__(self, other):
2351 return True
2352
2353 def __ne__(self, other):
2354 return False
2355
2356 def __repr__(self):
2357 return '<ANY>'
2358
2359ANY = _ANY()
2360
2361
2362
2363def _format_call_signature(name, args, kwargs):
2364 message = '%s(%%s)' % name
2365 formatted_args = ''
2366 args_string = ', '.join([repr(arg) for arg in args])
2367 kwargs_string = ', '.join([
2368 '%s=%r' % (key, value) for key, value in kwargs.items()
2369 ])
2370 if args_string:
2371 formatted_args = args_string
2372 if kwargs_string:
2373 if formatted_args:
2374 formatted_args += ', '
2375 formatted_args += kwargs_string
2376
2377 return message % formatted_args
2378
2379
2380
2381class _Call(tuple):
2382 """
2383 A tuple for holding the results of a call to a mock, either in the form
2384 `(args, kwargs)` or `(name, args, kwargs)`.
2385
2386 If args or kwargs are empty then a call tuple will compare equal to
2387 a tuple without those values. This makes comparisons less verbose::
2388
2389 _Call(('name', (), {})) == ('name',)
2390 _Call(('name', (1,), {})) == ('name', (1,))
2391 _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2392
2393 The `_Call` object provides a useful shortcut for comparing with call::
2394
2395 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2396 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2397
2398 If the _Call has no name then it will match any name.
2399 """
2400 def __new__(cls, value=(), name='', parent=None, two=False,
2401 from_kall=True):
2402 args = ()
2403 kwargs = {}
2404 _len = len(value)
2405 if _len == 3:
2406 name, args, kwargs = value
2407 elif _len == 2:
2408 first, second = value
2409 if isinstance(first, str):
2410 name = first
2411 if isinstance(second, tuple):
2412 args = second
2413 else:
2414 kwargs = second
2415 else:
2416 args, kwargs = first, second
2417 elif _len == 1:
2418 value, = value
2419 if isinstance(value, str):
2420 name = value
2421 elif isinstance(value, tuple):
2422 args = value
2423 else:
2424 kwargs = value
2425
2426 if two:
2427 return tuple.__new__(cls, (args, kwargs))
2428
2429 return tuple.__new__(cls, (name, args, kwargs))
2430
2431
2432 def __init__(self, value=(), name=None, parent=None, two=False,
2433 from_kall=True):
2434 self._mock_name = name
2435 self._mock_parent = parent
2436 self._mock_from_kall = from_kall
2437
2438
2439 def __eq__(self, other):
2440 try:
2441 len_other = len(other)
2442 except TypeError:
2443 return NotImplemented
2444
2445 self_name = ''
2446 if len(self) == 2:
2447 self_args, self_kwargs = self
2448 else:
2449 self_name, self_args, self_kwargs = self
2450
2451 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2452 and self._mock_parent != other._mock_parent):
2453 return False
2454
2455 other_name = ''
2456 if len_other == 0:
2457 other_args, other_kwargs = (), {}
2458 elif len_other == 3:
2459 other_name, other_args, other_kwargs = other
2460 elif len_other == 1:
2461 value, = other
2462 if isinstance(value, tuple):
2463 other_args = value
2464 other_kwargs = {}
2465 elif isinstance(value, str):
2466 other_name = value
2467 other_args, other_kwargs = (), {}
2468 else:
2469 other_args = ()
2470 other_kwargs = value
2471 elif len_other == 2:
2472 # could be (name, args) or (name, kwargs) or (args, kwargs)
2473 first, second = other
2474 if isinstance(first, str):
2475 other_name = first
2476 if isinstance(second, tuple):
2477 other_args, other_kwargs = second, {}
2478 else:
2479 other_args, other_kwargs = (), second
2480 else:
2481 other_args, other_kwargs = first, second
2482 else:
2483 return False
2484
2485 if self_name and other_name != self_name:
2486 return False
2487
2488 # this order is important for ANY to work!
2489 return (other_args, other_kwargs) == (self_args, self_kwargs)
2490
2491
2492 __ne__ = object.__ne__
2493
2494
2495 def __call__(self, /, *args, **kwargs):
2496 if self._mock_name is None:
2497 return _Call(('', args, kwargs), name='()')
2498
2499 name = self._mock_name + '()'
2500 return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2501
2502
2503 def __getattr__(self, attr):
2504 if self._mock_name is None:
2505 return _Call(name=attr, from_kall=False)
2506 name = '%s.%s' % (self._mock_name, attr)
2507 return _Call(name=name, parent=self, from_kall=False)
2508
2509
2510 def __getattribute__(self, attr):
2511 if attr in tuple.__dict__:
2512 raise AttributeError
2513 return tuple.__getattribute__(self, attr)
2514
2515
2516 def _get_call_arguments(self):
2517 if len(self) == 2:
2518 args, kwargs = self
2519 else:
2520 name, args, kwargs = self
2521
2522 return args, kwargs
2523
2524 @property
2525 def args(self):
2526 return self._get_call_arguments()[0]
2527
2528 @property
2529 def kwargs(self):
2530 return self._get_call_arguments()[1]
2531
2532 def __repr__(self):
2533 if not self._mock_from_kall:
2534 name = self._mock_name or 'call'
2535 if name.startswith('()'):
2536 name = 'call%s' % name
2537 return name
2538
2539 if len(self) == 2:
2540 name = 'call'
2541 args, kwargs = self
2542 else:
2543 name, args, kwargs = self
2544 if not name:
2545 name = 'call'
2546 elif not name.startswith('()'):
2547 name = 'call.%s' % name
2548 else:
2549 name = 'call%s' % name
2550 return _format_call_signature(name, args, kwargs)
2551
2552
2553 def call_list(self):
2554 """For a call object that represents multiple calls, `call_list`
2555 returns a list of all the intermediate calls as well as the
2556 final call."""
2557 vals = []
2558 thing = self
2559 while thing is not None:
2560 if thing._mock_from_kall:
2561 vals.append(thing)
2562 thing = thing._mock_parent
2563 return _CallList(reversed(vals))
2564
2565
2566call = _Call(from_kall=False)
2567
2568
2569def create_autospec(spec, spec_set=False, instance=False, _parent=None,
2570 _name=None, **kwargs):
2571 """Create a mock object using another object as a spec. Attributes on the
2572 mock will use the corresponding attribute on the `spec` object as their
2573 spec.
2574
2575 Functions or methods being mocked will have their arguments checked
2576 to check that they are called with the correct signature.
2577
2578 If `spec_set` is True then attempting to set attributes that don't exist
2579 on the spec object will raise an `AttributeError`.
2580
2581 If a class is used as a spec then the return value of the mock (the
2582 instance of the class) will have the same spec. You can use a class as the
2583 spec for an instance object by passing `instance=True`. The returned mock
2584 will only be callable if instances of the mock are callable.
2585
2586 `create_autospec` also takes arbitrary keyword arguments that are passed to
2587 the constructor of the created mock."""
2588 if _is_list(spec):
2589 # can't pass a list instance to the mock constructor as it will be
2590 # interpreted as a list of strings
2591 spec = type(spec)
2592
2593 is_type = isinstance(spec, type)
2594 is_async_func = _is_async_func(spec)
2595 _kwargs = {'spec': spec}
2596 if spec_set:
2597 _kwargs = {'spec_set': spec}
2598 elif spec is None:
2599 # None we mock with a normal mock without a spec
2600 _kwargs = {}
2601 if _kwargs and instance:
2602 _kwargs['_spec_as_instance'] = True
2603
2604 _kwargs.update(kwargs)
2605
2606 Klass = MagicMock
2607 if inspect.isdatadescriptor(spec):
2608 # descriptors don't have a spec
2609 # because we don't know what type they return
2610 _kwargs = {}
2611 elif is_async_func:
2612 if instance:
2613 raise RuntimeError("Instance can not be True when create_autospec "
2614 "is mocking an async function")
2615 Klass = AsyncMock
2616 elif not _callable(spec):
2617 Klass = NonCallableMagicMock
2618 elif is_type and instance and not _instance_callable(spec):
2619 Klass = NonCallableMagicMock
2620
2621 _name = _kwargs.pop('name', _name)
2622
2623 _new_name = _name
2624 if _parent is None:
2625 # for a top level object no _new_name should be set
2626 _new_name = ''
2627
2628 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
2629 name=_name, **_kwargs)
2630
2631 if isinstance(spec, FunctionTypes):
2632 # should only happen at the top level because we don't
2633 # recurse for functions
2634 mock = _set_signature(mock, spec)
2635 if is_async_func:
2636 _setup_async_mock(mock)
2637 else:
2638 _check_signature(spec, mock, is_type, instance)
2639
2640 if _parent is not None and not instance:
2641 _parent._mock_children[_name] = mock
2642
2643 if is_type and not instance and 'return_value' not in kwargs:
2644 mock.return_value = create_autospec(spec, spec_set, instance=True,
2645 _name='()', _parent=mock)
2646
2647 for entry in dir(spec):
2648 if _is_magic(entry):
2649 # MagicMock already does the useful magic methods for us
2650 continue
2651
2652 # XXXX do we need a better way of getting attributes without
2653 # triggering code execution (?) Probably not - we need the actual
2654 # object to mock it so we would rather trigger a property than mock
2655 # the property descriptor. Likewise we want to mock out dynamically
2656 # provided attributes.
2657 # XXXX what about attributes that raise exceptions other than
2658 # AttributeError on being fetched?
2659 # we could be resilient against it, or catch and propagate the
2660 # exception when the attribute is fetched from the mock
2661 try:
2662 original = getattr(spec, entry)
2663 except AttributeError:
2664 continue
2665
2666 kwargs = {'spec': original}
2667 if spec_set:
2668 kwargs = {'spec_set': original}
2669
2670 if not isinstance(original, FunctionTypes):
2671 new = _SpecState(original, spec_set, mock, entry, instance)
2672 mock._mock_children[entry] = new
2673 else:
2674 parent = mock
2675 if isinstance(spec, FunctionTypes):
2676 parent = mock.mock
2677
2678 skipfirst = _must_skip(spec, entry, is_type)
2679 kwargs['_eat_self'] = skipfirst
2680 if iscoroutinefunction(original):
2681 child_klass = AsyncMock
2682 else:
2683 child_klass = MagicMock
2684 new = child_klass(parent=parent, name=entry, _new_name=entry,
2685 _new_parent=parent,
2686 **kwargs)
2687 mock._mock_children[entry] = new
2688 _check_signature(original, new, skipfirst=skipfirst)
2689
2690 # so functions created with _set_signature become instance attributes,
2691 # *plus* their underlying mock exists in _mock_children of the parent
2692 # mock. Adding to _mock_children may be unnecessary where we are also
2693 # setting as an instance attribute?
2694 if isinstance(new, FunctionTypes):
2695 setattr(mock, entry, new)
2696
2697 return mock
2698
2699
2700def _must_skip(spec, entry, is_type):
2701 """
2702 Return whether we should skip the first argument on spec's `entry`
2703 attribute.
2704 """
2705 if not isinstance(spec, type):
2706 if entry in getattr(spec, '__dict__', {}):
2707 # instance attribute - shouldn't skip
2708 return False
2709 spec = spec.__class__
2710
2711 for klass in spec.__mro__:
2712 result = klass.__dict__.get(entry, DEFAULT)
2713 if result is DEFAULT:
2714 continue
2715 if isinstance(result, (staticmethod, classmethod)):
2716 return False
2717 elif isinstance(result, FunctionTypes):
2718 # Normal method => skip if looked up on type
2719 # (if looked up on instance, self is already skipped)
2720 return is_type
2721 else:
2722 return False
2723
2724 # function is a dynamically provided attribute
2725 return is_type
2726
2727
2728class _SpecState(object):
2729
2730 def __init__(self, spec, spec_set=False, parent=None,
2731 name=None, ids=None, instance=False):
2732 self.spec = spec
2733 self.ids = ids
2734 self.spec_set = spec_set
2735 self.parent = parent
2736 self.instance = instance
2737 self.name = name
2738
2739
2740FunctionTypes = (
2741 # python function
2742 type(create_autospec),
2743 # instance method
2744 type(ANY.__eq__),
2745)
2746
2747
2748file_spec = None
2749
2750
2751def _to_stream(read_data):
2752 if isinstance(read_data, bytes):
2753 return io.BytesIO(read_data)
2754 else:
2755 return io.StringIO(read_data)
2756
2757
2758def mock_open(mock=None, read_data=''):
2759 """
2760 A helper function to create a mock to replace the use of `open`. It works
2761 for `open` called directly or used as a context manager.
2762
2763 The `mock` argument is the mock object to configure. If `None` (the
2764 default) then a `MagicMock` will be created for you, with the API limited
2765 to methods or attributes available on standard file handles.
2766
2767 `read_data` is a string for the `read`, `readline` and `readlines` of the
2768 file handle to return. This is an empty string by default.
2769 """
2770 _read_data = _to_stream(read_data)
2771 _state = [_read_data, None]
2772
2773 def _readlines_side_effect(*args, **kwargs):
2774 if handle.readlines.return_value is not None:
2775 return handle.readlines.return_value
2776 return _state[0].readlines(*args, **kwargs)
2777
2778 def _read_side_effect(*args, **kwargs):
2779 if handle.read.return_value is not None:
2780 return handle.read.return_value
2781 return _state[0].read(*args, **kwargs)
2782
2783 def _readline_side_effect(*args, **kwargs):
2784 yield from _iter_side_effect()
2785 while True:
2786 yield _state[0].readline(*args, **kwargs)
2787
2788 def _iter_side_effect():
2789 if handle.readline.return_value is not None:
2790 while True:
2791 yield handle.readline.return_value
2792 for line in _state[0]:
2793 yield line
2794
2795 def _next_side_effect():
2796 if handle.readline.return_value is not None:
2797 return handle.readline.return_value
2798 return next(_state[0])
2799
2800 global file_spec
2801 if file_spec is None:
2802 import _io
2803 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
2804
2805 if mock is None:
2806 mock = MagicMock(name='open', spec=open)
2807
2808 handle = MagicMock(spec=file_spec)
2809 handle.__enter__.return_value = handle
2810
2811 handle.write.return_value = None
2812 handle.read.return_value = None
2813 handle.readline.return_value = None
2814 handle.readlines.return_value = None
2815
2816 handle.read.side_effect = _read_side_effect
2817 _state[1] = _readline_side_effect()
2818 handle.readline.side_effect = _state[1]
2819 handle.readlines.side_effect = _readlines_side_effect
2820 handle.__iter__.side_effect = _iter_side_effect
2821 handle.__next__.side_effect = _next_side_effect
2822
2823 def reset_data(*args, **kwargs):
2824 _state[0] = _to_stream(read_data)
2825 if handle.readline.side_effect == _state[1]:
2826 # Only reset the side effect if the user hasn't overridden it.
2827 _state[1] = _readline_side_effect()
2828 handle.readline.side_effect = _state[1]
2829 return DEFAULT
2830
2831 mock.side_effect = reset_data
2832 mock.return_value = handle
2833 return mock
2834
2835
2836class PropertyMock(Mock):
2837 """
2838 A mock intended to be used as a property, or other descriptor, on a class.
2839 `PropertyMock` provides `__get__` and `__set__` methods so you can specify
2840 a return value when it is fetched.
2841
2842 Fetching a `PropertyMock` instance from an object calls the mock, with
2843 no args. Setting it calls the mock with the value being set.
2844 """
2845 def _get_child_mock(self, /, **kwargs):
2846 return MagicMock(**kwargs)
2847
2848 def __get__(self, obj, obj_type=None):
2849 return self()
2850 def __set__(self, obj, val):
2851 self(val)
2852
2853
2854def seal(mock):
2855 """Disable the automatic generation of child mocks.
2856
2857 Given an input Mock, seals it to ensure no further mocks will be generated
2858 when accessing an attribute that was not already defined.
2859
2860 The operation recursively seals the mock passed in, meaning that
2861 the mock itself, any mocks generated by accessing one of its attributes,
2862 and all assigned mocks without a name or spec will be sealed.
2863 """
2864 mock._mock_sealed = True
2865 for attr in dir(mock):
2866 try:
2867 m = getattr(mock, attr)
2868 except AttributeError:
2869 continue
2870 if not isinstance(m, NonCallableMock):
2871 continue
2872 if m._mock_new_parent is mock:
2873 seal(m)
2874
2875
2876class _AsyncIterator:
2877 """
2878 Wraps an iterator in an asynchronous iterator.
2879 """
2880 def __init__(self, iterator):
2881 self.iterator = iterator
2882 code_mock = NonCallableMock(spec_set=CodeType)
2883 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
2884 self.__dict__['__code__'] = code_mock
2885
2886 async def __anext__(self):
2887 try:
2888 return next(self.iterator)
2889 except StopIteration:
2890 pass
2891 raise StopAsyncIteration