Merge branch 'refactor-browsing' into qa

* refactor-browsing:
  more shutdown tweaks
  improving shutdown process
  working on major refactoring of browser management
This commit is contained in:
Noah Levitt 2016-12-15 12:28:21 -08:00
commit 7a68599057
6 changed files with 618 additions and 639 deletions

View file

@ -17,11 +17,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import json as _json
import logging as _logging
import surt as _surt
from pkg_resources import get_distribution as _get_distribution from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('brozzler').version __version__ = _get_distribution('brozzler').version
class ShutdownRequested(Exception): class ShutdownRequested(Exception):
@ -35,9 +31,11 @@ class CrawlJobStopped(Exception):
class ReachedLimit(Exception): class ReachedLimit(Exception):
def __init__(self, http_error=None, warcprox_meta=None, http_payload=None): def __init__(self, http_error=None, warcprox_meta=None, http_payload=None):
import json
if http_error: if http_error:
if "warcprox-meta" in http_error.headers: if "warcprox-meta" in http_error.headers:
self.warcprox_meta = _json.loads(http_error.headers["warcprox-meta"]) self.warcprox_meta = json.loads(
http_error.headers["warcprox-meta"])
else: else:
self.warcprox_meta = None self.warcprox_meta = None
self.http_payload = http_error.read() self.http_payload = http_error.read()
@ -69,7 +67,8 @@ def fixup(url):
''' '''
Does rudimentary canonicalization, such as converting IDN to punycode. Does rudimentary canonicalization, such as converting IDN to punycode.
''' '''
hurl = _surt.handyurl.parse(url) import surt
hurl = surt.handyurl.parse(url)
# handyurl.parse() already lowercases the scheme via urlsplit # handyurl.parse() already lowercases the scheme via urlsplit
if hurl.host: if hurl.host:
hurl.host = hurl.host.encode('idna').decode('ascii').lower() hurl.host = hurl.host.encode('idna').decode('ascii').lower()
@ -78,6 +77,98 @@ def fixup(url):
# logging level more fine-grained than logging.DEBUG==10 # logging level more fine-grained than logging.DEBUG==10
TRACE = 5 TRACE = 5
# Module-level cache of the parsed behavior definitions; filled in by
# behaviors() on first use.
_behaviors = None

def behaviors():
    '''
    Returns the list of behavior definitions, loading them on first use.

    Reads `behaviors.yaml` from the directory containing this module and
    takes its top-level `behaviors` list. For every entry with a
    `behavior_js` key, the text of that file (looked up in the `behaviors.d`
    subdirectory) is stored in the entry under `script`. For every entry
    with a `behavior_js_template` key, a `string.Template` built from the
    text of that file is stored under `template`.

    The list is cached in the module-level `_behaviors` and returned as-is
    on subsequent calls.

    Returns:
        list of behavior dicts
    '''
    import os
    import string
    import yaml

    global _behaviors
    if _behaviors is None:
        here = os.path.dirname(__file__)
        with open(os.path.join(here, 'behaviors.yaml')) as fin:
            # safe_load() works across PyYAML versions; plain yaml.load(fin)
            # requires an explicit Loader argument in PyYAML >= 6.
            # NOTE(review): assumes behaviors.yaml uses only plain yaml
            # types (no python-specific tags) - confirm
            conf = yaml.safe_load(fin)

        loaded = conf['behaviors']
        scripts_dir = os.path.join(here, 'behaviors.d')
        for behavior in loaded:
            if 'behavior_js' in behavior:
                behavior_js = os.path.join(
                        scripts_dir, behavior['behavior_js'])
                with open(behavior_js, encoding='utf-8') as fin:
                    behavior['script'] = fin.read()
            elif 'behavior_js_template' in behavior:
                behavior_js_template = os.path.join(
                        scripts_dir, behavior['behavior_js_template'])
                with open(behavior_js_template, encoding='utf-8') as fin:
                    behavior['template'] = string.Template(fin.read())

        # Publish the cache only once every entry is populated, so that an
        # exception while reading a script file does not leave a
        # half-populated list cached for all later calls.
        _behaviors = loaded

    return _behaviors
def behavior_script(url, template_parameters=None):
    '''
    Finds the javascript behavior to run on the page at `url`.

    Goes through the entries returned by behaviors() in order and uses the
    first one whose `url_regex` matches (re.match) the url and which names
    either a `behavior_js` or a `behavior_js_template`. A plain behavior is
    returned verbatim. A template is populated with the entry's
    `default_parameters` (if any), overridden by `template_parameters` (if
    any), using safe_substitute.

    Args:
        url: url of the page
        template_parameters: dict of values for the behavior template
            (default None)

    Returns:
        the javascript string, or None if no behavior applies
    '''
    import logging
    import re

    for candidate in behaviors():
        if not re.match(candidate['url_regex'], url):
            continue

        if 'behavior_js' in candidate:
            logging.info(
                    'using behavior %s for %s',
                    candidate['behavior_js'], url)
            return candidate['script']

        if 'behavior_js_template' in candidate:
            substitutions = {}
            if 'default_parameters' in candidate:
                substitutions.update(candidate['default_parameters'])
            if template_parameters:
                substitutions.update(template_parameters)
            populated = candidate['template'].safe_substitute(substitutions)
            logging.info(
                    'using template=%s populated with parameters=%s for %s',
                    repr(candidate['behavior_js_template']), substitutions,
                    url)
            return populated

    return None
def thread_raise(thread, exctype):
    '''
    Raises the exception exctype in the thread.

    Adapted from http://tomerfiliba.com/recipes/Thread2/ which explains:
    "The exception will be raised only when executing python bytecode. If your
    thread calls a native/built-in blocking function, the exception will be
    raised only when execution returns to the python code."

    Args:
        thread: the threading.Thread to raise the exception in
        exctype: the exception class to raise (a class, not an instance)

    Raises:
        threading.ThreadError: if the thread is not alive
        TypeError: if exctype is not a class
        ValueError: if PyThreadState_SetAsyncExc reports that it modified no
            thread state
        SystemError: if PyThreadState_SetAsyncExc reports that it modified
            more than one thread state
    '''
    import ctypes
    import inspect
    import threading

    if not thread.is_alive():
        raise threading.ThreadError('thread %s is not running' % thread)

    if not inspect.isclass(exctype):
        # one-element tuple so that a tuple argument cannot break the
        # %-formatting
        raise TypeError(
                'cannot raise %s, only exception types can be raised (not '
                'instances)' % (exctype,))

    thread_id = ctypes.c_long(thread.ident)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError('invalid thread id? thread.ident=%s' % thread.ident)
    elif res != 1:
        # if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect.
        # Pass the id wrapped in c_long again (a bare python int would be
        # converted as a C int) and None, which ctypes passes as NULL.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError('PyThreadState_SetAsyncExc failed')
def sleep(duration):
    '''
    Sleeps for duration seconds in increments of 0.5 seconds.

    Use this so that the sleep can be interrupted by thread_raise().

    Args:
        duration: number of seconds to sleep; returns immediately if zero
            or negative
    '''
    import time

    # monotonic clock, so that adjustments to the system wall clock during
    # the sleep can neither cut it short nor stretch it
    deadline = time.monotonic() + duration
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(remaining, 0.5))
from brozzler.site import Page, Site from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots from brozzler.robots import is_permitted_by_robots

View file

@ -1,137 +0,0 @@
'''
brozzler/behaviors.py - manages behaviors, which are javascript scripts that
run in brozzled web pages
Copyright (C) 2014-2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import json
import itertools
import os
import re
import logging
import time
import sys
import yaml
import string
__all__ = ["Behavior"]
class Behavior:
    '''
    Runs a javascript behavior in a page and tracks whether it has finished.

    The behavior definitions are loaded once per process from
    `behaviors.yaml` next to this module and cached on the class. An instance
    is tied to one url and to an `umbra_worker` object, which must provide
    `send_to_chrome(method=..., suppress_logging=..., params=...)` returning
    a message id.
    '''
    logger = logging.getLogger(__module__ + "." + __qualname__)

    # class-level cache of the parsed behavior definitions, shared by all
    # instances; filled in by behaviors() on first use
    _behaviors = None

    @staticmethod
    def behaviors():
        '''
        Returns the list of behavior definitions, loading them on first use.

        For entries with `behavior_js`, the text of that file from the
        `behaviors.d` subdirectory is stored under `script`; for entries with
        `behavior_js_template`, a `string.Template` of the file text is
        stored under `template`.
        '''
        if Behavior._behaviors is None:
            here = os.path.dirname(__file__)
            with open(os.path.join(here, 'behaviors.yaml')) as fin:
                # safe_load() works across PyYAML versions; plain
                # yaml.load(fin) requires a Loader argument in PyYAML >= 6.
                # NOTE(review): assumes behaviors.yaml uses only plain yaml
                # types - confirm
                conf = yaml.safe_load(fin)

            loaded = conf['behaviors']
            scripts_dir = os.path.join(here, "behaviors.d")
            for behavior in loaded:
                if "behavior_js" in behavior:
                    behavior_js = os.path.join(
                            scripts_dir, behavior["behavior_js"])
                    with open(behavior_js, encoding="utf-8") as fin:
                        behavior["script"] = fin.read()
                elif "behavior_js_template" in behavior:
                    behavior_js_template = os.path.join(
                            scripts_dir, behavior["behavior_js_template"])
                    with open(behavior_js_template, encoding="utf-8") as fin:
                        behavior["template"] = string.Template(fin.read())

            # publish only once fully populated, so a failed file read does
            # not leave a half-populated list cached
            Behavior._behaviors = loaded

        return Behavior._behaviors

    def __init__(self, url, umbra_worker):
        '''
        Args:
            url: url of the page the behavior will run on
            umbra_worker: object providing send_to_chrome(...)
        '''
        self.url = url
        self.umbra_worker = umbra_worker

        self.script_finished = False
        self.waiting_result_msg_ids = []
        self.active_behavior = None
        self.last_activity = time.time()

    def start(self, template_parameters=None):
        '''
        Sends the first behavior whose `url_regex` matches self.url to the
        browser with Runtime.evaluate.

        Args:
            template_parameters: dict of values for the behavior template,
                overriding the behavior's `default_parameters` (default
                None)
        '''
        for behavior in Behavior.behaviors():
            if not re.match(behavior['url_regex'], self.url):
                continue

            if "behavior_js" in behavior:
                self.logger.info("using %s behavior for %s",
                        behavior["behavior_js"], self.url)
                script = behavior["script"]
            elif "behavior_js_template" in behavior:
                parameters = dict()
                if "default_parameters" in behavior:
                    parameters.update(behavior["default_parameters"])
                if template_parameters:
                    parameters.update(template_parameters)
                script = behavior["template"].safe_substitute(parameters)
                self.logger.info(
                        "using template=%s populated with parameters=%s for %s",
                        repr(behavior["behavior_js_template"]),
                        parameters, self.url)
            else:
                script = behavior["script"]

            # Keep the populated script on a per-instance copy. The behavior
            # dicts are shared through the class-level cache, so writing the
            # script populated for this page back into them would leak it to
            # every other Behavior instance.
            active_behavior = dict(behavior)
            active_behavior["script"] = script
            self.active_behavior = active_behavior

            self.umbra_worker.send_to_chrome(
                    method="Runtime.evaluate", suppress_logging=True,
                    params={"expression": script})
            self.notify_of_activity()
            return

        # Logger.warn() is a deprecated alias of warning()
        self.logger.warning("no behavior to run on %s", self.url)

    def is_finished(self):
        """Asynchronously asks behavior if it is finished, and in the mean time
        returns the response from the previous such query."""
        msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate",
                suppress_logging=True,
                params={"expression":"umbraBehaviorFinished()"})
        self.waiting_result_msg_ids.append(msg_id)

        # finished means the script said so AND there has been no activity
        # for the idle timeout (30 seconds unless the behavior overrides it)
        request_idle_timeout_sec = 30
        if (self.active_behavior
                and 'request_idle_timeout_sec' in self.active_behavior):
            request_idle_timeout_sec = self.active_behavior[
                    'request_idle_timeout_sec']
        idle_time = time.time() - self.last_activity

        return self.script_finished and idle_time > request_idle_timeout_sec

    def is_waiting_on_result(self, msg_id):
        '''
        Returns True if msg_id belongs to an umbraBehaviorFinished() query
        whose result has not been passed to notify_of_result() yet.
        '''
        return msg_id in self.waiting_result_msg_ids

    def notify_of_result(self, chrome_message):
        '''
        Records the answer to an umbraBehaviorFinished() query.

        Args:
            chrome_message: json-decoded result message from chrome; its
                `id` must be one that is_waiting_on_result()

        Raises:
            ValueError: if the message id is not being waited on
        '''
        # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}}
        # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}}
        self.waiting_result_msg_ids.remove(chrome_message['id'])

        value = None
        outer = chrome_message.get('result')
        if (isinstance(outer, dict) and not outer.get('wasThrown')
                and isinstance(outer.get('result'), dict)):
            # .get(): results without a 'value' (e.g. undefined) must not
            # raise KeyError
            value = outer['result'].get('value')

        if isinstance(value, bool):
            self.script_finished = value
        else:
            # this happens if the behavior script doesn't define
            # umbraBehaviorFinished, and I think it can also happen normally
            # after the behavior has been sent to the browser but before
            # the browser has it fully loaded... in any case the message
            # was overwhelming the logs, so I'm bumping it down to debug level
            self.logger.debug(
                    "chrome message doesn't look like a boolean result! %s",
                    chrome_message)

    def notify_of_activity(self):
        '''
        Resets the idle timer consulted by is_finished().
        '''
        self.last_activity = time.time()
# When run as a script: configure debug logging to stdout and log the
# behavior definitions returned by Behavior.behaviors().
if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
            format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
    logger = logging.getLogger('umbra.behaviors')
    logger.info("custom behaviors: {}".format(Behavior.behaviors()))

View file

@ -17,50 +17,67 @@ limitations under the License.
''' '''
import logging import logging
import json import time
import brozzler
import itertools import itertools
import json
import websocket import websocket
import time import time
import threading import threading
import os
import random
import brozzler import brozzler
from brozzler.chrome import Chrome
from brozzler.behaviors import Behavior
from requests.structures import CaseInsensitiveDict from requests.structures import CaseInsensitiveDict
import base64
import sqlite3
import datetime import datetime
import base64
from brozzler.chrome import Chrome
import surt
__all__ = ["BrowserPool", "Browser"] class BrowsingException(Exception):
pass
class NoBrowsersAvailable(Exception):
pass
class BrowsingTimeout(BrowsingException):
pass
class BrowserPool: class BrowserPool:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + '.' + __qualname__)
BASE_PORT = 9200 BASE_PORT = 9200
def __init__(self, size=3, **kwargs): def __init__(self, size=3, **kwargs):
"""kwargs are passed on to Browser.__init__""" '''
Initializes the pool.
Args:
size: size of pool (default 3)
**kwargs: arguments for Browser(...)
'''
self.size = size self.size = size
self._available = set() self._available = set()
self._in_use = set() self._in_use = set()
for i in range(0, size): for i in range(0, size):
browser = Browser(BrowserPool.BASE_PORT + i, **kwargs) browser = Browser(port=BrowserPool.BASE_PORT + i, **kwargs)
self._available.add(browser) self._available.add(browser)
self._lock = threading.Lock() self._lock = threading.Lock()
def acquire(self): def acquire(self):
""" '''
Returns browser from pool if available, raises NoBrowsersAvailable Returns an available instance.
otherwise.
""" Returns:
browser from pool, if available
Raises:
NoBrowsersAvailable if none available
'''
with self._lock: with self._lock:
try: try:
browser = self._available.pop() browser = self._available.pop()
except KeyError: except KeyError:
raise NoBrowsersAvailable() raise NoBrowsersAvailable
self._in_use.add(browser) self._in_use.add(browser)
return browser return browser
@ -70,8 +87,14 @@ class BrowserPool:
self._in_use.remove(browser) self._in_use.remove(browser)
def shutdown_now(self): def shutdown_now(self):
self.logger.info(
'shutting down browser pool (%s browsers in use)',
len(self._in_use))
with self._lock:
for browser in self._available:
browser.stop()
for browser in self._in_use: for browser in self._in_use:
browser.abort_browse_page() browser.stop()
def num_available(self): def num_available(self):
return len(self._available) return len(self._available)
@ -79,46 +102,23 @@ class BrowserPool:
def num_in_use(self): def num_in_use(self):
return len(self._in_use) return len(self._in_use)
class NoBrowsersAvailable(Exception):
pass
class BrowsingException(Exception):
pass
class BrowsingAborted(BrowsingException):
pass
class ResultMessageTimeout(BrowsingException):
pass
class Browser: class Browser:
""" '''
Runs chrome/chromium to synchronously browse one page at a time using Manages an instance of Chrome for browsing pages.
worker.browse_page(). Should not be accessed from multiple threads. '''
""" logger = logging.getLogger(__module__ + '.' + __qualname__)
logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, **kwargs):
'''
Initializes the Browser.
HARD_TIMEOUT_SECONDS = 20 * 60 Args:
**kwargs: arguments for Chrome(...)
def __init__( '''
self, chrome_port=9222, chrome_exe='chromium-browser', proxy=None, self.chrome = Chrome(**kwargs)
ignore_cert_errors=False): self.websocket_url = None
self.command_id = itertools.count(1) self.is_browsing = False
self.chrome_port = chrome_port self._browser_controller = None
self.chrome_exe = chrome_exe
self.proxy = proxy
self.ignore_cert_errors = ignore_cert_errors
self._behavior = None
self._websock = None
self._abort_browse_page = False
self._chrome_instance = None
self._aw_snap_hes_dead_jim = None
self._work_dir = None
self._websocket_url = None
def __repr__(self):
return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port)
def __enter__(self): def __enter__(self):
self.start() self.start()
@ -127,115 +127,298 @@ class Browser:
def __exit__(self, *args): def __exit__(self, *args):
self.stop() self.stop()
def start(self, proxy=None, cookie_db=None): def start(self, **kwargs):
if not self._chrome_instance: '''
self._chrome_instance = Chrome( Starts chrome if it's not running.
port=self.chrome_port, executable=self.chrome_exe,
ignore_cert_errors=self.ignore_cert_errors, Args:
proxy=proxy or self.proxy, cookie_db=None) **kwargs: arguments for self.chrome.start(...)
try: '''
self._websocket_url = self._chrome_instance.start() if not self.is_running():
except: self.websocket_url = self.chrome.start(**kwargs)
self._chrome_instance = None self._browser_controller = BrowserController(self.websocket_url)
raise self._browser_controller.start()
def stop(self): def stop(self):
'''
Stops chrome if it's running.
'''
try: try:
if self.is_running(): if self._browser_controller:
self._chrome_instance.stop() self._browser_controller.stop()
self._chrome_instance = None self.websocket_url = None
self._websocket_url = None self.chrome.stop()
except: except:
self.logger.error("problem stopping", exc_info=True) self.logger.error('problem stopping', exc_info=True)
def is_running(self): def is_running(self):
return bool(self._websocket_url) return self.websocket_url is not None
def abort_browse_page(self):
self._abort_browse_page = True
def persist_and_read_cookie_db(self):
if self._chrome_instance:
return self._chrome_instance.persist_and_read_cookie_db()
else:
return None
def browse_page( def browse_page(
self, url, extra_headers=None, behavior_parameters=None, self, page_url, ignore_cert_errors=False, extra_headers=None,
user_agent=None, user_agent=None, behavior_parameters=None,
on_request=None, on_response=None, on_screenshot=None, on_request=None, on_response=None, on_screenshot=None):
on_url_change=None): '''
""" Browses page in browser.
Synchronously loads a page, runs behaviors, and takes a screenshot.
Raises BrowsingException if browsing the page fails in a non-critical Browser should already be running, i.e. start() should have been
way. called. Opens the page_url in the browser, runs behaviors, takes a
screenshot, extracts outlinks.
Returns extracted outlinks. Args:
""" page_url: url of the page to browse
extra_headers: dict of extra http headers to configure the browser
to send with every request (default None)
user_agent: user agent string, replaces browser default if
supplied (default None)
behavior_parameters: dict of parameters for populating the
javascript behavior template (default None)
on_request: callback to invoke on every Network.requestWillBeSent
event, takes one argument, the json-decoded message (default
None)
on_response: callback to invoke on every Network.responseReceived
event, takes one argument, the json-decoded message (default
None)
on_screenshot: callback to invoke when screenshot is obtained,
takes one argument, the raw jpeg bytes (default None)
# XXX takes two arguments, the url of the page at the time the
# screenshot was taken, and the raw jpeg bytes (default None)
Returns:
A tuple (final_page_url, outlinks).
final_page_url: the url in the location bar at the end of the
browse_page cycle, which could be different from the original
page url if the page redirects, javascript has changed the url
in the location bar, etc
outlinks: a list of navigational links extracted from the page
Raises:
BrowsingException: if browsing the page fails
'''
if not self.is_running(): if not self.is_running():
raise BrowsingException("browser has not been started") raise BrowsingException('browser has not been started')
self.url = url if self.is_browsing:
self.extra_headers = extra_headers raise BrowsingException('browser is already busy browsing a page')
self.user_agent = user_agent self.is_browsing = True
self.on_request = on_request
self.on_screenshot = on_screenshot
self.on_url_change = on_url_change
self.on_response = on_response
self.behavior_parameters = behavior_parameters
self._outlinks = None
self._reached_limit = None
self._aw_snap_hes_dead_jim = None
self._abort_browse_page = False
self._has_screenshot = False
self._waiting_on_result_messages = {}
self._result_message_timeout = None
self._websock = websocket.WebSocketApp(
self._websocket_url, on_open=self._visit_page,
on_message=self._wrap_handle_message)
thread_name = "WebsockThread:{}-{:%Y%m%d%H%M%S}".format(
self.chrome_port, datetime.datetime.utcnow())
websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
kwargs={'ping_timeout':0.5})
websock_thread.start()
self._start = time.time()
aborted = False
try: try:
while True: self._browser_controller.navigate_to_page(page_url, timeout=300)
time.sleep(0.5) ## if login_credentials:
if self._browse_interval_func(): ## self._browser_controller.try_login(login_credentials) (5 min?)
return self._outlinks behavior_script = brozzler.behavior_script(
page_url, behavior_parameters)
self._browser_controller.run_behavior(behavior_script, timeout=900)
if on_screenshot:
self._browser_controller.scroll_to_top()
jpeg_bytes = self._browser_controller.screenshot()
on_screenshot(jpeg_bytes)
outlinks = self._browser_controller.extract_outlinks()
## for each hashtag not already visited:
## navigate_to_hashtag (nothing to wait for so no timeout?)
## if on_screenshot;
## take screenshot (30 sec)
## run behavior (3 min)
## outlinks += retrieve_outlinks (60 sec)
final_page_url = self._browser_controller.url()
return final_page_url, outlinks
except websocket.WebSocketConnectionClosedException as e:
self.logger.error('websocket closed, did chrome die?')
raise BrowsingException(e)
finally: finally:
self.is_browsing = False
class Counter:
    '''
    Counter that hands out 0, 1, 2, ... through next(), and lets the caller
    look at the upcoming value without consuming it.
    '''
    def __init__(self):
        self.next_value = 0

    def __next__(self):
        '''
        Returns the current value and advances the counter by one.
        '''
        current = self.next_value
        self.next_value = current + 1
        return current

    def peek_next(self):
        '''
        Returns the value the next call to next() will return.
        '''
        return self.next_value
class BrowserController:
'''
'''
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(self, websocket_url):
    '''
    Initializes the controller; nothing is connected until start().

    Args:
        websocket_url: url handed to websocket.WebSocketApp by start()
    '''
    # result messages by message id; None until the result has arrived
    self._result_messages = {}
    self._websock_open = None
    self._websock_thread = None
    # source of ids for messages sent with send_to_chrome()
    self._command_id = Counter()
    self.websocket_url = websocket_url
def _wait_for(self, callback, timeout=None):
'''
Spins until callback() returns truthy.
'''
start = time.time()
while True:
brozzler.sleep(0.5)
if callback():
return
elapsed = time.time() - start
if timeout and elapsed > timeout:
raise BrowsingTimeout(
'timed out after %.1fs waiting for: %s' % (
elapsed, callback))
def __enter__(self):
    '''
    Context manager entry: calls start() and returns this controller.
    '''
    self.start()
    return self
def __exit__(self, *args):
    '''
    Context manager exit: calls stop(). The exception details passed in
    *args are not used.
    '''
    self.stop()
def start(self):
    '''
    Opens the websocket to the browser and prepares it for browsing.

    Does nothing if the websocket receiver thread has already been created.
    Otherwise starts a daemon thread running the websocket's run_forever(),
    waits up to 10 seconds for the connection to open, then enables the
    Network, Page, Console, Debugger and Runtime domains and sets a
    breakpoint on the google analytics script.

    If the websocket reports an error, BrowsingException is raised
    asynchronously in the thread that called start().
    '''
    # NOTE(review): nesting below is reconstructed from a rendering that had
    # lost its indentation - confirm against the original file
    if not self._websock_thread:
        # remembered so that on_error, which runs in the receiver thread,
        # can raise in the thread that called start()
        calling_thread = threading.current_thread()

        def on_open(websock):
            # truthy timestamp, polled by the _wait_for() call below
            self._websock_open = datetime.datetime.utcnow()
        def on_error(websock, e):
            '''
            Raises BrowsingException in the thread that called start()
            '''
            if isinstance(e, websocket.WebSocketConnectionClosedException):
                self.logger.error('websocket closed, did chrome die?')
            else:
                self.logger.error(
                        'exception from websocket receiver thread',
                        exc_info=1)
            brozzler.thread_raise(calling_thread, BrowsingException)

        # open websocket, start thread that receives messages
        self._websock = websocket.WebSocketApp(
                self.websocket_url, on_open=on_open,
                on_message=self._on_message, on_error=on_error)

        # thread name is built from the port of the websocket url and the
        # current utc time
        thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format(
                surt.handyurl.parse(self.websocket_url).port,
                datetime.datetime.utcnow())
        self._websock_thread = threading.Thread(
                target=self._websock.run_forever, name=thread_name,
                daemon=True)
        self._websock_thread.start()
        # raises BrowsingTimeout if on_open has not fired within 10 seconds
        self._wait_for(lambda: self._websock_open, timeout=10)

        # tell browser to send messages we're interested in
        self.send_to_chrome(method='Network.enable')
        self.send_to_chrome(method='Page.enable')
        self.send_to_chrome(method='Console.enable')
        self.send_to_chrome(method='Debugger.enable')
        self.send_to_chrome(method='Runtime.enable')

        # disable google analytics: this breakpoint triggers a
        # Debugger.paused event, which _handle_message() dispatches to
        # _debugger_paused()
        self.send_to_chrome(
                method='Debugger.setBreakpointByUrl',
                params={
                    'lineNumber': 1,
                    'urlRegex': 'https?://www.google-analytics.com/analytics.js'})
def stop(self, *args):
if self._websock_thread:
if (self._websock and self._websock.sock if (self._websock and self._websock.sock
and self._websock.sock.connected): and self._websock.sock.connected):
self.logger.info('shutting down websocket connection')
try: try:
self._websock.close() self._websock.close()
except BaseException as e: except BaseException as e:
self.logger.error( self.logger.error(
"exception closing websocket %s - %s" % ( 'exception closing websocket %s - %s',
self._websock, e)) self._websock, e)
websock_thread.join(timeout=30) if self._websock_thread != threading.current_thread():
if websock_thread.is_alive(): self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.error( self.logger.error(
"%s still alive 30 seconds after closing %s, will " '%s still alive 30 seconds after closing %s, will '
"forcefully nudge it again" % ( 'forcefully nudge it again', self._websock_thread,
websock_thread, self._websock)) self._websock)
self._websock.keep_running = False self._websock.keep_running = False
websock_thread.join(timeout=30) self._websock_thread.join(timeout=30)
if websock_thread.is_alive(): if self._websock_thread.is_alive():
self.logger.critical( self.logger.critical(
"%s still alive 60 seconds after closing %s" % ( '%s still alive 60 seconds after closing %s',
websock_thread, self._websock)) self._websock_thread, self._websock)
self._behavior = None def _on_message(self, websock, message):
try:
self._handle_message(websock, message)
except:
self.logger.error(
'uncaught exception in _handle_message message=%s',
message, exc_info=True)
OUTLINKS_JS = r""" def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if 'method' in message:
if message['method'] == 'Page.loadEventFired':
self._got_page_load_event = datetime.datetime.utcnow()
elif message['method'] == 'Debugger.paused':
self._debugger_paused(message)
elif message['method'] == 'Console.messageAdded':
self.logger.debug(
'%s console.%s %s', self._websock.url,
message['params']['message']['level'],
message['params']['message']['text'])
else:
self.logger.debug("%s %s", message["method"], json_message)
elif 'result' in message:
if message['id'] in self._result_messages:
self._result_messages[message['id']] = message
else:
self.logger.debug("%s", json_message)
else:
self.logger.debug("%s", json_message)
def _debugger_paused(self, message):
# we hit the breakpoint set in start(), get rid of google analytics
self.logger.debug('debugger paused! message=%s', message)
scriptId = message['params']['callFrames'][0]['location']['scriptId']
# replace script
self.send_to_chrome(
method='Debugger.setScriptSource',
params={'scriptId': scriptId,
'scriptSource': 'console.log("google analytics is no more!");'})
# resume execution
self.send_to_chrome(method='Debugger.resume')
def send_to_chrome(self, suppress_logging=False, **kwargs):
    '''
    Sends a json message to the browser over the websocket.

    Args:
        suppress_logging: if True, the outgoing message is not logged
            (default False)
        **kwargs: the fields of the message, e.g. method=..., params=...;
            an `id` field is set to the next message id

    Returns:
        the id assigned to the message
    '''
    msg_id = next(self._command_id)
    payload = json.dumps(dict(kwargs, id=msg_id))
    if not suppress_logging:
        self.logger.debug(
                'sending message to %s: %s', self._websock, payload)
    self._websock.send(payload)
    return msg_id
