Merge branch 'master' into qa

* master:
  enforce time limits based on the time during which a worker has claimed the site and is actively brozzling it, to avoid stopping crawls that haven't had much chance to crawl because the cluster is busy
  minimally update test_time_limit for new time accounting
  make sure the youtube-dl progress callback can't derail the youtube-dl operation
This commit is contained in:
Noah Levitt 2017-06-26 18:00:41 -07:00
commit b132c9c956
5 changed files with 36 additions and 11 deletions

View File

@ -135,10 +135,11 @@ class RethinkDbFrontier:
def _enforce_time_limit(self, site):
if (site.time_limit and site.time_limit > 0
and site.elapsed() > site.time_limit):
and (site.active_brozzling_time or 0) > site.time_limit):
self.logger.debug(
"site FINISHED_TIME_LIMIT! time_limit=%s elapsed=%s %s",
site.time_limit, site.elapsed(), site)
"site FINISHED_TIME_LIMIT! time_limit=%s "
"active_brozzling_time=%s %s", site.time_limit,
site.active_brozzling_time, site)
self.finished(site, "FINISHED_TIME_LIMIT")
return True
else:

View File

@ -119,7 +119,15 @@ def new_site(frontier, site):
class ElapsedMixIn(object):
def elapsed(self):
'''Returns elapsed crawl time as a float in seconds.'''
'''
Returns elapsed crawl time as a float in seconds.
This metric includes all the time that a site was in active rotation,
including any time it spent waiting for its turn to be brozzled.
In contrast `Site.active_brozzling_time` only counts time when a
brozzler worker claimed the site and was actively brozzling it.
'''
dt = 0
for ss in self.starts_and_stops[:-1]:
dt += (ss['stop'] - ss['start']).total_seconds()

View File

@ -162,12 +162,17 @@ class BrozzlerWorker:
def ydl_progress(*args, **kwargs):
# in case youtube-dl takes a long time, heartbeat site.last_claimed
# to prevent another brozzler-worker from claiming the site
if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=7):
try:
if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=7):
self.logger.debug(
'heartbeating site.last_claimed to prevent another '
'brozzler-worker claiming this site id=%r', site.id)
site.last_claimed = doublethink.utcnow()
site.save()
except:
self.logger.debug(
'heartbeating site.last_claimed to prevent another '
'brozzler-worker claiming this site id=%r', site.id)
site.last_claimed = doublethink.utcnow()
site.save()
'problem heartbeating site.last_claimed site id=%r',
site.id, exc_info=True)
ydl_opts = {
"outtmpl": "{}/ydl%(autonumber)s.out".format(destdir),
@ -494,6 +499,7 @@ class BrozzlerWorker:
except:
self.logger.critical("unexpected exception", exc_info=True)
finally:
site.active_brozzling_time = (site.active_brozzling_time or 0) + time.time() - start
self._frontier.disclaim_site(site, page)
def _brozzle_site_thread_target(self, browser, site):

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b12.dev261',
version='1.1b12.dev263',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',

View File

@ -238,6 +238,9 @@ def test_resume_job():
assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[0]['start']
def test_time_limit():
# XXX test not thoroughly adapted to change in time accounting, since
# starts_and_stops is no longer used to enforce time limits
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
rr = doublethink.Rethinker('localhost', db='ignoreme')
frontier = brozzler.RethinkDbFrontier(rr)
@ -277,9 +280,16 @@ def test_time_limit():
site.claimed = True
site.save()
time.sleep(0.1)
# time limit not reached yet
frontier._enforce_time_limit(site)
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 2
assert site.starts_and_stops[1]['start']
assert site.starts_and_stops[1]['stop'] is None
site.active_brozzling_time = 0.2 # this is why the time limit will be hit
frontier._enforce_time_limit(site)
assert site.status == 'FINISHED_TIME_LIMIT'
assert not site.claimed
assert len(site.starts_and_stops) == 2