mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-06-20 12:54:23 -04:00
rethinkstuff is now "doublethink"
This commit is contained in:
parent
700b08b7d7
commit
569af05b11
12 changed files with 238 additions and 234 deletions
|
@ -26,7 +26,7 @@ import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import requests
|
import requests
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import signal
|
import signal
|
||||||
import string
|
import string
|
||||||
import sys
|
import sys
|
||||||
|
@ -37,6 +37,7 @@ import warnings
|
||||||
import yaml
|
import yaml
|
||||||
import shutil
|
import shutil
|
||||||
import base64
|
import base64
|
||||||
|
import rethinkdb as r
|
||||||
|
|
||||||
def add_common_options(arg_parser):
|
def add_common_options(arg_parser):
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
|
@ -78,7 +79,7 @@ def rethinker(args):
|
||||||
servers = args.rethinkdb_servers or 'localhost'
|
servers = args.rethinkdb_servers or 'localhost'
|
||||||
db = args.rethinkdb_db or os.environ.get(
|
db = args.rethinkdb_db or os.environ.get(
|
||||||
'BROZZLER_RETHINKDB_DB') or 'brozzler'
|
'BROZZLER_RETHINKDB_DB') or 'brozzler'
|
||||||
return rethinkstuff.Rethinker(servers.split(','), db)
|
return doublethink.Rethinker(servers.split(','), db)
|
||||||
|
|
||||||
def _add_proxy_options(arg_parser):
|
def _add_proxy_options(arg_parser):
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
|
@ -222,8 +223,8 @@ def brozzler_new_job():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
try:
|
try:
|
||||||
brozzler.job.new_job_file(frontier, args.job_conf_file)
|
brozzler.job.new_job_file(frontier, args.job_conf_file)
|
||||||
except brozzler.job.InvalidJobConf as e:
|
except brozzler.job.InvalidJobConf as e:
|
||||||
|
@ -273,8 +274,8 @@ def brozzler_new_site():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
site = brozzler.Site(r, {
|
site = brozzler.Site(rr, {
|
||||||
'seed': args.seed,
|
'seed': args.seed,
|
||||||
'proxy': args.proxy,
|
'proxy': args.proxy,
|
||||||
'time_limit': int(args.time_limit) if args.time_limit else None,
|
'time_limit': int(args.time_limit) if args.time_limit else None,
|
||||||
|
@ -287,7 +288,7 @@ def brozzler_new_site():
|
||||||
'username': args.username,
|
'username': args.username,
|
||||||
'password': args.password})
|
'password': args.password})
|
||||||
|
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
brozzler.new_site(frontier, site)
|
brozzler.new_site(frontier, site)
|
||||||
|
|
||||||
def brozzler_worker():
|
def brozzler_worker():
|
||||||
|
@ -296,7 +297,7 @@ def brozzler_worker():
|
||||||
rethinkdb, brozzles them.
|
rethinkdb, brozzles them.
|
||||||
'''
|
'''
|
||||||
arg_parser = argparse.ArgumentParser(
|
arg_parser = argparse.ArgumentParser(
|
||||||
prog=os.path.basename(__file__),
|
prog=os.path.basename(sys.argv[0]),
|
||||||
formatter_class=BetterArgumentDefaultsHelpFormatter)
|
formatter_class=BetterArgumentDefaultsHelpFormatter)
|
||||||
add_rethinkdb_options(arg_parser)
|
add_rethinkdb_options(arg_parser)
|
||||||
arg_parser.add_argument(
|
arg_parser.add_argument(
|
||||||
|
@ -341,9 +342,9 @@ def brozzler_worker():
|
||||||
signal.signal(signal.SIGTERM, sigterm)
|
signal.signal(signal.SIGTERM, sigterm)
|
||||||
signal.signal(signal.SIGINT, sigint)
|
signal.signal(signal.SIGINT, sigint)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
service_registry = rethinkstuff.ServiceRegistry(r)
|
service_registry = doublethink.ServiceRegistry(rr)
|
||||||
worker = brozzler.worker.BrozzlerWorker(
|
worker = brozzler.worker.BrozzlerWorker(
|
||||||
frontier, service_registry, max_browsers=int(args.max_browsers),
|
frontier, service_registry, max_browsers=int(args.max_browsers),
|
||||||
chrome_exe=args.chrome_exe)
|
chrome_exe=args.chrome_exe)
|
||||||
|
@ -369,13 +370,13 @@ def brozzler_ensure_tables():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
|
|
||||||
# services table
|
# services table
|
||||||
rethinkstuff.ServiceRegistry(r)
|
doublethink.ServiceRegistry(rr)
|
||||||
|
|
||||||
# sites, pages, jobs tables
|
# sites, pages, jobs tables
|
||||||
brozzler.frontier.RethinkDbFrontier(r)
|
brozzler.frontier.RethinkDbFrontier(rr)
|
||||||
|
|
||||||
class Jsonner(json.JSONEncoder):
|
class Jsonner(json.JSONEncoder):
|
||||||
def default(self, o):
|
def default(self, o):
|
||||||
|
@ -402,8 +403,8 @@ def brozzler_list_jobs():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
reql = r.table('jobs').order_by('id')
|
reql = rr.table('jobs').order_by('id')
|
||||||
if not args.all:
|
if not args.all:
|
||||||
reql = reql.filter({'status': 'ACTIVE'})
|
reql = reql.filter({'status': 'ACTIVE'})
|
||||||
logging.debug('querying rethinkdb: %s', reql)
|
logging.debug('querying rethinkdb: %s', reql)
|
||||||
|
@ -439,9 +440,9 @@ def brozzler_list_sites():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
|
|
||||||
reql = r.table('sites')
|
reql = rr.table('sites')
|
||||||
if args.job:
|
if args.job:
|
||||||
try:
|
try:
|
||||||
job_id = int(args.job)
|
job_id = int(args.job)
|
||||||
|
@ -493,13 +494,13 @@ def brozzler_list_pages():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
if args.job:
|
if args.job:
|
||||||
try:
|
try:
|
||||||
job_id = int(args.job)
|
job_id = int(args.job)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
job_id = args.job
|
job_id = args.job
|
||||||
reql = r.table('sites').get_all(job_id, index='job_id')['id']
|
reql = rr.table('sites').get_all(job_id, index='job_id')['id']
|
||||||
logging.debug('querying rethinkb: %s', reql)
|
logging.debug('querying rethinkb: %s', reql)
|
||||||
site_ids = reql.run()
|
site_ids = reql.run()
|
||||||
else:
|
else:
|
||||||
|
@ -509,7 +510,7 @@ def brozzler_list_pages():
|
||||||
site_ids = [args.site]
|
site_ids = [args.site]
|
||||||
|
|
||||||
for site_id in site_ids:
|
for site_id in site_ids:
|
||||||
reql = r.table('pages')
|
reql = rr.table('pages')
|
||||||
if args.queued:
|
if args.queued:
|
||||||
reql = reql.between(
|
reql = reql.between(
|
||||||
[site_id, 0, r.minval], [site_id, 0, r.maxval],
|
[site_id, 0, r.minval], [site_id, 0, r.maxval],
|
||||||
|
@ -541,7 +542,6 @@ def brozzler_list_captures():
|
||||||
url or sha1.
|
url or sha1.
|
||||||
'''
|
'''
|
||||||
import surt
|
import surt
|
||||||
import rethinkdb
|
|
||||||
|
|
||||||
arg_parser = argparse.ArgumentParser(
|
arg_parser = argparse.ArgumentParser(
|
||||||
prog=os.path.basename(sys.argv[0]),
|
prog=os.path.basename(sys.argv[0]),
|
||||||
|
@ -563,7 +563,7 @@ def brozzler_list_captures():
|
||||||
args = arg_parser.parse_args(args=sys.argv[1:])
|
args = arg_parser.parse_args(args=sys.argv[1:])
|
||||||
configure_logging(args)
|
configure_logging(args)
|
||||||
|
|
||||||
r = rethinker(args)
|
rr = rethinker(args)
|
||||||
|
|
||||||
if args.url_or_sha1[:5] == 'sha1:':
|
if args.url_or_sha1[:5] == 'sha1:':
|
||||||
if args.prefix:
|
if args.prefix:
|
||||||
|
@ -572,9 +572,9 @@ def brozzler_list_captures():
|
||||||
'to lookup by sha1')
|
'to lookup by sha1')
|
||||||
# assumes it's already base32 (XXX could detect if hex and convert)
|
# assumes it's already base32 (XXX could detect if hex and convert)
|
||||||
sha1base32 = args.url_or_sha1[5:].upper()
|
sha1base32 = args.url_or_sha1[5:].upper()
|
||||||
reql = r.table('captures').between(
|
reql = rr.table('captures').between(
|
||||||
[sha1base32, rethinkdb.minval, rethinkdb.minval],
|
[sha1base32, r.minval, r.minval],
|
||||||
[sha1base32, rethinkdb.maxval, rethinkdb.maxval],
|
[sha1base32, r.maxval, r.maxval],
|
||||||
index='sha1_warc_type')
|
index='sha1_warc_type')
|
||||||
logging.debug('querying rethinkdb: %s', reql)
|
logging.debug('querying rethinkdb: %s', reql)
|
||||||
results = reql.run()
|
results = reql.run()
|
||||||
|
@ -590,9 +590,9 @@ def brozzler_list_captures():
|
||||||
else:
|
else:
|
||||||
abbr_end_key = key[:150]
|
abbr_end_key = key[:150]
|
||||||
end_key = key
|
end_key = key
|
||||||
reql = r.table('captures').between(
|
reql = rr.table('captures').between(
|
||||||
[abbr_start_key, rethinkdb.minval],
|
[abbr_start_key, r.minval],
|
||||||
[abbr_end_key, rethinkdb.maxval],
|
[abbr_end_key, r.maxval],
|
||||||
index='abbr_canon_surt_timestamp', right_bound='closed')
|
index='abbr_canon_surt_timestamp', right_bound='closed')
|
||||||
reql = reql.order_by(index='abbr_canon_surt_timestamp')
|
reql = reql.order_by(index='abbr_canon_surt_timestamp')
|
||||||
reql = reql.filter(
|
reql = reql.filter(
|
||||||
|
|
|
@ -27,11 +27,11 @@ except ImportError as e:
|
||||||
'brozzler[dashboard]".\nSee README.rst for more information.',
|
'brozzler[dashboard]".\nSee README.rst for more information.',
|
||||||
type(e).__name__, e)
|
type(e).__name__, e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import importlib
|
import importlib
|
||||||
import rethinkdb
|
import rethinkdb as r
|
||||||
import yaml
|
import yaml
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
|
@ -45,19 +45,19 @@ SETTINGS = {
|
||||||
'WAYBACK_BASEURL': os.environ.get(
|
'WAYBACK_BASEURL': os.environ.get(
|
||||||
'WAYBACK_BASEURL', 'http://localhost:8880/brozzler'),
|
'WAYBACK_BASEURL', 'http://localhost:8880/brozzler'),
|
||||||
}
|
}
|
||||||
r = rethinkstuff.Rethinker(
|
rr = doublethink.Rethinker(
|
||||||
SETTINGS['RETHINKDB_SERVERS'], db=SETTINGS['RETHINKDB_DB'])
|
SETTINGS['RETHINKDB_SERVERS'], db=SETTINGS['RETHINKDB_DB'])
|
||||||
_svc_reg = None
|
_svc_reg = None
|
||||||
def service_registry():
|
def service_registry():
|
||||||
global _svc_reg
|
global _svc_reg
|
||||||
if not _svc_reg:
|
if not _svc_reg:
|
||||||
_svc_reg = rethinkstuff.ServiceRegistry(r)
|
_svc_reg = doublethink.ServiceRegistry(rr)
|
||||||
return _svc_reg
|
return _svc_reg
|
||||||
|
|
||||||
@app.route("/api/sites/<site_id>/queued_count")
|
@app.route("/api/sites/<site_id>/queued_count")
|
||||||
@app.route("/api/site/<site_id>/queued_count")
|
@app.route("/api/site/<site_id>/queued_count")
|
||||||
def queued_count(site_id):
|
def queued_count(site_id):
|
||||||
reql = r.table("pages").between(
|
reql = rr.table("pages").between(
|
||||||
[site_id, 0, False, r.minval], [site_id, 0, False, r.maxval],
|
[site_id, 0, False, r.minval], [site_id, 0, False, r.maxval],
|
||||||
index="priority_by_site").count()
|
index="priority_by_site").count()
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
|
@ -70,7 +70,7 @@ def queue(site_id):
|
||||||
logging.debug("flask.request.args=%s", flask.request.args)
|
logging.debug("flask.request.args=%s", flask.request.args)
|
||||||
start = flask.request.args.get("start", 0)
|
start = flask.request.args.get("start", 0)
|
||||||
end = flask.request.args.get("end", start + 90)
|
end = flask.request.args.get("end", start + 90)
|
||||||
reql = r.table("pages").between(
|
reql = rr.table("pages").between(
|
||||||
[site_id, 0, False, r.minval], [site_id, 0, False, r.maxval],
|
[site_id, 0, False, r.minval], [site_id, 0, False, r.maxval],
|
||||||
index="priority_by_site")[start:end]
|
index="priority_by_site")[start:end]
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
|
@ -82,7 +82,7 @@ def queue(site_id):
|
||||||
@app.route("/api/sites/<site_id>/page_count")
|
@app.route("/api/sites/<site_id>/page_count")
|
||||||
@app.route("/api/site/<site_id>/page_count")
|
@app.route("/api/site/<site_id>/page_count")
|
||||||
def page_count(site_id):
|
def page_count(site_id):
|
||||||
reql = r.table("pages").between(
|
reql = rr.table("pages").between(
|
||||||
[site_id, 1, False, r.minval],
|
[site_id, 1, False, r.minval],
|
||||||
[site_id, r.maxval, False, r.maxval],
|
[site_id, r.maxval, False, r.maxval],
|
||||||
index="priority_by_site").count()
|
index="priority_by_site").count()
|
||||||
|
@ -96,7 +96,7 @@ def pages(site_id):
|
||||||
"""Pages already crawled."""
|
"""Pages already crawled."""
|
||||||
start = int(flask.request.args.get("start", 0))
|
start = int(flask.request.args.get("start", 0))
|
||||||
end = int(flask.request.args.get("end", start + 90))
|
end = int(flask.request.args.get("end", start + 90))
|
||||||
reql = r.table("pages").between(
|
reql = rr.table("pages").between(
|
||||||
[site_id, 1, r.minval], [site_id, r.maxval, r.maxval],
|
[site_id, 1, r.minval], [site_id, r.maxval, r.maxval],
|
||||||
index="least_hops").order_by(index="least_hops")[start:end]
|
index="least_hops").order_by(index="least_hops")[start:end]
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
|
@ -106,7 +106,7 @@ def pages(site_id):
|
||||||
@app.route("/api/pages/<page_id>")
|
@app.route("/api/pages/<page_id>")
|
||||||
@app.route("/api/page/<page_id>")
|
@app.route("/api/page/<page_id>")
|
||||||
def page(page_id):
|
def page(page_id):
|
||||||
reql = r.table("pages").get(page_id)
|
reql = rr.table("pages").get(page_id)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
page_ = reql.run()
|
page_ = reql.run()
|
||||||
return flask.jsonify(page_)
|
return flask.jsonify(page_)
|
||||||
|
@ -114,7 +114,7 @@ def page(page_id):
|
||||||
@app.route("/api/pages/<page_id>/yaml")
|
@app.route("/api/pages/<page_id>/yaml")
|
||||||
@app.route("/api/page/<page_id>/yaml")
|
@app.route("/api/page/<page_id>/yaml")
|
||||||
def page_yaml(page_id):
|
def page_yaml(page_id):
|
||||||
reql = r.table("pages").get(page_id)
|
reql = rr.table("pages").get(page_id)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
page_ = reql.run()
|
page_ = reql.run()
|
||||||
return app.response_class(
|
return app.response_class(
|
||||||
|
@ -124,7 +124,7 @@ def page_yaml(page_id):
|
||||||
@app.route("/api/sites/<site_id>")
|
@app.route("/api/sites/<site_id>")
|
||||||
@app.route("/api/site/<site_id>")
|
@app.route("/api/site/<site_id>")
|
||||||
def site(site_id):
|
def site(site_id):
|
||||||
reql = r.table("sites").get(site_id)
|
reql = rr.table("sites").get(site_id)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
s = reql.run()
|
s = reql.run()
|
||||||
if "cookie_db" in s:
|
if "cookie_db" in s:
|
||||||
|
@ -134,7 +134,7 @@ def site(site_id):
|
||||||
@app.route("/api/sites/<site_id>/yaml")
|
@app.route("/api/sites/<site_id>/yaml")
|
||||||
@app.route("/api/site/<site_id>/yaml")
|
@app.route("/api/site/<site_id>/yaml")
|
||||||
def site_yaml(site_id):
|
def site_yaml(site_id):
|
||||||
reql = r.table("sites").get(site_id)
|
reql = rr.table("sites").get(site_id)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
site_ = reql.run()
|
site_ = reql.run()
|
||||||
return app.response_class(
|
return app.response_class(
|
||||||
|
@ -143,7 +143,7 @@ def site_yaml(site_id):
|
||||||
|
|
||||||
@app.route("/api/stats/<bucket>")
|
@app.route("/api/stats/<bucket>")
|
||||||
def stats(bucket):
|
def stats(bucket):
|
||||||
reql = r.table("stats").get(bucket)
|
reql = rr.table("stats").get(bucket)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
stats_ = reql.run()
|
stats_ = reql.run()
|
||||||
return flask.jsonify(stats_)
|
return flask.jsonify(stats_)
|
||||||
|
@ -155,7 +155,7 @@ def sites(job_id):
|
||||||
jid = int(job_id)
|
jid = int(job_id)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
jid = job_id
|
jid = job_id
|
||||||
reql = r.table("sites").get_all(jid, index="job_id")
|
reql = rr.table("sites").get_all(jid, index="job_id")
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
sites_ = list(reql.run())
|
sites_ = list(reql.run())
|
||||||
# TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable
|
# TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable
|
||||||
|
@ -167,7 +167,7 @@ def sites(job_id):
|
||||||
@app.route("/api/jobless-sites")
|
@app.route("/api/jobless-sites")
|
||||||
def jobless_sites():
|
def jobless_sites():
|
||||||
# XXX inefficient (unindexed) query
|
# XXX inefficient (unindexed) query
|
||||||
reql = r.table("sites").filter(~r.row.has_fields("job_id"))
|
reql = rr.table("sites").filter(~r.row.has_fields("job_id"))
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
sites_ = list(reql.run())
|
sites_ = list(reql.run())
|
||||||
# TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable
|
# TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable
|
||||||
|
@ -183,7 +183,7 @@ def job(job_id):
|
||||||
jid = int(job_id)
|
jid = int(job_id)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
jid = job_id
|
jid = job_id
|
||||||
reql = r.table("jobs").get(jid)
|
reql = rr.table("jobs").get(jid)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
job_ = reql.run()
|
job_ = reql.run()
|
||||||
return flask.jsonify(job_)
|
return flask.jsonify(job_)
|
||||||
|
@ -195,7 +195,7 @@ def job_yaml(job_id):
|
||||||
jid = int(job_id)
|
jid = int(job_id)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
jid = job_id
|
jid = job_id
|
||||||
reql = r.table("jobs").get(jid)
|
reql = rr.table("jobs").get(jid)
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
job_ = reql.run()
|
job_ = reql.run()
|
||||||
return app.response_class(
|
return app.response_class(
|
||||||
|
@ -214,7 +214,7 @@ def services():
|
||||||
|
|
||||||
@app.route("/api/jobs")
|
@app.route("/api/jobs")
|
||||||
def jobs():
|
def jobs():
|
||||||
reql = r.table("jobs").order_by(rethinkdb.desc("id"))
|
reql = rr.table("jobs").order_by(r.desc("id"))
|
||||||
logging.debug("querying rethinkdb: %s", reql)
|
logging.debug("querying rethinkdb: %s", reql)
|
||||||
jobs_ = list(reql.run())
|
jobs_ = list(reql.run())
|
||||||
return flask.jsonify(jobs=jobs_)
|
return flask.jsonify(jobs=jobs_)
|
||||||
|
|
|
@ -42,7 +42,7 @@ import socket
|
||||||
import signal
|
import signal
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import traceback
|
import traceback
|
||||||
import socketserver
|
import socketserver
|
||||||
|
|
||||||
|
@ -133,10 +133,10 @@ class BrozzlerEasyController:
|
||||||
brozzler.dashboard.app, ThreadingWSGIServer)
|
brozzler.dashboard.app, ThreadingWSGIServer)
|
||||||
|
|
||||||
def _init_brozzler_worker(self, args):
|
def _init_brozzler_worker(self, args):
|
||||||
r = rethinkstuff.Rethinker(
|
rr = doublethink.Rethinker(
|
||||||
args.rethinkdb_servers.split(","), args.rethinkdb_db)
|
args.rethinkdb_servers.split(","), args.rethinkdb_db)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
service_registry = rethinkstuff.ServiceRegistry(r)
|
service_registry = doublethink.ServiceRegistry(rr)
|
||||||
worker = brozzler.worker.BrozzlerWorker(
|
worker = brozzler.worker.BrozzlerWorker(
|
||||||
frontier, service_registry,
|
frontier, service_registry,
|
||||||
max_browsers=args.max_browsers,
|
max_browsers=args.max_browsers,
|
||||||
|
|
|
@ -21,8 +21,8 @@ import brozzler
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
import rethinkdb
|
import rethinkdb as r
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
|
|
||||||
class UnexpectedDbResult(Exception):
|
class UnexpectedDbResult(Exception):
|
||||||
pass
|
pass
|
||||||
|
@ -30,57 +30,55 @@ class UnexpectedDbResult(Exception):
|
||||||
class RethinkDbFrontier:
|
class RethinkDbFrontier:
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
|
|
||||||
def __init__(self, r, shards=None, replicas=None):
|
def __init__(self, rr, shards=None, replicas=None):
|
||||||
self.r = r
|
self.rr = rr
|
||||||
self.shards = shards or len(r.servers)
|
self.shards = shards or len(rr.servers)
|
||||||
self.replicas = replicas or min(len(r.servers), 3)
|
self.replicas = replicas or min(len(rr.servers), 3)
|
||||||
self._ensure_db()
|
self._ensure_db()
|
||||||
|
|
||||||
def _ensure_db(self):
|
def _ensure_db(self):
|
||||||
dbs = self.r.db_list().run()
|
dbs = self.rr.db_list().run()
|
||||||
if not self.r.dbname in dbs:
|
if not self.rr.dbname in dbs:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"creating rethinkdb database %s", repr(self.r.dbname))
|
"creating rethinkdb database %s", repr(self.rr.dbname))
|
||||||
self.r.db_create(self.r.dbname).run()
|
self.rr.db_create(self.rr.dbname).run()
|
||||||
tables = self.r.table_list().run()
|
tables = self.rr.table_list().run()
|
||||||
if not "sites" in tables:
|
if not "sites" in tables:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"creating rethinkdb table 'sites' in database %s",
|
"creating rethinkdb table 'sites' in database %s",
|
||||||
repr(self.r.dbname))
|
repr(self.rr.dbname))
|
||||||
self.r.table_create(
|
self.rr.table_create(
|
||||||
"sites", shards=self.shards, replicas=self.replicas).run()
|
"sites", shards=self.shards, replicas=self.replicas).run()
|
||||||
self.r.table("sites").index_create(
|
self.rr.table("sites").index_create("sites_last_disclaimed", [
|
||||||
"sites_last_disclaimed", [
|
r.row["status"], r.row["last_disclaimed"]]).run()
|
||||||
self.r.row["status"],
|
self.rr.table("sites").index_create("job_id").run()
|
||||||
self.r.row["last_disclaimed"]]).run()
|
|
||||||
self.r.table("sites").index_create("job_id").run()
|
|
||||||
if not "pages" in tables:
|
if not "pages" in tables:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"creating rethinkdb table 'pages' in database %s",
|
"creating rethinkdb table 'pages' in database %s",
|
||||||
repr(self.r.dbname))
|
repr(self.rr.dbname))
|
||||||
self.r.table_create(
|
self.rr.table_create(
|
||||||
"pages", shards=self.shards, replicas=self.replicas).run()
|
"pages", shards=self.shards, replicas=self.replicas).run()
|
||||||
self.r.table("pages").index_create(
|
self.rr.table("pages").index_create("priority_by_site", [
|
||||||
"priority_by_site", [
|
r.row["site_id"], r.row["brozzle_count"],
|
||||||
self.r.row["site_id"], self.r.row["brozzle_count"],
|
r.row["claimed"], r.row["priority"]]).run()
|
||||||
self.r.row["claimed"], self.r.row["priority"]]).run()
|
|
||||||
# this index is for displaying pages in a sensible order in the web
|
# this index is for displaying pages in a sensible order in the web
|
||||||
# console
|
# console
|
||||||
self.r.table("pages").index_create(
|
self.rr.table("pages").index_create("least_hops", [
|
||||||
"least_hops", [
|
r.row["site_id"], r.row["brozzle_count"],
|
||||||
self.r.row["site_id"], self.r.row["brozzle_count"],
|
r.row["hops_from_seed"]]).run()
|
||||||
self.r.row["hops_from_seed"]]).run()
|
|
||||||
if not "jobs" in tables:
|
if not "jobs" in tables:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"creating rethinkdb table 'jobs' in database %s",
|
"creating rethinkdb table 'jobs' in database %s",
|
||||||
repr(self.r.dbname))
|
repr(self.rr.dbname))
|
||||||
self.r.table_create(
|
self.rr.table_create(
|
||||||
"jobs", shards=self.shards, replicas=self.replicas).run()
|
"jobs", shards=self.shards, replicas=self.replicas).run()
|
||||||
|
|
||||||
def _vet_result(self, result, **kwargs):
|
def _vet_result(self, result, **kwargs):
|
||||||
# self.logger.debug("vetting expected=%s result=%s", kwargs, result)
|
# self.logger.debug("vetting expected=%s result=%s", kwargs, result)
|
||||||
# {'replaced': 0, 'errors': 0, 'skipped': 0, 'inserted': 1, 'deleted': 0, 'generated_keys': ['292859c1-4926-4b27-9d87-b2c367667058'], 'unchanged': 0}
|
# {'replaced': 0, 'errors': 0, 'skipped': 0, 'inserted': 1, 'deleted': 0, 'generated_keys': ['292859c1-4926-4b27-9d87-b2c367667058'], 'unchanged': 0}
|
||||||
for k in ["replaced", "errors", "skipped", "inserted", "deleted", "unchanged"]:
|
for k in [
|
||||||
|
"replaced", "errors", "skipped", "inserted", "deleted",
|
||||||
|
"unchanged"]:
|
||||||
if k in kwargs:
|
if k in kwargs:
|
||||||
expected = kwargs[k]
|
expected = kwargs[k]
|
||||||
else:
|
else:
|
||||||
|
@ -96,29 +94,23 @@ class RethinkDbFrontier:
|
||||||
# XXX keep track of aggregate priority and prioritize sites accordingly?
|
# XXX keep track of aggregate priority and prioritize sites accordingly?
|
||||||
while True:
|
while True:
|
||||||
result = (
|
result = (
|
||||||
self.r.table("sites", read_mode="majority")
|
self.rr.table("sites", read_mode="majority")
|
||||||
.between(
|
.between(
|
||||||
["ACTIVE",rethinkdb.minval],
|
["ACTIVE", r.minval], ["ACTIVE", r.maxval],
|
||||||
["ACTIVE",rethinkdb.maxval],
|
|
||||||
index="sites_last_disclaimed")
|
index="sites_last_disclaimed")
|
||||||
.order_by(index="sites_last_disclaimed")
|
.order_by(index="sites_last_disclaimed")
|
||||||
.filter(
|
.filter((r.row["claimed"] != True) | (
|
||||||
(rethinkdb.row["claimed"] != True) |
|
r.row["last_claimed"] < r.now() - 2*60*60))
|
||||||
(rethinkdb.row["last_claimed"]
|
|
||||||
< rethinkdb.now() - 2*60*60))
|
|
||||||
.limit(1)
|
.limit(1)
|
||||||
.update(
|
.update(
|
||||||
# try to avoid a race condition resulting in multiple
|
# try to avoid a race condition resulting in multiple
|
||||||
# brozzler-workers claiming the same site
|
# brozzler-workers claiming the same site
|
||||||
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
|
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
|
||||||
rethinkdb.branch(
|
r.branch((r.row["claimed"] != True) | (
|
||||||
(rethinkdb.row["claimed"] != True) |
|
r.row["last_claimed"] < r.now() - 2*60*60), {
|
||||||
(rethinkdb.row["last_claimed"]
|
"claimed": True, "last_claimed_by": worker_id,
|
||||||
< rethinkdb.now() - 2*60*60), {
|
"last_claimed": doublethink.utcnow()}, {}),
|
||||||
"claimed": True,
|
return_changes=True)).run()
|
||||||
"last_claimed_by": worker_id,
|
|
||||||
"last_claimed": rethinkstuff.utcnow()
|
|
||||||
}, {}), return_changes=True)).run()
|
|
||||||
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
|
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
|
||||||
if result["replaced"] == 1:
|
if result["replaced"] == 1:
|
||||||
if result["changes"][0]["old_val"]["claimed"]:
|
if result["changes"][0]["old_val"]["claimed"]:
|
||||||
|
@ -128,7 +120,7 @@ class RethinkDbFrontier:
|
||||||
"at %s, and presumably some error stopped it from "
|
"at %s, and presumably some error stopped it from "
|
||||||
"being disclaimed",
|
"being disclaimed",
|
||||||
result["changes"][0]["old_val"]["last_claimed"])
|
result["changes"][0]["old_val"]["last_claimed"])
|
||||||
site = brozzler.Site(self.r, result["changes"][0]["new_val"])
|
site = brozzler.Site(self.rr, result["changes"][0]["new_val"])
|
||||||
else:
|
else:
|
||||||
raise brozzler.NothingToClaim
|
raise brozzler.NothingToClaim
|
||||||
# XXX This is the only place we enforce time limit for now. Worker
|
# XXX This is the only place we enforce time limit for now. Worker
|
||||||
|
@ -154,11 +146,11 @@ class RethinkDbFrontier:
|
||||||
# brozzler-worker can be working on a site at a time, and that would
|
# brozzler-worker can be working on a site at a time, and that would
|
||||||
# have to be the worker calling this method, so if something is claimed
|
# have to be the worker calling this method, so if something is claimed
|
||||||
# already, it must have been left that way because of some error
|
# already, it must have been left that way because of some error
|
||||||
result = self.r.table("pages").between(
|
result = self.rr.table("pages").between(
|
||||||
[site.id, 0, self.r.minval, self.r.minval],
|
[site.id, 0, r.minval, r.minval],
|
||||||
[site.id, 0, self.r.maxval, self.r.maxval],
|
[site.id, 0, r.maxval, r.maxval],
|
||||||
index="priority_by_site").order_by(
|
index="priority_by_site").order_by(
|
||||||
index=rethinkdb.desc("priority_by_site")).limit(
|
index=r.desc("priority_by_site")).limit(
|
||||||
1).update({
|
1).update({
|
||||||
"claimed":True,
|
"claimed":True,
|
||||||
"last_claimed_by":worker_id},
|
"last_claimed_by":worker_id},
|
||||||
|
@ -167,12 +159,12 @@ class RethinkDbFrontier:
|
||||||
if result["unchanged"] == 0 and result["replaced"] == 0:
|
if result["unchanged"] == 0 and result["replaced"] == 0:
|
||||||
raise brozzler.NothingToClaim
|
raise brozzler.NothingToClaim
|
||||||
else:
|
else:
|
||||||
return brozzler.Page(self.r, result["changes"][0]["new_val"])
|
return brozzler.Page(self.rr, result["changes"][0]["new_val"])
|
||||||
|
|
||||||
def has_outstanding_pages(self, site):
|
def has_outstanding_pages(self, site):
|
||||||
results_iter = self.r.table("pages").between(
|
results_iter = self.rr.table("pages").between(
|
||||||
[site.id, 0, self.r.minval, self.r.minval],
|
[site.id, 0, r.minval, r.minval],
|
||||||
[site.id, 0, self.r.maxval, self.r.maxval],
|
[site.id, 0, r.maxval, r.maxval],
|
||||||
index="priority_by_site").limit(1).run()
|
index="priority_by_site").limit(1).run()
|
||||||
return len(list(results_iter)) > 0
|
return len(list(results_iter)) > 0
|
||||||
|
|
||||||
|
@ -186,36 +178,37 @@ class RethinkDbFrontier:
|
||||||
site.save()
|
site.save()
|
||||||
|
|
||||||
def active_jobs(self):
|
def active_jobs(self):
|
||||||
results = self.r.table("jobs").filter({"status":"ACTIVE"}).run()
|
results = self.rr.table("jobs").filter({"status":"ACTIVE"}).run()
|
||||||
for result in results:
|
for result in results:
|
||||||
yield brozzler.Job(self.r, result)
|
yield brozzler.Job(self.rr, result)
|
||||||
|
|
||||||
def honor_stop_request(self, job_id):
|
def honor_stop_request(self, job_id):
|
||||||
"""Raises brozzler.CrawlJobStopped if stop has been requested."""
|
"""Raises brozzler.CrawlJobStopped if stop has been requested."""
|
||||||
job = brozzler.Job.load(self.r, job_id)
|
job = brozzler.Job.load(self.rr, job_id)
|
||||||
if job and job.get('stop_requested'):
|
if job and job.get('stop_requested'):
|
||||||
self.logger.info("stop requested for job %s", job_id)
|
self.logger.info("stop requested for job %s", job_id)
|
||||||
raise brozzler.CrawlJobStopped
|
raise brozzler.CrawlJobStopped
|
||||||
|
|
||||||
def _maybe_finish_job(self, job_id):
|
def _maybe_finish_job(self, job_id):
|
||||||
"""Returns True if job is finished."""
|
"""Returns True if job is finished."""
|
||||||
job = brozzler.Job.load(self.r, job_id)
|
job = brozzler.Job.load(self.rr, job_id)
|
||||||
if not job:
|
if not job:
|
||||||
return False
|
return False
|
||||||
if job.status.startswith("FINISH"):
|
if job.status.startswith("FINISH"):
|
||||||
self.logger.warn("%s is already %s", job, job.status)
|
self.logger.warn("%s is already %s", job, job.status)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
results = self.r.table("sites").get_all(job_id, index="job_id").run()
|
results = self.rr.table("sites").get_all(job_id, index="job_id").run()
|
||||||
n = 0
|
n = 0
|
||||||
for result in results:
|
for result in results:
|
||||||
site = brozzler.Site(self.r, result)
|
site = brozzler.Site(self.rr, result)
|
||||||
if not site.status.startswith("FINISH"):
|
if not site.status.startswith("FINISH"):
|
||||||
results.close()
|
results.close()
|
||||||
return False
|
return False
|
||||||
n += 1
|
n += 1
|
||||||
|
|
||||||
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
|
self.logger.info(
|
||||||
|
"all %s sites finished, job %s is FINISHED!", n, job.id)
|
||||||
job.finish()
|
job.finish()
|
||||||
job.save()
|
job.save()
|
||||||
return True
|
return True
|
||||||
|
@ -224,8 +217,8 @@ class RethinkDbFrontier:
|
||||||
self.logger.info("%s %s", status, site)
|
self.logger.info("%s %s", status, site)
|
||||||
site.status = status
|
site.status = status
|
||||||
site.claimed = False
|
site.claimed = False
|
||||||
site.last_disclaimed = rethinkstuff.utcnow()
|
site.last_disclaimed = doublethink.utcnow()
|
||||||
site.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
|
site.starts_and_stops[-1]["stop"] = doublethink.utcnow()
|
||||||
site.save()
|
site.save()
|
||||||
if site.job_id:
|
if site.job_id:
|
||||||
self._maybe_finish_job(site.job_id)
|
self._maybe_finish_job(site.job_id)
|
||||||
|
@ -233,7 +226,7 @@ class RethinkDbFrontier:
|
||||||
def disclaim_site(self, site, page=None):
|
def disclaim_site(self, site, page=None):
|
||||||
self.logger.info("disclaiming %s", site)
|
self.logger.info("disclaiming %s", site)
|
||||||
site.claimed = False
|
site.claimed = False
|
||||||
site.last_disclaimed = rethinkstuff.utcnow()
|
site.last_disclaimed = doublethink.utcnow()
|
||||||
if not page and not self.has_outstanding_pages(site):
|
if not page and not self.has_outstanding_pages(site):
|
||||||
self.finished(site, "FINISHED")
|
self.finished(site, "FINISHED")
|
||||||
else:
|
else:
|
||||||
|
@ -245,25 +238,25 @@ class RethinkDbFrontier:
|
||||||
def resume_job(self, job):
|
def resume_job(self, job):
|
||||||
job.status = "ACTIVE"
|
job.status = "ACTIVE"
|
||||||
job.starts_and_stops.append(
|
job.starts_and_stops.append(
|
||||||
{"start":rethinkstuff.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
job.save()
|
job.save()
|
||||||
for site in self.job_sites(job.id):
|
for site in self.job_sites(job.id):
|
||||||
site.status = "ACTIVE"
|
site.status = "ACTIVE"
|
||||||
site.starts_and_stops.append(
|
site.starts_and_stops.append(
|
||||||
{"start":rethinkstuff.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
site.save()
|
site.save()
|
||||||
|
|
||||||
def resume_site(self, site):
|
def resume_site(self, site):
|
||||||
if site.job_id:
|
if site.job_id:
|
||||||
# can't call resume_job since that would resume jobs's other sites
|
# can't call resume_job since that would resume jobs's other sites
|
||||||
job = brozzler.job.load(self.r, site.job_id)
|
job = brozzler.Job.load(self.rr, site.job_id)
|
||||||
job.status = "ACTIVE"
|
job.status = "ACTIVE"
|
||||||
job.starts_and_stops.append(
|
job.starts_and_stops.append(
|
||||||
{"start":rethinkstuff.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
job.save()
|
job.save()
|
||||||
site.status = "ACTIVE"
|
site.status = "ACTIVE"
|
||||||
site.starts_and_stops.append(
|
site.starts_and_stops.append(
|
||||||
{"start":rethinkstuff.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
site.save()
|
site.save()
|
||||||
|
|
||||||
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
|
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
|
||||||
|
@ -278,12 +271,13 @@ class RethinkDbFrontier:
|
||||||
hops_off_surt = parent_page.hops_off_surt + 1
|
hops_off_surt = parent_page.hops_off_surt + 1
|
||||||
else:
|
else:
|
||||||
hops_off_surt = 0
|
hops_off_surt = 0
|
||||||
new_child_page = brozzler.Page(self.r, {
|
new_child_page = brozzler.Page(self.rr, {
|
||||||
'url': url, 'site_id': site.id, 'job_id': site.job_id,
|
'url': url, 'site_id': site.id, 'job_id': site.job_id,
|
||||||
'hops_from_seed': parent_page.hops_from_seed+1,
|
'hops_from_seed': parent_page.hops_from_seed+1,
|
||||||
'via_page_id': parent_page.id,
|
'via_page_id': parent_page.id,
|
||||||
'hops_off_surt': hops_off_surt})
|
'hops_off_surt': hops_off_surt})
|
||||||
existing_child_page = brozzler.Page.load(new_child_page.id)
|
existing_child_page = brozzler.Page.load(
|
||||||
|
self.rr, new_child_page.id)
|
||||||
if existing_child_page:
|
if existing_child_page:
|
||||||
existing_child_page.priority += new_child_page.priority
|
existing_child_page.priority += new_child_page.priority
|
||||||
existing_child_page.save()
|
existing_child_page.save()
|
||||||
|
@ -314,22 +308,24 @@ class RethinkDbFrontier:
|
||||||
def reached_limit(self, site, e):
|
def reached_limit(self, site, e):
|
||||||
self.logger.info("reached_limit site=%s e=%s", site, e)
|
self.logger.info("reached_limit site=%s e=%s", site, e)
|
||||||
assert isinstance(e, brozzler.ReachedLimit)
|
assert isinstance(e, brozzler.ReachedLimit)
|
||||||
if site.reached_limit and site.reached_limit != e.warcprox_meta["reached-limit"]:
|
if (site.reached_limit
|
||||||
self.logger.warn("reached limit %s but site had already reached limit %s",
|
and site.reached_limit != e.warcprox_meta["reached-limit"]):
|
||||||
|
self.logger.warn(
|
||||||
|
"reached limit %s but site had already reached limit %s",
|
||||||
e.warcprox_meta["reached-limit"], self.reached_limit)
|
e.warcprox_meta["reached-limit"], self.reached_limit)
|
||||||
else:
|
else:
|
||||||
site.reached_limit = e.warcprox_meta["reached-limit"]
|
site.reached_limit = e.warcprox_meta["reached-limit"]
|
||||||
self.finished(site, "FINISHED_REACHED_LIMIT")
|
self.finished(site, "FINISHED_REACHED_LIMIT")
|
||||||
|
|
||||||
def job_sites(self, job_id):
|
def job_sites(self, job_id):
|
||||||
results = self.r.table('sites').get_all(job_id, index="job_id").run()
|
results = self.rr.table('sites').get_all(job_id, index="job_id").run()
|
||||||
for result in results:
|
for result in results:
|
||||||
yield brozzler.Site(self.r, result)
|
yield brozzler.Site(self.rr, result)
|
||||||
|
|
||||||
def seed_page(self, site_id):
|
def seed_page(self, site_id):
|
||||||
results = self.r.table("pages").between(
|
results = self.rr.table("pages").between(
|
||||||
[site_id, self.r.minval, self.r.minval, self.r.minval],
|
[site_id, r.minval, r.minval, r.minval],
|
||||||
[site_id, self.r.maxval, self.r.maxval, self.r.maxval],
|
[site_id, r.maxval, r.maxval, r.maxval],
|
||||||
index="priority_by_site").filter({"hops_from_seed":0}).run()
|
index="priority_by_site").filter({"hops_from_seed":0}).run()
|
||||||
pages = list(results)
|
pages = list(results)
|
||||||
if len(pages) > 1:
|
if len(pages) > 1:
|
||||||
|
@ -337,15 +333,15 @@ class RethinkDbFrontier:
|
||||||
"more than one seed page for site_id %s ?", site_id)
|
"more than one seed page for site_id %s ?", site_id)
|
||||||
if len(pages) < 1:
|
if len(pages) < 1:
|
||||||
return None
|
return None
|
||||||
return brozzler.Page(self.r, pages[0])
|
return brozzler.Page(self.rr, pages[0])
|
||||||
|
|
||||||
def site_pages(self, site_id, unbrozzled_only=False):
|
def site_pages(self, site_id, unbrozzled_only=False):
|
||||||
results = self.r.table("pages").between(
|
results = self.rr.table("pages").between(
|
||||||
[site_id, 0 if unbrozzled_only else self.r.minval,
|
[site_id, 0 if unbrozzled_only else r.minval,
|
||||||
self.r.minval, self.r.minval],
|
r.minval, r.minval],
|
||||||
[site_id, 0 if unbrozzled_only else self.r.maxval,
|
[site_id, 0 if unbrozzled_only else r.maxval,
|
||||||
self.r.maxval, self.r.maxval],
|
r.maxval, r.maxval],
|
||||||
index="priority_by_site").run()
|
index="priority_by_site").run()
|
||||||
for result in results:
|
for result in results:
|
||||||
yield brozzler.Page(self.r, result)
|
yield brozzler.Page(self.rr, result)
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import yaml
|
||||||
import json
|
import json
|
||||||
import datetime
|
import datetime
|
||||||
import uuid
|
import uuid
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import os
|
import os
|
||||||
import cerberus
|
import cerberus
|
||||||
import urllib
|
import urllib
|
||||||
|
@ -70,9 +70,9 @@ def new_job_file(frontier, job_conf_file):
|
||||||
def new_job(frontier, job_conf):
|
def new_job(frontier, job_conf):
|
||||||
'''Returns new Job.'''
|
'''Returns new Job.'''
|
||||||
validate_conf(job_conf)
|
validate_conf(job_conf)
|
||||||
job = Job(frontier.r, {
|
job = Job(frontier.rr, {
|
||||||
"conf": job_conf,
|
"conf": job_conf,
|
||||||
"status": "ACTIVE", "started": rethinkstuff.utcnow()})
|
"status": "ACTIVE", "started": doublethink.utcnow()})
|
||||||
if "id" in job_conf:
|
if "id" in job_conf:
|
||||||
job.id = job_conf["id"]
|
job.id = job_conf["id"]
|
||||||
job.save()
|
job.save()
|
||||||
|
@ -80,7 +80,7 @@ def new_job(frontier, job_conf):
|
||||||
sites = []
|
sites = []
|
||||||
for seed_conf in job_conf["seeds"]:
|
for seed_conf in job_conf["seeds"]:
|
||||||
merged_conf = merge(seed_conf, job_conf)
|
merged_conf = merge(seed_conf, job_conf)
|
||||||
site = brozzler.Site(frontier.r, {
|
site = brozzler.Site(frontier.rr, {
|
||||||
"job_id": job.id,
|
"job_id": job.id,
|
||||||
"seed": merged_conf["url"],
|
"seed": merged_conf["url"],
|
||||||
"scope": merged_conf.get("scope"),
|
"scope": merged_conf.get("scope"),
|
||||||
|
@ -111,7 +111,7 @@ def new_site(frontier, site):
|
||||||
# where a brozzler worker immediately claims the site, finds no pages
|
# where a brozzler worker immediately claims the site, finds no pages
|
||||||
# to crawl, and decides the site is finished
|
# to crawl, and decides the site is finished
|
||||||
try:
|
try:
|
||||||
page = brozzler.Page(frontier.r, {
|
page = brozzler.Page(frontier.rr, {
|
||||||
"url": site.seed, "site_id": site.get("id"),
|
"url": site.seed, "site_id": site.get("id"),
|
||||||
"job_id": site.get("job_id"), "hops_from_seed": 0,
|
"job_id": site.get("job_id"), "hops_from_seed": 0,
|
||||||
"priority": 1000, "needs_robots_check": True})
|
"priority": 1000, "needs_robots_check": True})
|
||||||
|
@ -123,12 +123,12 @@ def new_site(frontier, site):
|
||||||
except brozzler.ReachedLimit as e:
|
except brozzler.ReachedLimit as e:
|
||||||
frontier.reached_limit(site, e)
|
frontier.reached_limit(site, e)
|
||||||
|
|
||||||
class Job(rethinkstuff.Document):
|
class Job(doublethink.Document):
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
table = "jobs"
|
table = "jobs"
|
||||||
|
|
||||||
def __init__(self, rethinker, d={}):
|
def __init__(self, rr, d={}):
|
||||||
rethinkstuff.Document.__init__(self, rethinker, d)
|
doublethink.Document.__init__(self, rr, d)
|
||||||
self.status = self.get("status", "ACTIVE")
|
self.status = self.get("status", "ACTIVE")
|
||||||
if not "starts_and_stops" in self:
|
if not "starts_and_stops" in self:
|
||||||
if self.get("started"): # backward compatibility
|
if self.get("started"): # backward compatibility
|
||||||
|
@ -138,7 +138,7 @@ class Job(rethinkstuff.Document):
|
||||||
del self["started"]
|
del self["started"]
|
||||||
else:
|
else:
|
||||||
self.starts_and_stops = [
|
self.starts_and_stops = [
|
||||||
{"start":rethinkstuff.utcnow(),"stop":None}]
|
{"start":doublethink.utcnow(),"stop":None}]
|
||||||
|
|
||||||
def finish(self):
|
def finish(self):
|
||||||
if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
|
if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
|
||||||
|
@ -147,5 +147,5 @@ class Job(rethinkstuff.Document):
|
||||||
"starts_and_stops[-1]['stop']=%s", self.status,
|
"starts_and_stops[-1]['stop']=%s", self.status,
|
||||||
self.starts_and_stops[-1]["stop"])
|
self.starts_and_stops[-1]["stop"])
|
||||||
self.status = "FINISHED"
|
self.status = "FINISHED"
|
||||||
self.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
|
self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,8 @@ except ImportError as e:
|
||||||
'brozzler[easy]".\nSee README.rst for more information.',
|
'brozzler[easy]".\nSee README.rst for more information.',
|
||||||
type(e).__name__, e)
|
type(e).__name__, e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import rethinkdb
|
import rethinkdb as r
|
||||||
import surt
|
import surt
|
||||||
import json
|
import json
|
||||||
import brozzler
|
import brozzler
|
||||||
|
@ -48,12 +48,12 @@ class RethinkCDXSource(pywb.cdx.cdxsource.CDXSource):
|
||||||
self.table = table
|
self.table = table
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def r(self):
|
def rr(self):
|
||||||
try:
|
try:
|
||||||
return self._r
|
return self._rr
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
self._r = rethinkstuff.Rethinker(self.servers, self.db)
|
self._rr = doublethink.Rethinker(self.servers, self.db)
|
||||||
return self._r
|
return self._rr
|
||||||
|
|
||||||
def load_cdx(self, cdx_query):
|
def load_cdx(self, cdx_query):
|
||||||
# logging.debug('vars(cdx_query)=%s', vars(cdx_query))
|
# logging.debug('vars(cdx_query)=%s', vars(cdx_query))
|
||||||
|
@ -85,15 +85,14 @@ class RethinkCDXSource(pywb.cdx.cdxsource.CDXSource):
|
||||||
def _query_rethinkdb(self, cdx_query):
|
def _query_rethinkdb(self, cdx_query):
|
||||||
start_key = cdx_query.key.decode('utf-8')
|
start_key = cdx_query.key.decode('utf-8')
|
||||||
end_key = cdx_query.end_key.decode('utf-8')
|
end_key = cdx_query.end_key.decode('utf-8')
|
||||||
reql = self.r.table(self.table).between(
|
reql = self.rr.table(self.table).between(
|
||||||
[start_key[:150], rethinkdb.minval],
|
[start_key[:150], r.minval], [end_key[:150], r.maxval],
|
||||||
[end_key[:150], rethinkdb.maxval],
|
|
||||||
index='abbr_canon_surt_timestamp', right_bound='closed')
|
index='abbr_canon_surt_timestamp', right_bound='closed')
|
||||||
reql = reql.order_by(index='abbr_canon_surt_timestamp')
|
reql = reql.order_by(index='abbr_canon_surt_timestamp')
|
||||||
# TODO support for POST, etc
|
# TODO support for POST, etc
|
||||||
# http_method='WARCPROX_WRITE_RECORD' for screenshots, thumbnails
|
# http_method='WARCPROX_WRITE_RECORD' for screenshots, thumbnails
|
||||||
reql = reql.filter(
|
reql = reql.filter(
|
||||||
lambda capture: rethinkdb.expr(
|
lambda capture: r.expr(
|
||||||
['WARCPROX_WRITE_RECORD','GET']).contains(
|
['WARCPROX_WRITE_RECORD','GET']).contains(
|
||||||
capture['http_method']))
|
capture['http_method']))
|
||||||
reql = reql.filter(
|
reql = reql.filter(
|
||||||
|
|
|
@ -22,13 +22,13 @@ import logging
|
||||||
import brozzler
|
import brozzler
|
||||||
import hashlib
|
import hashlib
|
||||||
import time
|
import time
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import datetime
|
import datetime
|
||||||
import re
|
import re
|
||||||
import ipaddress
|
import ipaddress
|
||||||
|
|
||||||
_EPOCH_UTC = datetime.datetime.utcfromtimestamp(0.0).replace(
|
_EPOCH_UTC = datetime.datetime.utcfromtimestamp(0.0).replace(
|
||||||
tzinfo=rethinkstuff.UTC)
|
tzinfo=doublethink.UTC)
|
||||||
|
|
||||||
class Url:
|
class Url:
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
|
@ -88,12 +88,12 @@ class Url:
|
||||||
|
|
||||||
return host_parts[-len(domain_parts):] == domain_parts
|
return host_parts[-len(domain_parts):] == domain_parts
|
||||||
|
|
||||||
class Site(rethinkstuff.Document):
|
class Site(doublethink.Document):
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
table = 'sites'
|
table = 'sites'
|
||||||
|
|
||||||
def __init__(self, rethinker, d={}):
|
def __init__(self, rr, d={}):
|
||||||
rethinkstuff.Document.__init__(self, rethinker, d)
|
doublethink.Document.__init__(self, rr, d)
|
||||||
if not self.get('status'):
|
if not self.get('status'):
|
||||||
self.status = 'ACTIVE'
|
self.status = 'ACTIVE'
|
||||||
self.enable_warcprox_features = bool(self.get('enable_warcprox_features'))
|
self.enable_warcprox_features = bool(self.get('enable_warcprox_features'))
|
||||||
|
@ -107,7 +107,11 @@ class Site(rethinkstuff.Document):
|
||||||
self.starts_and_stops[0]["stop"] = self.last_disclaimed
|
self.starts_and_stops[0]["stop"] = self.last_disclaimed
|
||||||
else:
|
else:
|
||||||
self.starts_and_stops = [
|
self.starts_and_stops = [
|
||||||
{"start":rethinkstuff.utcnow(),"stop":None}]
|
{"start":doublethink.utcnow(),"stop":None}]
|
||||||
|
if not self.scope:
|
||||||
|
self.scope = {}
|
||||||
|
if not 'surt' in self.scope:
|
||||||
|
self.scope['surt'] = Url(self.seed).surt
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return 'Site({"id":"%s","seed":"%s",...})' % (self.id, self.seed)
|
return 'Site({"id":"%s","seed":"%s",...})' % (self.id, self.seed)
|
||||||
|
@ -121,7 +125,7 @@ class Site(rethinkstuff.Document):
|
||||||
if ss['stop']:
|
if ss['stop']:
|
||||||
dt += (ss['stop'] - ss['start']).total_seconds()
|
dt += (ss['stop'] - ss['start']).total_seconds()
|
||||||
else: # crawl is active
|
else: # crawl is active
|
||||||
dt += (rethinkstuff.utcnow() - ss['start']).total_seconds()
|
dt += (doublethink.utcnow() - ss['start']).total_seconds()
|
||||||
return dt
|
return dt
|
||||||
|
|
||||||
def note_seed_redirect(self, url):
|
def note_seed_redirect(self, url):
|
||||||
|
@ -278,12 +282,12 @@ class Site(rethinkstuff.Document):
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
class Page(rethinkstuff.Document):
|
class Page(doublethink.Document):
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
table = 'pages'
|
table = 'pages'
|
||||||
|
|
||||||
def __init__(self, rethinker, d={}):
|
def __init__(self, rr, d={}):
|
||||||
rethinkstuff.Document.__init__(self, rethinker, d)
|
doublethink.Document.__init__(self, rr, d)
|
||||||
self.hops_from_seed = self.get('hops_from_seed', 0)
|
self.hops_from_seed = self.get('hops_from_seed', 0)
|
||||||
self.brozzle_count = self.get('brozzle_count', 0)
|
self.brozzle_count = self.get('brozzle_count', 0)
|
||||||
self.claimed = self.get('claimed', False)
|
self.claimed = self.get('claimed', False)
|
||||||
|
@ -303,12 +307,16 @@ class Page(rethinkstuff.Document):
|
||||||
self.redirect_url = url
|
self.redirect_url = url
|
||||||
|
|
||||||
def _calc_priority(self):
|
def _calc_priority(self):
|
||||||
|
if not self.url:
|
||||||
|
return None
|
||||||
priority = 0
|
priority = 0
|
||||||
priority += max(0, 10 - self.hops_from_seed)
|
priority += max(0, 10 - self.hops_from_seed)
|
||||||
priority += max(0, 6 - self.canon_url().count("/"))
|
priority += max(0, 6 - self.canon_url().count("/"))
|
||||||
return priority
|
return priority
|
||||||
|
|
||||||
def canon_url(self):
|
def canon_url(self):
|
||||||
|
if not self.url:
|
||||||
|
return None
|
||||||
if self._canon_hurl is None:
|
if self._canon_hurl is None:
|
||||||
self._canon_hurl = surt.handyurl.parse(self.url)
|
self._canon_hurl = surt.handyurl.parse(self.url)
|
||||||
surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
|
surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
|
||||||
|
|
|
@ -31,7 +31,7 @@ import io
|
||||||
import socket
|
import socket
|
||||||
import collections
|
import collections
|
||||||
import requests
|
import requests
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
class ExtraHeaderAdder(urllib.request.BaseHandler):
|
class ExtraHeaderAdder(urllib.request.BaseHandler):
|
||||||
|
@ -131,7 +131,7 @@ class BrozzlerWorker:
|
||||||
'no available instances of warcprox in the service '
|
'no available instances of warcprox in the service '
|
||||||
'registry')
|
'registry')
|
||||||
site.proxy = '%s:%s' % (svc['host'], svc['port'])
|
site.proxy = '%s:%s' % (svc['host'], svc['port'])
|
||||||
self._frontier.update_site(site)
|
site.save()
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'chose warcprox instance %s from service registry for %s',
|
'chose warcprox instance %s from service registry for %s',
|
||||||
repr(site.proxy), site)
|
repr(site.proxy), site)
|
||||||
|
@ -408,7 +408,7 @@ class BrozzlerWorker:
|
||||||
if not hasattr(self, "status_info"):
|
if not hasattr(self, "status_info"):
|
||||||
due = True
|
due = True
|
||||||
else:
|
else:
|
||||||
d = rethinkstuff.utcnow() - self.status_info["last_heartbeat"]
|
d = doublethink.utcnow() - self.status_info["last_heartbeat"]
|
||||||
due = d.total_seconds() > self.HEARTBEAT_INTERVAL
|
due = d.total_seconds() > self.HEARTBEAT_INTERVAL
|
||||||
|
|
||||||
if due:
|
if due:
|
||||||
|
|
4
setup.py
4
setup.py
|
@ -32,7 +32,7 @@ def find_package_data(package):
|
||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='brozzler',
|
name='brozzler',
|
||||||
version='1.1b9.dev197',
|
version='1.1b9.dev198',
|
||||||
description='Distributed web crawling with browsers',
|
description='Distributed web crawling with browsers',
|
||||||
url='https://github.com/internetarchive/brozzler',
|
url='https://github.com/internetarchive/brozzler',
|
||||||
author='Noah Levitt',
|
author='Noah Levitt',
|
||||||
|
@ -69,7 +69,7 @@ setuptools.setup(
|
||||||
'websocket-client!=0.39.0',
|
'websocket-client!=0.39.0',
|
||||||
'pillow==3.3.0',
|
'pillow==3.3.0',
|
||||||
'surt>=0.3.0',
|
'surt>=0.3.0',
|
||||||
'rethinkstuff>=0.2.0.dev62',
|
'doublethink>=0.2.0.dev66',
|
||||||
'rethinkdb>=2.3,<2.4',
|
'rethinkdb>=2.3,<2.4',
|
||||||
'cerberus==1.0.1',
|
'cerberus==1.0.1',
|
||||||
'jinja2',
|
'jinja2',
|
||||||
|
|
|
@ -24,7 +24,7 @@ import threading
|
||||||
import urllib.request
|
import urllib.request
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import time
|
import time
|
||||||
import brozzler
|
import brozzler
|
||||||
import datetime
|
import datetime
|
||||||
|
@ -78,8 +78,8 @@ def test_httpd(httpd):
|
||||||
def test_services_up():
|
def test_services_up():
|
||||||
'''Check that the expected services are up and running.'''
|
'''Check that the expected services are up and running.'''
|
||||||
# check that rethinkdb is listening and looks sane
|
# check that rethinkdb is listening and looks sane
|
||||||
r = rethinkstuff.Rethinker(db='rethinkdb') # built-in db
|
rr = doublethink.Rethinker(db='rethinkdb') # built-in db
|
||||||
tbls = r.table_list().run()
|
tbls = rr.table_list().run()
|
||||||
assert len(tbls) > 10
|
assert len(tbls) > 10
|
||||||
|
|
||||||
# check that warcprox is listening
|
# check that warcprox is listening
|
||||||
|
@ -99,10 +99,11 @@ def test_services_up():
|
||||||
|
|
||||||
def test_brozzle_site(httpd):
|
def test_brozzle_site(httpd):
|
||||||
test_id = 'test_brozzle_site-%s' % datetime.datetime.utcnow().isoformat()
|
test_id = 'test_brozzle_site-%s' % datetime.datetime.utcnow().isoformat()
|
||||||
site = brozzler.Site(
|
rr = doublethink.Rethinker('localhost', db='brozzler')
|
||||||
seed='http://localhost:%s/site1/' % httpd.server_port,
|
site = brozzler.Site(rr, {
|
||||||
proxy='localhost:8000', enable_warcprox_features=True,
|
'seed': 'http://localhost:%s/site1/' % httpd.server_port,
|
||||||
warcprox_meta={'captures-table-extra-fields':{'test_id':test_id}})
|
'proxy': 'localhost:8000', 'enable_warcprox_features': True,
|
||||||
|
'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
|
||||||
|
|
||||||
# the two pages we expect to be crawled
|
# the two pages we expect to be crawled
|
||||||
page1 = 'http://localhost:%s/site1/' % httpd.server_port
|
page1 = 'http://localhost:%s/site1/' % httpd.server_port
|
||||||
|
@ -114,8 +115,7 @@ def test_brozzle_site(httpd):
|
||||||
stop_service('brozzler-worker')
|
stop_service('brozzler-worker')
|
||||||
|
|
||||||
assert site.id is None
|
assert site.id is None
|
||||||
r = rethinkstuff.Rethinker('localhost', db='brozzler')
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
|
||||||
brozzler.new_site(frontier, site)
|
brozzler.new_site(frontier, site)
|
||||||
assert site.id is not None
|
assert site.id is not None
|
||||||
assert len(list(frontier.site_pages(site.id))) == 1
|
assert len(list(frontier.site_pages(site.id))) == 1
|
||||||
|
@ -126,7 +126,7 @@ def test_brozzle_site(httpd):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while site.status != 'FINISHED' and time.time() - start < 300:
|
while site.status != 'FINISHED' and time.time() - start < 300:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
site = frontier.site(site.id)
|
site.refresh()
|
||||||
assert site.status == 'FINISHED'
|
assert site.status == 'FINISHED'
|
||||||
|
|
||||||
# check that we got the two pages we expected
|
# check that we got the two pages we expected
|
||||||
|
@ -138,7 +138,7 @@ def test_brozzle_site(httpd):
|
||||||
|
|
||||||
time.sleep(2) # in case warcprox hasn't finished processing urls
|
time.sleep(2) # in case warcprox hasn't finished processing urls
|
||||||
# take a look at the captures table
|
# take a look at the captures table
|
||||||
captures = r.table('captures').filter({'test_id':test_id}).run()
|
captures = rr.table('captures').filter({'test_id':test_id}).run()
|
||||||
captures_by_url = {
|
captures_by_url = {
|
||||||
c['url']: c for c in captures if c['http_method'] != 'HEAD'}
|
c['url']: c for c in captures if c['http_method'] != 'HEAD'}
|
||||||
assert robots in captures_by_url
|
assert robots in captures_by_url
|
||||||
|
@ -180,17 +180,17 @@ def test_warcprox_selection(httpd):
|
||||||
page2 = 'http://localhost:%s/site1/file1.txt' % httpd.server_port
|
page2 = 'http://localhost:%s/site1/file1.txt' % httpd.server_port
|
||||||
robots = 'http://localhost:%s/robots.txt' % httpd.server_port
|
robots = 'http://localhost:%s/robots.txt' % httpd.server_port
|
||||||
|
|
||||||
site = brozzler.Site(
|
rr = doublethink.Rethinker('localhost', db='brozzler')
|
||||||
seed='http://localhost:%s/site1/' % httpd.server_port,
|
site = brozzler.Site(rr, {
|
||||||
enable_warcprox_features=True,
|
'seed': 'http://localhost:%s/site1/' % httpd.server_port,
|
||||||
warcprox_meta={'captures-table-extra-fields':{'test_id':test_id}})
|
'enable_warcprox_features': True,
|
||||||
|
'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
|
||||||
|
|
||||||
# so we can examine rethinkdb before it does anything
|
# so we can examine rethinkdb before it does anything
|
||||||
try:
|
try:
|
||||||
stop_service('brozzler-worker')
|
stop_service('brozzler-worker')
|
||||||
assert site.id is None
|
assert site.id is None
|
||||||
r = rethinkstuff.Rethinker('localhost', db='brozzler')
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
|
||||||
brozzler.new_site(frontier, site)
|
brozzler.new_site(frontier, site)
|
||||||
assert site.id is not None
|
assert site.id is not None
|
||||||
assert len(list(frontier.site_pages(site.id))) == 1
|
assert len(list(frontier.site_pages(site.id))) == 1
|
||||||
|
@ -201,14 +201,14 @@ def test_warcprox_selection(httpd):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while not site.proxy and time.time() - start < 20:
|
while not site.proxy and time.time() - start < 20:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
site = frontier.site(site.id)
|
site.refresh()
|
||||||
assert site.proxy[-5:] == ':8000'
|
assert site.proxy[-5:] == ':8000'
|
||||||
|
|
||||||
# the site should be brozzled fairly quickly
|
# the site should be brozzled fairly quickly
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while site.status != 'FINISHED' and time.time() - start < 300:
|
while site.status != 'FINISHED' and time.time() - start < 300:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
site = frontier.site(site.id)
|
site.refresh()
|
||||||
assert site.status == 'FINISHED'
|
assert site.status == 'FINISHED'
|
||||||
|
|
||||||
# check that we got the two pages we expected
|
# check that we got the two pages we expected
|
||||||
|
@ -220,7 +220,7 @@ def test_warcprox_selection(httpd):
|
||||||
|
|
||||||
time.sleep(2) # in case warcprox hasn't finished processing urls
|
time.sleep(2) # in case warcprox hasn't finished processing urls
|
||||||
# take a look at the captures table
|
# take a look at the captures table
|
||||||
captures = r.table('captures').filter({'test_id':test_id}).run()
|
captures = rr.table('captures').filter({'test_id':test_id}).run()
|
||||||
captures_by_url = {
|
captures_by_url = {
|
||||||
c['url']: c for c in captures if c['http_method'] != 'HEAD'}
|
c['url']: c for c in captures if c['http_method'] != 'HEAD'}
|
||||||
assert robots in captures_by_url
|
assert robots in captures_by_url
|
||||||
|
@ -239,19 +239,19 @@ def test_warcprox_selection(httpd):
|
||||||
|
|
||||||
def test_obey_robots(httpd):
|
def test_obey_robots(httpd):
|
||||||
test_id = 'test_obey_robots-%s' % datetime.datetime.utcnow().isoformat()
|
test_id = 'test_obey_robots-%s' % datetime.datetime.utcnow().isoformat()
|
||||||
site = brozzler.Site(
|
rr = doublethink.Rethinker('localhost', db='brozzler')
|
||||||
seed='http://localhost:%s/site1/' % httpd.server_port,
|
site = brozzler.Site(rr, {
|
||||||
proxy='localhost:8000', enable_warcprox_features=True,
|
'seed': 'http://localhost:%s/site1/' % httpd.server_port,
|
||||||
user_agent='im a badbot', # robots.txt blocks badbot
|
'proxy': 'localhost:8000', 'enable_warcprox_features': True,
|
||||||
warcprox_meta={'captures-table-extra-fields':{'test_id':test_id}})
|
'user_agent': 'im a badbot', # robots.txt blocks badbot
|
||||||
|
'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}}})
|
||||||
|
|
||||||
# so we can examine rethinkdb before it does anything
|
# so we can examine rethinkdb before it does anything
|
||||||
try:
|
try:
|
||||||
stop_service('brozzler-worker')
|
stop_service('brozzler-worker')
|
||||||
|
|
||||||
assert site.id is None
|
assert site.id is None
|
||||||
r = rethinkstuff.Rethinker('localhost', db='brozzler')
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
|
||||||
brozzler.new_site(frontier, site)
|
brozzler.new_site(frontier, site)
|
||||||
assert site.id is not None
|
assert site.id is not None
|
||||||
site_pages = list(frontier.site_pages(site.id))
|
site_pages = list(frontier.site_pages(site.id))
|
||||||
|
@ -265,7 +265,7 @@ def test_obey_robots(httpd):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while site.status != 'FINISHED' and time.time() - start < 300:
|
while site.status != 'FINISHED' and time.time() - start < 300:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
site = frontier.site(site.id)
|
site.refresh()
|
||||||
assert site.status == 'FINISHED'
|
assert site.status == 'FINISHED'
|
||||||
|
|
||||||
# check that only the one page is in rethinkdb
|
# check that only the one page is in rethinkdb
|
||||||
|
@ -278,7 +278,7 @@ def test_obey_robots(httpd):
|
||||||
# take a look at the captures table
|
# take a look at the captures table
|
||||||
time.sleep(2) # in case warcprox hasn't finished processing urls
|
time.sleep(2) # in case warcprox hasn't finished processing urls
|
||||||
robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port
|
robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port
|
||||||
captures = list(r.table('captures').filter({'test_id':test_id}).run())
|
captures = list(rr.table('captures').filter({'test_id':test_id}).run())
|
||||||
assert len(captures) == 1
|
assert len(captures) == 1
|
||||||
assert captures[0]['url'] == robots_url
|
assert captures[0]['url'] == robots_url
|
||||||
|
|
||||||
|
@ -292,27 +292,27 @@ def test_obey_robots(httpd):
|
||||||
|
|
||||||
def test_login(httpd):
|
def test_login(httpd):
|
||||||
test_id = 'test_login-%s' % datetime.datetime.utcnow().isoformat()
|
test_id = 'test_login-%s' % datetime.datetime.utcnow().isoformat()
|
||||||
site = brozzler.Site(
|
rr = doublethink.Rethinker('localhost', db='brozzler')
|
||||||
seed='http://localhost:%s/site2/' % httpd.server_port,
|
site = brozzler.Site(rr, {
|
||||||
proxy='localhost:8000', enable_warcprox_features=True,
|
'seed': 'http://localhost:%s/site2/' % httpd.server_port,
|
||||||
warcprox_meta={'captures-table-extra-fields':{'test_id':test_id}},
|
'proxy': 'localhost:8000', 'enable_warcprox_features': True,
|
||||||
username='test_username', password='test_password')
|
'warcprox_meta': {'captures-table-extra-fields':{'test_id':test_id}},
|
||||||
|
'username': 'test_username', 'password': 'test_password'})
|
||||||
|
|
||||||
r = rethinkstuff.Rethinker('localhost', db='brozzler')
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
|
||||||
brozzler.new_site(frontier, site)
|
brozzler.new_site(frontier, site)
|
||||||
|
|
||||||
# the site should be brozzled fairly quickly
|
# the site should be brozzled fairly quickly
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while site.status != 'FINISHED' and time.time() - start < 300:
|
while site.status != 'FINISHED' and time.time() - start < 300:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
site = frontier.site(site.id)
|
site.refresh()
|
||||||
assert site.status == 'FINISHED'
|
assert site.status == 'FINISHED'
|
||||||
|
|
||||||
# take a look at the captures table
|
# take a look at the captures table
|
||||||
time.sleep(2) # in case warcprox hasn't finished processing urls
|
time.sleep(2) # in case warcprox hasn't finished processing urls
|
||||||
robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port
|
robots_url = 'http://localhost:%s/robots.txt' % httpd.server_port
|
||||||
captures = list(r.table('captures').filter(
|
captures = list(rr.table('captures').filter(
|
||||||
{'test_id':test_id}).order_by('timestamp').run())
|
{'test_id':test_id}).order_by('timestamp').run())
|
||||||
meth_url = ['%s %s' % (c['http_method'], c['url']) for c in captures]
|
meth_url = ['%s %s' % (c['http_method'], c['url']) for c in captures]
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ limitations under the License.
|
||||||
import brozzler
|
import brozzler
|
||||||
import logging
|
import logging
|
||||||
import argparse
|
import argparse
|
||||||
import rethinkstuff
|
import doublethink
|
||||||
import time
|
import time
|
||||||
|
|
||||||
args = argparse.Namespace()
|
args = argparse.Namespace()
|
||||||
|
@ -30,8 +30,8 @@ brozzler.cli.configure_logging(args)
|
||||||
|
|
||||||
def test_rethinkdb_up():
|
def test_rethinkdb_up():
|
||||||
'''Checks that rethinkdb is listening and looks sane.'''
|
'''Checks that rethinkdb is listening and looks sane.'''
|
||||||
r = rethinkstuff.Rethinker(db='rethinkdb') # built-in db
|
rr = doublethink.Rethinker(db='rethinkdb') # built-in db
|
||||||
tbls = r.table_list().run()
|
tbls = rr.table_list().run()
|
||||||
assert len(tbls) > 10
|
assert len(tbls) > 10
|
||||||
|
|
||||||
def test_resume_job():
|
def test_resume_job():
|
||||||
|
@ -40,8 +40,8 @@ def test_resume_job():
|
||||||
"finish" crawling a job. Doesn't actually crawl anything.
|
"finish" crawling a job. Doesn't actually crawl anything.
|
||||||
'''
|
'''
|
||||||
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
|
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
|
||||||
r = rethinkstuff.Rethinker(db='ignoreme')
|
rr = doublethink.Rethinker(db='ignoreme')
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
job_conf = {'seeds': [{'url': 'http://example.com/'}]}
|
job_conf = {'seeds': [{'url': 'http://example.com/'}]}
|
||||||
job = brozzler.new_job(frontier, job_conf)
|
job = brozzler.new_job(frontier, job_conf)
|
||||||
assert len(list(frontier.job_sites(job.id))) == 1
|
assert len(list(frontier.job_sites(job.id))) == 1
|
||||||
|
@ -57,7 +57,7 @@ def test_resume_job():
|
||||||
assert site.starts_and_stops[0]['stop'] is None
|
assert site.starts_and_stops[0]['stop'] is None
|
||||||
|
|
||||||
frontier.finished(site, 'FINISHED')
|
frontier.finished(site, 'FINISHED')
|
||||||
job = frontier.job(job.id)
|
job.refresh()
|
||||||
|
|
||||||
assert job.status == 'FINISHED'
|
assert job.status == 'FINISHED'
|
||||||
assert len(job.starts_and_stops) == 1
|
assert len(job.starts_and_stops) == 1
|
||||||
|
@ -71,7 +71,7 @@ def test_resume_job():
|
||||||
assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']
|
assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']
|
||||||
|
|
||||||
frontier.resume_site(site)
|
frontier.resume_site(site)
|
||||||
job = frontier.job(job.id)
|
job.refresh()
|
||||||
|
|
||||||
assert job.status == 'ACTIVE'
|
assert job.status == 'ACTIVE'
|
||||||
assert len(job.starts_and_stops) == 2
|
assert len(job.starts_and_stops) == 2
|
||||||
|
@ -83,7 +83,7 @@ def test_resume_job():
|
||||||
assert site.starts_and_stops[1]['stop'] is None
|
assert site.starts_and_stops[1]['stop'] is None
|
||||||
|
|
||||||
frontier.finished(site, 'FINISHED')
|
frontier.finished(site, 'FINISHED')
|
||||||
job = frontier.job(job.id)
|
job.refresh()
|
||||||
|
|
||||||
assert job.status == 'FINISHED'
|
assert job.status == 'FINISHED'
|
||||||
assert len(job.starts_and_stops) == 2
|
assert len(job.starts_and_stops) == 2
|
||||||
|
@ -110,7 +110,7 @@ def test_resume_job():
|
||||||
assert site.starts_and_stops[2]['stop'] is None
|
assert site.starts_and_stops[2]['stop'] is None
|
||||||
|
|
||||||
frontier.finished(site, 'FINISHED')
|
frontier.finished(site, 'FINISHED')
|
||||||
job = frontier.job(job.id)
|
job.refresh()
|
||||||
|
|
||||||
assert job.status == 'FINISHED'
|
assert job.status == 'FINISHED'
|
||||||
assert len(job.starts_and_stops) == 3
|
assert len(job.starts_and_stops) == 3
|
||||||
|
@ -125,12 +125,12 @@ def test_resume_job():
|
||||||
|
|
||||||
def test_time_limit():
|
def test_time_limit():
|
||||||
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
|
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
|
||||||
r = rethinkstuff.Rethinker('localhost', db='ignoreme')
|
rr = doublethink.Rethinker('localhost', db='ignoreme')
|
||||||
frontier = brozzler.RethinkDbFrontier(r)
|
frontier = brozzler.RethinkDbFrontier(rr)
|
||||||
site = brozzler.Site(seed='http://example.com/', time_limit=99999)
|
site = brozzler.Site(rr, {'seed':'http://example.com/', 'time_limit':99999})
|
||||||
brozzler.new_site(frontier, site)
|
brozzler.new_site(frontier, site)
|
||||||
|
|
||||||
site = frontier.site(site.id) # get it back from the db
|
site.refresh() # get it back from the db
|
||||||
assert site.status == 'ACTIVE'
|
assert site.status == 'ACTIVE'
|
||||||
assert len(site.starts_and_stops) == 1
|
assert len(site.starts_and_stops) == 1
|
||||||
assert site.starts_and_stops[0]['start']
|
assert site.starts_and_stops[0]['start']
|
||||||
|
@ -161,7 +161,7 @@ def test_time_limit():
|
||||||
|
|
||||||
site.time_limit = 0.1
|
site.time_limit = 0.1
|
||||||
site.claimed = True
|
site.claimed = True
|
||||||
frontier.update_site(site)
|
site.save()
|
||||||
|
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
frontier._enforce_time_limit(site)
|
frontier._enforce_time_limit(site)
|
||||||
|
|
|
@ -50,10 +50,10 @@ def test_robots(httpd):
|
||||||
Basic test of robots.txt user-agent substring matching.
|
Basic test of robots.txt user-agent substring matching.
|
||||||
'''
|
'''
|
||||||
url = 'http://localhost:%s/' % httpd.server_port
|
url = 'http://localhost:%s/' % httpd.server_port
|
||||||
site = brozzler.Site(seed=url, user_agent='im/a/GoOdbot/yep')
|
site = brozzler.Site(None, {'seed':url,'user_agent':'im/a/GoOdbot/yep'})
|
||||||
assert brozzler.is_permitted_by_robots(site, url)
|
assert brozzler.is_permitted_by_robots(site, url)
|
||||||
|
|
||||||
site = brozzler.Site(seed=url, user_agent='im/a bAdBOt/uh huh')
|
site = brozzler.Site(None, {'seed':url,'user_agent':'im/a bAdBOt/uh huh'})
|
||||||
assert not brozzler.is_permitted_by_robots(site, url)
|
assert not brozzler.is_permitted_by_robots(site, url)
|
||||||
|
|
||||||
def test_scoping():
|
def test_scoping():
|
||||||
|
@ -77,11 +77,12 @@ blocks:
|
||||||
- bad_thing: bad rule should be ignored
|
- bad_thing: bad rule should be ignored
|
||||||
''')
|
''')
|
||||||
|
|
||||||
site = brozzler.Site(
|
site = brozzler.Site(None, {
|
||||||
seed='http://example.com/foo/bar?baz=quux#monkey', id=1,
|
'id': 1, 'seed': 'http://example.com/foo/bar?baz=quux#monkey',
|
||||||
scope=test_scope)
|
'scope': test_scope})
|
||||||
page = brozzler.Page(
|
page = brozzler.Page(None, {
|
||||||
url='http://example.com/foo/bar?baz=quux#monkey', site_id=site.id)
|
'url': 'http://example.com/foo/bar?baz=quux#monkey',
|
||||||
|
'site_id': site.id})
|
||||||
|
|
||||||
assert site.is_in_scope('http://example.com/foo/bar', page)
|
assert site.is_in_scope('http://example.com/foo/bar', page)
|
||||||
assert not site.is_in_scope('http://example.com/foo/baz', page)
|
assert not site.is_in_scope('http://example.com/foo/baz', page)
|
||||||
|
@ -100,9 +101,9 @@ blocks:
|
||||||
|
|
||||||
assert not site.is_in_scope(
|
assert not site.is_in_scope(
|
||||||
'https://www.youtube.com/watch?v=dUIn5OAPS5s', page)
|
'https://www.youtube.com/watch?v=dUIn5OAPS5s', page)
|
||||||
yt_user_page = brozzler.Page(
|
yt_user_page = brozzler.Page(None, {
|
||||||
url='https://www.youtube.com/user/SonoraSantaneraVEVO',
|
'url': 'https://www.youtube.com/user/SonoraSantaneraVEVO',
|
||||||
site_id=site.id, hops_from_seed=10)
|
'site_id': site.id, 'hops_from_seed': 10})
|
||||||
assert site.is_in_scope(
|
assert site.is_in_scope(
|
||||||
'https://www.youtube.com/watch?v=dUIn5OAPS5s', yt_user_page)
|
'https://www.youtube.com/watch?v=dUIn5OAPS5s', yt_user_page)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue