working on major refactoring of browser management

This commit is contained in:
Noah Levitt 2016-12-09 16:28:11 -08:00
parent d68053764c
commit f23f928c16
6 changed files with 447 additions and 534 deletions

View File

@ -17,11 +17,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import json as _json
import logging as _logging
import surt as _surt
from pkg_resources import get_distribution as _get_distribution
__version__ = _get_distribution('brozzler').version
class ShutdownRequested(Exception):
@ -35,9 +31,11 @@ class CrawlJobStopped(Exception):
class ReachedLimit(Exception):
def __init__(self, http_error=None, warcprox_meta=None, http_payload=None):
import json
if http_error:
if "warcprox-meta" in http_error.headers:
self.warcprox_meta = _json.loads(http_error.headers["warcprox-meta"])
self.warcprox_meta = json.loads(
http_error.headers["warcprox-meta"])
else:
self.warcprox_meta = None
self.http_payload = http_error.read()
@ -69,6 +67,7 @@ def fixup(url):
'''
Does rudimentary canonicalization, such as converting IDN to punycode.
'''
import surt
hurl = _surt.handyurl.parse(url)
# handyurl.parse() already lowercases the scheme via urlsplit
if hurl.host:
@ -78,6 +77,60 @@ def fixup(url):
# logging level more fine-grained than logging.DEBUG==10
TRACE = 5
_behaviors = None
def behaviors():
import os, yaml, string
global _behaviors
if _behaviors is None:
behaviors_yaml = os.path.join(
os.path.dirname(__file__), 'behaviors.yaml')
with open(behaviors_yaml) as fin:
conf = yaml.load(fin)
_behaviors = conf['behaviors']
for behavior in _behaviors:
if 'behavior_js' in behavior:
behavior_js = os.path.join(
os.path.dirname(__file__), 'behaviors.d',
behavior['behavior_js'])
with open(behavior_js, encoding='utf-8') as fin:
behavior['script'] = fin.read()
elif 'behavior_js_template' in behavior:
behavior_js_template = os.path.join(
os.path.dirname(__file__), 'behaviors.d',
behavior['behavior_js_template'])
with open(behavior_js_template, encoding='utf-8') as fin:
behavior['template'] = string.Template(fin.read())
return _behaviors
def behavior_script(url, template_parameters=None):
'''
Returns the javascript behavior string populated with template_parameters.
'''
import re, logging
for behavior in behaviors():
if re.match(behavior['url_regex'], url):
if 'behavior_js' in behavior:
logging.info(
'using behavior %s for %s',
behavior['behavior_js'], url)
elif 'behavior_js_template' in behavior:
parameters = dict()
if 'default_parameters' in behavior:
parameters.update(behavior['default_parameters'])
if template_parameters:
parameters.update(template_parameters)
javascript = behavior['template'].safe_substitute(parameters)
logging.info(
'using template=%s populated with parameters=%s for %s',
repr(behavior['behavior_js_template']), parameters, url)
return behavior['script']
return None
from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots

View File

@ -1,137 +0,0 @@
'''
brozzler/behaviors.py - manages behaviors, which are javascript scripts that
run in brozzled web pages
Copyright (C) 2014-2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import json
import itertools
import os
import re
import logging
import time
import sys
import yaml
import string
__all__ = ["Behavior"]
class Behavior:
logger = logging.getLogger(__module__ + "." + __qualname__)
_behaviors = None
@staticmethod
def behaviors():
if Behavior._behaviors is None:
behaviors_yaml = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.yaml'])
with open(behaviors_yaml) as fin:
conf = yaml.load(fin)
Behavior._behaviors = conf['behaviors']
for behavior in Behavior._behaviors:
if "behavior_js" in behavior:
behavior_js = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ["behaviors.d"] + [behavior["behavior_js"]])
with open(behavior_js, encoding="utf-8") as fin:
behavior["script"] = fin.read()
elif "behavior_js_template" in behavior:
behavior_js_template = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ["behaviors.d"] + [behavior["behavior_js_template"]])
with open(behavior_js_template, encoding="utf-8") as fin:
behavior["template"] = string.Template(fin.read())
return Behavior._behaviors
def __init__(self, url, umbra_worker):
self.url = url
self.umbra_worker = umbra_worker
self.script_finished = False
self.waiting_result_msg_ids = []
self.active_behavior = None
self.last_activity = time.time()
def start(self, template_parameters=None):
for behavior in Behavior.behaviors():
if re.match(behavior['url_regex'], self.url):
if "behavior_js" in behavior:
self.logger.info("using %s behavior for %s",
behavior["behavior_js"], self.url)
elif "behavior_js_template" in behavior:
parameters = dict()
if "default_parameters" in behavior:
parameters.update(behavior["default_parameters"])
if template_parameters:
parameters.update(template_parameters)
behavior["script"] = behavior["template"].safe_substitute(parameters)
self.logger.info(
"using template=%s populated with parameters=%s for %s",
repr(behavior["behavior_js_template"]),
parameters, self.url)
self.active_behavior = behavior
self.umbra_worker.send_to_chrome(
method="Runtime.evaluate", suppress_logging=True,
params={"expression": behavior["script"]})
self.notify_of_activity()
return
self.logger.warn("no behavior to run on {}".format(self.url))
def is_finished(self):
"""Asynchronously asks behavior if it is finished, and in the mean time
returns the response from the previous such query."""
msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate",
suppress_logging=True, params={"expression":"umbraBehaviorFinished()"})
self.waiting_result_msg_ids.append(msg_id)
request_idle_timeout_sec = 30
if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior:
request_idle_timeout_sec = self.active_behavior['request_idle_timeout_sec']
idle_time = time.time() - self.last_activity
return self.script_finished and idle_time > request_idle_timeout_sec
def is_waiting_on_result(self, msg_id):
return msg_id in self.waiting_result_msg_ids
def notify_of_result(self, chrome_message):
# {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}}
# {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}}
self.waiting_result_msg_ids.remove(chrome_message['id'])
if ('result' in chrome_message and not (
'wasThrown' in chrome_message['result']
and chrome_message['result']['wasThrown'])
and 'result' in chrome_message['result']
and type(chrome_message['result']['result']['value']) == bool):
self.script_finished = chrome_message['result']['result']['value']
else:
# this happens if the behavior script doesn't define
# umbraBehaviorFinished, and I think it can also happen normally
# after the behavior has been sent to the browser but before
# the browser has it fully loaded... in any case the message
# was overwhelming the logs, so I'm bumping it down to debug level
self.logger.debug(
"chrome message doesn't look like a boolean result! %s",
chrome_message)
def notify_of_activity(self):
self.last_activity = time.time()
if __name__ == "__main__":
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logger = logging.getLogger('umbra.behaviors')
logger.info("custom behaviors: {}".format(Behavior.behaviors()))

View File

@ -17,45 +17,61 @@ limitations under the License.
'''
import logging
import json
import time
import brozzler
import itertools
import json
import websocket
import time
import threading
import os
import random
import brozzler
from brozzler.chrome import Chrome
from brozzler.behaviors import Behavior
from requests.structures import CaseInsensitiveDict
import base64
import sqlite3
import datetime
import base64
from brozzler.chrome import Chrome
__all__ = ["BrowserPool", "Browser"]
class BrowsingException(Exception):
pass
class NoBrowsersAvailable(Exception):
pass
class BrowsingTimeout(BrowsingException):
pass
class BrowserPool:
logger = logging.getLogger(__module__ + "." + __qualname__)
logger = logging.getLogger(__module__ + '.' + __qualname__)
BASE_PORT = 9200
def __init__(self, size=3, **kwargs):
"""kwargs are passed on to Browser.__init__"""
'''
Initializes the pool.
Args:
size: size of pool (default 3)
**kwargs: arguments for Browser(...)
'''
self.size = size
self._available = set()
self._in_use = set()
for i in range(0, size):
browser = Browser(BrowserPool.BASE_PORT + i, **kwargs)
browser = Browser(port=BrowserPool.BASE_PORT + i, **kwargs)
self._available.add(browser)
self._lock = threading.Lock()
def acquire(self):
"""
Returns browser from pool if available, raises NoBrowsersAvailable
otherwise.
"""
'''
Returns an available instance.
Returns:
browser from pool, if available
Raises:
NoBrowsersAvailable if none available
'''
with self._lock:
try:
browser = self._available.pop()
@ -79,46 +95,22 @@ class BrowserPool:
def num_in_use(self):
return len(self._in_use)
class NoBrowsersAvailable(Exception):
pass
class BrowsingException(Exception):
pass
class BrowsingAborted(BrowsingException):
pass
class ResultMessageTimeout(BrowsingException):
pass
class Browser:
"""
Runs chrome/chromium to synchronously browse one page at a time using
worker.browse_page(). Should not be accessed from multiple threads.
"""
'''
Manages an instance of Chrome for browsing pages.
'''
logger = logging.getLogger(__module__ + '.' + __qualname__)
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, **kwargs):
'''
Initializes the Browser.
HARD_TIMEOUT_SECONDS = 20 * 60
def __init__(
self, chrome_port=9222, chrome_exe='chromium-browser', proxy=None,
ignore_cert_errors=False):
self.command_id = itertools.count(1)
self.chrome_port = chrome_port
self.chrome_exe = chrome_exe
self.proxy = proxy
self.ignore_cert_errors = ignore_cert_errors
self._behavior = None
self._websock = None
self._abort_browse_page = False
self._chrome_instance = None
self._aw_snap_hes_dead_jim = None
self._work_dir = None
self._websocket_url = None
def __repr__(self):
return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port)
Args:
**kwargs: arguments for Chrome(...)
'''
self.chrome = Chrome(**kwargs)
self.websocket_url = None
self.is_browsing = False
def __enter__(self):
self.start()
@ -127,115 +119,274 @@ class Browser:
def __exit__(self, *args):
self.stop()
def start(self, proxy=None, cookie_db=None):
if not self._chrome_instance:
self._chrome_instance = Chrome(
port=self.chrome_port, executable=self.chrome_exe,
ignore_cert_errors=self.ignore_cert_errors,
proxy=proxy or self.proxy, cookie_db=None)
try:
self._websocket_url = self._chrome_instance.start()
except:
self._chrome_instance = None
raise
def start(self, **kwargs):
'''
Starts chrome if it's not running.
Args:
**kwargs: arguments for self.chrome.start(...)
'''
if not self.is_running():
self.websocket_url = self.chrome.start(**kwargs)
self._browser_controller = BrowserController(self.websocket_url)
self._browser_controller.start()
def stop(self):
'''
Stops chrome if it's running.
'''
try:
if self.is_running():
self._chrome_instance.stop()
self._chrome_instance = None
self._websocket_url = None
self.chrome.stop()
self.websocket_url = None
except:
self.logger.error("problem stopping", exc_info=True)
self.logger.error('problem stopping', exc_info=True)
def is_running(self):
return bool(self._websocket_url)
def abort_browse_page(self):
self._abort_browse_page = True
def persist_and_read_cookie_db(self):
if self._chrome_instance:
return self._chrome_instance.persist_and_read_cookie_db()
else:
return None
return self.websocket_url is not None
def browse_page(
self, url, extra_headers=None, behavior_parameters=None,
user_agent=None,
on_request=None, on_response=None, on_screenshot=None,
on_url_change=None):
"""
Synchronously loads a page, runs behaviors, and takes a screenshot.
self, page_url, ignore_cert_errors=False, extra_headers=None,
user_agent=None, behavior_parameters=None,
on_request=None, on_response=None, on_screenshot=None):
'''
Browses page in browser.
Raises BrowsingException if browsing the page fails in a non-critical
way.
Browser should already be running, i.e. start() should have been
called. Opens the page_url in the browser, runs behaviors, takes a
screenshot, extracts outlinks.
Returns extracted outlinks.
"""
Args:
page_url: url of the page to browse
extra_headers: dict of extra http headers to configure the browser
to send with every request (default None)
user_agent: user agent string, replaces browser default if
supplied (default None)
behavior_parameters: dict of parameters for populating the
javascript behavior template (default None)
on_request: callback to invoke on every Network.requestWillBeSent
event, takes one argument, the json-decoded message (default
None)
on_response: callback to invoke on every Network.responseReceived
event, takes one argument, the json-decoded message (default
None)
on_screenshot: callback to invoke when screenshot is obtained,
takes one argument, the the raw jpeg bytes (default None)
# XXX takes two arguments, the url of the page at the time the
# screenshot was taken, and the raw jpeg bytes (default None)
Returns:
A tuple (final_page_url, outlinks).
final_page_url: the url in the location bar at the end of the
browse_page cycle, which could be different from the original
page url if the page redirects, javascript has changed the url
in the location bar, etc
outlinks: a list of navigational links extracted from the page
Raises:
BrowsingException: if browsing the page fails
'''
if not self.is_running():
raise BrowsingException("browser has not been started")
self.url = url
self.extra_headers = extra_headers
self.user_agent = user_agent
self.on_request = on_request
self.on_screenshot = on_screenshot
self.on_url_change = on_url_change
self.on_response = on_response
self.behavior_parameters = behavior_parameters
raise BrowsingException('browser has not been started')
if self.is_browsing:
raise BrowsingException('browser is already busy browsing a page')
self.is_browsing = True
self._browser_controller.navigate_to_page(page_url, timeout=300)
## if login_credentials:
## self._browser_controller.try_login(login_credentials) (5 min?)
behavior_script = brozzler.behavior_script(
page_url, behavior_parameters)
self._browser_controller.run_behavior(behavior_script, timeout=900)
if on_screenshot:
self._browser_controller.scroll_to_top()
jpeg_bytes = self._browser_controller.screenshot()
on_screenshot(jpeg_bytes)
outlinks = self._browser_controller.extract_outlinks()
## for each hashtag not already visited:
## navigate_to_hashtag (nothing to wait for so no timeout?)
## if on_screenshot;
## take screenshot (30 sec)
## run behavior (3 min)
## outlinks += retrieve_outlinks (60 sec)
final_page_url = self._browser_controller.url()
self._outlinks = None
self._reached_limit = None
self._aw_snap_hes_dead_jim = None
self._abort_browse_page = False
self._has_screenshot = False
self._waiting_on_result_messages = {}
self._result_message_timeout = None
self._websock = websocket.WebSocketApp(
self._websocket_url, on_open=self._visit_page,
on_message=self._wrap_handle_message)
thread_name = "WebsockThread:{}-{:%Y%m%d%H%M%S}".format(
self.chrome_port, datetime.datetime.utcnow())
websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
kwargs={'ping_timeout':0.5})
websock_thread.start()
self._start = time.time()
aborted = False
self.is_browsing = False
return final_page_url, outlinks
class Counter:
def __init__(self):
self.next_value = 0
def __next__(self):
try:
while True:
time.sleep(0.5)
if self._browse_interval_func():
return self._outlinks
return self.next_value
finally:
self.next_value += 1
def peek_next(self):
return self.next_value
class BrowserController:
'''
'''
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(self, websocket_url):
self.websocket_url = websocket_url
self._command_id = Counter()
self._websock_thread = None
self._websock_open = None
self._got_page_load_event = None
self._result_messages = {}
def _wait_for(self, callback, timeout=None):
'''
Spins until callback() returns truthy.
'''
start = time.time()
while True:
if callback():
return
elapsed = time.time() - start
if timeout and elapsed > timeout:
raise BrowsingTimeout(
'timed out after %.1fs waiting for: %s' % (
elapsed, callback))
time.sleep(0.5)
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.stop()
def start(self):
if not self._websock_thread:
def on_open(websock):
self._websock_open = datetime.datetime.utcnow()
# open websocket, start thread that receives messages
self._websock = websocket.WebSocketApp(
self.websocket_url, on_open=on_open,
on_message=self._on_message)
thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format(
self.websocket_url, datetime.datetime.utcnow())
self._websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
kwargs={'ping_timeout': 0.5})
self._websock_thread.start()
self._wait_for(lambda: self._websock_open, timeout=10)
# tell browser to send messages we're interested in
self.send_to_chrome(method='Network.enable')
self.send_to_chrome(method='Page.enable')
self.send_to_chrome(method='Console.enable')
self.send_to_chrome(method='Debugger.enable')
self.send_to_chrome(method='Runtime.enable')
# disable google analytics, see _handle_message() where breakpoint
# is caught Debugger.paused
self.send_to_chrome(
method='Debugger.setBreakpointByUrl',
params={
'lineNumber': 1,
'urlRegex': 'https?://www.google-analytics.com/analytics.js'})
def stop(self, *args):
if self._websock_thread:
if (self._websock and self._websock.sock
and self._websock.sock.connected):
try:
self._websock.close()
except BaseException as e:
self.logger.error(
"exception closing websocket %s - %s" % (
self._websock, e))
'exception closing websocket %s - %s',
self._websock, e)
websock_thread.join(timeout=30)
if websock_thread.is_alive():
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.error(
"%s still alive 30 seconds after closing %s, will "
"forcefully nudge it again" % (
websock_thread, self._websock))
'%s still alive 30 seconds after closing %s, will '
'forcefully nudge it again', self._websock_thread,
self._websock)
self._websock.keep_running = False
websock_thread.join(timeout=30)
if websock_thread.is_alive():
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.critical(
"%s still alive 60 seconds after closing %s" % (
websock_thread, self._websock))
'%s still alive 60 seconds after closing %s',
self._websock_thread, self._websock)
self._behavior = None
def _on_message(self, websock, message):
try:
self._handle_message(websock, message)
except:
self.logger.error(
'uncaught exception in _handle_message message=%s',
message, exc_info=True)
self.abort = True
OUTLINKS_JS = r"""
def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if 'method' in message:
if message['method'] == 'Page.loadEventFired':
self._got_page_load_event = datetime.datetime.utcnow()
elif message["method"] == "Debugger.paused":
self._debugger_paused(message)
elif 'result' in message:
if message['id'] in self._result_messages:
self._result_messages[message['id']] = message
def _debugger_paused(self, message):
# we hit the breakpoint set in start(), get rid of google analytics
self.logger.debug('debugger paused! message=%s', message)
scriptId = message['params']['callFrames'][0]['location']['scriptId']
# replace script
self.send_to_chrome(
method='Debugger.setScriptSource',
params={'scriptId': scriptId,
'scriptSource': 'console.log("google analytics is no more!");'})
# resume execution
self.send_to_chrome(method='Debugger.resume')
def send_to_chrome(self, suppress_logging=False, **kwargs):
msg_id = next(self._command_id)
kwargs['id'] = msg_id
msg = json.dumps(kwargs)
if not suppress_logging:
self.logger.debug('sending message to %s: %s', self._websock, msg)
self._websock.send(msg)
return msg_id
def navigate_to_page(
self, page_url, extra_headers=None, user_agent=None, timeout=300):
'''
'''
# navigate to about:blank here to avoid situation where we navigate to
# the same page that we're currently on, perhaps with a different
# #fragment, which prevents Page.loadEventFired from happening
self.send_to_chrome(
method='Page.navigate', params={'url': 'about:blank'})
headers = extra_headers or {}
headers['Accept-Encoding'] = 'identity'
self.send_to_chrome(
method='Network.setExtraHTTPHeaders',
params={'headers': headers})
if user_agent:
self.send_to_chrome(
method='Network.setUserAgentOverride',
params={'userAgent': user_agent})
# navigate to the page!
self.logger.info('navigating to page %s', page_url)
self.send_to_chrome(method='Page.navigate', params={'url': page_url})
self._wait_for(lambda:self._got_page_load_event, timeout=timeout)
OUTLINKS_JS = r'''
var __brzl_framesDone = new Set();
var __brzl_compileOutlinks = function(frame) {
__brzl_framesDone.add(frame);
@ -244,265 +395,94 @@ var __brzl_compileOutlinks = function(frame) {
frame.document.querySelectorAll('a[href]'));
for (var i = 0; i < frame.frames.length; i++) {
if (frame.frames[i] && !__brzl_framesDone.has(frame.frames[i])) {
outlinks = outlinks.concat(__brzl_compileOutlinks(frame.frames[i]));
outlinks = outlinks.concat(
__brzl_compileOutlinks(frame.frames[i]));
}
}
}
return outlinks;
}
__brzl_compileOutlinks(window).join('\n');
"""
def _chain_chrome_messages(self, chain):
"""
Sends a series of messages to chrome/chromium on the debugging protocol
websocket. Waits for a reply from each one before sending the next.
Enforces a timeout waiting for each reply. If the timeout is hit, sets
self._result_message_timeout with a ResultMessageTimeout (an exception
class). Takes an array of dicts, each of which should look like this:
{
"info": "human readable description",
"chrome_msg": { ... }, # message to send to chrome, as a dict
"timeout": 30, # timeout in seconds
"callback": my_callback, # takes one arg, the result message
}
The code is rather convoluted because of the asynchronous nature of the
whole thing. See how it's used in _start_postbehavior_chain.
"""
timer = None
def callback(message):
if timer:
timer.cancel()
if "callback" in chain[0]:
chain[0]["callback"](message)
self._chain_chrome_messages(chain[1:])
def timeout():
self._result_message_timeout = ResultMessageTimeout(
"timed out after %.1fs waiting for result message "
"for %s", chain[0]["timeout"], chain[0]["chrome_msg"])
if chain:
msg_id = self.send_to_chrome(**chain[0]["chrome_msg"])
self._waiting_on_result_messages[msg_id] = callback
self.logger.info(
"msg_id=%s for message %s", msg_id, chain[0]["chrome_msg"])
timer = threading.Timer(chain[0]["timeout"], timeout)
timer.daemon = True
timer.start()
else:
self.logger.info("finished chrome message chain")
def _start_postbehavior_chain(self):
if self.on_screenshot:
chain = [{
"info": "scrolling to top",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": "window.scrollTo(0, 0);"},
},
"timeout": 30,
"callback": lambda message: None,
}, {
"info": "requesting screenshot",
"chrome_msg": {"method": "Page.captureScreenshot"},
"timeout": 30,
"callback": lambda message: (
self.on_screenshot and self.on_screenshot(
base64.b64decode(message["result"]["data"]))),
}]
else:
chain = []
def set_outlinks(message):
if message["result"]["result"]["value"]:
self._outlinks = frozenset(
message["result"]["result"]["value"].split("\n"))
else:
self._outlinks = frozenset()
chain.append({
"info": "retrieving outlinks",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": self.OUTLINKS_JS},
},
"timeout": 60,
"callback": set_outlinks,
})
self._chain_chrome_messages(chain)
def _browse_interval_func(self):
"""Called periodically while page is being browsed. Returns True when
finished browsing."""
if (not self._websock or not self._websock.sock
or not self._websock.sock.connected):
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
elif self._result_message_timeout:
raise self._result_message_timeout
elif self._aw_snap_hes_dead_jim:
raise BrowsingException(
"""chrome tab went "aw snap" or "he's dead jim"!""")
elif self._outlinks is not None:
# setting self._outlinks is the last thing that happens in the
# post-behavior chain
return True
elif (self._behavior != None and self._behavior.is_finished()
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
if self._behavior and self._behavior.is_finished():
self.logger.info(
"behavior decided it's finished with %s", self.url)
else:
self.logger.info(
"reached hard timeout of %s seconds url=%s",
Browser.HARD_TIMEOUT_SECONDS, self.url)
self._behavior = None
self._start_postbehavior_chain()
return False
elif self._reached_limit:
raise self._reached_limit
elif self._abort_browse_page:
raise BrowsingAborted("browsing page aborted")
else:
return False
def send_to_chrome(self, suppress_logging=False, **kwargs):
msg_id = next(self.command_id)
kwargs["id"] = msg_id
msg = json.dumps(kwargs)
if not suppress_logging:
self.logger.debug("sending message to %s: %s", self._websock, msg)
self._websock.send(msg)
return msg_id
def _visit_page(self, websock):
# navigate to about:blank here to avoid situation where we navigate to
# the same page that we're currently on, perhaps with a different
# #fragment, which prevents Page.loadEventFired from happening
self.send_to_chrome(method="Page.navigate", params={"url": "about:blank"})
self.send_to_chrome(method="Network.enable")
self.send_to_chrome(method="Page.enable")
self.send_to_chrome(method="Console.enable")
self.send_to_chrome(method="Debugger.enable")
self.send_to_chrome(method="Runtime.enable")
headers = self.extra_headers or {}
headers['Accept-Encoding'] = 'identity'
self.send_to_chrome(
method="Network.setExtraHTTPHeaders",
params={"headers":headers})
if self.user_agent:
self.send_to_chrome(method="Network.setUserAgentOverride", params={"userAgent": self.user_agent})
# disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused"
self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})
# navigate to the page!
self.send_to_chrome(method="Page.navigate", params={"url": self.url})
def _wrap_handle_message(self, websock, message):
try:
self._handle_message(websock, message)
except:
self.logger.error(
"uncaught exception in _handle_message message=%s",
message, exc_info=True)
self.abort_browse_page()
def _network_request_will_be_sent(self, message):
if self._behavior:
self._behavior.notify_of_activity()
if message["params"]["request"]["url"].lower().startswith("data:"):
self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80]))
elif self.on_request:
self.on_request(message)
def _network_response_received(self, message):
if (not self._reached_limit
and message["params"]["response"]["status"] == 420
and "Warcprox-Meta" in CaseInsensitiveDict(
message["params"]["response"]["headers"])):
warcprox_meta = json.loads(CaseInsensitiveDict(
message["params"]["response"]["headers"])["Warcprox-Meta"])
self._reached_limit = brozzler.ReachedLimit(
warcprox_meta=warcprox_meta)
self.logger.info("reached limit %s", self._reached_limit)
if self.on_response:
self.on_response(message)
def _page_load_event_fired(self, message):
def page_url_after_load_event(message):
if message["result"]["result"]["value"] != self.url:
if self.on_url_change:
self.on_url_change(message["result"]["result"]["value"])
'''
def extract_outlinks(self, timeout=60):
self.logger.info('extracting outlinks')
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method="Runtime.evaluate",
params={"expression":"document.URL"})
self._waiting_on_result_messages[msg_id] = page_url_after_load_event
method='Runtime.evaluate',
params={'expression': self.OUTLINKS_JS})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
if message['result']['result']['value']:
return frozenset(message['result']['result']['value'].split('\n'))
else:
self._outlinks = frozenset()
self.logger.info("Page.loadEventFired, moving on to starting behaviors url={}".format(self.url))
self._behavior = Behavior(self.url, self)
self._behavior.start(self.behavior_parameters)
def screenshot(self, timeout=30):
self.logger.info('taking screenshot')
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(method='Page.captureScreenshot')
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
jpeg_bytes = base64.b64decode(message['result']['data'])
return jpeg_bytes
def _console_message_added(self, message):
self.logger.debug("%s console.%s %s", self._websock.url,
message["params"]["message"]["level"],
message["params"]["message"]["text"])
def scroll_to_top(self, timeout=30):
self.logger.info('scrolling to top')
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method='Runtime.evaluate',
params={'expression': 'window.scrollTo(0, 0);'})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
self._result_messages.pop(msg_id)
def _debugger_paused(self, message):
# We hit the breakpoint set in visit_page. Get rid of google
# analytics script!
self.logger.debug("debugger paused! message={}".format(message))
scriptId = message['params']['callFrames'][0]['location']['scriptId']
def url(self, timeout=30):
'''
Returns value of document.URL from the browser.
'''
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method='Runtime.evaluate',
params={'expression': 'document.URL'})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
return message['result']['result']['value']
# replace script
self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})
def run_behavior(self, behavior_script, timeout=900):
self.send_to_chrome(
method='Runtime.evaluate', suppress_logging=True,
params={'expression': behavior_script})
# resume execution
self.send_to_chrome(method="Debugger.resume")
start = time.time()
while True:
elapsed = time.time() - start
if elapsed > timeout:
logging.info(
'behavior reached hard timeout after %.1fs', elapsed)
return
time.sleep(7)
self._result_messages[self._command_id.peek_next()] = None
msg_id = self.send_to_chrome(
method='Runtime.evaluate', suppress_logging=True,
params={'expression': 'umbraBehaviorFinished()'})
try:
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=5)
msg = self._result_messages.get(msg_id)
if (msg and 'result' in msg
and not ('wasThrown' in msg['result']
and msg['result']['wasThrown'])
and 'result' in msg['result']
and type(msg['result']['result']['value']) == bool
and msg['result']['result']['value']):
self.logger.info('behavior decided it has finished')
return
except BrowsingTimeout:
pass
def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if "method" in message:
if message["method"] == "Network.requestWillBeSent":
self._network_request_will_be_sent(message)
elif message["method"] == "Network.responseReceived":
self._network_response_received(message)
elif message["method"] == "Page.loadEventFired":
self._page_load_event_fired(message)
elif message["method"] == "Console.messageAdded":
self._console_message_added(message)
elif message["method"] == "Debugger.paused":
self._debugger_paused(message)
elif message["method"] == "Inspector.targetCrashed":
self._aw_snap_hes_dead_jim = message
# elif message["method"] in (
# "Network.dataReceived", "Network.responseReceived",
# "Network.loadingFinished"):
# pass
# else:
# self.logger.debug("%s %s", message["method"], json_message)
elif "result" in message:
if message["id"] in self._waiting_on_result_messages:
callback = self._waiting_on_result_messages[message["id"]]
del self._waiting_on_result_messages[message["id"]]
self.logger.debug(
"received result for message id=%s, calling %s",
message["id"], callback)
callback(message)
elif self._behavior and self._behavior.is_waiting_on_result(
message["id"]):
self._behavior.notify_of_result(message)
# else:
# self.logger.debug("%s", json_message)
# else:
# self.logger.debug("%s", json_message)

View File

@ -35,10 +35,26 @@ class Chrome:
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(
self, port, executable, proxy=None, ignore_cert_errors=False,
self, chrome_exe, port=9222, ignore_cert_errors=False, proxy=None,
cookie_db=None):
'''
Initializes instance of this class.
Doesn't start the browser, start() does that.
Args:
chrome_exe: filesystem path to chrome/chromium executable
port: chrome debugging protocol port (default 9222)
ignore_cert_errors: configure chrome to accept all certs (default
False)
proxy: http proxy 'host:port' (default None)
cookie_db: raw bytes of chrome/chromium sqlite3 cookies database,
which, if supplied, will be written to
{chrome_user_data_dir}/Default/Cookies before running the
browser (default None)
'''
self.port = port
self.executable = executable
self.chrome_exe = chrome_exe
self.proxy = proxy
self.ignore_cert_errors = ignore_cert_errors
self.cookie_db = cookie_db
@ -124,7 +140,7 @@ class Chrome:
new_env['HOME'] = self._home_tmpdir.name
self.port = self._find_available_port(self.port)
chrome_args = [
self.executable, '--use-mock-keychain', # mac thing
self.chrome_exe, '--use-mock-keychain', # mac thing
'--user-data-dir=%s' % self._chrome_user_data_dir,
'--remote-debugging-port=%s' % self.port,
'--disable-web-sockets', '--disable-cache',

View File

@ -159,8 +159,8 @@ def brozzle_page():
f.write(screenshot_png)
logging.info('wrote screenshot to %s', filename)
browser = brozzler.Browser(chrome_exe=args.chrome_exe)
browser.start(proxy=site.proxy)
browser = brozzler.Browser(chrome_exe=args.chrome_exe, proxy=site.proxy)
browser.start()
try:
outlinks = worker.brozzle_page(
browser, site, page, on_screenshot=on_screenshot)

View File

@ -272,12 +272,13 @@ class BrozzlerWorker:
self.logger.info('needs browsing: %s', page)
if not browser.is_running():
browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db)
outlinks = browser.browse_page(
final_page_url, outlinks = browser.browse_page(
page.url, extra_headers=site.extra_headers(),
behavior_parameters=site.behavior_parameters,
user_agent=site.user_agent,
on_screenshot=_on_screenshot,
on_url_change=page.note_redirect)
on_screenshot=_on_screenshot)
if final_page_url != page.url:
page.note_redirect(final_page_url)
return outlinks
else:
if not self._already_fetched(page, ydl_spy):
@ -336,7 +337,7 @@ class BrozzlerWorker:
self._frontier.scope_and_schedule_outlinks(
site, page, outlinks)
if browser.is_running():
site.cookie_db = browser.persist_and_read_cookie_db()
site.cookie_db = browser.chrome.persist_and_read_cookie_db()
self._frontier.completed_page(site, page)
page = None
@ -394,8 +395,8 @@ class BrozzlerWorker:
try:
browser = self._browser_pool.acquire()
try:
site = self._frontier.claim_site("{}:{}".format(
socket.gethostname(), browser.chrome_port))
site = self._frontier.claim_site("%s:%s" % (
socket.gethostname(), browser.chrome.port))
self.logger.info(
"brozzling site (proxy=%s) %s",
repr(self._proxy(site)), site)