major refactoring of browsing code to make it easier to add functionality

This commit is contained in:
Noah Levitt 2016-12-15 16:42:45 -08:00 committed by GitHub
parent f6333df6ef
commit c71854127d
6 changed files with 618 additions and 639 deletions

View File

@ -17,11 +17,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import json as _json
import logging as _logging
import surt as _surt
from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('brozzler').version
class ShutdownRequested(Exception):
@ -35,9 +31,11 @@ class CrawlJobStopped(Exception):
class ReachedLimit(Exception):
def __init__(self, http_error=None, warcprox_meta=None, http_payload=None):
import json
if http_error:
if "warcprox-meta" in http_error.headers:
self.warcprox_meta = _json.loads(http_error.headers["warcprox-meta"])
self.warcprox_meta = json.loads(
http_error.headers["warcprox-meta"])
else:
self.warcprox_meta = None
self.http_payload = http_error.read()
@ -69,7 +67,8 @@ def fixup(url):
'''
Does rudimentary canonicalization, such as converting IDN to punycode.
'''
hurl = _surt.handyurl.parse(url)
import surt
hurl = surt.handyurl.parse(url)
# handyurl.parse() already lowercases the scheme via urlsplit
if hurl.host:
hurl.host = hurl.host.encode('idna').decode('ascii').lower()
@ -78,6 +77,98 @@ def fixup(url):
# logging level more fine-grained than logging.DEBUG==10
TRACE = 5
_behaviors = None
def behaviors():
    '''
    Returns the list of configured behaviors, loading it on first use.

    The list is the 'behaviors' entry of behaviors.yaml, found in the same
    directory as this module. Entries with a 'behavior_js' key get the text
    of that file stored under 'script'; entries with a
    'behavior_js_template' key get a string.Template built from that file
    stored under 'template'. Both kinds of file are read from the
    behaviors.d directory next to this module. The loaded list is cached in
    the module-level _behaviors and returned as-is on later calls.
    '''
    import os
    import string
    import yaml
    global _behaviors
    if _behaviors is None:
        here = os.path.dirname(__file__)
        with open(os.path.join(here, 'behaviors.yaml')) as fin:
            # safe_load() builds plain data only; yaml.load() without an
            # explicit Loader can construct arbitrary python objects and is
            # rejected outright by PyYAML >= 6
            conf = yaml.safe_load(fin)
        loaded = conf['behaviors']
        for behavior in loaded:
            if 'behavior_js' in behavior:
                behavior_js = os.path.join(
                        here, 'behaviors.d', behavior['behavior_js'])
                with open(behavior_js, encoding='utf-8') as fin:
                    behavior['script'] = fin.read()
            elif 'behavior_js_template' in behavior:
                behavior_js_template = os.path.join(
                        here, 'behaviors.d', behavior['behavior_js_template'])
                with open(behavior_js_template, encoding='utf-8') as fin:
                    behavior['template'] = string.Template(fin.read())
        # publish the cache only once every file has been read, so a failed
        # read cannot leave a half-populated list cached for later callers
        _behaviors = loaded
    return _behaviors
def behavior_script(url, template_parameters=None):
    '''
    Returns the javascript behavior string to run on url.

    The configured behaviors are tried in order and the first one whose
    'url_regex' matches the start of url is used. A plain 'behavior_js'
    entry yields its script unchanged. A 'behavior_js_template' entry yields
    its template filled in from the entry's 'default_parameters', overridden
    by template_parameters; placeholders with no value are left in place
    (safe_substitute).

    Returns None if no behavior applies.
    '''
    import re, logging
    for candidate in behaviors():
        if not re.match(candidate['url_regex'], url):
            continue
        if 'behavior_js' in candidate:
            logging.info(
                    'using behavior %s for %s',
                    candidate['behavior_js'], url)
            return candidate['script']
        if 'behavior_js_template' in candidate:
            merged = {}
            if 'default_parameters' in candidate:
                merged.update(candidate['default_parameters'])
            if template_parameters:
                merged.update(template_parameters)
            populated = candidate['template'].safe_substitute(merged)
            logging.info(
                    'using template=%s populated with parameters=%s for %s',
                    repr(candidate['behavior_js_template']), merged, url)
            return populated
        # a matching entry with neither key falls through to the next entry
    return None
def thread_raise(thread, exctype):
    '''
    Raises the exception exctype in the thread.

    Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
    "The exception will be raised only when executing python bytecode. If your
    thread calls a native/built-in blocking function, the exception will be
    raised only when execution returns to the python code."

    Args:
        thread: the threading.Thread to interrupt, must be alive
        exctype: exception class (not an instance) to raise in the thread

    Raises:
        threading.ThreadError: if thread is not running
        TypeError: if exctype is not a class
        ValueError: if the interpreter does not recognize thread.ident
        SystemError: if the interpreter reports that more than one thread
            state was modified
    '''
    import ctypes, inspect, threading
    if not thread.is_alive():
        raise threading.ThreadError('thread %s is not running' % thread)
    if not inspect.isclass(exctype):
        # wrap in a tuple so that a tuple argument cannot break the formatting
        raise TypeError(
                'cannot raise %s, only exception types can be raised (not '
                'instances)' % (exctype,))
    thread_id = ctypes.c_long(thread.ident)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError('invalid thread id? thread.ident=%s' % thread.ident)
    elif res != 1:
        # if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect;
        # pass the id with the same C type as above (a bare python int would
        # be converted to a C int) and None for the NULL pointer
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError('PyThreadState_SetAsyncExc failed')
def sleep(duration):
    '''
    Blocks for duration seconds, napping at most 0.5 seconds at a time.

    Because control returns to python code between naps, an exception
    injected with thread_raise() can interrupt the sleep.
    '''
    import time
    began = time.time()
    while True:
        remaining = duration - (time.time() - began)
        if remaining <= 0:
            return
        time.sleep(min(remaining, 0.5))
from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots

View File

