Merge pull request #36 from nlevitt/safe-thread-raise

safen up brozzler.thread_raise() to avoid interrupting rethinkdb tran…
This commit is contained in:
Noah Levitt 2017-04-26 11:15:02 -07:00 committed by GitHub
commit d972919db0
5 changed files with 196 additions and 39 deletions

View File

@ -98,31 +98,128 @@ def behavior_script(url, template_parameters=None):
return script return script
return None return None
import threading
_thread_exception_gates = {}
_thread_exception_gates_lock = threading.Lock()
def thread_accept_exceptions():
'''
Returns a context manager whose purpose is best explained with a snippet:
# === thread1 ===
# If thread2 calls `thread_raise(thread1, ...)` while do_something() is
# executing, nothing will happen (no exception will be raised in
# thread1).
do_something()
try:
with thread_accept_exceptions():
# Now we're in the "runtime environment" (pep340) of the
# context manager. If thread2 calls `thread_raise(thread1,
# ...)` while do_something_else() is running, the exception
# will be raised here.
do_something_else()
# Here again if thread2 calls `thread_raise`, nothing happens.
do_yet_another_thing()
except:
handle_exception()
The context manager is reentrant, i.e. you can do this:
with thread_accept_exceptions():
with thread_accept_exceptions():
blah()
# `thread_raise` will still work here
toot()
'''
class ThreadExceptionGate:
def __init__(self):
self.lock = threading.Lock()
self.ok_to_raise = 0
def __enter__(self):
with self.lock:
self.ok_to_raise += 1
def __exit__(self, exc_type, exc_value, traceback):
with self.lock:
self.ok_to_raise -= 1
assert self.ok_to_raise >= 0
with _thread_exception_gates_lock:
if not threading.current_thread().ident in _thread_exception_gates:
_thread_exception_gates[
threading.current_thread().ident] = ThreadExceptionGate()
return _thread_exception_gates[threading.current_thread().ident]
def thread_raise(thread, exctype): def thread_raise(thread, exctype):
''' '''
Raises the exception exctype in the thread. Raises the exception `exctype` in the thread `thread`, if it is willing to
accept exceptions (see `thread_accept_exceptions`).
Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
"The exception will be raised only when executing python bytecode. If your "The exception will be raised only when executing python bytecode. If your
thread calls a native/built-in blocking function, the exception will be thread calls a native/built-in blocking function, the exception will be
raised only when execution returns to the python code." raised only when execution returns to the python code."
Returns:
True if exception was raised, False if `thread` is not accepting
exceptions, or another thread is in the middle of raising an exception
in `thread`
Raises:
threading.ThreadError if `thread` is not running
TypeError if `exctype` is not a class
ValueError, SystemError in case of unexpected problems
''' '''
import ctypes, inspect, threading import ctypes, inspect, threading, logging
if not thread.is_alive():
raise threading.ThreadError('thread %s is not running' % thread)
if not inspect.isclass(exctype): if not inspect.isclass(exctype):
raise TypeError( raise TypeError(
'cannot raise %s, only exception types can be raised (not ' 'cannot raise %s, only exception types can be raised (not '
'instances)' % exc_type) 'instances)' % exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(exctype)) if not thread.is_alive():
if res == 0: raise threading.ThreadError('thread %s is not running' % thread)
raise ValueError('invalid thread id? thread.ident=%s' % thread.ident)
elif res != 1: gate = _thread_exception_gates.get(thread.ident)
# if it returns a number greater than one, you're in trouble, if not gate:
# and you should call it again with exc=NULL to revert the effect logging.warn(
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) 'thread is not accepting exceptions (gate not initialized), '
raise SystemError('PyThreadState_SetAsyncExc failed') 'not raising %s in thread %s', exctype, thread)
return False
got_lock = gate.lock.acquire(blocking=False)
if not got_lock:
logging.warn(
'could not get acquire thread exception gate lock, not '
'raising %s in thread %s', exctype, thread)
return False
try:
if not gate.ok_to_raise:
logging.warn(
'thread is not accepting exceptions (gate.ok_to_raise is '
'%s), not raising %s in thread %s',
gate.ok_to_raise, exctype, thread)
return False
logging.info('raising %s in thread %s', exctype, thread)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(exctype))
if res == 0:
raise ValueError(
'invalid thread id? thread.ident=%s' % thread.ident)
elif res != 1:
# if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0)
raise SystemError('PyThreadState_SetAsyncExc failed')
return True
finally:
gate.lock.release()
def sleep(duration): def sleep(duration):
''' '''
@ -171,4 +268,5 @@ from brozzler.cli import suggest_default_chrome_exe
__all__ = ['Page', 'Site', 'BrozzlerWorker', 'is_permitted_by_robots', __all__ = ['Page', 'Site', 'BrozzlerWorker', 'is_permitted_by_robots',
'RethinkDbFrontier', 'Browser', 'BrowserPool', 'BrowsingException', 'RethinkDbFrontier', 'Browser', 'BrowserPool', 'BrowsingException',
'new_job', 'new_site', 'Job', 'new_job_file', 'InvalidJobConf'] 'new_job', 'new_site', 'Job', 'new_job_file', 'InvalidJobConf',
'sleep', 'thread_accept_exceptions', 'thread_raise']

View File

@ -428,21 +428,22 @@ class Browser:
if on_response: if on_response:
self.websock_thread.on_response = on_response self.websock_thread.on_response = on_response
try: try:
self.navigate_to_page( with brozzler.thread_accept_exceptions():
page_url, extra_headers=extra_headers, self.navigate_to_page(
user_agent=user_agent, timeout=300) page_url, extra_headers=extra_headers,
if password: user_agent=user_agent, timeout=300)
self.try_login(username, password, timeout=300) if password:
if on_screenshot: self.try_login(username, password, timeout=300)
jpeg_bytes = self.screenshot() if on_screenshot:
on_screenshot(jpeg_bytes) jpeg_bytes = self.screenshot()
behavior_script = brozzler.behavior_script( on_screenshot(jpeg_bytes)
page_url, behavior_parameters) behavior_script = brozzler.behavior_script(
self.run_behavior(behavior_script, timeout=900) page_url, behavior_parameters)
outlinks = self.extract_outlinks() self.run_behavior(behavior_script, timeout=900)
self.visit_hashtags(page_url, hashtags, outlinks) outlinks = self.extract_outlinks()
final_page_url = self.url() self.visit_hashtags(page_url, hashtags, outlinks)
return final_page_url, outlinks final_page_url = self.url()
return final_page_url, outlinks
except brozzler.ReachedLimit: except brozzler.ReachedLimit:
# websock_thread has stashed the ReachedLimit exception with # websock_thread has stashed the ReachedLimit exception with
# more information, raise that one # more information, raise that one

View File

