use rethinkdb native time type for date/time values

This commit is contained in:
Noah Levitt 2015-11-18 02:07:27 +00:00
parent ca0053e3be
commit 36e2bb2729
3 changed files with 21 additions and 14 deletions

View file

@ -4,6 +4,7 @@ import random
import time import time
import datetime import datetime
import rethinkdb import rethinkdb
import rethinkstuff
class UnexpectedDbResult(Exception): class UnexpectedDbResult(Exception):
pass pass
@ -91,9 +92,13 @@ class RethinkDbFrontier:
# XXX keep track of aggregate priority and prioritize sites accordingly? # XXX keep track of aggregate priority and prioritize sites accordingly?
while True: while True:
result = (self.r.table("sites") result = (self.r.table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed") .between(
["ACTIVE",False,rethinkdb.minval],
["ACTIVE",False,rethinkdb.maxval],
index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1) .order_by(index="sites_last_disclaimed").limit(1)
.update({"claimed":True,"last_claimed_by":worker_id},return_changes=True)).run() .update({"claimed":True,"last_claimed_by":worker_id},
return_changes=True)).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
if result["replaced"] == 1: if result["replaced"] == 1:
site = brozzler.Site(**result["changes"][0]["new_val"]) site = brozzler.Site(**result["changes"][0]["new_val"])
@ -108,7 +113,7 @@ class RethinkDbFrontier:
def _enforce_time_limit(self, site): def _enforce_time_limit(self, site):
if (site.time_limit and site.time_limit > 0 if (site.time_limit and site.time_limit > 0
and time.time() - site.start_time > site.time_limit): and (rethinkstuff.utcnow() - site.start_time).total_seconds() > site.time_limit):
self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s", self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
site.time_limit, site.start_time, time.time() - site.start_time, site) site.time_limit, site.start_time, time.time() - site.start_time, site)
self.finished(site, "FINISHED_TIME_LIMIT") self.finished(site, "FINISHED_TIME_LIMIT")
@ -177,7 +182,7 @@ class RethinkDbFrontier:
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id) self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
job.status = "FINISHED" job.status = "FINISHED"
job.finished = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") job.finished = rethinkdb.utcnow()
self.update_job(job) self.update_job(job)
return True return True
@ -185,13 +190,13 @@ class RethinkDbFrontier:
self.logger.info("%s %s", site, status) self.logger.info("%s %s", site, status)
site.status = status site.status = status
self.update_site(site) self.update_site(site)
if site.job_id: if site.job_id:
self._maybe_finish_job(site.job_id) self._maybe_finish_job(site.job_id)
def disclaim_site(self, site, page=None): def disclaim_site(self, site, page=None):
self.logger.info("disclaiming %s", site) self.logger.info("disclaiming %s", site)
site.claimed = False site.claimed = False
site.last_disclaimed = time.time() # XXX use string or rethinkdb time type? site.last_disclaimed = rethinkstuff.utcnow()
if not page and not self.has_outstanding_pages(site): if not page and not self.has_outstanding_pages(site):
self.finished(site, "FINISHED") self.finished(site, "FINISHED")
else: else:

View file

@ -4,6 +4,7 @@ import yaml
import json import json
import datetime import datetime
import uuid import uuid
import rethinkstuff
def merge(a, b): def merge(a, b):
if isinstance(a, dict) and isinstance(b, dict): if isinstance(a, dict) and isinstance(b, dict):
@ -26,18 +27,18 @@ def new_job_file(frontier, job_conf_file):
def new_job(frontier, job_conf): def new_job(frontier, job_conf):
job = Job(id=job_conf.get("id"), conf=job_conf, status="ACTIVE", job = Job(id=job_conf.get("id"), conf=job_conf, status="ACTIVE",
started=datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) started=rethinkstuff.utcnow())
sites = [] sites = []
for seed_conf in job_conf["seeds"]: for seed_conf in job_conf["seeds"]:
merged_conf = merge(seed_conf, job_conf) merged_conf = merge(seed_conf, job_conf)
# XXX check for unknown settings, invalid url, etc # XXX check for unknown settings, invalid url, etc
extra_headers = None extra_headers = None
if "warcprox_meta" in merged_conf: if "warcprox_meta" in merged_conf:
warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':')) warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':'))
extra_headers = {"Warcprox-Meta":warcprox_meta} extra_headers = {"Warcprox-Meta":warcprox_meta}
site = brozzler.Site(job_id=job.id, site = brozzler.Site(job_id=job.id,
seed=merged_conf["url"], seed=merged_conf["url"],
scope=merged_conf.get("scope"), scope=merged_conf.get("scope"),

View file

@ -1,11 +1,13 @@
# vim: set sw=4 et:
import surt import surt
import json import json
import logging import logging
import brozzler import brozzler
import hashlib import hashlib
import time import time
import rethinkstuff
import datetime
_EPOCH_UTC = datetime.datetime.utcfromtimestamp(0.0).replace(tzinfo=rethinkstuff.UTC)
class Site(brozzler.BaseDictable): class Site(brozzler.BaseDictable):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = logging.getLogger(__module__ + "." + __qualname__)
@ -13,8 +15,8 @@ class Site(brozzler.BaseDictable):
def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None, def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None,
ignore_robots=False, time_limit=None, extra_headers=None, ignore_robots=False, time_limit=None, extra_headers=None,
enable_warcprox_features=False, reached_limit=None, status="ACTIVE", enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
claimed=False, start_time=time.time(), last_disclaimed=0, claimed=False, start_time=rethinkstuff.utcnow(),
last_claimed_by=None): last_disclaimed=_EPOCH_UTC, last_claimed_by=None):
self.seed = seed self.seed = seed
self.id = id self.id = id
@ -28,7 +30,6 @@ class Site(brozzler.BaseDictable):
self.status = status self.status = status
self.claimed = bool(claimed) self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by self.last_claimed_by = last_claimed_by
# times as seconds since epoch
self.start_time = start_time self.start_time = start_time
self.last_disclaimed = last_disclaimed self.last_disclaimed = last_disclaimed