diff --git a/brozzler/cli.py b/brozzler/cli.py
index 76a33a4..5500179 100644
--- a/brozzler/cli.py
+++ b/brozzler/cli.py
@@ -24,7 +24,6 @@ import datetime
 import json
 import logging
 import os
-import pprint
 import re
 import requests
 import rethinkstuff
@@ -233,7 +232,6 @@ def brozzler_worker():
         raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)')
 
     def dump_state(signum, frame):
-        pp = pprint.PrettyPrinter(indent=4)
         state_strs = []
 
         for th in threading.enumerate():
@@ -256,19 +254,15 @@ def brozzler_worker():
             frontier, service_registry, max_browsers=int(args.max_browsers),
             chrome_exe=args.chrome_exe)
 
-    worker_thread = worker.start()
-
+    worker.start()
     try:
-        while worker_thread.is_alive():
+        while worker.is_alive():
             time.sleep(0.5)
         logging.critical("worker thread has died, shutting down")
     except brozzler.ShutdownRequested as e:
         pass
     finally:
         worker.shutdown_now()
-        for th in threading.enumerate():
-            if th != threading.current_thread():
-                th.join()
 
     logging.info("brozzler-worker is all done, exiting")
 
diff --git a/brozzler/worker.py b/brozzler/worker.py
index c1415b7..84dd0f7 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -94,13 +94,33 @@ class BrozzlerWorker:
 
     HEARTBEAT_INTERVAL = 20.0
 
-    def __init__(self, frontier, service_registry=None, max_browsers=1, chrome_exe="chromium-browser"):
+    def __init__(
+            self, frontier, service_registry=None, max_browsers=1,
+            chrome_exe="chromium-browser", proxy=None,
+            enable_warcprox_features=False):
         self._frontier = frontier
         self._service_registry = service_registry
         self._max_browsers = max_browsers
+
+        # these two settings can be overridden by the job/site configuration
+        self.__proxy = proxy
+        self.__enable_warcprox_features = enable_warcprox_features
+
         self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
                 chrome_exe=chrome_exe, ignore_cert_errors=True)
         self._shutdown_requested = threading.Event()
+        self._thread = None
+        self._start_stop_lock = threading.Lock()
+        self._browsing_threads = set()
+
+    def _proxy(self, site):
+        return site.proxy or self.__proxy
+
+    def _enable_warcprox_features(self, site):
+        if site.enable_warcprox_features is not None:
+            return site.enable_warcprox_features
+        else:
+            return self.__enable_warcprox_features
 
     def _youtube_dl(self, destdir, site):
         ydl_opts = {
@@ -114,12 +134,12 @@ class BrozzlerWorker:
             "nopart": True,
             "no_color": True,
         }
-        if site.proxy:
-            ydl_opts["proxy"] = "http://{}".format(site.proxy)
+        if self._proxy(site):
+            ydl_opts["proxy"] = "http://{}".format(self._proxy(site))
         ## XXX (sometimes?) causes chrome debug websocket to go through
         ## proxy. Maybe not needed thanks to hls_prefer_native.
         ## # see https://github.com/rg3/youtube-dl/issues/6087
-        ## os.environ["http_proxy"] = "http://{}".format(site.proxy)
+        ## os.environ["http_proxy"] = "http://{}".format(self._proxy(site))
         ydl = youtube_dl.YoutubeDL(ydl_opts)
         if site.extra_headers():
             ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers()))
@@ -158,13 +178,13 @@ class BrozzlerWorker:
         try:
             self.logger.info("trying youtube-dl on {}".format(page))
             info = ydl.extract_info(page.url)
