blob: bb15a1cb1bada1ae70ad4fb2a0863017215754a0 [file] [log] [blame]
Olivier Deprezf4ef2d02021-04-20 13:36:24 +02001"""Selectors module.
2
3This module allows high-level and efficient I/O multiplexing, built upon the
4`select` module primitives.
5"""
6
7
8from abc import ABCMeta, abstractmethod
9from collections import namedtuple
10from collections.abc import Mapping
11import math
12import select
13import sys
14
15
16# generic events, that must be mapped to implementation-specific ones
17EVENT_READ = (1 << 0)
18EVENT_WRITE = (1 << 1)
19
20
21def _fileobj_to_fd(fileobj):
22 """Return a file descriptor from a file object.
23
24 Parameters:
25 fileobj -- file object or file descriptor
26
27 Returns:
28 corresponding file descriptor
29
30 Raises:
31 ValueError if the object is invalid
32 """
33 if isinstance(fileobj, int):
34 fd = fileobj
35 else:
36 try:
37 fd = int(fileobj.fileno())
38 except (AttributeError, TypeError, ValueError):
39 raise ValueError("Invalid file object: "
40 "{!r}".format(fileobj)) from None
41 if fd < 0:
42 raise ValueError("Invalid file descriptor: {}".format(fd))
43 return fd
44
45
46SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
47
48SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)
49
50 Object used to associate a file object to its backing
51 file descriptor, selected event mask, and attached data.
52"""
53if sys.version_info >= (3, 5):
54 SelectorKey.fileobj.__doc__ = 'File object registered.'
55 SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
56 SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
57 SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
58 For example, this could be used to store a per-client session ID.''')
59
60
61class _SelectorMapping(Mapping):
62 """Mapping of file objects to selector keys."""
63
64 def __init__(self, selector):
65 self._selector = selector
66
67 def __len__(self):
68 return len(self._selector._fd_to_key)
69
70 def __getitem__(self, fileobj):
71 try:
72 fd = self._selector._fileobj_lookup(fileobj)
73 return self._selector._fd_to_key[fd]
74 except KeyError:
75 raise KeyError("{!r} is not registered".format(fileobj)) from None
76
77 def __iter__(self):
78 return iter(self._selector._fd_to_key)
79
80
81class BaseSelector(metaclass=ABCMeta):
82 """Selector abstract base class.
83
84 A selector supports registering file objects to be monitored for specific
85 I/O events.
86
87 A file object is a file descriptor or any object with a `fileno()` method.
88 An arbitrary object can be attached to the file object, which can be used
89 for example to store context information, a callback, etc.
90
91 A selector can use various implementations (select(), poll(), epoll()...)
92 depending on the platform. The default `Selector` class uses the most
93 efficient implementation on the current platform.
94 """
95
96 @abstractmethod
97 def register(self, fileobj, events, data=None):
98 """Register a file object.
99
100 Parameters:
101 fileobj -- file object or file descriptor
102 events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
103 data -- attached data
104
105 Returns:
106 SelectorKey instance
107
108 Raises:
109 ValueError if events is invalid
110 KeyError if fileobj is already registered
111 OSError if fileobj is closed or otherwise is unacceptable to
112 the underlying system call (if a system call is made)
113
114 Note:
115 OSError may or may not be raised
116 """
117 raise NotImplementedError
118
119 @abstractmethod
120 def unregister(self, fileobj):
121 """Unregister a file object.
122
123 Parameters:
124 fileobj -- file object or file descriptor
125
126 Returns:
127 SelectorKey instance
128
129 Raises:
130 KeyError if fileobj is not registered
131
132 Note:
133 If fileobj is registered but has since been closed this does
134 *not* raise OSError (even if the wrapped syscall does)
135 """
136 raise NotImplementedError
137
138 def modify(self, fileobj, events, data=None):
139 """Change a registered file object monitored events or attached data.
140
141 Parameters:
142 fileobj -- file object or file descriptor
143 events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
144 data -- attached data
145
146 Returns:
147 SelectorKey instance
148
149 Raises:
150 Anything that unregister() or register() raises
151 """
152 self.unregister(fileobj)
153 return self.register(fileobj, events, data)
154
155 @abstractmethod
156 def select(self, timeout=None):
157 """Perform the actual selection, until some monitored file objects are
158 ready or a timeout expires.
159
160 Parameters:
161 timeout -- if timeout > 0, this specifies the maximum wait time, in
162 seconds
163 if timeout <= 0, the select() call won't block, and will
164 report the currently ready file objects
165 if timeout is None, select() will block until a monitored
166 file object becomes ready
167
168 Returns:
169 list of (key, events) for ready file objects
170 `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
171 """
172 raise NotImplementedError
173
174 def close(self):
175 """Close the selector.
176
177 This must be called to make sure that any underlying resource is freed.
178 """
179 pass
180
181 def get_key(self, fileobj):
182 """Return the key associated to a registered file object.
183
184 Returns:
185 SelectorKey for this file object
186 """
187 mapping = self.get_map()
188 if mapping is None:
189 raise RuntimeError('Selector is closed')
190 try:
191 return mapping[fileobj]
192 except KeyError:
193 raise KeyError("{!r} is not registered".format(fileobj)) from None
194
195 @abstractmethod
196 def get_map(self):
197 """Return a mapping of file objects to selector keys."""
198 raise NotImplementedError
199
200 def __enter__(self):
201 return self
202
203 def __exit__(self, *args):
204 self.close()
205
206
207class _BaseSelectorImpl(BaseSelector):
208 """Base selector implementation."""
209
210 def __init__(self):
211 # this maps file descriptors to keys
212 self._fd_to_key = {}
213 # read-only mapping returned by get_map()
214 self._map = _SelectorMapping(self)
215
216 def _fileobj_lookup(self, fileobj):
217 """Return a file descriptor from a file object.
218
219 This wraps _fileobj_to_fd() to do an exhaustive search in case
220 the object is invalid but we still have it in our map. This
221 is used by unregister() so we can unregister an object that
222 was previously registered even if it is closed. It is also
223 used by _SelectorMapping.
224 """
225 try:
226 return _fileobj_to_fd(fileobj)
227 except ValueError:
228 # Do an exhaustive search.
229 for key in self._fd_to_key.values():
230 if key.fileobj is fileobj:
231 return key.fd
232 # Raise ValueError after all.
233 raise
234
235 def register(self, fileobj, events, data=None):
236 if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
237 raise ValueError("Invalid events: {!r}".format(events))
238
239 key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
240
241 if key.fd in self._fd_to_key:
242 raise KeyError("{!r} (FD {}) is already registered"
243 .format(fileobj, key.fd))
244
245 self._fd_to_key[key.fd] = key
246 return key
247
248 def unregister(self, fileobj):
249 try:
250 key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
251 except KeyError:
252 raise KeyError("{!r} is not registered".format(fileobj)) from None
253 return key
254
255 def modify(self, fileobj, events, data=None):
256 try:
257 key = self._fd_to_key[self._fileobj_lookup(fileobj)]
258 except KeyError:
259 raise KeyError("{!r} is not registered".format(fileobj)) from None
260 if events != key.events:
261 self.unregister(fileobj)
262 key = self.register(fileobj, events, data)
263 elif data != key.data:
264 # Use a shortcut to update the data.
265 key = key._replace(data=data)
266 self._fd_to_key[key.fd] = key
267 return key
268
269 def close(self):
270 self._fd_to_key.clear()
271 self._map = None
272
273 def get_map(self):
274 return self._map
275
276 def _key_from_fd(self, fd):
277 """Return the key associated to a given file descriptor.
278
279 Parameters:
280 fd -- file descriptor
281
282 Returns:
283 corresponding key, or None if not found
284 """
285 try:
286 return self._fd_to_key[fd]
287 except KeyError:
288 return None
289
290
291class SelectSelector(_BaseSelectorImpl):
292 """Select-based selector."""
293
294 def __init__(self):
295 super().__init__()
296 self._readers = set()
297 self._writers = set()
298
299 def register(self, fileobj, events, data=None):
300 key = super().register(fileobj, events, data)
301 if events & EVENT_READ:
302 self._readers.add(key.fd)
303 if events & EVENT_WRITE:
304 self._writers.add(key.fd)
305 return key
306
307 def unregister(self, fileobj):
308 key = super().unregister(fileobj)
309 self._readers.discard(key.fd)
310 self._writers.discard(key.fd)
311 return key
312
313 if sys.platform == 'win32':
314 def _select(self, r, w, _, timeout=None):
315 r, w, x = select.select(r, w, w, timeout)
316 return r, w + x, []
317 else:
318 _select = select.select
319
320 def select(self, timeout=None):
321 timeout = None if timeout is None else max(timeout, 0)
322 ready = []
323 try:
324 r, w, _ = self._select(self._readers, self._writers, [], timeout)
325 except InterruptedError:
326 return ready
327 r = set(r)
328 w = set(w)
329 for fd in r | w:
330 events = 0
331 if fd in r:
332 events |= EVENT_READ
333 if fd in w:
334 events |= EVENT_WRITE
335
336 key = self._key_from_fd(fd)
337 if key:
338 ready.append((key, events & key.events))
339 return ready
340
341
342class _PollLikeSelector(_BaseSelectorImpl):
343 """Base class shared between poll, epoll and devpoll selectors."""
344 _selector_cls = None
345 _EVENT_READ = None
346 _EVENT_WRITE = None
347
348 def __init__(self):
349 super().__init__()
350 self._selector = self._selector_cls()
351
352 def register(self, fileobj, events, data=None):
353 key = super().register(fileobj, events, data)
354 poller_events = 0
355 if events & EVENT_READ:
356 poller_events |= self._EVENT_READ
357 if events & EVENT_WRITE:
358 poller_events |= self._EVENT_WRITE
359 try:
360 self._selector.register(key.fd, poller_events)
361 except:
362 super().unregister(fileobj)
363 raise
364 return key
365
366 def unregister(self, fileobj):
367 key = super().unregister(fileobj)
368 try:
369 self._selector.unregister(key.fd)
370 except OSError:
371 # This can happen if the FD was closed since it
372 # was registered.
373 pass
374 return key
375
376 def modify(self, fileobj, events, data=None):
377 try:
378 key = self._fd_to_key[self._fileobj_lookup(fileobj)]
379 except KeyError:
380 raise KeyError(f"{fileobj!r} is not registered") from None
381
382 changed = False
383 if events != key.events:
384 selector_events = 0
385 if events & EVENT_READ:
386 selector_events |= self._EVENT_READ
387 if events & EVENT_WRITE:
388 selector_events |= self._EVENT_WRITE
389 try:
390 self._selector.modify(key.fd, selector_events)
391 except:
392 super().unregister(fileobj)
393 raise
394 changed = True
395 if data != key.data:
396 changed = True
397
398 if changed:
399 key = key._replace(events=events, data=data)
400 self._fd_to_key[key.fd] = key
401 return key
402
403 def select(self, timeout=None):
404 # This is shared between poll() and epoll().
405 # epoll() has a different signature and handling of timeout parameter.
406 if timeout is None:
407 timeout = None
408 elif timeout <= 0:
409 timeout = 0
410 else:
411 # poll() has a resolution of 1 millisecond, round away from
412 # zero to wait *at least* timeout seconds.
413 timeout = math.ceil(timeout * 1e3)
414 ready = []
415 try:
416 fd_event_list = self._selector.poll(timeout)
417 except InterruptedError:
418 return ready
419 for fd, event in fd_event_list:
420 events = 0
421 if event & ~self._EVENT_READ:
422 events |= EVENT_WRITE
423 if event & ~self._EVENT_WRITE:
424 events |= EVENT_READ
425
426 key = self._key_from_fd(fd)
427 if key:
428 ready.append((key, events & key.events))
429 return ready
430
431
432if hasattr(select, 'poll'):
433
434 class PollSelector(_PollLikeSelector):
435 """Poll-based selector."""
436 _selector_cls = select.poll
437 _EVENT_READ = select.POLLIN
438 _EVENT_WRITE = select.POLLOUT
439
440
441if hasattr(select, 'epoll'):
442
443 class EpollSelector(_PollLikeSelector):
444 """Epoll-based selector."""
445 _selector_cls = select.epoll
446 _EVENT_READ = select.EPOLLIN
447 _EVENT_WRITE = select.EPOLLOUT
448
449 def fileno(self):
450 return self._selector.fileno()
451
452 def select(self, timeout=None):
453 if timeout is None:
454 timeout = -1
455 elif timeout <= 0:
456 timeout = 0
457 else:
458 # epoll_wait() has a resolution of 1 millisecond, round away
459 # from zero to wait *at least* timeout seconds.
460 timeout = math.ceil(timeout * 1e3) * 1e-3
461
462 # epoll_wait() expects `maxevents` to be greater than zero;
463 # we want to make sure that `select()` can be called when no
464 # FD is registered.
465 max_ev = max(len(self._fd_to_key), 1)
466
467 ready = []
468 try:
469 fd_event_list = self._selector.poll(timeout, max_ev)
470 except InterruptedError:
471 return ready
472 for fd, event in fd_event_list:
473 events = 0
474 if event & ~select.EPOLLIN:
475 events |= EVENT_WRITE
476 if event & ~select.EPOLLOUT:
477 events |= EVENT_READ
478
479 key = self._key_from_fd(fd)
480 if key:
481 ready.append((key, events & key.events))
482 return ready
483
484 def close(self):
485 self._selector.close()
486 super().close()
487
488
489if hasattr(select, 'devpoll'):
490
491 class DevpollSelector(_PollLikeSelector):
492 """Solaris /dev/poll selector."""
493 _selector_cls = select.devpoll
494 _EVENT_READ = select.POLLIN
495 _EVENT_WRITE = select.POLLOUT
496
497 def fileno(self):
498 return self._selector.fileno()
499
500 def close(self):
501 self._selector.close()
502 super().close()
503
504
505if hasattr(select, 'kqueue'):
506
507 class KqueueSelector(_BaseSelectorImpl):
508 """Kqueue-based selector."""
509
510 def __init__(self):
511 super().__init__()
512 self._selector = select.kqueue()
513
514 def fileno(self):
515 return self._selector.fileno()
516
517 def register(self, fileobj, events, data=None):
518 key = super().register(fileobj, events, data)
519 try:
520 if events & EVENT_READ:
521 kev = select.kevent(key.fd, select.KQ_FILTER_READ,
522 select.KQ_EV_ADD)
523 self._selector.control([kev], 0, 0)
524 if events & EVENT_WRITE:
525 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
526 select.KQ_EV_ADD)
527 self._selector.control([kev], 0, 0)
528 except:
529 super().unregister(fileobj)
530 raise
531 return key
532
533 def unregister(self, fileobj):
534 key = super().unregister(fileobj)
535 if key.events & EVENT_READ:
536 kev = select.kevent(key.fd, select.KQ_FILTER_READ,
537 select.KQ_EV_DELETE)
538 try:
539 self._selector.control([kev], 0, 0)
540 except OSError:
541 # This can happen if the FD was closed since it
542 # was registered.
543 pass
544 if key.events & EVENT_WRITE:
545 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
546 select.KQ_EV_DELETE)
547 try:
548 self._selector.control([kev], 0, 0)
549 except OSError:
550 # See comment above.
551 pass
552 return key
553
554 def select(self, timeout=None):
555 timeout = None if timeout is None else max(timeout, 0)
556 # If max_ev is 0, kqueue will ignore the timeout. For consistent
557 # behavior with the other selector classes, we prevent that here
558 # (using max). See https://bugs.python.org/issue29255
559 max_ev = max(len(self._fd_to_key), 1)
560 ready = []
561 try:
562 kev_list = self._selector.control(None, max_ev, timeout)
563 except InterruptedError:
564 return ready
565 for kev in kev_list:
566 fd = kev.ident
567 flag = kev.filter
568 events = 0
569 if flag == select.KQ_FILTER_READ:
570 events |= EVENT_READ
571 if flag == select.KQ_FILTER_WRITE:
572 events |= EVENT_WRITE
573
574 key = self._key_from_fd(fd)
575 if key:
576 ready.append((key, events & key.events))
577 return ready
578
579 def close(self):
580 self._selector.close()
581 super().close()
582
583
584def _can_use(method):
585 """Check if we can use the selector depending upon the
586 operating system. """
587 # Implementation based upon https://github.com/sethmlarson/selectors2/blob/master/selectors2.py
588 selector = getattr(select, method, None)
589 if selector is None:
590 # select module does not implement method
591 return False
592 # check if the OS and Kernel actually support the method. Call may fail with
593 # OSError: [Errno 38] Function not implemented
594 try:
595 selector_obj = selector()
596 if method == 'poll':
597 # check that poll actually works
598 selector_obj.poll(0)
599 else:
600 # close epoll, kqueue, and devpoll fd
601 selector_obj.close()
602 return True
603 except OSError:
604 return False
605
606
607# Choose the best implementation, roughly:
608# epoll|kqueue|devpoll > poll > select.
609# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
610if _can_use('kqueue'):
611 DefaultSelector = KqueueSelector
612elif _can_use('epoll'):
613 DefaultSelector = EpollSelector
614elif _can_use('devpoll'):
615 DefaultSelector = DevpollSelector
616elif _can_use('poll'):
617 DefaultSelector = PollSelector
618else:
619 DefaultSelector = SelectSelector