Package portage :: Package util :: Package _eventloop :: Module EventLoop
[hide private]

Source Code for Module portage.util._eventloop.EventLoop

  1  # Copyright 1999-2013 Gentoo Foundation 
  2  # Distributed under the terms of the GNU General Public License v2 
  3   
  4  import errno 
  5  import logging 
  6  import os 
  7  import select 
  8  import signal 
  9  import sys 
 10  import time 
 11   
 12  try: 
 13          import fcntl 
 14  except ImportError: 
 15          #  http://bugs.jython.org/issue1074 
 16          fcntl = None 
 17   
 18  try: 
 19          import threading 
 20  except ImportError: 
 21          import dummy_threading as threading 
 22   
 23  from portage.util import writemsg_level 
 24  from ..SlotObject import SlotObject 
 25  from .PollConstants import PollConstants 
 26  from .PollSelectAdapter import PollSelectAdapter 
 27   
class EventLoop(object):
    """
    An event loop, intended to be compatible with the GLib event loop.
    Call the iteration method in order to execute one iteration of the
    loop. The idle_add and timeout_add methods serve as thread-safe
    means to interact with the loop's thread.
    """

    supports_multiprocessing = True

    # TODO: Find out why SIGCHLD signals aren't delivered during poll
    # calls, forcing us to wakeup in order to receive them.
    # Upper bound (milliseconds) on the poll timeout while child
    # watches are registered, see _get_poll_timeout().
    _sigchld_interval = 250

    # Record types for the registered event sources.
    # NOTE(review): timeout_add and idle_add never pass "calling", yet
    # it is read before being assigned; presumably SlotObject gives
    # unspecified slots a default of None -- confirm against SlotObject.

    class _child_callback_class(SlotObject):
        __slots__ = ("callback", "data", "pid", "source_id")

    class _idle_callback_class(SlotObject):
        __slots__ = ("args", "callback", "calling", "source_id")

    class _io_handler_class(SlotObject):
        __slots__ = ("args", "callback", "f", "source_id")

    class _timeout_handler_class(SlotObject):
        __slots__ = ("args", "function", "calling", "interval", "source_id",
            "timestamp")

    def __init__(self, main=True):
        """
        @param main: If True then this is a singleton instance for use
            in the main thread, otherwise it is a local instance which
            can safely be used in a non-main thread (default is True, so
            that global_event_loop does not need constructor arguments)
        @type main: bool
        """
        # SIGCHLD handling needs both the main thread and fcntl (used to
        # make the wakeup pipe non-blocking).
        self._use_signal = main and fcntl is not None
        self._thread_rlock = threading.RLock()
        self._thread_condition = threading.Condition(self._thread_rlock)
        self._poll_event_queue = []
        self._poll_event_handlers = {}
        self._poll_event_handler_ids = {}
        # Increment id for each new handler.
        self._event_handler_id = 0
        self._idle_callbacks = {}
        self._timeout_handlers = {}
        self._timeout_interval = None

        self._poll_obj = None
        try:
            select.epoll
        except AttributeError:
            pass
        else:
            try:
                epoll_obj = select.epoll()
            except IOError:
                # This happens with Linux 2.4 kernels:
                # IOError: [Errno 38] Function not implemented
                pass
            else:

                # FD_CLOEXEC is enabled by default in Python >=3.4.
                if sys.hexversion < 0x3040000 and fcntl is not None:
                    try:
                        fcntl.FD_CLOEXEC
                    except AttributeError:
                        pass
                    else:
                        fcntl.fcntl(epoll_obj.fileno(), fcntl.F_SETFD,
                            fcntl.fcntl(epoll_obj.fileno(),
                                fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

                self._poll_obj = _epoll_adapter(epoll_obj)
                self.IO_ERR = select.EPOLLERR
                self.IO_HUP = select.EPOLLHUP
                self.IO_IN = select.EPOLLIN
                # No epoll constant is used for NVAL, so the mask is 0.
                self.IO_NVAL = 0
                self.IO_OUT = select.EPOLLOUT
                self.IO_PRI = select.EPOLLPRI

        if self._poll_obj is None:
            # epoll is missing or unusable: fall back to poll/select.
            self._poll_obj = create_poll_instance()
            self.IO_ERR = PollConstants.POLLERR
            self.IO_HUP = PollConstants.POLLHUP
            self.IO_IN = PollConstants.POLLIN
            self.IO_NVAL = PollConstants.POLLNVAL
            self.IO_OUT = PollConstants.POLLOUT
            self.IO_PRI = PollConstants.POLLPRI

        self._child_handlers = {}
        self._sigchld_read = None
        self._sigchld_write = None
        self._sigchld_src_id = None
        # Remembered so that _sigchld_sig_cb can ignore signals that are
        # delivered to a different (forked) process.
        self._pid = os.getpid()

    def _new_source_id(self):
        """
        Generate a new source id. This method is thread-safe.

        @rtype: int
        @return: an id that is shared by all kinds of event sources
        """
        with self._thread_rlock:
            self._event_handler_id += 1
            return self._event_handler_id

    def _poll(self, timeout=None):
        """
        All poll() calls pass through here. The poll events
        are added directly to self._poll_event_queue.
        In order to avoid endless blocking, this raises
        StopIteration if timeout is None and there are
        no file descriptors to poll.

        @param timeout: timeout in milliseconds, or None to block
        @raise StopIteration: if the call would block forever, or if
            the underlying poll call failed (typically with EINTR)
        """

        if timeout is None and \
            not self._poll_event_handlers:
            raise StopIteration(
                "timeout is None and there are no poll() event handlers")

        try:
            self._poll_event_queue.extend(self._poll_obj.poll(timeout))
        except (IOError, select.error) as e:
            # Silently handle EINTR, which is normal when we have
            # received a signal such as SIGINT (epoll objects may
            # raise IOError rather than select.error, at least in
            # Python 3.2).
            if not (e.args and e.args[0] == errno.EINTR):
                writemsg_level("\n!!! select error: %s\n" % (e,),
                    level=logging.ERROR, noiselevel=-1)
            del e

            # This typically means that we've received a SIGINT, so
            # raise StopIteration in order to break out of our current
            # iteration and respond appropriately to the signal as soon
            # as possible.
            raise StopIteration("interrupted")

    def iteration(self, *args):
        """
        Like glib.MainContext.iteration(), runs a single iteration. In order
        to avoid blocking forever when may_block is True (the default),
        callers must be careful to ensure that at least one of the following
        conditions is met:
            1) An event source or timeout is registered which is guaranteed
                to trigger at least one event (a call to an idle function
                only counts as an event if it returns a False value which
                causes it to stop being called)
            2) Another thread is guaranteed to call one of the thread-safe
                methods which notify iteration to stop waiting (such as
                idle_add or timeout_add).
        These rules ensure that iteration is able to block until an event
        arrives, without doing any busy waiting that would waste CPU time.
        @type may_block: bool
        @param may_block: if True the call may block waiting for an event
            (default is True).
        @rtype: bool
        @return: True if events were dispatched.
        @raise TypeError: if more than one positional argument is given
        """

        may_block = True

        if args:
            if len(args) > 1:
                raise TypeError(
                    "expected at most 1 argument (%s given)" % len(args))
            may_block = args[0]

        event_queue = self._poll_event_queue
        event_handlers = self._poll_event_handlers
        events_handled = 0
        timeouts_checked = False

        if not event_handlers:
            with self._thread_condition:
                if self._run_timeouts():
                    events_handled += 1
                timeouts_checked = True
                if not event_handlers and not events_handled and may_block:
                    # Block so that we don't waste cpu time by looping too
                    # quickly. This makes EventLoop useful for code that needs
                    # to wait for timeout callbacks regardless of whether or
                    # not any IO handlers are currently registered.
                    timeout = self._get_poll_timeout()
                    if timeout is None:
                        wait_timeout = None
                    else:
                        # Condition.wait() takes seconds.
                        wait_timeout = float(timeout) / 1000
                    # NOTE: In order to avoid a possible infinite wait when
                    # wait_timeout is None, the previous _run_timeouts()
                    # call must have returned False *with* _thread_condition
                    # acquired. Otherwise, we would risk going to sleep after
                    # our only notify event has already passed.
                    self._thread_condition.wait(wait_timeout)
                    if self._run_timeouts():
                        events_handled += 1

            # If any timeouts have executed, then return immediately,
            # in order to minimize latency in termination of iteration
            # loops that they may control.
            if events_handled or not event_handlers:
                return bool(events_handled)

        if not event_queue:

            if may_block:
                timeout = self._get_poll_timeout()

                # Avoid blocking for IO if there are any timeout
                # or idle callbacks available to process.
                if timeout != 0 and not timeouts_checked:
                    if self._run_timeouts():
                        events_handled += 1
                    timeouts_checked = True
                    if events_handled:
                        # Minimize latency for loops controlled
                        # by timeout or idle callback events.
                        timeout = 0
            else:
                timeout = 0

            try:
                self._poll(timeout=timeout)
            except StopIteration:
                # This can be triggered by EINTR which is caused by signals.
                pass

        # NOTE: IO event handlers may be re-entrant, in case something
        # like AbstractPollTask._wait_loop() needs to be called inside
        # a handler for some reason.
        while event_queue:
            events_handled += 1
            f, event = event_queue.pop()
            try:
                x = event_handlers[f]
            except KeyError:
                # This is known to be triggered by the epoll
                # implementation in qemu-user-1.2.2, and appears
                # to be harmless (see bug #451326).
                continue
            if not x.callback(f, event, *x.args):
                self.source_remove(x.source_id)

        if not timeouts_checked:
            if self._run_timeouts():
                events_handled += 1

        return bool(events_handled)

    def _get_poll_timeout(self):
        """
        Compute how long a poll or wait call is allowed to block.

        @rtype: int or None
        @return: the smallest registered timeout interval in
            milliseconds, capped at _sigchld_interval while child
            watches are registered, or None if nothing limits the wait
        """

        with self._thread_rlock:
            if self._child_handlers:
                if self._timeout_interval is None:
                    timeout = self._sigchld_interval
                else:
                    timeout = min(self._sigchld_interval,
                        self._timeout_interval)
            else:
                timeout = self._timeout_interval

        return timeout

    def child_watch_add(self, pid, callback, data=None):
        """
        Like glib.child_watch_add(), sets callback to be called with the
        user data specified by data when the child indicated by pid exits.
        The signature for the callback is:

            def callback(pid, condition, user_data)

        where pid is the child process id, condition is the status
        information about the child process and user_data is data.

        @type pid: int
        @param pid: process id of a child process to watch
        @type callback: callable
        @param callback: a function to call
        @type data: object
        @param data: the optional data to pass to function
        @rtype: int
        @return: an integer ID
        """
        source_id = self._new_source_id()
        self._child_handlers[source_id] = self._child_callback_class(
            callback=callback, data=data, pid=pid, source_id=source_id)

        if self._use_signal:
            if self._sigchld_read is None:
                self._sigchld_read, self._sigchld_write = os.pipe()

                # Non-blocking, so that _sigchld_io_cb can drain the
                # pipe until os.read raises.
                fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL,
                    fcntl.fcntl(self._sigchld_read,
                    fcntl.F_GETFL) | os.O_NONBLOCK)

                # FD_CLOEXEC is enabled by default in Python >=3.4.
                if sys.hexversion < 0x3040000:
                    try:
                        fcntl.FD_CLOEXEC
                    except AttributeError:
                        pass
                    else:
                        fcntl.fcntl(self._sigchld_read, fcntl.F_SETFD,
                            fcntl.fcntl(self._sigchld_read,
                            fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

            # The IO watch is dynamically registered and unregistered as
            # needed, since we don't want to consider it as a valid source
            # of events when there are no child listeners. It's important
            # to distinguish when there are no valid sources of IO events,
            # in order to avoid an endless poll call if there's no timeout.
            if self._sigchld_src_id is None:
                self._sigchld_src_id = self.io_add_watch(
                    self._sigchld_read, self.IO_IN, self._sigchld_io_cb)
                signal.signal(signal.SIGCHLD, self._sigchld_sig_cb)

        # poll now, in case the SIGCHLD has already arrived
        self._poll_child_processes()
        return source_id

    def _sigchld_sig_cb(self, signum, frame):
        """
        SIGCHLD signal handler, writes one byte to the wakeup pipe so
        that the IO watch on its read end fires.
        """
        # If this signal handler was not installed by the
        # current process then the signal doesn't belong to
        # this EventLoop instance.
        if os.getpid() == self._pid:
            os.write(self._sigchld_write, b'\0')

    def _sigchld_io_cb(self, fd, events):
        """
        IO callback for the wakeup pipe: drain it, then reap children.

        @rtype: bool
        @return: always True, so that the watch stays registered
        """
        try:
            while True:
                os.read(self._sigchld_read, 4096)
        except OSError:
            # read until EAGAIN
            pass
        self._poll_child_processes()
        return True

    def _poll_child_processes(self):
        """
        Call waitpid (with WNOHANG) for every watched child, and invoke
        the callback of each child that has exited. A watch whose pid
        is not a child of this process (ECHILD) is removed without
        calling its callback.

        @rtype: bool
        @return: True if at least one callback was called
        """
        if not self._child_handlers:
            return False

        calls = 0

        # Iterate over a copy, since callbacks may add or remove watches.
        for x in list(self._child_handlers.values()):
            if x.source_id not in self._child_handlers:
                # it's already been called via re-entrance
                continue
            try:
                wait_retval = os.waitpid(x.pid, os.WNOHANG)
            except OSError as e:
                if e.errno != errno.ECHILD:
                    raise
                del e
                self.source_remove(x.source_id)
            else:
                # With waitpid and WNOHANG, only check the
                # first element of the tuple since the second
                # element may vary (bug #337465).
                if wait_retval[0] != 0:
                    calls += 1
                    self.source_remove(x.source_id)
                    x.callback(x.pid, wait_retval[1], x.data)

        return bool(calls)

    def idle_add(self, callback, *args):
        """
        Like glib.idle_add(), if callback returns False it is
        automatically removed from the list of event sources and will
        not be called again. This method is thread-safe.

        @type callback: callable
        @param callback: a function to call
        @rtype: int
        @return: an integer ID
        """
        with self._thread_condition:
            source_id = self._new_source_id()
            self._idle_callbacks[source_id] = self._idle_callback_class(
                args=args, callback=callback, source_id=source_id)
            # Wake up an iteration() call that is waiting on the
            # condition.
            self._thread_condition.notify()
        return source_id

    def _run_idle_callbacks(self):
        """
        Call every registered idle callback once, and remove those
        that return a false value.

        @rtype: bool
        @return: True if at least one callback was removed
        """
        # assumes caller has acquired self._thread_rlock
        if not self._idle_callbacks:
            return False
        state_change = 0
        # Iterate over our local list, since self._idle_callbacks can be
        # modified during the execution of these callbacks.
        for x in list(self._idle_callbacks.values()):
            if x.source_id not in self._idle_callbacks:
                # it got cancelled while executing another callback
                continue
            if x.calling:
                # don't call it recursively
                continue
            x.calling = True
            try:
                if not x.callback(*x.args):
                    state_change += 1
                    self.source_remove(x.source_id)
            finally:
                x.calling = False

        return bool(state_change)

    def timeout_add(self, interval, function, *args):
        """
        Like glib.timeout_add(), interval argument is the number of
        milliseconds between calls to your function, and your function
        should return False to stop being called, or True to continue
        being called. Any additional positional arguments given here
        are passed to your function when it's called. This method is
        thread-safe.

        @rtype: int
        @return: an integer ID
        """
        with self._thread_condition:
            source_id = self._new_source_id()
            self._timeout_handlers[source_id] = \
                self._timeout_handler_class(
                    interval=interval, function=function, args=args,
                    source_id=source_id, timestamp=time.time())
            # Track the smallest interval, which bounds the poll timeout.
            if self._timeout_interval is None or \
                self._timeout_interval > interval:
                self._timeout_interval = interval
            self._thread_condition.notify()
        return source_id

    def _run_timeouts(self):
        """
        Run the idle callbacks and every timeout whose interval has
        elapsed. When SIGCHLD is not in use, child processes are polled
        here as well.

        @rtype: bool
        @return: True if anything was dispatched
        """

        calls = 0
        if not self._use_signal:
            if self._poll_child_processes():
                calls += 1

        with self._thread_rlock:

            if self._run_idle_callbacks():
                calls += 1

            if not self._timeout_handlers:
                return bool(calls)

            ready_timeouts = []
            current_time = time.time()
            for x in self._timeout_handlers.values():
                elapsed_seconds = current_time - x.timestamp
                # elapsed_seconds < 0 means the system clock has been adjusted
                if elapsed_seconds < 0 or \
                    (x.interval - 1000 * elapsed_seconds) <= 0:
                    ready_timeouts.append(x)

            # Iterate over our local list, since self._timeout_handlers can
            # be modified during the execution of these callbacks.
            for x in ready_timeouts:
                if x.source_id not in self._timeout_handlers:
                    # it got cancelled while executing another timeout
                    continue
                if x.calling:
                    # don't call it recursively
                    continue
                calls += 1
                x.calling = True
                try:
                    x.timestamp = time.time()
                    if not x.function(*x.args):
                        self.source_remove(x.source_id)
                finally:
                    x.calling = False

        return bool(calls)

    def io_add_watch(self, f, condition, callback, *args):
        """
        Like glib.io_add_watch(), your function should return False to
        stop being called, or True to continue being called. Any
        additional positional arguments given here are passed to your
        function when it's called.

        @type f: int or object with fileno() method
        @param f: a file descriptor to monitor
        @type condition: int
        @param condition: a condition mask
        @type callback: callable
        @param callback: a function to call
        @rtype: int
        @return: an integer ID of the event source
        @raise AssertionError: if f is already registered
        """
        if f in self._poll_event_handlers:
            # Use %s, since f is not necessarily an integer (%d would
            # raise TypeError for an object with a fileno() method).
            raise AssertionError("fd %s is already registered" % (f,))
        source_id = self._new_source_id()
        self._poll_event_handler_ids[source_id] = f
        self._poll_event_handlers[f] = self._io_handler_class(
            args=args, callback=callback, f=f, source_id=source_id)
        self._poll_obj.register(f, condition)
        return source_id

    def source_remove(self, reg_id):
        """
        Like glib.source_remove(), this returns True if the given reg_id
        is found and removed, and False if the reg_id is invalid or has
        already been removed.

        @type reg_id: int
        @param reg_id: an ID returned by child_watch_add, idle_add,
            timeout_add or io_add_watch
        @rtype: bool
        """
        child_handler = self._child_handlers.pop(reg_id, None)
        if child_handler is not None:
            if not self._child_handlers and self._use_signal:
                # That was the last child watch, so stop listening
                # for SIGCHLD and drop the wakeup pipe watch.
                signal.signal(signal.SIGCHLD, signal.SIG_DFL)
                self.source_remove(self._sigchld_src_id)
                self._sigchld_src_id = None
            return True

        with self._thread_rlock:
            idle_callback = self._idle_callbacks.pop(reg_id, None)
            if idle_callback is not None:
                return True
            timeout_handler = self._timeout_handlers.pop(reg_id, None)
            if timeout_handler is not None:
                # Recompute the smallest interval only if the removed
                # handler may have been the one that defined it.
                if timeout_handler.interval == self._timeout_interval:
                    if self._timeout_handlers:
                        self._timeout_interval = min(x.interval
                            for x in self._timeout_handlers.values())
                    else:
                        self._timeout_interval = None
                return True

        f = self._poll_event_handler_ids.pop(reg_id, None)
        if f is None:
            return False
        self._poll_obj.unregister(f)
        if self._poll_event_queue:
            # Discard any unhandled events that belong to this file,
            # in order to prevent these events from being erroneously
            # delivered to a future handler that is using a reallocated
            # file descriptor of the same numeric value (causing
            # extremely confusing bugs). The list is updated in place,
            # since iteration() holds a reference to it.
            self._poll_event_queue[:] = [event
                for event in self._poll_event_queue if event[0] != f]

        del self._poll_event_handlers[f]
        return True
# Cached result of can_poll_device(), None until the first call.
_can_poll_device = None


def can_poll_device():
    """
    Test if it's possible to use poll() on a device such as a pty. This
    is known to fail on Darwin. The result is computed once and cached
    in the module-level _can_poll_device variable.
    @rtype: bool
    @return: True if poll() on a device succeeds, False otherwise.
    """

    global _can_poll_device
    if _can_poll_device is not None:
        return _can_poll_device

    if not hasattr(select, "poll"):
        _can_poll_device = False
        return _can_poll_device

    try:
        dev_null = open('/dev/null', 'rb')
    except IOError:
        _can_poll_device = False
        return _can_poll_device

    # Close dev_null on every path, including the early return and any
    # exception raised by poll().
    try:
        p = select.poll()
        try:
            p.register(dev_null.fileno(), PollConstants.POLLIN)
        except TypeError:
            # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable
            _can_poll_device = False
            return _can_poll_device

        # POLLNVAL means that poll() rejected the descriptor.
        invalid_request = any(event & PollConstants.POLLNVAL
            for f, event in p.poll())
    finally:
        dev_null.close()

    _can_poll_device = not invalid_request
    return _can_poll_device
619
def create_poll_instance():
    """
    Return a poll object for the event loop: a native select.poll
    instance when can_poll_device() reports that poll() is usable,
    otherwise a PollSelectAdapter.
    """
    if not can_poll_device():
        # No poll() implementation, or it is broken somehow.
        return PollSelectAdapter()
    return select.poll()
629
class _epoll_adapter(object):
    """
    Adapter which gives a select.epoll instance the same interface as a
    select.poll instance. The two differ in how they interpret timeout
    arguments: poll() here accepts milliseconds (or None), and passes
    seconds (or -1) to the wrapped object. Note that the file descriptor
    that is associated with an epoll instance will close automatically
    when it is garbage collected, so it's not necessary to close it
    explicitly.
    """
    __slots__ = ('_epoll_obj',)

    def __init__(self, epoll_obj):
        self._epoll_obj = epoll_obj

    def register(self, fd, *args):
        """Forward the registration to the wrapped epoll object."""
        self._epoll_obj.register(fd, *args)

    def unregister(self, fd):
        """Forward the removal to the wrapped epoll object."""
        self._epoll_obj.unregister(fd)

    def poll(self, *args):
        """
        Poll with an optional timeout in milliseconds. A missing, None
        or negative timeout is passed on as -1, zero is passed on
        unchanged, and any other value is converted to seconds.
        """
        argc = len(args)
        if argc > 1:
            raise TypeError(
                "poll expected at most 2 arguments, got " + repr(1 + argc))

        if not args:
            return self._epoll_obj.poll(-1)

        millis = args[0]
        if millis is None or millis < 0:
            seconds = -1
        elif millis == 0:
            # Zero is handed through as given.
            seconds = millis
        else:
            seconds = float(millis) / 1000

        return self._epoll_obj.poll(seconds)
663