Merge branch 'master' into qa

* master:
  fix exception from ReachedLimit.__repr__ when it has been instantiated implicitly and __init__ was not called
  improve thread_raise() so that the new tests pass
  even more, better failing tests for thread_raise
  failing test for forthcoming behavior of thread_raise
This commit is contained in:
Noah Levitt 2017-05-16 15:47:27 -07:00
commit 9c8f626c38
3 changed files with 193 additions and 132 deletions

View file

@ -48,7 +48,8 @@ class ReachedLimit(Exception):
def __repr__(self): def __repr__(self):
return "ReachedLimit(warcprox_meta=%s,http_payload=%s)" % ( return "ReachedLimit(warcprox_meta=%s,http_payload=%s)" % (
repr(self.warcprox_meta), repr(self.http_payload)) repr(self.warcprox_meta) if hasattr(self, 'warcprox_meta') else None,
repr(self.http_payload) if hasattr(self, 'http_payload') else None)
def __str__(self): def __str__(self):
return self.__repr__() return self.__repr__()
@ -98,80 +99,95 @@ def behavior_script(url, template_parameters=None):
return script return script
return None return None
class ThreadExceptionGate:
    '''
    Context manager marking the region of code in which `thread` is willing
    to have an exception raised in it asynchronously by another thread.

    While the owning thread is inside the `with` block, `ok_to_raise` is set.
    Outside the block, callers are expected to hand exceptions to
    `queue_exception()`; a queued exception is raised by `__enter__` the next
    time the owning thread enters the gate. At most one exception is kept
    pending; further ones are logged and discarded.

    Attributes:
        thread: the `threading.Thread` this gate belongs to
        ok_to_raise: `threading.Event`, set only while inside the `with` block
        pending_exception: queued exception (or exception type), or None
        lock: `threading.RLock` guarding `pending_exception` and the
            transition of `ok_to_raise` on entry
    '''
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, thread):
        self.thread = thread
        self.ok_to_raise = threading.Event()
        self.pending_exception = None
        self.lock = threading.RLock()

    def __enter__(self):
        '''
        Opens the gate, or raises the pending exception if there is one.

        Must be called from the thread that owns the gate.
        '''
        assert self.thread == threading.current_thread()
        # Hold the lock so that checking for a pending exception and opening
        # the gate happen atomically with respect to `queue_exception()` and
        # to callers that take `self.lock` before inspecting `ok_to_raise`.
        # Without it an exception queued between the check and the `set()`
        # would sit unnoticed until some later `with` block.
        with self.lock:
            pending = self.pending_exception
            if pending:
                self.logger.info('raising pending exception %s', pending)
                self.pending_exception = None
                raise pending
            self.ok_to_raise.set()
            return self

    def __exit__(self, exc_type, exc_value, traceback):
        '''
        Closes the gate. Never swallows the exception, if any.

        Must be called from the thread that owns the gate.
        '''
        self.logger.info(
                'self=%s exc_type=%s exc_value=%s', repr(self),
                repr(exc_type), repr(exc_value))
        assert self.thread == threading.current_thread()
        self.ok_to_raise.clear()
        return False  # don't swallow exception

    def queue_exception(self, e):
        '''
        Stores `e` to be raised on the next `__enter__`, unless an exception
        is already pending, in which case `e` is logged and discarded.
        '''
        with self.lock:
            if self.pending_exception:
                # Logger.warn is a deprecated alias of Logger.warning
                self.logger.warning(
                        '%s already pending for thread %s, discarding %s',
                        repr(self.pending_exception), self.thread, repr(e))
            else:
                self.pending_exception = e

    def __repr__(self):
        return '<ThreadExceptionGate(%s)>' % self.thread
