Support for resuming jobs: keep track of each start and stop time, and use those times to enforce time limits correctly

This commit is contained in:
Noah Levitt 2017-02-03 14:56:12 -08:00
parent 5a0301ac12
commit a60878c5a7
5 changed files with 244 additions and 12 deletions

View File

@ -177,9 +177,10 @@ class RethinkDbFrontier:
def _enforce_time_limit(self, site):
if (site.time_limit and site.time_limit > 0
and (rethinkstuff.utcnow() - site.start_time).total_seconds() > site.time_limit):
self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
site.time_limit, site.start_time, rethinkstuff.utcnow() - site.start_time, site)
and site.elapsed() > site.time_limit):
self.logger.debug(
"site FINISHED_TIME_LIMIT! time_limit=%s elapsed=%s %s",
site.time_limit, site.elapsed(), site)
self.finished(site, "FINISHED_TIME_LIMIT")
return True
else:
@ -277,14 +278,14 @@ class RethinkDbFrontier:
n += 1
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
job.status = "FINISHED"
job.finished = rethinkstuff.utcnow()
job.finish()
self.update_job(job)
return True
def finished(self, site, status):
self.logger.info("%s %s", status, site)
site.status = status
site.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
self.update_site(site)
if site.job_id:
self._maybe_finish_job(site.job_id)
@ -301,6 +302,30 @@ class RethinkDbFrontier:
page.claimed = False
self.update_page(page)
def resume_job(self, job):
    '''
    Puts `job` and each of its sites back into "ACTIVE" status.

    A new {"start": <now>, "stop": None} entry is appended to
    `starts_and_stops` on the job, and then on every site returned by
    `self.job_sites(job.id)`. The job is handed to `self.update_job` and
    each site to `self.update_site` after being modified.
    '''
    job.status = "ACTIVE"
    job_interval = {"start": rethinkstuff.utcnow(), "stop": None}
    job.starts_and_stops.append(job_interval)
    self.update_job(job)

    for job_site in self.job_sites(job.id):
        job_site.status = "ACTIVE"
        # each site gets its own timestamp, taken when it is resumed
        site_interval = {"start": rethinkstuff.utcnow(), "stop": None}
        job_site.starts_and_stops.append(site_interval)
        self.update_site(job_site)
def resume_site(self, site):
    '''
    Puts a single `site` back into "ACTIVE" status.

    If the site belongs to a job, that job is also marked "ACTIVE" and
    given a new start/stop entry, but the job's other sites are left
    alone. The site itself then gets a new
    {"start": <now>, "stop": None} entry and is handed to
    `self.update_site`.
    '''
    job_id = site.job_id
    if job_id:
        # resume_job() is deliberately not used here: it would also
        # resume every other site that belongs to the job
        parent_job = self.job(job_id)
        parent_job.status = "ACTIVE"
        job_interval = {"start": rethinkstuff.utcnow(), "stop": None}
        parent_job.starts_and_stops.append(job_interval)
        self.update_job(parent_job)

    site.status = "ACTIVE"
    site_interval = {"start": rethinkstuff.utcnow(), "stop": None}
    site.starts_and_stops.append(site_interval)
    self.update_site(site)
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
if site.remember_outlinks:
parent_page.outlinks = {"accepted":[],"blocked":[],"rejected":[]}

View File

@ -124,14 +124,29 @@ def new_site(frontier, site):
class Job(brozzler.BaseDictable):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, id=None, conf=None, status="ACTIVE", started=None,
finished=None, stop_requested=None):
def __init__(
        self, id=None, conf=None, status="ACTIVE", started=None,
        finished=None, stop_requested=None, starts_and_stops=None):
    '''
    Stores the supplied fields on the job and makes sure
    `starts_and_stops` holds at least one entry.

    When no (or an empty) `starts_and_stops` is supplied, the list is
    seeded either from the older `started`/`finished` fields, or, if
    `started` is not set, with an entry that starts now and has no
    stop.
    '''
    self.id = id
    self.conf = conf
    self.status = status
    self.started = started
    self.finished = finished
    self.stop_requested = stop_requested

    if starts_and_stops:
        self.starts_and_stops = starts_and_stops
    elif started:
        # backward compatibility with the started/finished fields
        self.starts_and_stops = [{"start": started, "stop": finished}]
    else:
        self.starts_and_stops = [
                {"start": rethinkstuff.utcnow(), "stop": None}]
def finish(self):
    '''
    Marks the job "FINISHED" and stamps the stop time of its latest
    start/stop entry with the current time.

    If the job already has status "FINISHED", or its latest entry
    already has a stop time, an error is logged; the status and stop
    time are still (re)written afterwards.
    '''
    current = self.starts_and_stops[-1]
    if self.status == "FINISHED" or current["stop"]:
        self.logger.error(
                "job is already finished status=%s "
                "starts_and_stops[-1]['stop']=%s", self.status,
                current["stop"])
    self.status = "FINISHED"
    current["stop"] = rethinkstuff.utcnow()
def __str__(self):
    '''Returns a short label of the form Job(id=<id>).'''
    label = 'Job(id=%s)' % self.id
    return label

View File

@ -99,7 +99,7 @@ class Site(brozzler.BaseDictable):
last_disclaimed=_EPOCH_UTC, last_claimed_by=None,
last_claimed=_EPOCH_UTC, metadata={}, remember_outlinks=None,
cookie_db=None, user_agent=None, behavior_parameters=None,
username=None, password=None):
username=None, password=None, starts_and_stops=None):
self.seed = seed
self.id = id
@ -113,7 +113,6 @@ class Site(brozzler.BaseDictable):
self.status = status
self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by
self.start_time = start_time or rethinkstuff.utcnow()
self.last_disclaimed = last_disclaimed
self.last_claimed = last_claimed
self.metadata = metadata
@ -123,11 +122,32 @@ class Site(brozzler.BaseDictable):
self.behavior_parameters = behavior_parameters
self.username = username
self.password = password
self.starts_and_stops = starts_and_stops
if not self.starts_and_stops:
if start_time: # backward compatibility
self.starts_and_stops = [{"start":start_time,"stop":None}]
if self.status != "ACTIVE":
self.starts_and_stops[0]["stop"] = self.last_disclaimed
else:
self.starts_and_stops = [
{"start":rethinkstuff.utcnow(),"stop":None}]
self.scope = scope or {}
if not "surt" in self.scope:
self.scope["surt"] = Url(seed).surt
def elapsed(self):
    '''
    Returns elapsed crawl time as a float in seconds.

    The result is the sum of the durations of all the entries in
    `self.starts_and_stops`:

    - an entry with a stop time contributes `stop - start`;
    - the last entry, if it has no stop time, means the crawl is active
      and contributes `now - start`;
    - an earlier entry with no stop time (which happens if the site
      was resumed without having been stopped first) is counted up to
      the start of the entry that follows it, so that this method does
      not fail on `None - datetime` and no stretch of time is counted
      twice.

    Returns 0.0 if there are no entries at all.
    '''
    intervals = self.starts_and_stops
    total = 0.0
    for i, interval in enumerate(intervals):
        stop = interval['stop']
        if not stop:
            if i + 1 < len(intervals):
                # never closed: treat it as running until the next start
                stop = intervals[i + 1]['start']
            else:  # crawl is active
                stop = rethinkstuff.utcnow()
        total += (stop - interval['start']).total_seconds()
    return total
def __str__(self):
    '''Returns a short label of the form Site-<id>-<seed>.'''
    parts = (self.id, self.seed)
    return "Site-%s-%s" % parts

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b9.dev182',
version='1.1b9.dev183',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',

