diff --git a/brozzler/browser.py b/brozzler/browser.py index 25e95c6..d412188 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -102,6 +102,106 @@ class BrowserPool: def num_in_use(self): return len(self._in_use) +class WebsockReceiverThread(threading.Thread): + logger = logging.getLogger(__module__ + '.' + __qualname__) + + def __init__(self, websock, name=None, daemon=True): + super().__init__(name=name, daemon=daemon) + + self.websock = websock + + self.calling_thread = threading.current_thread() + + self.websock.on_open = self._on_open + self.websock.on_message = self._on_message + self.websock.on_error = self._on_error + self.websock.on_close = self._on_close + + self.is_open = False + self.got_page_load_event = None + + self._result_messages = {} + + def expect_result(self, msg_id): + self._result_messages[msg_id] = None + + def received_result(self, msg_id): + return bool(self._result_messages.get(msg_id)) + + def pop_result(self, msg_id): + return self._result_messages.pop(msg_id) + + def _on_close(self, websock): + pass + # self.logger.info('GOODBYE GOODBYE WEBSOCKET') + + def _on_open(self, websock): + self.is_open = True + + def _on_error(self, websock, e): + ''' + Raises BrowsingException in the thread that created this instance. + ''' + if isinstance(e, ( + websocket.WebSocketConnectionClosedException, + ConnectionResetError)): + self.logger.error('websocket closed, did chrome die?') + else: + self.logger.error( + 'exception from websocket receiver thread', + exc_info=1) + brozzler.thread_raise(self.calling_thread, BrowsingException) + + def run(self): + self.websock.run_forever() + + def _on_message(self, websock, message): + try: + self._handle_message(websock, message) + except: + self.logger.error( + 'uncaught exception in _handle_message message=%s', + message, exc_info=True) + + def _debugger_paused(self, message): + # we hit the breakpoint set in start(), get rid of google analytics + self.logger.debug('debugger paused! message=%s', message) + scriptId = message['params']['callFrames'][0]['location']['scriptId'] + + # replace script + self.websock.send( + json.dumps(dict( + id=0, method='Debugger.setScriptSource', + params={'scriptId': scriptId, + 'scriptSource': 'console.log("google analytics is no more!");'}))) + + # resume execution + self.websock.send(json.dumps(dict(id=0, method='Debugger.resume'))) + + def _handle_message(self, websock, json_message): + self.logger.debug("%s", json_message) + message = json.loads(json_message) + if 'method' in message: + if message['method'] == 'Page.loadEventFired': + self.got_page_load_event = datetime.datetime.utcnow() + elif message['method'] == 'Debugger.paused': + self._debugger_paused(message) + elif message['method'] == 'Console.messageAdded': + self.logger.debug( + '%s console.%s %s', self.websock.url, + message['params']['message']['level'], + message['params']['message']['text']) + # else: + # self.logger.debug("%s %s", message["method"], json_message) + elif 'result' in message: + if message['id'] in self._result_messages: + self._result_messages[message['id']] = message + # else: + # self.logger.debug("%s", json_message) + # else: + # self.logger.debug("%s", json_message) + + class Browser: ''' Manages an instance of Chrome for browsing pages. @@ -116,9 +216,11 @@ class Browser: **kwargs: arguments for Chrome(...) ''' self.chrome = Chrome(**kwargs) - self.websocket_url = None + self.websock_url = None + self.websock = None + self.websock_thread = None self.is_browsing = False - self._browser_controller = None + self._command_id = Counter() def __enter__(self): self.start() @@ -127,6 +229,31 @@ class Browser: def __exit__(self, *args): self.stop() + def _wait_for(self, callback, timeout=None): + ''' + Spins until callback() returns truthy. + ''' + start = time.time() + while True: + brozzler.sleep(0.5) + if callback(): + return + elapsed = time.time() - start + if timeout and elapsed > timeout: + raise BrowsingTimeout( + 'timed out after %.1fs waiting for: %s' % ( + elapsed, callback)) + + def send_to_chrome(self, suppress_logging=False, **kwargs): + msg_id = next(self._command_id) + kwargs['id'] = msg_id + msg = json.dumps(kwargs) + logging.log( + brozzler.TRACE if suppress_logging else logging.DEBUG, + 'sending message to %s: %s', self.websock, msg) + self.websock.send(msg) + return msg_id + def start(self, **kwargs): ''' Starts chrome if it's not running. @@ -135,24 +262,69 @@ class Browser: **kwargs: arguments for self.chrome.start(...) ''' if not self.is_running(): - self.websocket_url = self.chrome.start(**kwargs) - self._browser_controller = BrowserController(self.websocket_url) - self._browser_controller.start() + self.websock_url = self.chrome.start(**kwargs) + self.websock = websocket.WebSocketApp(self.websock_url) + thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format( + surt.handyurl.parse(self.websock_url).port, + datetime.datetime.utcnow()) + self.websock_thread = WebsockReceiverThread(self.websock) + self.websock_thread.start() + + self._wait_for(lambda: self.websock_thread.is_open, timeout=10) + + # tell browser to send us messages we're interested in + self.send_to_chrome(method='Network.enable') + self.send_to_chrome(method='Page.enable') + self.send_to_chrome(method='Console.enable') + self.send_to_chrome(method='Debugger.enable') + self.send_to_chrome(method='Runtime.enable') + + # disable google analytics, see _handle_message() where breakpoint + # is caught Debugger.paused + self.send_to_chrome( + method='Debugger.setBreakpointByUrl', + params={ + 'lineNumber': 1, + 'urlRegex': 'https?://www.google-analytics.com/analytics.js'}) def stop(self): ''' Stops chrome if it's running. ''' try: - if self._browser_controller: - self._browser_controller.stop() - self.websocket_url = None + if (self.websock and self.websock.sock + and self.websock.sock.connected): + self.logger.info('shutting down websocket connection') + try: + self.websock.close() + except BaseException as e: + self.logger.error( + 'exception closing websocket %s - %s', + self.websock, e) + self.chrome.stop() + + if self.websock_thread and ( + self.websock_thread != threading.current_thread()): + self.websock_thread.join(timeout=30) + if self.websock_thread.is_alive(): + self.logger.error( + '%s still alive 30 seconds after closing %s, will ' + 'forcefully nudge it again', self.websock_thread, + self.websock) + self.websock.keep_running = False + self.websock_thread.join(timeout=30) + if self.websock_thread.is_alive(): + self.logger.critical( + '%s still alive 60 seconds after closing %s', + self.websock_thread, self.websock) + + self.websock_url = None except: self.logger.error('problem stopping', exc_info=True) def is_running(self): - return self.websocket_url is not None + return self.websock_url is not None def browse_page( self, page_url, ignore_cert_errors=False, extra_headers=None, @@ -201,24 +373,24 @@ class Browser: raise BrowsingException('browser is already busy browsing a page') self.is_browsing = True try: - self._browser_controller.navigate_to_page(page_url, timeout=300) + self.navigate_to_page(page_url, timeout=300) ## if login_credentials: - ## self._browser_controller.try_login(login_credentials) (5 min?) + ## self.try_login(login_credentials) (5 min?) behavior_script = brozzler.behavior_script( page_url, behavior_parameters) - self._browser_controller.run_behavior(behavior_script, timeout=900) + self.run_behavior(behavior_script, timeout=900) if on_screenshot: - self._browser_controller.scroll_to_top() - jpeg_bytes = self._browser_controller.screenshot() + self.scroll_to_top() + jpeg_bytes = self.screenshot() on_screenshot(jpeg_bytes) - outlinks = self._browser_controller.extract_outlinks() + outlinks = self.extract_outlinks() ## for each hashtag not already visited: ## navigate_to_hashtag (nothing to wait for so no timeout?) ## if on_screenshot; ## take screenshot (30 sec) ## run behavior (3 min) ## outlinks += retrieve_outlinks (60 sec) - final_page_url = self._browser_controller.url() + final_page_url = self.url() return final_page_url, outlinks except websocket.WebSocketConnectionClosedException as e: self.logger.error('websocket closed, did chrome die?') @@ -226,181 +398,8 @@ class Browser: finally: self.is_browsing = False -class Counter: - def __init__(self): - self.next_value = 0 - def __next__(self): - try: - return self.next_value - finally: - self.next_value += 1 - def peek_next(self): - return self.next_value - -class BrowserController: - ''' - ''' - - logger = logging.getLogger(__module__ + '.' + __qualname__) - - def __init__(self, websocket_url): - self.websocket_url = websocket_url - self._command_id = Counter() - self._websock_thread = None - self._websock_open = None - self._result_messages = {} - - def _wait_for(self, callback, timeout=None): - ''' - Spins until callback() returns truthy. - ''' - start = time.time() - while True: - brozzler.sleep(0.5) - if callback(): - return - elapsed = time.time() - start - if timeout and elapsed > timeout: - raise BrowsingTimeout( - 'timed out after %.1fs waiting for: %s' % ( - elapsed, callback)) - - def __enter__(self): - self.start() - return self - - def __exit__(self, *args): - self.stop() - - def start(self): - if not self._websock_thread: - calling_thread = threading.current_thread() - - def on_open(websock): - self._websock_open = datetime.datetime.utcnow() - def on_error(websock, e): - ''' - Raises BrowsingException in the thread that called start() - ''' - if isinstance(e, websocket.WebSocketConnectionClosedException): - self.logger.error('websocket closed, did chrome die?') - else: - self.logger.error( - 'exception from websocket receiver thread', - exc_info=1) - brozzler.thread_raise(calling_thread, BrowsingException) - - # open websocket, start thread that receives messages - self._websock = websocket.WebSocketApp( - self.websocket_url, on_open=on_open, - on_message=self._on_message, on_error=on_error) - thread_name = 'WebsockThread:{}-{:%Y%m%d%H%M%S}'.format( - surt.handyurl.parse(self.websocket_url).port, - datetime.datetime.utcnow()) - self._websock_thread = threading.Thread( - target=self._websock.run_forever, name=thread_name, - daemon=True) - self._websock_thread.start() - self._wait_for(lambda: self._websock_open, timeout=10) - - # tell browser to send messages we're interested in - self.send_to_chrome(method='Network.enable') - self.send_to_chrome(method='Page.enable') - self.send_to_chrome(method='Console.enable') - self.send_to_chrome(method='Debugger.enable') - self.send_to_chrome(method='Runtime.enable') - - # disable google analytics, see _handle_message() where breakpoint - # is caught Debugger.paused - self.send_to_chrome( - method='Debugger.setBreakpointByUrl', - params={ - 'lineNumber': 1, - 'urlRegex': 'https?://www.google-analytics.com/analytics.js'}) - - def stop(self, *args): - if self._websock_thread: - if (self._websock and self._websock.sock - and self._websock.sock.connected): - self.logger.info('shutting down websocket connection') - try: - self._websock.close() - except BaseException as e: - self.logger.error( - 'exception closing websocket %s - %s', - self._websock, e) - - if self._websock_thread != threading.current_thread(): - self._websock_thread.join(timeout=30) - if self._websock_thread.is_alive(): - self.logger.error( - '%s still alive 30 seconds after closing %s, will ' - 'forcefully nudge it again', self._websock_thread, - self._websock) - self._websock.keep_running = False - self._websock_thread.join(timeout=30) - if self._websock_thread.is_alive(): - self.logger.critical( - '%s still alive 60 seconds after closing %s', - self._websock_thread, self._websock) - - def _on_message(self, websock, message): - try: - self._handle_message(websock, message) - except: - self.logger.error( - 'uncaught exception in _handle_message message=%s', - message, exc_info=True) - - def _handle_message(self, websock, json_message): - message = json.loads(json_message) - if 'method' in message: - if message['method'] == 'Page.loadEventFired': - self._got_page_load_event = datetime.datetime.utcnow() - elif message['method'] == 'Debugger.paused': - self._debugger_paused(message) - elif message['method'] == 'Console.messageAdded': - self.logger.debug( - '%s console.%s %s', self._websock.url, - message['params']['message']['level'], - message['params']['message']['text']) - # else: - # self.logger.debug("%s %s", message["method"], json_message) - elif 'result' in message: - if message['id'] in self._result_messages: - self._result_messages[message['id']] = message - # else: - # self.logger.debug("%s", json_message) - # else: - # self.logger.debug("%s", json_message) - - def _debugger_paused(self, message): - # we hit the breakpoint set in start(), get rid of google analytics - self.logger.debug('debugger paused! message=%s', message) - scriptId = message['params']['callFrames'][0]['location']['scriptId'] - - # replace script - self.send_to_chrome( - method='Debugger.setScriptSource', - params={'scriptId': scriptId, - 'scriptSource': 'console.log("google analytics is no more!");'}) - - # resume execution - self.send_to_chrome(method='Debugger.resume') - - def send_to_chrome(self, suppress_logging=False, **kwargs): - msg_id = next(self._command_id) - kwargs['id'] = msg_id - msg = json.dumps(kwargs) - if not suppress_logging: - self.logger.debug('sending message to %s: %s', self._websock, msg) - self._websock.send(msg) - return msg_id - def navigate_to_page( self, page_url, extra_headers=None, user_agent=None, timeout=300): - ''' - ''' headers = extra_headers or {} headers['Accept-Encoding'] = 'identity' self.send_to_chrome( @@ -414,9 +413,11 @@ class BrowserController: # navigate to the page! self.logger.info('navigating to page %s', page_url) - self._got_page_load_event = None + self.websock_thread.got_page_load_event = None self.send_to_chrome(method='Page.navigate', params={'url': page_url}) - self._wait_for(lambda: self._got_page_load_event, timeout=timeout) + self._wait_for( + lambda: self.websock_thread.got_page_load_event, + timeout=timeout) OUTLINKS_JS = r''' var __brzl_framesDone = new Set(); @@ -438,13 +439,14 @@ __brzl_compileOutlinks(window).join('\n'); ''' def extract_outlinks(self, timeout=60): self.logger.info('extracting outlinks') - self._result_messages[self._command_id.peek_next()] = None + self.websock_thread.expect_result(self._command_id.peek()) msg_id = self.send_to_chrome( method='Runtime.evaluate', params={'expression': self.OUTLINKS_JS}) self._wait_for( - lambda: self._result_messages.get(msg_id), timeout=timeout) - message = self._result_messages.pop(msg_id) + lambda: self.websock_thread.received_result(msg_id), + timeout=timeout) + message = self.websock_thread.pop_result(msg_id) if message['result']['result']['value']: return frozenset(message['result']['result']['value'].split('\n')) else: @@ -452,35 +454,38 @@ __brzl_compileOutlinks(window).join('\n'); def screenshot(self, timeout=30): self.logger.info('taking screenshot') - self._result_messages[self._command_id.peek_next()] = None + self.websock_thread.expect_result(self._command_id.peek()) msg_id = self.send_to_chrome(method='Page.captureScreenshot') self._wait_for( - lambda: self._result_messages.get(msg_id), timeout=timeout) - message = self._result_messages.pop(msg_id) + lambda: self.websock_thread.received_result(msg_id), + timeout=timeout) + message = self.websock_thread.pop_result(msg_id) jpeg_bytes = base64.b64decode(message['result']['data']) return jpeg_bytes def scroll_to_top(self, timeout=30): self.logger.info('scrolling to top') - self._result_messages[self._command_id.peek_next()] = None + self.websock_thread.expect_result(self._command_id.peek()) msg_id = self.send_to_chrome( method='Runtime.evaluate', params={'expression': 'window.scrollTo(0, 0);'}) self._wait_for( - lambda: self._result_messages.get(msg_id), timeout=timeout) - self._result_messages.pop(msg_id) + lambda: self.websock_thread.received_result(msg_id), + timeout=timeout) + self.websock_thread.pop_result(msg_id) def url(self, timeout=30): ''' Returns value of document.URL from the browser. ''' - self._result_messages[self._command_id.peek_next()] = None + self.websock_thread.expect_result(self._command_id.peek()) msg_id = self.send_to_chrome( method='Runtime.evaluate', params={'expression': 'document.URL'}) self._wait_for( - lambda: self._result_messages.get(msg_id), timeout=timeout) - message = self._result_messages.pop(msg_id) + lambda: self.websock_thread.received_result(msg_id), + timeout=timeout) + message = self.websock_thread.pop_result(msg_id) return message['result']['result']['value'] def run_behavior(self, behavior_script, timeout=900): @@ -498,14 +503,15 @@ __brzl_compileOutlinks(window).join('\n'); brozzler.sleep(7) - self._result_messages[self._command_id.peek_next()] = None + self.websock_thread.expect_result(self._command_id.peek()) msg_id = self.send_to_chrome( method='Runtime.evaluate', suppress_logging=True, params={'expression': 'umbraBehaviorFinished()'}) try: self._wait_for( - lambda: self._result_messages.get(msg_id), timeout=5) - msg = self._result_messages.get(msg_id) + lambda: self.websock_thread.received_result(msg_id), + timeout=5) + msg = self.websock_thread.pop_result(msg_id) if (msg and 'result' in msg and not ('wasThrown' in msg['result'] and msg['result']['wasThrown']) @@ -517,4 +523,15 @@ __brzl_compileOutlinks(window).join('\n'); except BrowsingTimeout: pass +class Counter: + def __init__(self): + self.next_value = 0 + def __next__(self): + try: + return self.next_value + finally: + self.next_value += 1 + def peek(self): + return self.next_value + diff --git a/setup.py b/setup.py index c9ea4dc..de4b25b 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b9.dev144', + version='1.1b9.dev145', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt',