finally, the jobs table

This commit is contained in:
Noah Levitt 2015-09-03 01:05:03 +00:00
parent 6cda4739b8
commit 3c23aa8fd4
5 changed files with 54 additions and 47 deletions

View File

@ -27,5 +27,5 @@ logging.basicConfig(stream=sys.stdout, level=args.log_level,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db)
brozzler.job.new_job_file(frontier, job_conf_file)
brozzler.job.new_job_file(frontier, args.job_conf_file)

View File

@ -1,11 +1,5 @@
import json as _json
import logging as _logging
from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots
from brozzler.frontier import RethinkDbFrontier
from brozzler.browser import Browser, BrowserPool
from brozzler.job import new_job, new_site
def _read_version():
import os
@ -79,5 +73,24 @@ class Rethinker:
except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True)
class BaseDictable:
def to_dict(self):
d = dict(vars(self))
for k in vars(self):
if k.startswith("_") or d[k] is None:
del d[k]
return d
def to_json(self):
return json.dumps(self.to_dict(), separators=(',', ':'))
def __repr__(self):
return "{}(**{})".format(self.__class__.__name__, self.to_dict())
from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots
from brozzler.frontier import RethinkDbFrontier
from brozzler.browser import Browser, BrowserPool
from brozzler.job import new_job, new_site, Job
# vim: set sw=4 et:

View File

@ -1,7 +1,3 @@
# vim: set sw=4 et:
__all__ = ["UnexpectedDbResult", "RethinkDbFrontier"]
import logging
import brozzler
import rethinkdb
@ -35,8 +31,9 @@ class RethinkDbFrontier:
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db))
self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas))
self.r.run(r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]]))
# if not "jobs" in tables:
# r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn)
if not "jobs" in tables:
self.logger.info("creating rethinkdb table 'jobs' in database %s", repr(self.r.db))
self.r.run(r.table_create("jobs", shards=self.shards, replicas=self.replicas))
def _vet_result(self, result, **kwargs):
self.logger.debug("vetting expected=%s result=%s", kwargs, result)
@ -53,6 +50,14 @@ class RethinkDbFrontier:
if result.get(k) != expected:
raise UnexpectedDbResult("expected {} to be {} in {}".format(repr(k), expected, result))
def new_job(self, job):
self.logger.info("inserting into 'jobs' table %s", repr(job))
result = self.r.run(r.table("jobs").insert(job.to_dict()))
self._vet_result(result, inserted=1)
if not job.id:
# only if "id" has not already been set
job.id = result["generated_keys"][0]
def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site)
result = self.r.run(r.table("sites").insert(site.to_dict()))

View File

@ -1,9 +1,8 @@
# vim: set sw=4 et:
import logging
import brozzler
import yaml
import json
import datetime
def merge(a, b):
if isinstance(a, dict) and isinstance(b, dict):
@ -19,32 +18,27 @@ def merge(a, b):
return a
def new_job_file(frontier, job_conf_file):
logging.info("loading %s", args.job_conf_file)
with open(args.job_conf_file) as f:
logging.info("loading %s", job_conf_file)
with open(job_conf_file) as f:
job_conf = yaml.load(f)
new_job(frontier, job_conf)
def new_job(frontier, job_conf):
# logging.info("job_conf=%s", job_conf)
seeds = job_conf.pop("seeds")
# logging.info("=== global settings ===\n%s", yaml.dump(job_conf))
job = Job(id=job_conf.get("id"), conf=job_conf, status="ACTIVE", started=datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))
frontier.new_job(job)
sites = []
for seed_conf in seeds:
if "id" in seed_conf:
seed_conf.pop("id")
for seed_conf in job_conf["seeds"]:
merged_conf = merge(seed_conf, job_conf)
# XXX check for unknown settings, invalid url, etc
# logging.info("merge(%s, %s) = %s", seed_conf, global_conf, merged_conf)
# logging.info("=== seed_conf ===\n%s", yaml.dump(seed_conf))
# logging.info("=== merged_conf ===\n%s", yaml.dump(merged_conf))
extra_headers = None
if "warcprox_meta" in merged_conf:
warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':'))
extra_headers = {"Warcprox-Meta":warcprox_meta}
site = brozzler.Site(seed=merged_conf["url"],
site = brozzler.Site(job_id=job.id,
seed=merged_conf["url"],
scope=merged_conf.get("scope"),
time_limit=merged_conf.get("time_limit"),
proxy=merged_conf.get("proxy"),
@ -52,8 +46,7 @@ def new_job(frontier, job_conf):
enable_warcprox_features=merged_conf.get("enable_warcprox_features"),
extra_headers=extra_headers)
sites.append(site)
# frontier = brozzler.RethinkDbFrontier(args.db.split(","))
for site in sites:
new_site(frontier, site)
@ -70,4 +63,13 @@ def new_site(frontier, site):
site.note_limit_reached(e)
frontier.update_site(site)
class Job(brozzler.BaseDictable):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, id=None, conf=None, status="ACTIVE", started=None, finished=None):
self.id = id
self.conf = conf
self.status = status
self.started = started
self.finished = finished

View File

@ -7,23 +7,10 @@ import brozzler
import hashlib
import time
__all__ = ["Site", "Page"]
class BaseDictable:
def to_dict(self):
d = dict(vars(self))
for k in vars(self):
if k.startswith("_") or d[k] is None:
del d[k]
return d
def to_json(self):
return json.dumps(self.to_dict(), separators=(',', ':'))
class Site(BaseDictable):
class Site(brozzler.BaseDictable):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, seed, id=None, scope=None, proxy=None,
def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None,
ignore_robots=False, time_limit=None, extra_headers=None,
enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
claimed=False, start_time=time.time(), last_disclaimed=0):
@ -93,7 +80,7 @@ class Site(BaseDictable):
self.logger.warn("problem parsing url %s", repr(url))
return False
class Page(BaseDictable):
class Page(brozzler.BaseDictable):
def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0):
self.site_id = site_id
self.url = url