detect jobs finishing! (not well tested yet)

This commit is contained in:
Noah Levitt 2015-09-09 22:11:48 +00:00
parent 72e72e03c4
commit 92a288bc35
7 changed files with 77 additions and 33 deletions

View File

@ -6,6 +6,7 @@ import os
import sys
import logging
import brozzler
import brozzler.worker
import threading
import time
import signal
@ -52,7 +53,7 @@ signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db)
worker = brozzler.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
worker = brozzler.worker.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
worker.start()

View File

@ -41,7 +41,6 @@ class ReachedLimit(Exception):
class Rethinker:
import logging
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, servers=["localhost"], db=None):
@ -74,6 +73,7 @@ class Rethinker:
return query.run(conn, db=self.db)
except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e:
self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True)
time.sleep(0.5)
class BaseDictable:
def to_dict(self):
@ -90,9 +90,9 @@ class BaseDictable:
return "{}(**{})".format(self.__class__.__name__, self.to_dict())
from brozzler.site import Page, Site
from brozzler.worker import BrozzlerWorker
# from brozzler.worker import BrozzlerWorker
from brozzler.robots import is_permitted_by_robots
from brozzler.frontier import RethinkDbFrontier
from brozzler.browser import Browser, BrowserPool
# from brozzler.browser import Browser, BrowserPool
from brozzler.job import new_job, new_site, Job

View File

@ -27,6 +27,7 @@ class RethinkDbFrontier:
self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.db))
self.r.run(r.table_create("sites", shards=self.shards, replicas=self.replicas))
self.r.run(r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]]))
self.r.run(r.table("sites").index_create("job_id"))
if not "pages" in tables:
self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db))
self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas))
@ -64,6 +65,11 @@ class RethinkDbFrontier:
self._vet_result(result, inserted=1)
site.id = result["generated_keys"][0]
def update_job(self, job):
self.logger.debug("updating 'jobs' table entry %s", job)
result = self.r.run(r.table("jobs").get(job.id).replace(job.to_dict()))
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
def update_site(self, site):
self.logger.debug("updating 'sites' table entry %s", site)
result = self.r.run(r.table("sites").get(site.id).replace(site.to_dict()))
@ -100,10 +106,9 @@ class RethinkDbFrontier:
def _enforce_time_limit(self, site):
if (site.time_limit and site.time_limit > 0
and time.time() - site.start_time > site.time_limit):
self.logger.info("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
site.time_limit, site.start_time, time.time() - site.start_time, site)
site.status = "FINISHED_TIME_LIMIT"
self.update_site(site)
self.finished(site, "FINISHED_TIME_LIMIT")
return True
else:
return False
@ -113,6 +118,7 @@ class RethinkDbFrontier:
.between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
.order_by(index=r.desc("priority_by_site")).limit(1)
.update({"claimed":True},return_changes=True))
self.logger.info("query returned %s", result)
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
return brozzler.Page(**result["changes"][0]["new_val"])
@ -123,8 +129,8 @@ class RethinkDbFrontier:
cursor = self.r.run(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1))
return len(list(cursor)) > 0
def get_page(self, page):
result = self.r.run(r.table("pages").get(page.id))
def page(self, id):
result = self.r.run(r.table("pages").get(id))
if result:
return brozzler.Page(**result)
else:
@ -139,14 +145,48 @@ class RethinkDbFrontier:
site.note_seed_redirect(page.redirect_url)
self.update_site(site)
def active_jobs(self):
results = self.r.run(r.table("jobs").filter({"status":"ACTIVE"}))
for result in results:
yield brozzler.Job(**result)
def job(self, id):
result = self.r.run(r.table("jobs").get(id))
if result:
return brozzler.Job(**result)
else:
return None
def _maybe_finish_job(self, job_id):
"""Returns True if job is finished."""
job = self.job(job_id)
if job.status.startswith("FINISH"):
self.logger.warn("%s is already %s", job, job.status)
return True
results = self.r.run(r.table("sites").get_all(job_id, index="job_id"))
for result in results:
site = brozzler.Site(**result)
if not site.status.startswith("FINISH"):
return False
job.status = "FINISHED"
job.finished = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
self.update_job(job)
return True
def finished(self, site, status):
self.logger.info("%s %s", site, status)
site.status = status
self.update_site(site)
self._maybe_finish_job(site.job_id)
def disclaim_site(self, site, page=None):
self.logger.info("disclaiming %s", site)
site.claimed = False
site.last_disclaimed = time.time()
if not page and not self.has_outstanding_pages(site):
self.logger.info("site FINISHED! %s", site)
site.status = "FINISHED"
self.update_site(site)
self.finished(site, "FINISHED")
if page:
page.claimed = False
self.update_page(page)
@ -157,8 +197,8 @@ class RethinkDbFrontier:
for url in outlinks:
if site.is_in_scope(url, parent_page):
if brozzler.is_permitted_by_robots(site, url):
new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1, via_page_id=parent_page.id)
existing_child_page = self.get_page(new_child_page)
new_child_page = brozzler.Page(url, site_id=site.id, job_id=site.job_id, hops_from_seed=parent_page.hops_from_seed+1, via_page_id=parent_page.id)
existing_child_page = self.page(new_child_page.id)
if existing_child_page:
existing_child_page.priority += new_child_page.priority
self.update_page(existing_child_page)
@ -174,3 +214,12 @@ class RethinkDbFrontier:
self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s",
counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)
def reached_limit(self, site, e):
self.logger.info("reached_limit site=%s e=%s", site, e)
assert isinstance(e, brozzler.ReachedLimit)
if site.reached_limit and site.reached_limit != e.warcprox_meta["reached-limit"]:
self.logger.warn("reached limit %s but site had already reached limit %s",
e.warcprox_meta["reached-limit"], self.reached_limit)
else:
site.reached_limit = e.warcprox_meta["reached-limit"]
self.finished(site, "FINISHED_REACHED_LIMIT")

