Rethinker moved to pyrethink library

This commit is contained in:
Noah Levitt 2015-09-16 19:24:17 +00:00
parent a8f9664212
commit c682627aec
3 changed files with 15 additions and 45 deletions

View File

@ -39,42 +39,6 @@ class ReachedLimit(Exception):
def __str__(self):
return self.__repr__()
class Rethinker:
import logging
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, servers=["localhost"], db=None):
self.servers = servers
self.db = db
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
# "Best practices: Managing connections: a connection per request"
def _random_server_connection(self):
import random
import rethinkdb as r
while True:
server = random.choice(self.servers)
try:
try:
host, port = server.split(":")
return r.connect(host=host, port=port)
except ValueError:
return r.connect(host=server)
except Exception as e:
self.logger.error("will keep trying to get a connection after failure connecting to %s", server, exc_info=True)
import time
time.sleep(0.5)
def run(self, query):
import rethinkdb as r
while True:
with self._random_server_connection() as conn:
try:
return query.run(conn, db=self.db)
except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True)
time.sleep(0.5)
class BaseDictable:
def to_dict(self):
d = dict(vars(self))

View File

@ -5,6 +5,7 @@ r = rethinkdb
import random
import time
import datetime
import pyrethink
class UnexpectedDbResult(Exception):
pass
@ -13,7 +14,7 @@ class RethinkDbFrontier:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3):
self.r = brozzler.Rethinker(servers, db)
self.r = pyrethink.Rethinker(servers, db)
self.shards = shards
self.replicas = replicas
self._ensure_db()
@ -127,8 +128,8 @@ class RethinkDbFrontier:
raise brozzler.NothingToClaim
def has_outstanding_pages(self, site):
cursor = self.r.run(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1))
return len(list(cursor)) > 0
results_iter = self.r.results_iter(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1))
return len(list(results_iter)) > 0
def page(self, id):
result = self.r.run(r.table("pages").get(id))
@ -147,7 +148,7 @@ class RethinkDbFrontier:
self.update_site(site)
def active_jobs(self):
results = self.r.run(r.table("jobs").filter({"status":"ACTIVE"}))
results = self.r.results_iter(r.table("jobs").filter({"status":"ACTIVE"}))
for result in results:
yield brozzler.Job(**result)
@ -165,11 +166,12 @@ class RethinkDbFrontier:
self.logger.warn("%s is already %s", job, job.status)
return True
results = self.r.run(r.table("sites").get_all(job_id, index="job_id"))
results = self.r.results_iter(r.table("sites").get_all(job_id, index="job_id"))
n = 0
for result in results:
site = brozzler.Site(**result)
if not site.status.startswith("FINISH"):
results.close()
return False
n += 1
@ -188,7 +190,7 @@ class RethinkDbFrontier:
def disclaim_site(self, site, page=None):
self.logger.info("disclaiming %s", site)
site.claimed = False
site.last_disclaimed = time.time()
site.last_disclaimed = time.time() # XXX use string or rethinkdb time type?
if not page and not self.has_outstanding_pages(site):
self.finished(site, "FINISHED")
else:

View File

@ -1,11 +1,15 @@
PyYAML
git+https://github.com/nlevitt/surt.git@py3
# -e /home/nlevitt/workspace/surt
git+https://github.com/nlevitt/youtube-dl.git@brozzler
git+https://github.com/seomoz/reppy.git # https://github.com/seomoz/reppy/commit/7661606c not in pypi package
requests
git+https://github.com/nlevitt/websocket-client.git@tweaks
rethinkdb
pillow
git+https://github.com/nlevitt/surt.git@py3
https://github.com/nlevitt/pyrethink.git
.
# -e /home/nlevitt/workspace/surt
# -e /home/nlevitt/workspace/pyrethink
# -e .
-e .