@ -1,137 +0,0 @@
'''
brozzler/behaviors.py - manages behaviors, which are javascript scripts that
run in brozzled web pages
Copyright (C) 2014-2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import json
import itertools
import os
import re
import logging
import time
import sys
import yaml
import string
__all__ = ["Behavior"]
class Behavior:
    '''
    Picks the javascript behavior configured for a url, runs it in the
    browser through umbra_worker.send_to_chrome(), and keeps track of
    whether the script has reported that it is finished.
    '''
    logger = logging.getLogger(__module__ + "." + __qualname__)

    # behavior configuration shared by all instances, loaded on first use
    _behaviors = None

    @staticmethod
    def behaviors():
        '''
        Returns the list of configured behaviors, loading it on first use.

        Reads the 'behaviors' entry of behaviors.yaml from this module's
        directory. Entries with 'behavior_js' get that file's text under
        'script'; entries with 'behavior_js_template' get a string.Template
        of that file's text under 'template'. Both kinds of file are read
        from the behaviors.d directory next to this module.
        '''
        if Behavior._behaviors is None:
            here = os.path.dirname(__file__)
            with open(os.path.join(here, 'behaviors.yaml')) as fin:
                # safe_load() builds plain data only; yaml.load() without a
                # Loader can construct arbitrary objects and is rejected by
                # PyYAML >= 6
                conf = yaml.safe_load(fin)
            loaded = conf['behaviors']
            for behavior in loaded:
                if "behavior_js" in behavior:
                    behavior_js = os.path.join(
                            here, "behaviors.d", behavior["behavior_js"])
                    with open(behavior_js, encoding="utf-8") as fin:
                        behavior["script"] = fin.read()
                elif "behavior_js_template" in behavior:
                    behavior_js_template = os.path.join(
                            here, "behaviors.d",
                            behavior["behavior_js_template"])
                    with open(behavior_js_template, encoding="utf-8") as fin:
                        behavior["template"] = string.Template(fin.read())
            # publish only a completely loaded list
            Behavior._behaviors = loaded
        return Behavior._behaviors

    def __init__(self, url, umbra_worker):
        '''
        Args:
            url: url of the page the behavior will run on
            umbra_worker: object whose send_to_chrome() is used to talk to
                the browser
        '''
        self.url = url
        self.umbra_worker = umbra_worker
        self.script_finished = False
        self.waiting_result_msg_ids = []
        self.active_behavior = None
        self.last_activity = time.time()

    def start(self, template_parameters=None):
        '''
        Sends the first behavior whose 'url_regex' matches self.url to the
        browser. Logs a warning if no behavior matches.

        Args:
            template_parameters: dict of values for a template behavior,
                applied on top of the behavior's 'default_parameters'
                (default None)
        '''
        for behavior in Behavior.behaviors():
            if not re.match(behavior['url_regex'], self.url):
                continue
            if "behavior_js" in behavior:
                script = behavior["script"]
                self.logger.info("using %s behavior for %s",
                        behavior["behavior_js"], self.url)
            elif "behavior_js_template" in behavior:
                parameters = dict()
                if "default_parameters" in behavior:
                    parameters.update(behavior["default_parameters"])
                if template_parameters:
                    parameters.update(template_parameters)
                # keep the populated script local: the behavior dict is
                # shared by every Behavior instance, so storing
                # page-specific output in it would let one page's
                # parameters overwrite another's
                script = behavior["template"].safe_substitute(parameters)
                self.logger.info(
                        "using template=%s populated with parameters=%s for %s",
                        repr(behavior["behavior_js_template"]),
                        parameters, self.url)
            else:
                script = behavior["script"]

            self.active_behavior = behavior
            self.umbra_worker.send_to_chrome(
                    method="Runtime.evaluate", suppress_logging=True,
                    params={"expression": script})
            self.notify_of_activity()
            return

        self.logger.warning("no behavior to run on %s", self.url)

    def is_finished(self):
        """Asynchronously asks behavior if it is finished, and in the mean time
        returns the response from the previous such query."""
        msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate",
                suppress_logging=True, params={"expression":"umbraBehaviorFinished()"})
        self.waiting_result_msg_ids.append(msg_id)

        # the active behavior may override the default 30 second idle period
        request_idle_timeout_sec = 30
        if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior:
            request_idle_timeout_sec = self.active_behavior['request_idle_timeout_sec']
        idle_time = time.time() - self.last_activity

        return self.script_finished and idle_time > request_idle_timeout_sec

    def is_waiting_on_result(self, msg_id):
        '''
        Returns True if msg_id belongs to an is_finished() query whose
        result has not been passed to notify_of_result() yet.
        '''
        return msg_id in self.waiting_result_msg_ids

    def notify_of_result(self, chrome_message):
        '''
        Records the browser's answer to an is_finished() query.

        Args:
            chrome_message: decoded result message from chrome; its 'id'
                must be one that is_waiting_on_result() reports as pending
        '''
        # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}}
        # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}}
        self.waiting_result_msg_ids.remove(chrome_message['id'])
        outer = chrome_message.get('result') or {}
        inner = outer.get('result') or {}
        # .get() so that a result without a 'value' (e.g. type 'undefined')
        # takes the debug branch below instead of raising KeyError
        value = inner.get('value')
        if not outer.get('wasThrown') and isinstance(value, bool):
            self.script_finished = value
        else:
            # this happens if the behavior script doesn't define
            # umbraBehaviorFinished, and I think it can also happen normally
            # after the behavior has been sent to the browser but before
            # the browser has it fully loaded... in any case the message
            # was overwhelming the logs, so I'm bumping it down to debug level
            self.logger.debug(
                    "chrome message doesn't look like a boolean result! %s",
                    chrome_message)

    def notify_of_activity(self):
        '''
        Resets the idle clock that is_finished() measures against.
        '''
        self.last_activity = time.time()
if __name__ == "__main__":
    # run as a script: log to stdout at debug level, then print the behavior
    # configuration that gets loaded
    logging.basicConfig(
            stream=sys.stdout, level=logging.DEBUG,
            format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
    logger = logging.getLogger('umbra.behaviors')
    logger.info("custom behaviors: {}".format(Behavior.behaviors()))

View File

@ -17,50 +17,67 @@ limitations under the License.
'''
import logging
import json
import time
import brozzler
import itertools
import json
import websocket
import time
import threading
import os
import random
import brozzler
from brozzler.chrome import Chrome
from brozzler.behaviors import Behavior
from requests.structures import CaseInsensitiveDict
import base64
import sqlite3
import datetime
import base64
from brozzler.chrome import Chrome
import surt
__all__ = ["BrowserPool", "Browser"]
class BrowsingException(Exception):
    '''
    Raised when browsing a page fails.
    '''
class NoBrowsersAvailable(Exception):
    '''
    Raised by BrowserPool.acquire() when no browser is available.
    '''
class BrowsingTimeout(BrowsingException):
    '''
    Raised when waiting on the browser takes longer than the allowed timeout.
    '''
