consolidate job.py and site.py into model.py, and let Job and Site share the elapsed() method by way of a mixin

Noah Levitt 2017-03-29 18:49:04 -07:00
parent 3d47805ec1
commit 125d77b8c4
5 changed files with 146 additions and 170 deletions
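
In miniature, the change this commit makes: elapsed() moves out of Site into a mixin that both document classes inherit. Below is a minimal sketch of the pattern with toy stand-ins (plain base classes instead of doublethink.Document, a fixed NOW instead of doublethink.utcnow()); the real code is in the model.py diff further down.

import datetime

NOW = datetime.datetime(2017, 3, 29, 19, 0, 0)  # stand-in for doublethink.utcnow()

class ElapsedMixIn(object):
    def elapsed(self):
        # sum the closed start/stop intervals; count an open one up to now
        dt = 0
        for ss in self.starts_and_stops[:-1]:
            dt += (ss['stop'] - ss['start']).total_seconds()
        ss = self.starts_and_stops[-1]
        stop = ss['stop'] or NOW
        dt += (stop - ss['start']).total_seconds()
        return dt

class Job(ElapsedMixIn):    # really Job(doublethink.Document, ElapsedMixIn)
    def __init__(self, starts_and_stops):
        self.starts_and_stops = starts_and_stops

class Site(ElapsedMixIn):   # really Site(doublethink.Document, ElapsedMixIn)
    def __init__(self, starts_and_stops):
        self.starts_and_stops = starts_and_stops

# 30 minutes crawled in a finished stint, 15 minutes into the current one
job = Job([{'start': datetime.datetime(2017, 3, 29, 18, 0),
            'stop': datetime.datetime(2017, 3, 29, 18, 30)},
           {'start': datetime.datetime(2017, 3, 29, 18, 45), 'stop': None}])
assert job.elapsed() == 2700.0

Both classes get one shared implementation, and neither needs to know about the other.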

brozzler/__init__.py

@@ -158,14 +158,14 @@ import datetime
 EPOCH_UTC = datetime.datetime.utcfromtimestamp(0.0).replace(
         tzinfo=doublethink.UTC)
-from brozzler.site import Page, Site
 from brozzler.worker import BrozzlerWorker
 from brozzler.robots import is_permitted_by_robots
 from brozzler.frontier import RethinkDbFrontier
 from brozzler.browser import Browser, BrowserPool, BrowsingException
-from brozzler.job import new_job, new_site, Job
+from brozzler.model import (
+        new_job, new_job_file, new_site, Job, Page, Site, InvalidJobConf)
 from brozzler.cli import suggest_default_chrome_exe
 __all__ = ['Page', 'Site', 'BrozzlerWorker', 'is_permitted_by_robots',
         'RethinkDbFrontier', 'Browser', 'BrowserPool', 'BrowsingException',
-        'new_job', 'new_site', 'Job']
+        'new_job', 'new_site', 'Job', 'new_job_file', 'InvalidJobConf']

brozzler/cli.py

@@ -209,8 +209,8 @@ def brozzler_new_job():
     rr = rethinker(args)
     frontier = brozzler.RethinkDbFrontier(rr)
     try:
-        brozzler.job.new_job_file(frontier, args.job_conf_file)
-    except brozzler.job.InvalidJobConf as e:
+        brozzler.new_job_file(frontier, args.job_conf_file)
+    except brozzler.InvalidJobConf as e:
         print('brozzler-new-job: invalid job file:', args.job_conf_file, file=sys.stderr)
         print('  ' + yaml.dump(e.errors).rstrip().replace('\n', '\n  '), file=sys.stderr)
         sys.exit(1)
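
With the consolidation, callers go through the package's top level instead of the brozzler.job module. A hedged sketch of the new call path (the connection settings and job-conf filename here are hypothetical; cli.py actually builds its Rethinker from command-line arguments):

import sys
import yaml
import doublethink
import brozzler

rr = doublethink.Rethinker(servers=['localhost'], db='brozzler')  # assumed settings
frontier = brozzler.RethinkDbFrontier(rr)
try:
    brozzler.new_job_file(frontier, 'job-conf.yaml')  # was brozzler.job.new_job_file
except brozzler.InvalidJobConf as e:                  # was brozzler.job.InvalidJobConf
    print('invalid job conf:', yaml.dump(e.errors), file=sys.stderr)
    sys.exit(1)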

brozzler/job.py (deleted)

@@ -1,144 +0,0 @@
-'''
-brozzler/job.py - Job class representing a brozzler crawl job, and functions
-for setting up a job with supplied configuration
-
-Copyright (C) 2014-2017 Internet Archive
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import logging
-import brozzler
-import yaml
-import json
-import datetime
-import uuid
-import doublethink
-import os
-import cerberus
-import urllib
-import urlcanon
-
-def load_schema():
-    schema_file = os.path.join(os.path.dirname(__file__), 'job_schema.yaml')
-    with open(schema_file) as f:
-        return yaml.load(f)
-
-class JobValidator(cerberus.Validator):
-    def _validate_type_url(self, value):
-        url = urllib.parse.urlparse(value)
-        return url.scheme in ('http', 'https', 'ftp')
-
-class InvalidJobConf(Exception):
-    def __init__(self, errors):
-        self.errors = errors
-
-def validate_conf(job_conf, schema=load_schema()):
-    v = JobValidator(schema)
-    if not v.validate(job_conf):
-        raise InvalidJobConf(v.errors)
-
-def merge(a, b):
-    if isinstance(a, dict) and isinstance(b, dict):
-        merged = dict(a)
-        b_tmp = dict(b)
-        for k in a:
-            merged[k] = merge(a[k], b_tmp.pop(k, None))
-        merged.update(b_tmp)
-        return merged
-    elif isinstance(a, list) and isinstance(b, list):
-        return a + b
-    else:
-        return a
-
-def new_job_file(frontier, job_conf_file):
-    '''Returns new Job.'''
-    logging.info("loading %s", job_conf_file)
-    with open(job_conf_file) as f:
-        job_conf = yaml.load(f)
-    return new_job(frontier, job_conf)
-
-def new_job(frontier, job_conf):
-    '''Returns new Job.'''
-    validate_conf(job_conf)
-    job = Job(frontier.rr, {
-        "conf": job_conf, "status": "ACTIVE",
-        "started": doublethink.utcnow()})
-    if "id" in job_conf:
-        job.id = job_conf["id"]
-    job.save()
-
-    sites = []
-    for seed_conf in job_conf["seeds"]:
-        merged_conf = merge(seed_conf, job_conf)
-        merged_conf.pop("seeds")
-        merged_conf["job_id"] = job.id
-        merged_conf["seed"] = merged_conf.pop("url")
-        site = brozzler.Site(frontier.rr, merged_conf)
-        sites.append(site)
-
-    for site in sites:
-        new_site(frontier, site)
-
-    return job
-
-def new_site(frontier, site):
-    site.id = str(uuid.uuid4())
-    logging.info("new site {}".format(site))
-    # insert the Page into the database before the Site, to avoid situation
-    # where a brozzler worker immediately claims the site, finds no pages
-    # to crawl, and decides the site is finished
-    try:
-        url = urlcanon.parse_url(site.seed)
-        hashtag = (url.hash_sign + url.fragment).decode("utf-8")
-        urlcanon.canon.remove_fragment(url)
-        page = brozzler.Page(frontier.rr, {
-            "url": str(url), "site_id": site.get("id"),
-            "job_id": site.get("job_id"), "hops_from_seed": 0,
-            "priority": 1000, "needs_robots_check": True})
-        if hashtag:
-            page.hashtags = [hashtag,]
-        page.save()
-        logging.info("queued page %s", page)
-    finally:
-        # finally block because we want to insert the Site no matter what
-        site.save()
-
-class Job(doublethink.Document):
-    logger = logging.getLogger(__module__ + "." + __qualname__)
-    table = "jobs"
-
-    def populate_defaults(self):
-        if not "status" in self:
-            self.status = "ACTIVE"
-        if not "starts_and_stops" in self:
-            if self.get("started"):   # backward compatibility
-                self.starts_and_stops = [{
-                    "start": self.get("started"),
-                    "stop": self.get("finished")}]
-                del self["started"]
-                if "finished" in self:
-                    del self["finished"]
-            else:
-                self.starts_and_stops = [
-                        {"start":doublethink.utcnow(),"stop":None}]
-
-    def finish(self):
-        if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
-            self.logger.error(
-                    "job is already finished status=%s "
-                    "starts_and_stops[-1]['stop']=%s", self.status,
-                    self.starts_and_stops[-1]["stop"])
-        self.status = "FINISHED"
-        self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
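
One piece worth dwelling on before it reappears in model.py: merge() is what turns per-seed settings into overrides of the job-level conf. Dicts merge recursively with the first argument's values winning, lists concatenate, and any other value from the first argument wins outright. A small worked example (merge() copied from above; the conf keys are illustrative, not a complete job conf):

def merge(a, b):  # as defined above
    if isinstance(a, dict) and isinstance(b, dict):
        merged = dict(a)
        b_tmp = dict(b)
        for k in a:
            merged[k] = merge(a[k], b_tmp.pop(k, None))
        merged.update(b_tmp)
        return merged
    elif isinstance(a, list) and isinstance(b, list):
        return a + b
    else:
        return a

seed_conf = {'url': 'https://example.com/', 'time_limit': 60,
             'metadata': {'seed_label': 'one'}}
job_conf = {'id': 'myjob', 'time_limit': 86400,
            'metadata': {'project': 'test'}, 'seeds': [seed_conf]}

merged = merge(seed_conf, job_conf)
assert merged['time_limit'] == 60   # the seed-level scalar wins
assert merged['metadata'] == {'seed_label': 'one', 'project': 'test'}
assert merged['id'] == 'myjob'      # job-only keys carry over

This is why new_job can pass merged_conf straight to Site: each site document carries its fully resolved configuration.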

brozzler/site.py → brozzler/model.py

@@ -1,5 +1,6 @@
 '''
-brozzler/site.py - classes representing sites and pages
+brozzler/models.py - model classes representing jobs, sites, and pages, with
+related logic
 
 Copyright (C) 2014-2017 Internet Archive
@@ -16,17 +17,148 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import urlcanon
+import brozzler
+import cerberus
+import datetime
+import doublethink
+import hashlib
 import json
 import logging
-import brozzler
-import hashlib
-import time
-import doublethink
-import datetime
+import os
 import re
+import time
+import urlcanon
+import urllib
+import uuid
+import yaml
 
-class Site(doublethink.Document):
+def load_schema():
+    schema_file = os.path.join(os.path.dirname(__file__), 'job_schema.yaml')
+    with open(schema_file) as f:
+        return yaml.load(f)
+
+class JobValidator(cerberus.Validator):
+    def _validate_type_url(self, value):
+        url = urllib.parse.urlparse(value)
+        return url.scheme in ('http', 'https', 'ftp')
+
+class InvalidJobConf(Exception):
+    def __init__(self, errors):
+        self.errors = errors
+
+def validate_conf(job_conf, schema=load_schema()):
+    v = JobValidator(schema)
+    if not v.validate(job_conf):
+        raise InvalidJobConf(v.errors)
+
+def merge(a, b):
+    if isinstance(a, dict) and isinstance(b, dict):
+        merged = dict(a)
+        b_tmp = dict(b)
+        for k in a:
+            merged[k] = merge(a[k], b_tmp.pop(k, None))
+        merged.update(b_tmp)
+        return merged
+    elif isinstance(a, list) and isinstance(b, list):
+        return a + b
+    else:
+        return a
+
+def new_job_file(frontier, job_conf_file):
+    '''Returns new Job.'''
+    logging.info("loading %s", job_conf_file)
+    with open(job_conf_file) as f:
+        job_conf = yaml.load(f)
+    return new_job(frontier, job_conf)
+
+def new_job(frontier, job_conf):
+    '''Returns new Job.'''
+    validate_conf(job_conf)
+    job = Job(frontier.rr, {
+        "conf": job_conf, "status": "ACTIVE",
+        "started": doublethink.utcnow()})
+    if "id" in job_conf:
+        job.id = job_conf["id"]
+    job.save()
+
+    sites = []
+    for seed_conf in job_conf["seeds"]:
+        merged_conf = merge(seed_conf, job_conf)
+        merged_conf.pop("seeds")
+        merged_conf["job_id"] = job.id
+        merged_conf["seed"] = merged_conf.pop("url")
+        site = brozzler.Site(frontier.rr, merged_conf)
+        sites.append(site)
+
+    for site in sites:
+        new_site(frontier, site)
+
+    return job
+
+def new_site(frontier, site):
+    site.id = str(uuid.uuid4())
+    logging.info("new site {}".format(site))
+    # insert the Page into the database before the Site, to avoid situation
+    # where a brozzler worker immediately claims the site, finds no pages
+    # to crawl, and decides the site is finished
+    try:
+        url = urlcanon.parse_url(site.seed)
+        hashtag = (url.hash_sign + url.fragment).decode("utf-8")
+        urlcanon.canon.remove_fragment(url)
+        page = brozzler.Page(frontier.rr, {
+            "url": str(url), "site_id": site.get("id"),
+            "job_id": site.get("job_id"), "hops_from_seed": 0,
+            "priority": 1000, "needs_robots_check": True})
+        if hashtag:
+            page.hashtags = [hashtag,]
+        page.save()
+        logging.info("queued page %s", page)
+    finally:
+        # finally block because we want to insert the Site no matter what
+        site.save()
+
+class ElapsedMixIn(object):
+    def elapsed(self):
+        '''Returns elapsed crawl time as a float in seconds.'''
+        dt = 0
+        for ss in self.starts_and_stops[:-1]:
+            dt += (ss['stop'] - ss['start']).total_seconds()
+        ss = self.starts_and_stops[-1]
+        if ss['stop']:
+            dt += (ss['stop'] - ss['start']).total_seconds()
+        else: # crawl is active
+            dt += (doublethink.utcnow() - ss['start']).total_seconds()
+        return dt
+
+class Job(doublethink.Document, ElapsedMixIn):
+    logger = logging.getLogger(__module__ + "." + __qualname__)
+    table = "jobs"
+
+    def populate_defaults(self):
+        if not "status" in self:
+            self.status = "ACTIVE"
+        if not "starts_and_stops" in self:
+            if self.get("started"):   # backward compatibility
+                self.starts_and_stops = [{
+                    "start": self.get("started"),
+                    "stop": self.get("finished")}]
+                del self["started"]
+                if "finished" in self:
+                    del self["finished"]
+            else:
+                self.starts_and_stops = [
+                        {"start":doublethink.utcnow(),"stop":None}]
+
+    def finish(self):
+        if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
+            self.logger.error(
+                    "job is already finished status=%s "
+                    "starts_and_stops[-1]['stop']=%s", self.status,
+                    self.starts_and_stops[-1]["stop"])
+        self.status = "FINISHED"
+        self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
+
+class Site(doublethink.Document, ElapsedMixIn):
     logger = logging.getLogger(__module__ + "." + __qualname__)
     table = 'sites'
@@ -59,18 +191,6 @@ class Site(doublethink.Document):
     def __str__(self):
         return 'Site({"id":"%s","seed":"%s",...})' % (self.id, self.seed)
 
-    def elapsed(self):
-        '''Returns elapsed crawl time as a float in seconds.'''
-        dt = 0
-        for ss in self.starts_and_stops[:-1]:
-            dt += (ss['stop'] - ss['start']).total_seconds()
-        ss = self.starts_and_stops[-1]
-        if ss['stop']:
-            dt += (ss['stop'] - ss['start']).total_seconds()
-        else: # crawl is active
-            dt += (doublethink.utcnow() - ss['start']).total_seconds()
-        return dt
-
     def note_seed_redirect(self, url):
         new_scope_surt = brozzler.site_surt_canon(url).surt().decode("ascii")
         if not new_scope_surt.startswith(self.scope["surt"]):
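
The one prerequisite elapsed() has is that starts_and_stops exists, which each class's populate_defaults guarantees (Job's is shown above; Site's falls outside the visible hunks). For old-style records that only carry started/finished, Job.populate_defaults folds those fields into the new shape. A sketch of that transformation on a plain dict (doublethink.Document is dict-like, so the real method reads the same way; the field values are made up):

import datetime

def populate_defaults(doc):  # re-rendering of Job.populate_defaults above
    if not 'status' in doc:
        doc['status'] = 'ACTIVE'
    if not 'starts_and_stops' in doc:
        if doc.get('started'):   # backward compatibility
            doc['starts_and_stops'] = [{
                'start': doc.get('started'), 'stop': doc.get('finished')}]
            del doc['started']
            if 'finished' in doc:
                del doc['finished']
        else:
            doc['starts_and_stops'] = [
                    {'start': datetime.datetime.utcnow(), 'stop': None}]

old_style = {'id': 'myjob', 'status': 'FINISHED',
             'started': datetime.datetime(2017, 3, 1),
             'finished': datetime.datetime(2017, 3, 2)}
populate_defaults(old_style)
assert old_style == {
        'id': 'myjob', 'status': 'FINISHED',
        'starts_and_stops': [{'start': datetime.datetime(2017, 3, 1),
                              'stop': datetime.datetime(2017, 3, 2)}]}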

setup.py

@@ -32,7 +32,7 @@ def find_package_data(package):
 setuptools.setup(
         name='brozzler',
-        version='1.1b10.dev224',
+        version='1.1b10.dev225',
         description='Distributed web crawling with browsers',
         url='https://github.com/internetarchive/brozzler',
         author='Noah Levitt',