From 31dc6a2d9799be25b26ef9307fa66463ac937623 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 16 May 2017 14:20:53 -0700 Subject: [PATCH] improve thread_raise() so that the new tests pass 1. If thread is not currently accepting exceptions, queue it and raise if and when it does start accepting them. This fixes problem of thread_raise exceptions being ignored when raised just before the target thread starts accepting exceptions. 2. Avoid problems caused by raising multiple exceptions in the same thread in quick succession by ensuring that only one is actually raised for a given `with` block. This type of occurrence had been putting brozzler into a borked/frozen state. --- brozzler/__init__.py | 173 +++++++++++++++++++++---------------------- setup.py | 2 +- 2 files changed, 85 insertions(+), 90 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 345a3ca..28ced63 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -98,80 +98,95 @@ def behavior_script(url, template_parameters=None): return script return None +class ThreadExceptionGate: + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, thread): + self.thread = thread + self.ok_to_raise = threading.Event() + self.pending_exception = None + self.lock = threading.RLock() + + def __enter__(self): + assert self.thread == threading.current_thread() + if self.pending_exception: + self.logger.info( + 'raising pending exception %s', self.pending_exception) + tmp = self.pending_exception + self.pending_exception = None + raise tmp + else: + self.ok_to_raise.set() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.logger.info( + 'self=%s exc_type=%s exc_value=%s', repr(self), repr(exc_type), + repr(exc_value)) + assert self.thread == threading.current_thread() + self.ok_to_raise.clear() + return False # don't swallow exception + + def queue_exception(self, e): + with self.lock: + if self.pending_exception: + self.logger.warn( + '%s already pending for thread %s, discarding %s', + repr(self.pending_exception), self.thread, repr(e)) + else: + self.pending_exception = e + + def __repr__(self): + return '' % self.thread + import threading _thread_exception_gates = {} _thread_exception_gates_lock = threading.Lock() -def thread_accept_exceptions(): +def thread_exception_gate(thread=None): ''' - Returns a context manager whose purpose is best explained with a snippet: + Returns a `ThreadExceptionGate` for `thread` (current thread by default). - # === thread1 === + `ThreadExceptionGate` is a context manager which allows exceptions to be + raised from threads other than the current one, by way of `thread_raise`. - # If thread2 calls `thread_raise(thread1, ...)` while do_something() is - # executing, nothing will happen (no exception will be raised in - # thread1). - do_something() + Example: try: - with thread_accept_exceptions(): - # Now we're in the "runtime environment" (pep340) of the - # context manager. If thread2 calls `thread_raise(thread1, - # ...)` while do_something_else() is running, the exception - # will be raised here. - do_something_else() - - # Here again if thread2 calls `thread_raise`, nothing happens. - do_yet_another_thing() - + with thread_exception_gate(): + # do something except: - handle_exception() + # handle exception.... - The context manager is reentrant, i.e. you can do this: - - with thread_accept_exceptions(): - with thread_accept_exceptions(): - blah() - - # `thread_raise` will still work here - toot() + If `thread_raise` is called on a thread that is not currently inside the + `ThreadExceptionGate` context (pep340 "runtime environment"), the exception + is queued, and raised immediately if and when the thread enters the + context. Only one exception will be queued this way at a time, others are + discarded. ''' - class ThreadExceptionGate: - def __init__(self): - self.lock = threading.Lock() - self.ok_to_raise = 0 - - def __enter__(self): - with self.lock: - self.ok_to_raise += 1 - - def __exit__(self, exc_type, exc_value, traceback): - with self.lock: - self.ok_to_raise -= 1 - assert self.ok_to_raise >= 0 + if not thread: + thread = threading.current_thread() with _thread_exception_gates_lock: - if not threading.current_thread().ident in _thread_exception_gates: - _thread_exception_gates[ - threading.current_thread().ident] = ThreadExceptionGate() - return _thread_exception_gates[threading.current_thread().ident] + if not thread in _thread_exception_gates: + _thread_exception_gates[thread] = ThreadExceptionGate(thread) + + return _thread_exception_gates[thread] + +thread_accept_exceptions = thread_exception_gate def thread_raise(thread, exctype): ''' - Raises the exception `exctype` in the thread `thread`, if it is willing to - accept exceptions (see `thread_accept_exceptions`). + Raises or queues the exception `exctype` for the thread `thread`. + + See the documentation on the function `thread_exception_gate()` for more + information. Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: "The exception will be raised only when executing python bytecode. If your thread calls a native/built-in blocking function, the exception will be raised only when execution returns to the python code." - Returns: - True if exception was raised, False if `thread` is not accepting - exceptions, or another thread is in the middle of raising an exception - in `thread` - Raises: - threading.ThreadError if `thread` is not running TypeError if `exctype` is not a class ValueError, SystemError in case of unexpected problems ''' @@ -182,44 +197,24 @@ def thread_raise(thread, exctype): 'cannot raise %s, only exception types can be raised (not ' 'instances)' % exctype) - if not thread.is_alive(): - raise threading.ThreadError('thread %s is not running' % thread) - - gate = _thread_exception_gates.get(thread.ident) - if not gate: - logging.warn( - 'thread is not accepting exceptions (gate not initialized), ' - 'not raising %s in thread %s', exctype, thread) - return False - - got_lock = gate.lock.acquire(blocking=False) - if not got_lock: - logging.warn( - 'could not get acquire thread exception gate lock, not ' - 'raising %s in thread %s', exctype, thread) - return False - try: - if not gate.ok_to_raise: - logging.warn( - 'thread is not accepting exceptions (gate.ok_to_raise is ' - '%s), not raising %s in thread %s', - gate.ok_to_raise, exctype, thread) - return False - - logging.info('raising %s in thread %s', exctype, thread) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(thread.ident), ctypes.py_object(exctype)) - if res == 0: - raise ValueError( - 'invalid thread id? thread.ident=%s' % thread.ident) - elif res != 1: - # if it returns a number greater than one, you're in trouble, - # and you should call it again with exc=NULL to revert the effect - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) - raise SystemError('PyThreadState_SetAsyncExc failed') - return True - finally: - gate.lock.release() + gate = thread_exception_gate(thread) + with gate.lock: + if gate.ok_to_raise.is_set() and thread.is_alive(): + gate.ok_to_raise.clear() + logging.info('raising %s in thread %s', exctype, thread) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), ctypes.py_object(exctype)) + if res == 0: + raise ValueError( + 'invalid thread id? thread.ident=%s' % thread.ident) + elif res != 1: + # if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) + raise SystemError('PyThreadState_SetAsyncExc failed') + else: + logging.info('queueing %s for thread %s', exctype, thread) + gate.queue_exception(exctype) def sleep(duration): ''' diff --git a/setup.py b/setup.py index 34e5087..ab4237e 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev247', + version='1.1b11.dev248', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt',