improving shutdown process

This commit is contained in:
Noah Levitt 2016-12-14 14:49:41 -08:00
parent f23f928c16
commit 5fa96b6438
5 changed files with 214 additions and 152 deletions

View File

@ -68,7 +68,7 @@ def fixup(url):
Does rudimentary canonicalization, such as converting IDN to punycode.
'''
import surt
hurl = _surt.handyurl.parse(url)
hurl = surt.handyurl.parse(url)
# handyurl.parse() already lowercases the scheme via urlsplit
if hurl.host:
hurl.host = hurl.host.encode('idna').decode('ascii').lower()
@ -115,22 +115,60 @@ def behavior_script(url, template_parameters=None):
logging.info(
'using behavior %s for %s',
behavior['behavior_js'], url)
return behavior['script']
elif 'behavior_js_template' in behavior:
parameters = dict()
if 'default_parameters' in behavior:
parameters.update(behavior['default_parameters'])
if template_parameters:
parameters.update(template_parameters)
javascript = behavior['template'].safe_substitute(parameters)
script = behavior['template'].safe_substitute(parameters)
logging.info(
'using template=%s populated with parameters=%s for %s',
repr(behavior['behavior_js_template']), parameters, url)
return behavior['script']
return script
return None
def thread_raise(thread, exctype):
'''
Raises the exception exctype in the thread.
Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
"The exception will be raised only when executing python bytecode. If your
thread calls a native/built-in blocking function, the exception will be
raised only when execution returns to the python code."
'''
import ctypes, inspect, threading
if not thread.is_alive():
raise threading.ThreadError('thread %s is not running' % thread)
if not inspect.isclass(exctype):
raise TypeError(
'cannot raise %s, only exception types can be raised (not '
'instances)' % exc_type)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(exctype))
if res == 0:
raise ValueError('invalid thread id? thread.ident=%s' % thread.ident)
elif res != 1:
# if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0)
raise SystemError('PyThreadState_SetAsyncExc failed')
def sleep(duration):
'''
Sleeps for duration seconds in increments of 0.5 seconds.
Use this so that the sleep can be interrupted by thread_raise().
'''
import time
start = time.time()
while True:
elapsed = time.time() - start
if elapsed >= duration:
break
time.sleep(min(duration - elapsed, 0.5))
from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots

View File

@ -29,6 +29,7 @@ from requests.structures import CaseInsensitiveDict
import datetime
import base64
from brozzler.chrome import Chrome
import surt
class BrowsingException(Exception):
pass
@ -86,8 +87,10 @@ class BrowserPool:
self._in_use.remove(browser)
def shutdown_now(self):
for browser in self._in_use:
browser.abort_browse_page()
self.logger.info('shutting down browser pool')
with self._lock:
for browser in self._in_use:
browser.stop()
def num_available(self):
return len(self._available)
@ -137,8 +140,9 @@ class Browser:
'''
try:
if self.is_running():
self.chrome.stop()
self._browser_controller.stop()
self.websocket_url = None
self.chrome.stop()
except:
self.logger.error('problem stopping', exc_info=True)
@ -191,27 +195,41 @@ class Browser:
if self.is_browsing:
raise BrowsingException('browser is already busy browsing a page')
self.is_browsing = True
self._browser_controller.navigate_to_page(page_url, timeout=300)
## if login_credentials:
## self._browser_controller.try_login(login_credentials) (5 min?)
behavior_script = brozzler.behavior_script(
page_url, behavior_parameters)
self._browser_controller.run_behavior(behavior_script, timeout=900)
if on_screenshot:
self._browser_controller.scroll_to_top()
jpeg_bytes = self._browser_controller.screenshot()
on_screenshot(jpeg_bytes)
outlinks = self._browser_controller.extract_outlinks()
## for each hashtag not already visited:
## navigate_to_hashtag (nothing to wait for so no timeout?)
## if on_screenshot;
## take screenshot (30 sec)
## run behavior (3 min)
## outlinks += retrieve_outlinks (60 sec)
final_page_url = self._browser_controller.url()
self.is_browsing = False
return final_page_url, outlinks
try:
self._browser_controller.navigate_to_page(page_url, timeout=300)
## if login_credentials:
## self._browser_controller.try_login(login_credentials) (5 min?)
behavior_script = brozzler.behavior_script(
page_url, behavior_parameters)
self._browser_controller.run_behavior(behavior_script, timeout=900)
if on_screenshot:
self._browser_controller.scroll_to_top()
jpeg_bytes = self._browser_controller.screenshot()
on_screenshot(jpeg_bytes)
outlinks = self._browser_controller.extract_outlinks()
## for each hashtag not already visited:
## navigate_to_hashtag (nothing to wait for so no timeout?)
## if on_screenshot;
## take screenshot (30 sec)
## run behavior (3 min)
## outlinks += retrieve_outlinks (60 sec)
final_page_url = self._browser_controller.url()
return final_page_url, outlinks
except brozzler.ShutdownRequested:
self.logger.info('shutdown requested')
raise
except websocket.WebSocketConnectionClosedException as e:
# import pdb; pdb.set_trace()
raise BrowsingException(e)
# if not self.is_running():
# logging.info('appears shutdown was requested')
# return None, None
# else:
# raise BrowsingException(
# "websocket closed, did chrome die? %s" % (
# self.websocket_url))
finally:
self.is_browsing = False
class Counter:
def __init__(self):
@ -235,7 +253,6 @@ class BrowserController:
self._command_id = Counter()
self._websock_thread = None
self._websock_open = None
self._got_page_load_event = None
self._result_messages = {}
def _wait_for(self, callback, timeout=None):
@ -244,6 +261,7 @@ class BrowserController:
'''
start = time.time()
while True:
brozzler.sleep(0.5)
if callback():
return
elapsed = time.time() - start
@ -251,7 +269,6 @@ class BrowserController:
raise BrowsingTimeout(
'timed out after %.1fs waiting for: %s' % (
elapsed, callback))
time.sleep(0.5)
def __enter__(self):
self.start()
@ -262,18 +279,28 @@ class BrowserController:
def start(self):
if not self._websock_thread:
calling_thread = threading.current_thread()
def on_open(websock):
self._websock_open = datetime.datetime.utcnow()
def on_error(websock, e):
'''
Raises BrowsingException in the thread that called start()
'''
self.logger.error(
'exception from websocket receiver thread', exc_info=1)
brozzler.thread_raise(calling_thread, BrowsingException)
# open websocket, start thread that receives messages
self._websock = websocket.WebSocketApp(
self.websocket_url, on_open=on_open,
on_message=self._on_message)
on_message=self._on_message, on_error=on_error)
thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format(
self.websocket_url, datetime.datetime.utcnow())
surt.handyurl.parse(self.websocket_url).port,
datetime.datetime.utcnow())
self._websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
kwargs={'ping_timeout': 0.5})
daemon=True)
self._websock_thread.start()
self._wait_for(lambda: self._websock_open, timeout=10)
@ -296,6 +323,7 @@ class BrowserController:
if self._websock_thread:
if (self._websock and self._websock.sock
and self._websock.sock.connected):
self.logger.info('shutting down websocket connection')
try:
self._websock.close()
except BaseException as e:
@ -303,18 +331,19 @@ class BrowserController:
'exception closing websocket %s - %s',
self._websock, e)
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.error(
'%s still alive 30 seconds after closing %s, will '
'forcefully nudge it again', self._websock_thread,
self._websock)
self._websock.keep_running = False
if self._websock_thread != threading.current_thread():
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.critical(
'%s still alive 60 seconds after closing %s',
self._websock_thread, self._websock)
self.logger.error(
'%s still alive 30 seconds after closing %s, will '
'forcefully nudge it again', self._websock_thread,
self._websock)
self._websock.keep_running = False
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.critical(
'%s still alive 60 seconds after closing %s',
self._websock_thread, self._websock)
def _on_message(self, websock, message):
try:
@ -323,18 +352,28 @@ class BrowserController:
self.logger.error(
'uncaught exception in _handle_message message=%s',
message, exc_info=True)
self.abort = True
def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if 'method' in message:
if message['method'] == 'Page.loadEventFired':
self._got_page_load_event = datetime.datetime.utcnow()
elif message["method"] == "Debugger.paused":
elif message['method'] == 'Debugger.paused':
self._debugger_paused(message)
elif message['method'] == 'Console.messageAdded':
self.logger.debug(
'%s console.%s %s', self._websock.url,
message['params']['message']['level'],
message['params']['message']['text'])
else:
self.logger.debug("%s %s", message["method"], json_message)
elif 'result' in message:
if message['id'] in self._result_messages:
self._result_messages[message['id']] = message
else:
self.logger.debug("%s", json_message)
else:
self.logger.debug("%s", json_message)
def _debugger_paused(self, message):
# we hit the breakpoint set in start(), get rid of google analytics
@ -363,12 +402,6 @@ class BrowserController:
self, page_url, extra_headers=None, user_agent=None, timeout=300):
'''
'''
# navigate to about:blank here to avoid situation where we navigate to
# the same page that we're currently on, perhaps with a different
# #fragment, which prevents Page.loadEventFired from happening
self.send_to_chrome(
method='Page.navigate', params={'url': 'about:blank'})
headers = extra_headers or {}
headers['Accept-Encoding'] = 'identity'
self.send_to_chrome(
@ -382,9 +415,9 @@ class BrowserController:
# navigate to the page!
self.logger.info('navigating to page %s', page_url)
self._got_page_load_event = None
self.send_to_chrome(method='Page.navigate', params={'url': page_url})
self._wait_for(lambda:self._got_page_load_event, timeout=timeout)
self._wait_for(lambda: self._got_page_load_event, timeout=timeout)
OUTLINKS_JS = r'''
var __brzl_framesDone = new Set();
@ -464,7 +497,7 @@ __brzl_compileOutlinks(window).join('\n');
'behavior reached hard timeout after %.1fs', elapsed)
return
time.sleep(7)
brozzler.sleep(7)
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(

View File

@ -34,9 +34,7 @@ import tempfile
class Chrome:
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(
self, chrome_exe, port=9222, ignore_cert_errors=False, proxy=None,
cookie_db=None):
def __init__(self, chrome_exe, port=9222, ignore_cert_errors=False):
'''
Initializes instance of this class.
@ -47,17 +45,10 @@ class Chrome:
port: chrome debugging protocol port (default 9222)
ignore_cert_errors: configure chrome to accept all certs (default
False)
proxy: http proxy 'host:port' (default None)
cookie_db: raw bytes of chrome/chromium sqlite3 cookies database,
which, if supplied, will be written to
{chrome_user_data_dir}/Default/Cookies before running the
browser (default None)
'''
self.port = port
self.chrome_exe = chrome_exe
self.proxy = proxy
self.ignore_cert_errors = ignore_cert_errors
self.cookie_db = cookie_db
self._shutdown = threading.Event()
def __enter__(self):
@ -87,21 +78,19 @@ class Chrome:
return default_port
def _init_cookie_db(self):
if self.cookie_db is not None:
cookie_dir = os.path.join(self._chrome_user_data_dir, 'Default')
cookie_location = os.path.join(cookie_dir, 'Cookies')
self.logger.debug(
'cookie DB provided, writing to %s', cookie_location)
os.makedirs(cookie_dir, exist_ok=True)
def _init_cookie_db(self, cookie_db):
cookie_dir = os.path.join(self._chrome_user_data_dir, 'Default')
cookie_location = os.path.join(cookie_dir, 'Cookies')
self.logger.debug('cookie DB provided, writing to %s', cookie_location)
os.makedirs(cookie_dir, exist_ok=True)
try:
with open(cookie_location, 'wb') as cookie_file:
cookie_file.write(self.cookie_db)
except OSError:
self.logger.error(
'exception writing cookie file at %s',
cookie_location, exc_info=True)
try:
with open(cookie_location, 'wb') as cookie_file:
cookie_file.write(cookie_db)
except OSError:
self.logger.error(
'exception writing cookie file at %s',
cookie_location, exc_info=True)
def persist_and_read_cookie_db(self):
cookie_location = os.path.join(
@ -126,15 +115,26 @@ class Chrome:
cookie_location, exc_info=True)
return cookie_db
def start(self):
def start(self, proxy=None, cookie_db=None):
'''
Returns websocket url to chrome window with about:blank loaded.
Starts chrome/chromium process.
Args:
proxy: http proxy 'host:port' (default None)
cookie_db: raw bytes of chrome/chromium sqlite3 cookies database,
which, if supplied, will be written to
{chrome_user_data_dir}/Default/Cookies before running the
browser (default None)
Returns:
websocket url to chrome window with about:blank loaded
'''
# these can raise exceptions
self._home_tmpdir = tempfile.TemporaryDirectory()
self._chrome_user_data_dir = os.path.join(
self._home_tmpdir.name, 'chrome-user-data')
self._init_cookie_db()
if cookie_db:
self._init_cookie_db(cookie_db)
new_env = os.environ.copy()
new_env['HOME'] = self._home_tmpdir.name
@ -151,8 +151,8 @@ class Chrome:
'--disable-extensions', '--disable-save-password-bubble']
if self.ignore_cert_errors:
chrome_args.append('--ignore-certificate-errors')
if self.proxy:
chrome_args.append('--proxy-server=%s' % self.proxy)
if proxy:
chrome_args.append('--proxy-server=%s' % proxy)
chrome_args.append('about:blank')
self.logger.info(
'running: %s', repr(subprocess.list2cmdline(chrome_args)))
@ -162,7 +162,8 @@ class Chrome:
stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
self._out_reader_thread = threading.Thread(
target=self._read_stderr_stdout,
name='ChromeOutReaderThread(pid=%s)' % self.chrome_process.pid)
name='ChromeOutReaderThread(pid=%s)' % self.chrome_process.pid,
daemon=True)
self._out_reader_thread.start()
self.logger.info('chrome running, pid %s' % self.chrome_process.pid)

View File

@ -159,8 +159,8 @@ def brozzle_page():
f.write(screenshot_png)
logging.info('wrote screenshot to %s', filename)
browser = brozzler.Browser(chrome_exe=args.chrome_exe, proxy=site.proxy)
browser.start()
browser = brozzler.Browser(chrome_exe=args.chrome_exe)
browser.start(proxy=site.proxy)
try:
outlinks = worker.brozzle_page(
browser, site, page, on_screenshot=on_screenshot)
@ -276,9 +276,7 @@ def brozzler_worker():
def sigint(signum, frame):
raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)')
# do not print in signal handler to avoid RuntimeError: reentrant call
state_dump_msgs = []
def queue_state_dump(signum, frame):
def dump_state(signum, frame):
signal.signal(signal.SIGQUIT, signal.SIG_IGN)
try:
state_strs = []
@ -291,15 +289,15 @@ def brozzler_worker():
state_strs.append('<???:thread:ident=%s>' % ident)
stack = traceback.format_stack(frames[ident])
state_strs.append(''.join(stack))
state_dump_msgs.append(
logging.info(
'dumping state (caught signal %s)\n%s' % (
signum, '\n'.join(state_strs)))
except BaseException as e:
state_dump_msgs.append('exception dumping state: %s' % e)
logging.error('exception dumping state: %s' % e)
finally:
signal.signal(signal.SIGQUIT, queue_state_dump)
signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGQUIT, queue_state_dump)
signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
@ -311,17 +309,7 @@ def brozzler_worker():
frontier, service_registry, max_browsers=int(args.max_browsers),
chrome_exe=args.chrome_exe)
worker.start()
try:
while worker.is_alive():
while state_dump_msgs:
logging.warn(state_dump_msgs.pop(0))
time.sleep(0.5)
logging.critical('worker thread has died, shutting down')
except brozzler.ShutdownRequested as e:
pass
finally:
worker.shutdown_now()
worker.run()
logging.info('brozzler-worker is all done, exiting')

View File

@ -104,12 +104,10 @@ class BrozzlerWorker:
self._default_proxy = proxy
self._default_enable_warcprox_features = enable_warcprox_features
self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event()
self._thread = None
self._start_stop_lock = threading.Lock()
self._browser_pool = brozzler.browser.BrowserPool(
max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True)
self._browsing_threads = set()
self._browsing_threads_lock = threading.Lock()
def _proxy(self, site):
if site.proxy:
@ -203,6 +201,8 @@ class BrozzlerWorker:
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"),
extra_headers=site.extra_headers())
except brozzler.ShutdownRequested as e:
raise
except BaseException as e:
if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError:
pass
@ -257,6 +257,8 @@ class BrozzlerWorker:
self._try_youtube_dl(ydl, site, page)
except brozzler.ReachedLimit as e:
raise
except brozzler.ShutdownRequested:
raise
except Exception as e:
if (hasattr(e, 'exc_info') and len(e.exc_info) >= 2
and hasattr(e.exc_info[1], 'code')
@ -323,11 +325,10 @@ class BrozzlerWorker:
start = time.time()
page = None
try:
while (not self._shutdown_requested.is_set()
and time.time() - start < 7 * 60):
while time.time() - start < 7 * 60:
self._frontier.honor_stop_request(site.job_id)
page = self._frontier.claim_page(site, "%s:%s" % (
socket.gethostname(), browser.chrome_port))
socket.gethostname(), browser.chrome.port))
if (page.needs_robots_check and
not brozzler.is_permitted_by_robots(site, page.url)):
@ -341,23 +342,24 @@ class BrozzlerWorker:
self._frontier.completed_page(site, page)
page = None
except brozzler.ShutdownRequested:
self.logger.info("shutdown requested")
except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site)
except brozzler.ReachedLimit as e:
self._frontier.reached_limit(site, e)
except brozzler.CrawlJobStopped:
self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
except brozzler.browser.BrowsingAborted:
self.logger.info("{} shut down".format(browser))
# except brozzler.browser.BrowsingAborted:
# self.logger.info("{} shut down".format(browser))
except:
self.logger.critical("unexpected exception", exc_info=True)
finally:
self.logger.info("finished session brozzling site, stopping "
"browser and disclaiming site")
browser.stop()
self._frontier.disclaim_site(site, page)
self._browser_pool.release(browser)
self._browsing_threads.remove(threading.current_thread())
with self._browsing_threads_lock:
self._browsing_threads.remove(threading.current_thread())
def _service_heartbeat(self):
if hasattr(self, "status_info"):
@ -381,17 +383,24 @@ class BrozzlerWorker:
"failed to send heartbeat and update service registry "
"with info %s: %s", status_info, e)
def _service_heartbeat_if_due(self):
'''Sends service registry heartbeat if due'''
due = False
if self._service_registry:
if not hasattr(self, "status_info"):
due = True
else:
d = rethinkstuff.utcnow() - self.status_info["last_heartbeat"]
due = d.total_seconds() > self.HEARTBEAT_INTERVAL
if due:
self._service_heartbeat()
def run(self):
try:
latest_state = None
while not self._shutdown_requested.is_set():
if self._service_registry and (
not hasattr(self, "status_info")
or (rethinkstuff.utcnow() -
self.status_info["last_heartbeat"]).total_seconds()
> self.HEARTBEAT_INTERVAL):
self._service_heartbeat()
while True:
self._service_heartbeat_if_due()
try:
browser = self._browser_pool.acquire()
try:
@ -401,11 +410,12 @@ class BrozzlerWorker:
"brozzling site (proxy=%s) %s",
repr(self._proxy(site)), site)
th = threading.Thread(
target=lambda: self._brozzle_site(
browser, site),
name="BrozzlingThread:%s" % site.seed)
target=self._brozzle_site, args=(browser, site),
name="BrozzlingThread:%s" % site.seed,
daemon=True)
with self._browsing_threads_lock:
self._browsing_threads.add(th)
th.start()
self._browsing_threads.add(th)
except:
self._browser_pool.release(browser)
raise
@ -419,6 +429,8 @@ class BrozzlerWorker:
self.logger.info("no unclaimed sites to browse")
latest_state = "no-unclaimed-sites"
time.sleep(0.5)
except brozzler.ShutdownRequested:
self.logger.info("shutdown requested")
except:
self.logger.critical(
"thread exiting due to unexpected exception",
@ -432,26 +444,16 @@ class BrozzlerWorker:
"failed to unregister from service registry",
exc_info=True)
def start(self):
with self._start_stop_lock:
if self._thread:
self.logger.warn(
'ignoring start request because self._thread is '
'not None')
return
self._thread = threading.Thread(
target=self.run, name="BrozzlerWorker")
self._thread.start()
def shutdown_now(self):
with self._start_stop_lock:
self.logger.info("brozzler worker shutting down")
self._shutdown_requested.set()
self.logger.info(
'shutting down %s brozzling threads',
len(self._browsing_threads))
with self._browsing_threads_lock:
for th in self._browsing_threads:
if th.is_alive():
brozzler.thread_raise(th, brozzler.ShutdownRequested)
self._browser_pool.shutdown_now()
while self._browsing_threads:
time.sleep(0.5)
self._thread = None
def is_alive(self):
return self._thread and self._thread.is_alive()
# copy to avoid "RuntimeError: Set changed size during iteration"
thredz = set(self._browsing_threads)
for th in thredz:
th.join()