detecting job finish seems to be working now

This commit is contained in:
Noah Levitt 2015-09-10 01:38:31 +00:00
parent 92a288bc35
commit dc9d1a4959
3 changed files with 21 additions and 8 deletions

View File

@ -4,6 +4,7 @@ import rethinkdb
r = rethinkdb
import random
import time
import datetime
class UnexpectedDbResult(Exception):
pass
@ -85,12 +86,12 @@ class RethinkDbFrontier:
result = self.r.run(r.table("pages").insert(page.to_dict()))
self._vet_result(result, inserted=1)
def claim_site(self):
def claim_site(self, worker_id):
# XXX keep track of aggregate priority and prioritize sites accordingly?
while True:
result = self.r.run(r.table("sites")
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True))
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True,"last_claimed_by":worker_id},return_changes=True))
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
if result["replaced"] == 1:
site = brozzler.Site(**result["changes"][0]["new_val"])
@ -113,11 +114,11 @@ class RethinkDbFrontier:
else:
return False
def claim_page(self, site):
def claim_page(self, site, worker_id):
result = self.r.run(r.table("pages")
.between([site.id,0,False,brozzler.MIN_PRIORITY], [site.id,0,False,brozzler.MAX_PRIORITY], index="priority_by_site")
.order_by(index=r.desc("priority_by_site")).limit(1)
.update({"claimed":True},return_changes=True))
.update({"claimed":True,"last_claimed_by":worker_id},return_changes=True))
self.logger.info("query returned %s", result)
self._vet_result(result, replaced=[0,1])
if result["replaced"] == 1:
@ -165,11 +166,14 @@ class RethinkDbFrontier:
return True
results = self.r.run(r.table("sites").get_all(job_id, index="job_id"))
n = 0
for result in results:
site = brozzler.Site(**result)
if not site.status.startswith("FINISH"):
return False
n += 1
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
job.status = "FINISHED"
job.finished = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
self.update_job(job)
@ -187,6 +191,8 @@ class RethinkDbFrontier:
site.last_disclaimed = time.time()
if not page and not self.has_outstanding_pages(site):
self.finished(site, "FINISHED")
else:
self.update_site(site)
if page:
page.claimed = False
self.update_page(page)

View File

@ -13,7 +13,8 @@ class Site(brozzler.BaseDictable):
def __init__(self, seed, id=None, job_id=None, scope=None, proxy=None,
ignore_robots=False, time_limit=None, extra_headers=None,
enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
claimed=False, start_time=time.time(), last_disclaimed=0):
claimed=False, start_time=time.time(), last_disclaimed=0,
last_claimed_by=None):
self.seed = seed
self.id = id
@ -26,6 +27,7 @@ class Site(brozzler.BaseDictable):
self.reached_limit = reached_limit
self.status = status
self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by
# times as seconds since epoch
self.start_time = start_time
self.last_disclaimed = last_disclaimed
@ -72,13 +74,16 @@ class Site(brozzler.BaseDictable):
return False
class Page(brozzler.BaseDictable):
def __init__(self, url, id=None, site_id=None, job_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None):
def __init__(self, url, id=None, site_id=None, job_id=None,
hops_from_seed=0, redirect_url=None, priority=None, claimed=False,
brozzle_count=0, via_page_id=None, last_claimed_by=None):
self.site_id = site_id
self.job_id = job_id
self.url = url
self.hops_from_seed = hops_from_seed
self.redirect_url = redirect_url
self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by
self.brozzle_count = brozzle_count
self.via_page_id = via_page_id
self._canon_hurl = None

View File

@ -12,6 +12,7 @@ import urllib.request
import json
import PIL.Image
import io
import socket
class BrozzlerWorker:
logger = logging.getLogger(__module__ + "." + __qualname__)
@ -22,6 +23,7 @@ class BrozzlerWorker:
self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event()
self._id = "{}@{}".format(socket.gethostname(), os.getpid())
def _youtube_dl(self, site):
ydl_opts = {
@ -137,7 +139,7 @@ class BrozzlerWorker:
try:
browser.start(proxy=site.proxy)
while not self._shutdown_requested.is_set() and time.time() - start < 60:
page = self._frontier.claim_page(site)
page = self._frontier.claim_page(site, self._id)
outlinks = self.brozzle_page(browser, ydl, site, page)
self._frontier.completed_page(site, page)
self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
@ -163,7 +165,7 @@ class BrozzlerWorker:
try:
browser = self._browser_pool.acquire()
try:
site = self._frontier.claim_site()
site = self._frontier.claim_site(self._id)
self.logger.info("brozzling site %s", site)
ydl = self._youtube_dl(site)
th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),