mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-04-19 15:25:59 -04:00
safen up brozzler.thread_raise() to avoid interrupting rethinkdb transactions and such
This commit is contained in:
parent
b3fa7a4e39
commit
7706bab8b8
@ -98,31 +98,88 @@ def behavior_script(url, template_parameters=None):
|
||||
return script
|
||||
return None
|
||||
|
||||
def thread_accept_exceptions():
    '''
    Marks the calling thread as willing to receive exceptions injected by
    `thread_raise`.

    The first call on a thread creates a lock and stores it on the thread
    object as `thread_raise_lock`; later calls reuse that lock. The flag
    `thread_raise_ok` is set to True while the lock is held.
    '''
    import threading

    current = threading.current_thread()
    try:
        raise_lock = current.thread_raise_lock
    except AttributeError:
        # first call on this thread, no lock has been attached yet
        raise_lock = threading.Lock()

    with raise_lock:
        current.thread_raise_lock = raise_lock
        current.thread_raise_ok = True
|
||||
|
||||
def thread_block_exceptions():
    '''
    Marks the calling thread as unwilling to receive exceptions injected by
    `thread_raise`.

    Does nothing if the thread has no `thread_raise_lock` attribute (in this
    file that attribute is only attached by `thread_accept_exceptions`).
    Otherwise sets `thread_raise_ok` to False while holding the lock.
    '''
    import threading

    current = threading.current_thread()
    if not hasattr(current, 'thread_raise_lock'):
        return

    with current.thread_raise_lock:
        current.thread_raise_ok = False
|
||||
|
||||
def thread_raise(thread, exctype):
    '''
    Raises the exception `exctype` in the thread `thread`, if it is willing.

    A thread is willing if it carries a `thread_raise_lock` attribute and its
    `thread_raise_ok` attribute is true. The lock is held for the whole
    check-and-raise sequence, so the target thread cannot flip
    `thread_raise_ok` in between.

    Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
    "The exception will be raised only when executing python bytecode. If your
    thread calls a native/built-in blocking function, the exception will be
    raised only when execution returns to the python code."

    Args:
        thread: the `threading.Thread` to raise the exception in
        exctype: the exception class (not an instance) to raise

    Returns:
        True if exception was raised, False if the thread is not accepting
        exceptions or `thread.thread_raise_lock` could not be acquired within
        half a second

    Raises:
        TypeError if `exctype` is not a class
        threading.ThreadError if `thread` is accepting exceptions but is not
            running
        ValueError, SystemError in case of unexpected problems
    '''
    import ctypes
    import inspect
    import logging
    import threading

    if not inspect.isclass(exctype):
        # wrap in a tuple so that a tuple argument is formatted, not unpacked
        raise TypeError(
                'cannot raise %s, only exception types can be raised (not '
                'instances)' % (exctype,))

    if not hasattr(thread, 'thread_raise_lock'):
        logging.warning(
                'thread is not accepting exceptions (no member variable '
                '"thread_raise_lock"), not raising %s in thread %s',
                exctype, thread)
        return False

    # bounded wait, so the caller is not blocked indefinitely when the lock
    # is held elsewhere
    got_lock = thread.thread_raise_lock.acquire(timeout=0.5)
    if not got_lock:
        logging.warning(
                'could not get acquire "thread_raise_lock", not raising %s in '
                'thread %s', exctype, thread)
        return False
    try:
        if not thread.thread_raise_ok:
            logging.warning(
                    'thread is not accepting exceptions (thread_raise_ok is '
                    '%s), not raising %s in thread %s',
                    thread.thread_raise_ok, exctype, thread)
            return False

        if not thread.is_alive():
            raise threading.ThreadError('thread %s is not running' % thread)

        logging.info('raising %s in thread %s', exctype, thread)
        # wrap the id explicitly: a bare python int is passed by ctypes as a
        # C int, which can truncate a 64-bit thread id
        thread_id = ctypes.c_long(thread.ident)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                thread_id, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError(
                    'invalid thread id? thread.ident=%s' % thread.ident)
        elif res != 1:
            # if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError('PyThreadState_SetAsyncExc failed')

        return True
    finally:
        thread.thread_raise_lock.release()
|
||||
|
||||
def sleep(duration):
|
||||
'''
|
||||
|
@ -427,6 +427,7 @@ class Browser:
|
||||
self.websock_thread.on_request = on_request
|
||||
if on_response:
|
||||
self.websock_thread.on_response = on_response
|
||||
brozzler.thread_accept_exceptions()
|
||||
try:
|
||||
self.navigate_to_page(
|
||||
page_url, extra_headers=extra_headers,
|
||||
@ -444,13 +445,16 @@ class Browser:
|
||||
final_page_url = self.url()
|
||||
return final_page_url, outlinks
|
||||
except brozzler.ReachedLimit:
|
||||
brozzler.thread_block_exceptions()
|
||||
# websock_thread has stashed the ReachedLimit exception with
|
||||
# more information, raise that one
|
||||
raise self.websock_thread.reached_limit
|
||||
except websocket.WebSocketConnectionClosedException as e:
|
||||
brozzler.thread_block_exceptions()
|
||||
self.logger.error('websocket closed, did chrome die?')
|
||||
raise BrowsingException(e)
|
||||
finally:
|
||||
brozzler.thread_block_exceptions()
|
||||
self.is_browsing = False
|
||||
self.websock_thread.on_request = None
|
||||
self.websock_thread.on_response = None
|
||||
|
@ -118,6 +118,7 @@ class BrozzlerWorker:
|
||||
|
||||
self._thread = None
|
||||
self._start_stop_lock = threading.Lock()
|
||||
self._shutdown = threading.Event()
|
||||
|
||||
def _proxy_for(self, site):
|
||||
if self._proxy:
|
||||
@ -233,7 +234,9 @@ class BrozzlerWorker:
|
||||
def _try_youtube_dl(self, ydl, site, page):
|
||||
try:
|
||||
self.logger.info("trying youtube-dl on {}".format(page))
|
||||
brozzler.thread_accept_exceptions()
|
||||
info = ydl.extract_info(page.url)
|
||||
brozzler.thread_block_exceptions()
|
||||
self._remember_videos(page, ydl.brozzler_spy)
|
||||
# logging.info('XXX %s', json.dumps(info))
|
||||
if self._using_warcprox(site):
|
||||
@ -517,7 +520,7 @@ class BrozzlerWorker:
|
||||
self.logger.info("brozzler worker starting")
|
||||
try:
|
||||
latest_state = None
|
||||
while True:
|
||||
while not self._shutdown.is_set():
|
||||
self._service_heartbeat_if_due()
|
||||
try:
|
||||
browser = self._browser_pool.acquire()
|
||||
@ -543,6 +546,7 @@ class BrozzlerWorker:
|
||||
except brozzler.NothingToClaim:
|
||||
pass
|
||||
time.sleep(0.5)
|
||||
self.logger.info("shutdown requested")
|
||||
except brozzler.ShutdownRequested:
|
||||
self.logger.info("shutdown requested")
|
||||
except:
|
||||
@ -586,12 +590,7 @@ class BrozzlerWorker:
|
||||
self.stop()
|
||||
|
||||
def stop(self):
    '''
    Requests shutdown of the worker by setting the `_shutdown` event.

    Does not inject an exception into the worker thread and does not join it.
    Joining before the event is set would block forever whenever
    `brozzler.thread_raise` declines to raise (returns False), because nothing
    else would tell the thread to exit. Use `is_alive` to check whether the
    worker thread is still running.
    '''
    self._shutdown.set()
|
||||
|
||||
def is_alive(self):
    '''
    Reports whether the worker thread is running.

    Returns `self._thread` itself when it is falsy (e.g. None), otherwise the
    result of `self._thread.is_alive()`.
    '''
    worker_thread = self._thread
    if not worker_thread:
        return worker_thread
    return worker_thread.is_alive()
|
||||
|
2
setup.py
2
setup.py
@ -32,7 +32,7 @@ def find_package_data(package):
|
||||
|
||||
setuptools.setup(
|
||||
name='brozzler',
|
||||
version='1.1b11.dev237',
|
||||
version='1.1b11.dev238',
|
||||
description='Distributed web crawling with browsers',
|
||||
url='https://github.com/internetarchive/brozzler',
|
||||
author='Noah Levitt',
|
||||
|
@ -30,6 +30,13 @@ import requests
|
||||
import tempfile
|
||||
import uuid
|
||||
import socket
|
||||
import time
|
||||
import sys
|
||||
|
||||
logging.basicConfig(
|
||||
stream=sys.stderr, level=logging.INFO, format=(
|
||||
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
|
||||
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def httpd(request):
|
||||
@ -185,3 +192,67 @@ def test_start_stop_backwards_compat():
|
||||
assert not 'started' in job
|
||||
assert not 'finished' in job
|
||||
|
||||
def test_thread_raise():
    '''
    Exercises `brozzler.thread_raise` against threads in four states: never
    opted in, opted in with `thread_accept_exceptions`, only called
    `thread_block_exceptions`, and opted in then blocked again.
    '''
    # generous upper bound so a regression fails the test instead of hanging
    timeout = 10
    let_thread_finish = threading.Event()
    thread_preamble_done = threading.Event()
    thread_caught_exception = None

    def thread_target(accept_exceptions=False, block_exceptions=False):
        nonlocal thread_caught_exception
        if accept_exceptions:
            brozzler.thread_accept_exceptions()
        if block_exceptions:
            brozzler.thread_block_exceptions()

        try:
            # signal readiness inside the try block, so that an exception
            # injected right after the signal cannot escape the handler
            thread_preamble_done.set()
            logging.info('waiting')
            let_thread_finish.wait()
        except Exception as e:
            logging.info('caught exception %s', e)
            thread_caught_exception = e
        finally:
            logging.info('finishing')
            let_thread_finish.clear()
            thread_preamble_done.clear()

    def finish(th):
        # lets the thread run to completion and checks that it really exited
        let_thread_finish.set()
        th.join(timeout)
        assert not th.is_alive()

    # test that thread_raise does not raise exception in a thread that has not
    # called thread_accept_exceptions
    thread_caught_exception = None
    th = threading.Thread(target=thread_target)
    th.start()
    assert thread_preamble_done.wait(timeout)
    with pytest.raises(TypeError):
        brozzler.thread_raise(
                th, Exception("i'm an instance, which is not allowed"))
    assert brozzler.thread_raise(th, Exception) is False
    assert thread_caught_exception is None
    finish(th)
    assert thread_caught_exception is None

    # test that thread_raise raises exception in a thread that has called
    # thread_accept_exceptions
    thread_caught_exception = None
    th = threading.Thread(
            target=thread_target, kwargs={'accept_exceptions': True})
    th.start()
    assert thread_preamble_done.wait(timeout)
    assert brozzler.thread_raise(th, Exception) is True
    finish(th)
    assert isinstance(thread_caught_exception, Exception)
    with pytest.raises(threading.ThreadError):  # thread is not running
        brozzler.thread_raise(th, Exception)

    # test that thread_raise does not raise exception in a thread that has
    # called thread_block_exceptions without ever accepting exceptions
    thread_caught_exception = None
    th = threading.Thread(
            target=thread_target, kwargs={'block_exceptions': True})
    th.start()
    assert thread_preamble_done.wait(timeout)
    assert brozzler.thread_raise(th, Exception) is False
    finish(th)
    assert thread_caught_exception is None

    # test that thread_raise does not raise exception in a thread that
    # accepted exceptions and then blocked them again (thread_raise_ok False)
    thread_caught_exception = None
    th = threading.Thread(
            target=thread_target,
            kwargs={'accept_exceptions': True, 'block_exceptions': True})
    th.start()
    assert thread_preamble_done.wait(timeout)
    assert brozzler.thread_raise(th, Exception) is False
    finish(th)
    assert thread_caught_exception is None
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user