def thread_accept_exceptions():
    '''
    Declares that the calling thread is willing to have exceptions raised in
    it by `thread_raise`.

    On first use this attaches a `thread_raise_lock` to the current thread
    object; later calls reuse that lock. While holding the lock it sets
    `thread_raise_ok` to True.
    '''
    import threading

    thread = threading.current_thread()
    lock = getattr(thread, 'thread_raise_lock', None)
    if lock is None:
        lock = threading.Lock()
    with lock:
        thread.thread_raise_lock = lock
        thread.thread_raise_ok = True

def thread_block_exceptions():
    '''
    Declares that the calling thread no longer accepts exceptions from
    `thread_raise`.

    Does nothing if the thread never called `thread_accept_exceptions` (it
    has no `thread_raise_lock`); such a thread is already refused by
    `thread_raise`.
    '''
    import threading

    thread = threading.current_thread()
    lock = getattr(thread, 'thread_raise_lock', None)
    if lock is not None:
        with lock:
            thread.thread_raise_ok = False

def thread_raise(thread, exctype):
    '''
    If `thread` has declared itself willing to accept exceptions by calling
    `thread_accept_exceptions`, raises the exception `exctype` in the thread
    `thread`.

    Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
    "The exception will be raised only when executing python bytecode. If your
    thread calls a native/built-in blocking function, the exception will be
    raised only when execution returns to the python code."

    Returns:
        True if exception was raised, False if the thread is not accepting
        exceptions or another thread is holding `thread.thread_raise_lock`
        (the lock is waited on for at most 0.5 seconds)

    Raises:
        TypeError if `exctype` is not a class
        threading.ThreadError if `thread` is accepting exceptions but is not
            running (a thread that is not accepting exceptions gets a False
            return instead, because that check happens first)
        ValueError, SystemError in case of unexpected problems
    '''
    import ctypes
    import inspect
    import logging
    import threading

    if not inspect.isclass(exctype):
        raise TypeError(
                'cannot raise %s, only exception types can be raised (not '
                'instances)' % exctype)

    lock = getattr(thread, 'thread_raise_lock', None)
    if lock is None:
        logging.warning(
                'thread is not accepting exceptions (no member variable '
                '"thread_raise_lock"), not raising %s in thread %s',
                exctype, thread)
        return False

    # bounded wait so a target thread that is stuck holding its own lock
    # cannot hang the caller
    if not lock.acquire(timeout=0.5):
        logging.warning(
                'could not acquire "thread_raise_lock", not raising %s in '
                'thread %s', exctype, thread)
        return False
    try:
        if not thread.thread_raise_ok:
            logging.warning(
                    'thread is not accepting exceptions (thread_raise_ok is '
                    '%s), not raising %s in thread %s',
                    thread.thread_raise_ok, exctype, thread)
            return False

        if not thread.is_alive():
            raise threading.ThreadError('thread %s is not running' % thread)

        logging.info('raising %s in thread %s', exctype, thread)
        # wrap the id explicitly: a bare python int would be marshalled by
        # ctypes as a C int, which can truncate thread ids on 64-bit platforms
        thread_id = ctypes.c_long(thread.ident)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                thread_id, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError(
                    'invalid thread id? thread.ident=%s' % thread.ident)
        if res != 1:
            # if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect
            # (None is passed to the C function as a NULL pointer)
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError('PyThreadState_SetAsyncExc failed')
        return True
    finally:
        lock.release()
