mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-05-02 14:46:18 -04:00
add a timeout to the one post-behavior step that didn't already have one (getting a screenshot), and majorly refactored the post-behavior code to incorporate timeouts automatically into each step, and hopefully make it easier to follow
This commit is contained in:
parent
b2b07b79a9
commit
2046ee36e0
2 changed files with 143 additions and 147 deletions
|
@ -95,6 +95,9 @@ class BrowsingException(Exception):
|
||||||
class BrowsingAborted(BrowsingException):
|
class BrowsingAborted(BrowsingException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class ResultMessageTimeout(BrowsingException):
|
||||||
|
pass
|
||||||
|
|
||||||
class Browser:
|
class Browser:
|
||||||
"""
|
"""
|
||||||
Runs chrome/chromium to synchronously browse one page at a time using
|
Runs chrome/chromium to synchronously browse one page at a time using
|
||||||
|
@ -135,16 +138,20 @@ class Browser:
|
||||||
self.chrome_port = self._find_available_port()
|
self.chrome_port = self._find_available_port()
|
||||||
self._work_dir = tempfile.TemporaryDirectory()
|
self._work_dir = tempfile.TemporaryDirectory()
|
||||||
if cookie_db is not None:
|
if cookie_db is not None:
|
||||||
cookie_dir = os.sep.join([self._work_dir.name, "chrome-user-data","Default"])
|
cookie_dir = os.path.join(
|
||||||
cookie_location = os.sep.join([cookie_dir,"Cookies"])
|
self._work_dir.name, "chrome-user-data", "Default")
|
||||||
self.logger.debug("Cookie DB provided. Writing to: %s", cookie_location)
|
cookie_location = os.path.join(cookie_dir, "Cookies")
|
||||||
|
self.logger.debug(
|
||||||
|
"cookie DB provided, writing to %s", cookie_location)
|
||||||
os.makedirs(cookie_dir, exist_ok=True)
|
os.makedirs(cookie_dir, exist_ok=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(cookie_location,'wb') as cookie_file:
|
with open(cookie_location, 'wb') as cookie_file:
|
||||||
cookie_file.write(cookie_db)
|
cookie_file.write(cookie_db)
|
||||||
except OSError:
|
except OSError:
|
||||||
self.logger.error("exception writing cookie file at: %s", cookie_location, exc_info=True)
|
self.logger.error(
|
||||||
|
"exception writing cookie file at %s",
|
||||||
|
cookie_location, exc_info=True)
|
||||||
|
|
||||||
self._chrome_instance = Chrome(
|
self._chrome_instance = Chrome(
|
||||||
port=self.chrome_port, executable=self.chrome_exe,
|
port=self.chrome_port, executable=self.chrome_exe,
|
||||||
|
@ -175,8 +182,11 @@ class Browser:
|
||||||
self.logger.error("problem stopping", exc_info=True)
|
self.logger.error("problem stopping", exc_info=True)
|
||||||
|
|
||||||
def persist_and_read_cookie_db(self):
|
def persist_and_read_cookie_db(self):
|
||||||
cookie_location = os.sep.join([self._work_dir.name, "chrome-user-data","Default","Cookies"])
|
cookie_location = os.path.join(
|
||||||
self.logger.debug("Saving Cookie DB from: %s", cookie_location)
|
self._work_dir.name, "chrome-user-data", "Default", "Cookies")
|
||||||
|
self.logger.debug(
|
||||||
|
"marking cookies persistent then reading file into memory: %s ",
|
||||||
|
cookie_location)
|
||||||
try:
|
try:
|
||||||
with sqlite3.connect(cookie_location) as conn:
|
with sqlite3.connect(cookie_location) as conn:
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
|
@ -187,9 +197,11 @@ class Browser:
|
||||||
cookie_db=None
|
cookie_db=None
|
||||||
try:
|
try:
|
||||||
with open(cookie_location, "rb") as cookie_file:
|
with open(cookie_location, "rb") as cookie_file:
|
||||||
cookie_db=cookie_file.read()
|
cookie_db = cookie_file.read()
|
||||||
except OSError:
|
except OSError:
|
||||||
self.logger.error("exception reading from cookie DB file at: %s", cookie_location, exc_info=True)
|
self.logger.error(
|
||||||
|
"exception reading from cookie DB file %s",
|
||||||
|
cookie_location, exc_info=True)
|
||||||
return cookie_db
|
return cookie_db
|
||||||
|
|
||||||
def _find_available_port(self):
|
def _find_available_port(self):
|
||||||
|
@ -219,7 +231,8 @@ class Browser:
|
||||||
self, url, extra_headers=None, behavior_parameters=None,
|
self, url, extra_headers=None, behavior_parameters=None,
|
||||||
on_request=None, on_response=None, on_screenshot=None,
|
on_request=None, on_response=None, on_screenshot=None,
|
||||||
on_url_change=None):
|
on_url_change=None):
|
||||||
"""Synchronously loads a page, takes a screenshot, and runs behaviors.
|
"""
|
||||||
|
Synchronously loads a page, takes a screenshot, and runs behaviors.
|
||||||
|
|
||||||
Raises BrowsingException if browsing the page fails in a non-critical
|
Raises BrowsingException if browsing the page fails in a non-critical
|
||||||
way.
|
way.
|
||||||
|
@ -236,18 +249,13 @@ class Browser:
|
||||||
self.on_response = on_response
|
self.on_response = on_response
|
||||||
self.behavior_parameters = behavior_parameters
|
self.behavior_parameters = behavior_parameters
|
||||||
|
|
||||||
self._waiting_on_scroll_to_top_msg_id = None
|
|
||||||
self._waiting_on_scroll_to_top_start = None
|
|
||||||
self._waiting_on_screenshot_msg_id = None
|
|
||||||
self._waiting_on_document_url_msg_id = None
|
|
||||||
self._waiting_on_outlinks_msg_id = None
|
|
||||||
self._waiting_on_outlinks_start = None
|
|
||||||
self._waiting_on_outlinks_attempt = 0
|
|
||||||
self._outlinks = None
|
self._outlinks = None
|
||||||
self._reached_limit = None
|
self._reached_limit = None
|
||||||
self._aw_snap_hes_dead_jim = None
|
self._aw_snap_hes_dead_jim = None
|
||||||
self._abort_browse_page = False
|
self._abort_browse_page = False
|
||||||
self._has_screenshot = False
|
self._has_screenshot = False
|
||||||
|
self._waiting_on_result_messages = {}
|
||||||
|
self._result_message_timeout = None
|
||||||
|
|
||||||
self._websock = websocket.WebSocketApp(
|
self._websock = websocket.WebSocketApp(
|
||||||
self._websocket_url, on_open=self._visit_page,
|
self._websocket_url, on_open=self._visit_page,
|
||||||
|
@ -266,11 +274,6 @@ class Browser:
|
||||||
while True:
|
while True:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
if self._browse_interval_func():
|
if self._browse_interval_func():
|
||||||
break
|
|
||||||
|
|
||||||
while True:
|
|
||||||
time.sleep(0.5)
|
|
||||||
if self._post_behavior_interval_func():
|
|
||||||
return self._outlinks
|
return self._outlinks
|
||||||
finally:
|
finally:
|
||||||
if (self._websock and self._websock.sock
|
if (self._websock and self._websock.sock
|
||||||
|
@ -297,85 +300,6 @@ class Browser:
|
||||||
|
|
||||||
self._behavior = None
|
self._behavior = None
|
||||||
|
|
||||||
def _post_behavior_interval_func(self):
|
|
||||||
"""
|
|
||||||
Called periodically after behavior is finished on the page. Returns
|
|
||||||
true when post-behavior tasks are finished.
|
|
||||||
"""
|
|
||||||
if (not self._websock or not self._websock.sock
|
|
||||||
or not self._websock.sock.connected):
|
|
||||||
raise BrowsingException(
|
|
||||||
"websocket closed, did chrome die? {}".format(
|
|
||||||
self._websocket_url))
|
|
||||||
|
|
||||||
if (not self._has_screenshot
|
|
||||||
and not self._waiting_on_scroll_to_top_msg_id
|
|
||||||
and not self._waiting_on_screenshot_msg_id):
|
|
||||||
if time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS:
|
|
||||||
self.logger.info(
|
|
||||||
"reached hard timeout of {} seconds url={}".format(
|
|
||||||
Browser.HARD_TIMEOUT_SECONDS, self.url))
|
|
||||||
else:
|
|
||||||
self.logger.info(
|
|
||||||
"behavior decided it's finished with %s", self.url)
|
|
||||||
|
|
||||||
self.logger.info(
|
|
||||||
"scrolling to the top, then requesting screenshot %s",
|
|
||||||
self.url)
|
|
||||||
self._waiting_on_scroll_to_top_msg_id = self.send_to_chrome(
|
|
||||||
method="Runtime.evaluate",
|
|
||||||
params={"expression":"window.scrollTo(0, 0);"})
|
|
||||||
self._waiting_on_scroll_to_top_start = time.time()
|
|
||||||
return False
|
|
||||||
elif (self._waiting_on_scroll_to_top_msg_id
|
|
||||||
and time.time() - self._waiting_on_scroll_to_top_start > 30.0):
|
|
||||||
# chromium bug? occasionally we get no scroll-to-top result message
|
|
||||||
self.logger.warn(
|
|
||||||
"timed out after %.1fs waiting for scroll-to-top result "
|
|
||||||
"message, requesting screenshot now",
|
|
||||||
time.time() - self._waiting_on_scroll_to_top_start)
|
|
||||||
self._waiting_on_scroll_to_top_msg_id = None
|
|
||||||
self._waiting_on_scroll_to_top_start = None
|
|
||||||
self._waiting_on_screenshot_msg_id = self.send_to_chrome(
|
|
||||||
method="Page.captureScreenshot")
|
|
||||||
elif not self._has_screenshot and (
|
|
||||||
self._waiting_on_scroll_to_top_msg_id
|
|
||||||
or self._waiting_on_screenshot_msg_id):
|
|
||||||
return False
|
|
||||||
|
|
||||||
if self._outlinks is not None:
|
|
||||||
self.logger.info("got outlinks, finished browsing %s", self.url)
|
|
||||||
return True
|
|
||||||
elif not self._waiting_on_outlinks_msg_id:
|
|
||||||
self.logger.info("retrieving outlinks for %s", self.url)
|
|
||||||
self._request_outlinks()
|
|
||||||
return False
|
|
||||||
else: # self._waiting_on_outlinks_msg_id
|
|
||||||
if time.time() - self._waiting_on_outlinks_start > 30.0:
|
|
||||||
if self._waiting_on_outlinks_attempt < 5:
|
|
||||||
self.logger.warn(
|
|
||||||
"timed out after %.1fs on attempt %s to retrieve "
|
|
||||||
"outlinks, trying again",
|
|
||||||
time.time() - self._waiting_on_outlinks_start,
|
|
||||||
self._waiting_on_outlinks_attempt)
|
|
||||||
self._request_outlinks()
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
raise BrowsingException(
|
|
||||||
"timed out after %.1fs on (final) attempt %s "
|
|
||||||
"to retrieve outlinks" % (
|
|
||||||
time.time() - self._waiting_on_outlinks_start,
|
|
||||||
self._waiting_on_outlinks_attempt))
|
|
||||||
else: # just waiting for outlinks
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _request_outlinks(self):
|
|
||||||
self._waiting_on_outlinks_msg_id = self.send_to_chrome(
|
|
||||||
method="Runtime.evaluate",
|
|
||||||
params={"expression": self.OUTLINKS_JS})
|
|
||||||
self._waiting_on_outlinks_attempt += 1
|
|
||||||
self._waiting_on_outlinks_start = time.time()
|
|
||||||
|
|
||||||
OUTLINKS_JS = """
|
OUTLINKS_JS = """
|
||||||
var compileOutlinks = function(frame) {
|
var compileOutlinks = function(frame) {
|
||||||
var outlinks = Array.prototype.slice.call(
|
var outlinks = Array.prototype.slice.call(
|
||||||
|
@ -390,6 +314,74 @@ var compileOutlinks = function(frame) {
|
||||||
compileOutlinks(window).join(' ');
|
compileOutlinks(window).join(' ');
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def _chain_chrome_messages(self, chain):
|
||||||
|
timer = None
|
||||||
|
|
||||||
|
def callback(message):
|
||||||
|
self.logger.info(
|
||||||
|
"timer=%s chain[0]['callback']=%s len(chain[1:])=%s",
|
||||||
|
timer, chain[0]["callback"], len(chain[1:]))
|
||||||
|
if timer:
|
||||||
|
timer.cancel()
|
||||||
|
if message["id"] in self._waiting_on_result_messages:
|
||||||
|
del self._waiting_on_result_messages[message["id"]]
|
||||||
|
if "callback" in chain[0]:
|
||||||
|
chain[0]["callback"](message)
|
||||||
|
self._chain_chrome_messages(chain[1:])
|
||||||
|
|
||||||
|
def timeout():
|
||||||
|
self._result_message_timeout = ResultMessageTimeout(
|
||||||
|
"timed out after %.1fs waiting for result message "
|
||||||
|
"for %s", chain[0]["timeout"], chain[0]["chrome_msg"])
|
||||||
|
|
||||||
|
if chain:
|
||||||
|
msg_id = self.send_to_chrome(**chain[0]["chrome_msg"])
|
||||||
|
self._waiting_on_result_messages[msg_id] = callback
|
||||||
|
self.logger.info(
|
||||||
|
"msg_id=%s for message %s", msg_id, chain[0]["chrome_msg"])
|
||||||
|
timer = threading.Timer(chain[0]["timeout"], timeout)
|
||||||
|
timer.daemon = True
|
||||||
|
timer.start()
|
||||||
|
else:
|
||||||
|
self.logger.info("finished chrome message chain")
|
||||||
|
|
||||||
|
def _start_postbehavior_chain(self):
|
||||||
|
if self.on_screenshot:
|
||||||
|
chain = [{
|
||||||
|
"info": "scrolling to top",
|
||||||
|
"chrome_msg": {
|
||||||
|
"method": "Runtime.evaluate",
|
||||||
|
"params": {"expression": "window.scrollTo(0, 0);"},
|
||||||
|
},
|
||||||
|
"timeout": 30,
|
||||||
|
"callback": lambda message: None,
|
||||||
|
}, {
|
||||||
|
"info": "requesting screenshot",
|
||||||
|
"chrome_msg": {"method": "Page.captureScreenshot"},
|
||||||
|
"timeout": 30,
|
||||||
|
"callback": lambda message: (
|
||||||
|
self.on_screenshot and self.on_screenshot(
|
||||||
|
base64.b64decode(message["result"]["data"]))),
|
||||||
|
}]
|
||||||
|
else:
|
||||||
|
chain = []
|
||||||
|
|
||||||
|
def set_outlinks(message):
|
||||||
|
self._outlinks = frozenset(
|
||||||
|
message["result"]["result"]["value"].split())
|
||||||
|
|
||||||
|
chain.append({
|
||||||
|
"info": "retrieving outlinks",
|
||||||
|
"chrome_msg": {
|
||||||
|
"method": "Runtime.evaluate",
|
||||||
|
"params": {"expression": self.OUTLINKS_JS},
|
||||||
|
},
|
||||||
|
"timeout": 60,
|
||||||
|
"callback": set_outlinks,
|
||||||
|
})
|
||||||
|
|
||||||
|
self._chain_chrome_messages(chain)
|
||||||
|
|
||||||
def _browse_interval_func(self):
|
def _browse_interval_func(self):
|
||||||
"""Called periodically while page is being browsed. Returns True when
|
"""Called periodically while page is being browsed. Returns True when
|
||||||
finished browsing."""
|
finished browsing."""
|
||||||
|
@ -398,12 +390,27 @@ compileOutlinks(window).join(' ');
|
||||||
raise BrowsingException(
|
raise BrowsingException(
|
||||||
"websocket closed, did chrome die? {}".format(
|
"websocket closed, did chrome die? {}".format(
|
||||||
self._websocket_url))
|
self._websocket_url))
|
||||||
|
elif self._result_message_timeout:
|
||||||
|
raise self._result_message_timeout
|
||||||
elif self._aw_snap_hes_dead_jim:
|
elif self._aw_snap_hes_dead_jim:
|
||||||
raise BrowsingException(
|
raise BrowsingException(
|
||||||
"""chrome tab went "aw snap" or "he's dead jim"!""")
|
"""chrome tab went "aw snap" or "he's dead jim"!""")
|
||||||
|
elif self._outlinks is not None:
|
||||||
|
# setting self._outlinks is the last thing that happens in the
|
||||||
|
# post-behavior chain
|
||||||
|
return True
|
||||||
elif (self._behavior != None and self._behavior.is_finished()
|
elif (self._behavior != None and self._behavior.is_finished()
|
||||||
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
|
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
|
||||||
return True
|
if self._behavior and self._behavior.is_finished():
|
||||||
|
self.logger.info(
|
||||||
|
"behavior decided it's finished with %s", self.url)
|
||||||
|
else:
|
||||||
|
self.logger.info(
|
||||||
|
"reached hard timeout of %s seconds url=%s",
|
||||||
|
Browser.HARD_TIMEOUT_SECONDS, self.url)
|
||||||
|
self._behavior = None
|
||||||
|
self._start_postbehavior_chain()
|
||||||
|
return False
|
||||||
elif self._reached_limit:
|
elif self._reached_limit:
|
||||||
raise self._reached_limit
|
raise self._reached_limit
|
||||||
elif self._abort_browse_page:
|
elif self._abort_browse_page:
|
||||||
|
@ -413,10 +420,10 @@ compileOutlinks(window).join(' ');
|
||||||
|
|
||||||
def send_to_chrome(self, suppress_logging=False, **kwargs):
|
def send_to_chrome(self, suppress_logging=False, **kwargs):
|
||||||
msg_id = next(self.command_id)
|
msg_id = next(self.command_id)
|
||||||
kwargs['id'] = msg_id
|
kwargs["id"] = msg_id
|
||||||
msg = json.dumps(kwargs)
|
msg = json.dumps(kwargs)
|
||||||
if not suppress_logging:
|
if not suppress_logging:
|
||||||
self.logger.debug('sending message to {}: {}'.format(self._websock, msg))
|
self.logger.debug("sending message to %s: %s", self._websock, msg)
|
||||||
self._websock.send(msg)
|
self._websock.send(msg)
|
||||||
return msg_id
|
return msg_id
|
||||||
|
|
||||||
|
@ -495,52 +502,41 @@ compileOutlinks(window).join(' ');
|
||||||
# resume execution
|
# resume execution
|
||||||
self.send_to_chrome(method="Debugger.resume")
|
self.send_to_chrome(method="Debugger.resume")
|
||||||
|
|
||||||
def _handle_result_message(self, message):
|
|
||||||
if message["id"] == self._waiting_on_screenshot_msg_id:
|
|
||||||
if self.on_screenshot:
|
|
||||||
self.on_screenshot(base64.b64decode(message["result"]["data"]))
|
|
||||||
self._waiting_on_screenshot_msg_id = None
|
|
||||||
self._has_screenshot = True
|
|
||||||
self.logger.info("got screenshot, moving on to getting outlinks")
|
|
||||||
elif message["id"] == self._waiting_on_scroll_to_top_msg_id:
|
|
||||||
self._waiting_on_scroll_to_top_msg_id = None
|
|
||||||
self._waiting_on_scroll_to_top_start = None
|
|
||||||
self._waiting_on_screenshot_msg_id = self.send_to_chrome(
|
|
||||||
method="Page.captureScreenshot")
|
|
||||||
elif message["id"] == self._waiting_on_outlinks_msg_id:
|
|
||||||
self.logger.debug("got outlinks message=%s", message)
|
|
||||||
self._outlinks = frozenset(
|
|
||||||
message["result"]["result"]["value"].split())
|
|
||||||
elif message["id"] == self._waiting_on_document_url_msg_id:
|
|
||||||
if message["result"]["result"]["value"] != self.url:
|
|
||||||
if self.on_url_change:
|
|
||||||
self.on_url_change(message["result"]["result"]["value"])
|
|
||||||
self._waiting_on_document_url_msg_id = None
|
|
||||||
elif self._behavior and self._behavior.is_waiting_on_result(message["id"]):
|
|
||||||
self._behavior.notify_of_result(message)
|
|
||||||
|
|
||||||
def _handle_message(self, websock, json_message):
|
def _handle_message(self, websock, json_message):
|
||||||
message = json.loads(json_message)
|
message = json.loads(json_message)
|
||||||
if "method" in message and message["method"] == "Network.requestWillBeSent":
|
if "method" in message:
|
||||||
self._network_request_will_be_sent(message)
|
if message["method"] == "Network.requestWillBeSent":
|
||||||
elif "method" in message and message["method"] == "Network.responseReceived":
|
self._network_request_will_be_sent(message)
|
||||||
self._network_response_received(message)
|
elif message["method"] == "Network.responseReceived":
|
||||||
elif "method" in message and message["method"] == "Page.loadEventFired":
|
self._network_response_received(message)
|
||||||
self._page_load_event_fired(message)
|
elif message["method"] == "Page.loadEventFired":
|
||||||
elif "method" in message and message["method"] == "Console.messageAdded":
|
self._page_load_event_fired(message)
|
||||||
self._console_message_added(message)
|
elif message["method"] == "Console.messageAdded":
|
||||||
elif "method" in message and message["method"] == "Debugger.paused":
|
self._console_message_added(message)
|
||||||
self._debugger_paused(message)
|
elif message["method"] == "Debugger.paused":
|
||||||
elif "method" in message and message["method"] == "Inspector.targetCrashed":
|
self._debugger_paused(message)
|
||||||
self._aw_snap_hes_dead_jim = message
|
elif message["method"] == "Inspector.targetCrashed":
|
||||||
|
self._aw_snap_hes_dead_jim = message
|
||||||
|
# elif message["method"] in (
|
||||||
|
# "Network.dataReceived", "Network.responseReceived",
|
||||||
|
# "Network.loadingFinished"):
|
||||||
|
# pass
|
||||||
|
# else:
|
||||||
|
# self.logger.debug("%s %s", message["method"], json_message)
|
||||||
elif "result" in message:
|
elif "result" in message:
|
||||||
self._handle_result_message(message)
|
if message["id"] in self._waiting_on_result_messages:
|
||||||
# elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"):
|
callback = self._waiting_on_result_messages[message["id"]]
|
||||||
# pass
|
self.logger.info(
|
||||||
# elif "method" in message:
|
"received result for message id=%s, calling %s",
|
||||||
# self.logger.debug("{} {}".format(message["method"], json_message))
|
message["id"], callback)
|
||||||
|
callback(message)
|
||||||
|
elif self._behavior and self._behavior.is_waiting_on_result(
|
||||||
|
message["id"]):
|
||||||
|
self._behavior.notify_of_result(message)
|
||||||
|
# else:
|
||||||
|
# self.logger.debug("%s", json_message)
|
||||||
# else:
|
# else:
|
||||||
# self.logger.debug("[no-method] {}".format(json_message))
|
# self.logger.debug("%s", json_message)
|
||||||
|
|
||||||
class Chrome:
|
class Chrome:
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -21,7 +21,7 @@ import setuptools
|
||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='brozzler',
|
name='brozzler',
|
||||||
version='1.1b4.dev61',
|
version='1.1b4.dev62',
|
||||||
description='Distributed web crawling with browsers',
|
description='Distributed web crawling with browsers',
|
||||||
url='https://github.com/internetarchive/brozzler',
|
url='https://github.com/internetarchive/brozzler',
|
||||||
author='Noah Levitt',
|
author='Noah Levitt',
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue