working on brozzler-easy, single process with brozzler-worker and warcprox working together (pywb to be added)

This commit is contained in:
Noah Levitt 2016-07-05 18:46:42 -05:00
parent 1a7b94cae7
commit 3b252002b7
3 changed files with 64 additions and 31 deletions

View file

@ -24,7 +24,6 @@ import datetime
import json import json
import logging import logging
import os import os
import pprint
import re import re
import requests import requests
import rethinkstuff import rethinkstuff
@ -233,7 +232,6 @@ def brozzler_worker():
raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)') raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)')
def dump_state(signum, frame): def dump_state(signum, frame):
pp = pprint.PrettyPrinter(indent=4)
state_strs = [] state_strs = []
for th in threading.enumerate(): for th in threading.enumerate():
@ -256,19 +254,15 @@ def brozzler_worker():
frontier, service_registry, max_browsers=int(args.max_browsers), frontier, service_registry, max_browsers=int(args.max_browsers),
chrome_exe=args.chrome_exe) chrome_exe=args.chrome_exe)
worker_thread = worker.start() worker.start()
try: try:
while worker_thread.is_alive(): while worker.is_alive():
time.sleep(0.5) time.sleep(0.5)
logging.critical("worker thread has died, shutting down") logging.critical("worker thread has died, shutting down")
except brozzler.ShutdownRequested as e: except brozzler.ShutdownRequested as e:
pass pass
finally: finally:
worker.shutdown_now() worker.shutdown_now()
for th in threading.enumerate():
if th != threading.current_thread():
th.join()
logging.info("brozzler-worker is all done, exiting") logging.info("brozzler-worker is all done, exiting")

View file

