From dc9d1a4959889304deada399f875d4a9493a9529 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 10 Sep 2015 01:38:31 +0000
Subject: [PATCH] detecting job finish seems to be working now

---
 brozzler/frontier.py | 14 ++++++++++----
 brozzler/site.py     |  9 +++++++--
 brozzler/worker.py   |  6 ++++--
 3 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/brozzler/frontier.py b/brozzler/frontier.py
index bd25efe..3908319 100644
--- a/brozzler/frontier.py
+++ b/brozzler/frontier.py
@@ -4,6 +4,7 @@ import rethinkdb
 r = rethinkdb
 import random
 import time
+import datetime
 
 class UnexpectedDbResult(Exception):
     pass
@@ -85,12 +86,12 @@ class RethinkDbFrontier:
         result = self.r.run(r.table("pages").insert(page.to_dict()))
         self._vet_result(result, inserted=1)
 
-    def claim_site(self):
+    def claim_site(self, worker_id):
         # XXX keep track of aggregate priority and prioritize sites accordingly?
         while True:
             result = self.r.run(r.table("sites")
                     .between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
-                    .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True))
+                    .order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True,"last_claimed_by":worker_id},return_changes=True))
             self._vet_result(result, replaced=[0,1], unchanged=[0,1])
             if result["replaced"] == 1:
                 site = brozzler.Site(**result["changes"][0]["new_val"])
@@ -113,11 +114,11 @@ class RethinkDbFrontier:
         else:
             return False
 
-    def claim_page(self, site):
+    def claim_page(self, site, worker_id):
         result = self.r.run(r.table("pages")
                 .between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
                 .order_by(index=r.desc("priority_by_site")).limit(1)
-                .update({"claimed":True},return_changes=True))
+                .update({"claimed":True,"last_claimed_by":worker_id},return_changes=True))
         self.logger.info("query returned %s", result)
         self._vet_result(result, replaced=[0,1])
         if result["replaced"] == 1:
@@ -165,11 +166,14 @@ class RethinkDbFrontier:
             return True
 
         results = self.r.run(r.table("sites").get_all(job_id, index="job_id"))
+        n = 0
         for result in results:
             site = brozzler.Site(**result)
             if not site.status.startswith("FINISH"):
                 return False
+            n += 1
 
+        self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
         job.status = "FINISHED"
         job.finished = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
         self.update_job(job)
@@ -187,6 +191,8 @@ class RethinkDbFrontier:
         site.last_disclaimed = time.time()
         if not page and not self.has_outstanding_pages(site):
             self.finished(site, "FINISHED")
+        else:
+            self.update_site(site)
         if page:
             page.claimed = False
             self.update_page(page)
diff --git a/brozzler/site.py b/brozzler/site.py
index 7fbfeb3..fc5186a 100644
--- a/brozzler/site.py
+++ b/brozzler/site.py
@@ -13,7 +13,8 @@ class Site(brozzler.BaseDictable):
     def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None,
             ignore_robots=False, time_limit=None, extra_headers=None,
             enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
-            claimed=False, start_time=time.time(), last_disclaimed=0):
+            claimed=False, start_time=time.time(), last_disclaimed=0,
+            last_claimed_by=None):
 
         self.seed = seed
         self.id = id
@@ -26,6 +27,7 @@ class Site(brozzler.BaseDictable):
         self.reached_limit = reached_limit
         self.status = status
         self.claimed = bool(claimed)
+        self.last_claimed_by = last_claimed_by
         # times as seconds since epoch
         self.start_time = start_time
         self.last_disclaimed = last_disclaimed
@@ -72,13 +74,16 @@ class Site(brozzler.BaseDictable):
             return False
 
 class Page(brozzler.BaseDictable):
-    def __init__(self, url, id=None, site_id=None, job_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None):
+    def __init__(self, url, id=None, site_id=None, job_id=None,
+            hops_from_seed=0, redirect_url=None, priority=None, claimed=False,
+            brozzle_count=0, via_page_id=None, last_claimed_by=None):
         self.site_id = site_id
         self.job_id = job_id
         self.url = url
         self.hops_from_seed = hops_from_seed
         self.redirect_url = redirect_url
         self.claimed = bool(claimed)
+        self.last_claimed_by = last_claimed_by
         self.brozzle_count = brozzle_count
         self.via_page_id = via_page_id
         self._canon_hurl = None
diff --git a/brozzler/worker.py b/brozzler/worker.py
index a774785..721b0d9 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -12,6 +12,7 @@ import urllib.request
 import json
 import PIL.Image
 import io
+import socket
 
 class BrozzlerWorker:
     logger = logging.getLogger(__module__ + "." + __qualname__)
@@ -22,6 +23,7 @@ class BrozzlerWorker:
         self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
                 chrome_exe=chrome_exe, ignore_cert_errors=True)
         self._shutdown_requested = threading.Event()
+        self._id = "{}@{}".format(socket.gethostname(), os.getpid())
 
     def _youtube_dl(self, site):
         ydl_opts = {
@@ -137,7 +139,7 @@ class BrozzlerWorker:
         try:
             browser.start(proxy=site.proxy)
             while not self._shutdown_requested.is_set() and time.time() - start < 60:
-                page = self._frontier.claim_page(site)
+                page = self._frontier.claim_page(site, self._id)
                 outlinks = self.brozzle_page(browser, ydl, site, page)
                 self._frontier.completed_page(site, page)
                 self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
@@ -163,7 +165,7 @@ class BrozzlerWorker:
             try:
                 browser = self._browser_pool.acquire()
                 try:
-                    site = self._frontier.claim_site()
+                    site = self._frontier.claim_site(self._id)
                     self.logger.info("brozzling site %s", site)
                     ydl = self._youtube_dl(site)
                     th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
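
Usage sketch (not part of the patch): after these changes, whatever claims a site or page passes a worker id, and the frontier records it as last_claimed_by on the claimed row. A minimal illustration, assuming an already-constructed and connected brozzler.frontier.RethinkDbFrontier (construction details are outside this patch); only the claim_site/claim_page signatures and the hostname@pid id scheme come from the diff above.

    import os
    import socket

    def claim_work(frontier):
        # frontier: an existing brozzler.frontier.RethinkDbFrontier (assumed)
        worker_id = "{}@{}".format(socket.gethostname(), os.getpid())  # same scheme as BrozzlerWorker._id
        site = frontier.claim_site(worker_id)        # stores worker_id as site.last_claimed_by
        page = frontier.claim_page(site, worker_id)  # stores worker_id as page.last_claimed_by
        return site, page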