diff --git a/brozzler/frontier.py b/brozzler/frontier.py index dd3ceb3..3b3c8f9 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -4,6 +4,7 @@ import random import time import datetime import rethinkdb +import rethinkstuff class UnexpectedDbResult(Exception): pass @@ -91,9 +92,13 @@ class RethinkDbFrontier: # XXX keep track of aggregate priority and prioritize sites accordingly? while True: result = (self.r.table("sites") - .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed") + .between( + ["ACTIVE",False,rethinkdb.minval], + ["ACTIVE",False,rethinkdb.maxval], + index="sites_last_disclaimed") .order_by(index="sites_last_disclaimed").limit(1) - .update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)).run() + .update({"claimed":True,"last_claimed_by":worker_id}, + return_changes=True)).run() self._vet_result(result, replaced=[0,1], unchanged=[0,1]) if result["replaced"] == 1: site = brozzler.Site(**result["changes"][0]["new_val"]) @@ -108,7 +113,7 @@ class RethinkDbFrontier: def _enforce_time_limit(self, site): if (site.time_limit and site.time_limit > 0 - and time.time() - site.start_time > site.time_limit): + and (rethinkstuff.utcnow() - site.start_time).total_seconds() > site.time_limit): self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s", site.time_limit, site.start_time, time.time() - site.start_time, site) self.finished(site, "FINISHED_TIME_LIMIT") @@ -177,7 +182,7 @@ class RethinkDbFrontier: self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id) job.status = "FINISHED" - job.finished = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + job.finished = rethinkdb.utcnow() self.update_job(job) return True @@ -185,13 +190,13 @@ class RethinkDbFrontier: self.logger.info("%s %s", site, status) site.status = status self.update_site(site) - if site.job_id: + if site.job_id: self._maybe_finish_job(site.job_id) def disclaim_site(self, site, page=None): self.logger.info("disclaiming %s", site) site.claimed = False - site.last_disclaimed = time.time() # XXX use string or rethinkdb time type? + site.last_disclaimed = rethinkstuff.utcnow() if not page and not self.has_outstanding_pages(site): self.finished(site, "FINISHED") else: diff --git a/brozzler/job.py b/brozzler/job.py index 276f924..ff6858a 100644 --- a/brozzler/job.py +++ b/brozzler/job.py @@ -4,6 +4,7 @@ import yaml import json import datetime import uuid +import rethinkstuff def merge(a, b): if isinstance(a, dict) and isinstance(b, dict): @@ -26,18 +27,18 @@ def new_job_file(frontier, job_conf_file): def new_job(frontier, job_conf): job = Job(id=job_conf.get("id"), conf=job_conf, status="ACTIVE", - started=datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) + started=rethinkstuff.utcnow()) sites = [] for seed_conf in job_conf["seeds"]: merged_conf = merge(seed_conf, job_conf) # XXX check for unknown settings, invalid url, etc - + extra_headers = None if "warcprox_meta" in merged_conf: warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':')) extra_headers = {"Warcprox-Meta":warcprox_meta} - + site = brozzler.Site(job_id=job.id, seed=merged_conf["url"], scope=merged_conf.get("scope"), diff --git a/brozzler/site.py b/brozzler/site.py index 5aecc34..af030da 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -1,11 +1,13 @@ -# vim: set sw=4 et: - import surt import json import logging import brozzler import hashlib import time +import rethinkstuff +import datetime + +_EPOCH_UTC = datetime.datetime.utcfromtimestamp(0.0).replace(tzinfo=rethinkstuff.UTC) class Site(brozzler.BaseDictable): logger = logging.getLogger(__module__ + "." + __qualname__) @@ -13,8 +15,8 @@ class Site(brozzler.BaseDictable): def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None, ignore_robots=False, time_limit=None, extra_headers=None, enable_warcprox_features=False, reached_limit=None, status="ACTIVE", - claimed=False, start_time=time.time(), last_disclaimed=0, - last_claimed_by=None): + claimed=False, start_time=rethinkstuff.utcnow(), + last_disclaimed=_EPOCH_UTC, last_claimed_by=None): self.seed = seed self.id = id @@ -28,7 +30,6 @@ class Site(brozzler.BaseDictable): self.status = status self.claimed = bool(claimed) self.last_claimed_by = last_claimed_by - # times as seconds since epoch self.start_time = start_time self.last_disclaimed = last_disclaimed