mirror of
				https://github.com/internetarchive/brozzler.git
				synced 2025-10-30 22:08:54 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			134 lines
		
	
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			134 lines
		
	
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| '''
 | |
| brozzler/job.py - Job class representing a brozzler crawl job, and functions
 | |
| for setting up a job with supplied configuration
 | |
| 
 | |
| Copyright (C) 2014-2016 Internet Archive
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| '''
 | |
| 
 | |
import datetime
import json
import logging
import os
import urllib
import urllib.parse
import uuid

import cerberus
import rethinkstuff
import yaml

import brozzler
 | |
| 
 | |
def load_schema():
    """Load and parse the job config schema shipped alongside this module.

    Returns:
        dict: the cerberus validation schema parsed from job_schema.yaml
    """
    schema_file = os.path.join(os.path.dirname(__file__), 'job_schema.yaml')
    with open(schema_file) as f:
        # safe_load instead of load: avoids arbitrary python object
        # construction and the "yaml.load() without Loader" deprecation;
        # the schema is plain mappings/scalars so safe_load is sufficient
        return yaml.safe_load(f)
 | |
| 
 | |
class JobValidator(cerberus.Validator):
    """Cerberus validator extended with a custom "url" type for job configs."""

    def _validate_type_url(self, value):
        # a value counts as a url only if it parses with a supported scheme
        scheme = urllib.parse.urlparse(value).scheme
        return scheme in ('http', 'https', 'ftp')
 | |
| 
 | |
class InvalidJobConf(Exception):
    """Raised when a job configuration fails schema validation.

    The cerberus validator errors are kept on the ``errors`` attribute.
    """
    def __init__(self, errors):
        # pass errors to Exception so str(exc) and exc.args carry the
        # validation details instead of rendering as an empty message
        super().__init__(errors)
        # dict mapping offending field -> list of problems, from cerberus
        self.errors = errors
 | |
| 
 | |
def validate_conf(job_conf, schema=None):
    """Validate a job configuration against the job schema.

    Args:
        job_conf (dict): parsed job configuration
        schema (dict, optional): cerberus schema; defaults to the schema
            loaded from job_schema.yaml

    Raises:
        InvalidJobConf: if validation fails
    """
    # load lazily rather than in the default-argument expression, which
    # would perform file i/o at module import time and permanently cache
    # a single copy of the schema
    if schema is None:
        schema = load_schema()
    v = JobValidator(schema)
    if not v.validate(job_conf):
        raise InvalidJobConf(v.errors)
 | |
| 
 | |
def merge(a, b):
    """Recursively combine two config values, with ``a`` taking precedence.

    Dicts are merged key by key (recursing on shared keys), lists are
    concatenated, and for anything else ``a`` wins.
    """
    if isinstance(a, dict) and isinstance(b, dict):
        leftover = dict(b)
        # recurse on every key of a, pulling the matching value out of b;
        # keys present only in b are appended afterwards
        combined = {key: merge(value, leftover.pop(key, None))
                    for key, value in a.items()}
        combined.update(leftover)
        return combined
    if isinstance(a, list) and isinstance(b, list):
        return [*a, *b]
    return a
 | |
| 
 | |
def new_job_file(frontier, job_conf_file):
    """Load a yaml job configuration file and set up the job.

    Args:
        frontier: the frontier/database wrapper passed through to new_job
        job_conf_file (str): path to the yaml job configuration

    Raises:
        InvalidJobConf: if the configuration fails schema validation
    """
    logging.info("loading %s", job_conf_file)
    with open(job_conf_file) as f:
        # safe_load: job conf files are operator-supplied data and there is
        # no reason to allow arbitrary python object construction
        job_conf = yaml.safe_load(f)
        new_job(frontier, job_conf)
 | |
| 
 | |
def new_job(frontier, job_conf):
    """Create a new job and its sites from a job configuration.

    Validates the configuration, builds a Site per seed (seed-level
    settings merged over job-level settings), inserts the sites, then
    inserts the job itself.

    Raises:
        InvalidJobConf: if job_conf fails schema validation
    """
    validate_conf(job_conf)
    job = Job(
            id=job_conf.get("id"), conf=job_conf, status="ACTIVE",
            started=rethinkstuff.utcnow())

    # settings forwarded verbatim from the merged config to each Site
    passthrough_settings = (
            "scope", "time_limit", "proxy", "ignore_robots",
            "enable_warcprox_features", "warcprox_meta", "metadata",
            "remember_outlinks", "user_agent", "behavior_parameters",
            "username", "password")

    sites = []
    for seed_conf in job_conf["seeds"]:
        merged_conf = merge(seed_conf, job_conf)
        site_kwargs = {name: merged_conf.get(name)
                       for name in passthrough_settings}
        sites.append(brozzler.Site(
                job_id=job.id, seed=merged_conf["url"], **site_kwargs))

    # insert all the sites into database before the job
    for site in sites:
        new_site(frontier, site)

    frontier.new_job(job)
 | |
| 
 | |
def new_site(frontier, site):
    """Assign the site a fresh id and insert it, with its seed page, into
    the database.

    Args:
        frontier: frontier/database wrapper providing new_page, new_site
            and reached_limit
        site (brozzler.Site): the site to insert
    """
    site.id = str(uuid.uuid4())
    # lazy %-style args, consistent with the other log calls in this module
    # (the original eagerly formatted with str.format)
    logging.info("new site %s", site)
    try:
        # insert the Page into the database before the Site, to avoid situation
        # where a brozzler worker immediately claims the site, finds no pages
        # to crawl, and decides the site is finished
        try:
            page = brozzler.Page(
                    site.seed, site_id=site.id, job_id=site.job_id,
                    hops_from_seed=0, priority=1000, needs_robots_check=True)
            frontier.new_page(page)
            logging.info("queued page %s", page)
        finally:
            # finally block because we want to insert the Site no matter what
            frontier.new_site(site)
    except brozzler.ReachedLimit as e:
        frontier.reached_limit(site, e)
 | |
| 
 | |
class Job(brozzler.BaseDictable):
    """A brozzler crawl job: configuration plus lifecycle bookkeeping."""

    logger = logging.getLogger("%s.%s" % (__module__, __qualname__))

    def __init__(self, id=None, conf=None, status="ACTIVE", started=None,
                 finished=None, stop_requested=None):
        # database id; None until the job is persisted
        self.id = id
        # the full job configuration dict this job was created from
        self.conf = conf
        # e.g. "ACTIVE"
        self.status = status
        # lifecycle timestamps / flags
        self.started = started
        self.finished = finished
        self.stop_requested = stop_requested

    def __str__(self):
        return 'Job(id=%s)' % self.id
 | |
| 
 | 