@ -94,13 +94,33 @@ class BrozzlerWorker:
HEARTBEAT_INTERVAL = 20.0 HEARTBEAT_INTERVAL = 20.0
def __init__(self, frontier, service_registry=None, max_browsers=1, chrome_exe="chromium-browser"): def __init__(
self, frontier, service_registry=None, max_browsers=1,
chrome_exe="chromium-browser", proxy=None,
enable_warcprox_features=False):
self._frontier = frontier self._frontier = frontier
self._service_registry = service_registry self._service_registry = service_registry
self._max_browsers = max_browsers self._max_browsers = max_browsers
# these two settings can be overridden by the job/site configuration
self.__proxy = proxy
self.__enable_warcprox_features = enable_warcprox_features
self._browser_pool = brozzler.browser.BrowserPool(max_browsers, self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True) chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event() self._shutdown_requested = threading.Event()
self._thread = None
self._start_stop_lock = threading.Lock()
self._browsing_threads = set()
def _proxy(self, site):
return site.proxy or self.__proxy
def _enable_warcprox_features(self, site):
if site.enable_warcprox_features is not None:
return site.enable_warcprox_features
else:
return self.__enable_warcprox_features
def _youtube_dl(self, destdir, site): def _youtube_dl(self, destdir, site):
ydl_opts = { ydl_opts = {
@ -114,12 +134,12 @@ class BrozzlerWorker:
"nopart": True, "nopart": True,
"no_color": True, "no_color": True,
} }
if site.proxy: if self._proxy(site):
ydl_opts["proxy"] = "http://{}".format(site.proxy) ydl_opts["proxy"] = "http://{}".format(self._proxy(site))
## XXX (sometimes?) causes chrome debug websocket to go through ## XXX (sometimes?) causes chrome debug websocket to go through
## proxy. Maybe not needed thanks to hls_prefer_native. ## proxy. Maybe not needed thanks to hls_prefer_native.
## # see https://github.com/rg3/youtube-dl/issues/6087 ## # see https://github.com/rg3/youtube-dl/issues/6087
## os.environ["http_proxy"] = "http://{}".format(site.proxy) ## os.environ["http_proxy"] = "http://{}".format(self._proxy(site))
ydl = youtube_dl.YoutubeDL(ydl_opts) ydl = youtube_dl.YoutubeDL(ydl_opts)
if site.extra_headers(): if site.extra_headers():
ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers())) ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers()))
@ -158,13 +178,13 @@ class BrozzlerWorker:
try: try:
self.logger.info("trying youtube-dl on {}".format(page)) self.logger.info("trying youtube-dl on {}".format(page))
info = ydl.extract_info(page.url) info = ydl.extract_info(page.url)
if site.proxy and site.enable_warcprox_features: if self._proxy(site) and self._enable_warcprox_features(site):
info_json = json.dumps(info, sort_keys=True, indent=4) info_json = json.dumps(info, sort_keys=True, indent=4)
self.logger.info( self.logger.info(
"sending WARCPROX_WRITE_RECORD request to warcprox " "sending WARCPROX_WRITE_RECORD request to warcprox "
"with youtube-dl json for %s", page) "with youtube-dl json for %s", page)
self._warcprox_write_record( self._warcprox_write_record(
warcprox_address=site.proxy, warcprox_address=self._proxy(site),
url="youtube-dl:%s" % page.url, warc_type="metadata", url="youtube-dl:%s" % page.url, warc_type="metadata",
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8", content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"), payload=info_json.encode("utf-8"),
@ -199,17 +219,17 @@ class BrozzlerWorker:
def _on_screenshot(screenshot_png): def _on_screenshot(screenshot_png):
if on_screenshot: if on_screenshot:
on_screenshot(screenshot_png) on_screenshot(screenshot_png)
elif site.proxy and site.enable_warcprox_features: elif self._proxy(site) and self._enable_warcprox_features(site):
self.logger.info("sending WARCPROX_WRITE_RECORD request " self.logger.info("sending WARCPROX_WRITE_RECORD request "
"to warcprox with screenshot for %s", page) "to warcprox with screenshot for %s", page)
screenshot_jpeg, thumbnail_jpeg = self.full_and_thumb_jpegs( screenshot_jpeg, thumbnail_jpeg = self.full_and_thumb_jpegs(
screenshot_png) screenshot_png)
self._warcprox_write_record(warcprox_address=site.proxy, self._warcprox_write_record(warcprox_address=self._proxy(site),
url="screenshot:{}".format(page.url), url="screenshot:{}".format(page.url),
warc_type="resource", content_type="image/jpeg", warc_type="resource", content_type="image/jpeg",
payload=screenshot_jpeg, payload=screenshot_jpeg,
extra_headers=site.extra_headers()) extra_headers=site.extra_headers())
self._warcprox_write_record(warcprox_address=site.proxy, self._warcprox_write_record(warcprox_address=self._proxy(site),
url="thumbnail:{}".format(page.url), url="thumbnail:{}".format(page.url),
warc_type="resource", content_type="image/jpeg", warc_type="resource", content_type="image/jpeg",
payload=thumbnail_jpeg, payload=thumbnail_jpeg,
@ -237,7 +257,7 @@ class BrozzlerWorker:
if self._needs_browsing(page, ydl_spy): if self._needs_browsing(page, ydl_spy):
self.logger.info('needs browsing: %s', page) self.logger.info('needs browsing: %s', page)
if not browser.is_running(): if not browser.is_running():
browser.start(proxy=site.proxy) browser.start(proxy=self._proxy(site))
outlinks = browser.browse_page( outlinks = browser.browse_page(
page.url, extra_headers=site.extra_headers(), page.url, extra_headers=site.extra_headers(),
on_screenshot=_on_screenshot, on_screenshot=_on_screenshot,
@ -253,10 +273,10 @@ class BrozzlerWorker:
def _fetch_url(self, site, page): def _fetch_url(self, site, page):
proxies = None proxies = None
if site.proxy: if self._proxy(site):
proxies = { proxies = {
'http': 'http://%s' % site.proxy, 'http': 'http://%s' % self._proxy(site),
'https': 'http://%s' % site.proxy, 'https': 'http://%s' % self._proxy(site),
} }
self.logger.info('fetching %s', page) self.logger.info('fetching %s', page)
@ -293,7 +313,8 @@ class BrozzlerWorker:
socket.gethostname(), browser.chrome_port)) socket.gethostname(), browser.chrome_port))
outlinks = self.brozzle_page(browser, site, page) outlinks = self.brozzle_page(browser, site, page)
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
self._frontier.scope_and_schedule_outlinks(site, page, outlinks) self._frontier.scope_and_schedule_outlinks(
site, page, outlinks)
page = None page = None
except brozzler.NothingToClaim: except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site) self.logger.info("no pages left for site %s", site)
@ -311,6 +332,7 @@ class BrozzlerWorker:
browser.stop() browser.stop()
self._frontier.disclaim_site(site, page) self._frontier.disclaim_site(site, page)
self._browser_pool.release(browser) self._browser_pool.release(browser)
self._browsing_threads.remove(threading.current_thread())
def _service_heartbeat(self): def _service_heartbeat(self):
if hasattr(self, "status_info"): if hasattr(self, "status_info"):
@ -357,6 +379,7 @@ class BrozzlerWorker:
name="BrowsingThread:{}-{}".format( name="BrowsingThread:{}-{}".format(
browser.chrome_port, site.seed)) browser.chrome_port, site.seed))
th.start() th.start()
self._browsing_threads.add(th)
except: except:
self._browser_pool.release(browser) self._browser_pool.release(browser)
raise raise
@ -371,7 +394,9 @@ class BrozzlerWorker:
latest_state = "no-unclaimed-sites" latest_state = "no-unclaimed-sites"
time.sleep(0.5) time.sleep(0.5)
except: except:
self.logger.critical("thread exiting due to unexpected exception", exc_info=True) self.logger.critical(
"thread exiting due to unexpected exception",
exc_info=True)
finally: finally:
if self._service_registry and hasattr(self, "status_info"): if self._service_registry and hasattr(self, "status_info"):
try: try:
@ -382,12 +407,25 @@ class BrozzlerWorker:
exc_info=True) exc_info=True)
def start(self): def start(self):
th = threading.Thread(target=self.run, name="BrozzlerWorker") with self._start_stop_lock:
th.start() if self._thread:
return th self.logger.warn(
'ignoring start request because self._thread is '
'not None')
return
self._thread = threading.Thread(
target=self.run, name="BrozzlerWorker")
self._thread.start()
def shutdown_now(self): def shutdown_now(self):
self.logger.info("brozzler worker shutting down") with self._start_stop_lock:
self._shutdown_requested.set() self.logger.info("brozzler worker shutting down")
self._browser_pool.shutdown_now() self._shutdown_requested.set()
self._browser_pool.shutdown_now()
while self._browsing_threads:
time.sleep(0.5)
self._thread = None
def is_alive(self):
return self._thread and self._thread.is_alive()

