new rethinkstuff.Rethinker api

This commit is contained in:
Noah Levitt 2015-09-23 00:50:15 +00:00
parent 2863b7e422
commit 2bc66f52d4
4 changed files with 41 additions and 38 deletions

View file

@ -1,5 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python
# vim: set sw=4 et:
import argparse import argparse
import os import os
@ -8,6 +7,7 @@ import logging
import brozzler import brozzler
import yaml import yaml
import json import json
import rethinkstuff
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description="brozzler-new-job - queue new job with brozzler", description="brozzler-new-job - queue new job with brozzler",
@ -26,6 +26,7 @@ args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level, logging.basicConfig(stream=sys.stdout, level=args.log_level,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db)
frontier = brozzler.RethinkDbFrontier(r)
brozzler.job.new_job_file(frontier, args.job_conf_file) brozzler.job.new_job_file(frontier, args.job_conf_file)

View file

@ -7,6 +7,7 @@ import sys
import logging import logging
import brozzler import brozzler
import re import re
import rethinkstuff
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description="brozzler-new-site - register site to brozzle", description="brozzler-new-site - register site to brozzle",
@ -45,6 +46,7 @@ site = brozzler.Site(seed=args.seed, proxy=args.proxy,
enable_warcprox_features=args.enable_warcprox_features, enable_warcprox_features=args.enable_warcprox_features,
extra_headers=extra_headers) extra_headers=extra_headers)
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db)
frontier = brozzler.RethinkDbFrontier(r)
brozzler.new_site(frontier, site) brozzler.new_site(frontier, site)

View file

@ -12,6 +12,7 @@ import time
import signal import signal
import pprint import pprint
import traceback import traceback
import rethinkstuff
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@ -52,7 +53,8 @@ signal.signal(signal.SIGQUIT, dump_state)
signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint) signal.signal(signal.SIGINT, sigint)
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) r = rethinkstuff.Rethinker(args.rethinkdb_servers.split(","), args.rethinkdb_db)
frontier = brozzler.RethinkDbFrontier(r)
worker = brozzler.worker.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) worker = brozzler.worker.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
worker.start() worker.start()

View file

@ -1,11 +1,8 @@
import logging import logging
import brozzler import brozzler
import rethinkdb
r = rethinkdb
import random import random
import time import time
import datetime import datetime
import pyrethink
class UnexpectedDbResult(Exception): class UnexpectedDbResult(Exception):
pass pass
@ -13,30 +10,30 @@ class UnexpectedDbResult(Exception):
class RethinkDbFrontier: class RethinkDbFrontier:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3): def __init__(self, r, shards=3, replicas=3):
self.r = pyrethink.Rethinker(servers, db) self.r = r
self.shards = shards self.shards = shards
self.replicas = replicas self.replicas = replicas
self._ensure_db() self._ensure_db()
def _ensure_db(self): def _ensure_db(self):
dbs = self.r.run(r.db_list()) dbs = self.r.db_list().run()
if not self.r.db in dbs: if not self.r.dbname in dbs:
self.logger.info("creating rethinkdb database %s", repr(self.r.db)) self.logger.info("creating rethinkdb database %s", repr(self.r.dbname))
self.r.run(r.db_create(self.r.db)) self.r.db_create(self.r.dbname).run()
tables = self.r.run(r.table_list()) tables = self.r.table_list().run()
if not "sites" in tables: if not "sites" in tables:
self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.db)) self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.dbname))
self.r.run(r.table_create("sites", shards=self.shards, replicas=self.replicas)) self.r.table_create("sites", shards=self.shards, replicas=self.replicas).run()
self.r.run(r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]])) self.r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]).run()
self.r.run(r.table("sites").index_create("job_id")) self.r.table("sites").index_create("job_id").run()
if not "pages" in tables: if not "pages" in tables:
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db)) self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.dbname))
self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas)) self.r.table_create("pages", shards=self.shards, replicas=self.replicas).run()
self.r.run(r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]])) self.r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]).run()
if not "jobs" in tables: if not "jobs" in tables:
self.logger.info("creating rethinkdb table 'jobs' in database %s", repr(self.r.db)) self.logger.info("creating rethinkdb table 'jobs' in database %s", repr(self.r.dbname))
self.r.run(r.table_create("jobs", shards=self.shards, replicas=self.replicas)) self.r.table_create("jobs", shards=self.shards, replicas=self.replicas).run()
def _vet_result(self, result, **kwargs): def _vet_result(self, result, **kwargs):
self.logger.debug("vetting expected=%s result=%s", kwargs, result) self.logger.debug("vetting expected=%s result=%s", kwargs, result)
@ -55,7 +52,7 @@ class RethinkDbFrontier:
def new_job(self, job): def new_job(self, job):
self.logger.info("inserting into 'jobs' table %s", repr(job)) self.logger.info("inserting into 'jobs' table %s", repr(job))
result = self.r.run(r.table("jobs").insert(job.to_dict())) result = self.r.table("jobs").insert(job.to_dict()).run()
self._vet_result(result, inserted=1) self._vet_result(result, inserted=1)
if not job.id: if not job.id:
# only if "id" has not already been set # only if "id" has not already been set
@ -63,36 +60,37 @@ class RethinkDbFrontier:
def new_site(self, site): def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site) self.logger.info("inserting into 'sites' table %s", site)
result = self.r.run(r.table("sites").insert(site.to_dict())) result = self.r.table("sites").insert(site.to_dict()).run()
self._vet_result(result, inserted=1) self._vet_result(result, inserted=1)
site.id = result["generated_keys"][0] site.id = result["generated_keys"][0]
def update_job(self, job): def update_job(self, job):
self.logger.debug("updating 'jobs' table entry %s", job) self.logger.debug("updating 'jobs' table entry %s", job)
result = self.r.run(r.table("jobs").get(job.id).replace(job.to_dict())) result = self.r.table("jobs").get(job.id).replace(job.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def update_site(self, site): def update_site(self, site):
self.logger.debug("updating 'sites' table entry %s", site) self.logger.debug("updating 'sites' table entry %s", site)
result = self.r.run(r.table("sites").get(site.id).replace(site.to_dict())) result = self.r.table("sites").get(site.id).replace(site.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def update_page(self, page): def update_page(self, page):
self.logger.debug("updating 'pages' table entry %s", page) self.logger.debug("updating 'pages' table entry %s", page)
result = self.r.run(r.table("pages").get(page.id).replace(page.to_dict())) result = self.r.table("pages").get(page.id).replace(page.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def new_page(self, page): def new_page(self, page):
self.logger.debug("inserting into 'pages' table %s", page) self.logger.debug("inserting into 'pages' table %s", page)
result = self.r.run(r.table("pages").insert(page.to_dict())) result = self.r.table("pages").insert(page.to_dict()).run()
self._vet_result(result, inserted=1) self._vet_result(result, inserted=1)
def claim_site(self, worker_id): def claim_site(self, worker_id):
# XXX keep track of aggregate priority and prioritize sites accordingly? # XXX keep track of aggregate priority and prioritize sites accordingly?
while True: while True:
result = self.r.run(r.table("sites") result = (self.r.table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed") .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)) .order_by(index="sites_last_disclaimed").limit(1)
.update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
if result["replaced"] == 1: if result["replaced"] == 1:
site = brozzler.Site(**result["changes"][0]["new_val"]) site = brozzler.Site(**result["changes"][0]["new_val"])
@ -116,10 +114,10 @@ class RethinkDbFrontier:
return False return False
def claim_page(self, site, worker_id): def claim_page(self, site, worker_id):
result = self.r.run(r.table("pages") result = (self.r.table("pages")
.between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site") .between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
.order_by(index=r.desc("priority_by_site")).limit(1) .order_by(index=r.desc("priority_by_site")).limit(1)
.update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)) .update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)).run()
self.logger.info("query returned %s", result) self.logger.info("query returned %s", result)
self._vet_result(result, replaced=[0,1]) self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1: if result["replaced"] == 1:
@ -128,11 +126,11 @@ class RethinkDbFrontier:
raise brozzler.NothingToClaim raise brozzler.NothingToClaim
def has_outstanding_pages(self, site): def has_outstanding_pages(self, site):
results_iter = self.r.results_iter(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1)) results_iter = self.r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run()
return len(list(results_iter)) > 0 return len(list(results_iter)) > 0
def page(self, id): def page(self, id):
result = self.r.run(r.table("pages").get(id)) result = self.r.table("pages").get(id).run()
if result: if result:
return brozzler.Page(**result) return brozzler.Page(**result)
else: else:
@ -148,12 +146,12 @@ class RethinkDbFrontier:
self.update_site(site) self.update_site(site)
def active_jobs(self): def active_jobs(self):
results = self.r.results_iter(r.table("jobs").filter({"status":"ACTIVE"})) results = self.r.table("jobs").filter({"status":"ACTIVE"}).run()
for result in results: for result in results:
yield brozzler.Job(**result) yield brozzler.Job(**result)
def job(self, id): def job(self, id):
result = self.r.run(r.table("jobs").get(id)) result = self.r.table("jobs").get(id).run()
if result: if result:
return brozzler.Job(**result) return brozzler.Job(**result)
else: else:
@ -166,7 +164,7 @@ class RethinkDbFrontier:
self.logger.warn("%s is already %s", job, job.status) self.logger.warn("%s is already %s", job, job.status)
return True return True
results = self.r.results_iter(r.table("sites").get_all(job_id, index="job_id")) results = self.r.table("sites").get_all(job_id, index="job_id").run()
n = 0 n = 0
for result in results: for result in results:
site = brozzler.Site(**result) site = brozzler.Site(**result)