From 7706bab8b824e0a18c2e883b712966faf2c5b271 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 20 Apr 2017 17:08:16 -0700 Subject: [PATCH 1/3] safen up brozzler.thread_raise() to avoid interrupting rethinkdb transactions and such --- brozzler/__init__.py | 85 ++++++++++++++++++++++++++++++++++++-------- brozzler/browser.py | 4 +++ brozzler/worker.py | 13 ++++--- setup.py | 2 +- tests/test_units.py | 71 ++++++++++++++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 22 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 6a4303e..0cbaf0d 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -98,31 +98,88 @@ def behavior_script(url, template_parameters=None): return script return None +def thread_accept_exceptions(): + import threading + thread = threading.current_thread() + if hasattr(thread, 'thread_raise_lock'): + lock = thread.thread_raise_lock + else: + lock = threading.Lock() + with lock: + thread.thread_raise_lock = lock + thread.thread_raise_ok = True + +def thread_block_exceptions(): + import threading + thread = threading.current_thread() + if hasattr(thread, 'thread_raise_lock'): + with thread.thread_raise_lock: + thread.thread_raise_ok = False + def thread_raise(thread, exctype): ''' - Raises the exception exctype in the thread. + If `thread` has declared itself willing to accept exceptions by calling + `thread_accept_exceptions`, raises the exception `exctype` in the thread + `thread`. Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: "The exception will be raised only when executing python bytecode. If your thread calls a native/built-in blocking function, the exception will be raised only when execution returns to the python code." + + Returns: + True if exception was raised, False if the thread is not accepting + exceptions or another thread is holding `thread.thread_raise_lock` + + Raises: + threading.ThreadError if `thread` is not running + TypeError if `exctype` is not a class + ValueError, SystemError in case of unexpected problems ''' - import ctypes, inspect, threading - if not thread.is_alive(): - raise threading.ThreadError('thread %s is not running' % thread) + import ctypes, inspect, threading, logging + if not inspect.isclass(exctype): raise TypeError( 'cannot raise %s, only exception types can be raised (not ' - 'instances)' % exc_type) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(thread.ident), ctypes.py_object(exctype)) - if res == 0: - raise ValueError('invalid thread id? thread.ident=%s' % thread.ident) - elif res != 1: - # if it returns a number greater than one, you're in trouble, - # and you should call it again with exc=NULL to revert the effect - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) - raise SystemError('PyThreadState_SetAsyncExc failed') + 'instances)' % exctype) + + if not hasattr(thread, 'thread_raise_lock'): + logging.warn( + 'thread is not accepting exceptions (no member variable ' + '"thread_raise_lock"), not raising %s in thread %s', + exctype, thread) + return False + + got_lock = thread.thread_raise_lock.acquire(timeout=0.5) + if not got_lock: + logging.warn( + 'could not get acquire "thread_raise_lock", not raising %s in ' + 'thread %s', exctype, thread) + return False + try: + if not thread.thread_raise_ok: + logging.warn( + 'thread is not accepting exceptions (thread_raise_ok is ' + '%s), not raising %s in thread %s', + thread.thread_raise_ok, exctype, thread) + return False + + if not thread.is_alive(): + raise threading.ThreadError('thread %s is not running' % thread) + logging.info('raising %s in thread %s', exctype, thread) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), ctypes.py_object(exctype)) + if res == 0: + raise ValueError( + 'invalid thread id? thread.ident=%s' % thread.ident) + elif res != 1: + # if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) + raise SystemError('PyThreadState_SetAsyncExc failed') + return True + finally: + thread.thread_raise_lock.release() def sleep(duration): ''' diff --git a/brozzler/browser.py b/brozzler/browser.py index bd705f0..f4f0e99 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -427,6 +427,7 @@ class Browser: self.websock_thread.on_request = on_request if on_response: self.websock_thread.on_response = on_response + brozzler.thread_accept_exceptions() try: self.navigate_to_page( page_url, extra_headers=extra_headers, @@ -444,13 +445,16 @@ class Browser: final_page_url = self.url() return final_page_url, outlinks except brozzler.ReachedLimit: + brozzler.thread_block_exceptions() # websock_thread has stashed the ReachedLimit exception with # more information, raise that one raise self.websock_thread.reached_limit except websocket.WebSocketConnectionClosedException as e: + brozzler.thread_block_exceptions() self.logger.error('websocket closed, did chrome die?') raise BrowsingException(e) finally: + brozzler.thread_block_exceptions() self.is_browsing = False self.websock_thread.on_request = None self.websock_thread.on_response = None diff --git a/brozzler/worker.py b/brozzler/worker.py index bc5bb17..6625b56 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -118,6 +118,7 @@ class BrozzlerWorker: self._thread = None self._start_stop_lock = threading.Lock() + self._shutdown = threading.Event() def _proxy_for(self, site): if self._proxy: @@ -233,7 +234,9 @@ class BrozzlerWorker: def _try_youtube_dl(self, ydl, site, page): try: self.logger.info("trying youtube-dl on {}".format(page)) + brozzler.thread_accept_exceptions() info = ydl.extract_info(page.url) + brozzler.thread_block_exceptions() self._remember_videos(page, ydl.brozzler_spy) # logging.info('XXX %s', json.dumps(info)) if self._using_warcprox(site): @@ -517,7 +520,7 @@ class BrozzlerWorker: self.logger.info("brozzler worker starting") try: latest_state = None - while True: + while not self._shutdown.is_set(): self._service_heartbeat_if_due() try: browser = self._browser_pool.acquire() @@ -543,6 +546,7 @@ class BrozzlerWorker: except brozzler.NothingToClaim: pass time.sleep(0.5) + self.logger.info("shutdown requested") except brozzler.ShutdownRequested: self.logger.info("shutdown requested") except: @@ -586,12 +590,7 @@ class BrozzlerWorker: self.stop() def stop(self): - with self._start_stop_lock: - if self._thread and self._thread.is_alive(): - self.logger.info("brozzler worker shutting down") - brozzler.thread_raise(self._thread, brozzler.ShutdownRequested) - self._thread.join() - self._thread = None + self._shutdown.set() def is_alive(self): return self._thread and self._thread.is_alive() diff --git a/setup.py b/setup.py index 227bf00..8fad7fa 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b11.dev237', + version='1.1b11.dev238', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_units.py b/tests/test_units.py index 911fceb..0af5202 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -30,6 +30,13 @@ import requests import tempfile import uuid import socket +import time +import sys + +logging.basicConfig( + stream=sys.stderr, level=logging.INFO, format=( + '%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) @pytest.fixture(scope='module') def httpd(request): @@ -185,3 +192,67 @@ def test_start_stop_backwards_compat(): assert not 'started' in job assert not 'finished' in job +def test_thread_raise(): + let_thread_finish = threading.Event() + thread_preamble_done = threading.Event() + thread_caught_exception = None + + def thread_target(accept_exceptions=False, block_exceptions=False): + if accept_exceptions: + brozzler.thread_accept_exceptions() + if block_exceptions: + brozzler.thread_block_exceptions() + thread_preamble_done.set() + + try: + logging.info('waiting') + let_thread_finish.wait() + except Exception as e: + logging.info('caught exception %s', e) + nonlocal thread_caught_exception + thread_caught_exception = e + finally: + logging.info('finishing') + let_thread_finish.clear() + thread_preamble_done.clear() + + # test that thread_raise does not raise exception in a thread that has not + # called thread_accept_exceptions + thread_caught_exception = None + th = threading.Thread(target=lambda: thread_target()) + th.start() + thread_preamble_done.wait() + with pytest.raises(TypeError): + brozzler.thread_raise( + th, Exception("i'm an instance, which is not allowed")) + assert brozzler.thread_raise(th, Exception) is False + assert thread_caught_exception is None + let_thread_finish.set() + th.join() + assert thread_caught_exception is None + + # test that thread_raise raises exception in a thread that has called + # thread_accept_exceptions + thread_caught_exception = None + th = threading.Thread(target=lambda: thread_target(accept_exceptions=True)) + th.start() + thread_preamble_done.wait() + assert brozzler.thread_raise(th, Exception) is True + let_thread_finish.set() + th.join() + assert thread_caught_exception + with pytest.raises(threading.ThreadError): # thread is not running + brozzler.thread_raise(th, Exception) + + # test that thread_raise does not raise exception in a thread that has + # called thread_block_exceptions + thread_caught_exception = None + th = threading.Thread(target=lambda: thread_target(block_exceptions=True)) + th.start() + thread_preamble_done.wait() + assert brozzler.thread_raise(th, Exception) is False + let_thread_finish.set() + th.join() + assert thread_caught_exception is None + + From 0953e6972e152157d8f17564ea90cde33b25c478 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 24 Apr 2017 19:51:51 -0700 Subject: [PATCH 2/3] refactor thread_raise safety to use a context manager --- brozzler/__init__.py | 109 +++++++++++++++++++++++++++++-------------- tests/test_units.py | 35 +++++--------- 2 files changed, 87 insertions(+), 57 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 0cbaf0d..345a3ca 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -98,29 +98,67 @@ def behavior_script(url, template_parameters=None): return script return None +import threading +_thread_exception_gates = {} +_thread_exception_gates_lock = threading.Lock() def thread_accept_exceptions(): - import threading - thread = threading.current_thread() - if hasattr(thread, 'thread_raise_lock'): - lock = thread.thread_raise_lock - else: - lock = threading.Lock() - with lock: - thread.thread_raise_lock = lock - thread.thread_raise_ok = True + ''' + Returns a context manager whose purpose is best explained with a snippet: -def thread_block_exceptions(): - import threading - thread = threading.current_thread() - if hasattr(thread, 'thread_raise_lock'): - with thread.thread_raise_lock: - thread.thread_raise_ok = False + # === thread1 === + + # If thread2 calls `thread_raise(thread1, ...)` while do_something() is + # executing, nothing will happen (no exception will be raised in + # thread1). + do_something() + + try: + with thread_accept_exceptions(): + # Now we're in the "runtime environment" (pep340) of the + # context manager. If thread2 calls `thread_raise(thread1, + # ...)` while do_something_else() is running, the exception + # will be raised here. + do_something_else() + + # Here again if thread2 calls `thread_raise`, nothing happens. + do_yet_another_thing() + + except: + handle_exception() + + The context manager is reentrant, i.e. you can do this: + + with thread_accept_exceptions(): + with thread_accept_exceptions(): + blah() + + # `thread_raise` will still work here + toot() + ''' + class ThreadExceptionGate: + def __init__(self): + self.lock = threading.Lock() + self.ok_to_raise = 0 + + def __enter__(self): + with self.lock: + self.ok_to_raise += 1 + + def __exit__(self, exc_type, exc_value, traceback): + with self.lock: + self.ok_to_raise -= 1 + assert self.ok_to_raise >= 0 + + with _thread_exception_gates_lock: + if not threading.current_thread().ident in _thread_exception_gates: + _thread_exception_gates[ + threading.current_thread().ident] = ThreadExceptionGate() + return _thread_exception_gates[threading.current_thread().ident] def thread_raise(thread, exctype): ''' - If `thread` has declared itself willing to accept exceptions by calling - `thread_accept_exceptions`, raises the exception `exctype` in the thread - `thread`. + Raises the exception `exctype` in the thread `thread`, if it is willing to + accept exceptions (see `thread_accept_exceptions`). Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains: "The exception will be raised only when executing python bytecode. If your @@ -128,8 +166,9 @@ def thread_raise(thread, exctype): raised only when execution returns to the python code." Returns: - True if exception was raised, False if the thread is not accepting - exceptions or another thread is holding `thread.thread_raise_lock` + True if exception was raised, False if `thread` is not accepting + exceptions, or another thread is in the middle of raising an exception + in `thread` Raises: threading.ThreadError if `thread` is not running @@ -143,29 +182,30 @@ def thread_raise(thread, exctype): 'cannot raise %s, only exception types can be raised (not ' 'instances)' % exctype) - if not hasattr(thread, 'thread_raise_lock'): + if not thread.is_alive(): + raise threading.ThreadError('thread %s is not running' % thread) + + gate = _thread_exception_gates.get(thread.ident) + if not gate: logging.warn( - 'thread is not accepting exceptions (no member variable ' - '"thread_raise_lock"), not raising %s in thread %s', - exctype, thread) + 'thread is not accepting exceptions (gate not initialized), ' + 'not raising %s in thread %s', exctype, thread) return False - got_lock = thread.thread_raise_lock.acquire(timeout=0.5) + got_lock = gate.lock.acquire(blocking=False) if not got_lock: logging.warn( - 'could not get acquire "thread_raise_lock", not raising %s in ' - 'thread %s', exctype, thread) + 'could not get acquire thread exception gate lock, not ' + 'raising %s in thread %s', exctype, thread) return False try: - if not thread.thread_raise_ok: + if not gate.ok_to_raise: logging.warn( - 'thread is not accepting exceptions (thread_raise_ok is ' + 'thread is not accepting exceptions (gate.ok_to_raise is ' '%s), not raising %s in thread %s', - thread.thread_raise_ok, exctype, thread) + gate.ok_to_raise, exctype, thread) return False - if not thread.is_alive(): - raise threading.ThreadError('thread %s is not running' % thread) logging.info('raising %s in thread %s', exctype, thread) res = ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(thread.ident), ctypes.py_object(exctype)) @@ -179,7 +219,7 @@ def thread_raise(thread, exctype): raise SystemError('PyThreadState_SetAsyncExc failed') return True finally: - thread.thread_raise_lock.release() + gate.lock.release() def sleep(duration): ''' @@ -228,4 +268,5 @@ from brozzler.cli import suggest_default_chrome_exe __all__ = ['Page', 'Site', 'BrozzlerWorker', 'is_permitted_by_robots', 'RethinkDbFrontier', 'Browser', 'BrowserPool', 'BrowsingException', - 'new_job', 'new_site', 'Job', 'new_job_file', 'InvalidJobConf'] + 'new_job', 'new_site', 'Job', 'new_job_file', 'InvalidJobConf', + 'sleep', 'thread_accept_exceptions', 'thread_raise'] diff --git a/tests/test_units.py b/tests/test_units.py index 0af5202..2ff7d36 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -197,18 +197,19 @@ def test_thread_raise(): thread_preamble_done = threading.Event() thread_caught_exception = None - def thread_target(accept_exceptions=False, block_exceptions=False): - if accept_exceptions: - brozzler.thread_accept_exceptions() - if block_exceptions: - brozzler.thread_block_exceptions() - thread_preamble_done.set() - + def thread_target(accept_exceptions=False): try: - logging.info('waiting') - let_thread_finish.wait() + if accept_exceptions: + with brozzler.thread_accept_exceptions(): + thread_preamble_done.set() + logging.info('waiting (accepting exceptions)') + let_thread_finish.wait() + else: + thread_preamble_done.set() + logging.info('waiting (not accepting exceptions)') + let_thread_finish.wait() except Exception as e: - logging.info('caught exception %s', e) + logging.info('caught exception %s', repr(e)) nonlocal thread_caught_exception thread_caught_exception = e finally: @@ -219,7 +220,7 @@ def test_thread_raise(): # test that thread_raise does not raise exception in a thread that has not # called thread_accept_exceptions thread_caught_exception = None - th = threading.Thread(target=lambda: thread_target()) + th = threading.Thread(target=lambda: thread_target(accept_exceptions=False)) th.start() thread_preamble_done.wait() with pytest.raises(TypeError): @@ -244,15 +245,3 @@ def test_thread_raise(): with pytest.raises(threading.ThreadError): # thread is not running brozzler.thread_raise(th, Exception) - # test that thread_raise does not raise exception in a thread that has - # called thread_block_exceptions - thread_caught_exception = None - th = threading.Thread(target=lambda: thread_target(block_exceptions=True)) - th.start() - thread_preamble_done.wait() - assert brozzler.thread_raise(th, Exception) is False - let_thread_finish.set() - th.join() - assert thread_caught_exception is None - - From d916b68ab937a3b4ef953d736eb7974302dbca8b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 24 Apr 2017 20:02:34 -0700 Subject: [PATCH 3/3] use the new api `with brozzler.thread_accept_exceptions()` --- brozzler/browser.py | 35 ++++++++++++++++------------------- brozzler/worker.py | 5 ++--- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/brozzler/browser.py b/brozzler/browser.py index f4f0e99..76036a7 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -427,34 +427,31 @@ class Browser: self.websock_thread.on_request = on_request if on_response: self.websock_thread.on_response = on_response - brozzler.thread_accept_exceptions() try: - self.navigate_to_page( - page_url, extra_headers=extra_headers, - user_agent=user_agent, timeout=300) - if password: - self.try_login(username, password, timeout=300) - if on_screenshot: - jpeg_bytes = self.screenshot() - on_screenshot(jpeg_bytes) - behavior_script = brozzler.behavior_script( - page_url, behavior_parameters) - self.run_behavior(behavior_script, timeout=900) - outlinks = self.extract_outlinks() - self.visit_hashtags(page_url, hashtags, outlinks) - final_page_url = self.url() - return final_page_url, outlinks + with brozzler.thread_accept_exceptions(): + self.navigate_to_page( + page_url, extra_headers=extra_headers, + user_agent=user_agent, timeout=300) + if password: + self.try_login(username, password, timeout=300) + if on_screenshot: + jpeg_bytes = self.screenshot() + on_screenshot(jpeg_bytes) + behavior_script = brozzler.behavior_script( + page_url, behavior_parameters) + self.run_behavior(behavior_script, timeout=900) + outlinks = self.extract_outlinks() + self.visit_hashtags(page_url, hashtags, outlinks) + final_page_url = self.url() + return final_page_url, outlinks except brozzler.ReachedLimit: - brozzler.thread_block_exceptions() # websock_thread has stashed the ReachedLimit exception with # more information, raise that one raise self.websock_thread.reached_limit except websocket.WebSocketConnectionClosedException as e: - brozzler.thread_block_exceptions() self.logger.error('websocket closed, did chrome die?') raise BrowsingException(e) finally: - brozzler.thread_block_exceptions() self.is_browsing = False self.websock_thread.on_request = None self.websock_thread.on_response = None diff --git a/brozzler/worker.py b/brozzler/worker.py index 6625b56..365dff1 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -234,9 +234,8 @@ class BrozzlerWorker: def _try_youtube_dl(self, ydl, site, page): try: self.logger.info("trying youtube-dl on {}".format(page)) - brozzler.thread_accept_exceptions() - info = ydl.extract_info(page.url) - brozzler.thread_block_exceptions() + with brozzler.thread_accept_exceptions(): + info = ydl.extract_info(page.url) self._remember_videos(page, ydl.brozzler_spy) # logging.info('XXX %s', json.dumps(info)) if self._using_warcprox(site):