import threading import threading
_thread_exception_gates = {} _thread_exception_gates = {}
_thread_exception_gates_lock = threading.Lock() _thread_exception_gates_lock = threading.Lock()
def thread_accept_exceptions(): def thread_exception_gate(thread=None):
''' '''
Returns a context manager whose purpose is best explained with a snippet: Returns a `ThreadExceptionGate` for `thread` (current thread by default).
# === thread1 === `ThreadExceptionGate` is a context manager which allows exceptions to be
raised from threads other than the current one, by way of `thread_raise`.
# If thread2 calls `thread_raise(thread1, ...)` while do_something() is Example:
# executing, nothing will happen (no exception will be raised in
# thread1).
do_something()
try: try:
with thread_accept_exceptions(): with thread_exception_gate():
# Now we're in the "runtime environment" (pep340) of the # do something
# context manager. If thread2 calls `thread_raise(thread1,
# ...)` while do_something_else() is running, the exception
# will be raised here.
do_something_else()
# Here again if thread2 calls `thread_raise`, nothing happens.
do_yet_another_thing()
except: except:
handle_exception() # handle exception....
The context manager is reentrant, i.e. you can do this: If `thread_raise` is called on a thread that is not currently inside the
`ThreadExceptionGate` context (pep340 "runtime environment"), the exception
with thread_accept_exceptions(): is queued, and raised immediately if and when the thread enters the
with thread_accept_exceptions(): context. Only one exception will be queued this way at a time, others are
blah() discarded.
# `thread_raise` will still work here
toot()
''' '''
class ThreadExceptionGate: if not thread:
def __init__(self): thread = threading.current_thread()
self.lock = threading.Lock()
self.ok_to_raise = 0
def __enter__(self):
with self.lock:
self.ok_to_raise += 1
def __exit__(self, exc_type, exc_value, traceback):
with self.lock:
self.ok_to_raise -= 1
assert self.ok_to_raise >= 0
with _thread_exception_gates_lock: with _thread_exception_gates_lock:
if not threading.current_thread().ident in _thread_exception_gates: if not thread in _thread_exception_gates:
_thread_exception_gates[ _thread_exception_gates[thread] = ThreadExceptionGate(thread)
threading.current_thread().ident] = ThreadExceptionGate()
return _thread_exception_gates[threading.current_thread().ident] return _thread_exception_gates[thread]
thread_accept_exceptions = thread_exception_gate
def thread_raise(thread, exctype): def thread_raise(thread, exctype):
''' '''
Raises the exception `exctype` in the thread `thread`, if it is willing to Raises or queues the exception `exctype` for the thread `thread`.
accept exceptions (see `thread_accept_exceptions`).
See the documentation on the function `thread_exception_gate()` for more
information.
Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
"The exception will be raised only when executing python bytecode. If your "The exception will be raised only when executing python bytecode. If your
thread calls a native/built-in blocking function, the exception will be thread calls a native/built-in blocking function, the exception will be
raised only when execution returns to the python code." raised only when execution returns to the python code."
Returns:
True if exception was raised, False if `thread` is not accepting
exceptions, or another thread is in the middle of raising an exception
in `thread`
Raises: Raises:
threading.ThreadError if `thread` is not running
TypeError if `exctype` is not a class TypeError if `exctype` is not a class
ValueError, SystemError in case of unexpected problems ValueError, SystemError in case of unexpected problems
''' '''
@ -182,44 +198,24 @@ def thread_raise(thread, exctype):
'cannot raise %s, only exception types can be raised (not ' 'cannot raise %s, only exception types can be raised (not '
'instances)' % exctype) 'instances)' % exctype)
if not thread.is_alive(): gate = thread_exception_gate(thread)
raise threading.ThreadError('thread %s is not running' % thread) with gate.lock:
if gate.ok_to_raise.is_set() and thread.is_alive():
gate = _thread_exception_gates.get(thread.ident) gate.ok_to_raise.clear()
if not gate: logging.info('raising %s in thread %s', exctype, thread)
logging.warn( res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
'thread is not accepting exceptions (gate not initialized), ' ctypes.c_long(thread.ident), ctypes.py_object(exctype))
'not raising %s in thread %s', exctype, thread) if res == 0:
return False raise ValueError(
'invalid thread id? thread.ident=%s' % thread.ident)
got_lock = gate.lock.acquire(blocking=False) elif res != 1:
if not got_lock: # if it returns a number greater than one, you're in trouble,
logging.warn( # and you should call it again with exc=NULL to revert the effect
'could not get acquire thread exception gate lock, not ' ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0)
'raising %s in thread %s', exctype, thread) raise SystemError('PyThreadState_SetAsyncExc failed')
return False else:
try: logging.info('queueing %s for thread %s', exctype, thread)
if not gate.ok_to_raise: gate.queue_exception(exctype)
logging.warn(
'thread is not accepting exceptions (gate.ok_to_raise is '
'%s), not raising %s in thread %s',
gate.ok_to_raise, exctype, thread)
return False
logging.info('raising %s in thread %s', exctype, thread)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(exctype))
if res == 0:
raise ValueError(
'invalid thread id? thread.ident=%s' % thread.ident)
elif res != 1:
# if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0)
raise SystemError('PyThreadState_SetAsyncExc failed')
return True
finally:
gate.lock.release()
def sleep(duration): def sleep(duration):
''' '''