def navigate_to_page(
        self, page_url, extra_headers=None, user_agent=None, timeout=300):
    '''
    Navigates the browser to page_url and waits for the page load event.

    Args:
        page_url: url to navigate to
        extra_headers: dict of extra http headers for the browser to send
            with every request; not modified (default None)
        user_agent: user agent string, replaces browser default if
            supplied (default None)
        timeout: seconds to wait for the page load event (default 300)

    Raises:
        whatever self._wait_for() raises if the page load event is not
        seen within timeout seconds
    '''
    # copy, so that adding Accept-Encoding does not modify the caller's dict
    headers = dict(extra_headers or {})
    headers['Accept-Encoding'] = 'gzip, deflate'
    self.send_to_chrome(
            method='Network.setExtraHTTPHeaders',
            params={'headers': headers})

    if user_agent:
        self.send_to_chrome(
                method='Network.setUserAgentOverride',
                params={'userAgent': user_agent})

    # navigate to the page!
    self.logger.info('navigating to page %s', page_url)
    # reset before navigating so that a load event from an earlier page is
    # not mistaken for this one
    self._got_page_load_event = None
    self.send_to_chrome(method='Page.navigate', params={'url': page_url})
    self._wait_for(lambda: self._got_page_load_event, timeout=timeout)
OUTLINKS_JS = r'''
var __brzl_framesDone = new Set(); var __brzl_framesDone = new Set();
var __brzl_compileOutlinks = function(frame) { var __brzl_compileOutlinks = function(frame) {
__brzl_framesDone.add(frame); __brzl_framesDone.add(frame);
@ -244,265 +427,94 @@ var __brzl_compileOutlinks = function(frame) {
frame.document.querySelectorAll('a[href]')); frame.document.querySelectorAll('a[href]'));
for (var i = 0; i < frame.frames.length; i++) { for (var i = 0; i < frame.frames.length; i++) {
if (frame.frames[i] && !__brzl_framesDone.has(frame.frames[i])) { if (frame.frames[i] && !__brzl_framesDone.has(frame.frames[i])) {
outlinks = outlinks.concat(__brzl_compileOutlinks(frame.frames[i])); outlinks = outlinks.concat(
__brzl_compileOutlinks(frame.frames[i]));
} }
} }
} }
return outlinks; return outlinks;
} }
__brzl_compileOutlinks(window).join('\n'); __brzl_compileOutlinks(window).join('\n');
""" '''
def extract_outlinks(self, timeout=60):
def _chain_chrome_messages(self, chain): self.logger.info('extracting outlinks')
""" self._result_messages[self._command_id.peek_next()] = None
Sends a series of messages to chrome/chromium on the debugging protocol msg_id = self.send_to_chrome(
websocket. Waits for a reply from each one before sending the next. method='Runtime.evaluate',
Enforces a timeout waiting for each reply. If the timeout is hit, sets params={'expression': self.OUTLINKS_JS})
self._result_message_timeout with a ResultMessageTimeout (an exception self._wait_for(
class). Takes an array of dicts, each of which should look like this: lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
{ if message['result']['result']['value']:
"info": "human readable description", return frozenset(message['result']['result']['value'].split('\n'))
"chrome_msg": { ... }, # message to send to chrome, as a dict
"timeout": 30, # timeout in seconds
"callback": my_callback, # takes one arg, the result message
}
The code is rather convoluted because of the asynchronous nature of the
whole thing. See how it's used in _start_postbehavior_chain.
"""
timer = None
def callback(message):
if timer:
timer.cancel()
if "callback" in chain[0]:
chain[0]["callback"](message)
self._chain_chrome_messages(chain[1:])
def timeout():
self._result_message_timeout = ResultMessageTimeout(
"timed out after %.1fs waiting for result message "
"for %s", chain[0]["timeout"], chain[0]["chrome_msg"])
if chain:
msg_id = self.send_to_chrome(**chain[0]["chrome_msg"])
self._waiting_on_result_messages[msg_id] = callback
self.logger.info(
"msg_id=%s for message %s", msg_id, chain[0]["chrome_msg"])
timer = threading.Timer(chain[0]["timeout"], timeout)
timer.daemon = True
timer.start()
else:
self.logger.info("finished chrome message chain")
def _start_postbehavior_chain(self):
if self.on_screenshot:
chain = [{
"info": "scrolling to top",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": "window.scrollTo(0, 0);"},
},
"timeout": 30,
"callback": lambda message: None,
}, {
"info": "requesting screenshot",
"chrome_msg": {"method": "Page.captureScreenshot"},
"timeout": 30,
"callback": lambda message: (
self.on_screenshot and self.on_screenshot(
base64.b64decode(message["result"]["data"]))),
}]
else:
chain = []
def set_outlinks(message):
if message["result"]["result"]["value"]:
self._outlinks = frozenset(
message["result"]["result"]["value"].split("\n"))
else: else:
self._outlinks = frozenset() self._outlinks = frozenset()
chain.append({ def screenshot(self, timeout=30):
"info": "retrieving outlinks", self.logger.info('taking screenshot')
"chrome_msg": { self._result_messages[self._command_id.peek_next()] = None
"method": "Runtime.evaluate", msg_id = self.send_to_chrome(method='Page.captureScreenshot')
"params": {"expression": self.OUTLINKS_JS}, self._wait_for(
}, lambda: self._result_messages.get(msg_id), timeout=timeout)
"timeout": 60, message = self._result_messages.pop(msg_id)
"callback": set_outlinks, jpeg_bytes = base64.b64decode(message['result']['data'])
}) return jpeg_bytes
self._chain_chrome_messages(chain) def scroll_to_top(self, timeout=30):
self.logger.info('scrolling to top')
def _browse_interval_func(self): self._result_messages[self._command_id.peek_next()] = None
"""Called periodically while page is being browsed. Returns True when
finished browsing."""
if (not self._websock or not self._websock.sock
or not self._websock.sock.connected):
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
elif self._result_message_timeout:
raise self._result_message_timeout
elif self._aw_snap_hes_dead_jim:
raise BrowsingException(
"""chrome tab went "aw snap" or "he's dead jim"!""")
elif self._outlinks is not None:
# setting self._outlinks is the last thing that happens in the
# post-behavior chain
return True
elif (self._behavior != None and self._behavior.is_finished()
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
if self._behavior and self._behavior.is_finished():
self.logger.info(
"behavior decided it's finished with %s", self.url)
else:
self.logger.info(
"reached hard timeout of %s seconds url=%s",
Browser.HARD_TIMEOUT_SECONDS, self.url)
self._behavior = None
self._start_postbehavior_chain()
return False
elif self._reached_limit:
raise self._reached_limit
elif self._abort_browse_page:
raise BrowsingAborted("browsing page aborted")
else:
return False
def send_to_chrome(self, suppress_logging=False, **kwargs):
    '''
    Sends a json message to the browser over the websocket.

    Args:
        suppress_logging: if True, the outgoing message is not logged
            (default False)
        **kwargs: the fields of the message, e.g. method=..., params=...;
            an "id" field is set to the next command id

    Returns:
        the id assigned to the message
    '''
    msg_id = next(self.command_id)
    payload = json.dumps(dict(kwargs, id=msg_id))
    if not suppress_logging:
        self.logger.debug(
                "sending message to %s: %s", self._websock, payload)
    self._websock.send(payload)
    return msg_id
def _visit_page(self, websock):
    """
    Sends the sequence of commands that prepares the tab and then navigates
    it to self.url.

    Args:
        websock: not used by this method
    """
    # navigate to about:blank here to avoid situation where we navigate to
    # the same page that we're currently on, perhaps with a different
    # #fragment, which prevents Page.loadEventFired from happening
    self.send_to_chrome(
            method="Page.navigate", params={"url": "about:blank"})

    self.send_to_chrome(method="Network.enable")
    self.send_to_chrome(method="Page.enable")
    self.send_to_chrome(method="Console.enable")
    self.send_to_chrome(method="Debugger.enable")
    self.send_to_chrome(method="Runtime.enable")

    # copy, so that adding Accept-Encoding does not modify the dict that
    # self.extra_headers refers to
    headers = dict(self.extra_headers or {})
    headers['Accept-Encoding'] = 'gzip, deflate'
    self.send_to_chrome(
            method="Network.setExtraHTTPHeaders",
            params={"headers": headers})

    if self.user_agent:
        self.send_to_chrome(
                method="Network.setUserAgentOverride",
                params={"userAgent": self.user_agent})

    # disable google analytics, see _handle_message() where breakpoint is
    # caught "Debugger.paused"
    self.send_to_chrome(
            method="Debugger.setBreakpointByUrl",
            params={
                "lineNumber": 1,
                "urlRegex": "https?://www.google-analytics.com/analytics.js"})

    # navigate to the page!
    self.send_to_chrome(method="Page.navigate", params={"url": self.url})
def _wrap_handle_message(self, websock, message):
    """
    Calls self._handle_message(websock, message); if that raises, logs the
    exception with its traceback and calls self.abort_browse_page().

    Only Exception subclasses are handled here, so that KeyboardInterrupt
    and SystemExit are not swallowed.

    Args:
        websock: passed through to _handle_message
        message: the raw message, passed through to _handle_message
    """
    try:
        self._handle_message(websock, message)
    except Exception:
        self.logger.error(
                "uncaught exception in _handle_message message=%s",
                message, exc_info=True)
        self.abort_browse_page()
def _network_request_will_be_sent(self, message):
    """
    Handles a "Network.requestWillBeSent" message.

    Notifies the current behavior, if there is one, of activity. Requests
    for data: urls are only logged; any other request is passed to
    self.on_request, if that is set.

    Args:
        message: the decoded message; the url is read from
            message["params"]["request"]["url"]
    """
    if self._behavior:
        self._behavior.notify_of_activity()

    url = message["params"]["request"]["url"]
    if url.lower().startswith("data:"):
        # data urls can be very long, log only the beginning
        self.logger.debug("ignoring data url %s", url[:80])
    elif self.on_request:
        self.on_request(message)
def _network_response_received(self, message):
    """
    Handles a "Network.responseReceived" message.

    If no limit has been recorded yet and the response has status 420 and
    a Warcprox-Meta header (matched case-insensitively), parses the header
    value as json and stores a brozzler.ReachedLimit built from it in
    self._reached_limit. In every case the message is then passed to
    self.on_response, if that is set.

    Args:
        message: the decoded message; status and headers are read from
            message["params"]["response"]
    """
    response = message["params"]["response"]
    if not self._reached_limit and response["status"] == 420:
        # build the case-insensitive view once, and only when needed
        headers = CaseInsensitiveDict(response["headers"])
        if "Warcprox-Meta" in headers:
            warcprox_meta = json.loads(headers["Warcprox-Meta"])
            self._reached_limit = brozzler.ReachedLimit(
                    warcprox_meta=warcprox_meta)
            self.logger.info("reached limit %s", self._reached_limit)

    if self.on_response:
        self.on_response(message)
def _page_load_event_fired(self, message):
def page_url_after_load_event(message):
if message["result"]["result"]["value"] != self.url:
if self.on_url_change:
self.on_url_change(message["result"]["result"]["value"])
msg_id = self.send_to_chrome( msg_id = self.send_to_chrome(
method="Runtime.evaluate", method='Runtime.evaluate',
params={"expression":"document.URL"}) params={'expression': 'window.scrollTo(0, 0);'})
self._waiting_on_result_messages[msg_id] = page_url_after_load_event self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
self._result_messages.pop(msg_id)
self.logger.info("Page.loadEventFired, moving on to starting behaviors url={}".format(self.url)) def url(self, timeout=30):
self._behavior = Behavior(self.url, self) '''
self._behavior.start(self.behavior_parameters) Returns value of document.URL from the browser.
'''
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method='Runtime.evaluate',
params={'expression': 'document.URL'})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
return message['result']['result']['value']
def _console_message_added(self, message): def run_behavior(self, behavior_script, timeout=900):
self.logger.debug("%s console.%s %s", self._websock.url, self.send_to_chrome(
message["params"]["message"]["level"], method='Runtime.evaluate', suppress_logging=True,
message["params"]["message"]["text"]) params={'expression': behavior_script})
def _debugger_paused(self, message): start = time.time()
# We hit the breakpoint set in visit_page. Get rid of google while True:
# analytics script! elapsed = time.time() - start
self.logger.debug("debugger paused! message={}".format(message)) if elapsed > timeout:
scriptId = message['params']['callFrames'][0]['location']['scriptId'] logging.info(
'behavior reached hard timeout after %.1fs', elapsed)
return
# replace script brozzler.sleep(7)
self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})
# resume execution self._result_messages[self._command_id.peek_next()] = None
self.send_to_chrome(method="Debugger.resume") msg_id = self.send_to_chrome(
method='Runtime.evaluate', suppress_logging=True,
params={'expression': 'umbraBehaviorFinished()'})
try:
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=5)
msg = self._result_messages.get(msg_id)
if (msg and 'result' in msg
and not ('wasThrown' in msg['result']
and msg['result']['wasThrown'])
and 'result' in msg['result']
and type(msg['result']['result']['value']) == bool
and msg['result']['result']['value']):
self.logger.info('behavior decided it has finished')
return
except BrowsingTimeout:
pass
def _handle_message(self, websock, json_message):
    """
    Decodes a json message and dispatches it.

    A message with a "method" key is an event and goes to the matching
    handler; events with no handler are ignored. Otherwise, a message with
    a "result" key is the reply to a command sent earlier: if a callback
    is registered for its id in self._waiting_on_result_messages, the
    callback is removed and called with the message; failing that, the
    current behavior is notified if it is waiting on that id. Anything
    else is ignored.

    Args:
        websock: not used by this method
        json_message: the message as a json string
    """
    message = json.loads(json_message)
    if "method" in message:
        method = message["method"]
        if method == "Network.requestWillBeSent":
            self._network_request_will_be_sent(message)
        elif method == "Network.responseReceived":
            self._network_response_received(message)
        elif method == "Page.loadEventFired":
            self._page_load_event_fired(message)
        elif method == "Console.messageAdded":
            self._console_message_added(message)
        elif method == "Debugger.paused":
            self._debugger_paused(message)
        elif method == "Inspector.targetCrashed":
            # only recorded here; _browse_interval_func checks this
            # attribute and raises
            self._aw_snap_hes_dead_jim = message
        # all other events are deliberately ignored, without logging
    elif "result" in message:
        msg_id = message["id"]
        if msg_id in self._waiting_on_result_messages:
            # callbacks are one-shot, remove before calling
            callback = self._waiting_on_result_messages.pop(msg_id)
            self.logger.debug(
                    "received result for message id=%s, calling %s",
                    msg_id, callback)
            callback(message)
        elif self._behavior and self._behavior.is_waiting_on_result(msg_id):
            self._behavior.notify_of_result(message)