ydl.extract_info(page.url) + brozzler.thread_block_exceptions() self._remember_videos(page, ydl.brozzler_spy) # logging.info('XXX %s', json.dumps(info)) if self._using_warcprox(site): @@ -517,7 +520,7 @@ class BrozzlerWorker: self.logger.info("brozzler worker starting") try: latest_state = None - while True: + while not self._shutdown.is_set(): self._service_heartbeat_if_due() try: browser = self._browser_pool.acquire() @@ -543,6 +546,7 @@ class BrozzlerWorker: except brozzler.NothingToClaim: pass time.sleep(0.5) + self.logger.info("shutdown requested") except brozzler.ShutdownRequested: self.logger.info("shutdown requested") except: @@ -586,12 +590,7 @@ class BrozzlerWorker: self.stop() def stop(self): - with self._start_stop_lock: - if self._thread and self._thread.is_alive(): - self.logger.info("brozzler worker shutting down") - brozzler.thread_raise(self._thread, brozzler.ShutdownRequested) - self._thread.join() - self._thread = None + self._shutdown.set() def is_alive(self): return self._thread and self._thread.is_alive() diff --git a/setup.py b/setup.py index 227bf00..8fad7fa 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev237', + version='1.1b11.dev238', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_units.py b/tests/test_units.py index 911fceb..0af5202 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -30,6 +30,13 @@ import requests import tempfile import uuid import socket +import time +import sys + +logging.basicConfig( + stream=sys.stderr, level=logging.INFO, format=( + '%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) @pytest.fixture(scope='module') def httpd(request): @@ -185,3 +192,67 @@ def test_start_stop_backwards_compat(): assert not 'started' in job assert not 
def test_thread_raise():
    '''
    Checks that `brozzler.thread_raise` only raises in a thread that has
    called `thread_accept_exceptions` and has not since called
    `thread_block_exceptions`.
    '''
    # upper bound on every wait/join so a misbehaving thread fails the test
    # instead of hanging the whole suite
    wait_timeout = 10

    let_thread_finish = threading.Event()
    thread_preamble_done = threading.Event()
    thread_caught_exception = None

    def thread_target(accept_exceptions=False, block_exceptions=False):
        nonlocal thread_caught_exception
        if accept_exceptions:
            brozzler.thread_accept_exceptions()
        if block_exceptions:
            brozzler.thread_block_exceptions()

        try:
            # signal readiness from inside the try block, so an exception
            # raised by the main thread right after it wakes up cannot land
            # outside of the handler below
            thread_preamble_done.set()
            logging.info('waiting')
            let_thread_finish.wait()
        except Exception as e:
            logging.info('caught exception %s', e)
            thread_caught_exception = e
        finally:
            logging.info('finishing')
            let_thread_finish.clear()
            thread_preamble_done.clear()

    def start_thread(**kwargs):
        # starts thread_target and waits until its preamble has run
        th = threading.Thread(target=thread_target, kwargs=kwargs)
        th.start()
        assert thread_preamble_done.wait(timeout=wait_timeout)
        return th

    def finish_thread(th):
        # lets thread_target return and waits for the thread to exit
        let_thread_finish.set()
        th.join(timeout=wait_timeout)
        assert not th.is_alive()

    # test that thread_raise does not raise exception in a thread that has not
    # called thread_accept_exceptions
    thread_caught_exception = None
    th = start_thread()
    with pytest.raises(TypeError):
        brozzler.thread_raise(
                th, Exception("i'm an instance, which is not allowed"))
    assert brozzler.thread_raise(th, Exception) is False
    assert thread_caught_exception is None
    finish_thread(th)
    assert thread_caught_exception is None

    # test that thread_raise raises exception in a thread that has called
    # thread_accept_exceptions
    thread_caught_exception = None
    th = start_thread(accept_exceptions=True)
    assert brozzler.thread_raise(th, Exception) is True
    finish_thread(th)
    assert isinstance(thread_caught_exception, Exception)
    with pytest.raises(threading.ThreadError): # thread is not running
        brozzler.thread_raise(th, Exception)

    # test that thread_raise does not raise exception in a thread that has
    # called thread_block_exceptions; the thread accepts first, because
    # thread_block_exceptions is a no-op on a thread that never accepted
    thread_caught_exception = None
    th = start_thread(accept_exceptions=True, block_exceptions=True)
    assert brozzler.thread_raise(th, Exception) is False
    finish_thread(th)
    assert thread_caught_exception is None