rethinkdb connection per request, to server chosen randomly from list

This commit is contained in:
Noah Levitt 2015-08-18 23:47:28 +00:00
parent a878730e02
commit 382c826678

View file

@ -4,6 +4,7 @@ import logging
import brozzler import brozzler
import rethinkdb import rethinkdb
r = rethinkdb r = rethinkdb
import random
class UnexpectedDbResult(Exception): class UnexpectedDbResult(Exception):
pass pass
@ -16,40 +17,32 @@ class BrozzlerRethinkDb:
self.db = db self.db = db
self.shards = shards self.shards = shards
self.replicas = replicas self.replicas = replicas
self._ensure_db()
self._conn = self._connect(servers[0]) # XXX round robin # https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
try: # "Best practices: Managing connections: a connection per request"
tables = r.db(self.db).table_list().run(self._conn) def _random_server_connection(self):
for tbl in "sites", "pages": server = random.choice(self.servers)
if not tbl in tables:
raise Exception("rethinkdb database {} exists but does not have table {}".format(repr(self.db), repr(tbl)))
except rethinkdb.errors.ReqlOpFailedError as e:
self.logger.info("rethinkdb database %s does not exist, initializing", repr(self.db))
self._init_db()
def _connect(self, server):
self.logger.info("connecting to rethinkdb at %s", server)
try: try:
host, port = server.split(":") host, port = server.split(":")
return r.connect(host=host, port=port) return r.connect(host=host, port=port)
except ValueError: except ValueError:
return r.connect(host=server) return r.connect(host=server)
# def _round_robin_connection(self): def _ensure_db(self):
# while True: with self._random_server_connection() as conn:
# for server in self.servers: try:
# try: tables = r.db(self.db).table_list().run(conn)
# host, port = server.split(":") for tbl in "sites", "pages":
# conn = r.connect(host=host, port=port) if not tbl in tables:
# except ValueError: raise Exception("rethinkdb database {} exists but does not have table {}".format(repr(self.db), repr(tbl)))
# conn = r.connect(host=server) except rethinkdb.errors.ReqlOpFailedError as e:
self.logger.info("rethinkdb database %s does not exist, initializing", repr(self.db))
def _init_db(self): r.db_create(self.db).run(conn)
r.db_create(self.db).run(self._conn) # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn)
# r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(self._conn) r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(conn)
r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(self._conn) r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(conn)
r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(self._conn) r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["claimed"], r.row["brozzle_count"], r.row["priority"]]).run(conn)
r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["claimed"], r.row["brozzle_count"], r.row["priority"]]).run(self._conn)
self.logger.info("created database %s with tables 'sites' and 'pages'", self.db) self.logger.info("created database %s with tables 'sites' and 'pages'", self.db)
def _vet_result(self, result, **kwargs): def _vet_result(self, result, **kwargs):
@ -69,28 +62,33 @@ class BrozzlerRethinkDb:
def new_site(self, site): def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site) self.logger.info("inserting into 'sites' table %s", site)
result = r.db(self.db).table("sites").insert(site.to_dict()).run(self._conn) with self._random_server_connection() as conn:
result = r.db(self.db).table("sites").insert(site.to_dict()).run(conn)
self._vet_result(result, inserted=1) self._vet_result(result, inserted=1)
site.id = result["generated_keys"][0] site.id = result["generated_keys"][0]
def update_site(self, site): def update_site(self, site):
self.logger.debug("updating 'sites' table entry %s", site) self.logger.debug("updating 'sites' table entry %s", site)
result = r.db(self.db).table("sites").get(site.id).update(site.to_dict()).run(self._conn) with self._random_server_connection() as conn:
result = r.db(self.db).table("sites").get(site.id).update(site.to_dict()).run(conn)
self._vet_result(result, replaced=1) self._vet_result(result, replaced=1)
def update_page(self, page): def update_page(self, page):
self.logger.debug("updating 'pages' table entry %s", page) self.logger.debug("updating 'pages' table entry %s", page)
result = r.db(self.db).table("pages").get(page.id).update(page.to_dict()).run(self._conn) with self._random_server_connection() as conn:
result = r.db(self.db).table("pages").get(page.id).update(page.to_dict()).run(conn)
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def new_page(self, page): def new_page(self, page):
self.logger.debug("inserting into 'pages' table %s", page) self.logger.debug("inserting into 'pages' table %s", page)
result = r.db(self.db).table("pages").insert(page.to_dict()).run(self._conn) with self._random_server_connection() as conn:
result = r.db(self.db).table("pages").insert(page.to_dict()).run(conn)
self._vet_result(result, inserted=1) self._vet_result(result, inserted=1)
def claim_site(self): def claim_site(self):
# XXX keep track of aggregate priority and prioritize sites accordingly? # XXX keep track of aggregate priority and prioritize sites accordingly?
result = r.db(self.db).table("sites").filter({"claimed":False,"status":"ACTIVE"}).limit(1).update({"claimed":True},return_changes=True).run(self._conn) with self._random_server_connection() as conn:
result = r.db(self.db).table("sites").filter({"claimed":False,"status":"ACTIVE"}).limit(1).update({"claimed":True},return_changes=True).run(conn)
self._vet_result(result, replaced=[0,1]) self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1: if result["replaced"] == 1:
return brozzler.Site(**result["changes"][0]["new_val"]) return brozzler.Site(**result["changes"][0]["new_val"])
@ -98,10 +96,11 @@ class BrozzlerRethinkDb:
raise brozzler.NothingToClaim raise brozzler.NothingToClaim
def claim_page(self, site): def claim_page(self, site):
with self._random_server_connection() as conn:
result = (r.db(self.db).table("pages") result = (r.db(self.db).table("pages")
.between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,False,0,brozzler.MAX_PRIORITY], index="priority_by_site") .between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,False,0,brozzler.MAX_PRIORITY], index="priority_by_site")
.order_by(index=r.desc("priority_by_site")).limit(1) .order_by(index=r.desc("priority_by_site")).limit(1)
.update({"claimed":True},return_changes=True).run(self._conn)) .update({"claimed":True},return_changes=True).run(conn))
self._vet_result(result, replaced=[0,1]) self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1: if result["replaced"] == 1:
return brozzler.Page(**result["changes"][0]["new_val"]) return brozzler.Page(**result["changes"][0]["new_val"])
@ -109,11 +108,13 @@ class BrozzlerRethinkDb:
raise brozzler.NothingToClaim raise brozzler.NothingToClaim
def has_outstanding_pages(self, site): def has_outstanding_pages(self, site):
cursor = r.db(self.db).table("pages").between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,True,0,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run(self._conn) with self._random_server_connection() as conn:
cursor = r.db(self.db).table("pages").between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,True,0,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run(conn)
return len(list(cursor)) > 0 return len(list(cursor)) > 0
def get_page(self, page): def get_page(self, page):
result = r.db(self.db).table("pages").get(page.id).run(self._conn) with self._random_server_connection() as conn:
result = r.db(self.db).table("pages").get(page.id).run(conn)
if result: if result:
return brozzler.Page(**result) return brozzler.Page(**result)
else: else: