import threading


class ThreadExceptionGate:
    '''
    Context manager that controls when `thread_raise` may inject an
    exception into `thread`.

    While `thread` is inside the `with` block, `ok_to_raise` is set and
    `thread_raise` may raise asynchronously in it. Outside the block,
    exceptions are queued (at most one at a time) and raised the next time
    the thread enters the gate.

    Instances are normally obtained through `thread_exception_gate()` rather
    than constructed directly.
    '''
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, thread):
        self.thread = thread
        # Set only while `thread` is inside the `with` block.
        self.ok_to_raise = threading.Event()
        # Exception queued while the gate was closed, or None.
        self.pending_exception = None
        # Reentrant because `thread_raise` holds it while calling
        # `queue_exception`, which acquires it again.
        self.lock = threading.RLock()

    def __enter__(self):
        assert self.thread == threading.current_thread()
        # Check-and-open must be atomic with respect to `thread_raise`,
        # otherwise an exception queued between the check and `set()` would
        # sit unnoticed until the next time the gate is entered.
        with self.lock:
            pending = self.pending_exception
            if pending is None:
                self.ok_to_raise.set()
                return self
            self.pending_exception = None
        self.logger.info('raising pending exception %s', pending)
        raise pending

    def __exit__(self, exc_type, exc_value, traceback):
        # %r formatting is deferred by the logging module, so nothing is
        # formatted unless INFO is actually enabled.
        self.logger.info(
                'self=%r exc_type=%r exc_value=%r', self, exc_type, exc_value)
        assert self.thread == threading.current_thread()
        # Wait for any in-progress `thread_raise` before closing the gate.
        with self.lock:
            self.ok_to_raise.clear()
        return False  # don't swallow exception

    def queue_exception(self, e):
        '''
        Remembers `e` to be raised the next time the thread enters the gate.

        Only one exception is kept; if one is already pending, `e` is
        discarded and a warning is logged.
        '''
        with self.lock:
            if self.pending_exception:
                self.logger.warning(
                        '%r already pending for thread %s, discarding %r',
                        self.pending_exception, self.thread, e)
            else:
                self.pending_exception = e

    def __repr__(self):
        return '<ThreadExceptionGate(%s)>' % self.thread


# Maps `threading.Thread` -> `ThreadExceptionGate`.
# NOTE(review): entries are never removed in this section, so gates (and the
# thread objects they reference) stay alive for the life of the process.
_thread_exception_gates = {}
_thread_exception_gates_lock = threading.Lock()


def thread_exception_gate(thread=None):
    '''
    Returns a `ThreadExceptionGate` for `thread` (current thread by default).

    `ThreadExceptionGate` is a context manager which allows exceptions to be
    raised from threads other than the current one, by way of `thread_raise`.

    Example:

        try:
            with thread_exception_gate():
                # do something
        except:
            # handle exception....

    If `thread_raise` is called on a thread that is not currently inside the
    `ThreadExceptionGate` context (pep340 "runtime environment"), the
    exception is queued, and raised immediately if and when the thread enters
    the context. Only one exception will be queued this way at a time, others
    are discarded.

    The same gate object is returned every time for a given thread.
    '''
    if thread is None:
        thread = threading.current_thread()

    with _thread_exception_gates_lock:
        gate = _thread_exception_gates.get(thread)
        if gate is None:
            gate = ThreadExceptionGate(thread)
            _thread_exception_gates[thread] = gate
        return gate


# Backward-compatible alias for the previous name of this function.
thread_accept_exceptions = thread_exception_gate


def thread_raise(thread, exctype):
    '''
    Raises or queues the exception `exctype` for the thread `thread`.

    If `thread` is alive and currently inside its `ThreadExceptionGate`, the
    exception is raised asynchronously in it; otherwise it is queued on the
    gate. See the documentation on the function `thread_exception_gate()` for
    more information.

    Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
    "The exception will be raised only when executing python bytecode. If your
    thread calls a native/built-in blocking function, the exception will be
    raised only when execution returns to the python code."

    Args:
        thread: the `threading.Thread` to raise the exception in
        exctype: exception class (not an instance) to raise

    Raises:
        TypeError if `exctype` is not a class
        ValueError, SystemError in case of unexpected problems
    '''
    import ctypes
    import inspect

    if not inspect.isclass(exctype):
        raise TypeError(
                'cannot raise %s, only exception types can be raised (not '
                'instances)' % (exctype,))

    gate = thread_exception_gate(thread)
    with gate.lock:
        if not (gate.ok_to_raise.is_set() and thread.is_alive()):
            logging.info('queueing %s for thread %s', exctype, thread)
            gate.queue_exception(exctype)
            return

        # Close the gate so a second caller queues instead of stacking
        # another async exception on top of this one.
        gate.ok_to_raise.clear()
        logging.info('raising %s in thread %s', exctype, thread)
        # Wrap the id explicitly: a bare Python int would be passed as a C
        # int and could be truncated.
        thread_id = ctypes.c_ulong(thread.ident)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                thread_id, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError(
                    'invalid thread id? thread.ident=%s' % thread.ident)
        elif res != 1:
            # if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError('PyThreadState_SetAsyncExc failed')