From d2525e2e8771e6ad74832bd0ac9489339a3a254e Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 15 May 2017 16:20:20 -0700 Subject: [PATCH 1/4] failing test for forthcoming behavior of thread_raise --- setup.py | 2 +- tests/test_units.py | 108 ++++++++++++++++++++++++++++++-------------- 2 files changed, 74 insertions(+), 36 deletions(-) diff --git a/setup.py b/setup.py index 2ad0800..f2f42fc 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev245', + version='1.1b11.dev246', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_units.py b/tests/test_units.py index 2ff7d36..9046d91 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -193,55 +193,93 @@ def test_start_stop_backwards_compat(): assert not 'finished' in job def test_thread_raise(): - let_thread_finish = threading.Event() - thread_preamble_done = threading.Event() thread_caught_exception = None - def thread_target(accept_exceptions=False): + class Exception1(Exception): + pass + class Exception2(Exception): + pass + + def accept_immediately(): try: - if accept_exceptions: - with brozzler.thread_accept_exceptions(): - thread_preamble_done.set() - logging.info('waiting (accepting exceptions)') - let_thread_finish.wait() - else: - thread_preamble_done.set() - logging.info('waiting (not accepting exceptions)') - let_thread_finish.wait() + with brozzler.thread_accept_exceptions(): + brozzler.sleep(2) except Exception as e: - logging.info('caught exception %s', repr(e)) nonlocal thread_caught_exception thread_caught_exception = e - finally: - logging.info('finishing') - let_thread_finish.clear() - thread_preamble_done.clear() - # test that thread_raise does not raise exception in a thread that has not - # called thread_accept_exceptions + def accept_eventually(): + try: + brozzler.sleep(2) + with brozzler.thread_accept_exceptions(): + pass + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + def never_accept(): + try: + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + def delay_context_exit(): + try: + with brozzler.thread_accept_exceptions() as gate: + logging.info('gate=%s', gate) + orig_exit = gate.__exit__ + import traceback + gate.__exit__ = lambda et, ev, t: ( + logging.info('fake exit'), traceback.print_stack(), + brozzler.sleep(2), orig_exit(et, ev, t)) + try: + brozzler.sleep(2) + except Exception as e: + raise + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + # test that thread_raise does not raise exception in a thread that has no + # `with thread_exception_gate()` block thread_caught_exception = None - th = threading.Thread(target=lambda: thread_target(accept_exceptions=False)) + th = threading.Thread(target=never_accept) th.start() - thread_preamble_done.wait() - with pytest.raises(TypeError): - brozzler.thread_raise( - th, Exception("i'm an instance, which is not allowed")) - assert brozzler.thread_raise(th, Exception) is False - assert thread_caught_exception is None - let_thread_finish.set() + brozzler.thread_raise(th, Exception) th.join() assert thread_caught_exception is None - # test that thread_raise raises exception in a thread that has called - # thread_accept_exceptions + # test immediate exception raise thread_caught_exception = None - th = threading.Thread(target=lambda: thread_target(accept_exceptions=True)) + th = threading.Thread(target=accept_immediately) th.start() - thread_preamble_done.wait() - assert brozzler.thread_raise(th, Exception) is True - let_thread_finish.set() + brozzler.thread_raise(th, Exception) + start = time.time() th.join() assert thread_caught_exception - with pytest.raises(threading.ThreadError): # thread is not running - brozzler.thread_raise(th, Exception) + assert time.time() - start < 1.0 + + # test that a second thread_raise() doesn't result in an exception in + # ThreadExceptionGate.__exit__ + thread_caught_exception = None + th = threading.Thread(target=delay_context_exit) + th.start() + time.sleep(0.2) + brozzler.thread_raise(th, Exception1) + time.sleep(0.2) + brozzler.thread_raise(th, Exception2) + th.join() + assert thread_caught_exception + assert isinstance(thread_caught_exception, Exception1) + + # test exception that has to wait for `with thread_exception_gate()` block + thread_caught_exception = None + th = threading.Thread(target=accept_eventually) + th.start() + brozzler.thread_raise(th, Exception) + start = time.time() + th.join() + assert thread_caught_exception + assert time.time() - start > 1.0 From d514eaec15424f6d290f9243c398fd51ea25fa1d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 16 May 2017 14:00:10 -0700 Subject: [PATCH 2/4] even more, better failing tests for thread_raise --- setup.py | 2 +- tests/test_units.py | 117 +++++++++++++++++++++++++++----------------- 2 files changed, 73 insertions(+), 46 deletions(-) diff --git a/setup.py b/setup.py index f2f42fc..34e5087 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev246', + version='1.1b11.dev247', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_units.py b/tests/test_units.py index 9046d91..3f40863 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -192,31 +192,12 @@ def test_start_stop_backwards_compat(): assert not 'started' in job assert not 'finished' in job -def test_thread_raise(): - thread_caught_exception = None - - class Exception1(Exception): - pass - class Exception2(Exception): - pass - - def accept_immediately(): - try: - with brozzler.thread_accept_exceptions(): - brozzler.sleep(2) - except Exception as e: - nonlocal thread_caught_exception - thread_caught_exception = e - - def accept_eventually(): - try: - brozzler.sleep(2) - with brozzler.thread_accept_exceptions(): - pass - except Exception as e: - nonlocal thread_caught_exception - thread_caught_exception = e +class Exception1(Exception): + pass +class Exception2(Exception): + pass +def test_thread_raise_not_accept(): def never_accept(): try: brozzler.sleep(2) @@ -224,42 +205,50 @@ def test_thread_raise(): nonlocal thread_caught_exception thread_caught_exception = e - def delay_context_exit(): - try: - with brozzler.thread_accept_exceptions() as gate: - logging.info('gate=%s', gate) - orig_exit = gate.__exit__ - import traceback - gate.__exit__ = lambda et, ev, t: ( - logging.info('fake exit'), traceback.print_stack(), - brozzler.sleep(2), orig_exit(et, ev, t)) - try: - brozzler.sleep(2) - except Exception as e: - raise - except Exception as e: - nonlocal thread_caught_exception - thread_caught_exception = e - # test that thread_raise does not raise exception in a thread that has no # `with thread_exception_gate()` block thread_caught_exception = None th = threading.Thread(target=never_accept) th.start() - brozzler.thread_raise(th, Exception) + brozzler.thread_raise(th, Exception1) th.join() assert thread_caught_exception is None +def test_thread_raise_immediate(): + def accept_immediately(): + try: + with brozzler.thread_accept_exceptions(): + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + # test immediate exception raise thread_caught_exception = None th = threading.Thread(target=accept_immediately) th.start() - brozzler.thread_raise(th, Exception) + brozzler.thread_raise(th, Exception1) start = time.time() th.join() assert thread_caught_exception + assert isinstance(thread_caught_exception, Exception1) assert time.time() - start < 1.0 +def test_thread_raise_safe_exit(): + def delay_context_exit(): + gate = brozzler.thread_accept_exceptions() + orig_exit = type(gate).__exit__ + try: + type(gate).__exit__ = lambda self, et, ev, t: ( + brozzler.sleep(2), orig_exit(self, et, ev, t), False)[-1] + with brozzler.thread_accept_exceptions() as gate: + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + finally: + type(gate).__exit__ = orig_exit + # test that a second thread_raise() doesn't result in an exception in # ThreadExceptionGate.__exit__ thread_caught_exception = None @@ -273,13 +262,51 @@ def test_thread_raise(): assert thread_caught_exception assert isinstance(thread_caught_exception, Exception1) +def test_thread_raise_pending_exception(): + def accept_eventually(): + try: + brozzler.sleep(2) + with brozzler.thread_accept_exceptions(): + pass + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + # test exception that has to wait for `with thread_exception_gate()` block thread_caught_exception = None th = threading.Thread(target=accept_eventually) th.start() - brozzler.thread_raise(th, Exception) + brozzler.thread_raise(th, Exception1) start = time.time() th.join() - assert thread_caught_exception + assert isinstance(thread_caught_exception, Exception1) assert time.time() - start > 1.0 +def test_thread_raise_second_with_block(): + def two_with_blocks(): + try: + with brozzler.thread_accept_exceptions(): + time.sleep(2) + return # test fails + except Exception1 as e: + pass + except: + return # fail test + + try: + with brozzler.thread_accept_exceptions(): + brozzler.sleep(2) + except Exception as e: + nonlocal thread_caught_exception + thread_caught_exception = e + + # test that second `with` block gets second exception raised during first + # `with` block + thread_caught_exception = None + th = threading.Thread(target=two_with_blocks) + th.start() + brozzler.thread_raise(th, Exception1) + brozzler.thread_raise(th, Exception2) + th.join() + assert isinstance(thread_caught_exception, Exception2) + From 31dc6a2d9799be25b26ef9307fa66463ac937623 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 16 May 2017 14:20:53 -0700 Subject: [PATCH 3/4] improve thread_raise() so that the new tests pass 1. If thread is not currently accepting exceptions, queue it and raise if and when it does start accepting them. This fixes problem of thread_raise exceptions being ignored when raised just before the target thread starts accepting exceptions. 2. Avoid problems caused by raising multiple exceptions in the same thread in quick succession by ensuring that only one is actually raised for a given `with` block. This type of occurrence had been putting brozzler into a borked/frozen state. --- brozzler/__init__.py | 173 +++++++++++++++++++++---------------------- setup.py | 2 +- 2 files changed, 85 insertions(+), 90 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 345a3ca..28ced63 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -98,80 +98,95 @@ def behavior_script(url, template_parameters=None): return script return None +class ThreadExceptionGate: + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, thread): + self.thread = thread + self.ok_to_raise = threading.Event() + self.pending_exception = None + self.lock = threading.RLock() + + def __enter__(self): + assert self.thread == threading.current_thread() + if self.pending_exception: + self.logger.info( + 'raising pending exception %s', self.pending_exception) + tmp = self.pending_exception + self.pending_exception = None + raise tmp + else: + self.ok_to_raise.set() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.logger.info( + 'self=%s exc_type=%s exc_value=%s', repr(self), repr(exc_type), + repr(exc_value)) + assert self.thread == threading.current_thread() + self.ok_to_raise.clear() + return False # don't swallow exception + + def queue_exception(self, e): + with self.lock: + if self.pending_exception: + self.logger.warn( + '%s already pending for thread %s, discarding %s', + repr(self.pending_exception), self.thread, repr(e)) + else: + self.pending_exception = e + + def __repr__(self): + return '' % self.thread + import threading _thread_exception_gates = {} _thread_exception_gates_lock = threading.Lock() -def thread_accept_exceptions(): +def thread_exception_gate(thread=None): ''' - Returns a context manager whose purpose is best explained with a snippet: + Returns a `ThreadExceptionGate` for `thread` (current thread by default). - # === thread1 === + `ThreadExceptionGate` is a context manager which allows exceptions to be + raised from threads other than the current one, by way of `thread_raise`. - # If thread2 calls `thread_raise(thread1, ...)` while do_something() is - # executing, nothing will happen (no exception will be raised in - # thread1). - do_something() + Example: try: - with thread_accept_exceptions(): - # Now we're in the "runtime environment" (pep340) of the - # context manager. If thread2 calls `thread_raise(thread1, - # ...)` while do_something_else() is running, the exception - # will be raised here. - do_something_else() - - # Here again if thread2 calls `thread_raise`, nothing happens. - do_yet_another_thing() - + with thread_exception_gate(): + # do something except: - handle_exception() + # handle exception.... - The context manager is reentrant, i.e. you can do this: - - with thread_accept_exceptions(): - with thread_accept_exceptions(): - blah() - - # `thread_raise` will still work here - toot() + If `thread_raise` is called on a thread that is not currently inside the + `ThreadExceptionGate` context (pep340 "runtime environment"), the exception + is queued, and raised immediately if and when the thread enters the + context. Only one exception will be queued this way at a time, others are + discarded. ''' - class ThreadExceptionGate: - def __init__(self): - self.lock = threading.Lock() - self.ok_to_raise = 0 - - def __enter__(self): - with self.lock: - self.ok_to_raise += 1 - - def __exit__(self, exc_type, exc_value, traceback): - with self.lock: - self.ok_to_raise -= 1 - assert self.ok_to_raise >= 0 + if not thread: + thread = threading.current_thread() with _thread_exception_gates_lock: - if not threading.current_thread().ident in _thread_exception_gates: - _thread_exception_gates[ - threading.current_thread().ident] = ThreadExceptionGate() - return _thread_exception_gates[threading.current_thread().ident] + if not thread in _thread_exception_gates: + _thread_exception_gates[thread] = ThreadExceptionGate(thread) + + return _thread_exception_gates[thread] + +thread_accept_exceptions = thread_exception_gate def thread_raise(thread, exctype): ''' - Raises the exception `exctype` in the thread `thread`, if it is willing to - accept exceptions (see `thread_accept_exceptions`). + Raises or queues the exception `exctype` for the thread `thread`. + + See the documentation on the function `thread_exception_gate()` for more + information. Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: "The exception will be raised only when executing python bytecode. If your thread calls a native/built-in blocking function, the exception will be raised only when execution returns to the python code." - Returns: - True if exception was raised, False if `thread` is not accepting - exceptions, or another thread is in the middle of raising an exception - in `thread` - Raises: - threading.ThreadError if `thread` is not running TypeError if `exctype` is not a class ValueError, SystemError in case of unexpected problems ''' @@ -182,44 +197,24 @@ def thread_raise(thread, exctype): 'cannot raise %s, only exception types can be raised (not ' 'instances)' % exctype) - if not thread.is_alive(): - raise threading.ThreadError('thread %s is not running' % thread) - - gate = _thread_exception_gates.get(thread.ident) - if not gate: - logging.warn( - 'thread is not accepting exceptions (gate not initialized), ' - 'not raising %s in thread %s', exctype, thread) - return False - - got_lock = gate.lock.acquire(blocking=False) - if not got_lock: - logging.warn( - 'could not get acquire thread exception gate lock, not ' - 'raising %s in thread %s', exctype, thread) - return False - try: - if not gate.ok_to_raise: - logging.warn( - 'thread is not accepting exceptions (gate.ok_to_raise is ' - '%s), not raising %s in thread %s', - gate.ok_to_raise, exctype, thread) - return False - - logging.info('raising %s in thread %s', exctype, thread) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(thread.ident), ctypes.py_object(exctype)) - if res == 0: - raise ValueError( - 'invalid thread id? thread.ident=%s' % thread.ident) - elif res != 1: - # if it returns a number greater than one, you're in trouble, - # and you should call it again with exc=NULL to revert the effect - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) - raise SystemError('PyThreadState_SetAsyncExc failed') - return True - finally: - gate.lock.release() + gate = thread_exception_gate(thread) + with gate.lock: + if gate.ok_to_raise.is_set() and thread.is_alive(): + gate.ok_to_raise.clear() + logging.info('raising %s in thread %s', exctype, thread) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), ctypes.py_object(exctype)) + if res == 0: + raise ValueError( + 'invalid thread id? thread.ident=%s' % thread.ident) + elif res != 1: + # if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) + raise SystemError('PyThreadState_SetAsyncExc failed') + else: + logging.info('queueing %s for thread %s', exctype, thread) + gate.queue_exception(exctype) def sleep(duration): ''' diff --git a/setup.py b/setup.py index 34e5087..ab4237e 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev247', + version='1.1b11.dev248', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', From 89e7c8b07900cca8828cb0cf00b52d57b83d058f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 16 May 2017 15:47:18 -0700 Subject: [PATCH 4/4] fix exception from ReachedLimit.__repr__ when it has been instantiated implicitly and __init__ was not called --- brozzler/__init__.py | 3 ++- setup.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 28ced63..af51e61 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -48,7 +48,8 @@ class ReachedLimit(Exception): def __repr__(self): return "ReachedLimit(warcprox_meta=%s,http_payload=%s)" % ( - repr(self.warcprox_meta), repr(self.http_payload)) + repr(self.warcprox_meta) if hasattr(self, 'warcprox_meta') else None, + repr(self.http_payload) if hasattr(self, 'http_payload') else None) def __str__(self): return self.__repr__() diff --git a/setup.py b/setup.py index ab4237e..3491ba2 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev248', + version='1.1b11.dev249', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt',