Merge branch 'master' into qa

* master:
  pass behavior template parameters on to behavior - fixes umbra's ability to log in with parameters received from amqp
  Changing EnvironmentError to OSError
  Fix naming conventions.
  Create cookie directory if it doesn't exist. Add debug messages for cookie db read/write.
  Read/Write Cookie DB file when creating and stopping browser instance.
  brozzler[easy] requires warcprox>=2.0b1
  look for a sensible default chromium/chrome executable
  tweak thread names
  convert domain specific rule url prefixes to our style of surt
  have pywb support loading warc records from warc files still being written (look for foo.warc.gz.open)
  install flash plugin for chromium
  make state dumping signal handler more robust (now you can kill -QUIT a thousand times in a row without causing problems)
  handle case where websocket connection is unexpectedly closed during the post-behavior phase
  implement timeout and retries to work around issue where sometimes we receive no result message after requesting outlinks
  forgot to commit easy.py, add pywb.py with support for pywb rethinkdb index, and make brozzler-easy also run pywb
  working on brozzler-easy, single process with brozzler-worker and warcprox working together (pywb to be added)
  twirldown for site yaml on site page
  give master a version number considered later than the one up on pypi (1.1b3.dev45 > 1.1b2)
  in vagrant/ansible, install brozzler from this checkout instead of from github master
  option to save list of outlinks (categorized as "accepted", "blocked" (by robots), or "rejected") per page in rethinkdb (to be used by archive-it for out-of-scope reporting)
  oops didn't mean to leave that windows-only subprocess flag
  remove accidentally committed playbook.retry
  vagrant setup (unfinished)
  do not send more than one SIGTERM when shutting down browser process, because on recent chromium on linux, the second sigterm abruptly ends the process, and sometimes leaves orphan subprocesses; also send TERM/KILL signals to the whole process group, another measure to avoid orphans; and adjust logging levels for captured chrome output
  command line utility brozzler-ensure-tables, creates rethinkdb tables if they don't already exist... brozzler normally creates them on demand at startup, but if multiple instances are starting up at the same time, you can end up with duplicate broken tables, so it's a good idea to use this utility when spinning up a cluster
Noah Levitt 2016-07-26 19:47:50 -05:00
commit fdc2f87a0e
30 changed files with 1222 additions and 127 deletions


@ -1,21 +1,21 @@
#
# brozzler/__init__.py - __init__.py for brozzler package, contains some common
# code
#
# Copyright (C) 2014-2016 Internet Archive
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
brozzler/__init__.py - __init__.py for brozzler package, contains some common
code
Copyright (C) 2014-2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json as _json
import logging as _logging


@ -35,6 +35,9 @@ import select
import re
import base64
import psutil
import signal
import string
import sqlite3
__all__ = ["BrowserPool", "Browser"]
@ -126,11 +129,23 @@ class Browser:
def __exit__(self, *args):
self.stop()
def start(self, proxy=None):
def start(self, proxy=None, cookie_db=None):
if not self._chrome_instance:
# these can raise exceptions
self.chrome_port = self._find_available_port()
self._work_dir = tempfile.TemporaryDirectory()
if cookie_db is not None:
cookie_dir = os.path.join(self._work_dir.name, "chrome-user-data", "Default")
cookie_location = os.path.join(cookie_dir, "Cookies")
self.logger.debug("Cookie DB provided. Writing to: %s", cookie_location)
os.makedirs(cookie_dir, exist_ok=True)
try:
with open(cookie_location,'wb') as cookie_file:
cookie_file.write(cookie_db)
except OSError:
self.logger.error("exception writing cookie file at: %s", cookie_location, exc_info=True)
self._chrome_instance = Chrome(
port=self.chrome_port, executable=self.chrome_exe,
user_home_dir=self._work_dir.name,
@ -159,6 +174,24 @@ class Browser:
except:
self.logger.error("problem stopping", exc_info=True)
def persist_and_read_cookie_db(self):
cookie_location = os.path.join(self._work_dir.name, "chrome-user-data", "Default", "Cookies")
self.logger.debug("Saving Cookie DB from: %s", cookie_location)
try:
with sqlite3.connect(cookie_location) as conn:
cur = conn.cursor()
cur.execute("UPDATE cookies SET persistent = 1")
except sqlite3.Error:
self.logger.error("exception updating cookie DB", exc_info=True)
cookie_db = None
try:
with open(cookie_location, "rb") as cookie_file:
cookie_db = cookie_file.read()
except OSError:
self.logger.error("exception reading from cookie DB file at: %s", cookie_location, exc_info=True)
return cookie_db
def _find_available_port(self):
port_available = False
port = self.chrome_port
@ -208,18 +241,23 @@ class Browser:
self._waiting_on_screenshot_msg_id = None
self._waiting_on_document_url_msg_id = None
self._waiting_on_outlinks_msg_id = None
self._waiting_on_outlinks_start = None
self._waiting_on_outlinks_attempt = 0
self._outlinks = None
self._reached_limit = None
self._aw_snap_hes_dead_jim = None
self._abort_browse_page = False
self._has_screenshot = False
self._websock = websocket.WebSocketApp(self._websocket_url,
on_open=self._visit_page, on_message=self._wrap_handle_message)
self._websock = websocket.WebSocketApp(
self._websocket_url, on_open=self._visit_page,
on_message=self._wrap_handle_message)
threadName = "WebsockThread{}-{}".format(self.chrome_port,
''.join((random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(6))))
websock_thread = threading.Thread(target=self._websock.run_forever, name=threadName, kwargs={'ping_timeout':0.5})
threadName = "WebsockThread:%s-%s" % (self.chrome_port, ''.join(
random.choice(string.ascii_letters) for _ in range(6)))
websock_thread = threading.Thread(
target=self._websock.run_forever, name=threadName,
kwargs={'ping_timeout':0.5})
websock_thread.start()
self._start = time.time()
aborted = False
@ -235,27 +273,43 @@ class Browser:
if self._post_behavior_interval_func():
return self._outlinks
finally:
if self._websock and self._websock.sock and self._websock.sock.connected:
if (self._websock and self._websock.sock
and self._websock.sock.connected):
try:
self._websock.close()
except BaseException as e:
self.logger.error("exception closing websocket {} - {}".format(self._websock, e))
self.logger.error(
"exception closing websocket %s - %s" % (
self._websock, e))
websock_thread.join(timeout=30)
if websock_thread.is_alive():
self.logger.error("{} still alive 30 seconds after closing {}, will forcefully nudge it again".format(websock_thread, self._websock))
self.logger.error(
"%s still alive 30 seconds after closing %s, will "
"forcefully nudge it again" % (
websock_thread, self._websock))
self._websock.keep_running = False
websock_thread.join(timeout=30)
if websock_thread.is_alive():
self.logger.critical("{} still alive 60 seconds after closing {}".format(websock_thread, self._websock))
self.logger.critical(
"%s still alive 60 seconds after closing %s" % (
websock_thread, self._websock))
self._behavior = None
def _post_behavior_interval_func(self):
"""Called periodically after behavior is finished on the page. Returns
true when post-behavior tasks are finished."""
if not self._has_screenshot and (
not self._waiting_on_scroll_to_top_msg_id
"""
Called periodically after behavior is finished on the page. Returns
true when post-behavior tasks are finished.
"""
if (not self._websock or not self._websock.sock
or not self._websock.sock.connected):
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
if (not self._has_screenshot
and not self._waiting_on_scroll_to_top_msg_id
and not self._waiting_on_screenshot_msg_id):
if time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS:
self.logger.info(
@ -274,7 +328,7 @@ class Browser:
self._waiting_on_scroll_to_top_start = time.time()
return False
elif (self._waiting_on_scroll_to_top_msg_id
and time.time() - self._waiting_on_scroll_to_top_start > 30):
and time.time() - self._waiting_on_scroll_to_top_start > 30.0):
# chromium bug? occasionally we get no scroll-to-top result message
self.logger.warn(
"timed out after %.1fs waiting for scroll-to-top result "
@ -294,12 +348,33 @@ class Browser:
return True
elif not self._waiting_on_outlinks_msg_id:
self.logger.info("retrieving outlinks for %s", self.url)
self._waiting_on_outlinks_msg_id = self.send_to_chrome(
method="Runtime.evaluate",
params={"expression": self.OUTLINKS_JS})
self._request_outlinks()
return False
else: # self._waiting_on_outlinks_msg_id
return False
if time.time() - self._waiting_on_outlinks_start > 30.0:
if self._waiting_on_outlinks_attempt < 5:
self.logger.warn(
"timed out after %.1fs on attempt %s to retrieve "
"outlinks, trying again",
time.time() - self._waiting_on_outlinks_start,
self._waiting_on_outlinks_attempt)
self._request_outlinks()
return False
else:
raise BrowsingException(
"timed out after %.1fs on (final) attempt %s "
"to retrieve outlinks" % (
time.time() - self._waiting_on_outlinks_start,
self._waiting_on_outlinks_attempt))
else: # just waiting for outlinks
return False
def _request_outlinks(self):
self._waiting_on_outlinks_msg_id = self.send_to_chrome(
method="Runtime.evaluate",
params={"expression": self.OUTLINKS_JS})
self._waiting_on_outlinks_attempt += 1
self._waiting_on_outlinks_start = time.time()
OUTLINKS_JS = """
var compileOutlinks = function(frame) {
@ -318,10 +393,14 @@ compileOutlinks(window).join(' ');
def _browse_interval_func(self):
"""Called periodically while page is being browsed. Returns True when
finished browsing."""
if not self._websock or not self._websock.sock or not self._websock.sock.connected:
raise BrowsingException("websocket closed, did chrome die? {}".format(self._websocket_url))
if (not self._websock or not self._websock.sock
or not self._websock.sock.connected):
raise BrowsingException(
"websocket closed, did chrome die? {}".format(
self._websocket_url))
elif self._aw_snap_hes_dead_jim:
raise BrowsingException("""chrome tab went "aw snap" or "he's dead jim"!""")
raise BrowsingException(
"""chrome tab went "aw snap" or "he's dead jim"!""")
elif (self._behavior != None and self._behavior.is_finished()
or time.time() - self._start > Browser.HARD_TIMEOUT_SECONDS):
return True
@ -393,7 +472,7 @@ compileOutlinks(window).join(' ');
def _page_load_event_fired(self, message):
self.logger.info("Page.loadEventFired, moving on to starting behaviors url={}".format(self.url))
self._behavior = Behavior(self.url, self)
self._behavior.start()
self._behavior.start(self.behavior_parameters)
self._waiting_on_document_url_msg_id = self.send_to_chrome(method="Runtime.evaluate", params={"expression":"document.URL"})
@ -420,7 +499,7 @@ compileOutlinks(window).join(' ');
self.on_screenshot(base64.b64decode(message["result"]["data"]))
self._waiting_on_screenshot_msg_id = None
self._has_screenshot = True
self.logger.info("got screenshot, moving on to getting outlinks url={}".format(self.url))
self.logger.info("got screenshot, moving on to getting outlinks")
elif message["id"] == self._waiting_on_scroll_to_top_msg_id:
self._waiting_on_scroll_to_top_msg_id = None
self._waiting_on_scroll_to_top_start = None
@ -473,15 +552,19 @@ class Chrome:
self.ignore_cert_errors = ignore_cert_errors
self._shutdown = threading.Event()
# returns websocket url to chrome window with about:blank loaded
def __enter__(self):
'''
Returns websocket url to chrome window with about:blank loaded.
'''
return self.start()
def __exit__(self, *args):
self.stop()
# returns websocket url to chrome window with about:blank loaded
def start(self):
'''
Returns websocket url to chrome window with about:blank loaded.
'''
timeout_sec = 600
new_env = os.environ.copy()
new_env["HOME"] = self.user_home_dir
@ -502,9 +585,10 @@ class Chrome:
chrome_args.append("--proxy-server={}".format(self.proxy))
chrome_args.append("about:blank")
self.logger.info("running: {}".format(" ".join(chrome_args)))
# start_new_session - new process group so we can kill the whole group
self.chrome_process = subprocess.Popen(chrome_args, env=new_env,
start_new_session=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=0)
stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0,
start_new_session=True)
self._out_reader_thread = threading.Thread(target=self._read_stderr_stdout,
name="ChromeOutReaderThread(pid={})".format(self.chrome_process.pid))
self._out_reader_thread.start()
@ -540,7 +624,9 @@ class Chrome:
# XXX select doesn't work on windows
def readline_nonblock(f):
buf = b""
while not self._shutdown.is_set() and (len(buf) == 0 or buf[-1] != 0xa) and select.select([f],[],[],0.5)[0]:
while not self._shutdown.is_set() and (
len(buf) == 0 or buf[-1] != 0xa) and select.select(
[f],[],[],0.5)[0]:
buf += f.read(1)
return buf
@ -548,17 +634,33 @@ class Chrome:
while not self._shutdown.is_set():
buf = readline_nonblock(self.chrome_process.stdout)
if buf:
if re.search(b"Xlib: extension|CERT_PKIXVerifyCert for [^ ]* failed|^ALSA lib|ERROR:gl_surface_glx.cc|ERROR:gpu_child_thread.cc", buf):
logging.debug("chrome pid %s STDERR %s", self.chrome_process.pid, buf)
if re.search(
b"Xlib: extension|"
b"CERT_PKIXVerifyCert for [^ ]* failed|"
b"^ALSA lib|ERROR:gl_surface_glx.cc|"
b"ERROR:gpu_child_thread.cc", buf):
logging.log(
brozzler.TRACE, "chrome pid %s STDOUT %s",
self.chrome_process.pid, buf)
else:
logging.warn("chrome pid %s STDERR %s", self.chrome_process.pid, buf)
logging.info(
"chrome pid %s STDOUT %s",
self.chrome_process.pid, buf)
buf = readline_nonblock(self.chrome_process.stderr)
if buf:
if re.search(b"Xlib: extension|CERT_PKIXVerifyCert for [^ ]* failed|^ALSA lib|ERROR:gl_surface_glx.cc|ERROR:gpu_child_thread.cc", buf):
logging.debug("chrome pid %s STDERR %s", self.chrome_process.pid, buf)
if re.search(
b"Xlib: extension|"
b"CERT_PKIXVerifyCert for [^ ]* failed|"
b"^ALSA lib|ERROR:gl_surface_glx.cc|"
b"ERROR:gpu_child_thread.cc", buf):
logging.log(
brozzler.TRACE, "chrome pid %s STDERR %s",
self.chrome_process.pid, buf)
else:
logging.warn("chrome pid %s STDERR %s", self.chrome_process.pid, buf)
logging.info(
"chrome pid %s STDERR %s",
self.chrome_process.pid, buf)
except:
logging.error("unexpected exception", exc_info=True)
@ -568,10 +670,10 @@ class Chrome:
timeout_sec = 300
self._shutdown.set()
self.logger.info("terminating chrome pid {}".format(self.chrome_process.pid))
self.logger.info("terminating chrome pgid %s" % self.chrome_process.pid)
self.chrome_process.terminate()
first_sigterm = last_sigterm = time.time()
os.killpg(self.chrome_process.pid, signal.SIGTERM)
first_sigterm = time.time()
try:
while time.time() - first_sigterm < timeout_sec:
@ -580,21 +682,29 @@ class Chrome:
status = self.chrome_process.poll()
if status is not None:
if status == 0:
self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status))
self.logger.info(
"chrome pid %s exited normally",
self.chrome_process.pid)
else:
self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status))
self.logger.warn(
"chrome pid %s exited with nonzero status %s",
self.chrome_process.pid, status)
# XXX I would like to forcefully kill the process group
# here to guarantee no orphaned chromium subprocesses hang
# around, but there's a chance I suppose that some other
# process could have started with the same pgid
return
# sometimes a hung chrome process will terminate on repeated sigterms
if time.time() - last_sigterm > 10:
self.chrome_process.terminate()
last_sigterm = time.time()
self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec))
self.chrome_process.kill()
self.logger.warn(
"chrome pid %s still alive %.1f seconds after sending "
"SIGTERM, sending SIGKILL", self.chrome_process.pid,
time.time() - first_sigterm)
os.killpg(self.chrome_process.pid, signal.SIGKILL)
status = self.chrome_process.wait()
self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status))
self.logger.warn(
"chrome pid %s reaped (status=%s) after killing with "
"SIGKILL", self.chrome_process.pid, status)
finally:
self._out_reader_thread.join()
self.chrome_process = None
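Note: persist_and_read_cookie_db() above flips the persistent flag on every
row of chrome's sqlite cookie database, presumably so that session cookies
survive the round trip, then reads back the raw file bytes for storage on the
site. The same two steps in a standalone sketch, with a hypothetical path::

    import sqlite3

    cookie_location = '/tmp/chrome-user-data/Default/Cookies'  # hypothetical
    with sqlite3.connect(cookie_location) as conn:
        conn.execute('UPDATE cookies SET persistent = 1')
    with open(cookie_location, 'rb') as f:
        cookie_db = f.read()  # raw bytes, suitable for stashing in rethinkdb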


@ -24,7 +24,6 @@ import datetime
import json
import logging
import os
import pprint
import re
import requests
import rethinkstuff
@ -36,14 +35,21 @@ import time
import traceback
import warnings
import yaml
import shutil
def _add_common_options(arg_parser):
arg_parser.add_argument(
'-q', '--quiet', dest='log_level',
action='store_const', default=logging.INFO, const=logging.WARN)
arg_parser.add_argument(
'-v', '--verbose', dest='log_level',
action='store_const', default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument(
'--trace', dest='log_level',
action='store_const', default=logging.INFO, const=brozzler.TRACE)
# arg_parser.add_argument(
# '-s', '--silent', dest='log_level', action='store_const',
# default=logging.INFO, const=logging.CRITICAL)
arg_parser.add_argument(
'--version', action='version',
version='brozzler %s - %s' % (
@ -80,6 +86,26 @@ def _configure_logging(args):
warnings.simplefilter(
'ignore', category=requests.packages.urllib3.exceptions.InsecurePlatformWarning)
def suggest_default_chrome_exe():
# mac os x application executable paths
for path in [
'/Applications/Chromium.app/Contents/MacOS/Chromium',
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome']:
if os.path.exists(path):
return path
# "chromium-browser" is the executable on ubuntu trusty
# https://github.com/internetarchive/brozzler/pull/6/files uses "chromium"
# google chrome executable names taken from these packages:
# http://www.ubuntuupdates.org/ppa/google_chrome
for exe in [
'chromium-browser', 'chromium', 'google-chrome',
'google-chrome-stable', 'google-chrome-beta',
'google-chrome-unstable']:
if shutil.which(exe):
return exe
return 'chromium-browser'
def brozzle_page():
'''
Command line utility entry point for brozzling a single page. Opens url in
@ -91,7 +117,8 @@ def brozzle_page():
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('url', metavar='URL', help='page url')
arg_parser.add_argument(
'-e', '--executable', dest='chrome_exe', default='chromium-browser',
'-e', '--chrome-exe', dest='chrome_exe',
default=suggest_default_chrome_exe(),
help='executable to use to invoke chrome')
arg_parser.add_argument(
'--proxy', dest='proxy', default=None,
@ -157,10 +184,9 @@ def brozzler_new_job():
frontier = brozzler.RethinkDbFrontier(r)
brozzler.job.new_job_file(frontier, args.job_conf_file)
def brozzler_new_site():
'''
Command line utility entry point for queuing a new brozzler site.
Takes a seed url and creates a site and page object in rethinkdb, which
brozzler-workers will look at and start crawling.
'''
@ -206,13 +232,13 @@ def brozzler_worker():
Main entrypoint for brozzler, gets sites and pages to brozzle from
rethinkdb, brozzles them.
'''
arg_parser = argparse.ArgumentParser(
prog=os.path.basename(__file__),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
_add_rethinkdb_options(arg_parser)
arg_parser.add_argument(
'-e', '--executable', dest='chrome_exe', default='chromium-browser',
'-e', '--chrome-exe', dest='chrome_exe',
default=suggest_default_chrome_exe(),
help='executable to use to invoke chrome')
arg_parser.add_argument(
'-n', '--max-browsers', dest='max_browsers', default='1',
@ -227,42 +253,77 @@ def brozzler_worker():
def sigint(signum, frame):
raise brozzler.ShutdownRequested('shutdown requested (caught SIGINT)')
def dump_state(signum, frame):
pp = pprint.PrettyPrinter(indent=4)
state_strs = []
# do not print in signal handler to avoid RuntimeError: reentrant call
state_dump_msgs = []
def queue_state_dump(signum, frame):
signal.signal(signal.SIGQUIT, signal.SIG_IGN)
try:
state_strs = []
frames = sys._current_frames()
threads = {th.ident: th for th in threading.enumerate()}
for ident in frames:
if threads[ident]:
state_strs.append(str(threads[ident]))
else:
state_strs.append('<???:thread:ident=%s>' % ident)
stack = traceback.format_stack(frames[ident])
state_strs.append(''.join(stack))
state_dump_msgs.append(
'dumping state (caught signal %s)\n%s' % (
signum, '\n'.join(state_strs)))
except BaseException as e:
state_dump_msgs.append('exception dumping state: %s' % e)
finally:
signal.signal(signal.SIGQUIT, queue_state_dump)
for th in threading.enumerate():
state_strs.append(str(th))
stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append("".join(stack))
logging.warn("dumping state (caught signal {})\n{}".format(
signum, "\n".join(state_strs)))
signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGQUIT, queue_state_dump)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
r = rethinkstuff.Rethinker(
args.rethinkdb_servers.split(","), args.rethinkdb_db)
args.rethinkdb_servers.split(','), args.rethinkdb_db)
frontier = brozzler.RethinkDbFrontier(r)
service_registry = rethinkstuff.ServiceRegistry(r)
worker = brozzler.worker.BrozzlerWorker(
frontier, service_registry, max_browsers=int(args.max_browsers),
chrome_exe=args.chrome_exe)
worker_thread = worker.start()
worker.start()
try:
while worker_thread.is_alive():
while worker.is_alive():
while state_dump_msgs:
logging.warn(state_dump_msgs.pop(0))
time.sleep(0.5)
logging.critical("worker thread has died, shutting down")
logging.critical('worker thread has died, shutting down')
except brozzler.ShutdownRequested as e:
pass
finally:
worker.shutdown_now()
for th in threading.enumerate():
if th != threading.current_thread():
th.join()
logging.info("brozzler-worker is all done, exiting")
logging.info('brozzler-worker is all done, exiting')
def brozzler_ensure_tables():
'''
Creates rethinkdb tables if they don't already exist. Brozzler
(brozzler-worker, brozzler-new-job, etc) normally creates the tables it
needs on demand at startup, but if multiple instances are starting up at
the same time, you can end up with duplicate broken tables. So it's a good
idea to use this utility at an early step when spinning up a cluster.
'''
arg_parser = argparse.ArgumentParser(
prog=os.path.basename(sys.argv[0]),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
_add_rethinkdb_options(arg_parser)
_add_common_options(arg_parser)
args = arg_parser.parse_args(args=sys.argv[1:])
_configure_logging(args)
r = rethinkstuff.Rethinker(
args.rethinkdb_servers.split(','), args.rethinkdb_db)
# services table
rethinkstuff.ServiceRegistry(r)
# sites, pages, jobs tables
brozzler.frontier.RethinkDbFrontier(r)
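Note: queue_state_dump() above illustrates a general pattern: logging from
inside a signal handler can raise "RuntimeError: reentrant call", so the
handler only appends a preformatted message to a list, and the main loop
drains the list. A self-contained sketch of the same pattern (toy program,
not brozzler code)::

    import signal, sys, threading, time, traceback

    state_dump_msgs = []

    def queue_state_dump(signum, frame):
        stacks = []
        threads = {th.ident: th for th in threading.enumerate()}
        for ident, frame_ in sys._current_frames().items():
            stacks.append(str(threads.get(ident, '<???:thread:ident=%s>' % ident)))
            stacks.append(''.join(traceback.format_stack(frame_)))
        state_dump_msgs.append(
            'dumping state (caught signal %s)\n%s' % (signum, '\n'.join(stacks)))

    signal.signal(signal.SIGQUIT, queue_state_dump)
    while True:
        while state_dump_msgs:
            print(state_dump_msgs.pop(0), file=sys.stderr)  # safe outside the handler
        time.sleep(0.5)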

brozzler/easy.py (new file, 270 lines)

@ -0,0 +1,270 @@
#!/usr/bin/env python
'''
brozzler-easy - brozzler-worker, warcprox, and pywb all working together in a
single process
Copyright (C) 2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import sys
import logging
try:
import warcprox
import warcprox.main
import pywb
import brozzler.pywb
import wsgiref.simple_server
import wsgiref.handlers
import six.moves.socketserver
except ImportError as e:
logging.critical(
'%s: %s\n\nYou might need to run "pip install '
'brozzler[easy]".\nSee README.rst for more information.',
type(e).__name__, e)
sys.exit(1)
import argparse
import brozzler
import brozzler.cli
import os
import socket
import signal
import threading
import time
import rethinkstuff
import traceback
def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
arg_parser = argparse.ArgumentParser(
prog=prog, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description=(
'brozzler-easy - easy deployment of brozzler, with '
'brozzler-worker, warcprox, and pywb all running in a single '
'process'))
# === common args ===
arg_parser.add_argument(
'--rethinkdb-servers', dest='rethinkdb_servers',
default='localhost', help=(
'rethinkdb servers, e.g. '
'db0.foo.org,db0.foo.org:38015,db1.foo.org'))
arg_parser.add_argument(
'--rethinkdb-db', dest='rethinkdb_db', default='brozzler',
help='rethinkdb database name')
arg_parser.add_argument(
'-d', '--warcs-dir', dest='warcs_dir', default='./warcs',
help='where to write warcs')
# === warcprox args ===
arg_parser.add_argument(
'-c', '--cacert', dest='cacert',
default='./%s-warcprox-ca.pem' % socket.gethostname(),
help=(
'warcprox CA certificate file; if file does not exist, it '
'will be created'))
arg_parser.add_argument(
'--certs-dir', dest='certs_dir',
default='./%s-warcprox-ca' % socket.gethostname(),
help='where warcprox will store and load generated certificates')
arg_parser.add_argument(
'--onion-tor-socks-proxy', dest='onion_tor_socks_proxy',
default=None, help=(
'host:port of tor socks proxy, used only to connect to '
'.onion sites'))
# === brozzler-worker args ===
arg_parser.add_argument(
'-e', '--chrome-exe', dest='chrome_exe',
default=brozzler.cli.suggest_default_chrome_exe(),
help='executable to use to invoke chrome')
arg_parser.add_argument(
'-n', '--max-browsers', dest='max_browsers', default='1',
help='max number of chrome instances simultaneously browsing pages')
# === pywb args ===
arg_parser.add_argument(
'--pywb-port', dest='pywb_port', type=int, default=8091,
help='pywb wayback port')
# === common at the bottom args ===
arg_parser.add_argument(
'-v', '--verbose', dest='verbose', action='store_true')
arg_parser.add_argument('-q', '--quiet', dest='quiet', action='store_true')
# arg_parser.add_argument(
# '-s', '--silent', dest='log_level', action='store_const',
# default=logging.INFO, const=logging.CRITICAL)
arg_parser.add_argument(
'--version', action='version',
version='brozzler %s - %s' % (brozzler.__version__, prog))
return arg_parser
class BrozzlerEasyController:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, args):
self.stop = threading.Event()
self.args = args
self.warcprox_controller = warcprox.main.init_controller(
self._warcprox_args(args))
self.brozzler_worker = self._init_brozzler_worker(args)
self.pywb_httpd = self._init_pywb(args)
def _init_brozzler_worker(self, args):
r = rethinkstuff.Rethinker(
args.rethinkdb_servers.split(","), args.rethinkdb_db)
frontier = brozzler.RethinkDbFrontier(r)
service_registry = rethinkstuff.ServiceRegistry(r)
worker = brozzler.worker.BrozzlerWorker(
frontier, service_registry,
max_browsers=int(args.max_browsers),
chrome_exe=args.chrome_exe,
proxy='%s:%s' % self.warcprox_controller.proxy.server_address,
enable_warcprox_features=True)
return worker
def _init_pywb(self, args):
brozzler.pywb.TheGoodUrlCanonicalizer.replace_default_canonicalizer()
brozzler.pywb.TheGoodUrlCanonicalizer.monkey_patch_dsrules_init()
brozzler.pywb.support_in_progress_warcs()
if args.warcs_dir.endswith('/'):
warcs_dir = args.warcs_dir
else:
warcs_dir = args.warcs_dir + '/'
conf = {
'collections': {
'brozzler': {
'index_paths': brozzler.pywb.RethinkCDXSource(
servers=args.rethinkdb_servers.split(","),
db=args.rethinkdb_db, table='captures')
},
},
# 'enable_http_proxy': True,
# 'enable_memento': True,
'archive_paths': warcs_dir,
'enable_cdx_api': True,
'framed_replay': True,
'port': args.pywb_port,
'enable_auto_colls': False,
}
wsgi_app = pywb.framework.wsgi_wrappers.init_app(
pywb.webapp.pywb_init.create_wb_router, config=conf,
load_yaml=False)
# disable is_hop_by_hop restrictions
wsgiref.handlers.is_hop_by_hop = lambda x: False
class ThreadingWSGIServer(
six.moves.socketserver.ThreadingMixIn,
wsgiref.simple_server.WSGIServer):
pass
return wsgiref.simple_server.make_server(
'', args.pywb_port, wsgi_app, ThreadingWSGIServer)
def start(self):
self.logger.info('starting warcprox')
self.warcprox_controller.start()
# XXX wait til fully started?
self.logger.info('starting brozzler-worker')
self.brozzler_worker.start()
self.logger.info(
'starting pywb at %s:%s', *self.pywb_httpd.server_address)
threading.Thread(target=self.pywb_httpd.serve_forever).start()
def shutdown(self):
self.logger.info('shutting down brozzler-worker')
self.brozzler_worker.shutdown_now()
# brozzler-worker is fully shut down at this point
self.logger.info('shutting down pywb')
self.pywb_httpd.shutdown()
self.logger.info('shutting down warcprox')
self.warcprox_controller.shutdown()
def wait_for_shutdown_request(self):
try:
while not self.stop.is_set():
time.sleep(0.5)
finally:
self.shutdown()
def _warcprox_args(self, args):
'''
Takes args as produced by the argument parser built by
_build_arg_parser and builds warcprox arguments object suitable to pass
to warcprox.main.init_controller. Copies some arguments, renames some,
populates some with defaults appropriate for brozzler-easy, etc.
'''
warcprox_args = argparse.Namespace()
warcprox_args.address = 'localhost'
# let the OS choose an available port; discover it later using
# sock.getsockname()[1]
warcprox_args.port = 0
warcprox_args.cacert = args.cacert
warcprox_args.certs_dir = args.certs_dir
warcprox_args.directory = args.warcs_dir
warcprox_args.gzip = True
warcprox_args.prefix = 'brozzler'
warcprox_args.size = 1000 * 1000 * 1000
warcprox_args.rollover_idle_time = 3 * 60
warcprox_args.digest_algorithm = 'sha1'
warcprox_args.base32 = True
warcprox_args.stats_db_file = None
warcprox_args.playback_port = None
warcprox_args.playback_index_db_file = None
warcprox_args.rethinkdb_servers = args.rethinkdb_servers
warcprox_args.rethinkdb_db = args.rethinkdb_db
warcprox_args.rethinkdb_big_table = True
warcprox_args.kafka_broker_list = None
warcprox_args.kafka_capture_feed_topic = None
warcprox_args.queue_size = 500
warcprox_args.max_threads = None
warcprox_args.profile = False
warcprox_args.onion_tor_socks_proxy = args.onion_tor_socks_proxy
return warcprox_args
def dump_state(self, signum=None, frame=None):
state_strs = []
for th in threading.enumerate():
state_strs.append(str(th))
stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append(''.join(stack))
logging.warn('dumping state (caught signal {})\n{}'.format(
signum, '\n'.join(state_strs)))
def main():
arg_parser = _build_arg_parser()
args = arg_parser.parse_args(args=sys.argv[1:])
if args.verbose:
loglevel = logging.DEBUG
elif args.quiet:
loglevel = logging.WARNING
else:
loglevel = logging.INFO
logging.basicConfig(
level=loglevel, stream=sys.stderr, format=(
'%(asctime)s %(process)d %(levelname)s %(threadName)s '
'%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s'))
controller = BrozzlerEasyController(args)
signal.signal(signal.SIGTERM, lambda a,b: controller.stop.set())
signal.signal(signal.SIGINT, lambda a,b: controller.stop.set())
signal.signal(signal.SIGQUIT, controller.dump_state)
controller.start()
controller.wait_for_shutdown_request()


@ -292,6 +292,8 @@ class RethinkDbFrontier:
self.update_page(page)
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
if site.remember_outlinks:
parent_page.outlinks = {"accepted":[],"blocked":[],"rejected":[]}
counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
for url in outlinks or []:
u = brozzler.site.Url(url)
@ -314,10 +316,19 @@ class RethinkDbFrontier:
else:
self.new_page(new_child_page)
counts["added"] += 1
if site.remember_outlinks:
parent_page.outlinks["accepted"].append(url)
else:
counts["blocked"] += 1
if site.remember_outlinks:
parent_page.outlinks["blocked"].append(url)
else:
counts["rejected"] += 1
if site.remember_outlinks:
parent_page.outlinks["rejected"].append(url)
if site.remember_outlinks:
self.update_page(parent_page)
self.logger.info(
"%s new links added, %s existing links updated, %s links "


@ -54,8 +54,8 @@ def new_job(frontier, job_conf):
merged_conf = merge(seed_conf, job_conf)
# XXX check for unknown settings, invalid url, etc
site = brozzler.Site(job_id=job.id,
seed=merged_conf["url"],
site = brozzler.Site(
job_id=job.id, seed=merged_conf["url"],
scope=merged_conf.get("scope"),
time_limit=merged_conf.get("time_limit"),
proxy=merged_conf.get("proxy"),
@ -63,7 +63,8 @@ def new_job(frontier, job_conf):
enable_warcprox_features=merged_conf.get(
"enable_warcprox_features"),
warcprox_meta=merged_conf.get("warcprox_meta"),
metadata=merged_conf.get("metadata"))
metadata=merged_conf.get("metadata"),
remember_outlinks=merged_conf.get("remember_outlinks"))
sites.append(site)
# insert all the sites into database before the job

brozzler/pywb.py (new file, 194 lines)

@ -0,0 +1,194 @@
#!/usr/bin/env python
'''
brozzler/pywb.py - pywb support for rethinkdb index
Copyright (C) 2016 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import sys
import logging
try:
import pywb.apps.cli
import pywb.cdx.cdxdomainspecific
import pywb.cdx.cdxobject
import pywb.cdx.cdxserver
import pywb.webapp.query_handler
except ImportError as e:
logging.critical(
'%s: %s\n\nYou might need to run "pip install '
'brozzler[easy]".\nSee README.rst for more information.',
type(e).__name__, e)
sys.exit(1)
import rethinkstuff
import rethinkdb
import surt
import json
class RethinkCDXSource(pywb.cdx.cdxsource.CDXSource):
def __init__(self, servers, db, table):
self.servers = servers
self.db = db
self.table = table
@property
def r(self):
try:
return self._r
except AttributeError:
self._r = rethinkstuff.Rethinker(self.servers, self.db)
return self._r
def load_cdx(self, cdx_query):
# logging.debug('vars(cdx_query)=%s', vars(cdx_query))
rethink_results = self._query_rethinkdb(cdx_query)
return self._gen_cdx_lines(rethink_results)
def _gen_cdx_lines(self, rethink_results):
for record in rethink_results:
# XXX inefficient, it gets parsed later, figure out how to
# short-circuit this step and create the CDXObject directly
blob = {
'url': record['url'],
'mime': record['content_type'],
'status': str(record['response_code']),
'digest': record['sha1base32'],
'length': str(record['length']), # XXX is this the right length?
'offset': str(record['offset']),
'filename': record['filename'],
}
# b'org,archive)/ 20160427215530 {"url": "https://archive.org/", "mime": "text/html", "status": "200", "digest": "VILUFXZD232SLUA6XROZQIMEVUPW6EIE", "length": "16001", "offset": "90144", "filename": "ARCHIVEIT-261-ONE_TIME-JOB209607-20160427215508135-00000.warc.gz"}'
cdx_line = '{} {:%Y%m%d%H%M%S} {}'.format(
record['canon_surt'], record['timestamp'],
json.dumps(blob))
yield cdx_line.encode('utf-8')
def _query_rethinkdb(self, cdx_query):
start_key = cdx_query.key.decode('utf-8')
end_key = cdx_query.end_key.decode('utf-8')
reql = self.r.table(self.table).between(
[start_key[:150], rethinkdb.minval],
[end_key[:150]+'!', rethinkdb.maxval],
index='abbr_canon_surt_timestamp')
reql = reql.order_by(index='abbr_canon_surt_timestamp')
# filters have to come after order_by apparently
# TODO support for POST, etc
# http_method='WARCPROX_WRITE_RECORD' for screenshots, thumbnails
reql = reql.filter(
lambda capture: rethinkdb.expr(
['WARCPROX_WRITE_RECORD','GET']).contains(
capture['http_method']))
reql = reql.filter(
lambda capture: (capture['canon_surt'] >= start_key)
& (capture['canon_surt'] < end_key))
if cdx_query.limit:
reql = reql.limit(cdx_query.limit)
logging.debug('rethinkdb query: %s', reql)
results = reql.run()
return results
class TheGoodUrlCanonicalizer(object):
'''
Replacement for pywb.utils.canonicalize.UrlCanonicalizer that produces
surts with scheme and with trailing comma, and does not "massage"
www.foo.org into foo.org.
'''
def __init__(self, surt_ordered=True):
'''We are always surt ordered (surt_ordered param is ignored)'''
self.surt_ordered = True
def __call__(self, url):
try:
key = surt.surt(
url, trailing_comma=True, host_massage=False,
with_scheme=True)
# logging.debug('%s -> %s', url, key)
return key
except Exception as e:
raise pywb.utils.canonicalize.UrlCanonicalizeException(
'Invalid Url: ' + url)
def replace_default_canonicalizer():
'''Replace parent class of CustomUrlCanonicalizer with this class.'''
pywb.cdx.cdxdomainspecific.CustomUrlCanonicalizer.__bases__ = (
TheGoodUrlCanonicalizer,)
def good_surts_from_default(default_surt):
'''
Takes a standard surt without scheme and without trailing comma, and
returns a list of "good" surts that together match the same set of
urls. For example:
good_surts_from_default('com,example)/path')
returns
['http://(com,example,)/path',
'https://(com,example,)/path',
'http://(com,example,www,)/path',
'https://(com,example,www,)/path']
'''
if default_surt == '':
return ['']
parts = default_surt.split(')', 1)
if len(parts) == 2:
orig_host_part, path_part = parts
good_surts = [
'http://(%s,)%s' % (orig_host_part, path_part),
'https://(%s,)%s' % (orig_host_part, path_part),
'http://(%s,www,)%s' % (orig_host_part, path_part),
'https://(%s,www,)%s' % (orig_host_part, path_part),
]
else: # no path part
host_part = parts[0]
good_surts = [
'http://(%s' % host_part,
'https://(%s' % host_part,
]
return good_surts
def monkey_patch_dsrules_init():
orig_init = pywb.cdx.cdxdomainspecific.CDXDomainSpecificRule.__init__
def cdx_dsrule_init(self, url_prefix, rules):
orig_init(self, url_prefix, rules)
good_surts = []
for url_prefix in self.url_prefix:
good_surts.extend(
TheGoodUrlCanonicalizer.good_surts_from_default(
url_prefix))
self.url_prefix = good_surts
pywb.cdx.cdxdomainspecific.CDXDomainSpecificRule.__init__ = cdx_dsrule_init
def support_in_progress_warcs():
'''
Monkey-patch pywb.warc.pathresolvers.PrefixResolver to include warcs still
being written to (warcs having ".open" suffix). This way if a cdx entry
references foo.warc.gz, pywb will try both foo.warc.gz and
foo.warc.gz.open.
'''
_orig_prefix_resolver_call = pywb.warc.pathresolvers.PrefixResolver.__call__
def _prefix_resolver_call(self, filename, cdx=None):
raw_results = _orig_prefix_resolver_call(self, filename, cdx)
results = []
for warc_path in raw_results:
results.append(warc_path)
results.append('%s.open' % warc_path)
return results
pywb.warc.pathresolvers.PrefixResolver.__call__ = _prefix_resolver_call
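Note: support_in_progress_warcs() uses the wrap-and-reassign style of
monkey-patching: keep a reference to the original method, define a wrapper
that calls it, then assign the wrapper back onto the class. The same pattern
in miniature, with a toy class standing in for pywb's PrefixResolver::

    class Resolver:
        def __call__(self, filename):
            return ['/warcs/' + filename]

    _orig_call = Resolver.__call__
    def _patched_call(self, filename):
        results = []
        for path in _orig_call(self, filename):
            results.append(path)
            results.append(path + '.open')  # also try the in-progress name
        return results
    Resolver.__call__ = _patched_call

    assert Resolver()('f.warc.gz') == ['/warcs/f.warc.gz', '/warcs/f.warc.gz.open']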


@ -91,7 +91,7 @@ class Site(brozzler.BaseDictable):
enable_warcprox_features=False, reached_limit=None,
status="ACTIVE", claimed=False, start_time=None,
last_disclaimed=_EPOCH_UTC, last_claimed_by=None,
last_claimed=_EPOCH_UTC, metadata={}):
last_claimed=_EPOCH_UTC, metadata={}, remember_outlinks=None, cookie_db=None):
self.seed = seed
self.id = id
@ -109,6 +109,8 @@ class Site(brozzler.BaseDictable):
self.last_disclaimed = last_disclaimed
self.last_claimed = last_claimed
self.metadata = metadata
self.remember_outlinks = remember_outlinks
self.cookie_db = cookie_db
self.scope = scope or {}
if not "surt" in self.scope:
@ -218,7 +220,8 @@ class Page(brozzler.BaseDictable):
def __init__(
self, url, id=None, site_id=None, job_id=None, hops_from_seed=0,
redirect_url=None, priority=None, claimed=False, brozzle_count=0,
via_page_id=None, last_claimed_by=None, hops_off_surt=0):
via_page_id=None, last_claimed_by=None, hops_off_surt=0,
outlinks=None):
self.site_id = site_id
self.job_id = job_id
self.url = url
@ -229,6 +232,7 @@ class Page(brozzler.BaseDictable):
self.brozzle_count = brozzle_count
self.via_page_id = via_page_id
self.hops_off_surt = hops_off_surt
self.outlinks = outlinks
self._canon_hurl = None
if priority is not None:


@ -48,7 +48,6 @@ app = flask.Flask(__name__)
gunicorn_error_logger = logging.getLogger('gunicorn.error')
app.logger.handlers.extend(gunicorn_error_logger.handlers)
app.logger.setLevel(logging.INFO)
app.logger.info('will this show in the log?')
# configure with environment variables
SETTINGS = {
@ -99,18 +98,39 @@ def pages(site_id):
app.logger.info("flask.request.args=%s", flask.request.args)
start = int(flask.request.args.get("start", 0))
end = int(flask.request.args.get("end", start + 90))
app.logger.info("yes new query")
pages_ = r.table("pages").between(
[site_id, 1, r.minval], [site_id, r.maxval, r.maxval],
index="least_hops").order_by(index="least_hops")[start:end].run()
return flask.jsonify(pages=list(pages_))
@app.route("/api/pages/<page_id>")
@app.route("/api/page/<page_id>")
def page(page_id):
page_ = r.table("pages").get(page_id).run()
return flask.jsonify(page_)
@app.route("/api/pages/<page_id>/yaml")
@app.route("/api/page/<page_id>/yaml")
def page_yaml(page_id):
page_ = r.table("pages").get(page_id).run()
return app.response_class(
yaml.dump(page_, default_flow_style=False),
mimetype='application/yaml')
@app.route("/api/sites/<site_id>")
@app.route("/api/site/<site_id>")
def site(site_id):
site_ = r.table("sites").get(site_id).run()
return flask.jsonify(site_)
@app.route("/api/sites/<site_id>/yaml")
@app.route("/api/site/<site_id>/yaml")
def site_yaml(site_id):
site_ = r.table("sites").get(site_id).run()
return app.response_class(
yaml.dump(site_, default_flow_style=False),
mimetype='application/yaml')
@app.route("/api/stats/<bucket>")
def stats(bucket):
stats_ = r.table("stats").get(bucket).run()


@ -208,6 +208,9 @@ brozzlerControllers.controller("SiteController", ["$scope", "$routeParams", "$ht
loadSiteStats($http, $scope.site);
// console.log("site = ", $scope.site);
});
$http.get("/api/site/" + $routeParams.id + "/yaml").success(function(data) {
$scope.site_yaml = data;
});
loadMorePages();
}]);


@ -11,7 +11,12 @@
</div>
<div>
<h2>Site {{site.seed}} (Job <a href="/jobs/{{site.job_id}}">{{site.job_id}}</a>)</h2>
<h2 ng-click="show_yaml = !show_yaml">
<span class="fa fa-caret-right"
ng-class="{ 'fa-caret-right': !show_yaml, 'fa-caret-down': !!show_yaml }"></span>
Site {{site.seed}} (Job <a href="/jobs/{{site.job_id}}">{{site.job_id}}</a>)
</h2>
<pre style="display:{{show_yaml?'block':'none'}}">{{site_yaml}}</pre>
<div class="row bigstats">
<div class="col-sm-6 col-md-3">


@ -94,13 +94,33 @@ class BrozzlerWorker:
HEARTBEAT_INTERVAL = 20.0
def __init__(self, frontier, service_registry=None, max_browsers=1, chrome_exe="chromium-browser"):
def __init__(
self, frontier, service_registry=None, max_browsers=1,
chrome_exe="chromium-browser", proxy=None,
enable_warcprox_features=False):
self._frontier = frontier
self._service_registry = service_registry
self._max_browsers = max_browsers
# these two settings can be overridden by the job/site configuration
self.__proxy = proxy
self.__enable_warcprox_features = enable_warcprox_features
self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event()
self._thread = None
self._start_stop_lock = threading.Lock()
self._browsing_threads = set()
def _proxy(self, site):
return site.proxy or self.__proxy
def _enable_warcprox_features(self, site):
if site.enable_warcprox_features is not None:
return site.enable_warcprox_features
else:
return self.__enable_warcprox_features
def _youtube_dl(self, destdir, site):
ydl_opts = {
@ -114,12 +134,12 @@ class BrozzlerWorker:
"nopart": True,
"no_color": True,
}
if site.proxy:
ydl_opts["proxy"] = "http://{}".format(site.proxy)
if self._proxy(site):
ydl_opts["proxy"] = "http://{}".format(self._proxy(site))
## XXX (sometimes?) causes chrome debug websocket to go through
## proxy. Maybe not needed thanks to hls_prefer_native.
## # see https://github.com/rg3/youtube-dl/issues/6087
## os.environ["http_proxy"] = "http://{}".format(site.proxy)
## os.environ["http_proxy"] = "http://{}".format(self._proxy(site))
ydl = youtube_dl.YoutubeDL(ydl_opts)
if site.extra_headers():
ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers()))
@ -158,13 +178,13 @@ class BrozzlerWorker:
try:
self.logger.info("trying youtube-dl on {}".format(page))
info = ydl.extract_info(page.url)
if site.proxy and site.enable_warcprox_features:
if self._proxy(site) and self._enable_warcprox_features(site):
info_json = json.dumps(info, sort_keys=True, indent=4)
self.logger.info(
"sending WARCPROX_WRITE_RECORD request to warcprox "
"with youtube-dl json for %s", page)
self._warcprox_write_record(
warcprox_address=site.proxy,
warcprox_address=self._proxy(site),
url="youtube-dl:%s" % page.url, warc_type="metadata",
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"),
@ -199,17 +219,17 @@ class BrozzlerWorker:
def _on_screenshot(screenshot_png):
if on_screenshot:
on_screenshot(screenshot_png)
elif site.proxy and site.enable_warcprox_features:
elif self._proxy(site) and self._enable_warcprox_features(site):
self.logger.info("sending WARCPROX_WRITE_RECORD request "
"to warcprox with screenshot for %s", page)
screenshot_jpeg, thumbnail_jpeg = self.full_and_thumb_jpegs(
screenshot_png)
self._warcprox_write_record(warcprox_address=site.proxy,
self._warcprox_write_record(warcprox_address=self._proxy(site),
url="screenshot:{}".format(page.url),
warc_type="resource", content_type="image/jpeg",
payload=screenshot_jpeg,
extra_headers=site.extra_headers())
self._warcprox_write_record(warcprox_address=site.proxy,
self._warcprox_write_record(warcprox_address=self._proxy(site),
url="thumbnail:{}".format(page.url),
warc_type="resource", content_type="image/jpeg",
payload=thumbnail_jpeg,
@ -237,7 +257,7 @@ class BrozzlerWorker:
if self._needs_browsing(page, ydl_spy):
self.logger.info('needs browsing: %s', page)
if not browser.is_running():
browser.start(proxy=site.proxy)
browser.start(proxy=self._proxy(site), cookie_db=site.cookie_db)
outlinks = browser.browse_page(
page.url, extra_headers=site.extra_headers(),
on_screenshot=_on_screenshot,
@ -253,10 +273,10 @@ class BrozzlerWorker:
def _fetch_url(self, site, page):
proxies = None
if site.proxy:
if self._proxy(site):
proxies = {
'http': 'http://%s' % site.proxy,
'https': 'http://%s' % site.proxy,
'http': 'http://%s' % self._proxy(site),
'https': 'http://%s' % self._proxy(site),
}
self.logger.info('fetching %s', page)
@ -292,8 +312,10 @@ class BrozzlerWorker:
page = self._frontier.claim_page(site, "%s:%s" % (
socket.gethostname(), browser.chrome_port))
outlinks = self.brozzle_page(browser, site, page)
site.cookie_db = browser.persist_and_read_cookie_db()
self._frontier.completed_page(site, page)
self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
self._frontier.scope_and_schedule_outlinks(
site, page, outlinks)
page = None
except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site)
@ -311,6 +333,7 @@ class BrozzlerWorker:
browser.stop()
self._frontier.disclaim_site(site, page)
self._browser_pool.release(browser)
self._browsing_threads.remove(threading.current_thread())
def _service_heartbeat(self):
if hasattr(self, "status_info"):
@ -354,9 +377,9 @@ class BrozzlerWorker:
th = threading.Thread(
target=lambda: self._brozzle_site(
browser, site),
name="BrowsingThread:{}-{}".format(
browser.chrome_port, site.seed))
name="BrozzlingThread:%s" % site.seed)
th.start()
self._browsing_threads.add(th)
except:
self._browser_pool.release(browser)
raise
@ -371,7 +394,9 @@ class BrozzlerWorker:
latest_state = "no-unclaimed-sites"
time.sleep(0.5)
except:
self.logger.critical("thread exiting due to unexpected exception", exc_info=True)
self.logger.critical(
"thread exiting due to unexpected exception",
exc_info=True)
finally:
if self._service_registry and hasattr(self, "status_info"):
try:
@ -382,12 +407,25 @@ class BrozzlerWorker:
exc_info=True)
def start(self):
th = threading.Thread(target=self.run, name="BrozzlerWorker")
th.start()
return th
with self._start_stop_lock:
if self._thread:
self.logger.warn(
'ignoring start request because self._thread is '
'not None')
return
self._thread = threading.Thread(
target=self.run, name="BrozzlerWorker")
self._thread.start()
def shutdown_now(self):
self.logger.info("brozzler worker shutting down")
self._shutdown_requested.set()
self._browser_pool.shutdown_now()
with self._start_stop_lock:
self.logger.info("brozzler worker shutting down")
self._shutdown_requested.set()
self._browser_pool.shutdown_now()
while self._browsing_threads:
time.sleep(0.5)
self._thread = None
def is_alive(self):
return self._thread and self._thread.is_alive()


@ -21,7 +21,7 @@ import setuptools
setuptools.setup(
name='brozzler',
version='1.1.dev38',
version='1.1b3.dev58',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',
@ -36,7 +36,9 @@ setuptools.setup(
'brozzler-new-job=brozzler.cli:brozzler_new_job',
'brozzler-new-site=brozzler.cli:brozzler_new_site',
'brozzler-worker=brozzler.cli:brozzler_worker',
'brozzler-ensure-tables=brozzler.cli:brozzler_ensure_tables',
'brozzler-webconsole=brozzler.webconsole:run',
'brozzler-easy=brozzler.easy:main',
],
},
install_requires=[
@ -53,7 +55,7 @@ setuptools.setup(
],
extras_require={
'webconsole': ['flask>=0.11', 'gunicorn'],
# 'brozzler-easy': ['warcprox', 'pywb'],
'easy': ['warcprox>=2.0b1', 'pywb'],
},
zip_safe=False,
classifiers=[

vagrant/README.rst (new file, 61 lines)

@ -0,0 +1,61 @@
Single-VM Vagrant Brozzler Deployment
-------------------------------------
This is a work in progress: a Vagrant + ansible configuration for a single-VM
deployment of brozzler and warcprox with their dependencies (notably
rethinkdb).
The idea is to give people a quick way to get up and running with a setup
resembling a real distributed deployment, and to offer a starting
configuration they can adapt to their own clusters.
Equally important, it serves as a harness for integration tests. (As of now
brozzler itself has no automated tests!)
You'll need vagrant installed.
https://www.vagrantup.com/docs/installation/
Then run:
::
my-laptop$ vagrant up
Currently, to start a crawl, you first need to ssh into the vagrant VM and
activate the brozzler virtualenv.
::
my-laptop$ vagrant ssh
vagrant@brozzler-easy:~$ source ~/brozzler-ve34/bin/activate
(brozzler-ve34)vagrant@brozzler-easy:~$
Then you can run brozzler-new-site:
::
(brozzler-ve34)vagrant@brozzler-easy:~$ brozzler-new-site \
--proxy=localhost:8000 --enable-warcprox-features \
http://example.com/
Or brozzler-new-job (make sure to set the proxy to localhost:8000):
::
(brozzler-ve34)vagrant@brozzler-easy:~$ cat >job1.yml
id: job1
proxy: localhost:8000 # point at warcprox for archiving
enable_warcprox_features: true
seeds:
- url: https://example.org/
(brozzler-ve34)vagrant@brozzler-easy:~$ brozzler-new-job job1.yml
WARC files will appear in ./warcs, and brozzler, warcprox, and rethinkdb logs
in ./logs (via vagrant folder syncing).
You can also look at the rethinkdb console by opening http://localhost:8080 in
your browser after opening an ssh tunnel like so:
::
my-laptop$ vagrant ssh -- -fN -Llocalhost:8080:localhost:8080

vagrant/Vagrantfile (new vendored file, 16 lines)

@ -0,0 +1,16 @@
Vagrant.configure(2) do |config|
config.vm.box = "ubuntu/trusty64"
config.vm.hostname = "brozzler-easy"
config.vm.synced_folder "..", "/brozzler"
config.vm.provision "ansible" do |ansible|
ansible.playbook = "ansible/playbook.yml"
ansible.groups = {
"rethinkdb" => ["default"],
"warcprox" => ["default"],
"brozzler-worker" => ["default"],
# "brozzler-webconsole" => ["default"],
}
end
end


@ -0,0 +1,30 @@
---
- name: apply common configuration to all nodes
hosts: all
roles:
- common
- name: deploy rethinkdb
hosts: rethinkdb
roles:
- rethinkdb
- name: deploy warcprox
hosts: warcprox
roles:
- warcprox
- name: deploy brozzler-worker
hosts: brozzler-worker
roles:
- brozzler-worker
# - name: deploy brozzler-webconsole
# hosts: brozzler-webconsole
# roles:
# - brozzler-webconsole
# - name: deploy pywb
# hosts: pywb
# roles:
# - pywb


@ -0,0 +1,19 @@
---
- name: git clone https://github.com/internetarchive/brozzler.git
git: repo=https://github.com/internetarchive/brozzler.git
dest=/home/vagrant/brozzler
- name: pip install -r requirements.txt in virtualenv
pip: requirements=/home/vagrant/brozzler/webconsole/requirements.txt
virtualenv=/home/vagrant/brozzler-webconsole-ve34
virtualenv_python=python3.4
extra_args='--no-input --upgrade --pre'
notify:
- restart brozzler-webconsole
- name: install upstart config /etc/init/brozzler-webconsole.conf
become: true
template: src=templates/brozzler-webconsole.conf.j2
dest=/etc/init/brozzler-webconsole.conf
notify:
- restart brozzler-webconsole


@ -0,0 +1,21 @@
description "brozzler-webconsole"
start on runlevel [2345]
stop on runlevel [!2345]
env PYTHONPATH=/home/vagrant/brozzler-webconsole-ve34/lib/python3.4/site-packages:/home/vagrant/brozzler/webconsole
env PATH=/home/vagrant/brozzler-webconsole-ve34/bin:/usr/bin:/bin
env LC_ALL=C.UTF-8
env WAYBACK_BASEURL={{base_wayback_url}}/all
# env RETHINKDB_SERVERS={{groups['rethinkdb'] | join(',')}}
env RETHINKDB_SERVERS=localhost
env RETHINKDB_DB={{rethinkdb_db}}
setuid vagrant
# console log
exec gunicorn --bind=0.0.0.0:8081 brozzler-webconsole:app >&/vagrant/logs/brozzler-webconsole.log


@ -0,0 +1,13 @@
---
- name: restart Xvnc
service: name=Xvnc state=restarted
become: true
- name: restart websockify
service: name=websockify state=restarted
become: true
- name: restart vnc-websock
service: name=vnc-websock state=restarted
become: true
- name: restart brozzler-worker
service: name=brozzler-worker state=restarted
become: true


@ -0,0 +1,64 @@
---
- name: ensure canonical partner repo is in apt sources.list
apt_repository: repo='deb http://archive.canonical.com/ubuntu trusty partner'
state=present
become: true
- name: ensure required packages are installed
become: true
apt: name={{item}} state=present
with_items:
- python-virtualenv
- vnc4server
- chromium-browser
- xfonts-base
- fonts-arphic-bkai00mp
- fonts-arphic-bsmi00lp
- fonts-arphic-gbsn00lp
- fonts-arphic-gkai00mp
- fonts-arphic-ukai
- fonts-farsiweb
- fonts-nafees
- fonts-sil-abyssinica
- fonts-sil-ezra
- fonts-sil-padauk
- fonts-unfonts-extra
- fonts-unfonts-core
- ttf-indic-fonts
- fonts-thai-tlwg
- fonts-lklug-sinhala
- python3-pip
- git
- libjpeg-turbo8-dev
- zlib1g-dev
- gcc
- libpython3.4-dev
- adobe-flashplugin
- name: install Xvnc upstart config /etc/init/Xvnc.conf
template: src=templates/Xvnc.conf.j2 dest=/etc/init/Xvnc.conf
become: true
notify:
- restart Xvnc
- name: install websockify in virtualenv
pip: name=git+https://github.com/kanaka/websockify.git#egg=websockify
virtualenv=/home/vagrant/websockify-ve34
virtualenv_python=python3.4
extra_args='--no-input --upgrade --pre'
- name: install vnc-websock upstart config /etc/init/vnc-websock.conf
template: src=templates/vnc-websock.conf.j2 dest=/etc/init/vnc-websock.conf
become: true
notify:
- restart vnc-websock
- name: install brozzler in virtualenv
become: true
pip: # name=git+https://github.com/internetarchive/brozzler.git#egg=brozzler
name='-e /brozzler'
virtualenv=/home/vagrant/brozzler-ve34
virtualenv_python=python3.4
extra_args='--no-input --upgrade --pre'
notify:
- restart brozzler-worker
- name: install brozzler-worker upstart config /etc/init/brozzler-worker.conf
template: src=templates/brozzler-worker.conf.j2 dest=/etc/init/brozzler-worker.conf
become: true
notify:
- restart brozzler-worker


@ -0,0 +1,14 @@
description "Xvnc"
start on runlevel [2345]
stop on runlevel [!2345]
setuid vagrant
console log
exec nice Xvnc4 :1 -auth /tmp/Xauthority.vagrant \
-geometry 1600x1000 -depth 24 -rfbwait 0 -nolisten tcp -rfbport 5901 \
-SecurityTypes None -pn -fp /usr/share/fonts/X11/misc/ -co /etc/X11/rgb \
AcceptCutText=0 AcceptPointerEvents=0 AcceptKeyEvents=0


@ -0,0 +1,25 @@
description "brozzler-worker"
start on runlevel [2345]
stop on runlevel [!2345]
env DISPLAY=:1
env PATH=/home/vagrant/brozzler-ve34/bin:/usr/bin:/bin
env PYTHONPATH=/home/vagrant/brozzler-ve34/lib/python3.4/site-packages
env LANG=C.UTF-8
setuid vagrant
# console log
# depends on vnc server
start on started Xvnc
stop on stopping Xvnc
kill timeout 60
exec nice brozzler-worker \
--rethinkdb-servers=localhost \
--max-browsers=4 >>/vagrant/logs/brozzler-worker.log 2>&1
# --rethinkdb-servers={{groups['rethinkdb'] | join(',')}} \


@ -0,0 +1,14 @@
description "vnc-websock"
start on runlevel [2345]
stop on runlevel [!2345]
setuid vagrant
console log
env PYTHONPATH=/home/vagrant/websockify-ve34/lib/python3.4/site-packages
env PATH=/home/vagrant/websockify-ve34/bin:/usr/bin:/bin
exec nice websockify 0.0.0.0:8901 localhost:5901


@ -0,0 +1,4 @@
---
- name: ensure logs directory exists
file: path=/vagrant/logs state=directory
become: true


@ -0,0 +1,4 @@
---
- name: restart rethinkdb
service: name=rethinkdb state=restarted
become: true


@ -0,0 +1,21 @@
---
- name: ensure rethinkdb apt public key is trusted
apt_key: url=http://download.rethinkdb.com/apt/pubkey.gpg
become: true
- name: ensure rethinkdb repo is in apt sources.list
apt_repository: repo='deb http://download.rethinkdb.com/apt trusty main'
state=present
become: true
- name: ensure rethinkdb package is installed
apt: name=rethinkdb state=present
become: true
notify:
- restart rethinkdb
- name: ensure rethinkdb starts on reboot
service: name=rethinkdb enabled=yes
- name: ensure rethinkdb instance config file is installed
template: src=templates/rethinkdb-brozzler-easy.conf.j2
dest=/etc/rethinkdb/instances.d/rethinkdb-brozzler-easy.conf
become: true
notify:
- restart rethinkdb


@ -0,0 +1,5 @@
runuser=vagrant
# bind=0.0.0.0
# directory=/var/lib/rethinkdb
# log-file=/var/log/rethinkdb.log
log-file=/vagrant/logs/rethinkdb.log # synced dir


@ -0,0 +1,14 @@
---
# - name: start warcprox
# environment:
# PYTHONPATH: /home/vagrant/warcprox-ve34/lib/python3.4/site-packages
# PATH: /home/vagrant/warcprox-ve34/bin:/usr/bin:/bin
# args:
# executable: /bin/bash
# shell: nice warcprox --dir=/vagrant/warcs --base32 --gzip
# --rollover-idle-time=180 --cacert=/vagrant/warcprox-ca.pem
# --onion-tor-socks-proxy=localhost:9050 --rethinkdb-servers=localhost
# --rethinkdb-big-table &> /vagrant/logs/warcprox.out &
- name: restart warcprox
service: name=warcprox state=restarted
become: true


@ -0,0 +1,25 @@
---
- name: ensure required packages are installed
become: true
apt: name={{item}} state=present
with_items:
- gcc
- python-virtualenv
- python3.4
- libpython3.4-dev
- libffi-dev
- libssl-dev
- tor
- git
- name: install warcprox in virtualenv
pip: name=git+https://github.com/internetarchive/warcprox.git@2.x#egg=warcprox
virtualenv=/home/vagrant/warcprox-ve34
virtualenv_python=python3.4
extra_args='--no-input --upgrade --pre'
notify:
- restart warcprox
- name: install upstart config /etc/init/warcprox.conf
become: true
template: src=templates/warcprox.conf.j2 dest=/etc/init/warcprox.conf
notify:
- restart warcprox


@ -0,0 +1,26 @@
description "warcprox"
start on runlevel [2345]
stop on runlevel [!2345]
env PYTHONPATH=/home/vagrant/warcprox-ve34/lib/python3.4/site-packages
env PATH=/home/vagrant/warcprox-ve34/bin:/usr/bin:/bin
# by default warcprox creates some files/dirs relative to cwd
chdir /home/vagrant
setuid vagrant
# console log
# --profile
exec nice warcprox \
--dir=/vagrant/warcs \
--base32 \
--gzip \
--rollover-idle-time=180 \
--cacert=/vagrant/warcprox-ca.pem \
--onion-tor-socks-proxy=localhost:9050 \
--rethinkdb-servers=localhost \
--rethinkdb-db=brozzler \
--rethinkdb-big-table >>/vagrant/logs/warcprox.log 2>&1
# --rethinkdb-servers={{groups['rethinkdb'] | join(',')}} \