improve thread_raise() so that the new tests pass

1. If the thread is not currently accepting exceptions, queue the exception
   and raise it if and when the thread does start accepting them. This fixes
   the problem of thread_raise exceptions being ignored when raised just
   before the target thread starts accepting exceptions.
2. Avoid problems caused by raising multiple exceptions in the same
   thread in quick succession by ensuring that only one is actually raised for
   a given `with` block. This type of occurrence had been putting brozzler into
   a borked/frozen state.
This commit is contained in:
Noah Levitt 2017-05-16 14:20:53 -07:00
parent d514eaec15
commit 31dc6a2d97
2 changed files with 85 additions and 90 deletions

View file

@ -98,80 +98,95 @@ def behavior_script(url, template_parameters=None):
return script return script
return None return None
class ThreadExceptionGate:
    '''
    Context manager marking the region of code in which `thread` is willing
    to have an exception raised in it by another thread.

    While the owning thread is inside the `with` block, `ok_to_raise` is set.
    Outside of it, at most one exception can be parked in `pending_exception`
    via `queue_exception()`; it is raised the next time the owning thread
    enters the gate.

    Attributes:
        thread: the thread this gate belongs to
        ok_to_raise: `threading.Event`, set while the thread is inside the
            `with` block
        pending_exception: queued exception (or exception type), or None
        lock: `threading.RLock` guarding `pending_exception` and the
            check-then-act sequences on `ok_to_raise`
    '''
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, thread):
        self.thread = thread
        self.ok_to_raise = threading.Event()
        self.pending_exception = None
        self.lock = threading.RLock()

    def __enter__(self):
        '''
        Opens the gate, or raises the pending exception if there is one.

        Must be called from the thread that owns the gate.
        '''
        assert self.thread == threading.current_thread()
        # Decide "raise pending" vs "open gate" atomically with respect to
        # anyone who holds self.lock while checking ok_to_raise and queueing.
        # Without the lock an exception queued between the pending check and
        # ok_to_raise.set() would sit unnoticed until the next entry.
        with self.lock:
            pending = self.pending_exception
            if pending is None:
                self.ok_to_raise.set()
                return self
            self.pending_exception = None
        self.logger.info('raising pending exception %s', pending)
        raise pending

    def __exit__(self, exc_type, exc_value, traceback):
        '''
        Closes the gate. Never swallows the exception, if any.
        '''
        self.logger.info(
                'self=%s exc_type=%s exc_value=%s', repr(self),
                repr(exc_type), repr(exc_value))
        assert self.thread == threading.current_thread()
        self.ok_to_raise.clear()
        return False  # don't swallow exception

    def queue_exception(self, e):
        '''
        Parks `e` to be raised when the owning thread next enters the gate.

        Only one exception is kept; if one is already pending, `e` is
        discarded and a warning is logged.
        '''
        with self.lock:
            if self.pending_exception is not None:
                # Logger.warn is a deprecated alias of Logger.warning
                self.logger.warning(
                        '%s already pending for thread %s, discarding %s',
                        repr(self.pending_exception), self.thread, repr(e))
            else:
                self.pending_exception = e

    def __repr__(self):
        return '<ThreadExceptionGate(%s)>' % self.thread
