From 92a288bc358f56a3195afc64b1974e8b3238fde6 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Wed, 9 Sep 2015 22:11:48 +0000 Subject: [PATCH] detect jobs finishing! (not well tested yet) --- bin/brozzler-worker | 3 +- brozzler/__init__.py | 6 ++-- brozzler/frontier.py | 69 +++++++++++++++++++++++++++++++++++++------- brozzler/job.py | 6 ++-- brozzler/site.py | 18 ++++-------- brozzler/worker.py | 3 +- requirements.txt | 5 ++-- 7 files changed, 77 insertions(+), 33 deletions(-) diff --git a/bin/brozzler-worker b/bin/brozzler-worker index 6df96c2..bd459e1 100755 --- a/bin/brozzler-worker +++ b/bin/brozzler-worker @@ -6,6 +6,7 @@ import os import sys import logging import brozzler +import brozzler.worker import threading import time import signal @@ -52,7 +53,7 @@ signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) frontier = brozzler.RethinkDbFrontier(args.rethinkdb_servers.split(","), args.rethinkdb_db) -worker = brozzler.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) +worker = brozzler.worker.BrozzlerWorker(frontier, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) worker.start() diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 5a62006..516c8d0 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -41,7 +41,6 @@ class ReachedLimit(Exception): class Rethinker: import logging - logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, servers=["localhost"], db=None): @@ -74,6 +73,7 @@ class Rethinker: return query.run(conn, db=self.db) except (r.ReqlAvailabilityError, r.ReqlTimeoutError) as e: self.logger.error("will retry rethinkdb query/operation %s which failed like so:", exc_info=True) + time.sleep(0.5) class BaseDictable: def to_dict(self): @@ -90,9 +90,9 @@ class BaseDictable: return "{}(**{})".format(self.__class__.__name__, self.to_dict()) from brozzler.site import Page, Site -from brozzler.worker import BrozzlerWorker +# from brozzler.worker import BrozzlerWorker from brozzler.robots import is_permitted_by_robots from brozzler.frontier import RethinkDbFrontier -from brozzler.browser import Browser, BrowserPool +# from brozzler.browser import Browser, BrowserPool from brozzler.job import new_job, new_site, Job diff --git a/brozzler/frontier.py b/brozzler/frontier.py index bde97a8..bd25efe 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -27,6 +27,7 @@ class RethinkDbFrontier: self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.db)) self.r.run(r.table_create("sites", shards=self.shards, replicas=self.replicas)) self.r.run(r.table("sites").index_create("sites_last_disclaimed", [r.row["status"], r.row["claimed"], r.row["last_disclaimed"]])) + self.r.run(r.table("sites").index_create("job_id")) if not "pages" in tables: self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.db)) self.r.run(r.table_create("pages", shards=self.shards, replicas=self.replicas)) @@ -64,6 +65,11 @@ class RethinkDbFrontier: self._vet_result(result, inserted=1) site.id = result["generated_keys"][0] + def update_job(self, job): + self.logger.debug("updating 'jobs' table entry %s", job) + result = self.r.run(r.table("jobs").get(job.id).replace(job.to_dict())) + self._vet_result(result, replaced=[0,1], unchanged=[0,1]) + def update_site(self, site): self.logger.debug("updating 'sites' table entry %s", site) result = self.r.run(r.table("sites").get(site.id).replace(site.to_dict())) @@ -100,10 +106,9 @@ class RethinkDbFrontier: def _enforce_time_limit(self, site): if (site.time_limit and site.time_limit > 0 and time.time() - site.start_time > site.time_limit): - self.logger.info("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s", + self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s", site.time_limit, site.start_time, time.time() - site.start_time, site) - site.status = "FINISHED_TIME_LIMIT" - self.update_site(site) + self.finished(site, "FINISHED_TIME_LIMIT") return True else: return False @@ -113,6 +118,7 @@ class RethinkDbFrontier: .between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site") .order_by(index=r.desc("priority_by_site")).limit(1) .update({"claimed":True},return_changes=True)) + self.logger.info("query returned %s", result) self._vet_result(result, replaced=[0,1]) if result["replaced"] == 1: return brozzler.Page(**result["changes"][0]["new_val"]) @@ -123,8 +129,8 @@ class RethinkDbFrontier: cursor = self.r.run(r.table("pages").between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,True,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1)) return len(list(cursor)) > 0 - def get_page(self, page): - result = self.r.run(r.table("pages").get(page.id)) + def page(self, id): + result = self.r.run(r.table("pages").get(id)) if result: return brozzler.Page(**result) else: @@ -139,14 +145,48 @@ class RethinkDbFrontier: site.note_seed_redirect(page.redirect_url) self.update_site(site) + def active_jobs(self): + results = self.r.run(r.table("jobs").filter({"status":"ACTIVE"})) + for result in results: + yield brozzler.Job(**result) + + def job(self, id): + result = self.r.run(r.table("jobs").get(id)) + if result: + return brozzler.Job(**result) + else: + return None + + def _maybe_finish_job(self, job_id): + """Returns True if job is finished.""" + job = self.job(job_id) + if job.status.startswith("FINISH"): + self.logger.warn("%s is already %s", job, job.status) + return True + + results = self.r.run(r.table("sites").get_all(job_id, index="job_id")) + for result in results: + site = brozzler.Site(**result) + if not site.status.startswith("FINISH"): + return False + + job.status = "FINISHED" + job.finished = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + self.update_job(job) + return True + + def finished(self, site, status): + self.logger.info("%s %s", site, status) + site.status = status + self.update_site(site) + self._maybe_finish_job(site.job_id) + def disclaim_site(self, site, page=None): self.logger.info("disclaiming %s", site) site.claimed = False site.last_disclaimed = time.time() if not page and not self.has_outstanding_pages(site): - self.logger.info("site FINISHED! %s", site) - site.status = "FINISHED" - self.update_site(site) + self.finished(site, "FINISHED") if page: page.claimed = False self.update_page(page) @@ -157,8 +197,8 @@ class RethinkDbFrontier: for url in outlinks: if site.is_in_scope(url, parent_page): if brozzler.is_permitted_by_robots(site, url): - new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1, via_page_id=parent_page.id) - existing_child_page = self.get_page(new_child_page) + new_child_page = brozzler.Page(url, site_id=site.id, job_id=site.job_id, hops_from_seed=parent_page.hops_from_seed+1, via_page_id=parent_page.id) + existing_child_page = self.page(new_child_page.id) if existing_child_page: existing_child_page.priority += new_child_page.priority self.update_page(existing_child_page) @@ -174,3 +214,12 @@ class RethinkDbFrontier: self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s", counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page) + def reached_limit(self, site, e): + self.logger.info("reached_limit site=%s e=%s", site, e) + assert isinstance(e, brozzler.ReachedLimit) + if site.reached_limit and site.reached_limit != e.warcprox_meta["reached-limit"]: + self.logger.warn("reached limit %s but site had already reached limit %s", + e.warcprox_meta["reached-limit"], self.reached_limit) + else: + site.reached_limit = e.warcprox_meta["reached-limit"] + self.finished(site, "FINISHED_REACHED_LIMIT") diff --git a/brozzler/job.py b/brozzler/job.py index 1696f95..759eef5 100644 --- a/brozzler/job.py +++ b/brozzler/job.py @@ -55,13 +55,13 @@ def new_site(frontier, site): frontier.new_site(site) try: if brozzler.is_permitted_by_robots(site, site.seed): - page = brozzler.Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000) + page = brozzler.Page(site.seed, site_id=site.id, job_id=site.job_id, hops_from_seed=0, priority=1000) frontier.new_page(page) + logging.info("queued page %s", page) else: logging.warn("seed url {} is blocked by robots.txt".format(site.seed)) except brozzler.ReachedLimit as e: - site.note_limit_reached(e) - frontier.update_site(site) + frontier.reached_limit(site, e) class Job(brozzler.BaseDictable): logger = logging.getLogger(__module__ + "." + __qualname__) diff --git a/brozzler/site.py b/brozzler/site.py index 6e156ac..7fbfeb3 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -17,6 +17,7 @@ class Site(brozzler.BaseDictable): self.seed = seed self.id = id + self.job_id = job_id self.proxy = proxy self.ignore_robots = ignore_robots self.enable_warcprox_features = bool(enable_warcprox_features) @@ -53,16 +54,6 @@ class Site(brozzler.BaseDictable): self.logger.info("changing site scope surt from {} to {}".format(self.scope["surt"], new_scope_surt)) self.scope["surt"] = new_scope_surt - def note_limit_reached(self, e): - self.logger.info("reached_limit e=%s", e) - assert isinstance(e, brozzler.ReachedLimit) - if self.reached_limit and self.reached_limit != e.warcprox_meta["reached-limit"]: - self.logger.warn("reached limit %s but site had already reached limit %s", - e.warcprox_meta["reached-limit"], self.reached_limit) - else: - self.reached_limit = e.warcprox_meta["reached-limit"] - self.status = "FINISHED_REACHED_LIMIT" - def is_in_scope(self, url, parent_page=None): if parent_page and "max_hops" in self.scope and parent_page.hops_from_seed >= self.scope["max_hops"]: return False @@ -81,8 +72,9 @@ class Site(brozzler.BaseDictable): return False class Page(brozzler.BaseDictable): - def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None): + def __init__(self, url, id=None, site_id=None, job_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None): self.site_id = site_id + self.job_id = job_id self.url = url self.hops_from_seed = hops_from_seed self.redirect_url = redirect_url @@ -103,8 +95,8 @@ class Page(brozzler.BaseDictable): self.id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest() def __repr__(self): - return """Page(url={},site_id={},hops_from_seed={})""".format( - repr(self.url), self.site_id, self.hops_from_seed) + return """Page(url={},job_id={},site_id={},hops_from_seed={})""".format( + repr(self.url), self.job_id, self.site_id, self.hops_from_seed) def note_redirect(self, url): self.redirect_url = url diff --git a/brozzler/worker.py b/brozzler/worker.py index dcfe109..a774785 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -3,6 +3,7 @@ import os import logging import brozzler +import brozzler.browser import threading import time import signal @@ -144,7 +145,7 @@ class BrozzlerWorker: except brozzler.NothingToClaim: self.logger.info("no pages left for site %s", site) except brozzler.ReachedLimit as e: - site.note_limit_reached(e) + self._frontier.reached_limit(site, e) except brozzler.browser.BrowsingAborted: self.logger.info("{} shut down".format(browser)) except: diff --git a/requirements.txt b/requirements.txt index 3216650..8d07699 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ PyYAML -git+https://github.com/nlevitt/surt.git@py3 -# -e /home/nlevitt/workspace/surt +# git+https://github.com/nlevitt/surt.git@py3 +-e /home/nlevitt/workspace/surt git+https://github.com/nlevitt/youtube-dl.git@brozzler git+https://github.com/seomoz/reppy.git # https://github.com/seomoz/reppy/commit/7661606c not in pypi package requests git+https://github.com/nlevitt/websocket-client.git@tweaks rethinkdb pillow +-e .