diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 22a4a6e..8acee2d 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -1,21 +1,21 @@ -# -# brozzler/__init__.py - __init__.py for brozzler package, contains some common -# code -# -# Copyright (C) 2014-2016 Internet Archive -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# +""" +brozzler/__init__.py - __init__.py for brozzler package, contains some common +code + +Copyright (C) 2014-2016 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" import json as _json import logging as _logging diff --git a/brozzler/browser.py b/brozzler/browser.py index bde1376..1008fd7 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -35,6 +35,9 @@ import select import re import base64 import psutil +import signal +import string +import sqlite3 __all__ = ["BrowserPool", "Browser"] @@ -126,11 +129,23 @@ class Browser: def __exit__(self, *args): self.stop() - def start(self, proxy=None): + def start(self, proxy=None, cookie_db=None): if not self._chrome_instance: # these can raise exceptions self.chrome_port = self._find_available_port() self._work_dir = tempfile.TemporaryDirectory() + if cookie_db is not None: + cookie_dir = os.sep.join([self._work_dir.name, "chrome-user-data","Default"]) + cookie_location = os.sep.join([cookie_dir,"Cookies"]) + self.logger.debug("Cookie DB provided. Writing to: %s", cookie_location) + os.makedirs(cookie_dir, exist_ok=True) + + try: + with open(cookie_location,'wb') as cookie_file: + cookie_file.write(cookie_db) + except OSError: + self.logger.error("exception writing cookie file at: %s", cookie_location, exc_info=True) + self._chrome_instance = Chrome( port=self.chrome_port, executable=self.chrome_exe, user_home_dir=self._work_dir.name, @@ -159,6 +174,24 @@ class Browser: except: self.logger.error("problem stopping", exc_info=True) + def persist_and_read_cookie_db(self): + cookie_location = os.sep.join([self._work_dir.name, "chrome-user-data","Default","Cookies"]) + self.logger.debug("Saving Cookie DB from: %s", cookie_location) + try: + with sqlite3.connect(cookie_location) as conn: + cur = conn.cursor() + cur.execute("UPDATE cookies SET persistent = 1") + except sqlite3.Error: + self.logger.error("exception updating cookie DB", exc_info=True) + + cookie_db=None + try: + with open(cookie_location, "rb") as cookie_file: + cookie_db=cookie_file.read() + except OSError: + self.logger.error("exception reading from cookie DB file at: %s", cookie_location, exc_info=True) + return cookie_db + def _find_available_port(self): port_available = False port = self.chrome_port @@ -208,18 +241,23 @@ class Browser: self._waiting_on_screenshot_msg_id = None self._waiting_on_document_url_msg_id = None self._waiting_on_outlinks_msg_id = None + self._waiting_on_outlinks_start = None + self._waiting_on_outlinks_attempt = 0 self._outlinks = None self._reached_limit = None self._aw_snap_hes_dead_jim = None self._abort_browse_page = False self._has_screenshot = False - self._websock = websocket.WebSocketApp(self._websocket_url, - on_open=self._visit_page, on_message=self._wrap_handle_message) + self._websock = websocket.WebSocketApp( + self._websocket_url, on_open=self._visit_page, + on_message=self._wrap_handle_message) - threadName = "WebsockThread{}-{}".format(self.chrome_port, - ''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6)))) - websock_thread = threading.Thread(target=self._websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5}) + threadName = "WebsockThread:%s-%s" % (self.chrome_port, ''.join( + random.choice(string.ascii_letters) for _ in range(6))) + websock_thread = threading.Thread( + target=self._websock.run_forever, name=threadName, + kwargs={'ping_timeout':0.5}) websock_thread.start() self._start = time.time() aborted = False @@ -235,27 +273,43 @@ class Browser: if self._post_behavior_interval_func(): return self._outlinks finally: - if self._websock and self._websock.sock and self._websock.sock.connected: + if (self._websock and self._websock.sock + and self._websock.sock.connected): try: self._websock.close() except BaseException as e: - self.logger.error("exception closing websocket {} - {}".format(self._websock, e)) + self.logger.error( + "exception closing websocket %s - %s" % ( + self._websock, e)) websock_thread.join(timeout=30) if websock_thread.is_alive(): - self.logger.error("{} still alive 30 seconds after closing {}, will forcefully nudge it again".format(websock_thread, self._websock)) + self.logger.error( + "%s still alive 30 seconds after closing %s, will " + "forcefully nudge it again" % ( + websock_thread, self._websock)) self._websock.keep_running = False websock_thread.join(timeout=30) if websock_thread.is_alive(): - self.logger.critical("{} still alive 60 seconds after closing {}".format(websock_thread, self._websock)) + self.logger.critical( + "%s still alive 60 seconds after closing %s" % ( + websock_thread, self._websock)) self._behavior = None def _post_behavior_interval_func(self): - """Called periodically after behavior is finished on the page. Returns - true when post-behavior tasks are finished.""" - if not self._has_screenshot and ( - not self._waiting_on_scroll_to_top_msg_id + """ + Called periodically after behavior is finished on the page. Returns + true when post-behavior tasks are finished. + """ + if (not self._websock or not self._websock.sock + or not self._websock.sock.connected): + raise BrowsingException( + "websocket closed, did chrome die? {}".format( + self._websocket_url)) + + if (not self._has_screenshot + and not self._waiting_on_scroll_to_top_msg_id and not self._waiting_on_screenshot_msg_id): if time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS: self.logger.info( @@ -274,7 +328,7 @@ class Browser: self._waiting_on_scroll_to_top_start = time.time() return False elif (self._waiting_on_scroll_to_top_msg_id - and time.time() - self._waiting_on_scroll_to_top_start > 30): + and time.time() - self._waiting_on_scroll_to_top_start > 30.0): # chromium bug? occasionally we get no scroll-to-top result message self.logger.warn( "timed out after %.1fs waiting for scroll-to-top result " @@ -294,12 +348,33 @@ class Browser: return True elif not self._waiting_on_outlinks_msg_id: self.logger.info("retrieving outlinks for %s", self.url) - self._waiting_on_outlinks_msg_id = self.send_to_chrome( - method="Runtime.evaluate", - params={"expression": self.OUTLINKS_JS}) + self._request_outlinks() return False else: # self._waiting_on_outlinks_msg_id - return False + if time.time() - self._waiting_on_outlinks_start > 30.0: + if self._waiting_on_outlinks_attempt < 5: + self.logger.warn( + "timed out after %.1fs on attempt %s to retrieve " + "outlinks, trying again", + time.time() - self._waiting_on_outlinks_start, + self._waiting_on_outlinks_attempt) + self._request_outlinks() + return False + else: + raise BrowsingException( + "timed out after %.1fs on (final) attempt %s " + "to retrieve outlinks" % ( + time.time() - self._waiting_on_outlinks_start, + self._waiting_on_outlinks_attempt)) + else: # just waiting for outlinks + return False + + def _request_outlinks(self): + self._waiting_on_outlinks_msg_id = self.send_to_chrome( + method="Runtime.evaluate", + params={"expression": self.OUTLINKS_JS}) + self._waiting_on_outlinks_attempt += 1 + self._waiting_on_outlinks_start = time.time() OUTLINKS_JS = """ var compileOutlinks = function(frame) { @@ -318,10 +393,14 @@ compileOutlinks(window).join(' '); def _browse_interval_func(self): """Called periodically while page is being browsed. Returns True when finished browsing.""" - if not self._websock or not self._websock.sock or not self._websock.sock.connected: - raise BrowsingException("websocket closed, did chrome die? {}".format(self._websocket_url)) + if (not self._websock or not self._websock.sock + or not self._websock.sock.connected): + raise BrowsingException( + "websocket closed, did chrome die? {}".format( + self._websocket_url)) elif self._aw_snap_hes_dead_jim: - raise BrowsingException("""chrome tab went "aw snap" or "he's dead jim"!""") + raise BrowsingException( + """chrome tab went "aw snap" or "he's dead jim"!""") elif (self._behavior != None and self._behavior.is_finished() or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS): return True @@ -393,7 +472,7 @@ compileOutlinks(window).join(' '); def _page_load_event_fired(self, message): self.logger.info("Page.loadEventFired, moving on to starting behaviors url={}".format(self.url)) self._behavior = Behavior(self.url, self) - self._behavior.start() + self._behavior.start(self.behavior_parameters) self._waiting_on_document_url_msg_id = self.send_to_chrome(method="Runtime.evaluate", params={"expression":"document.URL"}) @@ -420,7 +499,7 @@ compileOutlinks(window).join(' '); self.on_screenshot(base64.b64decode(message["result"]["data"])) self._waiting_on_screenshot_msg_id = None self._has_screenshot = True - self.logger.info("got screenshot, moving on to getting outlinks url={}".format(self.url)) + self.logger.info("got screenshot, moving on to getting outlinks") elif message["id"] == self._waiting_on_scroll_to_top_msg_id: self._waiting_on_scroll_to_top_msg_id = None self._waiting_on_scroll_to_top_start = None @@ -473,15 +552,19 @@ class Chrome: self.ignore_cert_errors = ignore_cert_errors self._shutdown = threading.Event() - # returns websocket url to chrome window with about:blank loaded def __enter__(self): + ''' + Returns websocket url to chrome window with about:blank loaded. + ''' return self.start() def __exit__(self, *args): self.stop() - # returns websocket url to chrome window with about:blank loaded def start(self): + ''' + Returns websocket url to chrome window with about:blank loaded. + ''' timeout_sec = 600 new_env = os.environ.copy() new_env["HOME"] = self.user_home_dir @@ -502,9 +585,10 @@ class Chrome: chrome_args.append("--proxy-server={}".format(self.proxy)) chrome_args.append("about:blank") self.logger.info("running: {}".format(" ".join(chrome_args))) + # start_new_session - new process group so we can kill the whole group self.chrome_process = subprocess.Popen(chrome_args, env=new_env, - start_new_session=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, bufsize=0) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, + start_new_session=True) self._out_reader_thread = threading.Thread(target=self._read_stderr_stdout, name="ChromeOutReaderThread(pid={})".format(self.chrome_process.pid)) self._out_reader_thread.start() @@ -540,7 +624,9 @@ class Chrome: # XXX select doesn't work on windows def readline_nonblock(f): buf = b"" - while not self._shutdown.is_set() and (len(buf) == 0 or buf[-1] != 0xa) and select.select([f],[],[],0.5)[0]: + while not self._shutdown.is_set() and ( + len(buf) == 0 or buf[-1] != 0xa) and select.select( + [f],[],[],0.5)[0]: buf += f.read(1) return buf @@ -548,17 +634,33 @@ class Chrome: while not self._shutdown.is_set(): buf = readline_nonblock(self.chrome_process.stdout) if buf: - if re.search(b"Xlib: extension|CERT_PKIXVerifyCert for [^ ]* failed|^ALSA lib|ERROR:gl_surface_glx.cc|ERROR:gpu_child_thread.cc", buf): - logging.debug("chrome pid %s STDERR %s", self.chrome_process.pid, buf) + if re.search( + b"Xlib: extension|" + b"CERT_PKIXVerifyCert for [^ ]* failed|" + b"^ALSA lib|ERROR:gl_surface_glx.cc|" + b"ERROR:gpu_child_thread.cc", buf): + logging.log( + brozzler.TRACE, "chrome pid %s STDOUT %s", + self.chrome_process.pid, buf) else: - logging.warn("chrome pid %s STDERR %s", self.chrome_process.pid, buf) + logging.info( + "chrome pid %s STDOUT %s", + self.chrome_process.pid, buf) buf = readline_nonblock(self.chrome_process.stderr) if buf: - if re.search(b"Xlib: extension|CERT_PKIXVerifyCert for [^ ]* failed|^ALSA lib|ERROR:gl_surface_glx.cc|ERROR:gpu_child_thread.cc", buf): - logging.debug("chrome pid %s STDERR %s", self.chrome_process.pid, buf) + if re.search( + b"Xlib: extension|" + b"CERT_PKIXVerifyCert for [^ ]* failed|" + b"^ALSA lib|ERROR:gl_surface_glx.cc|" + b"ERROR:gpu_child_thread.cc", buf): + logging.log( + brozzler.TRACE, "chrome pid %s STDOUT %s", + self.chrome_process.pid, buf) else: - logging.warn("chrome pid %s STDERR %s", self.chrome_process.pid, buf) + logging.info( + "chrome pid %s STDERR %s", + self.chrome_process.pid, buf) except: logging.error("unexpected exception", exc_info=True) @@ -568,10 +670,10 @@ class Chrome: timeout_sec = 300 self._shutdown.set() - self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid)) + self.logger.info("terminating chrome pgid %s" % self.chrome_process.pid) - self.chrome_process.terminate() - first_sigterm = last_sigterm = time.time() + os.killpg(self.chrome_process.pid, signal.SIGTERM) + first_sigterm = time.time() try: while time.time() - first_sigterm < timeout_sec: @@ -580,21 +682,29 @@ class Chrome: status = self.chrome_process.poll() if status is not None: if status == 0: - self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status)) + self.logger.info( + "chrome pid %s exited normally", + self.chrome_process.pid) else: - self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status)) + self.logger.warn( + "chrome pid %s exited with nonzero status %s", + self.chrome_process.pid, status) + + # XXX I would like to forcefully kill the process group + # here to guarantee no orphaned chromium subprocesses hang + # around, but there's a chance I suppose that some other + # process could have started with the same pgid return - # sometimes a hung chrome process will terminate on repeated sigterms - if time.time() - last_sigterm > 10: - self.chrome_process.terminate() - last_sigterm = time.time() - - self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec)) - self.chrome_process.kill() + self.logger.warn( + "chrome pid %s still alive %.1f seconds after sending " + "SIGTERM, sending SIGKILL", self.chrome_process.pid, + time.time() - first_sigterm) + os.killpg(self.chrome_process.pid, signal.SIGKILL) status = self.chrome_process.wait() - self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status)) + self.logger.warn( + "chrome pid %s reaped (status=%s) after killing with " + "SIGKILL", self.chrome_process.pid, status) finally: self._out_reader_thread.join() self.chrome_process = None - diff --git a/brozzler/cli.py b/brozzler/cli.py index 9d4cca4..7d7e7a7 100644 --- a/brozzler/cli.py +++ b/brozzler/cli.py @@ -24,7 +24,6 @@ import datetime import json import logging import os -import pprint import re import requests import rethinkstuff @@ -36,14 +35,21 @@ import time import traceback import warnings import yaml +import shutil def _add_common_options(arg_parser): + arg_parser.add_argument( + '-q', '--quiet', dest='log_level', + action='store_const', default=logging.INFO, const=logging.WARN) arg_parser.add_argument( '-v', '--verbose', dest='log_level', action='store_const', default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument( '--trace', dest='log_level', action='store_const', default=logging.INFO, const=brozzler.TRACE) + # arg_parser.add_argument( + # '-s', '--silent', dest='log_level', action='store_const', + # default=logging.INFO, const=logging.CRITICAL) arg_parser.add_argument( '--version', action='version', version='brozzler %s - %s' % ( @@ -80,6 +86,26 @@ def _configure_logging(args): warnings.simplefilter( 'ignore', category=requests.packages.urllib3.exceptions.InsecurePlatformWarning) +def suggest_default_chome_exe(): + # mac os x application executable paths + for path in [ + '/Applications/Chromium.app/Contents/MacOS/Chromium', + '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']: + if os.path.exists(path): + return path + + # "chromium-browser" is the executable on ubuntu trusty + # https://github.com/internetarchive/brozzler/pull/6/files uses "chromium" + # google chrome executable names taken from these packages: + # http://www.ubuntuupdates.org/ppa/google_chrome + for exe in [ + 'chromium-browser', 'chromium', 'google-chrome', + 'google-chrome-stable', 'google-chrome-beta', + 'google-chrome-unstable']: + if shutil.which(exe): + return exe + return 'chromium-browser' + def brozzle_page(): ''' Command line utility entry point for brozzling a single page. Opens url in @@ -91,7 +117,8 @@ def brozzle_page(): formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('url', metavar='URL', help='page url') arg_parser.add_argument( - '-e', '--executable', dest='chrome_exe', default='chromium-browser', + '-e', '--chrome-exe', dest='chrome_exe', + default=suggest_default_chome_exe(), help='executable to use to invoke chrome') arg_parser.add_argument( '--proxy', dest='proxy', default=None, @@ -157,10 +184,9 @@ def brozzler_new_job(): frontier = brozzler.RethinkDbFrontier(r) brozzler.job.new_job_file(frontier, args.job_conf_file) - def brozzler_new_site(): ''' - Command line utility entry point for queuing a new brozzler site. + Command line utility entry point for queuing a new brozzler site. Takes a seed url and creates a site and page object in rethinkdb, which brozzler-workers will look at and start crawling. ''' @@ -206,13 +232,13 @@ def brozzler_worker(): Main entrypoint for brozzler, gets sites and pages to brozzle from rethinkdb, brozzles them. ''' - arg_parser = argparse.ArgumentParser( prog=os.path.basename(__file__), formatter_class=argparse.ArgumentDefaultsHelpFormatter) _add_rethinkdb_options(arg_parser) arg_parser.add_argument( - '-e', '--executable', dest='chrome_exe', default='chromium-browser', + '-e', '--chrome-exe', dest='chrome_exe', + default=suggest_default_chome_exe(), help='executable to use to invoke chrome') arg_parser.add_argument( '-n', '--max-browsers', dest='max_browsers', default='1', @@ -227,42 +253,77 @@ def brozzler_worker(): def sigint(signum, frame): raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)') - def dump_state(signum, frame): - pp = pprint.PrettyPrinter(indent=4) - state_strs = [] + # do not print in signal handler to avoid RuntimeError: reentrant call + state_dump_msgs = [] + def queue_state_dump(signum, frame): + signal.signal(signal.SIGQUIT, signal.SIG_IGN) + try: + state_strs = [] + frames = sys._current_frames() + threads = {th.ident: th for th in threading.enumerate()} + for ident in frames: + if threads[ident]: + state_strs.append(str(threads[ident])) + else: + state_strs.append('' % ident) + stack = traceback.format_stack(frames[ident]) + state_strs.append(''.join(stack)) + state_dump_msgs.append( + 'dumping state (caught signal %s)\n%s' % ( + signum, '\n'.join(state_strs))) + except BaseException as e: + state_dump_msgs.append('exception dumping state: %s' % e) + finally: + signal.signal(signal.SIGQUIT, queue_state_dump) - for th in threading.enumerate(): - state_strs.append(str(th)) - stack = traceback.format_stack(sys._current_frames()[th.ident]) - state_strs.append("".join(stack)) - - logging.warn("dumping state (caught signal {})\n{}".format( - signum, "\n".join(state_strs))) - - signal.signal(signal.SIGQUIT, dump_state) + signal.signal(signal.SIGQUIT, queue_state_dump) signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) r = rethinkstuff.Rethinker( - args.rethinkdb_servers.split(","), args.rethinkdb_db) + args.rethinkdb_servers.split(','), args.rethinkdb_db) frontier = brozzler.RethinkDbFrontier(r) service_registry = rethinkstuff.ServiceRegistry(r) worker = brozzler.worker.BrozzlerWorker( frontier, service_registry, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) - worker_thread = worker.start() - + worker.start() try: - while worker_thread.is_alive(): + while worker.is_alive(): + while state_dump_msgs: + logging.warn(state_dump_msgs.pop(0)) time.sleep(0.5) - logging.critical("worker thread has died, shutting down") + logging.critical('worker thread has died, shutting down') except brozzler.ShutdownRequested as e: pass finally: worker.shutdown_now() - for th in threading.enumerate(): - if th != threading.current_thread(): - th.join() - logging.info("brozzler-worker is all done, exiting") + logging.info('brozzler-worker is all done, exiting') + +def brozzler_ensure_tables(): + ''' + Creates rethinkdb tables if they don't already exist. Brozzler + (brozzler-worker, brozzler-new-job, etc) normally creates the tables it + needs on demand at startup, but if multiple instances are starting up at + the same time, you can end up with duplicate broken tables. So it's a good + idea to use this utility at an early step when spinning up a cluster. + ''' + arg_parser = argparse.ArgumentParser( + prog=os.path.basename(sys.argv[0]), + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + _add_rethinkdb_options(arg_parser) + _add_common_options(arg_parser) + + args = arg_parser.parse_args(args=sys.argv[1:]) + _configure_logging(args) + + r = rethinkstuff.Rethinker( + args.rethinkdb_servers.split(','), args.rethinkdb_db) + + # services table + rethinkstuff.ServiceRegistry(r) + + # sites, pages, jobs tables + brozzler.frontier.RethinkDbFrontier(r) diff --git a/brozzler/easy.py b/brozzler/easy.py new file mode 100644 index 0000000..2f5f92a --- /dev/null +++ b/brozzler/easy.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +''' +brozzler-easy - brozzler-worker, warcprox, and pywb all working together in a +single process + +Copyright (C) 2016 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import sys +import logging +try: + import warcprox + import warcprox.main + import pywb + import brozzler.pywb + import wsgiref.simple_server + import wsgiref.handlers + import six.moves.socketserver +except ImportError as e: + logging.critical( + '%s: %s\n\nYou might need to run "pip install ' + 'brozzler[easy]".\nSee README.rst for more information.', + type(e).__name__, e) + sys.exit(1) +import argparse +import brozzler +import brozzler.cli +import os +import socket +import signal +import threading +import time +import rethinkstuff +import traceback + +def _build_arg_parser(prog=os.path.basename(sys.argv[0])): + arg_parser = argparse.ArgumentParser( + prog=prog, formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description=( + 'brozzler-easy - easy deployment of brozzler, with ' + 'brozzler-worker, warcprox, and pywb all running in a single ' + 'process')) + + # === common args === + arg_parser.add_argument( + '--rethinkdb-servers', dest='rethinkdb_servers', + default='localhost', help=( + 'rethinkdb servers, e.g. ' + 'db0.foo.org,db0.foo.org:38015,db1.foo.org')) + arg_parser.add_argument( + '--rethinkdb-db', dest='rethinkdb_db', default='brozzler', + help='rethinkdb database name') + arg_parser.add_argument( + '-d', '--warcs-dir', dest='warcs_dir', default='./warcs', + help='where to write warcs') + + # === warcprox args === + arg_parser.add_argument( + '-c', '--cacert', dest='cacert', + default='./%s-warcprox-ca.pem' % socket.gethostname(), + help=( + 'warcprox CA certificate file; if file does not exist, it ' + 'will be created')) + arg_parser.add_argument( + '--certs-dir', dest='certs_dir', + default='./%s-warcprox-ca' % socket.gethostname(), + help='where warcprox will store and load generated certificates') + arg_parser.add_argument( + '--onion-tor-socks-proxy', dest='onion_tor_socks_proxy', + default=None, help=( + 'host:port of tor socks proxy, used only to connect to ' + '.onion sites')) + + # === brozzler-worker args === + arg_parser.add_argument( + '-e', '--chrome-exe', dest='chrome_exe', + default=brozzler.cli.suggest_default_chome_exe(), + help='executable to use to invoke chrome') + arg_parser.add_argument( + '-n', '--max-browsers', dest='max_browsers', default='1', + help='max number of chrome instances simultaneously browsing pages') + + # === pywb args === + arg_parser.add_argument( + '--pywb-port', dest='pywb_port', type=int, default=8091, + help='pywb wayback port') + + # === common at the bottom args === + arg_parser.add_argument( + '-v', '--verbose', dest='verbose', action='store_true') + arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true') + # arg_parser.add_argument( + # '-s', '--silent', dest='log_level', action='store_const', + # default=logging.INFO, const=logging.CRITICAL) + arg_parser.add_argument( + '--version', action='version', + version='brozzler %s - %s' % (brozzler.__version__, prog)) + + return arg_parser + +class BrozzlerEasyController: + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, args): + self.stop = threading.Event() + self.args = args + self.warcprox_controller = warcprox.main.init_controller( + self._warcprox_args(args)) + self.brozzler_worker = self._init_brozzler_worker(args) + self.pywb_httpd = self._init_pywb(args) + + def _init_brozzler_worker(self, args): + r = rethinkstuff.Rethinker( + args.rethinkdb_servers.split(","), args.rethinkdb_db) + frontier = brozzler.RethinkDbFrontier(r) + service_registry = rethinkstuff.ServiceRegistry(r) + worker = brozzler.worker.BrozzlerWorker( + frontier, service_registry, + max_browsers=int(args.max_browsers), + chrome_exe=args.chrome_exe, + proxy='%s:%s' % self.warcprox_controller.proxy.server_address, + enable_warcprox_features=True) + return worker + + def _init_pywb(self, args): + brozzler.pywb.TheGoodUrlCanonicalizer.replace_default_canonicalizer() + brozzler.pywb.TheGoodUrlCanonicalizer.monkey_patch_dsrules_init() + brozzler.pywb.support_in_progress_warcs() + + if args.warcs_dir.endswith('/'): + warcs_dir = args.warcs_dir + else: + warcs_dir = args.warcs_dir + '/' + + conf = { + 'collections': { + 'brozzler': { + 'index_paths': brozzler.pywb.RethinkCDXSource( + servers=args.rethinkdb_servers.split(","), + db=args.rethinkdb_db, table='captures') + }, + }, + # 'enable_http_proxy': True, + # 'enable_memento': True, + 'archive_paths': warcs_dir, + 'enable_cdx_api': True, + 'framed_replay': True, + 'port': args.pywb_port, + 'enable_auto_colls': False, + } + wsgi_app = pywb.framework.wsgi_wrappers.init_app( + pywb.webapp.pywb_init.create_wb_router, config=conf, + load_yaml=False) + + # disable is_hop_by_hop restrictions + wsgiref.handlers.is_hop_by_hop = lambda x: False + class ThreadingWSGIServer( + six.moves.socketserver.ThreadingMixIn, + wsgiref.simple_server.WSGIServer): + pass + return wsgiref.simple_server.make_server( + '', args.pywb_port, wsgi_app, ThreadingWSGIServer) + + def start(self): + self.logger.info('starting warcprox') + self.warcprox_controller.start() + + # XXX wait til fully started? + self.logger.info('starting brozzler-worker') + self.brozzler_worker.start() + + self.logger.info( + 'starting pywb at %s:%s', *self.pywb_httpd.server_address) + threading.Thread(target=self.pywb_httpd.serve_forever).start() + + def shutdown(self): + self.logger.info('shutting down brozzler-worker') + self.brozzler_worker.shutdown_now() + # brozzler-worker is fully shut down at this point + + self.logger.info('shutting down pywb') + self.pywb_httpd.shutdown() + + self.logger.info('shutting down warcprox') + self.warcprox_controller.shutdown() + + def wait_for_shutdown_request(self): + try: + while not self.stop.is_set(): + time.sleep(0.5) + finally: + self.shutdown() + + def _warcprox_args(self, args): + ''' + Takes args as produced by the argument parser built by + _build_arg_parser and builds warcprox arguments object suitable to pass + to warcprox.main.init_controller. Copies some arguments, renames some, + populates some with defaults appropriate for brozzler-easy, etc. + ''' + warcprox_args = argparse.Namespace() + warcprox_args.address = 'localhost' + # let the OS choose an available port; discover it later using + # sock.getsockname()[1] + warcprox_args.port = 0 + warcprox_args.cacert = args.cacert + warcprox_args.certs_dir = args.certs_dir + warcprox_args.directory = args.warcs_dir + warcprox_args.gzip = True + warcprox_args.prefix = 'brozzler' + warcprox_args.size = 1000 * 1000* 1000 + warcprox_args.rollover_idle_time = 3 * 60 + warcprox_args.digest_algorithm = 'sha1' + warcprox_args.base32 = True + warcprox_args.stats_db_file = None + warcprox_args.playback_port = None + warcprox_args.playback_index_db_file = None + warcprox_args.rethinkdb_servers = args.rethinkdb_servers + warcprox_args.rethinkdb_db = args.rethinkdb_db + warcprox_args.rethinkdb_big_table = True + warcprox_args.kafka_broker_list = None + warcprox_args.kafka_capture_feed_topic = None + warcprox_args.queue_size = 500 + warcprox_args.max_threads = None + warcprox_args.profile = False + warcprox_args.onion_tor_socks_proxy = args.onion_tor_socks_proxy + return warcprox_args + + def dump_state(self, signum=None, frame=None): + state_strs = [] + for th in threading.enumerate(): + state_strs.append(str(th)) + stack = traceback.format_stack(sys._current_frames()[th.ident]) + state_strs.append(''.join(stack)) + logging.warn('dumping state (caught signal {})\n{}'.format( + signum, '\n'.join(state_strs))) + +def main(): + arg_parser = _build_arg_parser() + args = arg_parser.parse_args(args=sys.argv[1:]) + if args.verbose: + loglevel = logging.DEBUG + elif args.quiet: + loglevel = logging.WARNING + else: + loglevel = logging.INFO + + logging.basicConfig( + level=loglevel, stream=sys.stderr, format=( + '%(asctime)s %(process)d %(levelname)s %(threadName)s ' + '%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')) + + controller = BrozzlerEasyController(args) + signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set()) + signal.signal(signal.SIGINT, lambda a,b: controller.stop.set()) + signal.signal(signal.SIGQUIT, controller.dump_state) + controller.start() + controller.wait_for_shutdown_request() diff --git a/brozzler/frontier.py b/brozzler/frontier.py index c6185b8..afa0858 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -292,6 +292,8 @@ class RethinkDbFrontier: self.update_page(page) def scope_and_schedule_outlinks(self, site, parent_page, outlinks): + if site.remember_outlinks: + parent_page.outlinks = {"accepted":[],"blocked":[],"rejected":[]} counts = {"added":0,"updated":0,"rejected":0,"blocked":0} for url in outlinks or []: u = brozzler.site.Url(url) @@ -314,10 +316,19 @@ class RethinkDbFrontier: else: self.new_page(new_child_page) counts["added"] += 1 + if site.remember_outlinks: + parent_page.outlinks["accepted"].append(url) else: counts["blocked"] += 1 + if site.remember_outlinks: + parent_page.outlinks["blocked"].append(url) else: counts["rejected"] += 1 + if site.remember_outlinks: + parent_page.outlinks["rejected"].append(url) + + if site.remember_outlinks: + self.update_page(parent_page) self.logger.info( "%s new links added, %s existing links updated, %s links " diff --git a/brozzler/job.py b/brozzler/job.py index fb3d720..bfaef4d 100644 --- a/brozzler/job.py +++ b/brozzler/job.py @@ -54,8 +54,8 @@ def new_job(frontier, job_conf): merged_conf = merge(seed_conf, job_conf) # XXX check for unknown settings, invalid url, etc - site = brozzler.Site(job_id=job.id, - seed=merged_conf["url"], + site = brozzler.Site( + job_id=job.id, seed=merged_conf["url"], scope=merged_conf.get("scope"), time_limit=merged_conf.get("time_limit"), proxy=merged_conf.get("proxy"), @@ -63,7 +63,8 @@ def new_job(frontier, job_conf): enable_warcprox_features=merged_conf.get( "enable_warcprox_features"), warcprox_meta=merged_conf.get("warcprox_meta"), - metadata=merged_conf.get("metadata")) + metadata=merged_conf.get("metadata"), + remember_outlinks=merged_conf.get("remember_outlinks")) sites.append(site) # insert all the sites into database before the job diff --git a/brozzler/pywb.py b/brozzler/pywb.py new file mode 100644 index 0000000..8f1ece8 --- /dev/null +++ b/brozzler/pywb.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +''' +brozzler/pywb.py - pywb support for rethinkdb index + +Copyright (C) 2016 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import sys +import logging +try: + import pywb.apps.cli + import pywb.cdx.cdxdomainspecific + import pywb.cdx.cdxobject + import pywb.cdx.cdxserver + import pywb.webapp.query_handler +except ImportError as e: + logging.critical( + '%s: %s\n\nYou might need to run "pip install ' + 'brozzler[easy]".\nSee README.rst for more information.', + type(e).__name__, e) + sys.exit(1) +import rethinkstuff +import rethinkdb +import surt +import json + +class RethinkCDXSource(pywb.cdx.cdxsource.CDXSource): + def __init__(self, servers, db, table): + self.servers = servers + self.db = db + self.table = table + + @property + def r(self): + try: + return self._r + except AttributeError: + self._r = rethinkstuff.Rethinker(self.servers, self.db) + return self._r + + def load_cdx(self, cdx_query): + # logging.debug('vars(cdx_query)=%s', vars(cdx_query)) + rethink_results = self._query_rethinkdb(cdx_query) + return self._gen_cdx_lines(rethink_results) + + def _gen_cdx_lines(self, rethink_results): + for record in rethink_results: + # XXX inefficient, it gets parsed later, figure out how to + # short-circuit this step and create the CDXObject directly + blob = { + 'url': record['url'], + 'mime': record['content_type'], + 'status': str(record['response_code']), + 'digest': record['sha1base32'], + 'length': str(record['length']), # XXX is this the right length? + 'offset': str(record['offset']), + 'filename': record['filename'], + } + # b'org,archive)/ 20160427215530 {"url": "https://archive.org/", "mime": "text/html", "status": "200", "digest": "VILUFXZD232SLUA6XROZQIMEVUPW6EIE", "length": "16001", "offset": "90144", "filename": "ARCHIVEIT-261-ONE_TIME-JOB209607-20160427215508135-00000.warc.gz"}' + cdx_line = '{} {:%Y%m%d%H%M%S} {}'.format( + record['canon_surt'], record['timestamp'], + json.dumps(blob)) + yield cdx_line.encode('utf-8') + + def _query_rethinkdb(self, cdx_query): + start_key = cdx_query.key.decode('utf-8') + end_key = cdx_query.end_key.decode('utf-8') + reql = self.r.table(self.table).between( + [start_key[:150], rethinkdb.minval], + [end_key[:150]+'!', rethinkdb.maxval], + index='abbr_canon_surt_timestamp') + reql = reql.order_by(index='abbr_canon_surt_timestamp') + + # filters have to come after order_by apparently + + # TODO support for POST, etc + # http_method='WARCPROX_WRITE_RECORD' for screenshots, thumbnails + reql = reql.filter( + lambda capture: rethinkdb.expr( + ['WARCPROX_WRITE_RECORD','GET']).contains( + capture['http_method'])) + reql = reql.filter( + lambda capture: (capture['canon_surt'] >= start_key) + & (capture['canon_surt'] < end_key)) + + if cdx_query.limit: + reql = reql.limit(cdx_query.limit) + + logging.debug('rethinkdb query: %s', reql) + results = reql.run() + return results + +class TheGoodUrlCanonicalizer(object): + ''' + Replacement for pywb.utils.canonicalize.UrlCanonicalizer that produces + surts with scheme and with trailing comma, and does not "massage" + www.foo.org into foo.org. + ''' + def __init__(self, surt_ordered=True): + '''We are always surt ordered (surt_ordered param is ignored)''' + self.surt_ordered = True + + def __call__(self, url): + try: + key = surt.surt( + url, trailing_comma=True, host_massage=False, + with_scheme=True) + # logging.debug('%s -> %s', url, key) + return key + except Exception as e: + raise pywb.utils.canonicalize.UrlCanonicalizeException( + 'Invalid Url: ' + url) + + def replace_default_canonicalizer(): + '''Replace parent class of CustomUrlCanonicalizer with this class.''' + pywb.cdx.cdxdomainspecific.CustomUrlCanonicalizer.__bases__ = ( + TheGoodUrlCanonicalizer,) + + def good_surts_from_default(default_surt): + ''' + Takes a standard surt without scheme and without trailing comma, and + returns a list of "good" surts that together match the same set of + urls. For example: + + good_surts_from_default('com,example)/path') + + returns + + ['http://(com,example,)/path', + 'https://(com,example,)/path', + 'http://(com,example,www,)/path', + 'https://(com,example,www,)/path'] + + ''' + if default_surt == '': + return [''] + + parts = default_surt.split(')', 1) + if len(parts) == 2: + orig_host_part, path_part = parts + good_surts = [ + 'http://(%s,)%s' % (orig_host_part, path_part), + 'https://(%s,)%s' % (orig_host_part, path_part), + 'http://(%s,www,)%s' % (orig_host_part, path_part), + 'https://(%s,www,)%s' % (orig_host_part, path_part), + ] + else: # no path part + host_part = parts[0] + good_surts = [ + 'http://(%s' % host_part, + 'https://(%s' % host_part, + ] + return good_surts + + def monkey_patch_dsrules_init(): + orig_init = pywb.cdx.cdxdomainspecific.CDXDomainSpecificRule.__init__ + def cdx_dsrule_init(self, url_prefix, rules): + orig_init(self, url_prefix, rules) + good_surts = [] + for url_prefix in self.url_prefix: + good_surts.extend( + TheGoodUrlCanonicalizer.good_surts_from_default( + url_prefix)) + self.url_prefix = good_surts + pywb.cdx.cdxdomainspecific.CDXDomainSpecificRule.__init__ = cdx_dsrule_init + +def support_in_progress_warcs(): + ''' + Monkey-patch pywb.warc.pathresolvers.PrefixResolver to include warcs still + being written to (warcs having ".open" suffix). This way if a cdx entry + references foo.warc.gz, pywb will try both foo.warc.gz and + foo.warc.gz.open. + ''' + _orig_prefix_resolver_call = pywb.warc.pathresolvers.PrefixResolver.__call__ + def _prefix_resolver_call(self, filename, cdx=None): + raw_results = _orig_prefix_resolver_call(self, filename, cdx) + results = [] + for warc_path in raw_results: + results.append(warc_path) + results.append('%s.open' % warc_path) + return results + pywb.warc.pathresolvers.PrefixResolver.__call__ = _prefix_resolver_call diff --git a/brozzler/site.py b/brozzler/site.py index 53593de..cb75d5e 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -91,7 +91,7 @@ class Site(brozzler.BaseDictable): enable_warcprox_features=False, reached_limit=None, status="ACTIVE", claimed=False, start_time=None, last_disclaimed=_EPOCH_UTC, last_claimed_by=None, - last_claimed=_EPOCH_UTC, metadata={}): + last_claimed=_EPOCH_UTC, metadata={}, remember_outlinks=None, cookie_db=None): self.seed = seed self.id = id @@ -109,6 +109,8 @@ class Site(brozzler.BaseDictable): self.last_disclaimed = last_disclaimed self.last_claimed = last_claimed self.metadata = metadata + self.remember_outlinks = remember_outlinks + self.cookie_db = cookie_db self.scope = scope or {} if not "surt" in self.scope: @@ -218,7 +220,8 @@ class Page(brozzler.BaseDictable): def __init__( self, url, id=None, site_id=None, job_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, - via_page_id=None, last_claimed_by=None, hops_off_surt=0): + via_page_id=None, last_claimed_by=None, hops_off_surt=0, + outlinks=None): self.site_id = site_id self.job_id = job_id self.url = url @@ -229,6 +232,7 @@ class Page(brozzler.BaseDictable): self.brozzle_count = brozzle_count self.via_page_id = via_page_id self.hops_off_surt = hops_off_surt + self.outlinks = outlinks self._canon_hurl = None if priority is not None: diff --git a/brozzler/webconsole/__init__.py b/brozzler/webconsole/__init__.py index db0f78b..43f306c 100644 --- a/brozzler/webconsole/__init__.py +++ b/brozzler/webconsole/__init__.py @@ -48,7 +48,6 @@ app = flask.Flask(__name__) gunicorn_error_logger = logging.getLogger('gunicorn.error') app.logger.handlers.extend(gunicorn_error_logger.handlers) app.logger.setLevel(logging.INFO) -app.logger.info('will this show in the log?') # configure with environment variables SETTINGS = { @@ -99,18 +98,39 @@ def pages(site_id): app.logger.info("flask.request.args=%s", flask.request.args) start = int(flask.request.args.get("start", 0)) end = int(flask.request.args.get("end", start + 90)) - app.logger.info("yes new query") pages_ = r.table("pages").between( [site_id, 1, r.minval], [site_id, r.maxval, r.maxval], index="least_hops").order_by(index="least_hops")[start:end].run() return flask.jsonify(pages=list(pages_)) +@app.route("/api/pages/") +@app.route("/api/page/") +def page(page_id): + page_ = r.table("pages").get(page_id).run() + return flask.jsonify(page_) + +@app.route("/api/pages//yaml") +@app.route("/api/page//yaml") +def page_yaml(page_id): + page_ = r.table("pages").get(page_id).run() + return app.response_class( + yaml.dump(page_, default_flow_style=False), + mimetype='application/yaml') + @app.route("/api/sites/") @app.route("/api/site/") def site(site_id): site_ = r.table("sites").get(site_id).run() return flask.jsonify(site_) +@app.route("/api/sites//yaml") +@app.route("/api/site//yaml") +def site_yaml(site_id): + site_ = r.table("sites").get(site_id).run() + return app.response_class( + yaml.dump(site_, default_flow_style=False), + mimetype='application/yaml') + @app.route("/api/stats/") def stats(bucket): stats_ = r.table("stats").get(bucket).run() diff --git a/brozzler/webconsole/static/js/app.js b/brozzler/webconsole/static/js/app.js index ebd6a34..d30fad7 100644 --- a/brozzler/webconsole/static/js/app.js +++ b/brozzler/webconsole/static/js/app.js @@ -208,6 +208,9 @@ brozzlerControllers.controller("SiteController", ["$scope", "$routeParams", "$ht loadSiteStats($http, $scope.site); // console.log("site = ", $scope.site); }); + $http.get("/api/site/" + $routeParams.id + "/yaml").success(function(data) { + $scope.site_yaml = data; + }); loadMorePages(); }]); diff --git a/brozzler/webconsole/static/partials/site.html b/brozzler/webconsole/static/partials/site.html index 63b7dee..e6257de 100644 --- a/brozzler/webconsole/static/partials/site.html +++ b/brozzler/webconsole/static/partials/site.html @@ -11,7 +11,12 @@
-

Site {{site.seed}} (Job {{site.job_id}})

+

+ + Site {{site.seed}} (Job {{site.job_id}}) +

+
{{site_yaml}}
diff --git a/brozzler/worker.py b/brozzler/worker.py index c1415b7..0d777e3 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -94,13 +94,33 @@ class BrozzlerWorker: HEARTBEAT_INTERVAL = 20.0 - def __init__(self, frontier, service_registry=None, max_browsers=1, chrome_exe="chromium-browser"): + def __init__( + self, frontier, service_registry=None, max_browsers=1, + chrome_exe="chromium-browser", proxy=None, + enable_warcprox_features=False): self._frontier = frontier self._service_registry = service_registry self._max_browsers = max_browsers + + # these two settings can be overridden by the job/site configuration + self.__proxy = proxy + self.__enable_warcprox_features = enable_warcprox_features + self._browser_pool = brozzler.browser.BrowserPool(max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True) self._shutdown_requested = threading.Event() + self._thread = None + self._start_stop_lock = threading.Lock() + self._browsing_threads = set() + + def _proxy(self, site): + return site.proxy or self.__proxy + + def _enable_warcprox_features(self, site): + if site.enable_warcprox_features is not None: + return site.enable_warcprox_features + else: + return self.__enable_warcprox_features def _youtube_dl(self, destdir, site): ydl_opts = { @@ -114,12 +134,12 @@ class BrozzlerWorker: "nopart": True, "no_color": True, } - if site.proxy: - ydl_opts["proxy"] = "http://{}".format(site.proxy) + if self._proxy(site): + ydl_opts["proxy"] = "http://{}".format(self._proxy(site)) ## XXX (sometimes?) causes chrome debug websocket to go through ## proxy. Maybe not needed thanks to hls_prefer_native. ## # see https://github.com/rg3/youtube-dl/issues/6087 - ## os.environ["http_proxy"] = "http://{}".format(site.proxy) + ## os.environ["http_proxy"] = "http://{}".format(self._proxy(site)) ydl = youtube_dl.YoutubeDL(ydl_opts) if site.extra_headers(): ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers())) @@ -158,13 +178,13 @@ class BrozzlerWorker: try: self.logger.info("trying youtube-dl on {}".format(page)) info = ydl.extract_info(page.url) - if site.proxy and site.enable_warcprox_features: + if self._proxy(site) and self._enable_warcprox_features(site): info_json = json.dumps(info, sort_keys=True, indent=4) self.logger.info( "sending WARCPROX_WRITE_RECORD request to warcprox " "with youtube-dl json for %s", page) self._warcprox_write_record( - warcprox_address=site.proxy, + warcprox_address=self._proxy(site), url="youtube-dl:%s" % page.url, warc_type="metadata", content_type="application/vnd.youtube-dl_formats+json;charset=utf-8", payload=info_json.encode("utf-8"), @@ -199,17 +219,17 @@ class BrozzlerWorker: def _on_screenshot(screenshot_png): if on_screenshot: on_screenshot(screenshot_png) - elif site.proxy and site.enable_warcprox_features: + elif self._proxy(site) and self._enable_warcprox_features(site): self.logger.info("sending WARCPROX_WRITE_RECORD request " "to warcprox with screenshot for %s", page) screenshot_jpeg, thumbnail_jpeg = self.full_and_thumb_jpegs( screenshot_png) - self._warcprox_write_record(warcprox_address=site.proxy, + self._warcprox_write_record(warcprox_address=self._proxy(site), url="screenshot:{}".format(page.url), warc_type="resource", content_type="image/jpeg", payload=screenshot_jpeg, extra_headers=site.extra_headers()) - self._warcprox_write_record(warcprox_address=site.proxy, + self._warcprox_write_record(warcprox_address=self._proxy(site), url="thumbnail:{}".format(page.url), warc_type="resource", content_type="image/jpeg", payload=thumbnail_jpeg, @@ -237,7 +257,7 @@ class BrozzlerWorker: if self._needs_browsing(page, ydl_spy): self.logger.info('needs browsing: %s', page) if not browser.is_running(): - browser.start(proxy=site.proxy) + browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db) outlinks = browser.browse_page( page.url, extra_headers=site.extra_headers(), on_screenshot=_on_screenshot, @@ -253,10 +273,10 @@ class BrozzlerWorker: def _fetch_url(self, site, page): proxies = None - if site.proxy: + if self._proxy(site): proxies = { - 'http': 'http://%s' % site.proxy, - 'https': 'http://%s' % site.proxy, + 'http': 'http://%s' % self._proxy(site), + 'https': 'http://%s' % self._proxy(site), } self.logger.info('fetching %s', page) @@ -292,8 +312,10 @@ class BrozzlerWorker: page = self._frontier.claim_page(site, "%s:%s" % ( socket.gethostname(), browser.chrome_port)) outlinks = self.brozzle_page(browser, site, page) + site.cookie_db=browser.persist_and_read_cookie_db() self._frontier.completed_page(site, page) - self._frontier.scope_and_schedule_outlinks(site, page, outlinks) + self._frontier.scope_and_schedule_outlinks( + site, page, outlinks) page = None except brozzler.NothingToClaim: self.logger.info("no pages left for site %s", site) @@ -311,6 +333,7 @@ class BrozzlerWorker: browser.stop() self._frontier.disclaim_site(site, page) self._browser_pool.release(browser) + self._browsing_threads.remove(threading.current_thread()) def _service_heartbeat(self): if hasattr(self, "status_info"): @@ -354,9 +377,9 @@ class BrozzlerWorker: th = threading.Thread( target=lambda: self._brozzle_site( browser, site), - name="BrowsingThread:{}-{}".format( - browser.chrome_port, site.seed)) + name="BrozzlingThread:%s" % site.seed) th.start() + self._browsing_threads.add(th) except: self._browser_pool.release(browser) raise @@ -371,7 +394,9 @@ class BrozzlerWorker: latest_state = "no-unclaimed-sites" time.sleep(0.5) except: - self.logger.critical("thread exiting due to unexpected exception", exc_info=True) + self.logger.critical( + "thread exiting due to unexpected exception", + exc_info=True) finally: if self._service_registry and hasattr(self, "status_info"): try: @@ -382,12 +407,25 @@ class BrozzlerWorker: exc_info=True) def start(self): - th = threading.Thread(target=self.run, name="BrozzlerWorker") - th.start() - return th + with self._start_stop_lock: + if self._thread: + self.logger.warn( + 'ignoring start request because self._thread is ' + 'not None') + return + self._thread = threading.Thread( + target=self.run, name="BrozzlerWorker") + self._thread.start() def shutdown_now(self): - self.logger.info("brozzler worker shutting down") - self._shutdown_requested.set() - self._browser_pool.shutdown_now() + with self._start_stop_lock: + self.logger.info("brozzler worker shutting down") + self._shutdown_requested.set() + self._browser_pool.shutdown_now() + while self._browsing_threads: + time.sleep(0.5) + self._thread = None + + def is_alive(self): + return self._thread and self._thread.is_alive() diff --git a/setup.py b/setup.py index 93d3cfb..d7081ca 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ import setuptools setuptools.setup( name='brozzler', - version='1.1.dev38', + version='1.1b3.dev58', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', @@ -36,7 +36,9 @@ setuptools.setup( 'brozzler-new-job=brozzler.cli:brozzler_new_job', 'brozzler-new-site=brozzler.cli:brozzler_new_site', 'brozzler-worker=brozzler.cli:brozzler_worker', + 'brozzler-ensure-tables=brozzler.cli:brozzler_ensure_tables', 'brozzler-webconsole=brozzler.webconsole:run', + 'brozzler-easy=brozzler.easy:main', ], }, install_requires=[ @@ -53,7 +55,7 @@ setuptools.setup( ], extras_require={ 'webconsole': ['flask>=0.11', 'gunicorn'], - # 'brozzler-easy': ['warcprox', 'pywb'], + 'easy': ['warcprox>=2.0b1', 'pywb'], }, zip_safe=False, classifiers=[ diff --git a/vagrant/README.rst b/vagrant/README.rst new file mode 100644 index 0000000..f546da8 --- /dev/null +++ b/vagrant/README.rst @@ -0,0 +1,61 @@ +Single-VM Vagrant Brozzler Deployment +------------------------------------- + +This is a work in progress. Vagrant + ansible configuration for a single-vm +deployment of brozzler and warcprox with dependencies (notably rethinkdb). + +The idea is for this to be a quick way for people to get up and running with a +deployment resembling a real distributed deployment, and to offer a starting +configuration for people to adapt to their clusters. + +And equally important, as a harness for integration tests. (As of now brozzler +itself has no automated tests!) + +You'll need vagrant installed. +https://www.vagrantup.com/docs/installation/ +Then run: + +:: + + my-laptop$ vagrant up + +Currently to start a crawl you first need to ssh to the vagrant vm and activate +the brozzler virtualenv. + +:: + + my-laptop$ vagrant ssh + vagrant@brozzler-easy:~$ source ~/brozzler-ve34/bin/activate + (brozzler-ve34)vagrant@brozzler-easy:~$ + +Then you can run brozzler-new-site: + +:: + + (brozzler-ve34)vagrant@brozzler-easy:~$ brozzler-new-site \ + --proxy=localhost:8000 --enable-warcprox-features \ + http://example.com/ + + +Or brozzler-new-job (make sure to set the proxy to localhost:8000): + +:: + + (brozzler-ve34)vagrant@brozzler-easy:~$ cat >job1.yml + id: job1 + proxy: localhost:8000 # point at warcprox for archiving + enable_warcprox_features: true + seeds: + - url: https://example.org/ + (brozzler-ve34)vagrant@brozzler-easy:~$ brozzler-new-job job1.yml + +WARC files will appear in ./warcs and brozzler, warcprox and rethinkdb logs in +./logs (via vagrant folders syncing). + +You can also look at the rethinkdb console by opening http://localhost:8080 in +your browser after opening an ssh tunnel like so: + +:: + + my-laptop$ vagrant ssh -- -fN -Llocalhost:8080:localhost:8080 + diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile new file mode 100644 index 0000000..820c7fa --- /dev/null +++ b/vagrant/Vagrantfile @@ -0,0 +1,16 @@ +Vagrant.configure(2) do |config| + config.vm.box = "ubuntu/trusty64" + config.vm.hostname = "brozzler-easy" + + config.vm.synced_folder "..", "/brozzler" + + config.vm.provision "ansible" do |ansible| + ansible.playbook = "ansible/playbook.yml" + ansible.groups = { + "rethinkdb" => ["default"], + "warcprox" => ["default"], + "brozzler-worker" => ["default"], + # "brozzler-webconsole" => ["default"], + } + end +end diff --git a/vagrant/ansible/playbook.yml b/vagrant/ansible/playbook.yml new file mode 100644 index 0000000..f22f327 --- /dev/null +++ b/vagrant/ansible/playbook.yml @@ -0,0 +1,30 @@ +--- +- name: apply common configuration to all nodes + hosts: all + roles: + - common + +- name: deploy rethinkdb + hosts: rethinkdb + roles: + - rethinkdb + +- name: deploy warcprox + hosts: warcprox + roles: + - warcprox + +- name: deploy brozzler-worker + hosts: brozzler-worker + roles: + - brozzler-worker + +# - name: deploy brozzler-webconsole +# hosts: brozzler-webconsole +# roles: +# - brozzler-webconsole + +# - name: deploy pywb +# hosts: pywb +# roles: +# - pywb diff --git a/vagrant/ansible/roles/brozzler-webconsole/tasks/main.yml b/vagrant/ansible/roles/brozzler-webconsole/tasks/main.yml new file mode 100644 index 0000000..6b54696 --- /dev/null +++ b/vagrant/ansible/roles/brozzler-webconsole/tasks/main.yml @@ -0,0 +1,19 @@ +--- +- name: git clone https://github.com/internetarchive/brozzler.git + git: repo=https://github.com/internetarchive/brozzler.git + dest=/home/vagrant/brozzler +- name: pip install -r requirements.txt in virtualenv + pip: requirements=/home/vagrant/brozzler/webconsole/requirements.txt + virtualenv=/home/vagrant/brozzler-webconsole-ve34 + virtualenv_python=python3.4 + extra_args='--no-input --upgrade --pre' + notify: + - restart brozzler-webconsole +- name: install upstart config /etc/init/brozzler-webconsole.conf + become: true + template: src=templates/brozzler-webconsole.conf.j2 + dest=/etc/init/brozzler-webconsole.conf + notify: + - restart brozzler-webconsole + + diff --git a/vagrant/ansible/roles/brozzler-webconsole/templates/brozzler-webconsole.conf.j2 b/vagrant/ansible/roles/brozzler-webconsole/templates/brozzler-webconsole.conf.j2 new file mode 100644 index 0000000..efe2d03 --- /dev/null +++ b/vagrant/ansible/roles/brozzler-webconsole/templates/brozzler-webconsole.conf.j2 @@ -0,0 +1,21 @@ +description "brozzler-webconsole" + +start on runlevel [2345] +stop on runlevel [!2345] + +env PYTHONPATH=/home/vagrant/brozzler-webconsole-ve34/lib/python3.4/site-packages:/home/vagrant/brozzler/webconsole +env PATH=/home/vagrant/brozzler-webconsole-ve34/bin:/usr/bin:/bin +env LC_ALL=C.UTF-8 + +env WAYBACK_BASEURL={{base_wayback_url}}/all +# env RETHINKDB_SERVERS={{groups['rethinkdb'] | join(',')}} +env RETHINKDB_SERVERS=localhost +env RETHINKDB_DB={{rethinkdb_db}} + +setuid vagrant + +# console log + +exec gunicorn --bind=0.0.0.0:8081 brozzler-webconsole:app >&/vagrant/logs/brozzler-webconsole.log + + diff --git a/vagrant/ansible/roles/brozzler-worker/handlers/main.yml b/vagrant/ansible/roles/brozzler-worker/handlers/main.yml new file mode 100644 index 0000000..1fac304 --- /dev/null +++ b/vagrant/ansible/roles/brozzler-worker/handlers/main.yml @@ -0,0 +1,13 @@ +--- +- name: restart Xvnc + service: name=Xvnc state=restarted + become: true +- name: restart websockify + service: name=websockify state=restarted + become: true +- name: restart vnc-websock + service: name=vnc-websock state=restarted + become: true +- name: restart brozzler-worker + service: name=brozzler-worker state=restarted + become: true diff --git a/vagrant/ansible/roles/brozzler-worker/tasks/main.yml b/vagrant/ansible/roles/brozzler-worker/tasks/main.yml new file mode 100644 index 0000000..a4ec194 --- /dev/null +++ b/vagrant/ansible/roles/brozzler-worker/tasks/main.yml @@ -0,0 +1,64 @@ +--- +- name: ensure canonical partner repo is in apt sources.list + apt_repository: repo='deb http://archive.canonical.com/ubuntu trusty partner' + state=present + become: true +- name: ensure required packages are installed + become: true + apt: name={{item}} state=present + with_items: + - python-virtualenv + - vnc4server + - chromium-browser + - xfonts-base + - fonts-arphic-bkai00mp + - fonts-arphic-bsmi00lp + - fonts-arphic-gbsn00lp + - fonts-arphic-gkai00mp + - fonts-arphic-ukai + - fonts-farsiweb + - fonts-nafees + - fonts-sil-abyssinica + - fonts-sil-ezra + - fonts-sil-padauk + - fonts-unfonts-extra + - fonts-unfonts-core + - ttf-indic-fonts + - fonts-thai-tlwg + - fonts-lklug-sinhala + - python3-pip + - git + - libjpeg-turbo8-dev + - zlib1g-dev + - gcc + - libpython3.4-dev + - adobe-flashplugin +- name: install Xvnc upstart config /etc/init/Xvnc.conf + template: src=templates/Xvnc.conf.j2 dest=/etc/init/Xvnc.conf + become: true + notify: + - restart Xvnc +- name: install websockify in virtualenv + pip: name=git+https://github.com/kanaka/websockify.git#egg=websockify + virtualenv=/home/vagrant/websockify-ve34 + virtualenv_python=python3.4 + extra_args='--no-input --upgrade --pre' +- name: install vnc-websock upstart config /etc/init/vnc-websock.conf + template: src=templates/vnc-websock.conf.j2 dest=/etc/init/vnc-websock.conf + become: true + notify: + - restart vnc-websock +- name: install brozzler in virtualenv + become: true + pip: # name=git+https://github.com/internetarchive/brozzler.git#egg=brozzler + name='-e /brozzler' + virtualenv=/home/vagrant/brozzler-ve34 + virtualenv_python=python3.4 + extra_args='--no-input --upgrade --pre' + notify: + - restart brozzler-worker +- name: install brozzler-worker upstart config /etc/init/brozzler-worker.conf + template: src=templates/brozzler-worker.conf.j2 dest=/etc/init/brozzler-worker.conf + become: true + notify: + - restart brozzler-worker diff --git a/vagrant/ansible/roles/brozzler-worker/templates/Xvnc.conf.j2 b/vagrant/ansible/roles/brozzler-worker/templates/Xvnc.conf.j2 new file mode 100644 index 0000000..6381143 --- /dev/null +++ b/vagrant/ansible/roles/brozzler-worker/templates/Xvnc.conf.j2 @@ -0,0 +1,14 @@ +description "Xvnc" + +start on runlevel [2345] +stop on runlevel [!2345] + +setuid vagrant + +console log + +exec nice Xvnc4 :1 -auth /tmp/Xauthority.vagrant \ + -geometry 1600x1000 -depth 24 -rfbwait 0 -nolisten tcp -rfbport 5901 \ + -SecurityTypes None -pn -fp /usr/share/fonts/X11/misc/ -co /etc/X11/rgb \ + AcceptCutText=0 AcceptPointerEvents=0 AcceptKeyEvents=0 + diff --git a/vagrant/ansible/roles/brozzler-worker/templates/brozzler-worker.conf.j2 b/vagrant/ansible/roles/brozzler-worker/templates/brozzler-worker.conf.j2 new file mode 100644 index 0000000..4ec328a --- /dev/null +++ b/vagrant/ansible/roles/brozzler-worker/templates/brozzler-worker.conf.j2 @@ -0,0 +1,25 @@ +description "brozzler-worker" + +start on runlevel [2345] +stop on runlevel [!2345] + +env DISPLAY=:1 +env PATH=/home/vagrant/brozzler-ve34/bin:/usr/bin:/bin +env PYTHONPATH=/home/vagrant/brozzler-ve34/lib/python3.4/site-packages +env LANG=C.UTF-8 + +setuid vagrant + +# console log + +# depends on vnc server +start on started Xvnc +stop on stopping Xvnc + +kill timeout 60 + +exec nice brozzler-worker \ + --rethinkdb-servers=localhost \ + --max-browsers=4 >>/vagrant/logs/brozzler-worker.log 2>&1 + # --rethinkdb-servers={{groups['rethinkdb'] | join(',')}} \ + diff --git a/vagrant/ansible/roles/brozzler-worker/templates/vnc-websock.conf.j2 b/vagrant/ansible/roles/brozzler-worker/templates/vnc-websock.conf.j2 new file mode 100644 index 0000000..86b4012 --- /dev/null +++ b/vagrant/ansible/roles/brozzler-worker/templates/vnc-websock.conf.j2 @@ -0,0 +1,14 @@ +description "vnc-websock" + +start on runlevel [2345] +stop on runlevel [!2345] + +setuid vagrant + +console log + +env PYTHONPATH=/home/vagrant/websockify-ve34/lib/python3.4/site-packages +env PATH=/home/vagrant/websockify-ve34/bin:/usr/bin:/bin + +exec nice websockify 0.0.0.0:8901 localhost:5901 + diff --git a/vagrant/ansible/roles/common/tasks/main.yml b/vagrant/ansible/roles/common/tasks/main.yml new file mode 100644 index 0000000..f9012ca --- /dev/null +++ b/vagrant/ansible/roles/common/tasks/main.yml @@ -0,0 +1,4 @@ +--- +- name: ensure logs directory exists + file: path=/vagrant/logs state=directory + become: true diff --git a/vagrant/ansible/roles/rethinkdb/handlers/main.yml b/vagrant/ansible/roles/rethinkdb/handlers/main.yml new file mode 100644 index 0000000..512fae0 --- /dev/null +++ b/vagrant/ansible/roles/rethinkdb/handlers/main.yml @@ -0,0 +1,4 @@ +--- +- name: restart rethinkdb + service: name=rethinkdb state=restarted + become: true diff --git a/vagrant/ansible/roles/rethinkdb/tasks/main.yml b/vagrant/ansible/roles/rethinkdb/tasks/main.yml new file mode 100644 index 0000000..77bbb89 --- /dev/null +++ b/vagrant/ansible/roles/rethinkdb/tasks/main.yml @@ -0,0 +1,21 @@ +--- +- name: ensure rethinkdb apt public key is trusted + apt_key: url=http://download.rethinkdb.com/apt/pubkey.gpg + become: true +- name: ensure rethinkdb repo is in apt sources.list + apt_repository: repo='deb http://download.rethinkdb.com/apt trusty main' + state=present + become: true +- name: ensure rethinkdb package is installed + apt: name=rethinkdb state=present + become: true + notify: + - restart rethinkdb +- name: ensure rethinkdb starts on reboot + service: name=rethinkdb enabled=yes +- name: ensure rethinkdb instance config file is installed + template: src=templates/rethinkdb-brozzler-easy.conf.j2 + dest=/etc/rethinkdb/instances.d/rethinkdb-brozzler-easy.conf + become: true + notify: + - restart rethinkdb diff --git a/vagrant/ansible/roles/rethinkdb/templates/rethinkdb-brozzler-easy.conf.j2 b/vagrant/ansible/roles/rethinkdb/templates/rethinkdb-brozzler-easy.conf.j2 new file mode 100644 index 0000000..bbb1099 --- /dev/null +++ b/vagrant/ansible/roles/rethinkdb/templates/rethinkdb-brozzler-easy.conf.j2 @@ -0,0 +1,5 @@ +runuser=vagrant +# bind=0.0.0.0 +# directory=/var/lib/rethinkdb +# log-file=/var/log/rethinkdb.log +log-file=/vagrant/logs/rethinkdb.log # synced dir diff --git a/vagrant/ansible/roles/warcprox/handlers/main.yml b/vagrant/ansible/roles/warcprox/handlers/main.yml new file mode 100644 index 0000000..698d871 --- /dev/null +++ b/vagrant/ansible/roles/warcprox/handlers/main.yml @@ -0,0 +1,14 @@ +--- +# - name: start warcprox +# environment: +# PYTHONPATH: /home/vagrant/warcprox-ve34/lib/python3.4/site-packages +# PATH: /home/vagrant/warcprox-ve34/bin:/usr/bin:/bin +# args: +# executable: /bin/bash +# shell: nice warcprox --dir=/vagrant/warcs --base32 --gzip +# --rollover-idle-time=180 --cacert=/vagrant/warcprox-ca.pem +# --onion-tor-socks-proxy=localhost:9050 --rethinkdb-servers=localhost +# --rethinkdb-big-table &> /vagrant/logs/warcprox.out & +- name: restart warcprox + service: name=warcprox state=restarted + become: true diff --git a/vagrant/ansible/roles/warcprox/tasks/main.yml b/vagrant/ansible/roles/warcprox/tasks/main.yml new file mode 100644 index 0000000..c9f611d --- /dev/null +++ b/vagrant/ansible/roles/warcprox/tasks/main.yml @@ -0,0 +1,25 @@ +--- +- name: ensure required packages are installed + become: true + apt: name={{item}} state=present + with_items: + - gcc + - python-virtualenv + - python3.4 + - libpython3.4-dev + - libffi-dev + - libssl-dev + - tor + - git +- name: install warcprox in virtualenv + pip: name=git+https://github.com/internetarchive/warcprox.git@2.x#egg=warcprox + virtualenv=/home/vagrant/warcprox-ve34 + virtualenv_python=python3.4 + extra_args='--no-input --upgrade --pre' + notify: + - restart warcprox +- name: install upstart config /etc/init/warcprox.conf + become: true + template: src=templates/warcprox.conf.j2 dest=/etc/init/warcprox.conf + notify: + - restart warcprox diff --git a/vagrant/ansible/roles/warcprox/templates/warcprox.conf.j2 b/vagrant/ansible/roles/warcprox/templates/warcprox.conf.j2 new file mode 100644 index 0000000..1afccce --- /dev/null +++ b/vagrant/ansible/roles/warcprox/templates/warcprox.conf.j2 @@ -0,0 +1,26 @@ +description "warcprox" + +start on runlevel [2345] +stop on runlevel [!2345] + +env PYTHONPATH=/home/vagrant/warcprox-ve34/lib/python3.4/site-packages +env PATH=/home/vagrant/warcprox-ve34/bin:/usr/bin:/bin + +# by default warcprox creates some files/dirs relative to cwd +chdir /home/vagrant +setuid vagrant + +# console log + +# --profile +exec nice warcprox \ + --dir=/vagrant/warcs \ + --base32 \ + --gzip \ + --rollover-idle-time=180 \ + --cacert=/vagrant/warcprox-ca.pem \ + --onion-tor-socks-proxy=localhost:9050 \ + --rethinkdb-servers=localhost \ + --rethinkdb-db=brozzler \ + --rethinkdb-big-table >>/vagrant/logs/warcprox.log 2>&1 + # --rethinkdb-servers={{groups['rethinkdb'] | join(',')}} \