import threading import threading
_thread_exception_gates = {} _thread_exception_gates = {}
_thread_exception_gates_lock = threading.Lock() _thread_exception_gates_lock = threading.Lock()
def thread_accept_exceptions(): def thread_exception_gate(thread=None):
''' '''
Returns a context manager whose purpose is best explained with a snippet: Returns a `ThreadExceptionGate` for `thread` (current thread by default).
# === thread1 === `ThreadExceptionGate` is a context manager which allows exceptions to be
raised from threads other than the current one, by way of `thread_raise`.
# If thread2 calls `thread_raise(thread1, ...)` while do_something() is Example:
# executing, nothing will happen (no exception will be raised in
# thread1).
do_something()
try: try:
with thread_accept_exceptions(): with thread_exception_gate():
# Now we're in the "runtime environment" (pep340) of the # do something
# context manager. If thread2 calls `thread_raise(thread1,
# ...)` while do_something_else() is running, the exception
# will be raised here.
do_something_else()
# Here again if thread2 calls `thread_raise`, nothing happens.
do_yet_another_thing()
except: except:
handle_exception() # handle exception....
The context manager is reentrant, i.e. you can do this: If `thread_raise` is called on a thread that is not currently inside the
`ThreadExceptionGate` context (pep340 "runtime environment"), the exception
with thread_accept_exceptions(): is queued, and raised immediately if and when the thread enters the
with thread_accept_exceptions(): context. Only one exception will be queued this way at a time, others are
blah() discarded.
# `thread_raise` will still work here
toot()
''' '''
class ThreadExceptionGate: if not thread:
def __init__(self): thread = threading.current_thread()
self.lock = threading.Lock()
self.ok_to_raise = 0
def __enter__(self):
with self.lock:
self.ok_to_raise += 1
def __exit__(self, exc_type, exc_value, traceback):
with self.lock:
self.ok_to_raise -= 1
assert self.ok_to_raise >= 0
with _thread_exception_gates_lock: with _thread_exception_gates_lock:
if not threading.current_thread().ident in _thread_exception_gates: if not thread in _thread_exception_gates:
_thread_exception_gates[ _thread_exception_gates[thread] = ThreadExceptionGate(thread)
threading.current_thread().ident] = ThreadExceptionGate()
return _thread_exception_gates[threading.current_thread().ident] return _thread_exception_gates[thread]
thread_accept_exceptions = thread_exception_gate
def thread_raise(thread, exctype): def thread_raise(thread, exctype):
''' '''
Raises the exception `exctype` in the thread `thread`, if it is willing to Raises or queues the exception `exctype` for the thread `thread`.
accept exceptions (see `thread_accept_exceptions`).
See the documentation on the function `thread_exception_gate()` for more
information.
Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
"The exception will be raised only when executing python bytecode. If your "The exception will be raised only when executing python bytecode. If your
thread calls a native/built-in blocking function, the exception will be thread calls a native/built-in blocking function, the exception will be
raised only when execution returns to the python code." raised only when execution returns to the python code."
Returns:
True if exception was raised, False if `thread` is not accepting
exceptions, or another thread is in the middle of raising an exception
in `thread`
Raises: Raises:
threading.ThreadError if `thread` is not running
TypeError if `exctype` is not a class TypeError if `exctype` is not a class
ValueError, SystemError in case of unexpected problems ValueError, SystemError in case of unexpected problems
''' '''
@ -182,30 +197,10 @@ def thread_raise(thread, exctype):
'cannot raise %s, only exception types can be raised (not ' 'cannot raise %s, only exception types can be raised (not '
'instances)' % exctype) 'instances)' % exctype)
if not thread.is_alive(): gate = thread_exception_gate(thread)
raise threading.ThreadError('thread %s is not running' % thread) with gate.lock:
if gate.ok_to_raise.is_set() and thread.is_alive():
gate = _thread_exception_gates.get(thread.ident) gate.ok_to_raise.clear()
if not gate:
logging.warn(
'thread is not accepting exceptions (gate not initialized), '
'not raising %s in thread %s', exctype, thread)
return False
got_lock = gate.lock.acquire(blocking=False)
if not got_lock:
logging.warn(
'could not get acquire thread exception gate lock, not '
'raising %s in thread %s', exctype, thread)
return False
try:
if not gate.ok_to_raise:
logging.warn(
'thread is not accepting exceptions (gate.ok_to_raise is '
'%s), not raising %s in thread %s',
gate.ok_to_raise, exctype, thread)
return False
logging.info('raising %s in thread %s', exctype, thread) logging.info('raising %s in thread %s', exctype, thread)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc( res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(exctype)) ctypes.c_long(thread.ident), ctypes.py_object(exctype))
@ -217,9 +212,9 @@ def thread_raise(thread, exctype):
# and you should call it again with exc=NULL to revert the effect # and you should call it again with exc=NULL to revert the effect
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0)
raise SystemError('PyThreadState_SetAsyncExc failed') raise SystemError('PyThreadState_SetAsyncExc failed')
return True else:
finally: logging.info('queueing %s for thread %s', exctype, thread)
gate.lock.release() gate.queue_exception(exctype)
def sleep(duration): def sleep(duration):
''' '''

View file

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup( setuptools.setup(
name='brozzler', name='brozzler',
version='1.1b11.dev247', version='1.1b11.dev248',
description='Distributed web crawling with browsers', description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler', url='https://github.com/internetarchive/brozzler',
author='Noah Levitt', author='Noah Levitt',