Merge branch 'master' into qa

* master:
  support for resuming jobs, keeping track of each start and stop time, used to enforce time limits correctly
  let rethinkdb generate job.id if not supplied in configuration
This commit is contained in:
Noah Levitt 2017-02-03 14:56:21 -08:00
commit 3783e49cb4
8 changed files with 265 additions and 29 deletions

View File

@ -97,9 +97,8 @@ Job Configuration
-----------------
Jobs are defined using yaml files. Options may be specified either at the
top-level or on individual seeds. A job id and at least one seed url
must be specified, everything else is optional. For details, see
`<job-conf.rst>`_.
top-level or on individual seeds. At least one seed url must be specified,
everything else is optional. For details, see `<job-conf.rst>`_.
::
@ -238,7 +237,7 @@ option:
brozzler-worker --chrome-exe ~/bin/headless_chromium.sh
To render Flash content, `download <https://get.adobe.com/flashplayer/otherversions/>`_
and extract the Linux (.tar.gz) PPAPI plugin. Configure Headless Chromium
and extract the Linux (.tar.gz) PPAPI plugin. Configure Headless Chromium
to load the plugin by adding this option to your wrapper script:
::

View File

@ -1,7 +1,7 @@
'''
brozzler/frontier.py - RethinkDbFrontier manages crawl jobs, sites and pages
Copyright (C) 2014-2016 Internet Archive
Copyright (C) 2014-2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -99,6 +99,7 @@ class RethinkDbFrontier:
if not job.id:
# only if "id" has not already been set
job.id = result["generated_keys"][0]
return job
def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site)
@ -176,9 +177,10 @@ class RethinkDbFrontier:
def _enforce_time_limit(self, site):
if (site.time_limit and site.time_limit > 0
and (rethinkstuff.utcnow() - site.start_time).total_seconds() > site.time_limit):
self.logger.debug("site FINISHED_TIME_LIMIT! time_limit=%s start_time=%s elapsed=%s %s",
site.time_limit, site.start_time, rethinkstuff.utcnow() - site.start_time, site)
and site.elapsed() > site.time_limit):
self.logger.debug(
"site FINISHED_TIME_LIMIT! time_limit=%s elapsed=%s %s",
site.time_limit, site.elapsed(), site)
self.finished(site, "FINISHED_TIME_LIMIT")
return True
else:
@ -276,14 +278,14 @@ class RethinkDbFrontier:
n += 1
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
job.status = "FINISHED"
job.finished = rethinkstuff.utcnow()
job.finish()
self.update_job(job)
return True
def finished(self, site, status):
self.logger.info("%s %s", status, site)
site.status = status
site.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
self.update_site(site)
if site.job_id:
self._maybe_finish_job(site.job_id)
@ -300,6 +302,30 @@ class RethinkDbFrontier:
page.claimed = False
self.update_page(page)
def resume_job(self, job):
job.status = "ACTIVE"
job.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None})
self.update_job(job)
for site in self.job_sites(job.id):
site.status = "ACTIVE"
site.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None})
self.update_site(site)
def resume_site(self, site):
if site.job_id:
# can't call resume_job since that would resume jobs's other sites
job = self.job(site.job_id)
job.status = "ACTIVE"
job.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None})
self.update_job(job)
site.status = "ACTIVE"
site.starts_and_stops.append(
{"start":rethinkstuff.utcnow(), "stop":None})
self.update_site(site)
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
if site.remember_outlinks:
parent_page.outlinks = {"accepted":[],"blocked":[],"rejected":[]}

View File

@ -2,7 +2,7 @@
brozzler/job.py - Job class representing a brozzler crawl job, and functions
for setting up a job with supplied configuration
Copyright (C) 2014-2016 Internet Archive
Copyright (C) 2014-2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -61,17 +61,22 @@ def merge(a, b):
return a
def new_job_file(frontier, job_conf_file):
'''Returns new Job.'''
logging.info("loading %s", job_conf_file)
with open(job_conf_file) as f:
job_conf = yaml.load(f)
new_job(frontier, job_conf)
return new_job(frontier, job_conf)
def new_job(frontier, job_conf):
'''Returns new Job.'''
validate_conf(job_conf)
job = Job(
id=job_conf.get("id"), conf=job_conf, status="ACTIVE",
started=rethinkstuff.utcnow())
# insert the job now to make sure it has an id
job = frontier.new_job(job)
sites = []
for seed_conf in job_conf["seeds"]:
merged_conf = merge(seed_conf, job_conf)
@ -92,11 +97,10 @@ def new_job(frontier, job_conf):
password=merged_conf.get("password"))
sites.append(site)
# insert all the sites into database before the job
for site in sites:
new_site(frontier, site)
frontier.new_job(job)
return job
def new_site(frontier, site):
site.id = str(uuid.uuid4())
@ -120,14 +124,29 @@ def new_site(frontier, site):
class Job(brozzler.BaseDictable):
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, id=None, conf=None, status="ACTIVE", started=None,
finished=None, stop_requested=None):
def __init__(
self, id=None, conf=None, status="ACTIVE", started=None,
finished=None, stop_requested=None, starts_and_stops=None):
self.id = id
self.conf = conf
self.status = status
self.started = started
self.finished = finished
self.stop_requested = stop_requested
self.starts_and_stops = starts_and_stops
if not self.starts_and_stops:
if started: # backward compatibility
self.starts_and_stops = [{"start":started,"stop":finished}]
else:
self.starts_and_stops = [
{"start":rethinkstuff.utcnow(),"stop":None}]
def finish(self):
if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
self.logger.error(
"job is already finished status=%s "
"starts_and_stops[-1]['stop']=%s", self.status,
self.starts_and_stops[-1]["stop"])
self.status = "FINISHED"
self.starts_and_stops[-1]["stop"] = rethinkstuff.utcnow()
def __str__(self):
return 'Job(id=%s)' % self.id

View File

@ -2,7 +2,7 @@ id:
type:
- string
- integer
required: true
required: false
<<: &multi_level_options
time_limit:

View File

@ -99,7 +99,7 @@ class Site(brozzler.BaseDictable):
last_disclaimed=_EPOCH_UTC, last_claimed_by=None,
last_claimed=_EPOCH_UTC, metadata={}, remember_outlinks=None,
cookie_db=None, user_agent=None, behavior_parameters=None,
username=None, password=None):
username=None, password=None, starts_and_stops=None):
self.seed = seed
self.id = id
@ -113,7 +113,6 @@ class Site(brozzler.BaseDictable):
self.status = status
self.claimed = bool(claimed)
self.last_claimed_by = last_claimed_by
self.start_time = start_time or rethinkstuff.utcnow()
self.last_disclaimed = last_disclaimed
self.last_claimed = last_claimed
self.metadata = metadata
@ -123,11 +122,32 @@ class Site(brozzler.BaseDictable):
self.behavior_parameters = behavior_parameters
self.username = username
self.password = password
self.starts_and_stops = starts_and_stops
if not self.starts_and_stops:
if start_time: # backward compatibility
self.starts_and_stops = [{"start":start_time,"stop":None}]
if self.status != "ACTIVE":
self.starts_and_stops[0]["stop"] = self.last_disclaimed
else:
self.starts_and_stops = [
{"start":rethinkstuff.utcnow(),"stop":None}]
self.scope = scope or {}
if not "surt" in self.scope:
self.scope["surt"] = Url(seed).surt
def elapsed(self):
'''Returns elapsed crawl time as a float in seconds.'''
dt = 0
for ss in self.starts_and_stops[:-1]:
dt += (ss['stop'] - ss['start']).total_seconds()
ss = self.starts_and_stops[-1]
if ss['stop']:
dt += (ss['stop'] - ss['start']).total_seconds()
else: # crawl is active
dt += (rethinkstuff.utcnow() - ss['start']).total_seconds()
return dt
def __str__(self):
return "Site-%s-%s" % (self.id, self.seed)

View File

@ -2,8 +2,8 @@ brozzler job configuration
**************************
Jobs are defined using yaml files. Options may be specified either at the
top-level or on individual seeds. A job id and at least one seed url
must be specified, everything else is optional.
top-level or on individual seeds. At least one seed url must be specified,
everything else is optional.
an example
==========
@ -85,11 +85,11 @@ settings reference
id
--
+-----------+--------+----------+---------+
| scope | type | required | default |
+===========+========+==========+=========+
| top-level | string | yes? | *n/a* |
+-----------+--------+----------+---------+
+-----------+--------+----------+--------------------------+
| scope | type | required | default |
+===========+========+==========+==========================+
| top-level | string | no | *generated by rethinkdb* |
+-----------+--------+----------+--------------------------+
An arbitrary identifier for this job. Must be unique across this deployment of
brozzler.

View File

@ -32,7 +32,7 @@ def find_package_data(package):
setuptools.setup(
name='brozzler',
version='1.1b9.dev181',
version='1.1b9.dev183',
description='Distributed web crawling with browsers',
url='https://github.com/internetarchive/brozzler',
author='Noah Levitt',

172
tests/test_frontier.py Normal file
View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
'''
test_frontier.py - fairly narrow tests of frontier management, requires
rethinkdb running on localhost
Copyright (C) 2017 Internet Archive
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import brozzler
import logging
import argparse
import rethinkstuff
import time
args = argparse.Namespace()
args.log_level = logging.INFO
brozzler.cli.configure_logging(args)
def test_rethinkdb_up():
'''Checks that rethinkdb is listening and looks sane.'''
r = rethinkstuff.Rethinker(db='rethinkdb') # built-in db
tbls = r.table_list().run()
assert len(tbls) > 10
def test_resume_job():
'''
Tests that the right stuff gets twiddled in rethinkdb when we "start" and
"finish" crawling a job. Doesn't actually crawl anything.
'''
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
r = rethinkstuff.Rethinker(db='ignoreme')
frontier = brozzler.RethinkDbFrontier(r)
job_conf = {'seeds': [{'url': 'http://example.com/'}]}
job = brozzler.new_job(frontier, job_conf)
assert len(list(frontier.job_sites(job.id))) == 1
site = list(frontier.job_sites(job.id))[0]
assert job.status == 'ACTIVE'
assert len(job.starts_and_stops) == 1
assert job.starts_and_stops[0]['start']
assert job.starts_and_stops[0]['stop'] is None
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 1
assert site.starts_and_stops[0]['start']
assert site.starts_and_stops[0]['stop'] is None
frontier.finished(site, 'FINISHED')
job = frontier.job(job.id)
assert job.status == 'FINISHED'
assert len(job.starts_and_stops) == 1
assert job.starts_and_stops[0]['start']
assert job.starts_and_stops[0]['stop']
assert job.starts_and_stops[0]['stop'] > job.starts_and_stops[0]['start']
assert site.status == 'FINISHED'
assert len(site.starts_and_stops) == 1
assert site.starts_and_stops[0]['start']
assert site.starts_and_stops[0]['stop']
assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']
frontier.resume_site(site)
job = frontier.job(job.id)
assert job.status == 'ACTIVE'
assert len(job.starts_and_stops) == 2
assert job.starts_and_stops[1]['start']
assert job.starts_and_stops[1]['stop'] is None
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 2
assert site.starts_and_stops[1]['start']
assert site.starts_and_stops[1]['stop'] is None
frontier.finished(site, 'FINISHED')
job = frontier.job(job.id)
assert job.status == 'FINISHED'
assert len(job.starts_and_stops) == 2
assert job.starts_and_stops[1]['start']
assert job.starts_and_stops[1]['stop']
assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[0]['start']
assert site.status == 'FINISHED'
assert len(site.starts_and_stops) == 2
assert site.starts_and_stops[1]['start']
assert site.starts_and_stops[1]['stop']
assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[0]['start']
# resuming a job == resuming all of its sites
frontier.resume_job(job)
site = list(frontier.job_sites(job.id))[0]
assert job.status == 'ACTIVE'
assert len(job.starts_and_stops) == 3
assert job.starts_and_stops[2]['start']
assert job.starts_and_stops[2]['stop'] is None
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 3
assert site.starts_and_stops[2]['start']
assert site.starts_and_stops[2]['stop'] is None
frontier.finished(site, 'FINISHED')
job = frontier.job(job.id)
assert job.status == 'FINISHED'
assert len(job.starts_and_stops) == 3
assert job.starts_and_stops[2]['start']
assert job.starts_and_stops[2]['stop']
assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[0]['start']
assert site.status == 'FINISHED'
assert len(site.starts_and_stops) == 3
assert site.starts_and_stops[2]['start']
assert site.starts_and_stops[2]['stop']
assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[0]['start']
def test_time_limit():
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db
r = rethinkstuff.Rethinker('localhost', db='ignoreme')
frontier = brozzler.RethinkDbFrontier(r)
site = brozzler.Site(seed='http://example.com/', time_limit=99999)
brozzler.new_site(frontier, site)
site = frontier.site(site.id) # get it back from the db
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 1
assert site.starts_and_stops[0]['start']
assert site.starts_and_stops[0]['stop'] is None
frontier.finished(site, 'FINISHED')
assert site.status == 'FINISHED'
assert len(site.starts_and_stops) == 1
assert site.starts_and_stops[0]['start']
assert site.starts_and_stops[0]['stop']
assert site.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']
frontier.resume_site(site)
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 2
assert site.starts_and_stops[1]['start']
assert site.starts_and_stops[1]['stop'] is None
# time limit not reached yet
frontier._enforce_time_limit(site)
assert site.status == 'ACTIVE'
assert len(site.starts_and_stops) == 2
assert site.starts_and_stops[1]['start']
assert site.starts_and_stops[1]['stop'] is None
site.time_limit = 0.1
frontier.update_site(site)
time.sleep(0.1)
frontier._enforce_time_limit(site)
assert site.status == 'FINISHED_TIME_LIMIT'
assert len(site.starts_and_stops) == 2
assert site.starts_and_stops[1]['start']
assert site.starts_and_stops[1]['stop']
assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[0]['start']