From d7512fbeb632473e8f09c8b329035f445ec6401a Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 1 Mar 2018 11:28:30 -0800 Subject: [PATCH] move time limit enforcement now it's next to stop request enforcement which makes more sense and supports more timely action --- brozzler/__init__.py | 3 +++ brozzler/frontier.py | 17 +++++------------ brozzler/worker.py | 4 +++- tests/test_brozzling.py | 0 tests/test_cluster.py | 28 ++++++++++++++++++++++++++++ tests/test_frontier.py | 12 ++++++++---- 6 files changed, 47 insertions(+), 17 deletions(-) mode change 100755 => 100644 tests/test_brozzling.py diff --git a/brozzler/__init__.py b/brozzler/__init__.py index a8d0b33..30da7f0 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -32,6 +32,9 @@ class CrawlStopped(Exception): class ProxyError(Exception): pass +class ReachedTimeLimit(Exception): + pass + class ReachedLimit(Exception): def __init__(self, http_error=None, warcprox_meta=None, http_payload=None): import json diff --git a/brozzler/frontier.py b/brozzler/frontier.py index ee43966..06f482f 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -131,28 +131,21 @@ class RethinkDbFrontier: sites.append(site) if not sites: raise brozzler.NothingToClaim - # XXX This is the only place we enforce time limit for now. Worker - # loop should probably check time limit. Maybe frontier needs a - # housekeeping thread to ensure that time limits get enforced in a - # timely fashion. - for site in list(sites): - if self._enforce_time_limit(site): - sites.remove(site) if sites: return sites # else try again - def _enforce_time_limit(self, site): + def enforce_time_limit(self, site): + ''' + Raises `brozzler.ReachedTimeLimit` if appropriate. + ''' if (site.time_limit and site.time_limit > 0 and (site.active_brozzling_time or 0) > site.time_limit): self.logger.debug( "site FINISHED_TIME_LIMIT! 
time_limit=%s " "active_brozzling_time=%s %s", site.time_limit, site.active_brozzling_time, site) - self.finished(site, "FINISHED_TIME_LIMIT") - return True - else: - return False + raise brozzler.ReachedTimeLimit def claim_page(self, site, worker_id): # ignores the "claimed" field of the page, because only one diff --git a/brozzler/worker.py b/brozzler/worker.py index 4ea21b4..e681ba5 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -506,12 +506,12 @@ class BrozzlerWorker: site.save() start = time.time() page = None - self._frontier.honor_stop_request(site) self.logger.info( "brozzling site (proxy=%r) %r", self._proxy_for(site), site) while time.time() - start < self.SITE_SESSION_MINUTES * 60: site.refresh() + self._frontier.enforce_time_limit(site) self._frontier.honor_stop_request(site) page = self._frontier.claim_page(site, "%s:%s" % ( socket.gethostname(), browser.chrome.port)) @@ -539,6 +539,8 @@ class BrozzlerWorker: self.logger.info("no pages left for site %s", site) except brozzler.ReachedLimit as e: self._frontier.reached_limit(site, e) + except brozzler.ReachedTimeLimit as e: + self._frontier.finished(site, "FINISHED_TIME_LIMIT") except brozzler.CrawlStopped: self._frontier.finished(site, "FINISHED_STOP_REQUESTED") # except brozzler.browser.BrowsingAborted: diff --git a/tests/test_brozzling.py b/tests/test_brozzling.py old mode 100755 new mode 100644 diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 32e9734..2559e07 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -762,3 +762,31 @@ def test_warcprox_outage_resiliency(httpd): warcprox2_thread.join() start_service('warcprox') +def test_time_limit(httpd): + test_id = 'test_time_limit-%s' % datetime.datetime.utcnow().isoformat() + rr = doublethink.Rethinker('localhost', db='brozzler') + frontier = brozzler.RethinkDbFrontier(rr) + + # create a new job with one site that could be crawled forever + job_conf = {'seeds': [{ + 'url': 'http://localhost:%s/infinite/foo/' 
% httpd.server_port, + 'time_limit': 20}]} + job = brozzler.new_job(frontier, job_conf) + assert job.id + + sites = list(frontier.job_sites(job.id)) + assert len(sites) == 1 + site = sites[0] + + # time limit should be enforced pretty soon + start = time.time() + while not sites[0].status.startswith( + 'FINISHED') and time.time() - start < 120: + time.sleep(0.5) + sites[0].refresh() + assert sites[0].status == 'FINISHED_TIME_LIMIT' + + # all sites finished so job should be finished too + job.refresh() + assert job.status == 'FINISHED' + diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 5170a04..58207d2 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -3,7 +3,7 @@ test_frontier.py - fairly narrow tests of frontier management, requires rethinkdb running on localhost -Copyright (C) 2017 Internet Archive +Copyright (C) 2017-2018 Internet Archive Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -380,7 +380,7 @@ def test_time_limit(): assert site.starts_and_stops[1]['stop'] is None # time limit not reached yet - frontier._enforce_time_limit(site) + frontier.enforce_time_limit(site) assert site.status == 'ACTIVE' assert len(site.starts_and_stops) == 2 @@ -392,7 +392,7 @@ def test_time_limit(): site.save() # time limit not reached yet - frontier._enforce_time_limit(site) + frontier.enforce_time_limit(site) assert site.status == 'ACTIVE' assert len(site.starts_and_stops) == 2 assert site.starts_and_stops[1]['start'] @@ -400,7 +400,11 @@ def test_time_limit(): site.active_brozzling_time = 0.2 # this is why the time limit will be hit - frontier._enforce_time_limit(site) + try: + frontier.enforce_time_limit(site) + except brozzler.ReachedTimeLimit: + frontier.finished(site, 'FINISHED_TIME_LIMIT') + assert site.status == 'FINISHED_TIME_LIMIT' assert not site.claimed assert len(site.starts_and_stops) == 2