brozzler/bin/brozzler-hq

#!/usr/bin/env python
# vim: set sw=4 et:
import argparse
import os
import sys
import logging
import umbra
import surt
import sqlite3
import time
import kombu
import kombu.simple
import json
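
# command line arguments and logging setup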
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description="brozzler-hq - headquarters of distributed brozzler crawl",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument("-d", "--db", dest="db_file", default="./brozzler-hq-0.db",
help="sqlite3 database filename; if the file does not exist, it will be created")
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to')
arg_parser.add_argument("-v", "--verbose", dest="log_level",
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
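
# sqlite3-backed store of sites and their frontier of crawl urls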
class BrozzlerHQDb:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, db_file="./brozzler-hq-0.db"):
self._conn = sqlite3.connect(db_file)
self._create_tables()
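
    # creates the sites and urls tables (and the priority/site_id indexes)
    # if they do not already exist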
def _create_tables(self):
cursor = self._conn.cursor()
cursor.executescript("""
create table if not exists brozzler_sites (
id integer primary key,
site_json text
);
create table if not exists brozzler_urls (
id integer primary key,
site_id integer,
priority integer,
in_progress boolean,
canon_url varchar(4000),
crawl_url_json text
);
create index if not exists brozzler_urls_priority on brozzler_urls (priority desc);
create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
""")
self._conn.commit()
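
    # claims the highest priority unclaimed url for the site: marks it
    # in_progress, knocks its priority down by 2000, and returns it as a
    # dict that includes its database id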
def pop_url(self, site_id):
cursor = self._conn.cursor()
cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id = ? and not in_progress order by priority desc limit 1", (site_id,))
row = cursor.fetchone()
if row:
            (url_id, priority, crawl_url_json) = row
            new_priority = priority - 2000
            cursor.execute("update brozzler_urls set priority=?, in_progress=1 where id=?", (new_priority, url_id))
            self._conn.commit()
            d = json.loads(crawl_url_json)
            d["id"] = url_id
return d
else:
return None
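
    # inserts the site and returns its new database id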
def new_site(self, site_dict):
site_json = json.dumps(site_dict, separators=(',', ':'))
cursor = self._conn.cursor()
cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
self._conn.commit()
return cursor.lastrowid
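
    # adds a crawl url to the site's frontier at the given priority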
def schedule_url(self, crawl_url, priority=0):
cursor = self._conn.cursor()
cursor.execute("insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json, in_progress) values (?, ?, ?, ?, 0)",
(crawl_url.site_id, priority, crawl_url.canonical(), crawl_url.to_json()))
self._conn.commit()
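
    # generator which yields each site as a dict with its database id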
def sites(self):
cursor = self._conn.cursor()
cursor.execute("select id, site_json from brozzler_sites")
while True:
row = cursor.fetchone()
if row is None:
break
site_dict = json.loads(row[1])
site_dict["id"] = row[0]
yield site_dict
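
# accepts new sites on the amqp queue brozzler.sites.new, announces them on
# brozzler.sites.unclaimed, and keeps each site's brozzler.sites.N.crawl_urls
# queue fed from the database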
class BrozzlerHQ:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", db=None):
self.amqp_url = amqp_url
self._conn = kombu.Connection(amqp_url)
self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new")
self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed")
        if db is not None:
self._db = db
else:
self._db = BrozzlerHQDb()
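
    # main loop: polls for new sites and tops up the per-site crawl url
    # queues every half second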
def run(self):
try:
while True:
self._new_site()
# self._consume_completed_url()
self._feed_crawl_urls()
time.sleep(0.5)
finally:
self._conn.close()
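
    # checks (without blocking) for a new site, registers it in the database,
    # schedules its seed url, and announces it on the unclaimed sites queue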
def _new_site(self):
try:
msg = self._new_sites_q.get(block=False)
new_site_dict = msg.payload
site_id = self._db.new_site(new_site_dict)
new_site_dict["id"] = site_id
self._schedule_seed(site_id, new_site_dict["seed"])
self._unclaimed_sites_q.put(new_site_dict)
msg.ack()
self.logger.info("new site {}".format(new_site_dict))
except kombu.simple.Empty:
pass
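
    # queues the seed url with an extra-high priority so it is crawled first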
def _schedule_seed(self, site_id, seed_url):
crawl_url = umbra.CrawlUrl(seed_url, site_id=site_id, hops_from_seed=0)
self._db.schedule_url(crawl_url, priority=1000)
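
    # for each site, if its amqp crawl url queue is empty, pops the next url
    # from the database and puts it on the queue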
def _feed_crawl_urls(self):
for site in self._db.sites():
q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site["id"]))
if len(q) == 0:
url = self._db.pop_url(site["id"])
if url:
self.logger.info("feeding {} to {}".format(url, q.queue.name))
q.put(url)
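
# set up the database and headquarters, then run until interrupted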
logging.info("brozzler-hq starting")
db = BrozzlerHQDb(db_file=args.db_file)
hq = BrozzlerHQ(amqp_url=args.amqp_url, db=db)
hq.run()