From 3c23aa8fd491804891f36d1c0d160e62e35461ce Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 3 Sep 2015 01:05:03 +0000 Subject: [PATCH] finally, the jobs table --- bin/brozzler-new-job | 2 +- brozzler/__init__.py | 27 ++++++++++++++++++++------- brozzler/frontier.py | 17 +++++++++++------ brozzler/job.py | 36 +++++++++++++++++++----------------- brozzler/site.py | 19 +++---------------- 5 files changed, 54 insertions(+), 47 deletions(-) diff --git a/bin/brozzler-new-job b/bin/brozzler-new-job index 34145d0..729315f 100755 --- a/bin/brozzler-new-job +++ b/bin/brozzler-new-job @@ -27,5 +27,5 @@ logging.basicConfig(stream=sys.stdout, level=args.log_level, format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) -brozzler.job.new_job_file(frontier, job_conf_file) +brozzler.job.new_job_file(frontier, args.job_conf_file) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 8e20a88..24230b5 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -1,11 +1,5 @@ import json as _json import logging as _logging -from brozzler.site import Page, Site -from brozzler.worker import BrozzlerWorker -from brozzler.robots import is_permitted_by_robots -from brozzler.frontier import RethinkDbFrontier -from brozzler.browser import Browser, BrowserPool -from brozzler.job import new_job, new_site def _read_version(): import os @@ -79,5 +73,24 @@ class Rethinker: except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True) +class BaseDictable: + def to_dict(self): + d = dict(vars(self)) + for k in vars(self): + if k.startswith("_") or d[k] is None: + del d[k] + return d + + def to_json(self): + return json.dumps(self.to_dict(), separators=(',', ':')) + + def __repr__(self): + return "{}(**{})".format(self.__class__.__name__, self.to_dict()) + +from brozzler.site import Page, Site +from brozzler.worker import BrozzlerWorker +from brozzler.robots import is_permitted_by_robots +from brozzler.frontier import RethinkDbFrontier +from brozzler.browser import Browser, BrowserPool +from brozzler.job import new_job, new_site, Job -# vim: set sw=4 et: diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 95ff7e8..85ed971 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -1,7 +1,3 @@ -# vim: set sw=4 et: - -__all__ = ["UnexpectedDbResult", "RethinkDbFrontier"] - import logging import brozzler import rethinkdb @@ -35,8 +31,9 @@ class RethinkDbFrontier: self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db)) self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas)) self.r.run(r.table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["brozzle_count"], r.row["claimed"], r.row["priority"]])) - # if not "jobs" in tables: - # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(conn) + if not "jobs" in tables: + self.logger.info("creating rethinkdb table 'jobs' in database %s", repr(self.r.db)) + self.r.run(r.table_create("jobs", shards=self.shards, replicas=self.replicas)) def _vet_result(self, result, **kwargs): self.logger.debug("vetting expected=%s result=%s", kwargs, result) @@ -53,6 +50,14 @@ class RethinkDbFrontier: if result.get(k) != expected: raise UnexpectedDbResult("expected {} to be {} in {}".format(repr(k), expected, result)) + def new_job(self, job): + self.logger.info("inserting into 'jobs' table %s", repr(job)) + result = self.r.run(r.table("jobs").insert(job.to_dict())) + self._vet_result(result, inserted=1) + if not job.id: + # only if "id" has not already been set + job.id = result["generated_keys"][0] + def new_site(self, site): self.logger.info("inserting into 'sites' table %s", site) result = self.r.run(r.table("sites").insert(site.to_dict())) diff --git a/brozzler/job.py b/brozzler/job.py index ca70374..1696f95 100644 --- a/brozzler/job.py +++ b/brozzler/job.py @@ -1,9 +1,8 @@ -# vim: set sw=4 et: - import logging import brozzler import yaml import json +import datetime def merge(a, b): if isinstance(a, dict) and isinstance(b, dict): @@ -19,32 +18,27 @@ def merge(a, b): return a def new_job_file(frontier, job_conf_file): - logging.info("loading %s", args.job_conf_file) - with open(args.job_conf_file) as f: + logging.info("loading %s", job_conf_file) + with open(job_conf_file) as f: job_conf = yaml.load(f) new_job(frontier, job_conf) def new_job(frontier, job_conf): - # logging.info("job_conf=%s", job_conf) - seeds = job_conf.pop("seeds") - # logging.info("=== global settings ===\n%s", yaml.dump(job_conf)) - + job = Job(id=job_conf.get("id"), conf=job_conf, status="ACTIVE", started=datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) + frontier.new_job(job) + sites = [] - for seed_conf in seeds: - if "id" in seed_conf: - seed_conf.pop("id") + for seed_conf in job_conf["seeds"]: merged_conf = merge(seed_conf, job_conf) # XXX check for unknown settings, invalid url, etc - # logging.info("merge(%s, %s) = %s", seed_conf, global_conf, merged_conf) - # logging.info("=== seed_conf ===\n%s", yaml.dump(seed_conf)) - # logging.info("=== merged_conf ===\n%s", yaml.dump(merged_conf)) extra_headers = None if "warcprox_meta" in merged_conf: warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':')) extra_headers = {"Warcprox-Meta":warcprox_meta} - site = brozzler.Site(seed=merged_conf["url"], + site = brozzler.Site(job_id=job.id, + seed=merged_conf["url"], scope=merged_conf.get("scope"), time_limit=merged_conf.get("time_limit"), proxy=merged_conf.get("proxy"), @@ -52,8 +46,7 @@ def new_job(frontier, job_conf): enable_warcprox_features=merged_conf.get("enable_warcprox_features"), extra_headers=extra_headers) sites.append(site) - - # frontier = brozzler.RethinkDbFrontier(args.db.split(",")) + for site in sites: new_site(frontier, site) @@ -70,4 +63,13 @@ def new_site(frontier, site): site.note_limit_reached(e) frontier.update_site(site) +class Job(brozzler.BaseDictable): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self, id=None, conf=None, status="ACTIVE", started=None, finished=None): + self.id = id + self.conf = conf + self.status = status + self.started = started + self.finished = finished diff --git a/brozzler/site.py b/brozzler/site.py index b76bc5a..2948709 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -7,23 +7,10 @@ import brozzler import hashlib import time -__all__ = ["Site", "Page"] - -class BaseDictable: - def to_dict(self): - d = dict(vars(self)) - for k in vars(self): - if k.startswith("_") or d[k] is None: - del d[k] - return d - - def to_json(self): - return json.dumps(self.to_dict(), separators=(',', ':')) - -class Site(BaseDictable): +class Site(brozzler.BaseDictable): logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, seed, id=None, scope=None, proxy=None, + def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None, ignore_robots=False, time_limit=None, extra_headers=None, enable_warcprox_features=False, reached_limit=None, status="ACTIVE", claimed=False, start_time=time.time(), last_disclaimed=0): @@ -93,7 +80,7 @@ class Site(BaseDictable): self.logger.warn("problem parsing url %s", repr(url)) return False -class Page(BaseDictable): +class Page(brozzler.BaseDictable): def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0): self.site_id = site_id self.url = url