Package portage :: Package util :: Package _eventloop :: Module EventLoop
[hide private]

Source Code for Module portage.util._eventloop.EventLoop

  1  # Copyright 1999-2014 Gentoo Foundation 
  2  # Distributed under the terms of the GNU General Public License v2 
  3   
  4  from __future__ import division 
  5   
  6  import errno 
  7  import logging 
  8  import os 
  9  import select 
 10  import signal 
 11  import sys 
 12  import time 
 13   
 14  try: 
 15          import fcntl 
 16  except ImportError: 
 17          #  http://bugs.jython.org/issue1074 
 18          fcntl = None 
 19   
 20  try: 
 21          import threading 
 22  except ImportError: 
 23          import dummy_threading as threading 
 24   
 25  from portage.util import writemsg_level 
 26  from ..SlotObject import SlotObject 
 27  from .PollConstants import PollConstants 
 28  from .PollSelectAdapter import PollSelectAdapter 
 29   
class EventLoop(object):
    """
    An event loop, intended to be compatible with the GLib event loop.
    Call the iteration method in order to execute one iteration of the
    loop. The idle_add and timeout_add methods serve as thread-safe
    means to interact with the loop's thread.
    """

    supports_multiprocessing = True

    # TODO: Find out why SIGCHLD signals aren't delivered during poll
    # calls, forcing us to wakeup in order to receive them.
    # Upper bound, in milliseconds, on the poll timeout while any child
    # watch is registered (see _get_poll_timeout).
    _sigchld_interval = 250

    # Records for the registered event sources. Fields that are not
    # passed to the constructor (such as "calling") are presumably
    # initialized to None by SlotObject -- TODO confirm.
    class _child_callback_class(SlotObject):
        __slots__ = ("callback", "data", "pid", "source_id")

    class _idle_callback_class(SlotObject):
        __slots__ = ("args", "callback", "calling", "source_id")

    class _io_handler_class(SlotObject):
        __slots__ = ("args", "callback", "f", "source_id")

    class _timeout_handler_class(SlotObject):
        __slots__ = ("args", "function", "calling", "interval", "source_id",
            "timestamp")

    def __init__(self, main=True):
        """
        @param main: If True then this is a singleton instance for use
            in the main thread, otherwise it is a local instance which
            can safely be used in a non-main thread (default is True, so
            that global_event_loop does not need constructor arguments)
        @type main: bool
        """
        # SIGCHLD handling via a signal handler is only enabled for the
        # main instance, and only when fcntl is available.
        self._use_signal = main and fcntl is not None
        self._thread_rlock = threading.RLock()
        self._thread_condition = threading.Condition(self._thread_rlock)
        # Pending (f, event) tuples returned by the poll object.
        self._poll_event_queue = []
        # Maps f -> _io_handler_class instance.
        self._poll_event_handlers = {}
        # Maps source id -> f.
        self._poll_event_handler_ids = {}
        # Increment id for each new handler.
        self._event_handler_id = 0
        self._idle_callbacks = {}
        self._timeout_handlers = {}
        # Smallest interval (milliseconds) among the registered timeout
        # handlers, or None if there are none.
        self._timeout_interval = None

        self._poll_obj = None
        if hasattr(select, "epoll"):
            try:
                epoll_obj = select.epoll()
            except IOError:
                # This happens with Linux 2.4 kernels:
                # IOError: [Errno 38] Function not implemented
                pass
            else:

                # FD_CLOEXEC is enabled by default in Python >=3.4.
                if sys.hexversion < 0x3040000 and fcntl is not None \
                    and hasattr(fcntl, "FD_CLOEXEC"):
                    fcntl.fcntl(epoll_obj.fileno(), fcntl.F_SETFD,
                        fcntl.fcntl(epoll_obj.fileno(),
                            fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

                self._poll_obj = _epoll_adapter(epoll_obj)
                self.IO_ERR = select.EPOLLERR
                self.IO_HUP = select.EPOLLHUP
                self.IO_IN = select.EPOLLIN
                self.IO_NVAL = 0
                self.IO_OUT = select.EPOLLOUT
                self.IO_PRI = select.EPOLLPRI

        if self._poll_obj is None:
            # epoll is missing or unusable, so fall back to poll (or to
            # whatever create_poll_instance selects).
            self._poll_obj = create_poll_instance()
            self.IO_ERR = PollConstants.POLLERR
            self.IO_HUP = PollConstants.POLLHUP
            self.IO_IN = PollConstants.POLLIN
            self.IO_NVAL = PollConstants.POLLNVAL
            self.IO_OUT = PollConstants.POLLOUT
            self.IO_PRI = PollConstants.POLLPRI

        self._child_handlers = {}
        self._sigchld_read = None
        self._sigchld_write = None
        self._sigchld_src_id = None
        # Remembered so that _sigchld_sig_cb can ignore signals which
        # are delivered to a different process (e.g. after a fork).
        self._pid = os.getpid()

    def _new_source_id(self):
        """
        Generate a new source id. This method is thread-safe.

        @rtype: int
        @return: an id which has not been returned before by this
            instance
        """
        with self._thread_rlock:
            self._event_handler_id += 1
            return self._event_handler_id

    def _poll(self, timeout=None):
        """
        All poll() calls pass through here. The poll events
        are added directly to self._poll_event_queue.
        In order to avoid endless blocking, this raises
        StopIteration if timeout is None and there are
        no file descriptors to poll.

        @param timeout: timeout in milliseconds, or None in order to
            block until an event arrives
        @raise StopIteration: if polling would block forever, or if the
            poll call failed (typically due to EINTR)
        """

        if timeout is None and \
            not self._poll_event_handlers:
            raise StopIteration(
                "timeout is None and there are no poll() event handlers")

        while True:
            try:
                self._poll_event_queue.extend(self._poll_obj.poll(timeout))
                break
            except (IOError, select.error) as e:
                # Silently handle EINTR, which is normal when we have
                # received a signal such as SIGINT (epoll objects may
                # raise IOError rather than select.error, at least in
                # Python 3.2).
                if not (e.args and e.args[0] == errno.EINTR):
                    writemsg_level("\n!!! select error: %s\n" % (e,),
                        level=logging.ERROR, noiselevel=-1)
                del e

                # This typically means that we've received a SIGINT, so
                # raise StopIteration in order to break out of our current
                # iteration and respond appropriately to the signal as soon
                # as possible.
                raise StopIteration("interrupted")

    def iteration(self, *args):
        """
        Like glib.MainContext.iteration(), runs a single iteration. In order
        to avoid blocking forever when may_block is True (the default),
        callers must be careful to ensure that at least one of the following
        conditions is met:
            1) An event source or timeout is registered which is guaranteed
                to trigger at least one event (a call to an idle function
                only counts as an event if it returns a False value which
                causes it to stop being called)
            2) Another thread is guaranteed to call one of the thread-safe
                methods which notify iteration to stop waiting (such as
                idle_add or timeout_add).
        These rules ensure that iteration is able to block until an event
        arrives, without doing any busy waiting that would waste CPU time.
        @type may_block: bool
        @param may_block: if True the call may block waiting for an event
            (default is True).
        @rtype: bool
        @return: True if events were dispatched.
        @raise TypeError: if more than one positional argument is given
        """

        may_block = True

        if args:
            if len(args) > 1:
                raise TypeError(
                    "expected at most 1 argument (%s given)" % len(args))
            may_block = args[0]

        event_queue = self._poll_event_queue
        event_handlers = self._poll_event_handlers
        events_handled = 0
        timeouts_checked = False

        if not event_handlers:
            with self._thread_condition:
                if self._run_timeouts():
                    events_handled += 1
                timeouts_checked = True
                if not event_handlers and not events_handled and may_block:
                    # Block so that we don't waste cpu time by looping too
                    # quickly. This makes EventLoop useful for code that needs
                    # to wait for timeout callbacks regardless of whether or
                    # not any IO handlers are currently registered.
                    timeout = self._get_poll_timeout()
                    if timeout is None:
                        wait_timeout = None
                    else:
                        # Condition.wait() expects seconds.
                        wait_timeout = timeout / 1000
                    # NOTE: In order to avoid a possible infinite wait when
                    # wait_timeout is None, the previous _run_timeouts()
                    # call must have returned False *with* _thread_condition
                    # acquired. Otherwise, we would risk going to sleep after
                    # our only notify event has already passed.
                    self._thread_condition.wait(wait_timeout)
                    if self._run_timeouts():
                        events_handled += 1
                    timeouts_checked = True

            # If any timeouts have executed, then return immediately,
            # in order to minimize latency in termination of iteration
            # loops that they may control.
            if events_handled or not event_handlers:
                return bool(events_handled)

        if not event_queue:

            if may_block:
                timeout = self._get_poll_timeout()

                # Avoid blocking for IO if there are any timeout
                # or idle callbacks available to process.
                if timeout != 0 and not timeouts_checked:
                    if self._run_timeouts():
                        events_handled += 1
                    timeouts_checked = True
                    if events_handled:
                        # Minimize latency for loops controlled
                        # by timeout or idle callback events.
                        timeout = 0
            else:
                timeout = 0

            try:
                self._poll(timeout=timeout)
            except StopIteration:
                # This can be triggered by EINTR which is caused by signals.
                pass

        # NOTE: IO event handlers may be re-entrant, in case something
        # like AbstractPollTask._wait_loop() needs to be called inside
        # a handler for some reason.
        while event_queue:
            events_handled += 1
            f, event = event_queue.pop()
            try:
                x = event_handlers[f]
            except KeyError:
                # This is known to be triggered by the epoll
                # implementation in qemu-user-1.2.2, and appears
                # to be harmless (see bug #451326).
                continue
            if not x.callback(f, event, *x.args):
                self.source_remove(x.source_id)

        if not timeouts_checked:
            if self._run_timeouts():
                events_handled += 1
            timeouts_checked = True

        return bool(events_handled)

    def _get_poll_timeout(self):
        """
        Compute the timeout for the next blocking wait.

        @rtype: int or None
        @return: the smallest registered timeout interval in
            milliseconds, capped at _sigchld_interval while any child
            watch is registered, or None if nothing limits the wait
        """

        with self._thread_rlock:
            if self._child_handlers:
                if self._timeout_interval is None:
                    timeout = self._sigchld_interval
                else:
                    timeout = min(self._sigchld_interval,
                        self._timeout_interval)
            else:
                timeout = self._timeout_interval

        return timeout

    def child_watch_add(self, pid, callback, data=None):
        """
        Like glib.child_watch_add(), sets callback to be called with the
        user data specified by data when the child indicated by pid exits.
        The signature for the callback is:

            def callback(pid, condition, user_data)

        where pid is the child process id, condition is the status
        information about the child process and user_data is data.

        @type pid: int
        @param pid: process id of a child process to watch
        @type callback: callable
        @param callback: a function to call
        @type data: object
        @param data: the optional data to pass to function
        @rtype: int
        @return: an integer ID
        """
        source_id = self._new_source_id()
        self._child_handlers[source_id] = self._child_callback_class(
            callback=callback, data=data, pid=pid, source_id=source_id)

        if self._use_signal:
            if self._sigchld_read is None:
                # Lazily create the pipe which the SIGCHLD handler
                # writes to in order to wake up the poll call.
                self._sigchld_read, self._sigchld_write = os.pipe()

                fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL,
                    fcntl.fcntl(self._sigchld_read,
                        fcntl.F_GETFL) | os.O_NONBLOCK)

                # FD_CLOEXEC is enabled by default in Python >=3.4.
                if sys.hexversion < 0x3040000 and \
                    hasattr(fcntl, "FD_CLOEXEC"):
                    fcntl.fcntl(self._sigchld_read, fcntl.F_SETFD,
                        fcntl.fcntl(self._sigchld_read,
                            fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

            # The IO watch is dynamically registered and unregistered as
            # needed, since we don't want to consider it as a valid source
            # of events when there are no child listeners. It's important
            # to distinguish when there are no valid sources of IO events,
            # in order to avoid an endless poll call if there's no timeout.
            if self._sigchld_src_id is None:
                self._sigchld_src_id = self.io_add_watch(
                    self._sigchld_read, self.IO_IN, self._sigchld_io_cb)
                signal.signal(signal.SIGCHLD, self._sigchld_sig_cb)

        # poll now, in case the SIGCHLD has already arrived
        self._poll_child_processes()
        return source_id

    def _sigchld_sig_cb(self, signum, frame):
        """
        SIGCHLD signal handler: write one byte to the wakeup pipe so
        that the poll call returns and _sigchld_io_cb gets called.
        """
        # If this signal handler was not installed by the
        # current process then the signal doesn't belong to
        # this EventLoop instance.
        if os.getpid() == self._pid:
            os.write(self._sigchld_write, b'\0')

    def _sigchld_io_cb(self, fd, events):
        """
        IO callback for the read end of the wakeup pipe: drain the pipe
        and then poll the watched child processes.

        @rtype: bool
        @return: always True, so that the IO watch stays registered
        """
        try:
            while True:
                os.read(self._sigchld_read, 4096)
        except OSError:
            # read until EAGAIN
            pass
        self._poll_child_processes()
        return True

    def _poll_child_processes(self):
        """
        Call waitpid (with WNOHANG) for each watched child, and invoke
        and unregister the callbacks of those which have exited.
        Watches for which waitpid fails with ECHILD are unregistered
        without calling the callback.

        @rtype: bool
        @return: True if at least one callback was called
        """
        if not self._child_handlers:
            return False

        calls = 0

        # Iterate over a snapshot, since callbacks may add or remove
        # child watches.
        for x in list(self._child_handlers.values()):
            if x.source_id not in self._child_handlers:
                # it's already been called via re-entrance
                continue
            try:
                wait_retval = os.waitpid(x.pid, os.WNOHANG)
            except OSError as e:
                if e.errno != errno.ECHILD:
                    raise
                del e
                self.source_remove(x.source_id)
            else:
                # With waitpid and WNOHANG, only check the
                # first element of the tuple since the second
                # element may vary (bug #337465).
                if wait_retval[0] != 0:
                    calls += 1
                    self.source_remove(x.source_id)
                    x.callback(x.pid, wait_retval[1], x.data)

        return bool(calls)

    def idle_add(self, callback, *args):
        """
        Like glib.idle_add(), if callback returns False it is
        automatically removed from the list of event sources and will
        not be called again. This method is thread-safe.

        @type callback: callable
        @param callback: a function to call
        @param args: positional arguments to pass to callback
        @rtype: int
        @return: an integer ID
        """
        with self._thread_condition:
            source_id = self._new_source_id()
            self._idle_callbacks[source_id] = self._idle_callback_class(
                args=args, callback=callback, source_id=source_id)
            # Wake up an iteration call that is waiting on the condition.
            self._thread_condition.notify()
            return source_id

    def _run_idle_callbacks(self):
        """
        Call each registered idle callback once, removing those which
        return a false value.

        @rtype: bool
        @return: True if at least one idle callback was removed
        """
        # assumes caller has acquired self._thread_rlock
        if not self._idle_callbacks:
            return False
        state_change = 0
        # Iterate over our local list, since self._idle_callbacks can be
        # modified during the execution of these callbacks.
        for x in list(self._idle_callbacks.values()):
            if x.source_id not in self._idle_callbacks:
                # it got cancelled while executing another callback
                continue
            if x.calling:
                # don't call it recursively
                continue
            x.calling = True
            try:
                if not x.callback(*x.args):
                    state_change += 1
                    self.source_remove(x.source_id)
            finally:
                x.calling = False

        return bool(state_change)

    def timeout_add(self, interval, function, *args):
        """
        Like glib.timeout_add(), interval argument is the number of
        milliseconds between calls to your function, and your function
        should return False to stop being called, or True to continue
        being called. Any additional positional arguments given here
        are passed to your function when it's called. This method is
        thread-safe.

        @rtype: int
        @return: an integer ID
        """
        with self._thread_condition:
            source_id = self._new_source_id()
            self._timeout_handlers[source_id] = \
                self._timeout_handler_class(
                    interval=interval, function=function, args=args,
                    source_id=source_id, timestamp=time.time())
            if self._timeout_interval is None or \
                self._timeout_interval > interval:
                self._timeout_interval = interval
            # Wake up an iteration call that is waiting on the condition.
            self._thread_condition.notify()
            return source_id

    def _run_timeouts(self):
        """
        Run the idle callbacks and every timeout handler whose interval
        has elapsed. When SIGCHLD signal handling is not in use, the
        watched child processes are polled here as well.

        @rtype: bool
        @return: True if any child callback or timeout function was
            called, or if any idle callback was removed
        """

        calls = 0
        if not self._use_signal:
            if self._poll_child_processes():
                calls += 1

        with self._thread_rlock:

            if self._run_idle_callbacks():
                calls += 1

            if not self._timeout_handlers:
                return bool(calls)

            ready_timeouts = []
            current_time = time.time()
            for x in self._timeout_handlers.values():
                elapsed_seconds = current_time - x.timestamp
                # elapsed_seconds < 0 means the system clock has been adjusted
                if elapsed_seconds < 0 or \
                    (x.interval - 1000 * elapsed_seconds) <= 0:
                    ready_timeouts.append(x)

            # Iterate over our local list, since self._timeout_handlers can
            # be modified during the execution of these callbacks.
            for x in ready_timeouts:
                if x.source_id not in self._timeout_handlers:
                    # it got cancelled while executing another timeout
                    continue
                if x.calling:
                    # don't call it recursively
                    continue
                calls += 1
                x.calling = True
                try:
                    x.timestamp = time.time()
                    if not x.function(*x.args):
                        self.source_remove(x.source_id)
                finally:
                    x.calling = False

        return bool(calls)

    def io_add_watch(self, f, condition, callback, *args):
        """
        Like glib.io_add_watch(), your function should return False to
        stop being called, or True to continue being called. Any
        additional positional arguments given here are passed to your
        function when it's called.

        @type f: int or object with fileno() method
        @param f: a file descriptor to monitor
        @type condition: int
        @param condition: a condition mask
        @type callback: callable
        @param callback: a function to call
        @rtype: int
        @return: an integer ID of the event source
        @raise AssertionError: if f is already registered
        """
        # NOTE(review): handlers are keyed by f exactly as given, while
        # iteration() looks them up by the first element of each poll
        # result. If the poll object reports integer descriptors, events
        # for a non-integer f would never be dispatched -- confirm
        # before passing objects with a fileno() method.
        if f in self._poll_event_handlers:
            # Use %s rather than %d, so that a non-integer f produces
            # the intended AssertionError instead of a TypeError from
            # the format operation.
            raise AssertionError("fd %s is already registered" % (f,))
        source_id = self._new_source_id()
        self._poll_event_handler_ids[source_id] = f
        self._poll_event_handlers[f] = self._io_handler_class(
            args=args, callback=callback, f=f, source_id=source_id)
        self._poll_obj.register(f, condition)
        return source_id

    def source_remove(self, reg_id):
        """
        Like glib.source_remove(), this returns True if the given reg_id
        is found and removed, and False if the reg_id is invalid or has
        already been removed.

        @type reg_id: int
        @param reg_id: an ID returned by child_watch_add, idle_add,
            timeout_add or io_add_watch
        @rtype: bool
        """
        x = self._child_handlers.pop(reg_id, None)
        if x is not None:
            if not self._child_handlers and self._use_signal:
                # The last child watch is gone, so restore the default
                # SIGCHLD disposition and drop the wakeup pipe watch.
                signal.signal(signal.SIGCHLD, signal.SIG_DFL)
                self.source_remove(self._sigchld_src_id)
                self._sigchld_src_id = None
            return True

        with self._thread_rlock:
            idle_callback = self._idle_callbacks.pop(reg_id, None)
            if idle_callback is not None:
                return True
            timeout_handler = self._timeout_handlers.pop(reg_id, None)
            if timeout_handler is not None:
                if timeout_handler.interval == self._timeout_interval:
                    # The removed handler may have been the one with the
                    # smallest interval, so recompute the minimum.
                    if self._timeout_handlers:
                        self._timeout_interval = min(handler.interval
                            for handler in self._timeout_handlers.values())
                    else:
                        self._timeout_interval = None
                return True

        f = self._poll_event_handler_ids.pop(reg_id, None)
        if f is None:
            return False
        self._poll_obj.unregister(f)
        if self._poll_event_queue:
            # Discard any unhandled events that belong to this file,
            # in order to prevent these events from being erroneously
            # delivered to a future handler that is using a reallocated
            # file descriptor of the same numeric value (causing
            # extremely confusing bugs).
            remaining_events = [event for event in self._poll_event_queue
                if not event[0] == f]

            # Modify the list in place, since iteration() holds a
            # reference to it while dispatching events.
            if len(remaining_events) != len(self._poll_event_queue):
                self._poll_event_queue[:] = remaining_events

        del self._poll_event_handlers[f]
        return True
# Cached result of can_poll_device(), or None if it has not been
# computed yet.
_can_poll_device = None


def can_poll_device():
    """
    Test if it's possible to use poll() on a device such as a pty. This
    is known to fail on Darwin. The result is computed once and cached
    in the module-level _can_poll_device variable.
    @rtype: bool
    @return: True if poll() on a device succeeds, False otherwise.
    """

    global _can_poll_device
    if _can_poll_device is not None:
        return _can_poll_device

    if not hasattr(select, "poll"):
        _can_poll_device = False
        return _can_poll_device

    try:
        dev_null = open('/dev/null', 'rb')
    except IOError:
        _can_poll_device = False
        return _can_poll_device

    # Ensure that dev_null is closed on every path, including the early
    # return below and any exception raised while polling.
    try:
        p = select.poll()
        try:
            p.register(dev_null.fileno(), PollConstants.POLLIN)
        except TypeError:
            # Jython: Object 'org.python.core.io.FileIO@f8f175' is not
            # watchable
            _can_poll_device = False
            return _can_poll_device

        # POLLNVAL in a returned event means that poll() rejected the
        # device.
        invalid_request = False
        for f, event in p.poll():
            if event & PollConstants.POLLNVAL:
                invalid_request = True
                break
    finally:
        dev_null.close()

    _can_poll_device = not invalid_request
    return _can_poll_device
621
def create_poll_instance():
    """
    Create an instance of select.poll, or an instance of
    PollSelectAdapter if there is no poll() implementation or
    it is broken somehow (as determined by can_poll_device).
    """
    if not can_poll_device():
        # poll() is missing or unusable for devices.
        return PollSelectAdapter()
    return select.poll()
631
632 -class _epoll_adapter(object):
633 """ 634 Wraps a select.epoll instance in order to make it compatible 635 with select.poll instances. This is necessary since epoll instances 636 interpret timeout arguments differently. Note that the file descriptor 637 that is associated with an epoll instance will close automatically when 638 it is garbage collected, so it's not necessary to close it explicitly. 639 """ 640 __slots__ = ('_epoll_obj',) 641
642 - def __init__(self, epoll_obj):
643 self._epoll_obj = epoll_obj
644
645 - def register(self, fd, *args):
646 self._epoll_obj.register(fd, *args)
647
648 - def unregister(self, fd):
649 self._epoll_obj.unregister(fd)
650
651 - def poll(self, *args):
652 if len(args) > 1: 653 raise TypeError( 654 "poll expected at most 2 arguments, got " + \ 655 repr(1 + len(args))) 656 timeout = -1 657 if args: 658 timeout = args[0] 659 if timeout is None or timeout < 0: 660 timeout = -1 661 elif timeout != 0: 662 timeout = timeout / 1000 663 664 return self._epoll_obj.poll(timeout)
665