Merge branch 'master' into qa

* master:
  add docstring to _chain_chrome_messages, remove debug logging, tweak name of websock thread
  add a timeout to the one post-behavior step that didn't already have one (getting a screenshot), and majorly refactored the post-behavior code to incorporate timeouts automatically into each step, and hopefully make it easier to follow
  logging tweaks
  reduce log level of messages from chrome, since it spews stuff that looks bad but usually isn't
  back to a dev version number
  1.1b3 for upload to pypi
This commit is contained in:
Noah Levitt 2016-07-28 20:32:33 -05:00
commit feff7a8d05
3 changed files with 194 additions and 176 deletions

View File

@ -1,21 +1,21 @@
#
# brozzler/behaviors.py - manages behaviors, which are javascript scripts that
# run in brozzled web pages
#
# Copyright (C) 2014-2016 Internet Archive
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
brozzler/behaviors.py - manages behaviors, which are javascript scripts that
run in brozzled web pages
Copyright (C) 2014-2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import json
import itertools
@ -111,13 +111,21 @@ class Behavior:
# {'id': 59, 'result': {'result': {'type': 'boolean', 'value': True}, 'wasThrown': False}}
# {'id': 59, 'result': {'result': {'type': 'boolean', 'value': False}}
self.waiting_result_msg_ids.remove(chrome_message['id'])
if ('result' in chrome_message
and not ('wasThrown' in chrome_message['result'] and chrome_message['result']['wasThrown'])
if ('result' in chrome_message and not (
'wasThrown' in chrome_message['result']
and chrome_message['result']['wasThrown'])
and 'result' in chrome_message['result']
and type(chrome_message['result']['result']['value']) == bool):
self.script_finished = chrome_message['result']['result']['value']
else:
self.logger.error("chrome message doesn't look like a boolean result! {}".format(chrome_message))
# this happens if the behavior script doesn't define
# umbraBehaviorFinished, and I think it can also happen normally
# after the behavior has been sent to the browser but before
# the browser has it fully loaded... in any case the message
# was overwhelming the logs, so I'm bumping it down to debug level
self.logger.debug(
"chrome message doesn't look like a boolean result! %s",
chrome_message)
def notify_of_activity(self):
self.last_activity = time.time()
@ -127,5 +135,3 @@ if __name__ == "__main__":
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logger = logging.getLogger('umbra.behaviors')
logger.info("custom behaviors: {}".format(Behavior.behaviors()))

View File

@ -36,8 +36,8 @@ import re
import base64
import psutil
import signal
import string
import sqlite3
import datetime
__all__ = ["BrowserPool", "Browser"]
@ -95,6 +95,9 @@ class BrowsingException(Exception):
class BrowsingAborted(BrowsingException):
pass
class ResultMessageTimeout(BrowsingException):
pass
class Browser:
"""
Runs chrome/chromium to synchronously browse one page at a time using
@ -135,16 +138,20 @@ class Browser:
self.chrome_port = self._find_available_port()
self._work_dir = tempfile.TemporaryDirectory()
if cookie_db is not None:
cookie_dir = os.sep.join([self._work_dir.name, "chrome-user-data","Default"])
cookie_location = os.sep.join([cookie_dir,"Cookies"])
self.logger.debug("Cookie DB provided. Writing to: %s", cookie_location)
cookie_dir = os.path.join(
self._work_dir.name, "chrome-user-data", "Default")
cookie_location = os.path.join(cookie_dir, "Cookies")
self.logger.debug(
"cookie DB provided, writing to %s", cookie_location)
os.makedirs(cookie_dir, exist_ok=True)
try:
with open(cookie_location,'wb') as cookie_file:
with open(cookie_location, 'wb') as cookie_file:
cookie_file.write(cookie_db)
except OSError:
self.logger.error("exception writing cookie file at: %s", cookie_location, exc_info=True)
self.logger.error(
"exception writing cookie file at %s",
cookie_location, exc_info=True)
self._chrome_instance = Chrome(
port=self.chrome_port, executable=self.chrome_exe,
@ -175,8 +182,11 @@ class Browser:
self.logger.error("problem stopping", exc_info=True)
def persist_and_read_cookie_db(self):
cookie_location = os.sep.join([self._work_dir.name, "chrome-user-data","Default","Cookies"])
self.logger.debug("Saving Cookie DB from: %s", cookie_location)
cookie_location = os.path.join(
self._work_dir.name, "chrome-user-data", "Default", "Cookies")
self.logger.debug(
"marking cookies persistent then reading file into memory: %s ",
cookie_location)
try:
with sqlite3.connect(cookie_location) as conn:
cur = conn.cursor()
@ -187,9 +197,11 @@ class Browser:
cookie_db=None
try:
with open(cookie_location, "rb") as cookie_file:
cookie_db=cookie_file.read()
cookie_db = cookie_file.read()
except OSError:
self.logger.error("exception reading from cookie DB file at: %s", cookie_location, exc_info=True)
self.logger.error(
"exception reading from cookie DB file %s",
cookie_location, exc_info=True)
return cookie_db
def _find_available_port(self):
@ -219,7 +231,8 @@ class Browser:
self, url, extra_headers=None, behavior_parameters=None,
on_request=None, on_response=None, on_screenshot=None,
on_url_change=None):
"""Synchronously loads a page, takes a screenshot, and runs behaviors.
"""
Synchronously loads a page, takes a screenshot, and runs behaviors.
Raises BrowsingException if browsing the page fails in a non-critical
way.
@ -236,25 +249,20 @@ class Browser:
self.on_response = on_response
self.behavior_parameters = behavior_parameters
self._waiting_on_scroll_to_top_msg_id = None
self._waiting_on_scroll_to_top_start = None
self._waiting_on_screenshot_msg_id = None
self._waiting_on_document_url_msg_id = None
self._waiting_on_outlinks_msg_id = None
self._waiting_on_outlinks_start = None
self._waiting_on_outlinks_attempt = 0
self._outlinks = None
self._reached_limit = None
self._aw_snap_hes_dead_jim = None
self._abort_browse_page = False
self._has_screenshot = False
self._waiting_on_result_messages = {}
self._result_message_timeout = None
self._websock = websocket.WebSocketApp(
self._websocket_url, on_open=self._visit_page,
on_message=self._wrap_handle_message)
threadName = "WebsockThread:%s-%s" % (self.chrome_port, ''.join(
random.choice(string.ascii_letters) for _ in range(6)))
threadName = "WebsockThread:{}-{:%Y%m%d%H%M%S}".format(
self.chrome_port, datetime.datetime.utcnow())
websock_thread = threading.Thread(
target=self._websock.run_forever, name=threadName,
kwargs={'ping_timeout':0.5})
@ -266,11 +274,6 @@ class Browser:
while True:
time.sleep(0.5)
if self._browse_interval_func():
break
while True:
time.sleep(0.5)
if self._post_behavior_interval_func():
return self._outlinks
finally:
if (self._websock and self._websock.sock
@ -297,85 +300,6 @@ class Browser:
self._behavior = None
def _post_behavior_interval_func(self):
"""
Called periodically after behavior is finished on the page. Returns
true when post-behavior tasks are finished.
"""
if (not self._websock or not self._websock.sock
or not self._websock.sock.connected):
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
if (not self._has_screenshot
and not self._waiting_on_scroll_to_top_msg_id
and not self._waiting_on_screenshot_msg_id):
if time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS:
self.logger.info(
"reached hard timeout of {} seconds url={}".format(
Browser.HARD_TIMEOUT_SECONDS, self.url))
else:
self.logger.info(
"behavior decided it's finished with %s", self.url)
self.logger.info(
"scrolling to the top, then requesting screenshot %s",
self.url)
self._waiting_on_scroll_to_top_msg_id = self.send_to_chrome(
method="Runtime.evaluate",
params={"expression":"window.scrollTo(0, 0);"})
self._waiting_on_scroll_to_top_start = time.time()
return False
elif (self._waiting_on_scroll_to_top_msg_id
and time.time() - self._waiting_on_scroll_to_top_start > 30.0):
# chromium bug? occasionally we get no scroll-to-top result message
self.logger.warn(
"timed out after %.1fs waiting for scroll-to-top result "
"message, requesting screenshot now",
time.time() - self._waiting_on_scroll_to_top_start)
self._waiting_on_scroll_to_top_msg_id = None
self._waiting_on_scroll_to_top_start = None
self._waiting_on_screenshot_msg_id = self.send_to_chrome(
method="Page.captureScreenshot")
elif not self._has_screenshot and (
self._waiting_on_scroll_to_top_msg_id
or self._waiting_on_screenshot_msg_id):
return False
if self._outlinks is not None:
self.logger.info("got outlinks, finished browsing %s", self.url)
return True
elif not self._waiting_on_outlinks_msg_id:
self.logger.info("retrieving outlinks for %s", self.url)
self._request_outlinks()
return False
else: # self._waiting_on_outlinks_msg_id
if time.time() - self._waiting_on_outlinks_start > 30.0:
if self._waiting_on_outlinks_attempt < 5:
self.logger.warn(
"timed out after %.1fs on attempt %s to retrieve "
"outlinks, trying again",
time.time() - self._waiting_on_outlinks_start,
self._waiting_on_outlinks_attempt)
self._request_outlinks()
return False
else:
raise BrowsingException(
"timed out after %.1fs on (final) attempt %s "
"to retrieve outlinks" % (
time.time() - self._waiting_on_outlinks_start,
self._waiting_on_outlinks_attempt))
else: # just waiting for outlinks
return False
def _request_outlinks(self):
self._waiting_on_outlinks_msg_id = self.send_to_chrome(
method="Runtime.evaluate",
params={"expression": self.OUTLINKS_JS})
self._waiting_on_outlinks_attempt += 1
self._waiting_on_outlinks_start = time.time()
OUTLINKS_JS = """
var compileOutlinks = function(frame) {
var outlinks = Array.prototype.slice.call(
@ -390,6 +314,88 @@ var compileOutlinks = function(frame) {
compileOutlinks(window).join(' ');
"""
def _chain_chrome_messages(self, chain):
"""
Sends a series of messages to chrome/chromium on the debugging protocol
websocket. Waits for a reply from each one before sending the next.
Enforces a timeout waiting for each reply. If the timeout is hit, sets
self._result_message_timeout with a ResultMessageTimeout (an exception
class). Takes an array of dicts, each of which should look like this:
{
"info": "human readable description",
"chrome_msg": { ... }, # message to send to chrome, as a dict
"timeout": 30, # timeout in seconds
"callback": my_callback, # takes one arg, the result message
}
The code is rather convoluted because of the asynchronous nature of the
whole thing. See how it's used in _start_postbehavior_chain.
"""
timer = None
def callback(message):
if timer:
timer.cancel()
if message["id"] in self._waiting_on_result_messages:
del self._waiting_on_result_messages[message["id"]]
if "callback" in chain[0]:
chain[0]["callback"](message)
self._chain_chrome_messages(chain[1:])
def timeout():
self._result_message_timeout = ResultMessageTimeout(
"timed out after %.1fs waiting for result message "
"for %s", chain[0]["timeout"], chain[0]["chrome_msg"])
if chain:
msg_id = self.send_to_chrome(**chain[0]["chrome_msg"])
self._waiting_on_result_messages[msg_id] = callback
self.logger.info(
"msg_id=%s for message %s", msg_id, chain[0]["chrome_msg"])
timer = threading.Timer(chain[0]["timeout"], timeout)
timer.daemon = True
timer.start()
else:
self.logger.info("finished chrome message chain")
def _start_postbehavior_chain(self):
if self.on_screenshot:
chain = [{
"info": "scrolling to top",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": "window.scrollTo(0, 0);"},
},
"timeout": 30,
"callback": lambda message: None,
}, {
"info": "requesting screenshot",
"chrome_msg": {"method": "Page.captureScreenshot"},
"timeout": 30,
"callback": lambda message: (
self.on_screenshot and self.on_screenshot(
base64.b64decode(message["result"]["data"]))),
}]
else:
chain = []
def set_outlinks(message):
self._outlinks = frozenset(
message["result"]["result"]["value"].split())
chain.append({
"info": "retrieving outlinks",
"chrome_msg": {
"method": "Runtime.evaluate",
"params": {"expression": self.OUTLINKS_JS},
},
"timeout": 60,
"callback": set_outlinks,
})
self._chain_chrome_messages(chain)
def _browse_interval_func(self):
"""Called periodically while page is being browsed. Returns True when
finished browsing."""
@ -398,12 +404,27 @@ compileOutlinks(window).join(' ');
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
elif self._result_message_timeout:
raise self._result_message_timeout
elif self._aw_snap_hes_dead_jim:
raise BrowsingException(
"""chrome tab went "aw snap" or "he's dead jim"!""")
elif self._outlinks is not None:
# setting self._outlinks is the last thing that happens in the
# post-behavior chain
return True
elif (self._behavior != None and self._behavior.is_finished()
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
return True
if self._behavior and self._behavior.is_finished():
self.logger.info(
"behavior decided it's finished with %s", self.url)
else:
self.logger.info(
"reached hard timeout of %s seconds url=%s",
Browser.HARD_TIMEOUT_SECONDS, self.url)
self._behavior = None
self._start_postbehavior_chain()
return False
elif self._reached_limit:
raise self._reached_limit
elif self._abort_browse_page:
@ -413,10 +434,10 @@ compileOutlinks(window).join(' ');
def send_to_chrome(self, suppress_logging=False, **kwargs):
msg_id = next(self.command_id)
kwargs['id'] = msg_id
kwargs["id"] = msg_id
msg = json.dumps(kwargs)
if not suppress_logging:
self.logger.debug('sending message to {}: {}'.format(self._websock, msg))
self.logger.debug("sending message to %s: %s", self._websock, msg)
self._websock.send(msg)
return msg_id
@ -445,7 +466,9 @@ compileOutlinks(window).join(' ');
try:
self._handle_message(websock, message)
except:
self.logger.error("uncaught exception in _handle_message", exc_info=True)
self.logger.error(
"uncaught exception in _handle_message message=%s",
message, exc_info=True)
self.abort_browse_page()
def _network_request_will_be_sent(self, message):
@ -493,52 +516,41 @@ compileOutlinks(window).join(' ');
# resume execution
self.send_to_chrome(method="Debugger.resume")
def _handle_result_message(self, message):
if message["id"] == self._waiting_on_screenshot_msg_id:
if self.on_screenshot:
self.on_screenshot(base64.b64decode(message["result"]["data"]))
self._waiting_on_screenshot_msg_id = None
self._has_screenshot = True
self.logger.info("got screenshot, moving on to getting outlinks")
elif message["id"] == self._waiting_on_scroll_to_top_msg_id:
self._waiting_on_scroll_to_top_msg_id = None
self._waiting_on_scroll_to_top_start = None
self._waiting_on_screenshot_msg_id = self.send_to_chrome(
method="Page.captureScreenshot")
elif message["id"] == self._waiting_on_outlinks_msg_id:
self.logger.debug("got outlinks message=%s", message)
self._outlinks = frozenset(
message["result"]["result"]["value"].split())
elif message["id"] == self._waiting_on_document_url_msg_id:
if message["result"]["result"]["value"] != self.url:
if self.on_url_change:
self.on_url_change(message["result"]["result"]["value"])
self._waiting_on_document_url_msg_id = None
elif self._behavior and self._behavior.is_waiting_on_result(message["id"]):
self._behavior.notify_of_result(message)
def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if "method" in message and message["method"] == "Network.requestWillBeSent":
self._network_request_will_be_sent(message)
elif "method" in message and message["method"] == "Network.responseReceived":
self._network_response_received(message)
elif "method" in message and message["method"] == "Page.loadEventFired":
self._page_load_event_fired(message)
elif "method" in message and message["method"] == "Console.messageAdded":
self._console_message_added(message)
elif "method" in message and message["method"] == "Debugger.paused":
self._debugger_paused(message)
elif "method" in message and message["method"] == "Inspector.targetCrashed":
self._aw_snap_hes_dead_jim = message
if "method" in message:
if message["method"] == "Network.requestWillBeSent":
self._network_request_will_be_sent(message)
elif message["method"] == "Network.responseReceived":
self._network_response_received(message)
elif message["method"] == "Page.loadEventFired":
self._page_load_event_fired(message)
elif message["method"] == "Console.messageAdded":
self._console_message_added(message)
elif message["method"] == "Debugger.paused":
self._debugger_paused(message)
elif message["method"] == "Inspector.targetCrashed":
self._aw_snap_hes_dead_jim = message
# elif message["method"] in (
# "Network.dataReceived", "Network.responseReceived",
# "Network.loadingFinished"):
# pass
# else:
# self.logger.debug("%s %s", message["method"], json_message)
elif "result" in message:
self._handle_result_message(message)
# elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"):
# pass
# elif "method" in message:
# self.logger.debug("{} {}".format(message["method"], json_message))
if message["id"] in self._waiting_on_result_messages:
callback = self._waiting_on_result_messages[message["id"]]
self.logger.info(
"received result for message id=%s, calling %s",
message["id"], callback)
callback(message)
elif self._behavior and self._behavior.is_waiting_on_result(
message["id"]):
self._behavior.notify_of_result(message)
# else:
# self.logger.debug("%s", json_message)
# else:
# self.logger.debug("[no-method] {}".format(json_message))
# self.logger.debug("%s", json_message)
class Chrome:
logger = logging.getLogger(__module__ + "." + __qualname__)
@ -643,7 +655,7 @@ class Chrome:
brozzler.TRACE, "chrome pid %s STDOUT %s",
self.chrome_process.pid, buf)
else:
logging.info(
logging.debug(
"chrome pid %s STDOUT %s",
self.chrome_process.pid, buf)
@ -658,7 +670,7 @@ class Chrome:
brozzler.TRACE, "chrome pid %s STDOUT %s",
self.chrome_process.pid, buf)
else:
logging.info(
logging.debug(
"chrome pid %s STDERR %s",
self.chrome_process.pid, buf)
except:

View File

@ -21,7 +21,7 @@ import setuptools
setuptools.setup(
name='brozzler',
version='1.1b3.dev58',
version='1.1b4.dev63',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',