#!/usr/bin/env python
# vim: set sw=4 et:

import argparse
import os
import sys
import logging
import umbra
import surt
import sqlite3
import time
import kombu
import kombu.simple
import json
import umbra.hq
import signal

arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
        description="brozzler-hq - headquarters of distributed brozzler crawl",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument("-d", "--db", dest="db_file", default="./brozzler-hq-0.db",
        help="sqlite3 database filename; if the file does not exist, it will be created")
arg_parser.add_argument("-u", "--url", dest="amqp_url",
        default="amqp://guest:guest@localhost:5672/%2f",
        help="URL identifying the amqp server to talk to")
arg_parser.add_argument("-v", "--verbose", dest="log_level", action="store_const",
        default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
        version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])

logging.basicConfig(stream=sys.stdout, level=args.log_level,
        format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
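# Example invocation (illustrative; the database path is a placeholder, the
# flags are the ones defined above):
#
#   brozzler-hq --db ./my-crawl.db --url amqp://guest:guest@localhost:5672/%2f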
class BrozzlerHQDb:
    """Sqlite-backed store of crawl sites and their scheduled urls."""

    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, db_file="./brozzler-hq-0.db"):
        self._conn = sqlite3.connect(db_file)
        self._create_tables()

    def _create_tables(self):
        cursor = self._conn.cursor()
        cursor.executescript("""
                create table if not exists brozzler_sites (
                    id integer primary key,
                    site_json text
                );

                create table if not exists brozzler_urls (
                    id integer primary key,
                    site_id integer,
                    priority integer,
                    in_progress boolean,
                    canon_url varchar(4000),
                    crawl_url_json text
                );
                create index if not exists brozzler_urls_priority on brozzler_urls (priority desc);
                create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
        """)
        self._conn.commit()

    def pop_url(self, site_id):
        """Claims the highest-priority unclaimed url for the site and returns
        it as a dict, or returns None if there is none."""
        cursor = self._conn.cursor()
        cursor.execute("""
                select id, priority, crawl_url_json from brozzler_urls
                where site_id = ? and not in_progress
                order by priority desc limit 1""", (site_id,))
        row = cursor.fetchone()
        if row:
            (url_id, priority, crawl_url_json) = row
            new_priority = priority - 2000
            cursor.execute("update brozzler_urls set priority=?, in_progress=1 where id=?",
                    (new_priority, url_id))
            self._conn.commit()

            d = json.loads(crawl_url_json)
            d["id"] = url_id
            return d
        else:
            return None

    def completed(self, crawl_url):
        cursor = self._conn.cursor()
        cursor.execute("update brozzler_urls set in_progress=0 where id=?", (crawl_url.id,))
        self._conn.commit()

    def new_site(self, site):
        cursor = self._conn.cursor()
        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site.to_json(),))
        self._conn.commit()
        return cursor.lastrowid

    def schedule_url(self, crawl_url, priority=0):
        cursor = self._conn.cursor()
        cursor.execute("""
                insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json, in_progress)
                values (?, ?, ?, ?, 0)""",
                (crawl_url.site_id, priority, crawl_url.canonical(), crawl_url.to_json()))
        self._conn.commit()

    def sites(self):
        """Generator which yields every site as an umbra.hq.Site instance."""
        cursor = self._conn.cursor()
        cursor.execute("select id, site_json from brozzler_sites")
        while True:
            row = cursor.fetchone()
            if row is None:
                break
            site_dict = json.loads(row[1])
            site_dict["id"] = row[0]
            yield umbra.hq.Site(**site_dict)

    def update_crawl_url(self, crawl_url):
        """Bumps the priority of the existing row for this url and keeps the
        smaller hops_from_seed; raises KeyError if the url is not already
        scheduled."""
        cursor = self._conn.cursor()
        cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id=? and canon_url=?",
                (crawl_url.site_id, crawl_url.canonical()))
        row = cursor.fetchone()
        if row:
            new_priority = crawl_url.calc_priority() + row[1]
            existing_crawl_url = umbra.CrawlUrl(**json.loads(row[2]))
            existing_crawl_url.hops_from_seed = min(crawl_url.hops_from_seed,
                    existing_crawl_url.hops_from_seed)
            cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?",
                    (new_priority, existing_crawl_url.to_json(), row[0]))
            self._conn.commit()
        else:
            raise KeyError("crawl url not in brozzler_urls site_id={} url={}".format(
                crawl_url.site_id, crawl_url.canonical()))
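# Worked example of the priority arithmetic above, assuming a hypothetical
# url whose calc_priority() is 50: the seed enters at priority 1000
# (schedule_url in BrozzlerHQ._new_site below); pop_url() hands it out and
# demotes the row to 1000 - 2000 = -1000; each later rediscovery of the url
# as an outlink adds another calc_priority() via update_crawl_url(), e.g.
# -1000 + 50 = -950. Urls that keep turning up climb back toward the front
# of the priority-desc queue; urls popped once and never seen again stay
# buried.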
class BrozzlerHQ:
    """Single process which accepts new sites from the amqp queue, feeds
    crawl urls to the per-site queues, and scopes and schedules the outlinks
    of completed urls."""

    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", db=None):
        self.amqp_url = amqp_url
        self._conn = kombu.Connection(amqp_url)
        self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new")
        self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed")
        if db is not None:
            self._db = db
        else:
            self._db = BrozzlerHQDb()

    def run(self):
        try:
            while True:
                self._new_site()
                self._consume_completed_url()
                self._feed_crawl_urls()
                time.sleep(0.5)
        finally:
            self._conn.close()

    def _new_site(self):
        try:
            msg = self._new_sites_q.get(block=False)
            new_site = umbra.hq.Site(**msg.payload)
            msg.ack()

            self.logger.info("new site {}".format(new_site))
            site_id = self._db.new_site(new_site)
            new_site.id = site_id

            if new_site.is_permitted_by_robots(new_site.seed):
                crawl_url = umbra.CrawlUrl(new_site.seed, site_id=new_site.id, hops_from_seed=0)
                self._db.schedule_url(crawl_url, priority=1000)
                self._unclaimed_sites_q.put(new_site.to_dict())
            else:
                self.logger.warning("seed url {} is blocked by robots.txt".format(new_site.seed))
        except kombu.simple.Empty:
            pass

    def _feed_crawl_urls(self):
        for site in self._db.sites():
            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
            if len(q) == 0:
                url = self._db.pop_url(site.id)
                if url:
                    self.logger.info("feeding {} to {}".format(url, q.queue.name))
                    q.put(url)

    def _scope_and_schedule_outlinks(self, site, parent_url):
        counts = {"added": 0, "updated": 0, "rejected": 0, "blocked": 0}
        if parent_url.outlinks:
            for url in parent_url.outlinks:
                if site.is_in_scope(url):
                    if site.is_permitted_by_robots(url):
                        crawl_url = umbra.CrawlUrl(url, site_id=site.id,
                                hops_from_seed=parent_url.hops_from_seed+1)
                        try:
                            self._db.update_crawl_url(crawl_url)
                            counts["updated"] += 1
                        except KeyError:
                            self._db.schedule_url(crawl_url, priority=crawl_url.calc_priority())
                            counts["added"] += 1
                    else:
                        counts["blocked"] += 1
                else:
                    counts["rejected"] += 1

        self.logger.info("{} new links added, {} existing links updated, {} links rejected, {} links blocked by robots from {}".format(
            counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_url))

    def _consume_completed_url(self):
        for site in self._db.sites():
            q = self._conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
            try:
                msg = q.get(block=False)
                completed_url = umbra.CrawlUrl(**msg.payload)
                msg.ack()
                self._db.completed(completed_url)
                self._scope_and_schedule_outlinks(site, completed_url)
            except kombu.simple.Empty:
                pass

logging.info("brozzler-hq starting")

class ShutdownRequested(Exception):
    pass

def sigterm(signum, frame):
    raise ShutdownRequested("shutdown requested (caught SIGTERM)")
def sigint(signum, frame):
    raise ShutdownRequested("shutdown requested (caught SIGINT)")

signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)

db = BrozzlerHQDb(db_file=args.db_file)
hq = BrozzlerHQ(amqp_url=args.amqp_url, db=db)

try:
    hq.run()
except ShutdownRequested as e:
    logging.info("{}".format(e))
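# A minimal client-side sketch (illustrative, not part of this script):
# submitting a new site for this hq process to pick up. It assumes the same
# amqp server and the "brozzler.sites.new" queue name used above; the payload
# must be keyword arguments acceptable to umbra.hq.Site(**payload), of which
# only "seed" is assumed here.
#
#   import kombu
#   conn = kombu.Connection("amqp://guest:guest@localhost:5672/%2f")
#   q = conn.SimpleQueue("brozzler.sites.new")
#   q.put({"seed": "http://example.com/"})
#   q.close()
#   conn.close()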