mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 08:39:59 -05:00
UmbraWorker.send_to_chrome() - central place to send message to chrome via websocket
This commit is contained in:
parent
a62a07e6b7
commit
55fad80553
@ -1,7 +1,7 @@
|
||||
# vim: set sw=4 et:
|
||||
|
||||
import json
|
||||
from itertools import chain
|
||||
import itertools
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
@ -18,7 +18,7 @@ class Behavior:
|
||||
def behaviors():
|
||||
if Behavior._behaviors is None:
|
||||
behaviors_directory = os.path.sep.join(__file__.split(os.path.sep)[:-1] + ['behaviors.d'])
|
||||
behavior_files = chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)])
|
||||
behavior_files = itertools.chain(*[[os.path.join(dir, file) for file in files if file.endswith('.js') and file != 'default.js'] for dir, dirs, files in os.walk(behaviors_directory)])
|
||||
Behavior._behaviors = []
|
||||
for file_name in behavior_files:
|
||||
Behavior.logger.debug("reading behavior file {}".format(file_name))
|
||||
@ -46,10 +46,9 @@ class Behavior:
|
||||
Behavior._default_behavior = behavior
|
||||
return Behavior._default_behavior
|
||||
|
||||
def __init__(self, url, websock, command_id):
|
||||
def __init__(self, url, umbra_worker):
|
||||
self.url = url
|
||||
self.websock = websock
|
||||
self.command_id = command_id
|
||||
self.umbra_worker = umbra_worker
|
||||
|
||||
self.script_finished = False
|
||||
self.waiting_result_msg_ids = []
|
||||
@ -65,18 +64,12 @@ class Behavior:
|
||||
if self.active_behavior is None:
|
||||
self.active_behavior = Behavior.default_behavior()
|
||||
|
||||
msg = json.dumps(dict(method="Runtime.evaluate", params={"expression": self.active_behavior['script']}, id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(self.websock, msg))
|
||||
self.websock.send(msg)
|
||||
|
||||
self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": self.active_behavior['script']})
|
||||
self.notify_of_activity()
|
||||
|
||||
def is_finished(self):
|
||||
msg_id = next(self.command_id)
|
||||
msg_id = self.umbra_worker.send_to_chrome(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"})
|
||||
self.waiting_result_msg_ids.append(msg_id)
|
||||
msg = json.dumps(dict(method="Runtime.evaluate", params={"expression": "umbraBehaviorFinished()"}, id=msg_id))
|
||||
self.logger.debug('sending message to {}: {}'.format(self.websock, msg))
|
||||
self.websock.send(msg)
|
||||
|
||||
request_idle_timeout_sec = 30
|
||||
if self.active_behavior and 'request_idle_timeout_sec' in self.active_behavior:
|
||||
|
@ -2,13 +2,15 @@
|
||||
# vim: set sw=4 et:
|
||||
|
||||
import logging
|
||||
import os, sys, argparse
|
||||
import sys
|
||||
|
||||
# logging.basicConfig(stream=sys.stdout, level=logging.INFO,
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
|
||||
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
||||
|
||||
from json import dumps, loads
|
||||
import os
|
||||
import argparse
|
||||
import json
|
||||
import urllib.request, urllib.error, urllib.parse
|
||||
import itertools
|
||||
import websocket
|
||||
@ -17,7 +19,7 @@ import uuid
|
||||
import threading
|
||||
import subprocess
|
||||
import signal
|
||||
from kombu import Connection, Exchange, Queue
|
||||
import kombu
|
||||
import tempfile
|
||||
from umbra.behaviors import Behavior
|
||||
|
||||
@ -39,6 +41,7 @@ class UmbraWorker:
|
||||
self.chrome_wait = chrome_wait
|
||||
self.client_id = client_id
|
||||
self._behavior = None
|
||||
self.websock = None
|
||||
|
||||
def browse_page(self, url, url_metadata):
|
||||
"""Synchronously browse a page and run behaviors."""
|
||||
@ -47,17 +50,17 @@ class UmbraWorker:
|
||||
self.url_metadata = url_metadata
|
||||
with tempfile.TemporaryDirectory() as user_data_dir:
|
||||
with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait, user_data_dir) as websocket_url:
|
||||
websock = websocket.WebSocketApp(websocket_url,
|
||||
self.websock = websocket.WebSocketApp(websocket_url,
|
||||
on_open=self._visit_page,
|
||||
on_message=self._handle_message)
|
||||
websock_thread = threading.Thread(target=websock.run_forever, kwargs={'ping_timeout':0.5})
|
||||
websock_thread = threading.Thread(target=self.websock.run_forever, kwargs={'ping_timeout':0.5})
|
||||
websock_thread.start()
|
||||
start = time.time()
|
||||
|
||||
while True:
|
||||
time.sleep(0.5)
|
||||
if not websock or not websock.sock or not websock.sock.connected:
|
||||
self.logger.error("websocket closed, did chrome die??? {}".format(websock))
|
||||
if not self.websock or not self.websock.sock or not self.websock.sock.connected:
|
||||
self.logger.error("websocket closed, did chrome die??? {}".format(self.websock))
|
||||
break
|
||||
elif time.time() - start > UmbraWorker.HARD_TIMEOUT_SECONDS:
|
||||
self.logger.info("finished browsing page, reached hard timeout of {} seconds url={}".format(UmbraWorker.HARD_TIMEOUT_SECONDS, self.url))
|
||||
@ -67,41 +70,32 @@ class UmbraWorker:
|
||||
break
|
||||
|
||||
try:
|
||||
websock.close()
|
||||
self.websock.close()
|
||||
except BaseException as e:
|
||||
self.logger.error("exception closing websocket {} - {}".format(websock, e))
|
||||
self.logger.error("exception closing websocket {} - {}".format(self.websock, e))
|
||||
|
||||
websock_thread.join()
|
||||
|
||||
def send_to_chrome(self, **kwargs):
|
||||
msg_id = next(self.command_id)
|
||||
kwargs['id'] = msg_id
|
||||
msg = json.dumps(kwargs)
|
||||
self.logger.debug('sending message to {}: {}'.format(self.websock, msg))
|
||||
self.websock.send(msg)
|
||||
return msg_id
|
||||
|
||||
def _visit_page(self, websock):
|
||||
msg = dumps(dict(method="Network.enable", id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
|
||||
msg = dumps(dict(method="Page.enable", id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
|
||||
msg = dumps(dict(method="Console.enable", id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
|
||||
msg = dumps(dict(method="Debugger.enable", id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
|
||||
msg = dumps(dict(method="Runtime.enable", id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
self.send_to_chrome(method="Network.enable")
|
||||
self.send_to_chrome(method="Page.enable")
|
||||
self.send_to_chrome(method="Console.enable")
|
||||
self.send_to_chrome(method="Debugger.enable")
|
||||
self.send_to_chrome(method="Runtime.enable")
|
||||
|
||||
# disable google analytics, see _handle_message() where breakpoint is caught "Debugger.paused"
|
||||
msg = dumps(dict(method="Debugger.setBreakpointByUrl", id=next(self.command_id), params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"}))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
self.send_to_chrome(method="Debugger.setBreakpointByUrl", params={"lineNumber": 1, "urlRegex":"https?://www.google-analytics.com/analytics.js"})
|
||||
|
||||
msg = dumps(dict(method="Page.navigate", id=next(self.command_id), params={"url": self.url}))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
# navigate to the page!
|
||||
self.send_to_chrome(method="Page.navigate", params={"url": self.url})
|
||||
|
||||
# XXX should this class know anything about amqp? or should it
|
||||
# delegate this back up to the Umbra class?
|
||||
@ -118,7 +112,7 @@ class UmbraWorker:
|
||||
def _handle_message(self, websock, message):
|
||||
# self.logger.debug("message from {} - {}".format(websock.url, message[:95]))
|
||||
# self.logger.debug("message from {} - {}".format(websock.url, message))
|
||||
message = loads(message)
|
||||
message = json.loads(message)
|
||||
if "method" in message and message["method"] == "Network.requestWillBeSent":
|
||||
if self._behavior:
|
||||
self._behavior.notify_of_activity()
|
||||
@ -129,7 +123,7 @@ class UmbraWorker:
|
||||
elif "method" in message and message["method"] == "Page.loadEventFired":
|
||||
if self._behavior is None:
|
||||
self.logger.info("Page.loadEventFired, starting behaviors url={} message={}".format(self.url, message))
|
||||
self._behavior = Behavior(self.url, websock, self.command_id)
|
||||
self._behavior = Behavior(self.url, self)
|
||||
self._behavior.start()
|
||||
else:
|
||||
self.logger.warn("Page.loadEventFired but behaviors already running url={} message={}".format(self.url, message))
|
||||
@ -145,14 +139,10 @@ class UmbraWorker:
|
||||
scriptId = message['params']['callFrames'][0]['location']['scriptId']
|
||||
|
||||
# replace script
|
||||
msg = dumps(dict(method="Debugger.setScriptSource", id=next(self.command_id), params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"}))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
self.send_to_chrome(method="Debugger.setScriptSource", params={"scriptId": scriptId, "scriptSource":"console.log('google analytics is no more!');"})
|
||||
|
||||
# resume execution
|
||||
msg = dumps(dict(method="Debugger.resume", id=next(self.command_id)))
|
||||
self.logger.debug('sending message to {}: {}'.format(websock, msg))
|
||||
websock.send(msg)
|
||||
self.send_to_chrome(method="Debugger.resume")
|
||||
elif "result" in message:
|
||||
if self._behavior and self._behavior.is_waiting_on_result(message['id']):
|
||||
self._behavior.notify_of_result(message)
|
||||
@ -217,10 +207,10 @@ class Umbra:
|
||||
def _consume_amqp(self):
|
||||
while not self.amqp_stop.is_set():
|
||||
try:
|
||||
self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
|
||||
url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
|
||||
self.umbra_exchange = kombu.Exchange(name='umbra', type='direct', durable=True)
|
||||
url_queue = kombu.Queue('urls', routing_key='url', exchange=self.umbra_exchange)
|
||||
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
|
||||
with Connection(self.amqp_url) as conn:
|
||||
with kombu.Connection(self.amqp_url) as conn:
|
||||
if self.producer_lock is None:
|
||||
self.producer_lock = threading.Lock()
|
||||
with self.producer_lock:
|
||||
@ -273,7 +263,7 @@ class Chrome:
|
||||
def fetch_debugging_json():
|
||||
raw_json = urllib.request.urlopen("http://localhost:%s/json" % self.port).read()
|
||||
json = raw_json.decode('utf-8')
|
||||
return loads(json)
|
||||
return json.loads(json)
|
||||
|
||||
# returns websocket url to chrome window with about:blank loaded
|
||||
def __enter__(self):
|
||||
@ -294,7 +284,7 @@ class Chrome:
|
||||
while True:
|
||||
try:
|
||||
raw_json = urllib.request.urlopen(json_url).read()
|
||||
all_debug_info = loads(raw_json.decode('utf-8'))
|
||||
all_debug_info = json.loads(raw_json.decode('utf-8'))
|
||||
debug_info = [x for x in all_debug_info if x['url'] == 'about:blank']
|
||||
|
||||
if debug_info and 'webSocketDebuggerUrl' in debug_info[0]:
|
||||
|
Loading…
x
Reference in New Issue
Block a user