From f334107b4748e1594a450d299e55d39b69a5f3ce Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 28 Aug 2015 00:37:26 +0000 Subject: [PATCH] support for specifying rethinkdb database name; wrap rethinkdb operations and retry if appropriate (as best as we can tell) --- README.md | 9 ++-- bin/brozzler-new-job | 8 +-- bin/brozzler-new-site | 8 +-- bin/brozzler-worker | 8 +-- brozzler/__init__.py | 35 +++++++++++++ brozzler/frontier.py | 118 ++++++++++++++++++------------------------ brozzler/job.py | 6 +-- requirements.txt | 5 +- 8 files changed, 108 insertions(+), 89 deletions(-) diff --git a/README.md b/README.md index 449759d..abf8415 100644 --- a/README.md +++ b/README.md @@ -15,19 +15,20 @@ Brozzler is designed to work in conjunction with Installation ------------ ``` -git clone https://github.com/nlevitt/brozzler +git clone https://github.com/nlevitt/brozzler.git cd brozzler # set up virtualenv if desired pip install -r requirements.txt . ``` -Brozzler also requires rethinkdb. +Brozzler also requires a rethinkdb deployment. Fonts for good screenshots -------------------------- On ubuntu 14.04 trusty I installed these packages: -xfonts-base ttf-mscorefonts-installer fonts-arphic-bkai00mp fonts-arphic-bsmi00lp fonts-arphic-gbsn00lp fonts-arphic-gkai00mp fonts-arphic-ukai fonts-farsiweb fonts-nafees fonts-sil-abyssinica fonts-sil-ezra fonts-sil-padauk fonts-unfonts- -extra fonts-unfonts-core ttf-indic-fonts fonts-thai-tlwg fonts-lklug-sinhala +xfonts-base ttf-mscorefonts-installer fonts-arphic-bkai00mp fonts-arphic-bsmi00lp fonts-arphic-gbsn00lp fonts-arphic-gkai00mp fonts-arphic-ukai fonts-farsiweb fonts-nafees fonts-sil-abyssinica fonts-sil-ezra fonts-sil-padauk fonts-unfonts-extra fonts-unfonts-core ttf-indic-fonts fonts-thai-tlwg fonts-lklug-sinhala + +Haven't looked much at the resulting screenshots yet though. License ------- diff --git a/bin/brozzler-new-job b/bin/brozzler-new-job index fc3f8a6..34145d0 100755 --- a/bin/brozzler-new-job +++ b/bin/brozzler-new-job @@ -13,8 +13,10 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description="brozzler-new-job - queue new job with brozzler", formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('job_conf_file', metavar='JOB_CONF_FILE', help='brozzler job configuration file in yaml') -arg_parser.add_argument("--db", dest="db", default="localhost", - help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org") +arg_parser.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', default="localhost", + help='rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') +arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="brozzler", + help='rethinkdb database name') arg_parser.add_argument("-v", "--verbose", dest="log_level", action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument("--version", action="version", @@ -24,6 +26,6 @@ args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") -frontier = brozzler.RethinkDbFrontier(args.db.split(",")) +frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) brozzler.job.new_job_file(frontier, job_conf_file) diff --git a/bin/brozzler-new-site b/bin/brozzler-new-site index 3f3f0c9..a138a73 100755 --- a/bin/brozzler-new-site +++ b/bin/brozzler-new-site @@ -12,8 +12,10 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description="brozzler-new-site - register site to brozzle", formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('seed', metavar='SEED', help='seed url') -arg_parser.add_argument("--db", dest="db", default="localhost", - help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org") +arg_parser.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', default="localhost", + help='rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') +arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="brozzler", + help='rethinkdb database name') arg_parser.add_argument("--proxy", dest="proxy", default=None, help="http proxy for this site") arg_parser.add_argument("--time-limit", dest="time_limit", default=None, help="time limit in seconds for this site") arg_parser.add_argument("-H", "--extra-header", action="append", @@ -43,6 +45,6 @@ site = brozzler.Site(seed=args.seed, proxy=args.proxy, enable_warcprox_features=args.enable_warcprox_features, extra_headers=extra_headers) -frontier = brozzler.RethinkDbFrontier(args.db.split(",")) +frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) brozzler.new_site(frontier, site) diff --git a/bin/brozzler-worker b/bin/brozzler-worker index c7bd3f0..6df96c2 100755 --- a/bin/brozzler-worker +++ b/bin/brozzler-worker @@ -14,8 +14,10 @@ import traceback arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument("--db", dest="db", default="localhost", - help="comma-separated list of RethinkDB server addresses, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org") +arg_parser.add_argument('--rethinkdb-servers', dest='rethinkdb_servers', default="localhost", + help='rethinkdb servers, e.g. db0.foo.org,db0.foo.org:38015,db1.foo.org') +arg_parser.add_argument('--rethinkdb-db', dest='rethinkdb_db', default="brozzler", + help='rethinkdb database name') arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', help='executable to use to invoke chrome') arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1', @@ -49,7 +51,7 @@ signal.signal(signal.SIGQUIT, dump_state) signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) -frontier = brozzler.RethinkDbFrontier(args.db.split(",")) +frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) worker = brozzler.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) worker.start() diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 76c08d4..8e20a88 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -45,4 +45,39 @@ class ReachedLimit(Exception): def __str__(self): return self.__repr__() +class Rethinker: + import logging + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, servers=["localhost"], db=None): + self.servers = servers + self.db = db + + # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py + # "Best practices: Managing connections: a connection per request" + def _random_server_connection(self): + import rethinkdb as r + import random + while True: + server = random.choice(self.servers) + try: + try: + host, port = server.split(":") + return r.connect(host=host, port=port) + except ValueError: + return r.connect(host=server) + except Exception as e: + self.logger.error("will keep trying to get a connection after failure connecting to %s", server, exc_info=True) + import time + time.sleep(0.5) + + def run(self, query): + while True: + with self._random_server_connection() as conn: + try: + return query.run(conn, db=self.db) + except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: + self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True) + + # vim: set sw=4 et: diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 927724f..95ff7e8 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -16,39 +16,27 @@ class RethinkDbFrontier: logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3): - self.servers = servers - self.db = db + self.r = brozzler.Rethinker(servers, db) self.shards = shards self.replicas = replicas self._ensure_db() - # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py - # "Best practices: Managing connections: a connection per request" - def _random_server_connection(self): - server = random.choice(self.servers) - try: - host, port = server.split(":") - return r.connect(host=host, port=port) - except ValueError: - return r.connect(host=server) - def _ensure_db(self): - with self._random_server_connection() as conn: - dbs = r.db_list().run(conn) - if not self.db in dbs: - self.logger.info("creating rethinkdb database %s", repr(self.db)) - r.db_create(self.db).run(conn) - tables = r.db(self.db).table_list().run(conn) - if not "sites" in tables: - self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.db)) - r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(conn) - r.db(self.db).table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]).run(conn) - if not "pages" in tables: - self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.db)) - r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(conn) - r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]).run(conn) - # if not "jobs" in tables: - # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn) + dbs = self.r.run(r.db_list()) + if not self.r.db in dbs: + self.logger.info("creating rethinkdb database %s", repr(self.r.db)) + self.r.run(r.db_create(self.r.db)) + tables = self.r.run(r.table_list()) + if not "sites" in tables: + self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.db)) + self.r.run(r.table_create("sites", shards=self.shards, replicas=self.replicas)) + self.r.run(r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]])) + if not "pages" in tables: + self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db)) + self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas)) + self.r.run(r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]])) + # if not "jobs" in tables: + # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn) def _vet_result(self, result, **kwargs): self.logger.debug("vetting expected=%s result=%s", kwargs, result) @@ -67,41 +55,36 @@ class RethinkDbFrontier: def new_site(self, site): self.logger.info("inserting into 'sites' table %s", site) - with self._random_server_connection() as conn: - result = r.db(self.db).table("sites").insert(site.to_dict()).run(conn) - self._vet_result(result, inserted=1) - site.id = result["generated_keys"][0] + result = self.r.run(r.table("sites").insert(site.to_dict())) + self._vet_result(result, inserted=1) + site.id = result["generated_keys"][0] def update_site(self, site): self.logger.debug("updating 'sites' table entry %s", site) - with self._random_server_connection() as conn: - result = r.db(self.db).table("sites").get(site.id).replace(site.to_dict()).run(conn) - self._vet_result(result, replaced=[0,1], unchanged=[0,1]) + result = self.r.run(r.table("sites").get(site.id).replace(site.to_dict())) + self._vet_result(result, replaced=[0,1], unchanged=[0,1]) def update_page(self, page): self.logger.debug("updating 'pages' table entry %s", page) - with self._random_server_connection() as conn: - result = r.db(self.db).table("pages").get(page.id).replace(page.to_dict()).run(conn) - self._vet_result(result, replaced=[0,1], unchanged=[0,1]) + result = self.r.run(r.table("pages").get(page.id).replace(page.to_dict())) + self._vet_result(result, replaced=[0,1], unchanged=[0,1]) def new_page(self, page): self.logger.debug("inserting into 'pages' table %s", page) - with self._random_server_connection() as conn: - result = r.db(self.db).table("pages").insert(page.to_dict()).run(conn) - self._vet_result(result, inserted=1) + result = self.r.run(r.table("pages").insert(page.to_dict())) + self._vet_result(result, inserted=1) def claim_site(self): # XXX keep track of aggregate priority and prioritize sites accordingly? while True: - with self._random_server_connection() as conn: - result = (r.db(self.db).table("sites") - .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed") - .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True).run(conn)) - self._vet_result(result, replaced=[0,1]) - if result["replaced"] == 1: - site = brozzler.Site(**result["changes"][0]["new_val"]) - else: - raise brozzler.NothingToClaim + result = self.r.run(r.table("sites") + .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed") + .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True)) + self._vet_result(result, replaced=[0,1]) + if result["replaced"] == 1: + site = brozzler.Site(**result["changes"][0]["new_val"]) + else: + raise brozzler.NothingToClaim # XXX This is the only place we enforce time limit for now. Worker # loop should probably check time limit. Maybe frontier needs a # housekeeping thread to ensure that time limits get enforced in a @@ -121,29 +104,26 @@ class RethinkDbFrontier: return False def claim_page(self, site): - with self._random_server_connection() as conn: - result = (r.db(self.db).table("pages") - .between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site") - .order_by(index=r.desc("priority_by_site")).limit(1) - .update({"claimed":True},return_changes=True).run(conn)) - self._vet_result(result, replaced=[0,1]) - if result["replaced"] == 1: - return brozzler.Page(**result["changes"][0]["new_val"]) - else: - raise brozzler.NothingToClaim + result = self.r.run(r.table("pages") + .between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site") + .order_by(index=r.desc("priority_by_site")).limit(1) + .update({"claimed":True},return_changes=True)) + self._vet_result(result, replaced=[0,1]) + if result["replaced"] == 1: + return brozzler.Page(**result["changes"][0]["new_val"]) + else: + raise brozzler.NothingToClaim def has_outstanding_pages(self, site): - with self._random_server_connection() as conn: - cursor = r.db(self.db).table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run(conn) - return len(list(cursor)) > 0 + cursor = self.r.run(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1)) + return len(list(cursor)) > 0 def get_page(self, page): - with self._random_server_connection() as conn: - result = r.db(self.db).table("pages").get(page.id).run(conn) - if result: - return brozzler.Page(**result) - else: - return None + result = self.r.run(r.table("pages").get(page.id)) + if result: + return brozzler.Page(**result) + else: + return None def completed_page(self, site, page): page.brozzle_count += 1 diff --git a/brozzler/job.py b/brozzler/job.py index 5a49993..ca70374 100644 --- a/brozzler/job.py +++ b/brozzler/job.py @@ -61,12 +61,12 @@ def new_site(frontier, site): logging.info("new site {}".format(site)) frontier.new_site(site) try: - if is_permitted_by_robots(site, site.seed): - page = Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000) + if brozzler.is_permitted_by_robots(site, site.seed): + page = brozzler.Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000) frontier.new_page(page) else: logging.warn("seed url {} is blocked by robots.txt".format(site.seed)) - except ReachedLimit as e: + except brozzler.ReachedLimit as e: site.note_limit_reached(e) frontier.update_site(site) diff --git a/requirements.txt b/requirements.txt index a5b98b9..423f701 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,7 @@ -argparse PyYAML git+https://github.com/nlevitt/surt.git@py3 -# youtube_dl git+https://github.com/nlevitt/youtube-dl.git@brozzler -git+https://github.com/seomoz/reppy.git +git+https://github.com/seomoz/reppy.git # https://github.com/seomoz/reppy/commit/7661606c not in pypi package requests -# websocket-client git+https://github.com/nlevitt/websocket-client.git@tweaks rethinkdb