mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 08:39:59 -05:00
fix some rethinkdb related stuff; most notably r.desc() and related stuff don't currently work correctly if r is a Rethinker, so use rethinkdb directly in that case
This commit is contained in:
parent
8bf34d9db6
commit
40522ef5a5
@ -10,11 +10,6 @@ def _read_version():
|
|||||||
|
|
||||||
version = _read_version()
|
version = _read_version()
|
||||||
|
|
||||||
# XXX don't know if these should be restricted; right now, only needed for
|
|
||||||
# rethinkdb "between" query
|
|
||||||
MAX_PRIORITY = 1000000000
|
|
||||||
MIN_PRIORITY = -1000000000
|
|
||||||
|
|
||||||
class ShutdownRequested(Exception):
|
class ShutdownRequested(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ import brozzler
|
|||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
|
import rethinkdb
|
||||||
|
|
||||||
class UnexpectedDbResult(Exception):
|
class UnexpectedDbResult(Exception):
|
||||||
pass
|
pass
|
||||||
@ -25,12 +26,12 @@ class RethinkDbFrontier:
|
|||||||
if not "sites" in tables:
|
if not "sites" in tables:
|
||||||
self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.dbname))
|
self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.dbname))
|
||||||
self.r.table_create("sites", shards=self.shards, replicas=self.replicas).run()
|
self.r.table_create("sites", shards=self.shards, replicas=self.replicas).run()
|
||||||
self.r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]).run()
|
self.r.table("sites").index_create("sites_last_disclaimed", [self.r.row["status"], self.r.row["claimed"], self.r.row["last_disclaimed"]]).run()
|
||||||
self.r.table("sites").index_create("job_id").run()
|
self.r.table("sites").index_create("job_id").run()
|
||||||
if not "pages" in tables:
|
if not "pages" in tables:
|
||||||
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.dbname))
|
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.dbname))
|
||||||
self.r.table_create("pages", shards=self.shards, replicas=self.replicas).run()
|
self.r.table_create("pages", shards=self.shards, replicas=self.replicas).run()
|
||||||
self.r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]).run()
|
self.r.table("pages").index_create("priority_by_site", [self.r.row["site_id"], self.r.row["brozzle_count"], self.r.row["claimed"], self.r.row["priority"]]).run()
|
||||||
if not "jobs" in tables:
|
if not "jobs" in tables:
|
||||||
self.logger.info("creating rethinkdb table 'jobs' in database %s", repr(self.r.dbname))
|
self.logger.info("creating rethinkdb table 'jobs' in database %s", repr(self.r.dbname))
|
||||||
self.r.table_create("jobs", shards=self.shards, replicas=self.replicas).run()
|
self.r.table_create("jobs", shards=self.shards, replicas=self.replicas).run()
|
||||||
@ -114,9 +115,10 @@ class RethinkDbFrontier:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def claim_page(self, site, worker_id):
|
def claim_page(self, site, worker_id):
|
||||||
|
# import pdb; pdb.set_trace()
|
||||||
result = (self.r.table("pages")
|
result = (self.r.table("pages")
|
||||||
.between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
|
.between([site.id, 0, False, self.r.minval], [site.id, 0, False, self.r.maxval], index="priority_by_site")
|
||||||
.order_by(index=r.desc("priority_by_site")).limit(1)
|
.order_by(index=rethinkdb.desc("priority_by_site")).limit(1)
|
||||||
.update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)).run()
|
.update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)).run()
|
||||||
self.logger.info("query returned %s", result)
|
self.logger.info("query returned %s", result)
|
||||||
self._vet_result(result, replaced=[0,1])
|
self._vet_result(result, replaced=[0,1])
|
||||||
@ -126,7 +128,7 @@ class RethinkDbFrontier:
|
|||||||
raise brozzler.NothingToClaim
|
raise brozzler.NothingToClaim
|
||||||
|
|
||||||
def has_outstanding_pages(self, site):
|
def has_outstanding_pages(self, site):
|
||||||
results_iter = self.r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run()
|
results_iter = self.r.table("pages").between([site.id, 0, False, self.r.minval], [site.id, 0, True, self.r.maxval], index="priority_by_site").limit(1).run()
|
||||||
return len(list(results_iter)) > 0
|
return len(list(results_iter)) > 0
|
||||||
|
|
||||||
def page(self, id):
|
def page(self, id):
|
||||||
|
@ -110,8 +110,6 @@ class Page(brozzler.BaseDictable):
|
|||||||
priority = 0
|
priority = 0
|
||||||
priority += max(0, 10 - self.hops_from_seed)
|
priority += max(0, 10 - self.hops_from_seed)
|
||||||
priority += max(0, 6 - self.canon_url().count("/"))
|
priority += max(0, 6 - self.canon_url().count("/"))
|
||||||
priority = max(priority, brozzler.MIN_PRIORITY)
|
|
||||||
priority = min(priority, brozzler.MAX_PRIORITY)
|
|
||||||
return priority
|
return priority
|
||||||
|
|
||||||
def canon_url(self):
|
def canon_url(self):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user