Package portage :: Package util :: Package _eventloop :: Module EventLoop
[hide private]

Source Code for Module portage.util._eventloop.EventLoop

  1  # Copyright 1999-2016 Gentoo Foundation 
  2  # Distributed under the terms of the GNU General Public License v2 
  3   
  4  from __future__ import division 
  5   
  6  import errno 
  7  import logging 
  8  import os 
  9  import select 
 10  import signal 
 11  import sys 
 12  import time 
 13   
 14  try: 
 15          import fcntl 
 16  except ImportError: 
 17          #  http://bugs.jython.org/issue1074 
 18          fcntl = None 
 19   
 20  try: 
 21          import threading 
 22  except ImportError: 
 23          import dummy_threading as threading 
 24   
 25  import portage 
 26  portage.proxy.lazyimport.lazyimport(globals(), 
 27          'portage.util.futures.futures:_EventLoopFuture', 
 28  ) 
 29   
 30  from portage import OrderedDict 
 31  from portage.util import writemsg_level 
 32  from ..SlotObject import SlotObject 
 33  from .PollConstants import PollConstants 
 34  from .PollSelectAdapter import PollSelectAdapter 
 35   
36 -class EventLoop(object):
37 """ 38 An event loop, intended to be compatible with the GLib event loop. 39 Call the iteration method in order to execute one iteration of the 40 loop. The idle_add and timeout_add methods serve as thread-safe 41 means to interact with the loop's thread. 42 """ 43 44 supports_multiprocessing = True 45 46 # TODO: Find out why SIGCHLD signals aren't delivered during poll 47 # calls, forcing us to wakeup in order to receive them. 48 _sigchld_interval = 250 49
50 - class _child_callback_class(SlotObject):
51 __slots__ = ("callback", "data", "pid", "source_id")
52
53 - class _idle_callback_class(SlotObject):
54 __slots__ = ("args", "callback", "calling", "source_id")
55
56 - class _io_handler_class(SlotObject):
57 __slots__ = ("args", "callback", "f", "source_id")
58
59 - class _timeout_handler_class(SlotObject):
60 __slots__ = ("args", "function", "calling", "interval", "source_id", 61 "timestamp")
62
63 - class _handle(object):
64 """ 65 A callback wrapper object, compatible with asyncio.Handle. 66 """ 67 __slots__ = ("_callback_id", "_loop") 68
69 - def __init__(self, callback_id, loop):
70 self._callback_id = callback_id 71 self._loop = loop
72
73 - def cancel(self):
74 """ 75 Cancel the call. If the callback is already canceled or executed, 76 this method has no effect. 77 """ 78 self._loop.source_remove(self._callback_id)
79
80 - class _call_soon_callback(object):
81 """ 82 Wraps a call_soon callback, and always returns False, since these 83 callbacks are only supposed to run once. 84 """ 85 __slots__ = ("_args", "_callback") 86
87 - def __init__(self, callback, args):
88 self._callback = callback 89 self._args = args
90
91 - def __call__(self):
92 self._callback(*self._args) 93 return False
94
95 - def __init__(self, main=True):
96 """ 97 @param main: If True then this is a singleton instance for use 98 in the main thread, otherwise it is a local instance which 99 can safely be use in a non-main thread (default is True, so 100 that global_event_loop does not need constructor arguments) 101 @type main: bool 102 """ 103 self._use_signal = main and fcntl is not None 104 self._thread_rlock = threading.RLock() 105 self._thread_condition = threading.Condition(self._thread_rlock) 106 self._poll_event_queue = [] 107 self._poll_event_handlers = {} 108 self._poll_event_handler_ids = {} 109 # Increment id for each new handler. 110 self._event_handler_id = 0 111 # New call_soon callbacks must have an opportunity to 112 # execute before it's safe to wait on self._thread_condition 113 # without a timeout, since delaying its execution indefinitely 114 # could lead to a deadlock. The following attribute stores the 115 # event handler id of the most recently added call_soon callback. 116 # If this attribute has changed since the last time that the 117 # call_soon callbacks have been called, then it's not safe to 118 # wait on self._thread_condition without a timeout. 119 self._call_soon_id = 0 120 # Use OrderedDict in order to emulate the FIFO queue behavior 121 # of the AbstractEventLoop.call_soon method. 122 self._idle_callbacks = OrderedDict() 123 self._timeout_handlers = {} 124 self._timeout_interval = None 125 126 self._poll_obj = None 127 try: 128 select.epoll 129 except AttributeError: 130 pass 131 else: 132 try: 133 epoll_obj = select.epoll() 134 except IOError: 135 # This happens with Linux 2.4 kernels: 136 # IOError: [Errno 38] Function not implemented 137 pass 138 else: 139 140 # FD_CLOEXEC is enabled by default in Python >=3.4. 141 if sys.hexversion < 0x3040000 and fcntl is not None: 142 try: 143 fcntl.FD_CLOEXEC 144 except AttributeError: 145 pass 146 else: 147 fcntl.fcntl(epoll_obj.fileno(), fcntl.F_SETFD, 148 fcntl.fcntl(epoll_obj.fileno(), 149 fcntl.F_GETFD) | fcntl.FD_CLOEXEC) 150 151 self._poll_obj = _epoll_adapter(epoll_obj) 152 self.IO_ERR = select.EPOLLERR 153 self.IO_HUP = select.EPOLLHUP 154 self.IO_IN = select.EPOLLIN 155 self.IO_NVAL = 0 156 self.IO_OUT = select.EPOLLOUT 157 self.IO_PRI = select.EPOLLPRI 158 159 if self._poll_obj is None: 160 self._poll_obj = create_poll_instance() 161 self.IO_ERR = PollConstants.POLLERR 162 self.IO_HUP = PollConstants.POLLHUP 163 self.IO_IN = PollConstants.POLLIN 164 self.IO_NVAL = PollConstants.POLLNVAL 165 self.IO_OUT = PollConstants.POLLOUT 166 self.IO_PRI = PollConstants.POLLPRI 167 168 self._child_handlers = {} 169 self._sigchld_read = None 170 self._sigchld_write = None 171 self._sigchld_src_id = None 172 self._pid = os.getpid()
173
174 - def create_future(self):
175 """ 176 Create a Future object attached to the loop. This returns 177 an instance of _EventLoopFuture, because EventLoop is currently 178 missing some of the asyncio.AbstractEventLoop methods that 179 asyncio.Future requires. 180 """ 181 return _EventLoopFuture(loop=self)
182
183 - def _new_source_id(self):
184 """ 185 Generate a new source id. This method is thread-safe. 186 """ 187 with self._thread_rlock: 188 self._event_handler_id += 1 189 return self._event_handler_id
190
191 - def _poll(self, timeout=None):
192 """ 193 All poll() calls pass through here. The poll events 194 are added directly to self._poll_event_queue. 195 In order to avoid endless blocking, this raises 196 StopIteration if timeout is None and there are 197 no file descriptors to poll. 198 """ 199 200 if timeout is None and \ 201 not self._poll_event_handlers: 202 raise StopIteration( 203 "timeout is None and there are no poll() event handlers") 204 205 while True: 206 try: 207 self._poll_event_queue.extend(self._poll_obj.poll(timeout)) 208 break 209 except (IOError, select.error) as e: 210 # Silently handle EINTR, which is normal when we have 211 # received a signal such as SIGINT (epoll objects may 212 # raise IOError rather than select.error, at least in 213 # Python 3.2). 214 if not (e.args and e.args[0] == errno.EINTR): 215 writemsg_level("\n!!! select error: %s\n" % (e,), 216 level=logging.ERROR, noiselevel=-1) 217 del e 218 219 # This typically means that we've received a SIGINT, so 220 # raise StopIteration in order to break out of our current 221 # iteration and respond appropriately to the signal as soon 222 # as possible. 223 raise StopIteration("interrupted")
224
225 - def iteration(self, *args):
226 """ 227 Like glib.MainContext.iteration(), runs a single iteration. In order 228 to avoid blocking forever when may_block is True (the default), 229 callers must be careful to ensure that at least one of the following 230 conditions is met: 231 1) An event source or timeout is registered which is guaranteed 232 to trigger at least on event (a call to an idle function 233 only counts as an event if it returns a False value which 234 causes it to stop being called) 235 2) Another thread is guaranteed to call one of the thread-safe 236 methods which notify iteration to stop waiting (such as 237 idle_add or timeout_add). 238 These rules ensure that iteration is able to block until an event 239 arrives, without doing any busy waiting that would waste CPU time. 240 @type may_block: bool 241 @param may_block: if True the call may block waiting for an event 242 (default is True). 243 @rtype: bool 244 @return: True if events were dispatched. 245 """ 246 247 may_block = True 248 249 if args: 250 if len(args) > 1: 251 raise TypeError( 252 "expected at most 1 argument (%s given)" % len(args)) 253 may_block = args[0] 254 255 event_queue = self._poll_event_queue 256 event_handlers = self._poll_event_handlers 257 events_handled = 0 258 timeouts_checked = False 259 260 if not event_handlers: 261 with self._thread_condition: 262 prev_call_soon_id = self._call_soon_id 263 if self._run_timeouts(): 264 events_handled += 1 265 timeouts_checked = True 266 267 call_soon = prev_call_soon_id != self._call_soon_id 268 269 if (not call_soon and not event_handlers 270 and not events_handled and may_block): 271 # Block so that we don't waste cpu time by looping too 272 # quickly. This makes EventLoop useful for code that needs 273 # to wait for timeout callbacks regardless of whether or 274 # not any IO handlers are currently registered. 275 timeout = self._get_poll_timeout() 276 if timeout is None: 277 wait_timeout = None 278 else: 279 wait_timeout = timeout / 1000 280 # NOTE: In order to avoid a possible infinite wait when 281 # wait_timeout is None, the previous _run_timeouts() 282 # call must have returned False *with* _thread_condition 283 # acquired. Otherwise, we would risk going to sleep after 284 # our only notify event has already passed. 285 self._thread_condition.wait(wait_timeout) 286 if self._run_timeouts(): 287 events_handled += 1 288 timeouts_checked = True 289 290 # If any timeouts have executed, then return immediately, 291 # in order to minimize latency in termination of iteration 292 # loops that they may control. 293 if events_handled or not event_handlers: 294 return bool(events_handled) 295 296 if not event_queue: 297 298 if may_block: 299 timeout = self._get_poll_timeout() 300 301 # Avoid blocking for IO if there are any timeout 302 # or idle callbacks available to process. 303 if timeout != 0 and not timeouts_checked: 304 if self._run_timeouts(): 305 events_handled += 1 306 timeouts_checked = True 307 if events_handled: 308 # Minimize latency for loops controlled 309 # by timeout or idle callback events. 310 timeout = 0 311 else: 312 timeout = 0 313 314 try: 315 self._poll(timeout=timeout) 316 except StopIteration: 317 # This can be triggered by EINTR which is caused by signals. 318 pass 319 320 # NOTE: IO event handlers may be re-entrant, in case something 321 # like AbstractPollTask._wait_loop() needs to be called inside 322 # a handler for some reason. 323 while event_queue: 324 events_handled += 1 325 f, event = event_queue.pop() 326 try: 327 x = event_handlers[f] 328 except KeyError: 329 # This is known to be triggered by the epoll 330 # implementation in qemu-user-1.2.2, and appears 331 # to be harmless (see bug #451326). 332 continue 333 if not x.callback(f, event, *x.args): 334 self.source_remove(x.source_id) 335 336 if not timeouts_checked: 337 if self._run_timeouts(): 338 events_handled += 1 339 timeouts_checked = True 340 341 return bool(events_handled)
342
343 - def _get_poll_timeout(self):
344 345 with self._thread_rlock: 346 if self._child_handlers: 347 if self._timeout_interval is None: 348 timeout = self._sigchld_interval 349 else: 350 timeout = min(self._sigchld_interval, 351 self._timeout_interval) 352 else: 353 timeout = self._timeout_interval 354 355 return timeout
356
357 - def child_watch_add(self, pid, callback, data=None):
358 """ 359 Like glib.child_watch_add(), sets callback to be called with the 360 user data specified by data when the child indicated by pid exits. 361 The signature for the callback is: 362 363 def callback(pid, condition, user_data) 364 365 where pid is is the child process id, condition is the status 366 information about the child process and user_data is data. 367 368 @type int 369 @param pid: process id of a child process to watch 370 @type callback: callable 371 @param callback: a function to call 372 @type data: object 373 @param data: the optional data to pass to function 374 @rtype: int 375 @return: an integer ID 376 """ 377 source_id = self._new_source_id() 378 self._child_handlers[source_id] = self._child_callback_class( 379 callback=callback, data=data, pid=pid, source_id=source_id) 380 381 if self._use_signal: 382 if self._sigchld_read is None: 383 self._sigchld_read, self._sigchld_write = os.pipe() 384 385 fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL, 386 fcntl.fcntl(self._sigchld_read, 387 fcntl.F_GETFL) | os.O_NONBLOCK) 388 389 # FD_CLOEXEC is enabled by default in Python >=3.4. 390 if sys.hexversion < 0x3040000: 391 try: 392 fcntl.FD_CLOEXEC 393 except AttributeError: 394 pass 395 else: 396 fcntl.fcntl(self._sigchld_read, fcntl.F_SETFD, 397 fcntl.fcntl(self._sigchld_read, 398 fcntl.F_GETFD) | fcntl.FD_CLOEXEC) 399 400 # The IO watch is dynamically registered and unregistered as 401 # needed, since we don't want to consider it as a valid source 402 # of events when there are no child listeners. It's important 403 # to distinguish when there are no valid sources of IO events, 404 # in order to avoid an endless poll call if there's no timeout. 405 if self._sigchld_src_id is None: 406 self._sigchld_src_id = self.io_add_watch( 407 self._sigchld_read, self.IO_IN, self._sigchld_io_cb) 408 signal.signal(signal.SIGCHLD, self._sigchld_sig_cb) 409 410 # poll now, in case the SIGCHLD has already arrived 411 self._poll_child_processes() 412 return source_id
413
414 - def _sigchld_sig_cb(self, signum, frame):
415 # If this signal handler was not installed by the 416 # current process then the signal doesn't belong to 417 # this EventLoop instance. 418 if os.getpid() == self._pid: 419 os.write(self._sigchld_write, b'\0')
420
421 - def _sigchld_io_cb(self, fd, events):
422 try: 423 while True: 424 os.read(self._sigchld_read, 4096) 425 except OSError: 426 # read until EAGAIN 427 pass 428 self._poll_child_processes() 429 return True
430
431 - def _poll_child_processes(self):
432 if not self._child_handlers: 433 return False 434 435 calls = 0 436 437 for x in list(self._child_handlers.values()): 438 if x.source_id not in self._child_handlers: 439 # it's already been called via re-entrance 440 continue 441 try: 442 wait_retval = os.waitpid(x.pid, os.WNOHANG) 443 except OSError as e: 444 if e.errno != errno.ECHILD: 445 raise 446 del e 447 self.source_remove(x.source_id) 448 else: 449 # With waitpid and WNOHANG, only check the 450 # first element of the tuple since the second 451 # element may vary (bug #337465). 452 if wait_retval[0] != 0: 453 calls += 1 454 self.source_remove(x.source_id) 455 x.callback(x.pid, wait_retval[1], x.data) 456 457 return bool(calls)
458
459 - def idle_add(self, callback, *args):
460 """ 461 Like glib.idle_add(), if callback returns False it is 462 automatically removed from the list of event sources and will 463 not be called again. This method is thread-safe. 464 465 The idle_add method is deprecated. Use the call_soon and 466 call_soon_threadsafe methods instead. 467 468 @type callback: callable 469 @param callback: a function to call 470 @rtype: int 471 @return: an integer ID 472 """ 473 with self._thread_condition: 474 source_id = self._call_soon_id = self._new_source_id() 475 self._idle_callbacks[source_id] = self._idle_callback_class( 476 args=args, callback=callback, source_id=source_id) 477 self._thread_condition.notify() 478 return source_id
479
480 - def _run_idle_callbacks(self):
481 # assumes caller has acquired self._thread_rlock 482 if not self._idle_callbacks: 483 return False 484 state_change = 0 485 # Iterate of our local list, since self._idle_callbacks can be 486 # modified during the exection of these callbacks. 487 for x in list(self._idle_callbacks.values()): 488 if x.source_id not in self._idle_callbacks: 489 # it got cancelled while executing another callback 490 continue 491 if x.calling: 492 # don't call it recursively 493 continue 494 x.calling = True 495 try: 496 if not x.callback(*x.args): 497 state_change += 1 498 self.source_remove(x.source_id) 499 finally: 500 x.calling = False 501 502 return bool(state_change)
503
504 - def timeout_add(self, interval, function, *args):
505 """ 506 Like glib.timeout_add(), interval argument is the number of 507 milliseconds between calls to your function, and your function 508 should return False to stop being called, or True to continue 509 being called. Any additional positional arguments given here 510 are passed to your function when it's called. This method is 511 thread-safe. 512 """ 513 with self._thread_condition: 514 source_id = self._new_source_id() 515 self._timeout_handlers[source_id] = \ 516 self._timeout_handler_class( 517 interval=interval, function=function, args=args, 518 source_id=source_id, timestamp=time.time()) 519 if self._timeout_interval is None or \ 520 self._timeout_interval > interval: 521 self._timeout_interval = interval 522 self._thread_condition.notify() 523 return source_id
524
525 - def _run_timeouts(self):
526 527 calls = 0 528 if not self._use_signal: 529 if self._poll_child_processes(): 530 calls += 1 531 532 with self._thread_rlock: 533 534 if self._run_idle_callbacks(): 535 calls += 1 536 537 if not self._timeout_handlers: 538 return bool(calls) 539 540 ready_timeouts = [] 541 current_time = time.time() 542 for x in self._timeout_handlers.values(): 543 elapsed_seconds = current_time - x.timestamp 544 # elapsed_seconds < 0 means the system clock has been adjusted 545 if elapsed_seconds < 0 or \ 546 (x.interval - 1000 * elapsed_seconds) <= 0: 547 ready_timeouts.append(x) 548 549 # Iterate of our local list, since self._timeout_handlers can be 550 # modified during the exection of these callbacks. 551 for x in ready_timeouts: 552 if x.source_id not in self._timeout_handlers: 553 # it got cancelled while executing another timeout 554 continue 555 if x.calling: 556 # don't call it recursively 557 continue 558 calls += 1 559 x.calling = True 560 try: 561 x.timestamp = time.time() 562 if not x.function(*x.args): 563 self.source_remove(x.source_id) 564 finally: 565 x.calling = False 566 567 return bool(calls)
568
569 - def io_add_watch(self, f, condition, callback, *args):
570 """ 571 Like glib.io_add_watch(), your function should return False to 572 stop being called, or True to continue being called. Any 573 additional positional arguments given here are passed to your 574 function when it's called. 575 576 @type f: int or object with fileno() method 577 @param f: a file descriptor to monitor 578 @type condition: int 579 @param condition: a condition mask 580 @type callback: callable 581 @param callback: a function to call 582 @rtype: int 583 @return: an integer ID of the event source 584 """ 585 if f in self._poll_event_handlers: 586 raise AssertionError("fd %d is already registered" % f) 587 source_id = self._new_source_id() 588 self._poll_event_handler_ids[source_id] = f 589 self._poll_event_handlers[f] = self._io_handler_class( 590 args=args, callback=callback, f=f, source_id=source_id) 591 self._poll_obj.register(f, condition) 592 return source_id
593
594 - def source_remove(self, reg_id):
595 """ 596 Like glib.source_remove(), this returns True if the given reg_id 597 is found and removed, and False if the reg_id is invalid or has 598 already been removed. 599 """ 600 x = self._child_handlers.pop(reg_id, None) 601 if x is not None: 602 if not self._child_handlers and self._use_signal: 603 signal.signal(signal.SIGCHLD, signal.SIG_DFL) 604 self.source_remove(self._sigchld_src_id) 605 self._sigchld_src_id = None 606 return True 607 608 with self._thread_rlock: 609 idle_callback = self._idle_callbacks.pop(reg_id, None) 610 if idle_callback is not None: 611 return True 612 timeout_handler = self._timeout_handlers.pop(reg_id, None) 613 if timeout_handler is not None: 614 if timeout_handler.interval == self._timeout_interval: 615 if self._timeout_handlers: 616 self._timeout_interval = min(x.interval 617 for x in self._timeout_handlers.values()) 618 else: 619 self._timeout_interval = None 620 return True 621 622 f = self._poll_event_handler_ids.pop(reg_id, None) 623 if f is None: 624 return False 625 self._poll_obj.unregister(f) 626 if self._poll_event_queue: 627 # Discard any unhandled events that belong to this file, 628 # in order to prevent these events from being erroneously 629 # delivered to a future handler that is using a reallocated 630 # file descriptor of the same numeric value (causing 631 # extremely confusing bugs). 632 remaining_events = [] 633 discarded_events = False 634 for event in self._poll_event_queue: 635 if event[0] == f: 636 discarded_events = True 637 else: 638 remaining_events.append(event) 639 640 if discarded_events: 641 self._poll_event_queue[:] = remaining_events 642 643 del self._poll_event_handlers[f] 644 return True
645
646 - def run_until_complete(self, future):
647 """ 648 Run until the Future is done. 649 650 @type future: asyncio.Future 651 @param future: a Future to wait for 652 @rtype: object 653 @return: the Future's result 654 @raise: the Future's exception 655 """ 656 while not future.done(): 657 self.iteration() 658 659 return future.result()
660
661 - def call_soon(self, callback, *args):
662 """ 663 Arrange for a callback to be called as soon as possible. The callback 664 is called after call_soon() returns, when control returns to the event 665 loop. 666 667 This operates as a FIFO queue, callbacks are called in the order in 668 which they are registered. Each callback will be called exactly once. 669 670 Any positional arguments after the callback will be passed to the 671 callback when it is called. 672 673 An object compatible with asyncio.Handle is returned, which can 674 be used to cancel the callback. 675 676 @type callback: callable 677 @param callback: a function to call 678 @return: a handle which can be used to cancel the callback 679 @rtype: asyncio.Handle (or compatible) 680 """ 681 return self._handle(self.idle_add( 682 self._call_soon_callback(callback, args)), self)
683 684 # The call_soon method inherits thread safety from the idle_add method. 685 call_soon_threadsafe = call_soon
686 687 688 _can_poll_device = None 689
690 -def can_poll_device():
691 """ 692 Test if it's possible to use poll() on a device such as a pty. This 693 is known to fail on Darwin. 694 @rtype: bool 695 @return: True if poll() on a device succeeds, False otherwise. 696 """ 697 698 global _can_poll_device 699 if _can_poll_device is not None: 700 return _can_poll_device 701 702 if not hasattr(select, "poll"): 703 _can_poll_device = False 704 return _can_poll_device 705 706 try: 707 dev_null = open('/dev/null', 'rb') 708 except IOError: 709 _can_poll_device = False 710 return _can_poll_device 711 712 p = select.poll() 713 try: 714 p.register(dev_null.fileno(), PollConstants.POLLIN) 715 except TypeError: 716 # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable 717 _can_poll_device = False 718 return _can_poll_device 719 720 invalid_request = False 721 for f, event in p.poll(): 722 if event & PollConstants.POLLNVAL: 723 invalid_request = True 724 break 725 dev_null.close() 726 727 _can_poll_device = not invalid_request 728 return _can_poll_device
729
730 -def create_poll_instance():
731 """ 732 Create an instance of select.poll, or an instance of 733 PollSelectAdapter there is no poll() implementation or 734 it is broken somehow. 735 """ 736 if can_poll_device(): 737 return select.poll() 738 return PollSelectAdapter()
739
740 -class _epoll_adapter(object):
741 """ 742 Wraps a select.epoll instance in order to make it compatible 743 with select.poll instances. This is necessary since epoll instances 744 interpret timeout arguments differently. Note that the file descriptor 745 that is associated with an epoll instance will close automatically when 746 it is garbage collected, so it's not necessary to close it explicitly. 747 """ 748 __slots__ = ('_epoll_obj',) 749
750 - def __init__(self, epoll_obj):
751 self._epoll_obj = epoll_obj
752
753 - def register(self, fd, *args):
754 self._epoll_obj.register(fd, *args)
755
756 - def unregister(self, fd):
757 self._epoll_obj.unregister(fd)
758
759 - def poll(self, *args):
760 if len(args) > 1: 761 raise TypeError( 762 "poll expected at most 2 arguments, got " + \ 763 repr(1 + len(args))) 764 timeout = -1 765 if args: 766 timeout = args[0] 767 if timeout is None or timeout < 0: 768 timeout = -1 769 elif timeout != 0: 770 timeout = timeout / 1000 771 772 return self._epoll_obj.poll(timeout)
773