yet more refactoring of browser.py, clearer separation of purpose, Browser class manages browsing, sends most of the messages to chrome, WebsockReceiverThread handles messages that come back from chrome

This commit is contained in:
Noah Levitt 2016-12-16 13:52:12 -08:00
parent 534d2e63d6
commit bc6e0d243f
2 changed files with 224 additions and 207 deletions

View File

@ -102,6 +102,106 @@ class BrowserPool:
def num_in_use(self):
return len(self._in_use)
class WebsockReceiverThread(threading.Thread):
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(self, websock, name=None, daemon=True):
super().__init__(name=name, daemon=daemon)
self.websock = websock
self.calling_thread = threading.current_thread()
self.websock.on_open = self._on_open
self.websock.on_message = self._on_message
self.websock.on_error = self._on_error
self.websock.on_close = self._on_close
self.is_open = False
self.got_page_load_event = None
self._result_messages = {}
def expect_result(self, msg_id):
self._result_messages[msg_id] = None
def received_result(self, msg_id):
return bool(self._result_messages.get(msg_id))
def pop_result(self, msg_id):
return self._result_messages.pop(msg_id)
def _on_close(self, websock):
pass
# self.logger.info('GOODBYE GOODBYE WEBSOCKET')
def _on_open(self, websock):
self.is_open = True
def _on_error(self, websock, e):
'''
Raises BrowsingException in the thread that created this instance.
'''
if isinstance(e, (
websocket.WebSocketConnectionClosedException,
ConnectionResetError)):
self.logger.error('websocket closed, did chrome die?')
else:
self.logger.error(
'exception from websocket receiver thread',
exc_info=1)
brozzler.thread_raise(self.calling_thread, BrowsingException)
def run(self):
self.websock.run_forever()
def _on_message(self, websock, message):
try:
self._handle_message(websock, message)
except:
self.logger.error(
'uncaught exception in _handle_message message=%s',
message, exc_info=True)
def _debugger_paused(self, message):
# we hit the breakpoint set in start(), get rid of google analytics
self.logger.debug('debugger paused! message=%s', message)
scriptId = message['params']['callFrames'][0]['location']['scriptId']
# replace script
self.websock.send(
json.dumps(dict(
id=0, method='Debugger.setScriptSource',
params={'scriptId': scriptId,
'scriptSource': 'console.log("google analytics is no more!");'})))
# resume execution
self.websock.send(json.dumps(dict(id=0, method='Debugger.resume')))
def _handle_message(self, websock, json_message):
self.logger.debug("%s", json_message)
message = json.loads(json_message)
if 'method' in message:
if message['method'] == 'Page.loadEventFired':
self.got_page_load_event = datetime.datetime.utcnow()
elif message['method'] == 'Debugger.paused':
self._debugger_paused(message)
elif message['method'] == 'Console.messageAdded':
self.logger.debug(
'%s console.%s %s', self.websock.url,
message['params']['message']['level'],
message['params']['message']['text'])
# else:
# self.logger.debug("%s %s", message["method"], json_message)
elif 'result' in message:
if message['id'] in self._result_messages:
self._result_messages[message['id']] = message
# else:
# self.logger.debug("%s", json_message)
# else:
# self.logger.debug("%s", json_message)
class Browser:
'''
Manages an instance of Chrome for browsing pages.
@ -116,9 +216,11 @@ class Browser:
**kwargs: arguments for Chrome(...)
'''
self.chrome = Chrome(**kwargs)
self.websocket_url = None
self.websock_url = None
self.websock = None
self.websock_thread = None
self.is_browsing = False
self._browser_controller = None
self._command_id = Counter()
def __enter__(self):
self.start()
@ -127,6 +229,31 @@ class Browser:
def __exit__(self, *args):
self.stop()
def _wait_for(self, callback, timeout=None):
'''
Spins until callback() returns truthy.
'''
start = time.time()
while True:
brozzler.sleep(0.5)
if callback():
return
elapsed = time.time() - start
if timeout and elapsed > timeout:
raise BrowsingTimeout(
'timed out after %.1fs waiting for: %s' % (
elapsed, callback))
def send_to_chrome(self, suppress_logging=False, **kwargs):
msg_id = next(self._command_id)
kwargs['id'] = msg_id
msg = json.dumps(kwargs)
logging.log(
brozzler.TRACE if suppress_logging else logging.DEBUG,
'sending message to %s: %s', self.websock, msg)
self.websock.send(msg)
return msg_id
def start(self, **kwargs):
'''
Starts chrome if it's not running.
@ -135,24 +262,69 @@ class Browser:
**kwargs: arguments for self.chrome.start(...)
'''
if not self.is_running():
self.websocket_url = self.chrome.start(**kwargs)
self._browser_controller = BrowserController(self.websocket_url)
self._browser_controller.start()
self.websock_url = self.chrome.start(**kwargs)
self.websock = websocket.WebSocketApp(self.websock_url)
thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format(
surt.handyurl.parse(self.websock_url).port,
datetime.datetime.utcnow())
self.websock_thread = WebsockReceiverThread(self.websock)
self.websock_thread.start()
self._wait_for(lambda: self.websock_thread.is_open, timeout=10)
# tell browser to send us messages we're interested in
self.send_to_chrome(method='Network.enable')
self.send_to_chrome(method='Page.enable')
self.send_to_chrome(method='Console.enable')
self.send_to_chrome(method='Debugger.enable')
self.send_to_chrome(method='Runtime.enable')
# disable google analytics, see _handle_message() where breakpoint
# is caught Debugger.paused
self.send_to_chrome(
method='Debugger.setBreakpointByUrl',
params={
'lineNumber': 1,
'urlRegex': 'https?://www.google-analytics.com/analytics.js'})
def stop(self):
'''
Stops chrome if it's running.
'''
try:
if self._browser_controller:
self._browser_controller.stop()
self.websocket_url = None
if (self.websock and self.websock.sock
and self.websock.sock.connected):
self.logger.info('shutting down websocket connection')
try:
self.websock.close()
except BaseException as e:
self.logger.error(
'exception closing websocket %s - %s',
self.websock, e)
self.chrome.stop()
if self.websock_thread and (
self.websock_thread != threading.current_thread()):
self.websock_thread.join(timeout=30)
if self.websock_thread.is_alive():
self.logger.error(
'%s still alive 30 seconds after closing %s, will '
'forcefully nudge it again', self.websock_thread,
self.websock)
self.websock.keep_running = False
self.websock_thread.join(timeout=30)
if self.websock_thread.is_alive():
self.logger.critical(
'%s still alive 60 seconds after closing %s',
self.websock_thread, self.websock)
self.websock_url = None
except:
self.logger.error('problem stopping', exc_info=True)
def is_running(self):
return self.websocket_url is not None
return self.websock_url is not None
def browse_page(
self, page_url, ignore_cert_errors=False, extra_headers=None,
@ -201,24 +373,24 @@ class Browser:
raise BrowsingException('browser is already busy browsing a page')
self.is_browsing = True
try:
self._browser_controller.navigate_to_page(page_url, timeout=300)
self.navigate_to_page(page_url, timeout=300)
## if login_credentials:
## self._browser_controller.try_login(login_credentials) (5 min?)
## self.try_login(login_credentials) (5 min?)
behavior_script = brozzler.behavior_script(
page_url, behavior_parameters)
self._browser_controller.run_behavior(behavior_script, timeout=900)
self.run_behavior(behavior_script, timeout=900)
if on_screenshot:
self._browser_controller.scroll_to_top()
jpeg_bytes = self._browser_controller.screenshot()
self.scroll_to_top()
jpeg_bytes = self.screenshot()
on_screenshot(jpeg_bytes)
outlinks = self._browser_controller.extract_outlinks()
outlinks = self.extract_outlinks()
## for each hashtag not already visited:
## navigate_to_hashtag (nothing to wait for so no timeout?)
## if on_screenshot;
## take screenshot (30 sec)
## run behavior (3 min)
## outlinks += retrieve_outlinks (60 sec)
final_page_url = self._browser_controller.url()
final_page_url = self.url()
return final_page_url, outlinks
except websocket.WebSocketConnectionClosedException as e:
self.logger.error('websocket closed, did chrome die?')
@ -226,181 +398,8 @@ class Browser:
finally:
self.is_browsing = False
class Counter:
def __init__(self):
self.next_value = 0
def __next__(self):
try:
return self.next_value
finally:
self.next_value += 1
def peek_next(self):
return self.next_value
class BrowserController:
'''
'''
logger = logging.getLogger(__module__ + '.' + __qualname__)
def __init__(self, websocket_url):
self.websocket_url = websocket_url
self._command_id = Counter()
self._websock_thread = None
self._websock_open = None
self._result_messages = {}
def _wait_for(self, callback, timeout=None):
'''
Spins until callback() returns truthy.
'''
start = time.time()
while True:
brozzler.sleep(0.5)
if callback():
return
elapsed = time.time() - start
if timeout and elapsed > timeout:
raise BrowsingTimeout(
'timed out after %.1fs waiting for: %s' % (
elapsed, callback))
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.stop()
def start(self):
if not self._websock_thread:
calling_thread = threading.current_thread()
def on_open(websock):
self._websock_open = datetime.datetime.utcnow()
def on_error(websock, e):
'''
Raises BrowsingException in the thread that called start()
'''
if isinstance(e, websocket.WebSocketConnectionClosedException):
self.logger.error('websocket closed, did chrome die?')
else:
self.logger.error(
'exception from websocket receiver thread',
exc_info=1)
brozzler.thread_raise(calling_thread, BrowsingException)
# open websocket, start thread that receives messages
self._websock = websocket.WebSocketApp(
self.websocket_url, on_open=on_open,
on_message=self._on_message, on_error=on_error)
thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format(
surt.handyurl.parse(self.websocket_url).port,
datetime.datetime.utcnow())
self._websock_thread = threading.Thread(
target=self._websock.run_forever, name=thread_name,
daemon=True)
self._websock_thread.start()
self._wait_for(lambda: self._websock_open, timeout=10)
# tell browser to send messages we're interested in
self.send_to_chrome(method='Network.enable')
self.send_to_chrome(method='Page.enable')
self.send_to_chrome(method='Console.enable')
self.send_to_chrome(method='Debugger.enable')
self.send_to_chrome(method='Runtime.enable')
# disable google analytics, see _handle_message() where breakpoint
# is caught Debugger.paused
self.send_to_chrome(
method='Debugger.setBreakpointByUrl',
params={
'lineNumber': 1,
'urlRegex': 'https?://www.google-analytics.com/analytics.js'})
def stop(self, *args):
if self._websock_thread:
if (self._websock and self._websock.sock
and self._websock.sock.connected):
self.logger.info('shutting down websocket connection')
try:
self._websock.close()
except BaseException as e:
self.logger.error(
'exception closing websocket %s - %s',
self._websock, e)
if self._websock_thread != threading.current_thread():
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.error(
'%s still alive 30 seconds after closing %s, will '
'forcefully nudge it again', self._websock_thread,
self._websock)
self._websock.keep_running = False
self._websock_thread.join(timeout=30)
if self._websock_thread.is_alive():
self.logger.critical(
'%s still alive 60 seconds after closing %s',
self._websock_thread, self._websock)
def _on_message(self, websock, message):
try:
self._handle_message(websock, message)
except:
self.logger.error(
'uncaught exception in _handle_message message=%s',
message, exc_info=True)
def _handle_message(self, websock, json_message):
message = json.loads(json_message)
if 'method' in message:
if message['method'] == 'Page.loadEventFired':
self._got_page_load_event = datetime.datetime.utcnow()
elif message['method'] == 'Debugger.paused':
self._debugger_paused(message)
elif message['method'] == 'Console.messageAdded':
self.logger.debug(
'%s console.%s %s', self._websock.url,
message['params']['message']['level'],
message['params']['message']['text'])
# else:
# self.logger.debug("%s %s", message["method"], json_message)
elif 'result' in message:
if message['id'] in self._result_messages:
self._result_messages[message['id']] = message
# else:
# self.logger.debug("%s", json_message)
# else:
# self.logger.debug("%s", json_message)
def _debugger_paused(self, message):
# we hit the breakpoint set in start(), get rid of google analytics
self.logger.debug('debugger paused! message=%s', message)
scriptId = message['params']['callFrames'][0]['location']['scriptId']
# replace script
self.send_to_chrome(
method='Debugger.setScriptSource',
params={'scriptId': scriptId,
'scriptSource': 'console.log("google analytics is no more!");'})
# resume execution
self.send_to_chrome(method='Debugger.resume')
def send_to_chrome(self, suppress_logging=False, **kwargs):
msg_id = next(self._command_id)
kwargs['id'] = msg_id
msg = json.dumps(kwargs)
if not suppress_logging:
self.logger.debug('sending message to %s: %s', self._websock, msg)
self._websock.send(msg)
return msg_id
def navigate_to_page(
self, page_url, extra_headers=None, user_agent=None, timeout=300):
'''
'''
headers = extra_headers or {}
headers['Accept-Encoding'] = 'identity'
self.send_to_chrome(
@ -414,9 +413,11 @@ class BrowserController:
# navigate to the page!
self.logger.info('navigating to page %s', page_url)
self._got_page_load_event = None
self.websock_thread.got_page_load_event = None
self.send_to_chrome(method='Page.navigate', params={'url': page_url})
self._wait_for(lambda: self._got_page_load_event, timeout=timeout)
self._wait_for(
lambda: self.websock_thread.got_page_load_event,
timeout=timeout)
OUTLINKS_JS = r'''
var __brzl_framesDone = new Set();
@ -438,13 +439,14 @@ __brzl_compileOutlinks(window).join('\n');
'''
def extract_outlinks(self, timeout=60):
self.logger.info('extracting outlinks')
self._result_messages[self._command_id.peek_next()] = None
self.websock_thread.expect_result(self._command_id.peek())
msg_id = self.send_to_chrome(
method='Runtime.evaluate',
params={'expression': self.OUTLINKS_JS})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
lambda: self.websock_thread.received_result(msg_id),
timeout=timeout)
message = self.websock_thread.pop_result(msg_id)
if message['result']['result']['value']:
return frozenset(message['result']['result']['value'].split('\n'))
else:
@ -452,35 +454,38 @@ __brzl_compileOutlinks(window).join('\n');
def screenshot(self, timeout=30):
self.logger.info('taking screenshot')
self._result_messages[self._command_id.peek_next()] = None
self.websock_thread.expect_result(self._command_id.peek())
msg_id = self.send_to_chrome(method='Page.captureScreenshot')
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
lambda: self.websock_thread.received_result(msg_id),
timeout=timeout)
message = self.websock_thread.pop_result(msg_id)
jpeg_bytes = base64.b64decode(message['result']['data'])
return jpeg_bytes
def scroll_to_top(self, timeout=30):
self.logger.info('scrolling to top')
self._result_messages[self._command_id.peek_next()] = None
self.websock_thread.expect_result(self._command_id.peek())
msg_id = self.send_to_chrome(
method='Runtime.evaluate',
params={'expression': 'window.scrollTo(0, 0);'})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
self._result_messages.pop(msg_id)
lambda: self.websock_thread.received_result(msg_id),
timeout=timeout)
self.websock_thread.pop_result(msg_id)
def url(self, timeout=30):
'''
Returns value of document.URL from the browser.
'''
self._result_messages[self._command_id.peek_next()] = None
self.websock_thread.expect_result(self._command_id.peek())
msg_id = self.send_to_chrome(
method='Runtime.evaluate',
params={'expression': 'document.URL'})
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=timeout)
message = self._result_messages.pop(msg_id)
lambda: self.websock_thread.received_result(msg_id),
timeout=timeout)
message = self.websock_thread.pop_result(msg_id)
return message['result']['result']['value']
def run_behavior(self, behavior_script, timeout=900):
@ -498,14 +503,15 @@ __brzl_compileOutlinks(window).join('\n');
brozzler.sleep(7)
self._result_messages[self._command_id.peek_next()] = None
self.websock_thread.expect_result(self._command_id.peek())
msg_id = self.send_to_chrome(
method='Runtime.evaluate', suppress_logging=True,
params={'expression': 'umbraBehaviorFinished()'})
try:
self._wait_for(
lambda: self._result_messages.get(msg_id), timeout=5)
msg = self._result_messages.get(msg_id)
lambda: self.websock_thread.received_result(msg_id),
timeout=5)
msg = self.websock_thread.pop_result(msg_id)
if (msg and 'result' in msg
and not ('wasThrown' in msg['result']
and msg['result']['wasThrown'])
@ -517,4 +523,15 @@ __brzl_compileOutlinks(window).join('\n');
except BrowsingTimeout:
pass
class Counter:
def __init__(self):
self.next_value = 0
def __next__(self):
try:
return self.next_value
finally:
self.next_value += 1
def peek(self):
return self.next_value

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b9.dev144',
version='1.1b9.dev145',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',