diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 345a3ca..af51e61 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -48,7 +48,8 @@ class ReachedLimit(Exception): def __repr__(self): return "ReachedLimit(warcprox_meta=%s,http_payload=%s)" % ( - repr(self.warcprox_meta), repr(self.http_payload)) + repr(self.warcprox_meta) if hasattr(self, 'warcprox_meta') else None, + repr(self.http_payload) if hasattr(self, 'http_payload') else None) def __str__(self): return self.__repr__() @@ -98,80 +99,95 @@ def behavior_script(url, template_parameters=None): return script return None +class ThreadExceptionGate: + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, thread): + self.thread = thread + self.ok_to_raise = threading.Event() + self.pending_exception = None + self.lock = threading.RLock() + + def __enter__(self): + assert self.thread == threading.current_thread() + if self.pending_exception: + self.logger.info( + 'raising pending exception %s', self.pending_exception) + tmp = self.pending_exception + self.pending_exception = None + raise tmp + else: + self.ok_to_raise.set() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.logger.info( + 'self=%s exc_type=%s exc_value=%s', repr(self), repr(exc_type), + repr(exc_value)) + assert self.thread == threading.current_thread() + self.ok_to_raise.clear() + return False # don't swallow exception + + def queue_exception(self, e): + with self.lock: + if self.pending_exception: + self.logger.warn( + '%s already pending for thread %s, discarding %s', + repr(self.pending_exception), self.thread, repr(e)) + else: + self.pending_exception = e + + def __repr__(self): + return '' % self.thread + import threading _thread_exception_gates = {} _thread_exception_gates_lock = threading.Lock() -def thread_accept_exceptions(): +def thread_exception_gate(thread=None): ''' - Returns a context manager whose purpose is best explained with a snippet: + Returns a `ThreadExceptionGate` for `thread` (current thread by default). - # === thread1 === + `ThreadExceptionGate` is a context manager which allows exceptions to be + raised from threads other than the current one, by way of `thread_raise`. - # If thread2 calls `thread_raise(thread1, ...)` while do_something() is - # executing, nothing will happen (no exception will be raised in - # thread1). - do_something() + Example: try: - with thread_accept_exceptions(): - # Now we're in the "runtime environment" (pep340) of the - # context manager. If thread2 calls `thread_raise(thread1, - # ...)` while do_something_else() is running, the exception - # will be raised here. - do_something_else() - - # Here again if thread2 calls `thread_raise`, nothing happens. - do_yet_another_thing() - + with thread_exception_gate(): + # do something except: - handle_exception() + # handle exception.... - The context manager is reentrant, i.e. you can do this: - - with thread_accept_exceptions(): - with thread_accept_exceptions(): - blah() - - # `thread_raise` will still work here - toot() + If `thread_raise` is called on a thread that is not currently inside the + `ThreadExceptionGate` context (pep340 "runtime environment"), the exception + is queued, and raised immediately if and when the thread enters the + context. Only one exception will be queued this way at a time, others are + discarded. ''' - class ThreadExceptionGate: - def __init__(self): - self.lock = threading.Lock() - self.ok_to_raise = 0 - - def __enter__(self): - with self.lock: - self.ok_to_raise += 1 - - def __exit__(self, exc_type, exc_value, traceback): - with self.lock: - self.ok_to_raise -= 1 - assert self.ok_to_raise >= 0 + if not thread: + thread = threading.current_thread() with _thread_exception_gates_lock: - if not threading.current_thread().ident in _thread_exception_gates: - _thread_exception_gates[ - threading.current_thread().ident] = ThreadExceptionGate() - return _thread_exception_gates[threading.current_thread().ident] + if not thread in _thread_exception_gates: + _thread_exception_gates[thread] = ThreadExceptionGate(thread) + + return _thread_exception_gates[thread] + +thread_accept_exceptions = thread_exception_gate def thread_raise(thread, exctype): ''' - Raises the exception `exctype` in the thread `thread`, if it is willing to - accept exceptions (see `thread_accept_exceptions`). + Raises or queues the exception `exctype` for the thread `thread`. + + See the documentation on the function `thread_exception_gate()` for more + information. Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: "The exception will be raised only when executing python bytecode. If your thread calls a native/built-in blocking function, the exception will be raised only when execution returns to the python code." - Returns: - True if exception was raised, False if `thread` is not accepting - exceptions, or another thread is in the middle of raising an exception - in `thread` - Raises: - threading.ThreadError if `thread` is not running TypeError if `exctype` is not a class ValueError, SystemError in case of unexpected problems ''' @@ -182,44 +198,24 @@ def thread_raise(thread, exctype): 'cannot raise %s, only exception types can be raised (not ' 'instances)' % exctype) - if not thread.is_alive(): - raise threading.ThreadError('thread %s is not running' % thread) - - gate = _thread_exception_gates.get(thread.ident) - if not gate: - logging.warn( - 'thread is not accepting exceptions (gate not initialized), ' - 'not raising %s in thread %s', exctype, thread) - return False - - got_lock = gate.lock.acquire(blocking=False) - if not got_lock: - logging.warn( - 'could not get acquire thread exception gate lock, not ' - 'raising %s in thread %s', exctype, thread) - return False - try: - if not gate.ok_to_raise: - logging.warn( - 'thread is not accepting exceptions (gate.ok_to_raise is ' - '%s), not raising %s in thread %s', - gate.ok_to_raise, exctype, thread) - return False - - logging.info('raising %s in thread %s', exctype, thread) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(thread.ident), ctypes.py_object(exctype)) - if res == 0: - raise ValueError( - 'invalid thread id? thread.ident=%s' % thread.ident) - elif res != 1: - # if it returns a number greater than one, you're in trouble, - # and you should call it again with exc=NULL to revert the effect - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) - raise SystemError('PyThreadState_SetAsyncExc failed') - return True - finally: - gate.lock.release() + gate = thread_exception_gate(thread) + with gate.lock: + if gate.ok_to_raise.is_set() and thread.is_alive(): + gate.ok_to_raise.clear() + logging.info('raising %s in thread %s', exctype, thread) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), ctypes.py_object(exctype)) + if res == 0: + raise ValueError( + 'invalid thread id? thread.ident=%s' % thread.ident) + elif res != 1: + # if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) + raise SystemError('PyThreadState_SetAsyncExc failed') + else: + logging.info('queueing %s for thread %s', exctype, thread) + gate.queue_exception(exctype) def sleep(duration): ''' diff --git a/setup.py b/setup.py index 2ad0800..3491ba2 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev245', + version='1.1b11.dev249', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_units.py b/tests/test_units.py index 2ff7d36..3f40863 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -192,56 +192,121 @@ def test_start_stop_backwards_compat(): assert not 'started' in job assert not 'finished' in job -def test_thread_raise(): - let_thread_finish = threading.Event() - thread_preamble_done = threading.Event() - thread_caught_exception = None +class Exception1(Exception): + pass +class Exception2(Exception): + pass - def thread_target(accept_exceptions=False): +def test_thread_raise_not_accept(): + def never_accept(): try: - if accept_exceptions: - with brozzler.thread_accept_exceptions(): - thread_preamble_done.set() - logging.info('waiting (accepting exceptions)') - let_thread_finish.wait() - else: - thread_preamble_done.set() - logging.info('waiting (not accepting exceptions)') - let_thread_finish.wait() + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + # test that thread_raise does not raise exception in a thread that has no + # `with thread_exception_gate()` block + thread_caught_exception = None + th = threading.Thread(target=never_accept) + th.start() + brozzler.thread_raise(th, Exception1) + th.join() + assert thread_caught_exception is None + +def test_thread_raise_immediate(): + def accept_immediately(): + try: + with brozzler.thread_accept_exceptions(): + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + # test immediate exception raise + thread_caught_exception = None + th = threading.Thread(target=accept_immediately) + th.start() + brozzler.thread_raise(th, Exception1) + start = time.time() + th.join() + assert thread_caught_exception + assert isinstance(thread_caught_exception, Exception1) + assert time.time() - start < 1.0 + +def test_thread_raise_safe_exit(): + def delay_context_exit(): + gate = brozzler.thread_accept_exceptions() + orig_exit = type(gate).__exit__ + try: + type(gate).__exit__ = lambda self, et, ev, t: ( + brozzler.sleep(2), orig_exit(self, et, ev, t), False)[-1] + with brozzler.thread_accept_exceptions() as gate: + brozzler.sleep(2) except Exception as e: - logging.info('caught exception %s', repr(e)) nonlocal thread_caught_exception thread_caught_exception = e finally: - logging.info('finishing') - let_thread_finish.clear() - thread_preamble_done.clear() + type(gate).__exit__ = orig_exit - # test that thread_raise does not raise exception in a thread that has not - # called thread_accept_exceptions + # test that a second thread_raise() doesn't result in an exception in + # ThreadExceptionGate.__exit__ thread_caught_exception = None - th = threading.Thread(target=lambda: thread_target(accept_exceptions=False)) + th = threading.Thread(target=delay_context_exit) th.start() - thread_preamble_done.wait() - with pytest.raises(TypeError): - brozzler.thread_raise( - th, Exception("i'm an instance, which is not allowed")) - assert brozzler.thread_raise(th, Exception) is False - assert thread_caught_exception is None - let_thread_finish.set() - th.join() - assert thread_caught_exception is None - - # test that thread_raise raises exception in a thread that has called - # thread_accept_exceptions - thread_caught_exception = None - th = threading.Thread(target=lambda: thread_target(accept_exceptions=True)) - th.start() - thread_preamble_done.wait() - assert brozzler.thread_raise(th, Exception) is True - let_thread_finish.set() + time.sleep(0.2) + brozzler.thread_raise(th, Exception1) + time.sleep(0.2) + brozzler.thread_raise(th, Exception2) th.join() assert thread_caught_exception - with pytest.raises(threading.ThreadError): # thread is not running - brozzler.thread_raise(th, Exception) + assert isinstance(thread_caught_exception, Exception1) + +def test_thread_raise_pending_exception(): + def accept_eventually(): + try: + brozzler.sleep(2) + with brozzler.thread_accept_exceptions(): + pass + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + # test exception that has to wait for `with thread_exception_gate()` block + thread_caught_exception = None + th = threading.Thread(target=accept_eventually) + th.start() + brozzler.thread_raise(th, Exception1) + start = time.time() + th.join() + assert isinstance(thread_caught_exception, Exception1) + assert time.time() - start > 1.0 + +def test_thread_raise_second_with_block(): + def two_with_blocks(): + try: + with brozzler.thread_accept_exceptions(): + time.sleep(2) + return # test fails + except Exception1 as e: + pass + except: + return # fail test + + try: + with brozzler.thread_accept_exceptions(): + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + # test that second `with` block gets second exception raised during first + # `with` block + thread_caught_exception = None + th = threading.Thread(target=two_with_blocks) + th.start() + brozzler.thread_raise(th, Exception1) + brozzler.thread_raise(th, Exception2) + th.join() + assert isinstance(thread_caught_exception, Exception2)