View file

@ -34,15 +34,23 @@ import tempfile
class Chrome: class Chrome:
logger = logging.getLogger(__module__ + '.' + __qualname__) logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__( def __init__(self, chrome_exe, port=9222, ignore_cert_errors=False):
self, port, executable, proxy=None, ignore_cert_errors=False, '''
cookie_db=None): Initializes instance of this class.
Doesn't start the browser, start() does that.
Args:
chrome_exe: filesystem path to chrome/chromium executable
port: chrome debugging protocol port (default 9222)
ignore_cert_errors: configure chrome to accept all certs (default
False)
'''
self.port = port self.port = port
self.executable = executable self.chrome_exe = chrome_exe
self.proxy = proxy
self.ignore_cert_errors = ignore_cert_errors self.ignore_cert_errors = ignore_cert_errors
self.cookie_db = cookie_db
self._shutdown = threading.Event() self._shutdown = threading.Event()
self.chrome_process = None
def __enter__(self): def __enter__(self):
''' '''
@ -71,17 +79,15 @@ class Chrome:
return default_port return default_port
def _init_cookie_db(self): def _init_cookie_db(self, cookie_db):
if self.cookie_db is not None:
cookie_dir = os.path.join(self._chrome_user_data_dir, 'Default') cookie_dir = os.path.join(self._chrome_user_data_dir, 'Default')
cookie_location = os.path.join(cookie_dir, 'Cookies') cookie_location = os.path.join(cookie_dir, 'Cookies')
self.logger.debug( self.logger.debug('cookie DB provided, writing to %s', cookie_location)
'cookie DB provided, writing to %s', cookie_location)
os.makedirs(cookie_dir, exist_ok=True) os.makedirs(cookie_dir, exist_ok=True)
try: try:
with open(cookie_location, 'wb') as cookie_file: with open(cookie_location, 'wb') as cookie_file:
cookie_file.write(self.cookie_db) cookie_file.write(cookie_db)
except OSError: except OSError:
self.logger.error( self.logger.error(
'exception writing cookie file at %s', 'exception writing cookie file at %s',
@ -110,21 +116,32 @@ class Chrome:
cookie_location, exc_info=True) cookie_location, exc_info=True)
return cookie_db return cookie_db
def start(self): def start(self, proxy=None, cookie_db=None):
''' '''
Returns websocket url to chrome window with about:blank loaded. Starts chrome/chromium process.
Args:
proxy: http proxy 'host:port' (default None)
cookie_db: raw bytes of chrome/chromium sqlite3 cookies database,
which, if supplied, will be written to
{chrome_user_data_dir}/Default/Cookies before running the
browser (default None)
Returns:
websocket url to chrome window with about:blank loaded
''' '''
# these can raise exceptions # these can raise exceptions
self._home_tmpdir = tempfile.TemporaryDirectory() self._home_tmpdir = tempfile.TemporaryDirectory()
self._chrome_user_data_dir = os.path.join( self._chrome_user_data_dir = os.path.join(
self._home_tmpdir.name, 'chrome-user-data') self._home_tmpdir.name, 'chrome-user-data')
self._init_cookie_db() if cookie_db:
self._init_cookie_db(cookie_db)
new_env = os.environ.copy() new_env = os.environ.copy()
new_env['HOME'] = self._home_tmpdir.name new_env['HOME'] = self._home_tmpdir.name
self.port = self._find_available_port(self.port) self.port = self._find_available_port(self.port)
chrome_args = [ chrome_args = [
self.executable, '--use-mock-keychain', # mac thing self.chrome_exe, '--use-mock-keychain', # mac thing
'--user-data-dir=%s' % self._chrome_user_data_dir, '--user-data-dir=%s' % self._chrome_user_data_dir,
'--remote-debugging-port=%s' % self.port, '--remote-debugging-port=%s' % self.port,
'--disable-web-sockets', '--disable-cache', '--disable-web-sockets', '--disable-cache',
@ -135,8 +152,8 @@ class Chrome:
'--disable-extensions', '--disable-save-password-bubble'] '--disable-extensions', '--disable-save-password-bubble']
if self.ignore_cert_errors: if self.ignore_cert_errors:
chrome_args.append('--ignore-certificate-errors') chrome_args.append('--ignore-certificate-errors')
if self.proxy: if proxy:
chrome_args.append('--proxy-server=%s' % self.proxy) chrome_args.append('--proxy-server=%s' % proxy)
chrome_args.append('about:blank') chrome_args.append('about:blank')
self.logger.info( self.logger.info(
'running: %s', repr(subprocess.list2cmdline(chrome_args))) 'running: %s', repr(subprocess.list2cmdline(chrome_args)))
@ -146,7 +163,8 @@ class Chrome:
stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0) stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
self._out_reader_thread = threading.Thread( self._out_reader_thread = threading.Thread(
target=self._read_stderr_stdout, target=self._read_stderr_stdout,
name='ChromeOutReaderThread(pid=%s)' % self.chrome_process.pid) name='ChromeOutReaderThread(pid=%s)' % self.chrome_process.pid,
daemon=True)
self._out_reader_thread.start() self._out_reader_thread.start()
self.logger.info('chrome running, pid %s' % self.chrome_process.pid) self.logger.info('chrome running, pid %s' % self.chrome_process.pid)
@ -171,6 +189,8 @@ class Chrome:
'got chrome window websocket debug url %s from %s', 'got chrome window websocket debug url %s from %s',
url, json_url) url, json_url)
return url return url
except brozzler.ShutdownRequested:
raise
except BaseException as e: except BaseException as e:
if int(time.time() - self._start) % 10 == 5: if int(time.time() - self._start) % 10 == 5:
self.logger.warn( self.logger.warn(
@ -236,18 +256,18 @@ class Chrome:
def stop(self): def stop(self):
if not self.chrome_process or self._shutdown.is_set(): if not self.chrome_process or self._shutdown.is_set():
return return
self._shutdown.set()
timeout_sec = 300 timeout_sec = 300
self._shutdown.set() if self.chrome_process.poll() is None:
self.logger.info('terminating chrome pgid %s' % self.chrome_process.pid) self.logger.info(
'terminating chrome pgid %s', self.chrome_process.pid)
os.killpg(self.chrome_process.pid, signal.SIGTERM) os.killpg(self.chrome_process.pid, signal.SIGTERM)
first_sigterm = time.time() t0 = time.time()
try: try:
while time.time() - first_sigterm < timeout_sec: while time.time() - t0 < timeout_sec:
time.sleep(0.5)
status = self.chrome_process.poll() status = self.chrome_process.poll()
if status is not None: if status is not None:
if status == 0: if status == 0:
@ -264,11 +284,12 @@ class Chrome:
# around, but there's a chance I suppose that some other # around, but there's a chance I suppose that some other
# process could have started with the same pgid # process could have started with the same pgid
return return
time.sleep(0.5)
self.logger.warn( self.logger.warn(
'chrome pid %s still alive %.1f seconds after sending ' 'chrome pid %s still alive %.1f seconds after sending '
'SIGTERM, sending SIGKILL', self.chrome_process.pid, 'SIGTERM, sending SIGKILL', self.chrome_process.pid,
time.time() - first_sigterm) time.time() - t0)
os.killpg(self.chrome_process.pid, signal.SIGKILL) os.killpg(self.chrome_process.pid, signal.SIGKILL)
status = self.chrome_process.wait() status = self.chrome_process.wait()
self.logger.warn( self.logger.warn(
@ -284,3 +305,4 @@ class Chrome:
finally: finally:
self._out_reader_thread.join() self._out_reader_thread.join()
self.chrome_process = None self.chrome_process = None

View file

@ -276,9 +276,7 @@ def brozzler_worker():
def sigint(signum, frame): def sigint(signum, frame):
raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)') raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)')
# do not print in signal handler to avoid RuntimeError: reentrant call def dump_state(signum, frame):
state_dump_msgs = []
def queue_state_dump(signum, frame):
signal.signal(signal.SIGQUIT, signal.SIG_IGN) signal.signal(signal.SIGQUIT, signal.SIG_IGN)
try: try:
state_strs = [] state_strs = []
@ -291,15 +289,15 @@ def brozzler_worker():
state_strs.append('<???:thread:ident=%s>' % ident) state_strs.append('<???:thread:ident=%s>' % ident)
stack = traceback.format_stack(frames[ident]) stack = traceback.format_stack(frames[ident])
state_strs.append(''.join(stack)) state_strs.append(''.join(stack))
state_dump_msgs.append( logging.info(
'dumping state (caught signal %s)\n%s' % ( 'dumping state (caught signal %s)\n%s' % (
signum, '\n'.join(state_strs))) signum, '\n'.join(state_strs)))
except BaseException as e: except BaseException as e:
state_dump_msgs.append('exception dumping state: %s' % e) logging.error('exception dumping state: %s' % e)
finally: finally:
signal.signal(signal.SIGQUIT, queue_state_dump) signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGQUIT, queue_state_dump) signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint) signal.signal(signal.SIGINT, sigint)
@ -311,17 +309,7 @@ def brozzler_worker():
frontier, service_registry, max_browsers=int(args.max_browsers), frontier, service_registry, max_browsers=int(args.max_browsers),
chrome_exe=args.chrome_exe) chrome_exe=args.chrome_exe)
worker.start() worker.run()
try:
while worker.is_alive():
while state_dump_msgs:
logging.warn(state_dump_msgs.pop(0))
time.sleep(0.5)
logging.critical('worker thread has died, shutting down')
except brozzler.ShutdownRequested as e:
pass
finally:
worker.shutdown_now()
logging.info('brozzler-worker is all done, exiting') logging.info('brozzler-worker is all done, exiting')