-            if site.proxy and site.enable_warcprox_features:
+            if self._proxy(site) and self._enable_warcprox_features(site):
                 info_json = json.dumps(info, sort_keys=True, indent=4)
                 self.logger.info(
                         "sending WARCPROX_WRITE_RECORD request to warcprox "
                         "with youtube-dl json for %s", page)
                 self._warcprox_write_record(
-                        warcprox_address=site.proxy,
+                        warcprox_address=self._proxy(site),
                         url="youtube-dl:%s" % page.url, warc_type="metadata",
                         content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
                         payload=info_json.encode("utf-8"),
@@ -199,17 +219,17 @@ class BrozzlerWorker:
         def _on_screenshot(screenshot_png):
             if on_screenshot:
                 on_screenshot(screenshot_png)
-            elif site.proxy and site.enable_warcprox_features:
+            elif self._proxy(site) and self._enable_warcprox_features(site):
                 self.logger.info("sending WARCPROX_WRITE_RECORD request "
                         "to warcprox with screenshot for %s", page)
                 screenshot_jpeg, thumbnail_jpeg = self.full_and_thumb_jpegs(
                         screenshot_png)
-                self._warcprox_write_record(warcprox_address=site.proxy,
+                self._warcprox_write_record(warcprox_address=self._proxy(site),
                         url="screenshot:{}".format(page.url),
                         warc_type="resource", content_type="image/jpeg",
                         payload=screenshot_jpeg,
                         extra_headers=site.extra_headers())
-                self._warcprox_write_record(warcprox_address=site.proxy,
+                self._warcprox_write_record(warcprox_address=self._proxy(site),
                         url="thumbnail:{}".format(page.url),
                         warc_type="resource", content_type="image/jpeg",
                         payload=thumbnail_jpeg,
@@ -237,7 +257,7 @@ class BrozzlerWorker:
             if self._needs_browsing(page, ydl_spy):
                 self.logger.info('needs browsing: %s', page)
                 if not browser.is_running():
-                    browser.start(proxy=site.proxy)
+                    browser.start(proxy=self._proxy(site))
                 outlinks = browser.browse_page(
                         page.url, extra_headers=site.extra_headers(),
                         on_screenshot=_on_screenshot,
@@ -253,10 +273,10 @@ class BrozzlerWorker:
 
     def _fetch_url(self, site, page):
         proxies = None
-        if site.proxy:
+        if self._proxy(site):
             proxies = {
-                'http': 'http://%s' % site.proxy,
-                'https': 'http://%s' % site.proxy,
+                'http': 'http://%s' % self._proxy(site),
+                'https': 'http://%s' % self._proxy(site),
             }
 
         self.logger.info('fetching %s', page)
@@ -293,7 +313,8 @@ class BrozzlerWorker:
                     socket.gethostname(), browser.chrome_port))
                 outlinks = self.brozzle_page(browser, site, page)
                 self._frontier.completed_page(site, page)
-                self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
+                self._frontier.scope_and_schedule_outlinks(
+                        site, page, outlinks)
                 page = None
         except brozzler.NothingToClaim:
             self.logger.info("no pages left for site %s", site)
@@ -311,6 +332,7 @@ class BrozzlerWorker:
             browser.stop()
             self._frontier.disclaim_site(site, page)
             self._browser_pool.release(browser)
+            self._browsing_threads.remove(threading.current_thread())
 
     def _service_heartbeat(self):
         if hasattr(self, "status_info"):
@@ -357,6 +379,7 @@ class BrozzlerWorker:
                             name="BrowsingThread:{}-{}".format(
                                 browser.chrome_port, site.seed))
                     th.start()
+                    self._browsing_threads.add(th)
                 except:
                     self._browser_pool.release(browser)
                     raise
@@ -371,7 +394,9 @@ class BrozzlerWorker:
                         latest_state = "no-unclaimed-sites"
                 time.sleep(0.5)
         except:
-            self.logger.critical("thread exiting due to unexpected exception", exc_info=True)
+            self.logger.critical(
+                    "thread exiting due to unexpected exception",
+                    exc_info=True)
         finally:
             if self._service_registry and hasattr(self, "status_info"):
                 try:
@@ -382,12 +407,25 @@ class BrozzlerWorker:
                             exc_info=True)
 
     def start(self):
-        th = threading.Thread(target=self.run, name="BrozzlerWorker")
-        th.start()
-        return th
+        with self._start_stop_lock:
+            if self._thread:
+                self.logger.warn(
+                        'ignoring start request because self._thread is '
+                        'not None')
+                return
+            self._thread = threading.Thread(
+                    target=self.run, name="BrozzlerWorker")
+            self._thread.start()
 
     def shutdown_now(self):
-        self.logger.info("brozzler worker shutting down")
-        self._shutdown_requested.set()
-        self._browser_pool.shutdown_now()
+        with self._start_stop_lock:
+            self.logger.info("brozzler worker shutting down")
+            self._shutdown_requested.set()
+            self._browser_pool.shutdown_now()
+            while self._browsing_threads:
+                time.sleep(0.5)
+            self._thread = None
+
+    def is_alive(self):
+        return self._thread and self._thread.is_alive()
 
diff --git a/setup.py b/setup.py
index 360cd91..dd66f27 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ import setuptools
 
 setuptools.setup(
         name='brozzler',
-        version='1.1b3.dev46',
+        version='1.1b3.dev47',
         description='Distributed web crawling with browsers',
         url='https://github.com/internetarchive/brozzler',
         author='Noah Levitt',
@@ -38,6 +38,7 @@ setuptools.setup(
                 'brozzler-worker=brozzler.cli:brozzler_worker',
                 'brozzler-ensure-tables=brozzler.cli:brozzler_ensure_tables',
                 'brozzler-webconsole=brozzler.webconsole:run',
+                'brozzler-easy=brozzler.easy:main',
             ],
         },
         install_requires=[
@@ -54,7 +55,7 @@ setuptools.setup(
         ],
         extras_require={
             'webconsole': ['flask>=0.11', 'gunicorn'],
-            # 'brozzler-easy': ['warcprox', 'pywb'],
+            'easy': ['warcprox', 'pywb'],
         },
         zip_safe=False,
         classifiers=[