View File

@ -55,13 +55,13 @@ def new_site(frontier, site):
frontier.new_site(site)
try:
if brozzler.is_permitted_by_robots(site, site.seed):
page = brozzler.Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000)
page = brozzler.Page(site.seed, site_id=site.id, job_id=site.job_id, hops_from_seed=0, priority=1000)
frontier.new_page(page)
logging.info("queued page %s", page)
else:
logging.warn("seed url {} is blocked by robots.txt".format(site.seed))
except brozzler.ReachedLimit as e:
site.note_limit_reached(e)
frontier.update_site(site)
frontier.reached_limit(site, e)
class Job(brozzler.BaseDictable):
logger = logging.getLogger(__module__ + "." + __qualname__)

View File

@ -17,6 +17,7 @@ class Site(brozzler.BaseDictable):
self.seed = seed
self.id = id
self.job_id = job_id
self.proxy = proxy
self.ignore_robots = ignore_robots
self.enable_warcprox_features = bool(enable_warcprox_features)
@ -53,16 +54,6 @@ class Site(brozzler.BaseDictable):
self.logger.info("changing site scope surt from {} to {}".format(self.scope["surt"], new_scope_surt))
self.scope["surt"] = new_scope_surt
def note_limit_reached(self, e):
self.logger.info("reached_limit e=%s", e)
assert isinstance(e, brozzler.ReachedLimit)
if self.reached_limit and self.reached_limit != e.warcprox_meta["reached-limit"]:
self.logger.warn("reached limit %s but site had already reached limit %s",
e.warcprox_meta["reached-limit"], self.reached_limit)
else:
self.reached_limit = e.warcprox_meta["reached-limit"]
self.status = "FINISHED_REACHED_LIMIT"
def is_in_scope(self, url, parent_page=None):
if parent_page and "max_hops" in self.scope and parent_page.hops_from_seed >= self.scope["max_hops"]:
return False
@ -81,8 +72,9 @@ class Site(brozzler.BaseDictable):
return False
class Page(brozzler.BaseDictable):
def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None):
def __init__(self, url, id=None, site_id=None, job_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None):
self.site_id = site_id
self.job_id = job_id
self.url = url
self.hops_from_seed = hops_from_seed
self.redirect_url = redirect_url
@ -103,8 +95,8 @@ class Page(brozzler.BaseDictable):
self.id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest()
def __repr__(self):
return """Page(url={},site_id={},hops_from_seed={})""".format(
repr(self.url), self.site_id, self.hops_from_seed)
return """Page(url={},job_id={},site_id={},hops_from_seed={})""".format(
repr(self.url), self.job_id, self.site_id, self.hops_from_seed)
def note_redirect(self, url):
self.redirect_url = url

View File

@ -3,6 +3,7 @@
import os
import logging
import brozzler
import brozzler.browser
import threading
import time
import signal
@ -144,7 +145,7 @@ class BrozzlerWorker:
except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site)
except brozzler.ReachedLimit as e:
site.note_limit_reached(e)
self._frontier.reached_limit(site, e)
except brozzler.browser.BrowsingAborted:
self.logger.info("{} shut down".format(browser))
except:

View File

@ -1,9 +1,10 @@
PyYAML
git+https://github.com/nlevitt/surt.git@py3
# -e /home/nlevitt/workspace/surt
# git+https://github.com/nlevitt/surt.git@py3
-e /home/nlevitt/workspace/surt
git+https://github.com/nlevitt/youtube-dl.git@brozzler
git+https://github.com/seomoz/reppy.git # https://github.com/seomoz/reppy/commit/7661606c not in pypi package
requests
git+https://github.com/nlevitt/websocket-client.git@tweaks
rethinkdb
pillow
-e .