172
tests/test_frontier.py Normal file
View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
'''
test_frontier.py - fairly narrow tests of frontier management, requires
rethinkdb running on localhost
Copyright (C) 2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import brozzler
import logging
import argparse
import rethinkstuff
import time
# set up logging once, at INFO level, for every test in this module
args = argparse.Namespace(log_level=logging.INFO)
brozzler.cli.configure_logging(args)
def test_rethinkdb_up():
    '''Checks that rethinkdb is listening and looks sane.'''
    # "rethinkdb" is the built-in db, so it exists on any server
    rethinker = rethinkstuff.Rethinker(db='rethinkdb')
    table_names = rethinker.table_list().run()
    assert len(table_names) > 10
def test_resume_job():
    '''
    Tests that the right stuff gets twiddled in rethinkdb when we "start" and
    "finish" crawling a job. Doesn't actually crawl anything.

    A job with a single seed is taken through three start/stop cycles:
    the initial one, one started with `resume_site`, and one started with
    `resume_job`. After every step the status and the `starts_and_stops`
    entries of both the job and the site are checked, including that each
    stop time comes after the start time of the same entry.
    '''
    # vagrant brozzler-worker isn't configured to look at the "ignoreme" db
    r = rethinkstuff.Rethinker(db='ignoreme')
    frontier = brozzler.RethinkDbFrontier(r)
    job_conf = {'seeds': [{'url': 'http://example.com/'}]}
    job = brozzler.new_job(frontier, job_conf)
    sites = list(frontier.job_sites(job.id))
    assert len(sites) == 1
    site = sites[0]

    # cycle 1: freshly created, still running
    assert job.status == 'ACTIVE'
    assert len(job.starts_and_stops) == 1
    assert job.starts_and_stops[0]['start']
    assert job.starts_and_stops[0]['stop'] is None
    assert site.status == 'ACTIVE'
    assert len(site.starts_and_stops) == 1
    assert site.starts_and_stops[0]['start']
    assert site.starts_and_stops[0]['stop'] is None

    # cycle 1: finishing the only site finishes the job
    frontier.finished(site, 'FINISHED')
    job = frontier.job(job.id)

    assert job.status == 'FINISHED'
    assert len(job.starts_and_stops) == 1
    assert job.starts_and_stops[0]['start']
    assert job.starts_and_stops[0]['stop']
    assert job.starts_and_stops[0]['stop'] > job.starts_and_stops[0]['start']
    assert site.status == 'FINISHED'
    assert len(site.starts_and_stops) == 1
    assert site.starts_and_stops[0]['start']
    assert site.starts_and_stops[0]['stop']
    assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']

    # cycle 2: resuming the site also reactivates its job
    frontier.resume_site(site)
    job = frontier.job(job.id)

    assert job.status == 'ACTIVE'
    assert len(job.starts_and_stops) == 2
    assert job.starts_and_stops[1]['start']
    assert job.starts_and_stops[1]['stop'] is None
    assert site.status == 'ACTIVE'
    assert len(site.starts_and_stops) == 2
    assert site.starts_and_stops[1]['start']
    assert site.starts_and_stops[1]['stop'] is None

    frontier.finished(site, 'FINISHED')
    job = frontier.job(job.id)

    assert job.status == 'FINISHED'
    assert len(job.starts_and_stops) == 2
    assert job.starts_and_stops[1]['start']
    assert job.starts_and_stops[1]['stop']
    # compare against the start of the same entry, not the first one
    assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[1]['start']
    assert site.status == 'FINISHED'
    assert len(site.starts_and_stops) == 2
    assert site.starts_and_stops[1]['start']
    assert site.starts_and_stops[1]['stop']
    assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[1]['start']

    # cycle 3: resuming a job == resuming all of its sites
    frontier.resume_job(job)
    site = list(frontier.job_sites(job.id))[0]

    assert job.status == 'ACTIVE'
    assert len(job.starts_and_stops) == 3
    assert job.starts_and_stops[2]['start']
    assert job.starts_and_stops[2]['stop'] is None
    assert site.status == 'ACTIVE'
    assert len(site.starts_and_stops) == 3
    assert site.starts_and_stops[2]['start']
    assert site.starts_and_stops[2]['stop'] is None

    frontier.finished(site, 'FINISHED')
    job = frontier.job(job.id)

    assert job.status == 'FINISHED'
    assert len(job.starts_and_stops) == 3
    assert job.starts_and_stops[2]['start']
    assert job.starts_and_stops[2]['stop']
    assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[2]['start']
    assert site.status == 'FINISHED'
    assert len(site.starts_and_stops) == 3
    assert site.starts_and_stops[2]['start']
    assert site.starts_and_stops[2]['stop']
    assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[2]['start']
def test_time_limit():
    '''
    Tests time limit enforcement on a site that has been stopped and
    resumed.

    The site is created with a very large time limit, finished, and then
    resumed. `_enforce_time_limit` must leave it alone at that point. The
    limit is then lowered to 0.1 seconds and, after sleeping that long,
    `_enforce_time_limit` must set the status to FINISHED_TIME_LIMIT and
    close the current start/stop entry.
    '''
    # vagrant brozzler-worker isn't configured to look at the "ignoreme" db
    r = rethinkstuff.Rethinker('localhost', db='ignoreme')
    frontier = brozzler.RethinkDbFrontier(r)
    site = brozzler.Site(seed='http://example.com/', time_limit=99999)
    brozzler.new_site(frontier, site)

    site = frontier.site(site.id)  # get it back from the db
    assert site.status == 'ACTIVE'
    assert len(site.starts_and_stops) == 1
    assert site.starts_and_stops[0]['start']
    assert site.starts_and_stops[0]['stop'] is None

    frontier.finished(site, 'FINISHED')

    assert site.status == 'FINISHED'
    assert len(site.starts_and_stops) == 1
    assert site.starts_and_stops[0]['start']
    assert site.starts_and_stops[0]['stop']
    assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']

    frontier.resume_site(site)

    assert site.status == 'ACTIVE'
    assert len(site.starts_and_stops) == 2
    assert site.starts_and_stops[1]['start']
    assert site.starts_and_stops[1]['stop'] is None

    # time limit not reached yet
    frontier._enforce_time_limit(site)

    assert site.status == 'ACTIVE'
    assert len(site.starts_and_stops) == 2
    assert site.starts_and_stops[1]['start']
    assert site.starts_and_stops[1]['stop'] is None

    # lower the limit and wait at least that long, so it is now exceeded
    site.time_limit = 0.1
    frontier.update_site(site)

    time.sleep(0.1)
    frontier._enforce_time_limit(site)

    assert site.status == 'FINISHED_TIME_LIMIT'
    assert len(site.starts_and_stops) == 2
    assert site.starts_and_stops[1]['start']
    assert site.starts_and_stops[1]['stop']
    # compare against the start of the same entry, not the first one
    assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[1]['start']