View file

@ -104,12 +104,10 @@ class BrozzlerWorker:
self._default_proxy = proxy self._default_proxy = proxy
self._default_enable_warcprox_features = enable_warcprox_features self._default_enable_warcprox_features = enable_warcprox_features
self._browser_pool = brozzler.browser.BrowserPool(max_browsers, self._browser_pool = brozzler.browser.BrowserPool(
chrome_exe=chrome_exe, ignore_cert_errors=True) max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event()
self._thread = None
self._start_stop_lock = threading.Lock()
self._browsing_threads = set() self._browsing_threads = set()
self._browsing_threads_lock = threading.Lock()
def _proxy(self, site): def _proxy(self, site):
if site.proxy: if site.proxy:
@ -203,6 +201,8 @@ class BrozzlerWorker:
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8", content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"), payload=info_json.encode("utf-8"),
extra_headers=site.extra_headers()) extra_headers=site.extra_headers())
except brozzler.ShutdownRequested as e:
raise
except BaseException as e: except BaseException as e:
if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError: if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError:
pass pass
@ -257,6 +257,8 @@ class BrozzlerWorker:
self._try_youtube_dl(ydl, site, page) self._try_youtube_dl(ydl, site, page)
except brozzler.ReachedLimit as e: except brozzler.ReachedLimit as e:
raise raise
except brozzler.ShutdownRequested:
raise
except Exception as e: except Exception as e:
if (hasattr(e, 'exc_info') and len(e.exc_info) >= 2 if (hasattr(e, 'exc_info') and len(e.exc_info) >= 2
and hasattr(e.exc_info[1], 'code') and hasattr(e.exc_info[1], 'code')
@ -272,12 +274,13 @@ class BrozzlerWorker:
self.logger.info('needs browsing: %s', page) self.logger.info('needs browsing: %s', page)
if not browser.is_running(): if not browser.is_running():
browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db) browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db)
outlinks = browser.browse_page( final_page_url, outlinks = browser.browse_page(
page.url, extra_headers=site.extra_headers(), page.url, extra_headers=site.extra_headers(),
behavior_parameters=site.behavior_parameters, behavior_parameters=site.behavior_parameters,
user_agent=site.user_agent, user_agent=site.user_agent,
on_screenshot=_on_screenshot, on_screenshot=_on_screenshot)
on_url_change=page.note_redirect) if final_page_url != page.url:
page.note_redirect(final_page_url)
return outlinks return outlinks
else: else:
if not self._already_fetched(page, ydl_spy): if not self._already_fetched(page, ydl_spy):
@ -319,14 +322,13 @@ class BrozzlerWorker:
return False return False
def _brozzle_site(self, browser, site): def _brozzle_site(self, browser, site):
start = time.time()
page = None page = None
try: try:
while (not self._shutdown_requested.is_set() start = time.time()
and time.time() - start < 7 * 60): while time.time() - start < 7 * 60:
self._frontier.honor_stop_request(site.job_id) self._frontier.honor_stop_request(site.job_id)
page = self._frontier.claim_page(site, "%s:%s" % ( page = self._frontier.claim_page(site, "%s:%s" % (
socket.gethostname(), browser.chrome_port)) socket.gethostname(), browser.chrome.port))
if (page.needs_robots_check and if (page.needs_robots_check and
not brozzler.is_permitted_by_robots(site, page.url)): not brozzler.is_permitted_by_robots(site, page.url)):
@ -336,26 +338,27 @@ class BrozzlerWorker:
self._frontier.scope_and_schedule_outlinks( self._frontier.scope_and_schedule_outlinks(
site, page, outlinks) site, page, outlinks)
if browser.is_running(): if browser.is_running():
site.cookie_db = browser.persist_and_read_cookie_db() site.cookie_db = browser.chrome.persist_and_read_cookie_db()
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
page = None page = None
except brozzler.ShutdownRequested:
self.logger.info("shutdown requested")
except brozzler.NothingToClaim: except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site) self.logger.info("no pages left for site %s", site)
except brozzler.ReachedLimit as e: except brozzler.ReachedLimit as e:
self._frontier.reached_limit(site, e) self._frontier.reached_limit(site, e)
except brozzler.CrawlJobStopped: except brozzler.CrawlJobStopped:
self._frontier.finished(site, "FINISHED_STOP_REQUESTED") self._frontier.finished(site, "FINISHED_STOP_REQUESTED")
except brozzler.browser.BrowsingAborted: # except brozzler.browser.BrowsingAborted:
self.logger.info("{} shut down".format(browser)) # self.logger.info("{} shut down".format(browser))
except: except:
self.logger.critical("unexpected exception", exc_info=True) self.logger.critical("unexpected exception", exc_info=True)
finally: finally:
self.logger.info("finished session brozzling site, stopping "
"browser and disclaiming site")
browser.stop() browser.stop()
self._frontier.disclaim_site(site, page) self._frontier.disclaim_site(site, page)
self._browser_pool.release(browser) self._browser_pool.release(browser)
with self._browsing_threads_lock:
self._browsing_threads.remove(threading.current_thread()) self._browsing_threads.remove(threading.current_thread())
def _service_heartbeat(self): def _service_heartbeat(self):
@ -380,31 +383,39 @@ class BrozzlerWorker:
"failed to send heartbeat and update service registry " "failed to send heartbeat and update service registry "
"with info %s: %s", status_info, e) "with info %s: %s", status_info, e)
def _service_heartbeat_if_due(self):
    '''
    Sends service registry heartbeat if due.

    Does nothing if self._service_registry is not set. Otherwise a
    heartbeat is due if self.status_info does not exist yet, or if more
    than self.HEARTBEAT_INTERVAL seconds have passed since
    self.status_info["last_heartbeat"].
    '''
    if not self._service_registry:
        return

    if hasattr(self, "status_info"):
        elapsed = rethinkstuff.utcnow() - self.status_info["last_heartbeat"]
        due = elapsed.total_seconds() > self.HEARTBEAT_INTERVAL
    else:
        # no heartbeat has been recorded yet
        due = True

    if due:
        self._service_heartbeat()
