blob: 691ccddfa450ad242e45458e693dbdead2d56bf4 [file] [log] [blame]
Olivier Deprezf4ef2d02021-04-20 13:36:24 +02001"""Python part of the warnings subsystem."""
2
3import sys
4
5
6__all__ = ["warn", "warn_explicit", "showwarning",
7 "formatwarning", "filterwarnings", "simplefilter",
8 "resetwarnings", "catch_warnings"]
9
10def showwarning(message, category, filename, lineno, file=None, line=None):
11 """Hook to write a warning to a file; replace if you like."""
12 msg = WarningMessage(message, category, filename, lineno, file, line)
13 _showwarnmsg_impl(msg)
14
15def formatwarning(message, category, filename, lineno, line=None):
16 """Function to format a warning the standard way."""
17 msg = WarningMessage(message, category, filename, lineno, None, line)
18 return _formatwarnmsg_impl(msg)
19
20def _showwarnmsg_impl(msg):
21 file = msg.file
22 if file is None:
23 file = sys.stderr
24 if file is None:
25 # sys.stderr is None when run with pythonw.exe:
26 # warnings get lost
27 return
28 text = _formatwarnmsg(msg)
29 try:
30 file.write(text)
31 except OSError:
32 # the file (probably stderr) is invalid - this warning gets lost.
33 pass
34
35def _formatwarnmsg_impl(msg):
36 category = msg.category.__name__
37 s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
38
39 if msg.line is None:
40 try:
41 import linecache
42 line = linecache.getline(msg.filename, msg.lineno)
43 except Exception:
44 # When a warning is logged during Python shutdown, linecache
45 # and the import machinery don't work anymore
46 line = None
47 linecache = None
48 else:
49 line = msg.line
50 if line:
51 line = line.strip()
52 s += " %s\n" % line
53
54 if msg.source is not None:
55 try:
56 import tracemalloc
57 # Logging a warning should not raise a new exception:
58 # catch Exception, not only ImportError and RecursionError.
59 except Exception:
60 # don't suggest to enable tracemalloc if it's not available
61 tracing = True
62 tb = None
63 else:
64 tracing = tracemalloc.is_tracing()
65 try:
66 tb = tracemalloc.get_object_traceback(msg.source)
67 except Exception:
68 # When a warning is logged during Python shutdown, tracemalloc
69 # and the import machinery don't work anymore
70 tb = None
71
72 if tb is not None:
73 s += 'Object allocated at (most recent call last):\n'
74 for frame in tb:
75 s += (' File "%s", lineno %s\n'
76 % (frame.filename, frame.lineno))
77
78 try:
79 if linecache is not None:
80 line = linecache.getline(frame.filename, frame.lineno)
81 else:
82 line = None
83 except Exception:
84 line = None
85 if line:
86 line = line.strip()
87 s += ' %s\n' % line
88 elif not tracing:
89 s += (f'{category}: Enable tracemalloc to get the object '
90 f'allocation traceback\n')
91 return s
92
93# Keep a reference to check if the function was replaced
94_showwarning_orig = showwarning
95
96def _showwarnmsg(msg):
97 """Hook to write a warning to a file; replace if you like."""
98 try:
99 sw = showwarning
100 except NameError:
101 pass
102 else:
103 if sw is not _showwarning_orig:
104 # warnings.showwarning() was replaced
105 if not callable(sw):
106 raise TypeError("warnings.showwarning() must be set to a "
107 "function or method")
108
109 sw(msg.message, msg.category, msg.filename, msg.lineno,
110 msg.file, msg.line)
111 return
112 _showwarnmsg_impl(msg)
113
114# Keep a reference to check if the function was replaced
115_formatwarning_orig = formatwarning
116
117def _formatwarnmsg(msg):
118 """Function to format a warning the standard way."""
119 try:
120 fw = formatwarning
121 except NameError:
122 pass
123 else:
124 if fw is not _formatwarning_orig:
125 # warnings.formatwarning() was replaced
126 return fw(msg.message, msg.category,
127 msg.filename, msg.lineno, msg.line)
128 return _formatwarnmsg_impl(msg)
129
130def filterwarnings(action, message="", category=Warning, module="", lineno=0,
131 append=False):
132 """Insert an entry into the list of warnings filters (at the front).
133
134 'action' -- one of "error", "ignore", "always", "default", "module",
135 or "once"
136 'message' -- a regex that the warning message must match
137 'category' -- a class that the warning must be a subclass of
138 'module' -- a regex that the module name must match
139 'lineno' -- an integer line number, 0 matches all warnings
140 'append' -- if true, append to the list of filters
141 """
142 assert action in ("error", "ignore", "always", "default", "module",
143 "once"), "invalid action: %r" % (action,)
144 assert isinstance(message, str), "message must be a string"
145 assert isinstance(category, type), "category must be a class"
146 assert issubclass(category, Warning), "category must be a Warning subclass"
147 assert isinstance(module, str), "module must be a string"
148 assert isinstance(lineno, int) and lineno >= 0, \
149 "lineno must be an int >= 0"
150
151 if message or module:
152 import re
153
154 if message:
155 message = re.compile(message, re.I)
156 else:
157 message = None
158 if module:
159 module = re.compile(module)
160 else:
161 module = None
162
163 _add_filter(action, message, category, module, lineno, append=append)
164
165def simplefilter(action, category=Warning, lineno=0, append=False):
166 """Insert a simple entry into the list of warnings filters (at the front).
167
168 A simple filter matches all modules and messages.
169 'action' -- one of "error", "ignore", "always", "default", "module",
170 or "once"
171 'category' -- a class that the warning must be a subclass of
172 'lineno' -- an integer line number, 0 matches all warnings
173 'append' -- if true, append to the list of filters
174 """
175 assert action in ("error", "ignore", "always", "default", "module",
176 "once"), "invalid action: %r" % (action,)
177 assert isinstance(lineno, int) and lineno >= 0, \
178 "lineno must be an int >= 0"
179 _add_filter(action, None, category, None, lineno, append=append)
180
181def _add_filter(*item, append):
182 # Remove possible duplicate filters, so new one will be placed
183 # in correct place. If append=True and duplicate exists, do nothing.
184 if not append:
185 try:
186 filters.remove(item)
187 except ValueError:
188 pass
189 filters.insert(0, item)
190 else:
191 if item not in filters:
192 filters.append(item)
193 _filters_mutated()
194
195def resetwarnings():
196 """Clear the list of warning filters, so that no filters are active."""
197 filters[:] = []
198 _filters_mutated()
199
200class _OptionError(Exception):
201 """Exception used by option processing helpers."""
202 pass
203
204# Helper to process -W options passed via sys.warnoptions
205def _processoptions(args):
206 for arg in args:
207 try:
208 _setoption(arg)
209 except _OptionError as msg:
210 print("Invalid -W option ignored:", msg, file=sys.stderr)
211
212# Helper for _processoptions()
213def _setoption(arg):
214 parts = arg.split(':')
215 if len(parts) > 5:
216 raise _OptionError("too many fields (max 5): %r" % (arg,))
217 while len(parts) < 5:
218 parts.append('')
219 action, message, category, module, lineno = [s.strip()
220 for s in parts]
221 action = _getaction(action)
222 category = _getcategory(category)
223 if message or module:
224 import re
225 if message:
226 message = re.escape(message)
227 if module:
228 module = re.escape(module) + r'\Z'
229 if lineno:
230 try:
231 lineno = int(lineno)
232 if lineno < 0:
233 raise ValueError
234 except (ValueError, OverflowError):
235 raise _OptionError("invalid lineno %r" % (lineno,)) from None
236 else:
237 lineno = 0
238 filterwarnings(action, message, category, module, lineno)
239
240# Helper for _setoption()
241def _getaction(action):
242 if not action:
243 return "default"
244 if action == "all": return "always" # Alias
245 for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
246 if a.startswith(action):
247 return a
248 raise _OptionError("invalid action: %r" % (action,))
249
250# Helper for _setoption()
251def _getcategory(category):
252 if not category:
253 return Warning
254 if '.' not in category:
255 import builtins as m
256 klass = category
257 else:
258 module, _, klass = category.rpartition('.')
259 try:
260 m = __import__(module, None, None, [klass])
261 except ImportError:
262 raise _OptionError("invalid module name: %r" % (module,)) from None
263 try:
264 cat = getattr(m, klass)
265 except AttributeError:
266 raise _OptionError("unknown warning category: %r" % (category,)) from None
267 if not issubclass(cat, Warning):
268 raise _OptionError("invalid warning category: %r" % (category,))
269 return cat
270
271
272def _is_internal_frame(frame):
273 """Signal whether the frame is an internal CPython implementation detail."""
274 filename = frame.f_code.co_filename
275 return 'importlib' in filename and '_bootstrap' in filename
276
277
278def _next_external_frame(frame):
279 """Find the next frame that doesn't involve CPython internals."""
280 frame = frame.f_back
281 while frame is not None and _is_internal_frame(frame):
282 frame = frame.f_back
283 return frame
284
285
286# Code typically replaced by _warnings
287def warn(message, category=None, stacklevel=1, source=None):
288 """Issue a warning, or maybe ignore it or raise an exception."""
289 # Check if message is already a Warning object
290 if isinstance(message, Warning):
291 category = message.__class__
292 # Check category argument
293 if category is None:
294 category = UserWarning
295 if not (isinstance(category, type) and issubclass(category, Warning)):
296 raise TypeError("category must be a Warning subclass, "
297 "not '{:s}'".format(type(category).__name__))
298 # Get context information
299 try:
300 if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
301 # If frame is too small to care or if the warning originated in
302 # internal code, then do not try to hide any frames.
303 frame = sys._getframe(stacklevel)
304 else:
305 frame = sys._getframe(1)
306 # Look for one frame less since the above line starts us off.
307 for x in range(stacklevel-1):
308 frame = _next_external_frame(frame)
309 if frame is None:
310 raise ValueError
311 except ValueError:
312 globals = sys.__dict__
313 filename = "sys"
314 lineno = 1
315 else:
316 globals = frame.f_globals
317 filename = frame.f_code.co_filename
318 lineno = frame.f_lineno
319 if '__name__' in globals:
320 module = globals['__name__']
321 else:
322 module = "<string>"
323 registry = globals.setdefault("__warningregistry__", {})
324 warn_explicit(message, category, filename, lineno, module, registry,
325 globals, source)
326
327def warn_explicit(message, category, filename, lineno,
328 module=None, registry=None, module_globals=None,
329 source=None):
330 lineno = int(lineno)
331 if module is None:
332 module = filename or "<unknown>"
333 if module[-3:].lower() == ".py":
334 module = module[:-3] # XXX What about leading pathname?
335 if registry is None:
336 registry = {}
337 if registry.get('version', 0) != _filters_version:
338 registry.clear()
339 registry['version'] = _filters_version
340 if isinstance(message, Warning):
341 text = str(message)
342 category = message.__class__
343 else:
344 text = message
345 message = category(message)
346 key = (text, category, lineno)
347 # Quick test for common case
348 if registry.get(key):
349 return
350 # Search the filters
351 for item in filters:
352 action, msg, cat, mod, ln = item
353 if ((msg is None or msg.match(text)) and
354 issubclass(category, cat) and
355 (mod is None or mod.match(module)) and
356 (ln == 0 or lineno == ln)):
357 break
358 else:
359 action = defaultaction
360 # Early exit actions
361 if action == "ignore":
362 return
363
364 # Prime the linecache for formatting, in case the
365 # "file" is actually in a zipfile or something.
366 import linecache
367 linecache.getlines(filename, module_globals)
368
369 if action == "error":
370 raise message
371 # Other actions
372 if action == "once":
373 registry[key] = 1
374 oncekey = (text, category)
375 if onceregistry.get(oncekey):
376 return
377 onceregistry[oncekey] = 1
378 elif action == "always":
379 pass
380 elif action == "module":
381 registry[key] = 1
382 altkey = (text, category, 0)
383 if registry.get(altkey):
384 return
385 registry[altkey] = 1
386 elif action == "default":
387 registry[key] = 1
388 else:
389 # Unrecognized actions are errors
390 raise RuntimeError(
391 "Unrecognized action (%r) in warnings.filters:\n %s" %
392 (action, item))
393 # Print message and context
394 msg = WarningMessage(message, category, filename, lineno, source)
395 _showwarnmsg(msg)
396
397
398class WarningMessage(object):
399
400 _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
401 "line", "source")
402
403 def __init__(self, message, category, filename, lineno, file=None,
404 line=None, source=None):
405 self.message = message
406 self.category = category
407 self.filename = filename
408 self.lineno = lineno
409 self.file = file
410 self.line = line
411 self.source = source
412 self._category_name = category.__name__ if category else None
413
414 def __str__(self):
415 return ("{message : %r, category : %r, filename : %r, lineno : %s, "
416 "line : %r}" % (self.message, self._category_name,
417 self.filename, self.lineno, self.line))
418
419
420class catch_warnings(object):
421
422 """A context manager that copies and restores the warnings filter upon
423 exiting the context.
424
425 The 'record' argument specifies whether warnings should be captured by a
426 custom implementation of warnings.showwarning() and be appended to a list
427 returned by the context manager. Otherwise None is returned by the context
428 manager. The objects appended to the list are arguments whose attributes
429 mirror the arguments to showwarning().
430
431 The 'module' argument is to specify an alternative module to the module
432 named 'warnings' and imported under that name. This argument is only useful
433 when testing the warnings module itself.
434
435 """
436
437 def __init__(self, *, record=False, module=None):
438 """Specify whether to record warnings and if an alternative module
439 should be used other than sys.modules['warnings'].
440
441 For compatibility with Python 3.0, please consider all arguments to be
442 keyword-only.
443
444 """
445 self._record = record
446 self._module = sys.modules['warnings'] if module is None else module
447 self._entered = False
448
449 def __repr__(self):
450 args = []
451 if self._record:
452 args.append("record=True")
453 if self._module is not sys.modules['warnings']:
454 args.append("module=%r" % self._module)
455 name = type(self).__name__
456 return "%s(%s)" % (name, ", ".join(args))
457
458 def __enter__(self):
459 if self._entered:
460 raise RuntimeError("Cannot enter %r twice" % self)
461 self._entered = True
462 self._filters = self._module.filters
463 self._module.filters = self._filters[:]
464 self._module._filters_mutated()
465 self._showwarning = self._module.showwarning
466 self._showwarnmsg_impl = self._module._showwarnmsg_impl
467 if self._record:
468 log = []
469 self._module._showwarnmsg_impl = log.append
470 # Reset showwarning() to the default implementation to make sure
471 # that _showwarnmsg() calls _showwarnmsg_impl()
472 self._module.showwarning = self._module._showwarning_orig
473 return log
474 else:
475 return None
476
477 def __exit__(self, *exc_info):
478 if not self._entered:
479 raise RuntimeError("Cannot exit %r without entering first" % self)
480 self._module.filters = self._filters
481 self._module._filters_mutated()
482 self._module.showwarning = self._showwarning
483 self._module._showwarnmsg_impl = self._showwarnmsg_impl
484
485
486# Private utility function called by _PyErr_WarnUnawaitedCoroutine
487def _warn_unawaited_coroutine(coro):
488 msg_lines = [
489 f"coroutine '{coro.__qualname__}' was never awaited\n"
490 ]
491 if coro.cr_origin is not None:
492 import linecache, traceback
493 def extract():
494 for filename, lineno, funcname in reversed(coro.cr_origin):
495 line = linecache.getline(filename, lineno)
496 yield (filename, lineno, funcname, line)
497 msg_lines.append("Coroutine created at (most recent call last)\n")
498 msg_lines += traceback.format_list(list(extract()))
499 msg = "".join(msg_lines).rstrip("\n")
500 # Passing source= here means that if the user happens to have tracemalloc
501 # enabled and tracking where the coroutine was created, the warning will
502 # contain that traceback. This does mean that if they have *both*
503 # coroutine origin tracking *and* tracemalloc enabled, they'll get two
504 # partially-redundant tracebacks. If we wanted to be clever we could
505 # probably detect this case and avoid it, but for now we don't bother.
506 warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
507
508
509# filters contains a sequence of filter 5-tuples
510# The components of the 5-tuple are:
511# - an action: error, ignore, always, default, module, or once
512# - a compiled regex that must match the warning message
513# - a class representing the warning category
514# - a compiled regex that must match the module that is being warned
515# - a line number for the line being warning, or 0 to mean any line
516# If either if the compiled regexs are None, match anything.
517try:
518 from _warnings import (filters, _defaultaction, _onceregistry,
519 warn, warn_explicit, _filters_mutated)
520 defaultaction = _defaultaction
521 onceregistry = _onceregistry
522 _warnings_defaults = True
523except ImportError:
524 filters = []
525 defaultaction = "default"
526 onceregistry = {}
527
528 _filters_version = 1
529
530 def _filters_mutated():
531 global _filters_version
532 _filters_version += 1
533
534 _warnings_defaults = False
535
536
537# Module initialization
538_processoptions(sys.warnoptions)
539if not _warnings_defaults:
540 # Several warning categories are ignored by default in regular builds
541 if not hasattr(sys, 'gettotalrefcount'):
542 filterwarnings("default", category=DeprecationWarning,
543 module="__main__", append=1)
544 simplefilter("ignore", category=DeprecationWarning, append=1)
545 simplefilter("ignore", category=PendingDeprecationWarning, append=1)
546 simplefilter("ignore", category=ImportWarning, append=1)
547 simplefilter("ignore", category=ResourceWarning, append=1)
548
549del _warnings_defaults