diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 02cc9b2..c7bb63c 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -17,11 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. """ -import json as _json -import logging as _logging -import surt as _surt from pkg_resources import get_distribution as _get_distribution - __version__ = _get_distribution('brozzler').version class ShutdownRequested(Exception): @@ -35,9 +31,11 @@ class CrawlJobStopped(Exception): class ReachedLimit(Exception): def __init__(self, http_error=None, warcprox_meta=None, http_payload=None): + import json if http_error: if "warcprox-meta" in http_error.headers: - self.warcprox_meta = _json.loads(http_error.headers["warcprox-meta"]) + self.warcprox_meta = json.loads( + http_error.headers["warcprox-meta"]) else: self.warcprox_meta = None self.http_payload = http_error.read() @@ -69,6 +67,7 @@ def fixup(url): ''' Does rudimentary canonicalization, such as converting IDN to punycode. ''' + import surt hurl = _surt.handyurl.parse(url) # handyurl.parse() already lowercases the scheme via urlsplit if hurl.host: @@ -78,6 +77,60 @@ def fixup(url): # logging level more fine-grained than logging.DEBUG==10 TRACE = 5 +_behaviors = None +def behaviors(): + import os, yaml, string + global _behaviors + if _behaviors is None: + behaviors_yaml = os.path.join( + os.path.dirname(__file__), 'behaviors.yaml') + with open(behaviors_yaml) as fin: + conf = yaml.load(fin) + _behaviors = conf['behaviors'] + + for behavior in _behaviors: + if 'behavior_js' in behavior: + behavior_js = os.path.join( + os.path.dirname(__file__), 'behaviors.d', + behavior['behavior_js']) + with open(behavior_js, encoding='utf-8') as fin: + behavior['script'] = fin.read() + elif 'behavior_js_template' in behavior: + behavior_js_template = os.path.join( + os.path.dirname(__file__), 'behaviors.d', + behavior['behavior_js_template']) + with open(behavior_js_template, encoding='utf-8') as fin: + behavior['template'] = string.Template(fin.read()) + + return _behaviors + +def behavior_script(url, template_parameters=None): + ''' + Returns the javascript behavior string populated with template_parameters. + ''' + import re, logging + for behavior in behaviors(): + if re.match(behavior['url_regex'], url): + if 'behavior_js' in behavior: + logging.info( + 'using behavior %s for %s', + behavior['behavior_js'], url) + elif 'behavior_js_template' in behavior: + parameters = dict() + if 'default_parameters' in behavior: + parameters.update(behavior['default_parameters']) + if template_parameters: + parameters.update(template_parameters) + javascript = behavior['template'].safe_substitute(parameters) + + logging.info( + 'using template=%s populated with parameters=%s for %s', + repr(behavior['behavior_js_template']), parameters, url) + + return behavior['script'] + + return None + from brozzler.site import Page, Site from brozzler.worker import BrozzlerWorker from brozzler.robots import is_permitted_by_robots diff --git a/brozzler/behaviors.py b/brozzler/behaviors.py deleted file mode 100644 index 878895f..0000000 --- a/brozzler/behaviors.py +++ /dev/null @@ -1,137 +0,0 @@ -''' -brozzler/behaviors.py - manages behaviors, which are javascript scripts that -run in brozzled web pages - -Copyright (C) 2014-2016 Internet Archive - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -import json -import itertools -import os -import re -import logging -import time -import sys -import yaml -import string - -__all__ = ["Behavior"] - -class Behavior: - logger = logging.getLogger(__module__ + "." + __qualname__) - - _behaviors = None - - @staticmethod - def behaviors(): - if Behavior._behaviors is None: - behaviors_yaml = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.yaml']) - with open(behaviors_yaml) as fin: - conf = yaml.load(fin) - Behavior._behaviors = conf['behaviors'] - - for behavior in Behavior._behaviors: - if "behavior_js" in behavior: - behavior_js = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ["behaviors.d"] + [behavior["behavior_js"]]) - with open(behavior_js, encoding="utf-8") as fin: - behavior["script"] = fin.read() - elif "behavior_js_template" in behavior: - behavior_js_template = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ["behaviors.d"] + [behavior["behavior_js_template"]]) - with open(behavior_js_template, encoding="utf-8") as fin: - behavior["template"] = string.Template(fin.read()) - - return Behavior._behaviors - - def __init__(self, url, umbra_worker): - self.url = url - self.umbra_worker = umbra_worker - self.script_finished = False - self.waiting_result_msg_ids = [] - self.active_behavior = None - self.last_activity = time.time() - - def start(self, template_parameters=None): - for behavior in Behavior.behaviors(): - if re.match(behavior['url_regex'], self.url): - if "behavior_js" in behavior: - self.logger.info("using %s behavior for %s", - behavior["behavior_js"], self.url) - elif "behavior_js_template" in behavior: - parameters = dict() - if "default_parameters" in behavior: - parameters.update(behavior["default_parameters"]) - if template_parameters: - parameters.update(template_parameters) - behavior["script"] = behavior["template"].safe_substitute(parameters) - - self.logger.info( - "using template=%s populated with parameters=%s for %s", - repr(behavior["behavior_js_template"]), - parameters, self.url) - - self.active_behavior = behavior - self.umbra_worker.send_to_chrome( - method="Runtime.evaluate", suppress_logging=True, - params={"expression": behavior["script"]}) - self.notify_of_activity() - return - - self.logger.warn("no behavior to run on {}".format(self.url)) - - def is_finished(self): - """Asynchronously asks behavior if it is finished, and in the mean time - returns the response from the previous such query.""" - msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate", - suppress_logging=True, params={"expression":"umbraBehaviorFinished()"}) - self.waiting_result_msg_ids.append(msg_id) - - request_idle_timeout_sec = 30 - if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior: - request_idle_timeout_sec = self.active_behavior['request_idle_timeout_sec'] - idle_time = time.time() - self.last_activity - - return self.script_finished and idle_time > request_idle_timeout_sec - - def is_waiting_on_result(self, msg_id): - return msg_id in self.waiting_result_msg_ids - - def notify_of_result(self, chrome_message): - # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}} - # {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}} - self.waiting_result_msg_ids.remove(chrome_message['id']) - if ('result' in chrome_message and not ( - 'wasThrown' in chrome_message['result'] - and chrome_message['result']['wasThrown']) - and 'result' in chrome_message['result'] - and type(chrome_message['result']['result']['value']) == bool): - self.script_finished = chrome_message['result']['result']['value'] - else: - # this happens if the behavior script doesn't define - # umbraBehaviorFinished, and I think it can also happen normally - # after the behavior has been sent to the browser but before - # the browser has it fully loaded... in any case the message - # was overwhelming the logs, so I'm bumping it down to debug level - self.logger.debug( - "chrome message doesn't look like a boolean result! %s", - chrome_message) - - def notify_of_activity(self): - self.last_activity = time.time() - -if __name__ == "__main__": - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - logger = logging.getLogger('umbra.behaviors') - logger.info("custom behaviors: {}".format(Behavior.behaviors())) diff --git a/brozzler/browser.py b/brozzler/browser.py index f545f5f..7052aaf 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -17,45 +17,61 @@ limitations under the License. ''' import logging -import json +import time +import brozzler import itertools +import json import websocket import time import threading -import os -import random import brozzler -from brozzler.chrome import Chrome -from brozzler.behaviors import Behavior from requests.structures import CaseInsensitiveDict -import base64 -import sqlite3 import datetime +import base64 +from brozzler.chrome import Chrome -__all__ = ["BrowserPool", "Browser"] +class BrowsingException(Exception): + pass + +class NoBrowsersAvailable(Exception): + pass + +class BrowsingTimeout(BrowsingException): + pass class BrowserPool: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = logging.getLogger(__module__ + '.' + __qualname__) BASE_PORT = 9200 def __init__(self, size=3, **kwargs): - """kwargs are passed on to Browser.__init__""" + ''' + Initializes the pool. + + Args: + size: size of pool (default 3) + **kwargs: arguments for Browser(...) + ''' self.size = size self._available = set() self._in_use = set() for i in range(0, size): - browser = Browser(BrowserPool.BASE_PORT + i, **kwargs) + browser = Browser(port=BrowserPool.BASE_PORT + i, **kwargs) self._available.add(browser) self._lock = threading.Lock() def acquire(self): - """ - Returns browser from pool if available, raises NoBrowsersAvailable - otherwise. - """ + ''' + Returns an available instance. + + Returns: + browser from pool, if available + + Raises: + NoBrowsersAvailable if none available + ''' with self._lock: try: browser = self._available.pop() @@ -79,46 +95,22 @@ class BrowserPool: def num_in_use(self): return len(self._in_use) -class NoBrowsersAvailable(Exception): - pass - -class BrowsingException(Exception): - pass - -class BrowsingAborted(BrowsingException): - pass - -class ResultMessageTimeout(BrowsingException): - pass - class Browser: - """ - Runs chrome/chromium to synchronously browse one page at a time using - worker.browse_page(). Should not be accessed from multiple threads. - """ + ''' + Manages an instance of Chrome for browsing pages. + ''' + logger = logging.getLogger(__module__ + '.' + __qualname__) - logger = logging.getLogger(__module__ + "." + __qualname__) + def __init__(self, **kwargs): + ''' + Initializes the Browser. - HARD_TIMEOUT_SECONDS = 20 * 60 - - def __init__( - self, chrome_port=9222, chrome_exe='chromium-browser', proxy=None, - ignore_cert_errors=False): - self.command_id = itertools.count(1) - self.chrome_port = chrome_port - self.chrome_exe = chrome_exe - self.proxy = proxy - self.ignore_cert_errors = ignore_cert_errors - self._behavior = None - self._websock = None - self._abort_browse_page = False - self._chrome_instance = None - self._aw_snap_hes_dead_jim = None - self._work_dir = None - self._websocket_url = None - - def __repr__(self): - return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port) + Args: + **kwargs: arguments for Chrome(...) + ''' + self.chrome = Chrome(**kwargs) + self.websocket_url = None + self.is_browsing = False def __enter__(self): self.start() @@ -127,115 +119,274 @@ class Browser: def __exit__(self, *args): self.stop() - def start(self, proxy=None, cookie_db=None): - if not self._chrome_instance: - self._chrome_instance = Chrome( - port=self.chrome_port, executable=self.chrome_exe, - ignore_cert_errors=self.ignore_cert_errors, - proxy=proxy or self.proxy, cookie_db=None) - try: - self._websocket_url = self._chrome_instance.start() - except: - self._chrome_instance = None - raise + def start(self, **kwargs): + ''' + Starts chrome if it's not running. + + Args: + **kwargs: arguments for self.chrome.start(...) + ''' + if not self.is_running(): + self.websocket_url = self.chrome.start(**kwargs) + self._browser_controller = BrowserController(self.websocket_url) + self._browser_controller.start() def stop(self): + ''' + Stops chrome if it's running. + ''' try: if self.is_running(): - self._chrome_instance.stop() - self._chrome_instance = None - self._websocket_url = None + self.chrome.stop() + self.websocket_url = None except: - self.logger.error("problem stopping", exc_info=True) + self.logger.error('problem stopping', exc_info=True) def is_running(self): - return bool(self._websocket_url) - - def abort_browse_page(self): - self._abort_browse_page = True - - def persist_and_read_cookie_db(self): - if self._chrome_instance: - return self._chrome_instance.persist_and_read_cookie_db() - else: - return None + return self.websocket_url is not None def browse_page( - self, url, extra_headers=None, behavior_parameters=None, - user_agent=None, - on_request=None, on_response=None, on_screenshot=None, - on_url_change=None): - """ - Synchronously loads a page, runs behaviors, and takes a screenshot. + self, page_url, ignore_cert_errors=False, extra_headers=None, + user_agent=None, behavior_parameters=None, + on_request=None, on_response=None, on_screenshot=None): + ''' + Browses page in browser. - Raises BrowsingException if browsing the page fails in a non-critical - way. + Browser should already be running, i.e. start() should have been + called. Opens the page_url in the browser, runs behaviors, takes a + screenshot, extracts outlinks. - Returns extracted outlinks. - """ + Args: + page_url: url of the page to browse + extra_headers: dict of extra http headers to configure the browser + to send with every request (default None) + user_agent: user agent string, replaces browser default if + supplied (default None) + behavior_parameters: dict of parameters for populating the + javascript behavior template (default None) + on_request: callback to invoke on every Network.requestWillBeSent + event, takes one argument, the json-decoded message (default + None) + on_response: callback to invoke on every Network.responseReceived + event, takes one argument, the json-decoded message (default + None) + on_screenshot: callback to invoke when screenshot is obtained, + takes one argument, the the raw jpeg bytes (default None) + # XXX takes two arguments, the url of the page at the time the + # screenshot was taken, and the raw jpeg bytes (default None) + + Returns: + A tuple (final_page_url, outlinks). + final_page_url: the url in the location bar at the end of the + browse_page cycle, which could be different from the original + page url if the page redirects, javascript has changed the url + in the location bar, etc + outlinks: a list of navigational links extracted from the page + + Raises: + BrowsingException: if browsing the page fails + ''' if not self.is_running(): - raise BrowsingException("browser has not been started") - self.url = url - self.extra_headers = extra_headers - self.user_agent = user_agent - self.on_request = on_request - self.on_screenshot = on_screenshot - self.on_url_change = on_url_change - self.on_response = on_response - self.behavior_parameters = behavior_parameters + raise BrowsingException('browser has not been started') + if self.is_browsing: + raise BrowsingException('browser is already busy browsing a page') + self.is_browsing = True + self._browser_controller.navigate_to_page(page_url, timeout=300) + ## if login_credentials: + ## self._browser_controller.try_login(login_credentials) (5 min?) + behavior_script = brozzler.behavior_script( + page_url, behavior_parameters) + self._browser_controller.run_behavior(behavior_script, timeout=900) + if on_screenshot: + self._browser_controller.scroll_to_top() + jpeg_bytes = self._browser_controller.screenshot() + on_screenshot(jpeg_bytes) + outlinks = self._browser_controller.extract_outlinks() + ## for each hashtag not already visited: + ## navigate_to_hashtag (nothing to wait for so no timeout?) + ## if on_screenshot; + ## take screenshot (30 sec) + ## run behavior (3 min) + ## outlinks += retrieve_outlinks (60 sec) + final_page_url = self._browser_controller.url() - self._outlinks = None - self._reached_limit = None - self._aw_snap_hes_dead_jim = None - self._abort_browse_page = False - self._has_screenshot = False - self._waiting_on_result_messages = {} - self._result_message_timeout = None - - self._websock = websocket.WebSocketApp( - self._websocket_url, on_open=self._visit_page, - on_message=self._wrap_handle_message) - - thread_name = "WebsockThread:{}-{:%Y%m%d%H%M%S}".format( - self.chrome_port, datetime.datetime.utcnow()) - websock_thread = threading.Thread( - target=self._websock.run_forever, name=thread_name, - kwargs={'ping_timeout':0.5}) - websock_thread.start() - self._start = time.time() - aborted = False + self.is_browsing = False + return final_page_url, outlinks +class Counter: + def __init__(self): + self.next_value = 0 + def __next__(self): try: - while True: - time.sleep(0.5) - if self._browse_interval_func(): - return self._outlinks + return self.next_value finally: + self.next_value += 1 + def peek_next(self): + return self.next_value + +class BrowserController: + ''' + ''' + + logger = logging.getLogger(__module__ + '.' + __qualname__) + + def __init__(self, websocket_url): + self.websocket_url = websocket_url + self._command_id = Counter() + self._websock_thread = None + self._websock_open = None + self._got_page_load_event = None + self._result_messages = {} + + def _wait_for(self, callback, timeout=None): + ''' + Spins until callback() returns truthy. + ''' + start = time.time() + while True: + if callback(): + return + elapsed = time.time() - start + if timeout and elapsed > timeout: + raise BrowsingTimeout( + 'timed out after %.1fs waiting for: %s' % ( + elapsed, callback)) + time.sleep(0.5) + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + def start(self): + if not self._websock_thread: + def on_open(websock): + self._websock_open = datetime.datetime.utcnow() + + # open websocket, start thread that receives messages + self._websock = websocket.WebSocketApp( + self.websocket_url, on_open=on_open, + on_message=self._on_message) + thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format( + self.websocket_url, datetime.datetime.utcnow()) + self._websock_thread = threading.Thread( + target=self._websock.run_forever, name=thread_name, + kwargs={'ping_timeout': 0.5}) + self._websock_thread.start() + self._wait_for(lambda: self._websock_open, timeout=10) + + # tell browser to send messages we're interested in + self.send_to_chrome(method='Network.enable') + self.send_to_chrome(method='Page.enable') + self.send_to_chrome(method='Console.enable') + self.send_to_chrome(method='Debugger.enable') + self.send_to_chrome(method='Runtime.enable') + + # disable google analytics, see _handle_message() where breakpoint + # is caught Debugger.paused + self.send_to_chrome( + method='Debugger.setBreakpointByUrl', + params={ + 'lineNumber': 1, + 'urlRegex': 'https?://www.google-analytics.com/analytics.js'}) + + def stop(self, *args): + if self._websock_thread: if (self._websock and self._websock.sock and self._websock.sock.connected): try: self._websock.close() except BaseException as e: self.logger.error( - "exception closing websocket %s - %s" % ( - self._websock, e)) + 'exception closing websocket %s - %s', + self._websock, e) - websock_thread.join(timeout=30) - if websock_thread.is_alive(): + self._websock_thread.join(timeout=30) + if self._websock_thread.is_alive(): self.logger.error( - "%s still alive 30 seconds after closing %s, will " - "forcefully nudge it again" % ( - websock_thread, self._websock)) + '%s still alive 30 seconds after closing %s, will ' + 'forcefully nudge it again', self._websock_thread, + self._websock) self._websock.keep_running = False - websock_thread.join(timeout=30) - if websock_thread.is_alive(): + self._websock_thread.join(timeout=30) + if self._websock_thread.is_alive(): self.logger.critical( - "%s still alive 60 seconds after closing %s" % ( - websock_thread, self._websock)) + '%s still alive 60 seconds after closing %s', + self._websock_thread, self._websock) - self._behavior = None + def _on_message(self, websock, message): + try: + self._handle_message(websock, message) + except: + self.logger.error( + 'uncaught exception in _handle_message message=%s', + message, exc_info=True) + self.abort = True - OUTLINKS_JS = r""" + def _handle_message(self, websock, json_message): + message = json.loads(json_message) + if 'method' in message: + if message['method'] == 'Page.loadEventFired': + self._got_page_load_event = datetime.datetime.utcnow() + elif message["method"] == "Debugger.paused": + self._debugger_paused(message) + elif 'result' in message: + if message['id'] in self._result_messages: + self._result_messages[message['id']] = message + + def _debugger_paused(self, message): + # we hit the breakpoint set in start(), get rid of google analytics + self.logger.debug('debugger paused! message=%s', message) + scriptId = message['params']['callFrames'][0]['location']['scriptId'] + + # replace script + self.send_to_chrome( + method='Debugger.setScriptSource', + params={'scriptId': scriptId, + 'scriptSource': 'console.log("google analytics is no more!");'}) + + # resume execution + self.send_to_chrome(method='Debugger.resume') + + def send_to_chrome(self, suppress_logging=False, **kwargs): + msg_id = next(self._command_id) + kwargs['id'] = msg_id + msg = json.dumps(kwargs) + if not suppress_logging: + self.logger.debug('sending message to %s: %s', self._websock, msg) + self._websock.send(msg) + return msg_id + + def navigate_to_page( + self, page_url, extra_headers=None, user_agent=None, timeout=300): + ''' + ''' + # navigate to about:blank here to avoid situation where we navigate to + # the same page that we're currently on, perhaps with a different + # #fragment, which prevents Page.loadEventFired from happening + self.send_to_chrome( + method='Page.navigate', params={'url': 'about:blank'}) + + headers = extra_headers or {} + headers['Accept-Encoding'] = 'identity' + self.send_to_chrome( + method='Network.setExtraHTTPHeaders', + params={'headers': headers}) + + if user_agent: + self.send_to_chrome( + method='Network.setUserAgentOverride', + params={'userAgent': user_agent}) + + # navigate to the page! + self.logger.info('navigating to page %s', page_url) + self.send_to_chrome(method='Page.navigate', params={'url': page_url}) + + self._wait_for(lambda:self._got_page_load_event, timeout=timeout) + + OUTLINKS_JS = r''' var __brzl_framesDone = new Set(); var __brzl_compileOutlinks = function(frame) { __brzl_framesDone.add(frame); @@ -244,265 +395,94 @@ var __brzl_compileOutlinks = function(frame) { frame.document.querySelectorAll('a[href]')); for (var i = 0; i < frame.frames.length; i++) { if (frame.frames[i] && !__brzl_framesDone.has(frame.frames[i])) { - outlinks = outlinks.concat(__brzl_compileOutlinks(frame.frames[i])); + outlinks = outlinks.concat( + __brzl_compileOutlinks(frame.frames[i])); } } } return outlinks; } __brzl_compileOutlinks(window).join('\n'); -""" - - def _chain_chrome_messages(self, chain): - """ - Sends a series of messages to chrome/chromium on the debugging protocol - websocket. Waits for a reply from each one before sending the next. - Enforces a timeout waiting for each reply. If the timeout is hit, sets - self._result_message_timeout with a ResultMessageTimeout (an exception - class). Takes an array of dicts, each of which should look like this: - - { - "info": "human readable description", - "chrome_msg": { ... }, # message to send to chrome, as a dict - "timeout": 30, # timeout in seconds - "callback": my_callback, # takes one arg, the result message - } - - The code is rather convoluted because of the asynchronous nature of the - whole thing. See how it's used in _start_postbehavior_chain. - """ - timer = None - - def callback(message): - if timer: - timer.cancel() - if "callback" in chain[0]: - chain[0]["callback"](message) - self._chain_chrome_messages(chain[1:]) - - def timeout(): - self._result_message_timeout = ResultMessageTimeout( - "timed out after %.1fs waiting for result message " - "for %s", chain[0]["timeout"], chain[0]["chrome_msg"]) - - if chain: - msg_id = self.send_to_chrome(**chain[0]["chrome_msg"]) - self._waiting_on_result_messages[msg_id] = callback - self.logger.info( - "msg_id=%s for message %s", msg_id, chain[0]["chrome_msg"]) - timer = threading.Timer(chain[0]["timeout"], timeout) - timer.daemon = True - timer.start() - else: - self.logger.info("finished chrome message chain") - - def _start_postbehavior_chain(self): - if self.on_screenshot: - chain = [{ - "info": "scrolling to top", - "chrome_msg": { - "method": "Runtime.evaluate", - "params": {"expression": "window.scrollTo(0, 0);"}, - }, - "timeout": 30, - "callback": lambda message: None, - }, { - "info": "requesting screenshot", - "chrome_msg": {"method": "Page.captureScreenshot"}, - "timeout": 30, - "callback": lambda message: ( - self.on_screenshot and self.on_screenshot( - base64.b64decode(message["result"]["data"]))), - }] - else: - chain = [] - - def set_outlinks(message): - if message["result"]["result"]["value"]: - self._outlinks = frozenset( - message["result"]["result"]["value"].split("\n")) - else: - self._outlinks = frozenset() - - chain.append({ - "info": "retrieving outlinks", - "chrome_msg": { - "method": "Runtime.evaluate", - "params": {"expression": self.OUTLINKS_JS}, - }, - "timeout": 60, - "callback": set_outlinks, - }) - - self._chain_chrome_messages(chain) - - def _browse_interval_func(self): - """Called periodically while page is being browsed. Returns True when - finished browsing.""" - if (not self._websock or not self._websock.sock - or not self._websock.sock.connected): - raise BrowsingException( - "websocket closed, did chrome die? {}".format( - self._websocket_url)) - elif self._result_message_timeout: - raise self._result_message_timeout - elif self._aw_snap_hes_dead_jim: - raise BrowsingException( - """chrome tab went "aw snap" or "he's dead jim"!""") - elif self._outlinks is not None: - # setting self._outlinks is the last thing that happens in the - # post-behavior chain - return True - elif (self._behavior != None and self._behavior.is_finished() - or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS): - if self._behavior and self._behavior.is_finished(): - self.logger.info( - "behavior decided it's finished with %s", self.url) - else: - self.logger.info( - "reached hard timeout of %s seconds url=%s", - Browser.HARD_TIMEOUT_SECONDS, self.url) - self._behavior = None - self._start_postbehavior_chain() - return False - elif self._reached_limit: - raise self._reached_limit - elif self._abort_browse_page: - raise BrowsingAborted("browsing page aborted") - else: - return False - - def send_to_chrome(self, suppress_logging=False, **kwargs): - msg_id = next(self.command_id) - kwargs["id"] = msg_id - msg = json.dumps(kwargs) - if not suppress_logging: - self.logger.debug("sending message to %s: %s", self._websock, msg) - self._websock.send(msg) - return msg_id - - def _visit_page(self, websock): - # navigate to about:blank here to avoid situation where we navigate to - # the same page that we're currently on, perhaps with a different - # #fragment, which prevents Page.loadEventFired from happening - self.send_to_chrome(method="Page.navigate", params={"url": "about:blank"}) - - self.send_to_chrome(method="Network.enable") - self.send_to_chrome(method="Page.enable") - self.send_to_chrome(method="Console.enable") - self.send_to_chrome(method="Debugger.enable") - self.send_to_chrome(method="Runtime.enable") - - headers = self.extra_headers or {} - headers['Accept-Encoding'] = 'identity' - self.send_to_chrome( - method="Network.setExtraHTTPHeaders", - params={"headers":headers}) - - if self.user_agent: - self.send_to_chrome(method="Network.setUserAgentOverride", params={"userAgent": self.user_agent}) - - # disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused" - self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}) - - # navigate to the page! - self.send_to_chrome(method="Page.navigate", params={"url": self.url}) - - def _wrap_handle_message(self, websock, message): - try: - self._handle_message(websock, message) - except: - self.logger.error( - "uncaught exception in _handle_message message=%s", - message, exc_info=True) - self.abort_browse_page() - - def _network_request_will_be_sent(self, message): - if self._behavior: - self._behavior.notify_of_activity() - if message["params"]["request"]["url"].lower().startswith("data:"): - self.logger.debug("ignoring data url {}".format(message["params"]["request"]["url"][:80])) - elif self.on_request: - self.on_request(message) - - def _network_response_received(self, message): - if (not self._reached_limit - and message["params"]["response"]["status"] == 420 - and "Warcprox-Meta" in CaseInsensitiveDict( - message["params"]["response"]["headers"])): - warcprox_meta = json.loads(CaseInsensitiveDict( - message["params"]["response"]["headers"])["Warcprox-Meta"]) - self._reached_limit = brozzler.ReachedLimit( - warcprox_meta=warcprox_meta) - self.logger.info("reached limit %s", self._reached_limit) - if self.on_response: - self.on_response(message) - - def _page_load_event_fired(self, message): - def page_url_after_load_event(message): - if message["result"]["result"]["value"] != self.url: - if self.on_url_change: - self.on_url_change(message["result"]["result"]["value"]) +''' + def extract_outlinks(self, timeout=60): + self.logger.info('extracting outlinks') + self._result_messages[self._command_id.peek_next()] = None msg_id = self.send_to_chrome( - method="Runtime.evaluate", - params={"expression":"document.URL"}) - self._waiting_on_result_messages[msg_id] = page_url_after_load_event + method='Runtime.evaluate', + params={'expression': self.OUTLINKS_JS}) + self._wait_for( + lambda: self._result_messages.get(msg_id), timeout=timeout) + message = self._result_messages.pop(msg_id) + if message['result']['result']['value']: + return frozenset(message['result']['result']['value'].split('\n')) + else: + self._outlinks = frozenset() - self.logger.info("Page.loadEventFired, moving on to starting behaviors url={}".format(self.url)) - self._behavior = Behavior(self.url, self) - self._behavior.start(self.behavior_parameters) + def screenshot(self, timeout=30): + self.logger.info('taking screenshot') + self._result_messages[self._command_id.peek_next()] = None + msg_id = self.send_to_chrome(method='Page.captureScreenshot') + self._wait_for( + lambda: self._result_messages.get(msg_id), timeout=timeout) + message = self._result_messages.pop(msg_id) + jpeg_bytes = base64.b64decode(message['result']['data']) + return jpeg_bytes - def _console_message_added(self, message): - self.logger.debug("%s console.%s %s", self._websock.url, - message["params"]["message"]["level"], - message["params"]["message"]["text"]) + def scroll_to_top(self, timeout=30): + self.logger.info('scrolling to top') + self._result_messages[self._command_id.peek_next()] = None + msg_id = self.send_to_chrome( + method='Runtime.evaluate', + params={'expression': 'window.scrollTo(0, 0);'}) + self._wait_for( + lambda: self._result_messages.get(msg_id), timeout=timeout) + self._result_messages.pop(msg_id) - def _debugger_paused(self, message): - # We hit the breakpoint set in visit_page. Get rid of google - # analytics script! - self.logger.debug("debugger paused! message={}".format(message)) - scriptId = message['params']['callFrames'][0]['location']['scriptId'] + def url(self, timeout=30): + ''' + Returns value of document.URL from the browser. + ''' + self._result_messages[self._command_id.peek_next()] = None + msg_id = self.send_to_chrome( + method='Runtime.evaluate', + params={'expression': 'document.URL'}) + self._wait_for( + lambda: self._result_messages.get(msg_id), timeout=timeout) + message = self._result_messages.pop(msg_id) + return message['result']['result']['value'] - # replace script - self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"}) + def run_behavior(self, behavior_script, timeout=900): + self.send_to_chrome( + method='Runtime.evaluate', suppress_logging=True, + params={'expression': behavior_script}) - # resume execution - self.send_to_chrome(method="Debugger.resume") + start = time.time() + while True: + elapsed = time.time() - start + if elapsed > timeout: + logging.info( + 'behavior reached hard timeout after %.1fs', elapsed) + return + + time.sleep(7) + + self._result_messages[self._command_id.peek_next()] = None + msg_id = self.send_to_chrome( + method='Runtime.evaluate', suppress_logging=True, + params={'expression': 'umbraBehaviorFinished()'}) + try: + self._wait_for( + lambda: self._result_messages.get(msg_id), timeout=5) + msg = self._result_messages.get(msg_id) + if (msg and 'result' in msg + and not ('wasThrown' in msg['result'] + and msg['result']['wasThrown']) + and 'result' in msg['result'] + and type(msg['result']['result']['value']) == bool + and msg['result']['result']['value']): + self.logger.info('behavior decided it has finished') + return + except BrowsingTimeout: + pass - def _handle_message(self, websock, json_message): - message = json.loads(json_message) - if "method" in message: - if message["method"] == "Network.requestWillBeSent": - self._network_request_will_be_sent(message) - elif message["method"] == "Network.responseReceived": - self._network_response_received(message) - elif message["method"] == "Page.loadEventFired": - self._page_load_event_fired(message) - elif message["method"] == "Console.messageAdded": - self._console_message_added(message) - elif message["method"] == "Debugger.paused": - self._debugger_paused(message) - elif message["method"] == "Inspector.targetCrashed": - self._aw_snap_hes_dead_jim = message - # elif message["method"] in ( - # "Network.dataReceived", "Network.responseReceived", - # "Network.loadingFinished"): - # pass - # else: - # self.logger.debug("%s %s", message["method"], json_message) - elif "result" in message: - if message["id"] in self._waiting_on_result_messages: - callback = self._waiting_on_result_messages[message["id"]] - del self._waiting_on_result_messages[message["id"]] - self.logger.debug( - "received result for message id=%s, calling %s", - message["id"], callback) - callback(message) - elif self._behavior and self._behavior.is_waiting_on_result( - message["id"]): - self._behavior.notify_of_result(message) - # else: - # self.logger.debug("%s", json_message) - # else: - # self.logger.debug("%s", json_message) diff --git a/brozzler/chrome.py b/brozzler/chrome.py index 5208442..fbd5c2f 100644 --- a/brozzler/chrome.py +++ b/brozzler/chrome.py @@ -35,10 +35,26 @@ class Chrome: logger = logging.getLogger(__module__ + '.' + __qualname__) def __init__( - self, port, executable, proxy=None, ignore_cert_errors=False, + self, chrome_exe, port=9222, ignore_cert_errors=False, proxy=None, cookie_db=None): + ''' + Initializes instance of this class. + + Doesn't start the browser, start() does that. + + Args: + chrome_exe: filesystem path to chrome/chromium executable + port: chrome debugging protocol port (default 9222) + ignore_cert_errors: configure chrome to accept all certs (default + False) + proxy: http proxy 'host:port' (default None) + cookie_db: raw bytes of chrome/chromium sqlite3 cookies database, + which, if supplied, will be written to + {chrome_user_data_dir}/Default/Cookies before running the + browser (default None) + ''' self.port = port - self.executable = executable + self.chrome_exe = chrome_exe self.proxy = proxy self.ignore_cert_errors = ignore_cert_errors self.cookie_db = cookie_db @@ -124,7 +140,7 @@ class Chrome: new_env['HOME'] = self._home_tmpdir.name self.port = self._find_available_port(self.port) chrome_args = [ - self.executable, '--use-mock-keychain', # mac thing + self.chrome_exe, '--use-mock-keychain', # mac thing '--user-data-dir=%s' % self._chrome_user_data_dir, '--remote-debugging-port=%s' % self.port, '--disable-web-sockets', '--disable-cache', diff --git a/brozzler/cli.py b/brozzler/cli.py index b022771..69a3fb4 100644 --- a/brozzler/cli.py +++ b/brozzler/cli.py @@ -159,8 +159,8 @@ def brozzle_page(): f.write(screenshot_png) logging.info('wrote screenshot to %s', filename) - browser = brozzler.Browser(chrome_exe=args.chrome_exe) - browser.start(proxy=site.proxy) + browser = brozzler.Browser(chrome_exe=args.chrome_exe, proxy=site.proxy) + browser.start() try: outlinks = worker.brozzle_page( browser, site, page, on_screenshot=on_screenshot) diff --git a/brozzler/worker.py b/brozzler/worker.py index 436c08b..9da9969 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -272,12 +272,13 @@ class BrozzlerWorker: self.logger.info('needs browsing: %s', page) if not browser.is_running(): browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db) - outlinks = browser.browse_page( + final_page_url, outlinks = browser.browse_page( page.url, extra_headers=site.extra_headers(), behavior_parameters=site.behavior_parameters, user_agent=site.user_agent, - on_screenshot=_on_screenshot, - on_url_change=page.note_redirect) + on_screenshot=_on_screenshot) + if final_page_url != page.url: + page.note_redirect(final_page_url) return outlinks else: if not self._already_fetched(page, ydl_spy): @@ -336,7 +337,7 @@ class BrozzlerWorker: self._frontier.scope_and_schedule_outlinks( site, page, outlinks) if browser.is_running(): - site.cookie_db = browser.persist_and_read_cookie_db() + site.cookie_db = browser.chrome.persist_and_read_cookie_db() self._frontier.completed_page(site, page) page = None @@ -394,8 +395,8 @@ class BrozzlerWorker: try: browser = self._browser_pool.acquire() try: - site = self._frontier.claim_site("{}:{}".format( - socket.gethostname(), browser.chrome_port)) + site = self._frontier.claim_site("%s:%s" % ( + socket.gethostname(), browser.chrome.port)) self.logger.info( "brozzling site (proxy=%s) %s", repr(self._proxy(site)), site)