diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 7a6f826..86bfe48 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -102,28 +102,21 @@ class RethinkDbFrontier: ["ACTIVE", r.minval], ["ACTIVE", r.maxval], index="sites_last_disclaimed") .order_by(index="sites_last_disclaimed") - .filter(r.row["claimed"] != True) - # XXX - # .filter((r.row["claimed"] != True) | ( - # r.row["last_claimed"] < r.now() - 60*60)) + .filter((r.row["claimed"] != True) | ( + r.row["last_claimed"] < r.now() - 60*60)) .limit(1) .update( # try to avoid a race condition resulting in multiple # brozzler-workers claiming the same site # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 - r.branch(r.row["claimed"] != True, { + r.branch((r.row["claimed"] != True) | ( + r.row["last_claimed"] < r.now() - 60*60), { "claimed": True, "last_claimed_by": worker_id, "last_claimed": doublethink.utcnow()}, {}), - # XXX - # r.branch((r.row["claimed"] != True) | ( - # r.row["last_claimed"] < r.now() - 60*60), { - # "claimed": True, "last_claimed_by": worker_id, - # "last_claimed": doublethink.utcnow()}, {}), return_changes=True)).run() self._vet_result(result, replaced=[0,1], unchanged=[0,1]) if result["replaced"] == 1: if result["changes"][0]["old_val"]["claimed"]: - # XXX impossible at the moment self.logger.warn( "re-claimed site that was still marked 'claimed' " "because it was last claimed a long time ago " diff --git a/brozzler/worker.py b/brozzler/worker.py index f7a72db..7a7a246 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -36,6 +36,7 @@ import tempfile import urlcanon from requests.structures import CaseInsensitiveDict import rethinkdb as r +import datetime class ExtraHeaderAdder(urllib.request.BaseHandler): def __init__(self, extra_headers): @@ -156,7 +157,18 @@ class BrozzlerWorker: else: return bool(site.proxy or self._warcprox_auto) + def _youtube_dl(self, destdir, site): + def ydl_progress(*args, **kwargs): + # in case youtube-dl takes a long time, heartbeat site.last_claimed + # to prevent another brozzler-worker from claiming the site + if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=7): + self.logger.debug( + 'heartbeating site.last_claimed to prevent another ' + 'brozzler-worker claiming this site id=%r', site.id) + site.last_claimed = doublethink.utcnow() + site.save() + ydl_opts = { "outtmpl": "{}/ydl%(autonumber)s.out".format(destdir), "verbose": False, @@ -167,6 +179,7 @@ class BrozzlerWorker: "noprogress": True, "nopart": True, "no_color": True, + "progress_hooks": [ydl_progress], } if self._proxy_for(site): ydl_opts["proxy"] = "http://{}".format(self._proxy_for(site)) diff --git a/setup.py b/setup.py index 7c3e695..8d2f6f2 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b12.dev258', + version='1.1b12.dev259', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 7c2a328..f4fcec3 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -723,17 +723,14 @@ def test_claim_site(): with pytest.raises(brozzler.NothingToClaim): claimed_site = frontier.claim_site(worker_id='test_claim_site') - ### temporarily changing this behavior - ### # site last_claimed more than 1 hour ago can be reclaimed + # site last_claimed more than 1 hour ago can be reclaimed site = claimed_site claimed_site = None site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=65) site.save() - ### claimed_site = frontier.claim_site(worker_id='test_claim_site') - ### assert claimed_site.id == site.id - with pytest.raises(brozzler.NothingToClaim): - claimed_site = frontier.claim_site(worker_id='test_claim_site') + claimed_site = frontier.claim_site(worker_id='test_claim_site') + assert claimed_site.id == site.id # clean up - rr.table('sites').get(site.id).delete().run() + rr.table('sites').get(claimed_site.id).delete().run()