blob: fa5dbd09eab8693a0a50fbf98ce2d1ee59b9404c [file] [log] [blame]
Olivier Deprezf4ef2d02021-04-20 13:36:24 +02001"""Cache lines from Python source files.
2
3This is intended to read lines from modules imported -- hence if a filename
4is not found, it will look down the module search path for a file by
5that name.
6"""
7
8import functools
9import sys
10import os
11import tokenize
12
13__all__ = ["getline", "clearcache", "checkcache", "lazycache"]
14
15
16# The cache. Maps filenames to either a thunk which will provide source code,
17# or a tuple (size, mtime, lines, fullname) once loaded.
18cache = {}
19
20
21def clearcache():
22 """Clear the cache entirely."""
23 cache.clear()
24
25
26def getline(filename, lineno, module_globals=None):
27 """Get a line for a Python source file from the cache.
28 Update the cache if it doesn't contain an entry for this file already."""
29
30 lines = getlines(filename, module_globals)
31 if 1 <= lineno <= len(lines):
32 return lines[lineno - 1]
33 return ''
34
35
36def getlines(filename, module_globals=None):
37 """Get the lines for a Python source file from the cache.
38 Update the cache if it doesn't contain an entry for this file already."""
39
40 if filename in cache:
41 entry = cache[filename]
42 if len(entry) != 1:
43 return cache[filename][2]
44
45 try:
46 return updatecache(filename, module_globals)
47 except MemoryError:
48 clearcache()
49 return []
50
51
52def checkcache(filename=None):
53 """Discard cache entries that are out of date.
54 (This is not checked upon each call!)"""
55
56 if filename is None:
57 filenames = list(cache.keys())
58 elif filename in cache:
59 filenames = [filename]
60 else:
61 return
62
63 for filename in filenames:
64 entry = cache[filename]
65 if len(entry) == 1:
66 # lazy cache entry, leave it lazy.
67 continue
68 size, mtime, lines, fullname = entry
69 if mtime is None:
70 continue # no-op for files loaded via a __loader__
71 try:
72 stat = os.stat(fullname)
73 except OSError:
74 cache.pop(filename, None)
75 continue
76 if size != stat.st_size or mtime != stat.st_mtime:
77 cache.pop(filename, None)
78
79
80def updatecache(filename, module_globals=None):
81 """Update a cache entry and return its list of lines.
82 If something's wrong, print a message, discard the cache entry,
83 and return an empty list."""
84
85 if filename in cache:
86 if len(cache[filename]) != 1:
87 cache.pop(filename, None)
88 if not filename or (filename.startswith('<') and filename.endswith('>')):
89 return []
90
91 fullname = filename
92 try:
93 stat = os.stat(fullname)
94 except OSError:
95 basename = filename
96
97 # Realise a lazy loader based lookup if there is one
98 # otherwise try to lookup right now.
99 if lazycache(filename, module_globals):
100 try:
101 data = cache[filename][0]()
102 except (ImportError, OSError):
103 pass
104 else:
105 if data is None:
106 # No luck, the PEP302 loader cannot find the source
107 # for this module.
108 return []
109 cache[filename] = (
110 len(data),
111 None,
112 [line + '\n' for line in data.splitlines()],
113 fullname
114 )
115 return cache[filename][2]
116
117 # Try looking through the module search path, which is only useful
118 # when handling a relative filename.
119 if os.path.isabs(filename):
120 return []
121
122 for dirname in sys.path:
123 try:
124 fullname = os.path.join(dirname, basename)
125 except (TypeError, AttributeError):
126 # Not sufficiently string-like to do anything useful with.
127 continue
128 try:
129 stat = os.stat(fullname)
130 break
131 except OSError:
132 pass
133 else:
134 return []
135 try:
136 with tokenize.open(fullname) as fp:
137 lines = fp.readlines()
138 except OSError:
139 return []
140 if lines and not lines[-1].endswith('\n'):
141 lines[-1] += '\n'
142 size, mtime = stat.st_size, stat.st_mtime
143 cache[filename] = size, mtime, lines, fullname
144 return lines
145
146
147def lazycache(filename, module_globals):
148 """Seed the cache for filename with module_globals.
149
150 The module loader will be asked for the source only when getlines is
151 called, not immediately.
152
153 If there is an entry in the cache already, it is not altered.
154
155 :return: True if a lazy load is registered in the cache,
156 otherwise False. To register such a load a module loader with a
157 get_source method must be found, the filename must be a cachable
158 filename, and the filename must not be already cached.
159 """
160 if filename in cache:
161 if len(cache[filename]) == 1:
162 return True
163 else:
164 return False
165 if not filename or (filename.startswith('<') and filename.endswith('>')):
166 return False
167 # Try for a __loader__, if available
168 if module_globals and '__loader__' in module_globals:
169 name = module_globals.get('__name__')
170 loader = module_globals['__loader__']
171 get_source = getattr(loader, 'get_source', None)
172
173 if name and get_source:
174 get_lines = functools.partial(get_source, name)
175 cache[filename] = (get_lines,)
176 return True
177 return False