@ -118,6 +118,7 @@ class BrozzlerWorker:
self._thread = None self._thread = None
self._start_stop_lock = threading.Lock() self._start_stop_lock = threading.Lock()
self._shutdown = threading.Event()
def _proxy_for(self, site): def _proxy_for(self, site):
if self._proxy: if self._proxy:
@ -233,7 +234,8 @@ class BrozzlerWorker:
def _try_youtube_dl(self, ydl, site, page): def _try_youtube_dl(self, ydl, site, page):
try: try:
self.logger.info("trying youtube-dl on {}".format(page)) self.logger.info("trying youtube-dl on {}".format(page))
info = ydl.extract_info(page.url) with brozzler.thread_accept_exceptions():
info = ydl.extract_info(page.url)
self._remember_videos(page, ydl.brozzler_spy) self._remember_videos(page, ydl.brozzler_spy)
# logging.info('XXX %s', json.dumps(info)) # logging.info('XXX %s', json.dumps(info))
if self._using_warcprox(site): if self._using_warcprox(site):
@ -517,7 +519,7 @@ class BrozzlerWorker:
self.logger.info("brozzler worker starting") self.logger.info("brozzler worker starting")
try: try:
latest_state = None latest_state = None
while True: while not self._shutdown.is_set():
self._service_heartbeat_if_due() self._service_heartbeat_if_due()
try: try:
browser = self._browser_pool.acquire() browser = self._browser_pool.acquire()
@ -543,6 +545,7 @@ class BrozzlerWorker:
except brozzler.NothingToClaim: except brozzler.NothingToClaim:
pass pass
time.sleep(0.5) time.sleep(0.5)
self.logger.info("shutdown requested")
except brozzler.ShutdownRequested: except brozzler.ShutdownRequested:
self.logger.info("shutdown requested") self.logger.info("shutdown requested")
except: except:
@ -586,12 +589,7 @@ class BrozzlerWorker:
self.stop() self.stop()
def stop(self): def stop(self):
with self._start_stop_lock: self._shutdown.set()
if self._thread and self._thread.is_alive():
self.logger.info("brozzler worker shutting down")
brozzler.thread_raise(self._thread, brozzler.ShutdownRequested)
self._thread.join()
self._thread = None
def is_alive(self): def is_alive(self):
return self._thread and self._thread.is_alive() return self._thread and self._thread.is_alive()

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup( setuptools.setup(
name='brozzler', name='brozzler',
version='1.1b11.dev239', version='1.1b11.dev240',
description='Distributed web crawling with browsers', description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler', url='https://github.com/internetarchive/brozzler',
author='Noah Levitt', author='Noah Levitt',

View File

@ -30,6 +30,13 @@ import requests
import tempfile import tempfile
import uuid import uuid
import socket import socket
import time
import sys
logging.basicConfig(
stream=sys.stderr, level=logging.INFO, format=(
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def httpd(request): def httpd(request):
@ -185,3 +192,56 @@ def test_start_stop_backwards_compat():
assert not 'started' in job assert not 'started' in job
assert not 'finished' in job assert not 'finished' in job
def test_thread_raise():
let_thread_finish = threading.Event()
thread_preamble_done = threading.Event()
thread_caught_exception = None
def thread_target(accept_exceptions=False):
try:
if accept_exceptions:
with brozzler.thread_accept_exceptions():
thread_preamble_done.set()
logging.info('waiting (accepting exceptions)')
let_thread_finish.wait()
else:
thread_preamble_done.set()
logging.info('waiting (not accepting exceptions)')
let_thread_finish.wait()
except Exception as e:
logging.info('caught exception %s', repr(e))
nonlocal thread_caught_exception
thread_caught_exception = e
finally:
logging.info('finishing')
let_thread_finish.clear()
thread_preamble_done.clear()
# test that thread_raise does not raise exception in a thread that has not
# called thread_accept_exceptions
thread_caught_exception = None
th = threading.Thread(target=lambda: thread_target(accept_exceptions=False))
th.start()
thread_preamble_done.wait()
with pytest.raises(TypeError):
brozzler.thread_raise(
th, Exception("i'm an instance, which is not allowed"))
assert brozzler.thread_raise(th, Exception) is False
assert thread_caught_exception is None
let_thread_finish.set()
th.join()
assert thread_caught_exception is None
# test that thread_raise raises exception in a thread that has called
# thread_accept_exceptions
thread_caught_exception = None
th = threading.Thread(target=lambda: thread_target(accept_exceptions=True))
th.start()
thread_preamble_done.wait()
assert brozzler.thread_raise(th, Exception) is True
let_thread_finish.set()
th.join()
assert thread_caught_exception
with pytest.raises(threading.ThreadError): # thread is not running
brozzler.thread_raise(th, Exception)