View file

@ -21,7 +21,7 @@ import setuptools
setuptools.setup( setuptools.setup(
name='brozzler', name='brozzler',
version='1.1b3.dev46', version='1.1b3.dev47',
description='Distributed web crawling with browsers', description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler', url='https://github.com/internetarchive/brozzler',
author='Noah Levitt', author='Noah Levitt',
@ -38,6 +38,7 @@ setuptools.setup(
'brozzler-worker=brozzler.cli:brozzler_worker', 'brozzler-worker=brozzler.cli:brozzler_worker',
'brozzler-ensure-tables=brozzler.cli:brozzler_ensure_tables', 'brozzler-ensure-tables=brozzler.cli:brozzler_ensure_tables',
'brozzler-webconsole=brozzler.webconsole:run', 'brozzler-webconsole=brozzler.webconsole:run',
'brozzler-easy=brozzler.easy:main',
], ],
}, },
install_requires=[ install_requires=[
@ -54,7 +55,7 @@ setuptools.setup(
], ],
extras_require={ extras_require={
'webconsole': ['flask>=0.11', 'gunicorn'], 'webconsole': ['flask>=0.11', 'gunicorn'],
# 'brozzler-easy': ['warcprox', 'pywb'], 'easy': ['warcprox', 'pywb'],
}, },
zip_safe=False, zip_safe=False,
classifiers=[ classifiers=[