diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 33c2053..856b8bd 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -177,9 +177,10 @@ class RethinkDbFrontier: def _enforce_time_limit(self, site): if (site.time_limit and site.time_limit > 0 - and (rethinkstuff.utcnow() - site.start_time).total_seconds() > site.time_limit): - self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s", - site.time_limit, site.start_time, rethinkstuff.utcnow() - site.start_time, site) + and site.elapsed() > site.time_limit): + self.logger.debug( + "site FINISHED_TIME_LIMIT! time_limit=%s elapsed=%s %s", + site.time_limit, site.elapsed(), site) self.finished(site, "FINISHED_TIME_LIMIT") return True else: @@ -277,14 +278,14 @@ class RethinkDbFrontier: n += 1 self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id) - job.status = "FINISHED" - job.finished = rethinkstuff.utcnow() + job.finish() self.update_job(job) return True def finished(self, site, status): self.logger.info("%s %s", status, site) site.status = status + site.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow() self.update_site(site) if site.job_id: self._maybe_finish_job(site.job_id) @@ -301,6 +302,30 @@ class RethinkDbFrontier: page.claimed = False self.update_page(page) + def resume_job(self, job): + job.status = "ACTIVE" + job.starts_and_stops.append( + {"start":rethinkstuff.utcnow(), "stop":None}) + self.update_job(job) + for site in self.job_sites(job.id): + site.status = "ACTIVE" + site.starts_and_stops.append( + {"start":rethinkstuff.utcnow(), "stop":None}) + self.update_site(site) + + def resume_site(self, site): + if site.job_id: + # can't call resume_job since that would resume job's other sites + job = self.job(site.job_id) + job.status = "ACTIVE" + job.starts_and_stops.append( + {"start":rethinkstuff.utcnow(), "stop":None}) + self.update_job(job) + site.status = "ACTIVE" + site.starts_and_stops.append( + {"start":rethinkstuff.utcnow(), 
"stop":None}) + self.update_site(site) + def scope_and_schedule_outlinks(self, site, parent_page, outlinks): if site.remember_outlinks: parent_page.outlinks = {"accepted":[],"blocked":[],"rejected":[]} diff --git a/brozzler/job.py b/brozzler/job.py index bf6f571..1d91621 100644 --- a/brozzler/job.py +++ b/brozzler/job.py @@ -124,14 +124,29 @@ def new_site(frontier, site): class Job(brozzler.BaseDictable): logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, id=None, conf=None, status="ACTIVE", started=None, - finished=None, stop_requested=None): + def __init__( + self, id=None, conf=None, status="ACTIVE", started=None, + finished=None, stop_requested=None, starts_and_stops=None): self.id = id self.conf = conf self.status = status - self.started = started - self.finished = finished self.stop_requested = stop_requested + self.starts_and_stops = starts_and_stops + if not self.starts_and_stops: + if started: # backward compatibility + self.starts_and_stops = [{"start":started,"stop":finished}] + else: + self.starts_and_stops = [ + {"start":rethinkstuff.utcnow(),"stop":None}] + + def finish(self): + if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]: + self.logger.error( + "job is already finished status=%s " + "starts_and_stops[-1]['stop']=%s", self.status, + self.starts_and_stops[-1]["stop"]) + self.status = "FINISHED" + self.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow() def __str__(self): return 'Job(id=%s)' % self.id diff --git a/brozzler/site.py b/brozzler/site.py index a1414c1..17f8983 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -99,7 +99,7 @@ class Site(brozzler.BaseDictable): last_disclaimed=_EPOCH_UTC, last_claimed_by=None, last_claimed=_EPOCH_UTC, metadata={}, remember_outlinks=None, cookie_db=None, user_agent=None, behavior_parameters=None, - username=None, password=None): + username=None, password=None, starts_and_stops=None): self.seed = seed self.id = id @@ -113,7 +113,6 @@ class 
Site(brozzler.BaseDictable): self.status = status self.claimed = bool(claimed) self.last_claimed_by = last_claimed_by - self.start_time = start_time or rethinkstuff.utcnow() self.last_disclaimed = last_disclaimed self.last_claimed = last_claimed self.metadata = metadata @@ -123,11 +122,32 @@ class Site(brozzler.BaseDictable): self.behavior_parameters = behavior_parameters self.username = username self.password = password + self.starts_and_stops = starts_and_stops + if not self.starts_and_stops: + if start_time: # backward compatibility + self.starts_and_stops = [{"start":start_time,"stop":None}] + if self.status != "ACTIVE": + self.starts_and_stops[0]["stop"] = self.last_disclaimed + else: + self.starts_and_stops = [ + {"start":rethinkstuff.utcnow(),"stop":None}] self.scope = scope or {} if not "surt" in self.scope: self.scope["surt"] = Url(seed).surt + def elapsed(self): + '''Returns elapsed crawl time as a float in seconds.''' + dt = 0 + for ss in self.starts_and_stops[:-1]: + dt += (ss['stop'] - ss['start']).total_seconds() + ss = self.starts_and_stops[-1] + if ss['stop']: + dt += (ss['stop'] - ss['start']).total_seconds() + else: # crawl is active + dt += (rethinkstuff.utcnow() - ss['start']).total_seconds() + return dt + def __str__(self): return "Site-%s-%s" % (self.id, self.seed) diff --git a/setup.py b/setup.py index 830b51e..4a3821a 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def find_package_data(package): setuptools.setup( name='brozzler', - version='1.1b9.dev182', + version='1.1b9.dev183', description='Distributed web crawling with browsers', url='https://github.com/internetarchive/brozzler', author='Noah Levitt', diff --git a/tests/test_frontier.py b/tests/test_frontier.py new file mode 100644 index 0000000..4553835 --- /dev/null +++ b/tests/test_frontier.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +''' +test_frontier.py - fairly narrow tests of frontier management, requires +rethinkdb running on localhost + +Copyright (C) 2017 Internet 
Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import brozzler +import logging +import argparse +import rethinkstuff +import time + +args = argparse.Namespace() +args.log_level = logging.INFO +brozzler.cli.configure_logging(args) + +def test_rethinkdb_up(): + '''Checks that rethinkdb is listening and looks sane.''' + r = rethinkstuff.Rethinker(db='rethinkdb') # built-in db + tbls = r.table_list().run() + assert len(tbls) > 10 + +def test_resume_job(): + ''' + Tests that the right stuff gets twiddled in rethinkdb when we "start" and + "finish" crawling a job. Doesn't actually crawl anything. 
+ ''' + # vagrant brozzler-worker isn't configured to look at the "ignoreme" db + r = rethinkstuff.Rethinker(db='ignoreme') + frontier = brozzler.RethinkDbFrontier(r) + job_conf = {'seeds': [{'url': 'http://example.com/'}]} + job = brozzler.new_job(frontier, job_conf) + assert len(list(frontier.job_sites(job.id))) == 1 + site = list(frontier.job_sites(job.id))[0] + + assert job.status == 'ACTIVE' + assert len(job.starts_and_stops) == 1 + assert job.starts_and_stops[0]['start'] + assert job.starts_and_stops[0]['stop'] is None + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 1 + assert site.starts_and_stops[0]['start'] + assert site.starts_and_stops[0]['stop'] is None + + frontier.finished(site, 'FINISHED') + job = frontier.job(job.id) + + assert job.status == 'FINISHED' + assert len(job.starts_and_stops) == 1 + assert job.starts_and_stops[0]['start'] + assert job.starts_and_stops[0]['stop'] + assert job.starts_and_stops[0]['stop'] > job.starts_and_stops[0]['start'] + assert site.status == 'FINISHED' + assert len(site.starts_and_stops) == 1 + assert site.starts_and_stops[0]['start'] + assert site.starts_and_stops[0]['stop'] + assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start'] + + frontier.resume_site(site) + job = frontier.job(job.id) + + assert job.status == 'ACTIVE' + assert len(job.starts_and_stops) == 2 + assert job.starts_and_stops[1]['start'] + assert job.starts_and_stops[1]['stop'] is None + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 2 + assert site.starts_and_stops[1]['start'] + assert site.starts_and_stops[1]['stop'] is None + + frontier.finished(site, 'FINISHED') + job = frontier.job(job.id) + + assert job.status == 'FINISHED' + assert len(job.starts_and_stops) == 2 + assert job.starts_and_stops[1]['start'] + assert job.starts_and_stops[1]['stop'] + assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[0]['start'] + assert site.status == 'FINISHED' + assert 
len(site.starts_and_stops) == 2 + assert site.starts_and_stops[1]['start'] + assert site.starts_and_stops[1]['stop'] + assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[0]['start'] + + # resuming a job == resuming all of its sites + frontier.resume_job(job) + site = list(frontier.job_sites(job.id))[0] + + assert job.status == 'ACTIVE' + assert len(job.starts_and_stops) == 3 + assert job.starts_and_stops[2]['start'] + assert job.starts_and_stops[2]['stop'] is None + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 3 + assert site.starts_and_stops[2]['start'] + assert site.starts_and_stops[2]['stop'] is None + + frontier.finished(site, 'FINISHED') + job = frontier.job(job.id) + + assert job.status == 'FINISHED' + assert len(job.starts_and_stops) == 3 + assert job.starts_and_stops[2]['start'] + assert job.starts_and_stops[2]['stop'] + assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[0]['start'] + assert site.status == 'FINISHED' + assert len(site.starts_and_stops) == 3 + assert site.starts_and_stops[2]['start'] + assert site.starts_and_stops[2]['stop'] + assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[0]['start'] + +def test_time_limit(): + # vagrant brozzler-worker isn't configured to look at the "ignoreme" db + r = rethinkstuff.Rethinker('localhost', db='ignoreme') + frontier = brozzler.RethinkDbFrontier(r) + site = brozzler.Site(seed='http://example.com/', time_limit=99999) + brozzler.new_site(frontier, site) + + site = frontier.site(site.id) # get it back from the db + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 1 + assert site.starts_and_stops[0]['start'] + assert site.starts_and_stops[0]['stop'] is None + + frontier.finished(site, 'FINISHED') + + assert site.status == 'FINISHED' + assert len(site.starts_and_stops) == 1 + assert site.starts_and_stops[0]['start'] + assert site.starts_and_stops[0]['stop'] + assert site.starts_and_stops[0]['stop'] > 
site.starts_and_stops[0]['start'] + + frontier.resume_site(site) + + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 2 + assert site.starts_and_stops[1]['start'] + assert site.starts_and_stops[1]['stop'] is None + + # time limit not reached yet + frontier._enforce_time_limit(site) + + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 2 + assert site.starts_and_stops[1]['start'] + assert site.starts_and_stops[1]['stop'] is None + + site.time_limit = 0.1 + frontier.update_site(site) + + time.sleep(0.1) + frontier._enforce_time_limit(site) + + assert site.status == 'FINISHED_TIME_LIMIT' + assert len(site.starts_and_stops) == 2 + assert site.starts_and_stops[1]['start'] + assert site.starts_and_stops[1]['stop'] + assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[0]['start']