View file

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup( setuptools.setup(
name='brozzler', name='brozzler',
version='1.1b11.dev245', version='1.1b11.dev249',
description='Distributed web crawling with browsers', description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler', url='https://github.com/internetarchive/brozzler',
author='Noah Levitt', author='Noah Levitt',

View file

@ -192,56 +192,121 @@ def test_start_stop_backwards_compat():
assert not 'started' in job assert not 'started' in job
assert not 'finished' in job assert not 'finished' in job
def test_thread_raise(): class Exception1(Exception):
let_thread_finish = threading.Event() pass
thread_preamble_done = threading.Event() class Exception2(Exception):
thread_caught_exception = None pass
def thread_target(accept_exceptions=False): def test_thread_raise_not_accept():
def never_accept():
try: try:
if accept_exceptions: brozzler.sleep(2)
with brozzler.thread_accept_exceptions(): except Exception as e:
thread_preamble_done.set() nonlocal thread_caught_exception
logging.info('waiting (accepting exceptions)') thread_caught_exception = e
let_thread_finish.wait()
else: # test that thread_raise does not raise exception in a thread that has no
thread_preamble_done.set() # `with thread_exception_gate()` block
logging.info('waiting (not accepting exceptions)') thread_caught_exception = None
let_thread_finish.wait() th = threading.Thread(target=never_accept)
th.start()
brozzler.thread_raise(th, Exception1)
th.join()
assert thread_caught_exception is None
def test_thread_raise_immediate():
def accept_immediately():
try:
with brozzler.thread_accept_exceptions():
brozzler.sleep(2)
except Exception as e:
nonlocal thread_caught_exception
thread_caught_exception = e
# test immediate exception raise
thread_caught_exception = None
th = threading.Thread(target=accept_immediately)
th.start()
brozzler.thread_raise(th, Exception1)
start = time.time()
th.join()
assert thread_caught_exception
assert isinstance(thread_caught_exception, Exception1)
assert time.time() - start < 1.0
def test_thread_raise_safe_exit():
def delay_context_exit():
gate = brozzler.thread_accept_exceptions()
orig_exit = type(gate).__exit__
try:
type(gate).__exit__ = lambda self, et, ev, t: (
brozzler.sleep(2), orig_exit(self, et, ev, t), False)[-1]
with brozzler.thread_accept_exceptions() as gate:
brozzler.sleep(2)
except Exception as e: except Exception as e:
logging.info('caught exception %s', repr(e))
nonlocal thread_caught_exception nonlocal thread_caught_exception
thread_caught_exception = e thread_caught_exception = e
finally: finally:
logging.info('finishing') type(gate).__exit__ = orig_exit
let_thread_finish.clear()
thread_preamble_done.clear()
# test that thread_raise does not raise exception in a thread that has not # test that a second thread_raise() doesn't result in an exception in
# called thread_accept_exceptions # ThreadExceptionGate.__exit__
thread_caught_exception = None thread_caught_exception = None
th = threading.Thread(target=lambda: thread_target(accept_exceptions=False)) th = threading.Thread(target=delay_context_exit)
th.start() th.start()
thread_preamble_done.wait() time.sleep(0.2)
with pytest.raises(TypeError): brozzler.thread_raise(th, Exception1)
brozzler.thread_raise( time.sleep(0.2)
th, Exception("i'm an instance, which is not allowed")) brozzler.thread_raise(th, Exception2)
assert brozzler.thread_raise(th, Exception) is False
assert thread_caught_exception is None
let_thread_finish.set()
th.join()
assert thread_caught_exception is None
# test that thread_raise raises exception in a thread that has called
# thread_accept_exceptions
thread_caught_exception = None
th = threading.Thread(target=lambda: thread_target(accept_exceptions=True))
th.start()
thread_preamble_done.wait()
assert brozzler.thread_raise(th, Exception) is True
let_thread_finish.set()
th.join() th.join()
assert thread_caught_exception assert thread_caught_exception
with pytest.raises(threading.ThreadError): # thread is not running assert isinstance(thread_caught_exception, Exception1)
brozzler.thread_raise(th, Exception)
def test_thread_raise_pending_exception():
def accept_eventually():
try:
brozzler.sleep(2)
with brozzler.thread_accept_exceptions():
pass
except Exception as e:
nonlocal thread_caught_exception
thread_caught_exception = e
# test exception that has to wait for `with thread_exception_gate()` block
thread_caught_exception = None
th = threading.Thread(target=accept_eventually)
th.start()
brozzler.thread_raise(th, Exception1)
start = time.time()
th.join()
assert isinstance(thread_caught_exception, Exception1)
assert time.time() - start > 1.0
def test_thread_raise_second_with_block():
def two_with_blocks():
try:
with brozzler.thread_accept_exceptions():
time.sleep(2)
return # test fails
except Exception1 as e:
pass
except:
return # fail test
try:
with brozzler.thread_accept_exceptions():
brozzler.sleep(2)
except Exception as e:
nonlocal thread_caught_exception
thread_caught_exception = e
# test that second `with` block gets second exception raised during first
# `with` block
thread_caught_exception = None
th = threading.Thread(target=two_with_blocks)
th.start()
brozzler.thread_raise(th, Exception1)
brozzler.thread_raise(th, Exception2)
th.join()
assert isinstance(thread_caught_exception, Exception2)