def run(self): def run(self):
try: try:
latest_state = None latest_state = None
while not self._shutdown_requested.is_set(): while True:
if self._service_registry and ( self._service_heartbeat_if_due()
not hasattr(self, "status_info")
or (rethinkstuff.utcnow() -
self.status_info["last_heartbeat"]).total_seconds()
> self.HEARTBEAT_INTERVAL):
self._service_heartbeat()
try: try:
browser = self._browser_pool.acquire() browser = self._browser_pool.acquire()
try: try:
site = self._frontier.claim_site("{}:{}".format( site = self._frontier.claim_site("%s:%s" % (
socket.gethostname(), browser.chrome_port)) socket.gethostname(), browser.chrome.port))
self.logger.info( self.logger.info(
"brozzling site (proxy=%s) %s", "brozzling site (proxy=%s) %s",
repr(self._proxy(site)), site) repr(self._proxy(site)), site)
th = threading.Thread( th = threading.Thread(
target=lambda: self._brozzle_site( target=self._brozzle_site, args=(browser, site),
browser, site), name="BrozzlingThread:%s" % site.seed,
name="BrozzlingThread:%s" % site.seed) daemon=True)
th.start() with self._browsing_threads_lock:
self._browsing_threads.add(th) self._browsing_threads.add(th)
th.start()
except: except:
self._browser_pool.release(browser) self._browser_pool.release(browser)
raise raise
@ -418,6 +429,8 @@ class BrozzlerWorker:
self.logger.info("no unclaimed sites to browse") self.logger.info("no unclaimed sites to browse")
latest_state = "no-unclaimed-sites" latest_state = "no-unclaimed-sites"
time.sleep(0.5) time.sleep(0.5)
except brozzler.ShutdownRequested:
self.logger.info("shutdown requested")
except: except:
self.logger.critical( self.logger.critical(
"thread exiting due to unexpected exception", "thread exiting due to unexpected exception",
@ -431,26 +444,16 @@ class BrozzlerWorker:
"failed to unregister from service registry", "failed to unregister from service registry",
exc_info=True) exc_info=True)
def start(self): self.logger.info(
with self._start_stop_lock: 'shutting down %s brozzling threads',
if self._thread: len(self._browsing_threads))
self.logger.warn( with self._browsing_threads_lock:
'ignoring start request because self._thread is ' for th in self._browsing_threads:
'not None') if th.is_alive():
return brozzler.thread_raise(th, brozzler.ShutdownRequested)
self._thread = threading.Thread(
target=self.run, name="BrozzlerWorker")
self._thread.start()
def shutdown_now(self):
with self._start_stop_lock:
self.logger.info("brozzler worker shutting down")
self._shutdown_requested.set()
self._browser_pool.shutdown_now() self._browser_pool.shutdown_now()
while self._browsing_threads: # copy to avoid "RuntimeError: Set changed size during iteration"
time.sleep(0.5) thredz = set(self._browsing_threads)
self._thread = None for th in thredz:
th.join()
def is_alive(self):
return self._thread and self._thread.is_alive()