Register with the service registry; only start Chrome right before using it, so that the web console's VNC windows aren't always full of about:blank

This commit is contained in:
Noah Levitt 2015-11-12 02:56:27 +00:00
parent b91d7e4c3f
commit 343b5c0f82
4 changed files with 76 additions and 25 deletions

View File

@ -13,6 +13,8 @@ import signal
import pprint import pprint
import traceback import traceback
import rethinkstuff import rethinkstuff
import warnings
import requests
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@ -32,6 +34,9 @@ args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level, logging.basicConfig(stream=sys.stdout, level=args.log_level,
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
warnings.simplefilter("ignore", category=requests.packages.urllib3.exceptions.InsecurePlatformWarning)
def sigterm(signum, frame): def sigterm(signum, frame):
raise brozzler.ShutdownRequested('shutdown requested (caught SIGTERM)') raise brozzler.ShutdownRequested('shutdown requested (caught SIGTERM)')
@ -55,7 +60,8 @@ signal.signal(signal.SIGINT, sigint)
r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db) r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db)
frontier = brozzler.RethinkDbFrontier(r) frontier = brozzler.RethinkDbFrontier(r)
worker = brozzler.worker.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) service_registry = rethinkstuff.ServiceRegistry(r)
worker = brozzler.worker.BrozzlerWorker(frontier, service_registry, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
worker.start() worker.start()

View File

@ -28,6 +28,7 @@ class BrowserPool:
def __init__(self, size=3, **kwargs): def __init__(self, size=3, **kwargs):
"""kwargs are passed on to Browser.__init__""" """kwargs are passed on to Browser.__init__"""
self.size = size
self._available = set() self._available = set()
self._in_use = set() self._in_use = set()
@ -58,6 +59,12 @@ class BrowserPool:
for browser in self._in_use: for browser in self._in_use:
browser.abort_browse_page() browser.abort_browse_page()
def num_available(self):
    """Return the number of browsers currently held in the available set."""
    available = self._available
    return len(available)
def num_in_use(self):
    """Return the number of browsers currently held in the in-use set."""
    in_use = self._in_use
    return len(in_use)
class NoBrowsersAvailable(Exception): class NoBrowsersAvailable(Exception):
pass pass
@ -68,10 +75,10 @@ class BrowsingAborted(BrowsingException):
pass pass
class Browser: class Browser:
"""Runs chrome/chromium to synchronously browse one page at a time using """
worker.browse_page(). Currently the implementation starts up a new instance Runs chrome/chromium to synchronously browse one page at a time using
of chrome for each page browsed, always on the same debug port. (In the worker.browse_page(). Should not be accessed from multiple threads.
future, it may keep the browser running indefinitely.)""" """
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
@ -88,6 +95,8 @@ class Browser:
self._abort_browse_page = False self._abort_browse_page = False
self._chrome_instance = None self._chrome_instance = None
self._aw_snap_hes_dead_jim = None self._aw_snap_hes_dead_jim = None
self._work_dir = None
self._websocket_url = None
def __repr__(self): def __repr__(self):
return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port) return "{}.{}:{}".format(Browser.__module__, Browser.__qualname__, self.chrome_port)
@ -100,6 +109,7 @@ class Browser:
self.stop() self.stop()
def start(self, proxy=None): def start(self, proxy=None):
if not self._chrome_instance:
# these can raise exceptions # these can raise exceptions
self._work_dir = tempfile.TemporaryDirectory() self._work_dir = tempfile.TemporaryDirectory()
self._chrome_instance = Chrome(port=self.chrome_port, self._chrome_instance = Chrome(port=self.chrome_port,
@ -112,15 +122,18 @@ class Browser:
def stop(self): def stop(self):
try: try:
if self._chrome_instance: if self.is_running():
self._chrome_instance.stop() self._chrome_instance.stop()
self._chrome_instance = None self._chrome_instance = None
if self._work_dir:
self._work_dir.cleanup() self._work_dir.cleanup()
self._work_dir = None self._work_dir = None
self._websocket_url = None
except: except:
self.logger.error("problem stopping", exc_info=True) self.logger.error("problem stopping", exc_info=True)
def is_running(self):
    """Return True when a websocket url is recorded for this browser.

    The answer is derived solely from ``self._websocket_url``; any falsy
    value (``None``, empty string) counts as "not running".
    """
    if self._websocket_url:
        return True
    return False
def abort_browse_page(self): def abort_browse_page(self):
self._abort_browse_page = True self._abort_browse_page = True
@ -133,6 +146,8 @@ class Browser:
Returns extracted outlinks. Returns extracted outlinks.
""" """
if not self.is_running():
raise BrowsingException("browser has not been started")
self.url = url self.url = url
self.extra_headers = extra_headers self.extra_headers = extra_headers
self.on_request = on_request self.on_request = on_request
@ -430,8 +445,9 @@ class Chrome:
logging.error("unexpected exception", exc_info=True) logging.error("unexpected exception", exc_info=True)
def stop(self): def stop(self):
if self._shutdown.is_set(): if not self.chrome_process or self._shutdown.is_set():
return return
timeout_sec = 300 timeout_sec = 300
self._shutdown.set() self._shutdown.set()
self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid)) self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid))

View File

