diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 516c8d0..7715ecf 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -39,42 +39,6 @@ class ReachedLimit(Exception): def __str__(self): return self.__repr__() -class Rethinker: - import logging - logger = logging.getLogger(__module__ + "." + __qualname__) - - def __init__(self, servers=["localhost"], db=None): - self.servers = servers - self.db = db - - # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py - # "Best practices: Managing connections: a connection per request" - def _random_server_connection(self): - import random - import rethinkdb as r - while True: - server = random.choice(self.servers) - try: - try: - host, port = server.split(":") - return r.connect(host=host, port=port) - except ValueError: - return r.connect(host=server) - except Exception as e: - self.logger.error("will keep trying to get a connection after failure connecting to %s", server, exc_info=True) - import time - time.sleep(0.5) - - def run(self, query): - import rethinkdb as r - while True: - with self._random_server_connection() as conn: - try: - return query.run(conn, db=self.db) - except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: - self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True) - time.sleep(0.5) - class BaseDictable: def to_dict(self): d = dict(vars(self)) diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 3908319..1af7044 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -5,6 +5,7 @@ r = rethinkdb import random import time import datetime +import pyrethink class UnexpectedDbResult(Exception): pass @@ -13,7 +14,7 @@ class RethinkDbFrontier: logger = logging.getLogger(__module__ + "." 
+ __qualname__) def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3): - self.r = brozzler.Rethinker(servers, db) + self.r = pyrethink.Rethinker(servers, db) self.shards = shards self.replicas = replicas self._ensure_db() @@ -127,8 +128,8 @@ class RethinkDbFrontier: raise brozzler.NothingToClaim def has_outstanding_pages(self, site): - cursor = self.r.run(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1)) - return len(list(cursor)) > 0 + results_iter = self.r.results_iter(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1)) + return len(list(results_iter)) > 0 def page(self, id): result = self.r.run(r.table("pages").get(id)) @@ -147,7 +148,7 @@ class RethinkDbFrontier: self.update_site(site) def active_jobs(self): - results = self.r.run(r.table("jobs").filter({"status":"ACTIVE"})) + results = self.r.results_iter(r.table("jobs").filter({"status":"ACTIVE"})) for result in results: yield brozzler.Job(**result) @@ -165,11 +166,12 @@ class RethinkDbFrontier: self.logger.warn("%s is already %s", job, job.status) return True - results = self.r.run(r.table("sites").get_all(job_id, index="job_id")) + results = self.r.results_iter(r.table("sites").get_all(job_id, index="job_id")) n = 0 for result in results: site = brozzler.Site(**result) if not site.status.startswith("FINISH"): + results.close() return False n += 1 @@ -188,7 +190,7 @@ class RethinkDbFrontier: def disclaim_site(self, site, page=None): self.logger.info("disclaiming %s", site) site.claimed = False - site.last_disclaimed = time.time() + site.last_disclaimed = time.time() # XXX use string or rethinkdb time type? 
if not page and not self.has_outstanding_pages(site): self.finished(site, "FINISHED") else: diff --git a/requirements.txt b/requirements.txt index 90e4404..5ba1ccd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,15 @@ PyYAML -git+https://github.com/nlevitt/surt.git@py3 -# -e /home/nlevitt/workspace/surt git+https://github.com/nlevitt/youtube-dl.git@brozzler git+https://github.com/seomoz/reppy.git # https://github.com/seomoz/reppy/commit/7661606c not in pypi package requests git+https://github.com/nlevitt/websocket-client.git@tweaks rethinkdb pillow + +git+https://github.com/nlevitt/surt.git@py3 +git+https://github.com/nlevitt/pyrethink.git +. + +# -e /home/nlevitt/workspace/surt +# -e /home/nlevitt/workspace/pyrethink # -e . --e .