Support specifying the rethinkdb database name; wrap rethinkdb operations and retry when the failure appears transient (as best as we can tell)

This commit is contained in:
Noah Levitt 2015-08-28 00:37:26 +00:00
parent cf91fb1377
commit f334107b47
8 changed files with 108 additions and 89 deletions

View File

@ -15,19 +15,20 @@ Brozzler is designed to work in conjunction with
Installation
------------
```
git clone https://github.com/nlevitt/brozzler
git clone https://github.com/nlevitt/brozzler.git
cd brozzler
# set up virtualenv if desired
pip install -r requirements.txt .
```
Brozzler also requires rethinkdb.
Brozzler also requires a rethinkdb deployment.
Fonts for good screenshots
--------------------------
On ubuntu 14.04 trusty I installed these packages:
xfonts-base ttf-mscorefonts-installer fonts-arphic-bkai00mp fonts-arphic-bsmi00lp fonts-arphic-gbsn00lp fonts-arphic-gkai00mp fonts-arphic-ukai fonts-farsiweb fonts-nafees fonts-sil-abyssinica fonts-sil-ezra fonts-sil-padauk fonts-unfonts-
extra fonts-unfonts-core ttf-indic-fonts fonts-thai-tlwg fonts-lklug-sinhala
xfonts-base ttf-mscorefonts-installer fonts-arphic-bkai00mp fonts-arphic-bsmi00lp fonts-arphic-gbsn00lp fonts-arphic-gkai00mp fonts-arphic-ukai fonts-farsiweb fonts-nafees fonts-sil-abyssinica fonts-sil-ezra fonts-sil-padauk fonts-unfonts-extra fonts-unfonts-core ttf-indic-fonts fonts-thai-tlwg fonts-lklug-sinhala
Haven't looked much at the resulting screenshots yet though.
License
-------

View File

@ -13,8 +13,10 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description="brozzler-new-job - queue new job with brozzler",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('job_conf_file', metavar='JOB_CONF_FILE', help='brozzler job configuration file in yaml')
arg_parser.add_argument("--db", dest="db", default="localhost",
help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org")
arg_parser.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', default="localhost",
help='rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="brozzler",
help='rethinkdb database name')
arg_parser.add_argument("-v", "--verbose", dest="log_level",
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
@ -24,6 +26,6 @@ args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
frontier = brozzler.RethinkDbFrontier(args.db.split(","))
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db)
brozzler.job.new_job_file(frontier, job_conf_file)

View File

@ -12,8 +12,10 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description="brozzler-new-site - register site to brozzle",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('seed', metavar='SEED', help='seed url')
arg_parser.add_argument("--db", dest="db", default="localhost",
help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org")
arg_parser.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', default="localhost",
help='rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="brozzler",
help='rethinkdb database name')
arg_parser.add_argument("--proxy", dest="proxy", default=None, help="http proxy for this site")
arg_parser.add_argument("--time-limit", dest="time_limit", default=None, help="time limit in seconds for this site")
arg_parser.add_argument("-H", "--extra-header", action="append",
@ -43,6 +45,6 @@ site = brozzler.Site(seed=args.seed, proxy=args.proxy,
enable_warcprox_features=args.enable_warcprox_features,
extra_headers=extra_headers)
frontier = brozzler.RethinkDbFrontier(args.db.split(","))
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db)
brozzler.new_site(frontier, site)

View File

@ -14,8 +14,10 @@ import traceback
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument("--db", dest="db", default="localhost",
help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org")
arg_parser.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', default="localhost",
help='rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org')
arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="brozzler",
help='rethinkdb database name')
arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
help='executable to use to invoke chrome')
arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
@ -49,7 +51,7 @@ signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
frontier = brozzler.RethinkDbFrontier(args.db.split(","))
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db)
worker = brozzler.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
worker.start()

View File

@ -45,4 +45,39 @@ class ReachedLimit(Exception):
def __str__(self):
return self.__repr__()
class Rethinker:
    """
    Thin wrapper around the rethinkdb driver.

    Picks a random server from `servers` for each query (one connection per
    request) and retries queries that fail with availability/timeout errors.
    """
    import logging
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, servers=None, db=None):
        """
        servers: list of "host" or "host:port" strings; defaults to
                 ["localhost"]
        db: default rethinkdb database name used by run()
        """
        # default handled here to avoid a shared mutable default argument
        self.servers = servers if servers is not None else ["localhost"]
        self.db = db

    # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
    # "Best practices: Managing connections: a connection per request"
    def _random_server_connection(self):
        """
        Connect to a randomly chosen server, looping forever until a
        connection succeeds.
        """
        import rethinkdb as r
        import random
        import time
        while True:
            server = random.choice(self.servers)
            try:
                try:
                    host, port = server.split(":")
                    return r.connect(host=host, port=port)
                except ValueError:
                    # no ":port" in the address; let the driver use its default port
                    return r.connect(host=server)
            except Exception as e:
                self.logger.error(
                        "will keep trying to get a connection after failure connecting to %s",
                        server, exc_info=True)
                time.sleep(0.5)

    def run(self, query):
        """
        Run `query` against self.db on a fresh connection, retrying
        indefinitely on availability/timeout errors.
        """
        # bugfix: `r` was not in scope here, so the except clause below
        # raised NameError instead of catching the driver errors
        import rethinkdb as r
        while True:
            with self._random_server_connection() as conn:
                try:
                    return query.run(conn, db=self.db)
                except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
                    # bugfix: supply the %s argument (was missing, which
                    # caused a logging formatting error on this path)
                    self.logger.error(
                            "will retry rethinkdb query/operation %s which failed like so:",
                            query, exc_info=True)
# vim: set sw=4 et:

View File

@ -16,39 +16,27 @@ class RethinkDbFrontier:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3):
self.servers = servers
self.db = db
self.r = brozzler.Rethinker(servers, db)
self.shards = shards
self.replicas = replicas
self._ensure_db()
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
# "Best practices: Managing connections: a connection per request"
def _random_server_connection(self):
server = random.choice(self.servers)
try:
host, port = server.split(":")
return r.connect(host=host, port=port)
except ValueError:
return r.connect(host=server)
def _ensure_db(self):
with self._random_server_connection() as conn:
dbs = r.db_list().run(conn)
if not self.db in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.db))
r.db_create(self.db).run(conn)
tables = r.db(self.db).table_list().run(conn)
if not "sites" in tables:
self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.db))
r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(conn)
r.db(self.db).table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]).run(conn)
if not "pages" in tables:
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.db))
r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(conn)
r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]).run(conn)
# if not "jobs" in tables:
# r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn)
dbs = self.r.run(r.db_list())
if not self.r.db in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.db))
self.r.run(r.db_create(self.r.db))
tables = self.r.run(r.table_list())
if not "sites" in tables:
self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.db))
self.r.run(r.table_create("sites", shards=self.shards, replicas=self.replicas))
self.r.run(r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]))
if not "pages" in tables:
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db))
self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas))
self.r.run(r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]))
# if not "jobs" in tables:
# r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn)
def _vet_result(self, result, **kwargs):
self.logger.debug("vetting expected=%s result=%s", kwargs, result)
@ -67,41 +55,36 @@ class RethinkDbFrontier:
def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site)
with self._random_server_connection() as conn:
result = r.db(self.db).table("sites").insert(site.to_dict()).run(conn)
self._vet_result(result, inserted=1)
site.id = result["generated_keys"][0]
result = self.r.run(r.table("sites").insert(site.to_dict()))
self._vet_result(result, inserted=1)
site.id = result["generated_keys"][0]
def update_site(self, site):
self.logger.debug("updating 'sites' table entry %s", site)
with self._random_server_connection() as conn:
result = r.db(self.db).table("sites").get(site.id).replace(site.to_dict()).run(conn)
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
result = self.r.run(r.table("sites").get(site.id).replace(site.to_dict()))
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def update_page(self, page):
self.logger.debug("updating 'pages' table entry %s", page)
with self._random_server_connection() as conn:
result = r.db(self.db).table("pages").get(page.id).replace(page.to_dict()).run(conn)
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
result = self.r.run(r.table("pages").get(page.id).replace(page.to_dict()))
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def new_page(self, page):
self.logger.debug("inserting into 'pages' table %s", page)
with self._random_server_connection() as conn:
result = r.db(self.db).table("pages").insert(page.to_dict()).run(conn)
self._vet_result(result, inserted=1)
result = self.r.run(r.table("pages").insert(page.to_dict()))
self._vet_result(result, inserted=1)
def claim_site(self):
# XXX keep track of aggregate priority and prioritize sites accordingly?
while True:
with self._random_server_connection() as conn:
result = (r.db(self.db).table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn))
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
site = brozzler.Site(**result["changes"][0]["new_val"])
else:
raise brozzler.NothingToClaim
result = self.r.run(r.table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True))
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
site = brozzler.Site(**result["changes"][0]["new_val"])
else:
raise brozzler.NothingToClaim
# XXX This is the only place we enforce time limit for now. Worker
# loop should probably check time limit. Maybe frontier needs a
# housekeeping thread to ensure that time limits get enforced in a
@ -121,29 +104,26 @@ class RethinkDbFrontier:
return False
def claim_page(self, site):
with self._random_server_connection() as conn:
result = (r.db(self.db).table("pages")
.between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
.order_by(index=r.desc("priority_by_site")).limit(1)
.update({"claimed":True},return_changes=True).run(conn))
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
return brozzler.Page(**result["changes"][0]["new_val"])
else:
raise brozzler.NothingToClaim
result = self.r.run(r.table("pages")
.between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
.order_by(index=r.desc("priority_by_site")).limit(1)
.update({"claimed":True},return_changes=True))
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
return brozzler.Page(**result["changes"][0]["new_val"])
else:
raise brozzler.NothingToClaim
def has_outstanding_pages(self, site):
with self._random_server_connection() as conn:
cursor = r.db(self.db).table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run(conn)
return len(list(cursor)) > 0
cursor = self.r.run(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1))
return len(list(cursor)) > 0
def get_page(self, page):
with self._random_server_connection() as conn:
result = r.db(self.db).table("pages").get(page.id).run(conn)
if result:
return brozzler.Page(**result)
else:
return None
result = self.r.run(r.table("pages").get(page.id))
if result:
return brozzler.Page(**result)
else:
return None
def completed_page(self, site, page):
page.brozzle_count += 1

View File

@ -61,12 +61,12 @@ def new_site(frontier, site):
logging.info("new site {}".format(site))
frontier.new_site(site)
try:
if is_permitted_by_robots(site, site.seed):
page = Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000)
if brozzler.is_permitted_by_robots(site, site.seed):
page = brozzler.Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000)
frontier.new_page(page)
else:
logging.warn("seed url {} is blocked by robots.txt".format(site.seed))
except ReachedLimit as e:
except brozzler.ReachedLimit as e:
site.note_limit_reached(e)
frontier.update_site(site)

View File

@ -1,10 +1,7 @@
argparse
PyYAML
git+https://github.com/nlevitt/surt.git@py3
# youtube_dl
git+https://github.com/nlevitt/youtube-dl.git@brozzler
git+https://github.com/seomoz/reppy.git
git+https://github.com/seomoz/reppy.git # https://github.com/seomoz/reppy/commit/7661606c not in pypi package
requests
# websocket-client
git+https://github.com/nlevitt/websocket-client.git@tweaks
rethinkdb