@ -1,5 +1,3 @@
# vim: set sw=4 et:
import os import os
import logging import logging
import brozzler import brozzler
@ -13,12 +11,16 @@ import json
import PIL.Image import PIL.Image
import io import io
import socket import socket
import datetime
class BrozzlerWorker: class BrozzlerWorker:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, frontier, max_browsers=1, chrome_exe="chromium-browser"): HEARTBEAT_INTERVAL = 20.0
def __init__(self, frontier, service_registry=None, max_browsers=1, chrome_exe="chromium-browser"):
self._frontier = frontier self._frontier = frontier
self._service_registry = service_registry
self._max_browsers = max_browsers self._max_browsers = max_browsers
self._browser_pool = brozzler.browser.BrowserPool(max_browsers, self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True) chrome_exe=chrome_exe, ignore_cert_errors=True)
@ -74,7 +76,7 @@ class BrozzlerWorker:
info_json = json.dumps(info, sort_keys=True, indent=4) info_json = json.dumps(info, sort_keys=True, indent=4)
self.logger.info("sending WARCPROX_WRITE_RECORD request to warcprox with youtube-dl json for %s", page) self.logger.info("sending WARCPROX_WRITE_RECORD request to warcprox with youtube-dl json for %s", page)
self._warcprox_write_record(warcprox_address=site.proxy, self._warcprox_write_record(warcprox_address=site.proxy,
url=page.url, warc_type="metadata", url="youtube-dl:%s" % page.url, warc_type="metadata",
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8", content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"), payload=info_json.encode("utf-8"),
extra_headers=site.extra_headers) extra_headers=site.extra_headers)
@ -128,6 +130,8 @@ class BrozzlerWorker:
except: except:
self.logger.error("youtube_dl raised exception on {}".format(page), exc_info=True) self.logger.error("youtube_dl raised exception on {}".format(page), exc_info=True)
if not browser.is_running():
browser.start(proxy=site.proxy)
outlinks = browser.browse_page(page.url, outlinks = browser.browse_page(page.url,
extra_headers=site.extra_headers, on_screenshot=on_screenshot, extra_headers=site.extra_headers, on_screenshot=on_screenshot,
on_url_change=page.note_redirect) on_url_change=page.note_redirect)
@ -137,8 +141,7 @@ class BrozzlerWorker:
start = time.time() start = time.time()
page = None page = None
try: try:
browser.start(proxy=site.proxy) while not self._shutdown_requested.is_set() and time.time() - start < 7 * 60:
while not self._shutdown_requested.is_set() and time.time() - start < 60:
page = self._frontier.claim_page(site, self._id) page = self._frontier.claim_page(site, self._id)
outlinks = self.brozzle_page(browser, ydl, site, page) outlinks = self.brozzle_page(browser, ydl, site, page)
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
@ -158,10 +161,28 @@ class BrozzlerWorker:
self._frontier.disclaim_site(site, page) self._frontier.disclaim_site(site, page)
self._browser_pool.release(browser) self._browser_pool.release(browser)
def _service_heartbeat(self):
    """Report this worker's current status to the service registry.

    Builds (or reuses) the status record, refreshes the browser pool
    figures in it, sends it through ``self._service_registry.heartbeat()``
    and stores whatever the registry returns in ``self.status_info``.

    The caller is expected to check that ``self._service_registry`` is set
    before calling this method; it is dereferenced unconditionally here.
    """
    # Reuse the record from the previous heartbeat when there is one. A
    # missing attribute and a stored None are treated alike so that a
    # registry returning None cannot break the following heartbeat.
    status_info = getattr(self, "status_info", None)
    if status_info is None:
        status_info = {
            "role": "brozzler-worker",
            "heartbeat_interval": self.HEARTBEAT_INTERVAL,
        }

    # Sample the pool once so that "load" and "browsers_in_use" always
    # describe the same instant.
    pool_size = self._browser_pool.size
    in_use = self._browser_pool.num_in_use()

    # An empty pool would otherwise raise ZeroDivisionError; nothing is in
    # use in that case, so report a load of 0.0.
    status_info["load"] = 1.0 * in_use / pool_size if pool_size else 0.0
    status_info["browser_pool_size"] = pool_size
    status_info["browsers_in_use"] = in_use

    self.status_info = self._service_registry.heartbeat(status_info)
    self.logger.debug("status in service registry: %s", self.status_info)
def run(self): def run(self):
try: try:
latest_state = None latest_state = None
while not self._shutdown_requested.is_set(): while not self._shutdown_requested.is_set():
if self._service_registry and (not hasattr(self, "status_info") or (datetime.datetime.now(datetime.timezone.utc) - self.status_info["last_heartbeat"]).total_seconds() > self.HEARTBEAT_INTERVAL):
self._service_heartbeat()
try: try:
browser = self._browser_pool.acquire() browser = self._browser_pool.acquire()
try: try:
@ -185,6 +206,9 @@ class BrozzlerWorker:
time.sleep(0.5) time.sleep(0.5)
except: except:
self.logger.critical("thread exiting due to unexpected exception", exc_info=True) self.logger.critical("thread exiting due to unexpected exception", exc_info=True)
finally:
if self._service_registry and hasattr(self, "status_info"):
self._service_registry.unregister(self.status_info["id"])
def start(self): def start(self):
th = threading.Thread(target=self.run, name="BrozzlerWorker") th = threading.Thread(target=self.run, name="BrozzlerWorker")

View File

@ -70,8 +70,13 @@ def job(job_id):
@app.route("/api/workers") @app.route("/api/workers")
def workers(): def workers():
workers_ = [{"host":host,"vnc_websocket_port":8901} for host in ["aidata400", "aidata401", "aidata400-bu", "aidata401-bu"]] workers_ = r.table("services").filter({"role":"brozzler-worker"}).run()
return flask.jsonify(workers=workers_) return flask.jsonify(workers=list(workers_))
@app.route("/api/services")
def services():
    """Return every row of the "services" table as a JSON document.

    The rows are materialised into a list and returned under the
    "services" key of the JSON response.
    """
    rows = list(r.table("services").run())
    return flask.jsonify(services=rows)
@app.route("/api/jobs") @app.route("/api/jobs")
def jobs(): def jobs():