restore reclamation of orphaned, claimed sites, and heartbeat site.last_claimed every 7 minutes during youtube-dl processing, to prevent another brozzler-worker claiming the site

This commit is contained in:
Noah Levitt 2017-06-23 13:50:49 -07:00
parent 6bae53e646
commit 405c5725e4
4 changed files with 22 additions and 19 deletions

View File

@ -102,28 +102,21 @@ class RethinkDbFrontier:
["ACTIVE", r.minval], ["ACTIVE", r.maxval],
index="sites_last_disclaimed")
.order_by(index="sites_last_disclaimed")
.filter(r.row["claimed"] != True)
# XXX
# .filter((r.row["claimed"] != True) | (
# r.row["last_claimed"] < r.now() - 60*60))
.filter((r.row["claimed"] != True) | (
r.row["last_claimed"] < r.now() - 60*60))
.limit(1)
.update(
# try to avoid a race condition resulting in multiple
# brozzler-workers claiming the same site
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
r.branch(r.row["claimed"] != True, {
r.branch((r.row["claimed"] != True) | (
r.row["last_claimed"] < r.now() - 60*60), {
"claimed": True, "last_claimed_by": worker_id,
"last_claimed": doublethink.utcnow()}, {}),
# XXX
# r.branch((r.row["claimed"] != True) | (
# r.row["last_claimed"] < r.now() - 60*60), {
# "claimed": True, "last_claimed_by": worker_id,
# "last_claimed": doublethink.utcnow()}, {}),
return_changes=True)).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
if result["replaced"] == 1:
if result["changes"][0]["old_val"]["claimed"]:
# XXX impossible at the moment
self.logger.warn(
"re-claimed site that was still marked 'claimed' "
"because it was last claimed a long time ago "

View File

@ -36,6 +36,7 @@ import tempfile
import urlcanon
from requests.structures import CaseInsensitiveDict
import rethinkdb as r
import datetime
class ExtraHeaderAdder(urllib.request.BaseHandler):
def __init__(self, extra_headers):
@ -156,7 +157,18 @@ class BrozzlerWorker:
else:
return bool(site.proxy or self._warcprox_auto)
def _youtube_dl(self, destdir, site):
def ydl_progress(*args, **kwargs):
# in case youtube-dl takes a long time, heartbeat site.last_claimed
# to prevent another brozzler-worker from claiming the site
if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=7):
self.logger.debug(
'heartbeating site.last_claimed to prevent another '
'brozzler-worker claiming this site id=%r', site.id)
site.last_claimed = doublethink.utcnow()
site.save()
ydl_opts = {
"outtmpl": "{}/ydl%(autonumber)s.out".format(destdir),
"verbose": False,
@ -167,6 +179,7 @@ class BrozzlerWorker:
"noprogress": True,
"nopart": True,
"no_color": True,
"progress_hooks": [ydl_progress],
}
if self._proxy_for(site):
ydl_opts["proxy"] = "http://{}".format(self._proxy_for(site))

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b12.dev258',
version='1.1b12.dev259',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',

View File

@ -723,17 +723,14 @@ def test_claim_site():
with pytest.raises(brozzler.NothingToClaim):
claimed_site = frontier.claim_site(worker_id='test_claim_site')
### temporarily changing this behavior
### # site last_claimed more than 1 hour ago can be reclaimed
# site last_claimed more than 1 hour ago can be reclaimed
site = claimed_site
claimed_site = None
site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=65)
site.save()
### claimed_site = frontier.claim_site(worker_id='test_claim_site')
### assert claimed_site.id == site.id
with pytest.raises(brozzler.NothingToClaim):
claimed_site = frontier.claim_site(worker_id='test_claim_site')
claimed_site = frontier.claim_site(worker_id='test_claim_site')
assert claimed_site.id == site.id
# clean up
rr.table('sites').get(site.id).delete().run()
rr.table('sites').get(claimed_site.id).delete().run()