From 125d77b8c498876f1301dc660a33cdfa1a4b73d1 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Wed, 29 Mar 2017 18:49:04 -0700
Subject: [PATCH] consolidate job.py and site.py into model.py, and let Job
 and Site share the elapsed() method by way of a mixin

---
 brozzler/__init__.py           |   6 +-
 brozzler/cli.py                |   4 +-
 brozzler/job.py                | 144 -----------------------------
 brozzler/{site.py => model.py} | 160 ++++++++++++++++++++++++++++-----
 setup.py                       |   2 +-
 5 files changed, 146 insertions(+), 170 deletions(-)
 delete mode 100644 brozzler/job.py
 rename brozzler/{site.py => model.py} (61%)

diff --git a/brozzler/__init__.py b/brozzler/__init__.py
index cde8cad..4f747ce 100644
--- a/brozzler/__init__.py
+++ b/brozzler/__init__.py
@@ -158,14 +158,14 @@ import datetime
 EPOCH_UTC = datetime.datetime.utcfromtimestamp(0.0).replace(
         tzinfo=doublethink.UTC)
 
-from brozzler.site import Page, Site
 from brozzler.worker import BrozzlerWorker
 from brozzler.robots import is_permitted_by_robots
 from brozzler.frontier import RethinkDbFrontier
 from brozzler.browser import Browser, BrowserPool, BrowsingException
-from brozzler.job import new_job, new_site, Job
+from brozzler.model import (
+        new_job, new_job_file, new_site, Job, Page, Site, InvalidJobConf)
 from brozzler.cli import suggest_default_chrome_exe
 
 __all__ = ['Page', 'Site', 'BrozzlerWorker', 'is_permitted_by_robots',
         'RethinkDbFrontier', 'Browser', 'BrowserPool', 'BrowsingException',
-        'new_job', 'new_site', 'Job']
+        'new_job', 'new_site', 'Job', 'new_job_file', 'InvalidJobConf']
diff --git a/brozzler/cli.py b/brozzler/cli.py
index b0d5afa..f5f9dc1 100644
--- a/brozzler/cli.py
+++ b/brozzler/cli.py
@@ -209,8 +209,8 @@ def brozzler_new_job():
     rr = rethinker(args)
     frontier = brozzler.RethinkDbFrontier(rr)
     try:
-        brozzler.job.new_job_file(frontier, args.job_conf_file)
-    except brozzler.job.InvalidJobConf as e:
+        brozzler.new_job_file(frontier, args.job_conf_file)
+    except brozzler.InvalidJobConf as e:
         print('brozzler-new-job: invalid job file:', args.job_conf_file, file=sys.stderr)
         print('  ' + yaml.dump(e.errors).rstrip().replace('\n', '\n  '), file=sys.stderr)
         sys.exit(1)
diff --git a/brozzler/job.py b/brozzler/job.py
deleted file mode 100644
index ac001f1..0000000
--- a/brozzler/job.py
+++ /dev/null
@@ -1,144 +0,0 @@
-'''
-brozzler/job.py - Job class representing a brozzler crawl job, and functions
-for setting up a job with supplied configuration
-
-Copyright (C) 2014-2017 Internet Archive
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import logging
-import brozzler
-import yaml
-import json
-import datetime
-import uuid
-import doublethink
-import os
-import cerberus
-import urllib
-import urlcanon
-
-def load_schema():
-    schema_file = os.path.join(os.path.dirname(__file__), 'job_schema.yaml')
-    with open(schema_file) as f:
-        return yaml.load(f)
-
-class JobValidator(cerberus.Validator):
-    def _validate_type_url(self, value):
-        url = urllib.parse.urlparse(value)
-        return url.scheme in ('http', 'https', 'ftp')
-
-class InvalidJobConf(Exception):
-    def __init__(self, errors):
-        self.errors = errors
-
-def validate_conf(job_conf, schema=load_schema()):
-    v = JobValidator(schema)
-    if not v.validate(job_conf):
-        raise InvalidJobConf(v.errors)
-
-def merge(a, b):
-    if isinstance(a, dict) and isinstance(b, dict):
-        merged = dict(a)
-        b_tmp = dict(b)
-        for k in a:
-            merged[k] = merge(a[k], b_tmp.pop(k, None))
-        merged.update(b_tmp)
-        return merged
-    elif isinstance(a, list) and isinstance(b, list):
-        return a + b
-    else:
-        return a
-
-def new_job_file(frontier, job_conf_file):
-    '''Returns new Job.'''
-    logging.info("loading %s", job_conf_file)
-    with open(job_conf_file) as f:
-        job_conf = yaml.load(f)
-    return new_job(frontier, job_conf)
-
-def new_job(frontier, job_conf):
-    '''Returns new Job.'''
-    validate_conf(job_conf)
-    job = Job(frontier.rr, {
-        "conf": job_conf, "status": "ACTIVE",
-        "started": doublethink.utcnow()})
-    if "id" in job_conf:
-        job.id = job_conf["id"]
-    job.save()
-
-    sites = []
-    for seed_conf in job_conf["seeds"]:
-        merged_conf = merge(seed_conf, job_conf)
-        merged_conf.pop("seeds")
-        merged_conf["job_id"] = job.id
-        merged_conf["seed"] = merged_conf.pop("url")
-        site = brozzler.Site(frontier.rr, merged_conf)
-        sites.append(site)
-
-    for site in sites:
-        new_site(frontier, site)
-
-    return job
-
-def new_site(frontier, site):
-    site.id = str(uuid.uuid4())
-    logging.info("new site {}".format(site))
-    # insert the Page into the database before the Site, to avoid situation
-    # where a brozzler worker immediately claims the site, finds no pages
-    # to crawl, and decides the site is finished
-    try:
-        url = urlcanon.parse_url(site.seed)
-        hashtag = (url.hash_sign + url.fragment).decode("utf-8")
-        urlcanon.canon.remove_fragment(url)
-        page = brozzler.Page(frontier.rr, {
-            "url": str(url), "site_id": site.get("id"),
-            "job_id": site.get("job_id"), "hops_from_seed": 0,
-            "priority": 1000, "needs_robots_check": True})
-        if hashtag:
-            page.hashtags = [hashtag,]
-        page.save()
-        logging.info("queued page %s", page)
-    finally:
-        # finally block because we want to insert the Site no matter what
-        site.save()
-
-class Job(doublethink.Document):
-    logger = logging.getLogger(__module__ + "." + __qualname__)
-    table = "jobs"
-
-    def populate_defaults(self):
-        if not "status" in self:
-            self.status = "ACTIVE"
-        if not "starts_and_stops" in self:
-            if self.get("started"):   # backward compatibility
-                self.starts_and_stops = [{
-                    "start": self.get("started"),
-                    "stop": self.get("finished")}]
-                del self["started"]
-                if "finished" in self:
-                    del self["finished"]
-            else:
-                self.starts_and_stops = [
-                        {"start":doublethink.utcnow(),"stop":None}]
-
-    def finish(self):
-        if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
-            self.logger.error(
-                    "job is already finished status=%s "
-                    "starts_and_stops[-1]['stop']=%s", self.status,
-                    self.starts_and_stops[-1]["stop"])
-        self.status = "FINISHED"
-        self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
-
diff --git a/brozzler/site.py b/brozzler/model.py
similarity index 61%
rename from brozzler/site.py
rename to brozzler/model.py
index 7580e17..67852b4 100644
--- a/brozzler/site.py
+++ b/brozzler/model.py
@@ -1,5 +1,6 @@
 '''
-brozzler/site.py - classes representing sites and pages
+brozzler/models.py - model classes representing jobs, sites, and pages, with
+related logic
 
 Copyright (C) 2014-2017 Internet Archive
 
@@ -16,17 +17,148 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import urlcanon
+import brozzler
+import cerberus
+import datetime
+import doublethink
+import hashlib
 import json
 import logging
-import brozzler
-import hashlib
-import time
-import doublethink
-import datetime
+import os
 import re
+import time
+import urlcanon
+import urllib
+import uuid
+import yaml
 
-class Site(doublethink.Document):
+def load_schema():
+    schema_file = os.path.join(os.path.dirname(__file__), 'job_schema.yaml')
+    with open(schema_file) as f:
+        return yaml.load(f)
+
+class JobValidator(cerberus.Validator):
+    def _validate_type_url(self, value):
+        url = urllib.parse.urlparse(value)
+        return url.scheme in ('http', 'https', 'ftp')
+
+class InvalidJobConf(Exception):
+    def __init__(self, errors):
+        self.errors = errors
+
+def validate_conf(job_conf, schema=load_schema()):
+    v = JobValidator(schema)
+    if not v.validate(job_conf):
+        raise InvalidJobConf(v.errors)
+
+def merge(a, b):
+    if isinstance(a, dict) and isinstance(b, dict):
+        merged = dict(a)
+        b_tmp = dict(b)
+        for k in a:
+            merged[k] = merge(a[k], b_tmp.pop(k, None))
+        merged.update(b_tmp)
+        return merged
+    elif isinstance(a, list) and isinstance(b, list):
+        return a + b
+    else:
+        return a
+
+def new_job_file(frontier, job_conf_file):
+    '''Returns new Job.'''
+    logging.info("loading %s", job_conf_file)
+    with open(job_conf_file) as f:
+        job_conf = yaml.load(f)
+    return new_job(frontier, job_conf)
+
+def new_job(frontier, job_conf):
+    '''Returns new Job.'''
+    validate_conf(job_conf)
+    job = Job(frontier.rr, {
+        "conf": job_conf, "status": "ACTIVE",
+        "started": doublethink.utcnow()})
+    if "id" in job_conf:
+        job.id = job_conf["id"]
+    job.save()
+
+    sites = []
+    for seed_conf in job_conf["seeds"]:
+        merged_conf = merge(seed_conf, job_conf)
+        merged_conf.pop("seeds")
+        merged_conf["job_id"] = job.id
+        merged_conf["seed"] = merged_conf.pop("url")
+        site = brozzler.Site(frontier.rr, merged_conf)
+        sites.append(site)
+
+    for site in sites:
+        new_site(frontier, site)
+
+    return job
+
+def new_site(frontier, site):
+    site.id = str(uuid.uuid4())
+    logging.info("new site {}".format(site))
+    # insert the Page into the database before the Site, to avoid situation
+    # where a brozzler worker immediately claims the site, finds no pages
+    # to crawl, and decides the site is finished
+    try:
+        url = urlcanon.parse_url(site.seed)
+        hashtag = (url.hash_sign + url.fragment).decode("utf-8")
+        urlcanon.canon.remove_fragment(url)
+        page = brozzler.Page(frontier.rr, {
+            "url": str(url), "site_id": site.get("id"),
+            "job_id": site.get("job_id"), "hops_from_seed": 0,
+            "priority": 1000, "needs_robots_check": True})
+        if hashtag:
+            page.hashtags = [hashtag,]
+        page.save()
+        logging.info("queued page %s", page)
+    finally:
+        # finally block because we want to insert the Site no matter what
+        site.save()
+
+class ElapsedMixIn(object):
+    def elapsed(self):
+        '''Returns elapsed crawl time as a float in seconds.'''
+        dt = 0
+        for ss in self.starts_and_stops[:-1]:
+            dt += (ss['stop'] - ss['start']).total_seconds()
+        ss = self.starts_and_stops[-1]
+        if ss['stop']:
+            dt += (ss['stop'] - ss['start']).total_seconds()
+        else: # crawl is active
+            dt += (doublethink.utcnow() - ss['start']).total_seconds()
+        return dt
+
+class Job(doublethink.Document, ElapsedMixIn):
+    logger = logging.getLogger(__module__ + "." + __qualname__)
+    table = "jobs"
+
+    def populate_defaults(self):
+        if not "status" in self:
+            self.status = "ACTIVE"
+        if not "starts_and_stops" in self:
+            if self.get("started"):   # backward compatibility
+                self.starts_and_stops = [{
+                    "start": self.get("started"),
+                    "stop": self.get("finished")}]
+                del self["started"]
+                if "finished" in self:
+                    del self["finished"]
+            else:
+                self.starts_and_stops = [
+                        {"start":doublethink.utcnow(),"stop":None}]
+
+    def finish(self):
+        if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
+            self.logger.error(
+                    "job is already finished status=%s "
+                    "starts_and_stops[-1]['stop']=%s", self.status,
+                    self.starts_and_stops[-1]["stop"])
+        self.status = "FINISHED"
+        self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
+
+class Site(doublethink.Document, ElapsedMixIn):
     logger = logging.getLogger(__module__ + "." + __qualname__)
     table = 'sites'
 
@@ -59,18 +191,6 @@ class Site(doublethink.Document):
     def __str__(self):
         return 'Site({"id":"%s","seed":"%s",...})' % (self.id, self.seed)
 
-    def elapsed(self):
-        '''Returns elapsed crawl time as a float in seconds.'''
-        dt = 0
-        for ss in self.starts_and_stops[:-1]:
-            dt += (ss['stop'] - ss['start']).total_seconds()
-        ss = self.starts_and_stops[-1]
-        if ss['stop']:
-            dt += (ss['stop'] - ss['start']).total_seconds()
-        else: # crawl is active
-            dt += (doublethink.utcnow() - ss['start']).total_seconds()
-        return dt
-
     def note_seed_redirect(self, url):
         new_scope_surt = brozzler.site_surt_canon(url).surt().decode("ascii")
         if not new_scope_surt.startswith(self.scope["surt"]):
diff --git a/setup.py b/setup.py
index d4d70b6..f8cb71b 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ def find_package_data(package):
 
 setuptools.setup(
         name='brozzler',
-        version='1.1b10.dev224',
+        version='1.1b10.dev225',
         description='Distributed web crawling with browsers',
         url='https://github.com/internetarchive/brozzler',
         author='Noah Levitt',
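A note on ElapsedMixIn, the one piece of behavior this patch adds rather
than moves: elapsed() walks the starts_and_stops list initialized by
populate_defaults(), sums the durations of the completed start/stop
intervals, and, if the last interval is still open, counts time up to now.
Time during which the crawl was stopped is therefore excluded. The sketch
below illustrates the same logic in isolation; the bare classes and the
local utcnow() helper are stand-ins for doublethink.Document and
doublethink.utcnow(), not actual brozzler code.

    import datetime

    def utcnow():
        # stand-in for doublethink.utcnow(): timezone-aware current UTC time
        return datetime.datetime.now(datetime.timezone.utc)

    class ElapsedMixIn(object):
        def elapsed(self):
            '''Returns elapsed crawl time as a float in seconds, not
            counting time when the crawl was stopped.'''
            dt = 0
            # every interval except possibly the last is closed
            for ss in self.starts_and_stops[:-1]:
                dt += (ss['stop'] - ss['start']).total_seconds()
            ss = self.starts_and_stops[-1]
            if ss['stop']:
                dt += (ss['stop'] - ss['start']).total_seconds()
            else:  # crawl is active
                dt += (utcnow() - ss['start']).total_seconds()
            return dt

    class Job(ElapsedMixIn):
        def __init__(self, starts_and_stops):
            self.starts_and_stops = starts_and_stops

    # a job that ran for an hour, was stopped for a while, then was
    # restarted five minutes ago and is still running
    now = utcnow()
    job = Job([
        {'start': now - datetime.timedelta(hours=3),
         'stop': now - datetime.timedelta(hours=2)},
        {'start': now - datetime.timedelta(minutes=5), 'stop': None}])
    assert 3899 < job.elapsed() < 3901   # ~3600 + ~300 seconds

Because the method only touches self.starts_and_stops, the mixin works for
any class that keeps that list, which is why Job and Site can share it
without sharing anything else beyond doublethink.Document.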