class BrowserPool:
logger = logging.getLogger(__module__ + "." + __qualname__)
logger = logging.getLogger(__module__ + '.' + __qualname__)
BASE_PORT = 9200
def __init__(self, size=3, **kwargs):
"""kwargs are passed on to Browser.__init__"""
'''
Initializes the pool.
Args:
size: size of pool (default 3)
**kwargs: arguments for Browser(...)
'''
self.size = size
self._available = set()
self._in_use = set()
for i in range(0, size):
browser = Browser(BrowserPool.BASE_PORT + i, **kwargs)
browser = Browser(port=BrowserPool.BASE_PORT + i, **kwargs)
self._available.add(browser)
self._lock = threading.Lock()
def acquire(self):
"""
Returns browser from pool if available, raises NoBrowsersAvailable
otherwise.
"""
'''
Returns an available instance.
Returns:
browser from pool, if available
Raises:
NoBrowsersAvailable if none available
'''
with self._lock:
try:
browser = self._available.pop()
except KeyError:
raise NoBrowsersAvailable()
raise NoBrowsersAvailable
self._in_use.add(browser)
return browser
@ -70,8 +87,14 @@ class BrowserPool:
self._in_use.remove(browser)
def shutdown_now(self):
for browser in self._in_use:
browser.abort_browse_page()
self.logger.info(
'shutting down browser pool (%s browsers in use)',
len(self._in_use))
with self._lock:
for browser in self._available:
browser.stop()
for browser in self._in_use:
browser.stop()
def num_available(self):
return len(self._available)
@ -79,46 +102,23 @@ class BrowserPool:
def num_in_use(self):
return len(self._in_use)
class NoBrowsersAvailable(Exception):
pass
class BrowsingException(Exception):
pass
class BrowsingAborted(BrowsingException):
pass
class ResultMessageTimeout(BrowsingException):
pass
class Browser:
"""
Runs chrome/chromium to synchronously browse one page at a time using
worker.browse_page(). Should not be accessed from multiple threads.
"""
'''
Manages an instance of Chrome for browsing pages.
'''
logger = logging.getLogger(__module__ + '.' + __qualname__)
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, **kwargs):
'''
Initializes the Browser.
HARD_TIMEOUT_SECONDS = 20 * 60
def __init__(
self, chrome_port=9222, chrome_exe='chromium-browser', proxy=None,
ignore_cert_errors=False):
self.command_id = itertools.count(1)
self.chrome_port = chrome_port
self.chrome_exe = chrome_exe
self.proxy = proxy
self.ignore_cert_errors = ignore_cert_errors
self._behavior = None
self._websock = None
self._abort_browse_page = False
self._chrome_instance = None
self._aw_snap_hes_dead_jim = None
self._work_dir = None
self._websocket_url = None
def __repr__(self):
return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port)
Args:
**kwargs: arguments for Chrome(...)
'''
self.chrome = Chrome(**kwargs)
self.websocket_url = None
self.is_browsing = False
self._browser_controller = None
def __enter__(self):
self.start()
@ -127,115 +127,298 @@ class Browser:
def __exit__(self, *args):
self.stop()
def start(self, proxy=None, cookie_db=None):
if not self._chrome_instance:
self._chrome_instance = Chrome(
port=self.chrome_port, executable=self.chrome_exe,
ignore_cert_errors=self.ignore_cert_errors,
proxy=proxy or self.proxy, cookie_db=None)
try:
self._websocket_url = self._chrome_instance.start()
except:
self._chrome_instance = None
raise
def start(self, **kwargs):
'''
Starts chrome if it's not running.
Args:
**kwargs: arguments for self.chrome.start(...)
'''
if not self.is_running():
self.websocket_url = self.chrome.start(**kwargs)
self._browser_controller = BrowserController(self.websocket_url)
self._browser_controller.start()
def stop(self):
'''
Stops chrome if it's running.
'''
try:
if self.is_running():
self._chrome_instance.stop()
self._chrome_instance = None
self._websocket_url = None
if self._browser_controller:
self._browser_controller.stop()
self.websocket_url = None
self.chrome.stop()
except:
self.logger.error("problem stopping", exc_info=True)
self.logger.error('problem stopping', exc_info=True)
def is_running(self):
return bool(self._websocket_url)
def abort_browse_page(self):
self._abort_browse_page = True
def persist_and_read_cookie_db(self):
if self._chrome_instance:
return self._chrome_instance.persist_and_read_cookie_db()
else:
return None
return self.websocket_url is not None
def browse_page(
self, url, extra_headers=None, behavior_parameters=None,
user_agent=None,
on_request=None, on_response=None, on_screenshot=None,
on_url_change=None):
"""
Synchronously loads a page, runs behaviors, and takes a screenshot.
self, page_url, ignore_cert_errors=False, extra_headers=None,
user_agent=None, behavior_parameters=None,
on_request=None, on_response=None, on_screenshot=None):
'''
Browses page in browser.
Raises BrowsingException if browsing the page fails in a non-critical
way.
Browser should already be running, i.e. start() should have been
called. Opens the page_url in the browser, runs behaviors, takes a
screenshot, extracts outlinks.
Returns extracted outlinks.
"""
Args:
page_url: url of the page to browse
extra_headers: dict of extra http headers to configure the browser
to send with every request (default None)
user_agent: user agent string, replaces browser default if
supplied (default None)
behavior_parameters: dict of parameters for populating the
javascript behavior template (default None)
on_request: callback to invoke on every Network.requestWillBeSent
event, takes one argument, the json-decoded message (default
None)
on_response: callback to invoke on every Network.responseReceived
event, takes one argument, the json-decoded message (default
None)
on_screenshot: callback to invoke when screenshot is obtained,
takes one argument, the the raw jpeg bytes (default None)
# XXX takes two arguments, the url of the page at the time the
# screenshot was taken, and the raw jpeg bytes (default None)
Returns:
A tuple (final_page_url, outlinks).
final_page_url: the url in the location bar at the end of the
browse_page cycle, which could be different from the original
page url if the page redirects, javascript has changed the url
in the location bar, etc
outlinks: a list of navigational links extracted from the page
Raises:
BrowsingException: if browsing the page fails
'''
if not self.is_running():
raise BrowsingException("browser has not been started")
self.url = url
self.extra_headers = extra_headers
self.user_agent = user_agent
self.on_request = on_request
self.on_screenshot = on_screenshot
self.on_url_change = on_url_change
self.on_response = on_response
self.behavior_parameters = behavior_parameters
self._outlinks = None
self._reached_limit = None
self._aw_snap_hes_dead_jim = None
self._abort_browse_page = False
self._has_screenshot = False
self._waiting_on_result_messages = {}
self._result_message_timeout = None
self._websock = websocket.WebSocketApp(
self._websocket_url, on_open=self._visit_page,
on_message=self._wrap_handle_message)
thread_name = "WebsockThread:{}-{:%Y%m%d%H%M%S}".format(
self.chrome_port, datetime.datetime.utcnow())
websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
kwargs={'ping_timeout':0.5})
websock_thread.start()
self._start = time.time()
aborted = False
raise BrowsingException('browser has not been started')
if self.is_browsing:
raise BrowsingException('browser is already busy browsing a page')
self.is_browsing = True
try:
while True:
time.sleep(0.5)
if self._browse_interval_func():
return self._outlinks
self._browser_controller.navigate_to_page(page_url, timeout=300)
## if login_credentials:
## self._browser_controller.try_login(login_credentials) (5 min?)
behavior_script = brozzler.behavior_script(
page_url, behavior_parameters)
self._browser_controller.run_behavior(behavior_script, timeout=900)
if on_screenshot:
self._browser_controller.scroll_to_top()
jpeg_bytes = self._browser_controller.screenshot()
on_screenshot(jpeg_bytes)
outlinks = self._browser_controller.extract_outlinks()
## for each hashtag not already visited:
## navigate_to_hashtag (nothing to wait for so no timeout?)
## if on_screenshot;
## take screenshot (30 sec)
## run behavior (3 min)
## outlinks += retrieve_outlinks (60 sec)
final_page_url = self._browser_controller.url()
return final_page_url, outlinks
except websocket.WebSocketConnectionClosedException as e:
self.logger.error('websocket closed, did chrome die?')
raise BrowsingException(e)
finally:
self.is_browsing = False
class Counter:
    '''
    Source of consecutive integers starting at 0, with a way to look at the
    upcoming value without consuming it.
    '''
    def __init__(self):
        self.next_value = 0

    def __next__(self):
        '''
        Returns the current value and advances the counter by one.
        '''
        current = self.next_value
        self.next_value = current + 1
        return current

    def peek_next(self):
        '''
        Returns the value the next call to next() will return, without
        advancing the counter.
        '''
        return self.next_value
class BrowserController:
'''
'''
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(self, websocket_url):
    '''
    Sets up the controller's state. Nothing is connected until start().

    Args:
        websocket_url: url of the websocket to open in start()
    '''
    self.websocket_url = websocket_url
    # supplies the id for each message sent with send_to_chrome()
    self._command_id = Counter()
    # receiver thread, and the time its websocket opened
    self._websock_thread, self._websock_open = None, None
    # result messages from the browser, keyed by message id
    self._result_messages = dict()
def _wait_for(self, callback, timeout=None):
    '''
    Polls callback() every half second until it returns something truthy.

    Args:
        callback: function taking no arguments
        timeout: seconds to keep polling before giving up; a falsy value
            means poll indefinitely (default None)

    Raises:
        BrowsingTimeout: if timeout is exceeded
    '''
    began = time.time()
    while True:
        brozzler.sleep(0.5)
        if callback():
            break
        waited = time.time() - began
        if not timeout or waited <= timeout:
            continue
        raise BrowsingTimeout(
                'timed out after %.1fs waiting for: %s' % (
                    waited, callback))
def __enter__(self):
    '''
    Context manager entry: calls start() and returns this object.
    '''
    self.start()
    return self
def __exit__(self, *args):
    '''
    Context manager exit: calls stop(). Exception details passed in args
    are ignored, so an exception from the with-body propagates.
    '''
    self.stop()
def start(self):
if not self._websock_thread:
calling_thread = threading.current_thread()
def on_open(websock):
self._websock_open = datetime.datetime.utcnow()
def on_error(websock, e):
'''
Raises BrowsingException in the thread that called start()
'''
if isinstance(e, websocket.WebSocketConnectionClosedException):
self.logger.error('websocket closed, did chrome die?')
else:
self.logger.error(
'exception from websocket receiver thread',
exc_info=1)
brozzler.thread_raise(calling_thread, BrowsingException)
# open websocket, start thread that receives messages
self._websock = websocket.WebSocketApp(
self.websocket_url, on_open=on_open,
on_message=self._on_message, on_error=on_error)
thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format(
surt.handyurl.parse(self.websocket_url).port,
datetime.datetime.utcnow())
self._websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
daemon=True)
self._websock_thread.start()
self._wait_for(lambda: self._websock_open, timeout=10)
# tell browser to send messages we're interested in
self.send_to_chrome(method='Network.enable')
self.send_to_chrome(method='Page.enable')
self.send_to_chrome(method='Console.enable')
self.send_to_chrome(method='Debugger.enable')
self.send_to_chrome(method='Runtime.enable')
# disable google analytics, see _handle_message() where breakpoint
# is caught Debugger.paused
self.send_to_chrome(
method='Debugger.setBreakpointByUrl',
params={
'lineNumber': 1,
'urlRegex': 'https?://www.google-analytics.com/analytics.js'})
def stop(self, *args):
if self._websock_thread:
if (self._websock and self._websock.sock
and self._websock.sock.connected):
self.logger.info('shutting down websocket connection')
try:
self._websock.close()
except BaseException as e:
self.logger.error(
"exception closing websocket %s - %s" % (
self._websock, e))
'exception closing websocket %s - %s',
self._websock, e)
websock_thread.join(timeout=30)
if websock_thread.is_alive():
self.logger.error(
"%s still alive 30 seconds after closing %s, will "
"forcefully nudge it again" % (
websock_thread, self._websock))
self._websock.keep_running = False
websock_thread.join(timeout=30)
if websock_thread.is_alive():
self.logger.critical(
"%s still alive 60 seconds after closing %s" % (
websock_thread, self._websock))
if self._websock_thread != threading.current_thread():
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.error(
'%s still alive 30 seconds after closing %s, will '
'forcefully nudge it again', self._websock_thread,
self._websock)
self._websock.keep_running = False
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.critical(
'%s still alive 60 seconds after closing %s',
self._websock_thread, self._websock)
self._behavior = None
def _on_message(self, websock, message):
    '''
    Passes a message received on the websocket to _handle_message().

    Whatever the handler raises is logged at error level with its traceback
    and is not re-raised.
    '''
    try:
        self._handle_message(websock, message)
    except:
        self.logger.exception(
                'uncaught exception in _handle_message message=%s', message)
OUTLINKS_JS = r"""
def _handle_message(self, websock, json_message):
    '''
    Decodes one message from the browser and acts on it.

    Event messages (those with a 'method'): Page.loadEventFired records the
    time in self._got_page_load_event, Debugger.paused goes to
    _debugger_paused(), Console.messageAdded is logged at debug level, and
    anything else is ignored. Result messages are stored in
    self._result_messages, but only when their id already has a slot there.

    Args:
        websock: the websocket the message arrived on (not used)
        json_message: the message, as a json string
    '''
    message = json.loads(json_message)
    if 'method' in message:
        method = message['method']
        if method == 'Page.loadEventFired':
            self._got_page_load_event = datetime.datetime.utcnow()
        elif method == 'Debugger.paused':
            self._debugger_paused(message)
        elif method == 'Console.messageAdded':
            websock_url = self._websock.url
            console_message = message['params']['message']
            self.logger.debug(
                    '%s console.%s %s', websock_url,
                    console_message['level'], console_message['text'])
        return
    if 'result' in message and message['id'] in self._result_messages:
        self._result_messages[message['id']] = message
def _debugger_paused(self, message):
    '''
    Handles a Debugger.paused event: replaces the source of the script in
    the top call frame with a console.log() stub and resumes execution.
    This is how the breakpoint set in start() gets rid of google analytics.

    Args:
        message: the json-decoded Debugger.paused message
    '''
    self.logger.debug('debugger paused! message=%s', message)
    top_frame = message['params']['callFrames'][0]
    replacement = {
        'scriptId': top_frame['location']['scriptId'],
        'scriptSource': 'console.log("google analytics is no more!");',
    }
    self.send_to_chrome(method='Debugger.setScriptSource', params=replacement)
    # let the page carry on
    self.send_to_chrome(method='Debugger.resume')
def send_to_chrome(self, suppress_logging=False, **kwargs):
    '''
    Sends a message to the browser over the websocket.

    Args:
        suppress_logging: if true, the outgoing message is not logged
            (default False)
        **kwargs: the fields of the message, e.g. method and params; an
            'id' field is set from the command counter

    Returns:
        The id assigned to the message.
    '''
    msg_id = next(self._command_id)
    payload = json.dumps(dict(kwargs, id=msg_id))
    if not suppress_logging:
        self.logger.debug('sending message to %s: %s', self._websock, payload)
    self._websock.send(payload)
    return msg_id
def navigate_to_page(
        self, page_url, extra_headers=None, user_agent=None, timeout=300):
    '''
    Points the browser at page_url and waits for the page load event.

    Args:
        page_url: url of the page to load
        extra_headers: dict of extra http headers for the browser to send;
            'Accept-Encoding: identity' is always added (default None)
        user_agent: user agent string, replaces the browser default if
            supplied (default None)
        timeout: seconds to wait for the page load event (default 300)

    Raises:
        BrowsingTimeout: if the load event does not arrive within timeout
    '''
    # work on a copy so that adding Accept-Encoding does not modify the
    # dict the caller passed in
    headers = dict(extra_headers or {})
    headers['Accept-Encoding'] = 'identity'
    self.send_to_chrome(
            method='Network.setExtraHTTPHeaders',
            params={'headers': headers})

    if user_agent:
        self.send_to_chrome(
                method='Network.setUserAgentOverride',
                params={'userAgent': user_agent})

    # navigate to the page!
    self.logger.info('navigating to page %s', page_url)
    # cleared here, set by _handle_message() on Page.loadEventFired
    self._got_page_load_event = None
    self.send_to_chrome(method='Page.navigate', params={'url': page_url})
    self._wait_for(lambda: self._got_page_load_event, timeout=timeout)
OUTLINKS_JS = r'''
var __brzl_framesDone = new Set();
var __brzl_compileOutlinks = function(frame) {
__brzl_framesDone.add(frame);
@ -244,265 +427,94 @@ var __brzl_compileOutlinks = function(frame) {
frame.document.querySelectorAll('a[href]'));
for (var i = 0; i < frame.frames.length; i++) {
if (frame.frames[i] && !__brzl_framesDone.has(frame.frames[i])) {
outlinks = outlinks.concat(__brzl_compileOutlinks(frame.frames[i]));
outlinks = outlinks.concat(
__brzl_compileOutlinks(frame.frames[i]));
}
}
}
return outlinks;
}
__brzl_compileOutlinks(window).join('\n');
"""
def _chain_chrome_messages(self, chain):
"""
Sends a series of messages to chrome/chromium on the debugging protocol
websocket. Waits for a reply from each one before sending the next.
Enforces a timeout waiting for each reply. If the timeout is hit, sets
self._result_message_timeout with a ResultMessageTimeout (an exception
class). Takes an array of dicts, each of which should look like this:
{
"info": "human readable description",
"chrome_msg": { ... }, # message to send to chrome, as a dict
"timeout": 30, # timeout in seconds
"callback": my_callback, # takes one arg, the result message
}
The code is rather convoluted because of the asynchronous nature of the
whole thing. See how it's used in _start_postbehavior_chain.
"""
timer = None
def callback(message):
if timer:
timer.cancel()
if "callback" in chain[0]:
chain[0]["callback"](message)
self._chain_chrome_messages(chain[1:])
def timeout():
self._result_message_timeout = ResultMessageTimeout(
"timed out after %.1fs waiting for result message "
"for %s", chain[0]["timeout"], chain[0]["chrome_msg"])
if chain:
msg_id = self.send_to_chrome(**chain[0]["chrome_msg"])
self._waiting_on_result_messages[msg_id] = callback
self.logger.info(
"msg_id=%s for message %s", msg_id, chain[0]["chrome_msg"])
timer = threading.Timer(chain[0]["timeout"], timeout)
timer.daemon = True
timer.start()
else:
self.logger.info("finished chrome message chain")
def _start_postbehavior_chain(self):
if self.on_screenshot:
chain = [{
"info": "scrolling to top",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": "window.scrollTo(0, 0);"},
},
"timeout": 30,
"callback": lambda message: None,
}, {
"info": "requesting screenshot",
"chrome_msg": {"method": "Page.captureScreenshot"},
"timeout": 30,
"callback": lambda message: (
self.on_screenshot and self.on_screenshot(
base64.b64decode(message["result"]["data"]))),
}]
else:
chain = []
def set_outlinks(message):
if message["result"]["result"]["value"]:
self._outlinks = frozenset(
message["result"]["result"]["value"].split("\n"))
else:
self._outlinks = frozenset()
chain.append({
"info": "retrieving outlinks",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": self.OUTLINKS_JS},
},
"timeout": 60,
"callback": set_outlinks,
})
self._chain_chrome_messages(chain)
def _browse_interval_func(self):
"""Called periodically while page is being browsed. Returns True when
finished browsing."""
if (not self._websock or not self._websock.sock
or not self._websock.sock.connected):
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
elif self._result_message_timeout:
raise self._result_message_timeout
elif self._aw_snap_hes_dead_jim:
raise BrowsingException(
"""chrome tab went "aw snap" or "he's dead jim"!""")
elif self._outlinks is not None:
# setting self._outlinks is the last thing that happens in the
# post-behavior chain
return True
elif (self._behavior != None and self._behavior.is_finished()
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
if self._behavior and self._behavior.is_finished():
self.logger.info(
"behavior decided it's finished with %s", self.url)
else:
self.logger.info(
"reached hard timeout of %s seconds url=%s",
Browser.HARD_TIMEOUT_SECONDS, self.url)
self._behavior = None
self._start_postbehavior_chain()
return False
elif self._reached_limit:
raise self._reached_limit
elif self._abort_browse_page:
raise BrowsingAborted("browsing page aborted")
else:
return False
def send_to_chrome(self, suppress_logging=False, **kwargs):
msg_id = next(self.command_id)
kwargs["id"] = msg_id
msg = json.dumps(kwargs)
if not suppress_logging:
self.logger.debug("sending message to %s: %s", self._websock, msg)
self._websock.send(msg)
return msg_id
def _visit_page(self, websock):
# navigate to about:blank here to avoid situation where we navigate to
# the same page that we're currently on, perhaps with a different
# #fragment, which prevents Page.loadEventFired from happening
self.send_to_chrome(method="Page.navigate", params={"url": "about:blank"})
self.send_to_chrome(method="Network.enable")
self.send_to_chrome(method="Page.enable")
self.send_to_chrome(method="Console.enable")
self.send_to_chrome(method="Debugger.enable")
self.send_to_chrome(method="Runtime.enable")
headers = self.extra_headers or {}
headers['Accept-Encoding'] = 'identity'
self.send_to_chrome(
method="Network.setExtraHTTPHeaders",
params={"headers":headers})
if self.user_agent:
self.send_to_chrome(method="Network.setUserAgentOverride", params={"userAgent": self.user_agent})
# disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused"
self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})
# navigate to the page!
self.send_to_chrome(method="Page.navigate", params={"url": self.url})
def _wrap_handle_message(self, websock, message):
try:
self._handle_message(websock, message)
except:
self.logger.error(
"uncaught exception in _handle_message message=%s",
message, exc_info=True)
self.abort_browse_page()
def _network_request_will_be_sent(self, message):
if self._behavior:
self._behavior.notify_of_activity()
if message["params"]["request"]["url"].lower().startswith("data:"):
self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80]))
elif self.on_request:
self.on_request(message)
def _network_response_received(self, message):
if (not self._reached_limit
and message["params"]["response"]["status"] == 420
and "Warcprox-Meta" in CaseInsensitiveDict(
message["params"]["response"]["headers"])):
warcprox_meta = json.loads(CaseInsensitiveDict(
message["params"]["response"]["headers"])["Warcprox-Meta"])
self._reached_limit = brozzler.ReachedLimit(
warcprox_meta=warcprox_meta)
self.logger.info("reached limit %s", self._reached_limit)
if self.on_response:
self.on_response(message)
def _page_load_event_fired(self, message):
def page_url_after_load_event(message):
if message["result"]["result"]["value"] != self.url:
if self.on_url_change:
self.on_url_change(message["result"]["result"]["value"])
'''
def extract_outlinks(self, timeout=60):
self.logger.info('extracting outlinks')
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method="Runtime.evaluate",
params={"expression":"document.URL"})
self._waiting_on_result_messages[msg_id] = page_url_after_load_event
method='Runtime.evaluate',
params={'expression': self.OUTLINKS_JS})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
if message['result']['result']['value']:
return frozenset(message['result']['result']['value'].split('\n'))
else:
self._outlinks = frozenset()
self.logger.info("Page.loadEventFired, moving on to starting behaviors url={}".format(self.url))
self._behavior = Behavior(self.url, self)
self._behavior.start(self.behavior_parameters)
def screenshot(self, timeout=30):
    '''
    Captures a screenshot of the page.

    Sends "Page.captureScreenshot" to the browser, waits for the matching
    result message, and base64-decodes its payload.

    Args:
        timeout: passed through to self._wait_for (default 30)

    Returns:
        bytes decoded from the result's base64 'data' field
    '''
    self.logger.info('taking screenshot')
    # reserve the slot for the reply before the command goes out
    self._result_messages[self._command_id.peek_next()] = None
    request_id = self.send_to_chrome(method='Page.captureScreenshot')

    def result_arrived():
        return self._result_messages.get(request_id)

    self._wait_for(result_arrived, timeout=timeout)
    reply = self._result_messages.pop(request_id)
    return base64.b64decode(reply['result']['data'])
def _console_message_added(self, message):
self.logger.debug("%s console.%s %s", self._websock.url,
message["params"]["message"]["level"],
message["params"]["message"]["text"])
def scroll_to_top(self, timeout=30):
    '''
    Scrolls the page back to the top.

    Evaluates 'window.scrollTo(0, 0);' in the browser and waits for the
    result message, which is then discarded.

    Args:
        timeout: passed through to self._wait_for (default 30)
    '''
    self.logger.info('scrolling to top')
    # reserve the slot for the reply before the command goes out
    self._result_messages[self._command_id.peek_next()] = None
    request_id = self.send_to_chrome(
            method='Runtime.evaluate',
            params={'expression': 'window.scrollTo(0, 0);'})

    def result_arrived():
        return self._result_messages.get(request_id)

    self._wait_for(result_arrived, timeout=timeout)
    # nothing useful in the reply, just clear it out
    self._result_messages.pop(request_id)
def _debugger_paused(self, message):
# We hit the breakpoint set in visit_page. Get rid of google
# analytics script!
self.logger.debug("debugger paused! message={}".format(message))
scriptId = message['params']['callFrames'][0]['location']['scriptId']
def url(self, timeout=30):
    '''
    Returns the value of document.URL as reported by the browser.

    Evaluates 'document.URL' in the browser and waits for the matching
    result message.

    Args:
        timeout: passed through to self._wait_for (default 30)
    '''
    # reserve the slot for the reply before the command goes out
    self._result_messages[self._command_id.peek_next()] = None
    request_id = self.send_to_chrome(
            method='Runtime.evaluate',
            params={'expression': 'document.URL'})

    def result_arrived():
        return self._result_messages.get(request_id)

    self._wait_for(result_arrived, timeout=timeout)
    reply = self._result_messages.pop(request_id)
    evaluation = reply['result']['result']
    return evaluation['value']
# replace script
self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})
def run_behavior(self, behavior_script, timeout=900):
self.send_to_chrome(
method='Runtime.evaluate', suppress_logging=True,
params={'expression': behavior_script})
# resume execution
self.send_to_chrome(method="Debugger.resume")
start = time.time()
while True:
elapsed = time.time() - start
if elapsed > timeout:
logging.info(
'behavior reached hard timeout after %.1fs', elapsed)
return
brozzler.sleep(7)
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method='Runtime.evaluate', suppress_logging=True,
params={'expression': 'umbraBehaviorFinished()'})
try:
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=5)
msg = self._result_messages.get(msg_id)
if (msg and 'result' in msg
and not ('wasThrown' in msg['result']
and msg['result']['wasThrown'])
and 'result' in msg['result']
and type(msg['result']['result']['value']) == bool
and msg['result']['result']['value']):
self.logger.info('behavior decided it has finished')
return
except BrowsingTimeout:
pass
def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if "method" in message:
if message["method"] == "Network.requestWillBeSent":
self._network_request_will_be_sent(message)
elif message["method"] == "Network.responseReceived":
self._network_response_received(message)
elif message["method"] == "Page.loadEventFired":
self._page_load_event_fired(message)
elif message["method"] == "Console.messageAdded":
self._console_message_added(message)
elif message["method"] == "Debugger.paused":
self._debugger_paused(message)
elif message["method"] == "Inspector.targetCrashed":
self._aw_snap_hes_dead_jim = message
# elif message["method"] in (
# "Network.dataReceived", "Network.responseReceived",
# "Network.loadingFinished"):
# pass
# else:
# self.logger.debug("%s %s", message["method"], json_message)
elif "result" in message:
if message["id"] in self._waiting_on_result_messages:
callback = self._waiting_on_result_messages[message["id"]]
del self._waiting_on_result_messages[message["id"]]
self.logger.debug(
"received result for message id=%s, calling %s",
message["id"], callback)
callback(message)
elif self._behavior and self._behavior.is_waiting_on_result(
message["id"]):
self._behavior.notify_of_result(message)
# else:
# self.logger.debug("%s", json_message)
# else:
# self.logger.debug("%s", json_message)

View File

@ -34,15 +34,23 @@ import tempfile
class Chrome:
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(
self, port, executable, proxy=None, ignore_cert_errors=False,
cookie_db=None):
def __init__(self, chrome_exe, port=9222, ignore_cert_errors=False):
'''
Initializes instance of this class.
Doesn't start the browser, start() does that.
Args:
chrome_exe: filesystem path to chrome/chromium executable
port: chrome debugging protocol port (default 9222)
ignore_cert_errors: configure chrome to accept all certs (default
False)
'''
self.port = port
self.executable = executable
self.proxy = proxy
self.chrome_exe = chrome_exe
self.ignore_cert_errors = ignore_cert_errors
self.cookie_db = cookie_db
self._shutdown = threading.Event()
self.chrome_process = None
def __enter__(self):
'''
@ -71,21 +79,19 @@ class Chrome:
return default_port
def _init_cookie_db(self):
if self.cookie_db is not None:
cookie_dir = os.path.join(self._chrome_user_data_dir, 'Default')
cookie_location = os.path.join(cookie_dir, 'Cookies')
self.logger.debug(
'cookie DB provided, writing to %s', cookie_location)
os.makedirs(cookie_dir, exist_ok=True)
def _init_cookie_db(self, cookie_db):
cookie_dir = os.path.join(self._chrome_user_data_dir, 'Default')
cookie_location = os.path.join(cookie_dir, 'Cookies')
self.logger.debug('cookie DB provided, writing to %s', cookie_location)
os.makedirs(cookie_dir, exist_ok=True)
try:
with open(cookie_location, 'wb') as cookie_file:
cookie_file.write(self.cookie_db)
except OSError:
self.logger.error(
'exception writing cookie file at %s',
cookie_location, exc_info=True)
try:
with open(cookie_location, 'wb') as cookie_file:
cookie_file.write(cookie_db)
except OSError:
self.logger.error(
'exception writing cookie file at %s',
cookie_location, exc_info=True)
def persist_and_read_cookie_db(self):
cookie_location = os.path.join(
@ -110,21 +116,32 @@ class Chrome:
cookie_location, exc_info=True)
return cookie_db
def start(self):
def start(self, proxy=None, cookie_db=None):
'''
Returns websocket url to chrome window with about:blank loaded.
Starts chrome/chromium process.
Args:
proxy: http proxy 'host:port' (default None)
cookie_db: raw bytes of chrome/chromium sqlite3 cookies database,
which, if supplied, will be written to
{chrome_user_data_dir}/Default/Cookies before running the
browser (default None)
Returns:
websocket url to chrome window with about:blank loaded
'''
# these can raise exceptions
self._home_tmpdir = tempfile.TemporaryDirectory()
self._chrome_user_data_dir = os.path.join(
self._home_tmpdir.name, 'chrome-user-data')
self._init_cookie_db()
if cookie_db:
self._init_cookie_db(cookie_db)
new_env = os.environ.copy()
new_env['HOME'] = self._home_tmpdir.name
self.port = self._find_available_port(self.port)
chrome_args = [
self.executable, '--use-mock-keychain', # mac thing
self.chrome_exe, '--use-mock-keychain', # mac thing
'--user-data-dir=%s' % self._chrome_user_data_dir,
'--remote-debugging-port=%s' % self.port,
'--disable-web-sockets', '--disable-cache',
@ -135,8 +152,8 @@ class Chrome:
'--disable-extensions', '--disable-save-password-bubble']
if self.ignore_cert_errors:
chrome_args.append('--ignore-certificate-errors')
if self.proxy:
chrome_args.append('--proxy-server=%s' % self.proxy)
if proxy:
chrome_args.append('--proxy-server=%s' % proxy)
chrome_args.append('about:blank')
self.logger.info(
'running: %s', repr(subprocess.list2cmdline(chrome_args)))
@ -146,7 +163,8 @@ class Chrome:
stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
self._out_reader_thread = threading.Thread(
target=self._read_stderr_stdout,
name='ChromeOutReaderThread(pid=%s)' % self.chrome_process.pid)
name='ChromeOutReaderThread(pid=%s)' % self.chrome_process.pid,
daemon=True)
self._out_reader_thread.start()
self.logger.info('chrome running, pid %s' % self.chrome_process.pid)
@ -171,6 +189,8 @@ class Chrome:
'got chrome window websocket debug url %s from %s',
url, json_url)
return url
except brozzler.ShutdownRequested:
raise
except BaseException as e:
if int(time.time() - self._start) % 10 == 5:
self.logger.warn(
@ -236,18 +256,18 @@ class Chrome:
def stop(self):
if not self.chrome_process or self._shutdown.is_set():
return
self._shutdown.set()
timeout_sec = 300
self._shutdown.set()
self.logger.info('terminating chrome pgid %s' % self.chrome_process.pid)
if self.chrome_process.poll() is None:
self.logger.info(
'terminating chrome pgid %s', self.chrome_process.pid)
os.killpg(self.chrome_process.pid, signal.SIGTERM)
first_sigterm = time.time()
os.killpg(self.chrome_process.pid, signal.SIGTERM)
t0 = time.time()
try:
while time.time() - first_sigterm < timeout_sec:
time.sleep(0.5)
while time.time() - t0 < timeout_sec:
status = self.chrome_process.poll()
if status is not None:
if status == 0:
@ -264,11 +284,12 @@ class Chrome:
# around, but there's a chance I suppose that some other
# process could have started with the same pgid
return
time.sleep(0.5)
self.logger.warn(
'chrome pid %s still alive %.1f seconds after sending '
'SIGTERM, sending SIGKILL', self.chrome_process.pid,
time.time() - first_sigterm)
time.time() - t0)
os.killpg(self.chrome_process.pid, signal.SIGKILL)
status = self.chrome_process.wait()
self.logger.warn(
@ -284,3 +305,4 @@ class Chrome:
finally:
self._out_reader_thread.join()
self.chrome_process = None

View File

@ -276,9 +276,7 @@ def brozzler_worker():
def sigint(signum, frame):
    '''
    Signal handler that turns the signal into a brozzler.ShutdownRequested
    exception. Both arguments are ignored.
    '''
    shutdown = brozzler.ShutdownRequested(
            'shutdown requested (caught SIGINT)')
    raise shutdown
# do not print in signal handler to avoid RuntimeError: reentrant call
state_dump_msgs = []
def queue_state_dump(signum, frame):
def dump_state(signum, frame):
signal.signal(signal.SIGQUIT, signal.SIG_IGN)
try:
state_strs = []
@ -291,15 +289,15 @@ def brozzler_worker():
state_strs.append('<???:thread:ident=%s>' % ident)
stack = traceback.format_stack(frames[ident])
state_strs.append(''.join(stack))
state_dump_msgs.append(
logging.info(
'dumping state (caught signal %s)\n%s' % (
signum, '\n'.join(state_strs)))
except BaseException as e:
state_dump_msgs.append('exception dumping state: %s' % e)
logging.error('exception dumping state: %s' % e)
finally:
signal.signal(signal.SIGQUIT, queue_state_dump)
signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGQUIT, queue_state_dump)
signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
@ -311,17 +309,7 @@ def brozzler_worker():
frontier, service_registry, max_browsers=int(args.max_browsers),
chrome_exe=args.chrome_exe)
worker.start()
try:
while worker.is_alive():
while state_dump_msgs:
logging.warn(state_dump_msgs.pop(0))
time.sleep(0.5)
logging.critical('worker thread has died, shutting down')
except brozzler.ShutdownRequested as e:
pass
finally:
worker.shutdown_now()
worker.run()
logging.info('brozzler-worker is all done, exiting')

View File

@ -104,12 +104,10 @@ class BrozzlerWorker:
self._default_proxy = proxy
self._default_enable_warcprox_features = enable_warcprox_features
self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event()
self._thread = None
self._start_stop_lock = threading.Lock()
self._browser_pool = brozzler.browser.BrowserPool(
max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True)
self._browsing_threads = set()
self._browsing_threads_lock = threading.Lock()
def _proxy(self, site):
if site.proxy:
@ -203,6 +201,8 @@ class BrozzlerWorker:
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"),
extra_headers=site.extra_headers())
except brozzler.ShutdownRequested as e:
raise
except BaseException as e:
if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError:
pass
@ -257,6 +257,8 @@ class BrozzlerWorker:
self._try_youtube_dl(ydl, site, page)
except brozzler.ReachedLimit as e:
raise
except brozzler.ShutdownRequested:
raise
except Exception as e:
if (hasattr(e, 'exc_info') and len(e.exc_info) >= 2
and hasattr(e.exc_info[1], 'code')
@ -272,12 +274,13 @@ class BrozzlerWorker:
self.logger.info('needs browsing: %s', page)
if not browser.is_running():
browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db)
outlinks = browser.browse_page(
final_page_url, outlinks = browser.browse_page(
page.url, extra_headers=site.extra_headers(),
behavior_parameters=site.behavior_parameters,
user_agent=site.user_agent,
on_screenshot=_on_screenshot,
on_url_change=page.note_redirect)
on_screenshot=_on_screenshot)
if final_page_url != page.url:
page.note_redirect(final_page_url)
return outlinks
else:
if not self._already_fetched(page, ydl_spy):
@ -319,14 +322,13 @@ class BrozzlerWorker:
return False
def _brozzle_site(self, browser, site):
start = time.time()
page = None
try:
while (not self._shutdown_requested.is_set()
and time.time() - start < 7 * 60):
start = time.time()
while time.time() - start < 7 * 60:
self._frontier.honor_stop_request(site.job_id)
page = self._frontier.claim_page(site, "%s:%s" % (
socket.gethostname(), browser.chrome_port))
socket.gethostname(), browser.chrome.port))
if (page.needs_robots_check and
not brozzler.is_permitted_by_robots(site, page.url)):
@ -336,27 +338,28 @@ class BrozzlerWorker:
self._frontier.scope_and_schedule_outlinks(
site, page, outlinks)
if browser.is_running():
site.cookie_db = browser.persist_and_read_cookie_db()
site.cookie_db = browser.chrome.persist_and_read_cookie_db()
self._frontier.completed_page(site, page)
page = None
except brozzler.ShutdownRequested:
self.logger.info("shutdown requested")
except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site)
except brozzler.ReachedLimit as e:
self._frontier.reached_limit(site, e)
except brozzler.CrawlJobStopped:
self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
except brozzler.browser.BrowsingAborted:
self.logger.info("{} shut down".format(browser))
# except brozzler.browser.BrowsingAborted:
# self.logger.info("{} shut down".format(browser))
except:
self.logger.critical("unexpected exception", exc_info=True)
finally:
self.logger.info("finished session brozzling site, stopping "
"browser and disclaiming site")
browser.stop()
self._frontier.disclaim_site(site, page)
self._browser_pool.release(browser)
self._browsing_threads.remove(threading.current_thread())
with self._browsing_threads_lock:
self._browsing_threads.remove(threading.current_thread())
def _service_heartbeat(self):
if hasattr(self, "status_info"):
@ -380,31 +383,39 @@ class BrozzlerWorker:
"failed to send heartbeat and update service registry "
"with info %s: %s", status_info, e)
def _service_heartbeat_if_due(self):
'''Sends service registry heartbeat if due'''
due = False
if self._service_registry:
if not hasattr(self, "status_info"):
due = True
else:
d = rethinkstuff.utcnow() - self.status_info["last_heartbeat"]
due = d.total_seconds() > self.HEARTBEAT_INTERVAL
if due:
self._service_heartbeat()
def run(self):
try:
latest_state = None
while not self._shutdown_requested.is_set():
if self._service_registry and (
not hasattr(self, "status_info")
or (rethinkstuff.utcnow() -
self.status_info["last_heartbeat"]).total_seconds()
> self.HEARTBEAT_INTERVAL):
self._service_heartbeat()
while True:
self._service_heartbeat_if_due()
try:
browser = self._browser_pool.acquire()
try:
site = self._frontier.claim_site("{}:{}".format(
socket.gethostname(), browser.chrome_port))
site = self._frontier.claim_site("%s:%s" % (
socket.gethostname(), browser.chrome.port))
self.logger.info(
"brozzling site (proxy=%s) %s",
repr(self._proxy(site)), site)
th = threading.Thread(
target=lambda: self._brozzle_site(
browser, site),
name="BrozzlingThread:%s" % site.seed)
target=self._brozzle_site, args=(browser, site),
name="BrozzlingThread:%s" % site.seed,
daemon=True)
with self._browsing_threads_lock:
self._browsing_threads.add(th)
th.start()
self._browsing_threads.add(th)
except:
self._browser_pool.release(browser)
raise
@ -418,6 +429,8 @@ class BrozzlerWorker:
self.logger.info("no unclaimed sites to browse")
latest_state = "no-unclaimed-sites"
time.sleep(0.5)
except brozzler.ShutdownRequested:
self.logger.info("shutdown requested")
except:
self.logger.critical(
"thread exiting due to unexpected exception",
@ -431,26 +444,16 @@ class BrozzlerWorker:
"failed to unregister from service registry",
exc_info=True)
def start(self):
with self._start_stop_lock:
if self._thread:
self.logger.warn(
'ignoring start request because self._thread is '
'not None')
return
self._thread = threading.Thread(
target=self.run, name="BrozzlerWorker")
self._thread.start()
def shutdown_now(self):
with self._start_stop_lock:
self.logger.info("brozzler worker shutting down")
self._shutdown_requested.set()
self.logger.info(
'shutting down %s brozzling threads',
len(self._browsing_threads))
with self._browsing_threads_lock:
for th in self._browsing_threads:
if th.is_alive():
brozzler.thread_raise(th, brozzler.ShutdownRequested)
self._browser_pool.shutdown_now()
while self._browsing_threads:
time.sleep(0.5)
self._thread = None
def is_alive(self):
return self._thread and self._thread.is_alive()
# copy to avoid "RuntimeError: Set changed size during iteration"
thredz = set(self._browsing_threads)
for th in thredz:
th.join()