handle multiple clients, browsers

This commit is contained in:
Noah Levitt 2014-02-13 01:59:09 -08:00
parent 4dbe111aee
commit f69edd5a87
2 changed files with 130 additions and 48 deletions

View file

@ -17,6 +17,8 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//', arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//',
help='URL identifying the amqp server to talk to') help='URL identifying the amqp server to talk to')
arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0',
help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to')
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra')
args = arg_parser.parse_args(args=sys.argv[1:]) args = arg_parser.parse_args(args=sys.argv[1:])
@ -24,5 +26,5 @@ umbra_exchange = Exchange('umbra', 'direct', durable=True)
with Connection(args.amqp_url) as conn: with Connection(args.amqp_url) as conn:
producer = conn.Producer(serializer='json') producer = conn.Producer(serializer='json')
for url in args.urls: for url in args.urls:
producer.publish({'url': url, 'metadata' : {}}, 'url', exchange=umbra_exchange) producer.publish({'url': url, 'metadata': {}, 'clientId': args.client_id}, 'url', exchange=umbra_exchange)

View file

@ -9,61 +9,132 @@ import time
import uuid import uuid
import logging import logging
import threading import threading
import subprocess
import signal
from kombu import Connection, Exchange, Queue from kombu import Connection, Exchange, Queue
class UmbraWorker:
logger = logging.getLogger('umbra.UmbraWorker')
def __init__(self, umbra, chrome_port=9222, chrome_exe='chromium-browser', chrome_wait=10, client_id='request'):
self.command_id = count(1)
self.lock = threading.Lock()
self.umbra = umbra
self.chrome_port = chrome_port
self.chrome_exe = chrome_exe
self.chrome_wait = chrome_wait
self.client_id = client_id
def browse_page(self, url, url_metadata):
with self.lock:
self.url = url
self.url_metadata = url_metadata
with Chrome(self.chrome_port, self.chrome_exe, self.chrome_wait) as websocket_url:
websock = websocket.WebSocketApp(websocket_url,
on_open=self.visit_page, on_message=self.handle_message)
websock_thread = threading.Thread(target=websock.run_forever)
websock_thread.start()
# XXX more logic goes here
time.sleep(10)
websock.close()
def visit_page(self, websock):
msg = dumps(dict(method="Network.enable", id=next(self.command_id)))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
msg = dumps(dict(method="Page.navigate", id=next(self.command_id), params={"url": self.url}))
self.logger.debug('sending message to {}: {}'.format(websock, msg))
websock.send(msg)
def handle_message(self, websock, message):
# self.logger.debug("handling message from websocket {} - {}".format(websock, message))
message = loads(message)
if "method" in message.keys() and message["method"] == "Network.requestWillBeSent":
payload = message['params']['request']
payload['parentUrl'] = self.url
payload['parentUrlMetadata'] = self.url_metadata
self.logger.debug('sending to amqp exchange={} routing_key={} payload={}'.format(self.umbra.umbra_exchange, self.client_id, payload))
# bind a queue with the same name as the return routing key
# (AMQPUrlReceiver in heritrix expects this)
request_queue = Queue(self.client_id,
routing_key=self.client_id,
exchange=self.umbra.umbra_exchange)
with self.umbra.producer_lock:
# self.umbra.producer.publish(payload,
# routing_key=self.client_id,
# exchange=self.umbra.umbra_exchange,
# declare=[request_queue])
self.umbra.producer.publish(payload,
routing_key=self.client_id,
exchange=self.umbra.umbra_exchange)
class Umbra: class Umbra:
logger = logging.getLogger('umbra.Umbra') logger = logging.getLogger('umbra.Umbra')
def __init__(self, amqp_url, chrome_args):
self.producer = None
self.amqp_url = amqp_url
self.chrome_args = chrome_args
self.producer_lock = threading.Lock()
self.consume_amqp()
def get_message_handler(self, url, url_metadata): def __init__(self, amqp_url, chrome_exe, browser_wait):
def handle_message(ws, message): self.amqp_url = amqp_url
message = loads(message) self.chrome_exe = chrome_exe
self.logger.info(message) self.browser_wait = browser_wait
if "method" in message.keys() and message["method"] == "Network.requestWillBeSent": self.producer = None
to_send = {} self.producer_lock = None
to_send.update(message['params']['request']) self.workers = {}
to_send.update(dict(parentUrl=url,parentUrlMetadata=url_metadata)) self.workers_lock = threading.Lock()
self.logger.debug('sending to amqp: {}'.format(to_send)) self.amqp_thread = threading.Thread(target=self.consume_amqp)
with self.producer_lock: self.amqp_stop = threading.Event()
self.producer.publish(to_send, self.amqp_thread.start()
routing_key='request',
exchange=self.umbra_exchange) def shutdown(self):
return handle_message self.logger.info("shutting down amqp consumer {}".format(self.amqp_url))
self.amqp_stop.set()
self.amqp_thread.join()
def consume_amqp(self): def consume_amqp(self):
self.umbra_exchange = Exchange('umbra', 'direct', durable=True) self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
self.logger.info("connecting to amqp {} at {}".format(repr(self.umbra_exchange), self.amqp_url)) self.logger.info("connecting to amqp {} at {}".format(repr(self.umbra_exchange), self.amqp_url))
with Connection(self.amqp_url) as conn: with Connection(self.amqp_url) as conn:
self.producer = conn.Producer(serializer='json') self.producer = conn.Producer(serializer='json')
self.producer_lock = threading.Lock()
with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer:
while True: import socket
conn.drain_events() while not self.amqp_stop.is_set():
try:
conn.drain_events(timeout=0.5)
except socket.timeout:
pass
self.logger.debug('out of "while not self.amqp_stop.is_set()"')
self.logger.debug('out of "with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer"')
self.logger.debug('out of "with Connection(self.amqp_url) as conn"')
def fetch_url(self, body, message): def fetch_url(self, body, message):
url, metadata = body['url'], body['metadata'] self.logger.debug("body={} message={} message.headers={} message.payload={} message.delivery_info={}".format(repr(body), message, message.headers, message.payload, message.delivery_info))
def send_websocket_commands(ws): self.logger.debug("dir(message)={}".format(dir(message)))
command_id = count(1) self.logger.debug("dir(message.delivery_info)={}".format(dir(message.delivery_info)))
ws.send(dumps(dict(method="Network.enable", id=next(command_id)))) self.logger.debug("message.channel={}".format(message.channel))
ws.send(dumps(dict(method="Page.navigate", id=next(command_id), params={"url": url})))
#from umbra import behaviors client_id = body['clientId']
#behaviors.execute(url, ws, command_id) with self.workers_lock:
if not client_id in self.workers:
port = 9222 + len(self.workers)
t = UmbraWorker(umbra=self, chrome_port=port,
chrome_exe=self.chrome_exe,
chrome_wait=self.browser_wait,
client_id=client_id)
self.workers[client_id] = t
message.ack() def browse_page_async():
self.logger.info('client_id={} body={}'.format(client_id, body))
self.workers[client_id].browse_page(body['url'], body['metadata'])
with Chrome(*self.chrome_args) as websocket_url: threading.Thread(target=browse_page_async).start()
websock = websocket.WebSocketApp(websocket_url)
websock.on_message = self.get_message_handler(url, metadata)
websock.on_open = send_websocket_commands
websock.run_forever()
class Chrome(): message.ack()
class Chrome:
logger = logging.getLogger('umbra.Chrome') logger = logging.getLogger('umbra.Chrome')
def __init__(self, port, executable, browser_wait): def __init__(self, port, executable, browser_wait):
@ -76,19 +147,20 @@ class Chrome():
json = raw_json.decode('utf-8') json = raw_json.decode('utf-8')
return loads(json) return loads(json)
# returns websocket url to chrome window with about:blank loaded
def __enter__(self): def __enter__(self):
import subprocess
chrome_args = [self.executable, "--temp-profile", chrome_args = [self.executable, "--temp-profile",
"--remote-debugging-port=%s" % self.port, "--remote-debugging-port=%s" % self.port,
"--disable-web-sockets", "--disable-cache", "--disable-web-sockets", "--disable-cache",
"--window-size=1100,900", "--enable-logging" "--window-size=1100,900", "--enable-logging",
"--homepage=about:blank", "about:blank"] "--homepage=about:blank", "about:blank"]
self.logger.info("running {}".format(chrome_args)) self.logger.info("running {}".format(chrome_args))
self.chrome_process = subprocess.Popen(chrome_args) self.chrome_process = subprocess.Popen(chrome_args, start_new_session=True)
self.logger.info("chrome running, pid {}".format(self.chrome_process.pid)) self.logger.info("chrome running, pid {}".format(self.chrome_process.pid))
start = time.time() start = time.time()
json_url = "http://localhost:%s/json" % self.port json_url = "http://localhost:%s/json" % self.port
while True: while True:
try: try:
raw_json = urllib.request.urlopen(json_url).read() raw_json = urllib.request.urlopen(json_url).read()
@ -110,7 +182,9 @@ class Chrome():
def __exit__(self, *args): def __exit__(self, *args):
self.logger.info("killing chrome pid {}".format(self.chrome_process.pid)) self.logger.info("killing chrome pid {}".format(self.chrome_process.pid))
self.chrome_process.kill() os.killpg(self.chrome_process.pid, signal.SIGINT)
# self.chrome_process.send_signal(signal.SIGINT)
self.chrome_process.wait()
def main(): def main():
# logging.basicConfig(stream=sys.stdout, level=logging.INFO, # logging.basicConfig(stream=sys.stdout, level=logging.INFO,
@ -124,13 +198,19 @@ def main():
help='Seconds to wait for browser initialization') help='Seconds to wait for browser initialization')
arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser', arg_parser.add_argument('-e', '--executable', dest='executable', default='chromium-browser',
help='Executable to use to invoke chrome') help='Executable to use to invoke chrome')
arg_parser.add_argument('-p', '--port', dest='port', default='9222',
help='Port to have invoked chrome listen on for debugging connections')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to') help='URL identifying the amqp server to talk to')
args = arg_parser.parse_args(args=sys.argv[1:]) args = arg_parser.parse_args(args=sys.argv[1:])
chrome_args = (args.port, args.executable, args.browser_wait)
umbra = Umbra(args.amqp_url, chrome_args) umbra = Umbra(args.amqp_url, args.executable, args.browser_wait)
try:
while True:
time.sleep(0.5)
except:
pass
finally:
umbra.shutdown()
if __name__ == "__main__": if __name__ == "__main__":
main() main()