diff --git a/bin/brozzler-add-site b/bin/brozzler-add-site
new file mode 100644
index 0000000..641441d
--- /dev/null
+++ b/bin/brozzler-add-site
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+# vim: set sw=4 et:
+
+import argparse
+import os
+import sys
+import logging
+import umbra
+import kombu
+
+arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
+        description="brozzler-add-site - register site to crawl with brozzler-hq",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+arg_parser.add_argument('urls', metavar='URL', nargs='+', help='seed URL(s) of sites to crawl')
+arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
+        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument("-v", "--verbose", dest="log_level",
+        action="store_const", default=logging.INFO, const=logging.DEBUG)
+arg_parser.add_argument("--version", action="version",
+        version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
+args = arg_parser.parse_args(args=sys.argv[1:])
+
+logging.basicConfig(stream=sys.stdout, level=args.log_level,
+        format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
+
+with kombu.Connection(args.amqp_url) as conn:
+    q = conn.SimpleQueue("brozzler.sites.new")
+    for url in args.urls:
+        q.put({"seed": url})
+
+
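The payload brozzler-add-site enqueues is just a json-serializable dict with a
seed url, dropped on the same "brozzler.sites.new" queue that brozzler-hq reads
below. A minimal sketch for checking the round trip by hand; the queue name and
payload shape come from the scripts in this patch, everything else is
illustrative:

import kombu
import kombu.simple

with kombu.Connection("amqp://guest:guest@localhost:5672/%2f") as conn:
    q = conn.SimpleQueue("brozzler.sites.new")
    while True:
        try:
            # non-destructive-ish peek: pull, print, ack
            msg = q.get(block=True, timeout=1)
        except kombu.simple.Empty:
            break
        print(msg.payload)  # e.g. {'seed': 'http://example.com/'}
        msg.ack()
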
diff --git a/bin/brozzler-hq b/bin/brozzler-hq
new file mode 100644
index 0000000..cb88058
--- /dev/null
+++ b/bin/brozzler-hq
@@ -0,0 +1,113 @@
+#!/usr/bin/env python
+# vim: set sw=4 et:
+
+import argparse
+import os
+import sys
+import logging
+import umbra
+import surt
+import sqlite3
+import time
+import kombu
+import kombu.simple
+import json
+
+arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
+        description="brozzler-hq - headquarters of distributed brozzler crawl",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+arg_parser.add_argument("-d", "--db", dest="db_file", default="./brozzler-hq-0.db",
+        help="sqlite3 database filename; if the file does not exist, it will be created")
+arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
+        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument("-v", "--verbose", dest="log_level",
+        action="store_const", default=logging.INFO, const=logging.DEBUG)
+arg_parser.add_argument("--version", action="version",
+        version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
+args = arg_parser.parse_args(args=sys.argv[1:])
+
+logging.basicConfig(stream=sys.stdout, level=args.log_level,
+        format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
+
+class BrozzlerHQDb:
+    logger = logging.getLogger(__module__ + "." + __qualname__)
+
+    def __init__(self, db_file="./brozzler-hq-0.db"):
+        self.db_file = db_file
+        self.db_conn = sqlite3.connect(db_file)
+        self._create_tables()
+
+    def _create_tables(self):
+        cursor = self.db_conn.cursor()
+        cursor.executescript("""
+                create table if not exists brozzler_sites (
+                    id integer primary key,
+                    site_json text
+                );
+
+                create table if not exists brozzler_urls (
+                    id integer primary key,
+                    site_id integer,
+                    priority integer,
+                    in_progress boolean,
+                    canon_url varchar(4000),
+                    crawl_url_json text
+                );
+                create index if not exists brozzler_urls_priority on brozzler_urls (priority);
+                create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
+                """)
+        self.db_conn.commit()
+
+    def new_site(self, site_dict):
+        site_json = json.dumps(site_dict, separators=(',', ':'))
+        cursor = self.db_conn.cursor()
+        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
+        self.db_conn.commit()
+        return cursor.lastrowid
+
+class BrozzlerHQ:
+    logger = logging.getLogger(__module__ + "." + __qualname__)
+
+    def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", db=None):
+        self.amqp_url = amqp_url
+        self._conn = kombu.Connection(amqp_url)
+        self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new")
+        self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed")
+        if db is not None:
+            self._db = db
+        else:
+            self._db = BrozzlerHQDb()
+
+    def run(self):
+        try:
+            while True:
+                self._new_site()
+                # self._consume_completed_url()
+                # self._feed_crawl_urls()
+                time.sleep(0.5)
+        finally:
+            self._conn.close()
+
+    def _new_site(self):
+        try:
+            msg = self._new_sites_q.get(block=False)
+            new_site_dict = msg.payload
+            site_id = self._db.new_site(new_site_dict)
+            new_site_dict["id"] = site_id
+            self._schedule_seed(new_site_dict["seed"])
+            self._unclaimed_sites_q.put(new_site_dict)
+            msg.ack()
+            self.logger.info("new site {}".format(new_site_dict))
+        except kombu.simple.Empty:
+            pass
+
+    def _schedule_seed(self, seed_url):
+        pass
+        # self._db.schedule_url(
+
+logging.info("brozzler-hq starting")
+db = BrozzlerHQDb(db_file=args.db_file)
+hq = BrozzlerHQ(amqp_url=args.amqp_url, db=db)
+
+hq.run()
diff --git a/bin/crawl-url b/bin/crawl-url
index f2a2dfd..159c72c 100755
--- a/bin/crawl-url
+++ b/bin/crawl-url
@@ -13,7 +13,7 @@ import surt
 import signal
 
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
-        description='browse-url - open urls in chrome/chromium and run behaviors',
+        description='crawl-url - browse urls, follow links',
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse')
 arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
@@ -21,7 +21,7 @@ arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60
 arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
         help='executable to use to invoke chrome')
 arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
-        help='Max number of chrome instances simultaneously browsing pages')
+        help='max number of chrome instances simultaneously browsing pages')
 arg_parser.add_argument('-v', '--verbose', dest='log_level',
         action="store_const", default=logging.INFO, const=logging.DEBUG)
 arg_parser.add_argument('--version', action='version',
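brozzler-hq above stubs out _schedule_seed and leaves _feed_crawl_urls
commented out, but the brozzler_urls table and its priority index already
suggest the shape of the missing pieces. A hedged sketch of what BrozzlerHQDb
might grow, written as free functions over the sqlite connection for brevity;
the names schedule_url and pop_url, the priority scheme, and the
crawl_url_json layout are assumptions, not part of this commit:

import json
import surt  # same canonicalization library bin/brozzler-hq already imports

def schedule_url(db_conn, site_id, url, priority=0):
    # store the canonicalized form alongside the raw crawl url json so the
    # canon_url column can be indexed, presumably for deduplication later
    cursor = db_conn.cursor()
    cursor.execute(
            "insert into brozzler_urls (site_id, priority, in_progress, canon_url, crawl_url_json) values (?, ?, 0, ?, ?)",
            (site_id, priority, surt.surt(url),
             json.dumps({"url": url}, separators=(',', ':'))))
    db_conn.commit()

def pop_url(db_conn, site_id):
    # hand out the highest-priority url not yet in progress; the
    # brozzler_urls_priority index keeps the order-by cheap
    cursor = db_conn.cursor()
    cursor.execute(
            "select id, crawl_url_json from brozzler_urls where site_id = ? and not in_progress order by priority desc limit 1",
            (site_id,))
    row = cursor.fetchone()
    if row is None:
        return None
    cursor.execute("update brozzler_urls set in_progress = 1 where id = ?", (row[0],))
    db_conn.commit()
    return json.loads(row[1])

Something like schedule_url is presumably what the dangling
"# self._db.schedule_url(" comment in _schedule_seed is reaching for.
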
diff --git a/hq-notes.txt b/hq-notes.txt
new file mode 100644
index 0000000..4527d87
--- /dev/null
+++ b/hq-notes.txt
@@ -0,0 +1,53 @@
+possible architecture of brozzler-hq
+====================================
+
+keeps queues in an rdbms because it is easy to update, easy to index on
+priority and on canonicalized url, and easy to inspect
+initially sqlite
+
+-- sqlite3 syntax
+create table brozzler_sites (
+    id integer primary key,
+    -- claimed boolean,
+    site_json text
+    -- data_limit integer, -- bytes
+    -- time_limit integer, -- seconds
+    -- page_limit integer,
+);
+
+create table brozzler_urls (
+    id integer primary key,
+    site_id integer,
+    priority integer,
+    in_progress boolean,
+    canon_url varchar(4000),
+    crawl_url_json text
+);
+-- sqlite has no inline index clauses inside create table, so:
+create index brozzler_urls_priority on brozzler_urls (priority);
+create index brozzler_urls_canon_url on brozzler_urls (canon_url);
+create index brozzler_urls_site_id on brozzler_urls (site_id);
+
+feeds rabbitmq:
+ - json payloads
+ - queue per site brozzler.{site_id}.crawl_urls
+ - queue of unclaimed sites brozzler.sites.unclaimed
+
+reads from rabbitmq:
+ - queue of new sites brozzler.sites.new
+ - queue per site brozzler.{site_id}.completed_urls
+   * json blob fed to this queue includes urls extracted to schedule
+
+??? does brozzler-hq consider a site unclaimed if brozzler.{site_id}.crawl_urls
+has not been read in some amount of time, or do workers need to explicitly
+disclaim ???
+
+brozzler-worker (see the sketch below)
+ - decides if it can run a new browser
+ - if so, reads a site from brozzler.sites.unclaimed
+ - site includes scope definition, crawl job info, ...
+ - starts browser
+ - reads urls from brozzler.{site_id}.crawl_urls
+ - after each(?) (every n?) urls, feeds brozzler.{site_id}.completed_urls
+
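To make the worker side of the notes concrete, a rough sketch of the loop
described above. The queue names follow the notes; the timeout behavior, the
"outlinks" payload field, and the browse_page callable are all guesses pending
the open questions:

import kombu
import kombu.simple

def claim_and_crawl(amqp_url, browse_page):
    with kombu.Connection(amqp_url) as conn:
        # take one unclaimed site; per the notes its payload includes
        # id, seed, scope definition, crawl job info, ...
        msg = conn.SimpleQueue("brozzler.sites.unclaimed").get(block=True, timeout=30)
        site = msg.payload
        msg.ack()

        crawl_urls = conn.SimpleQueue("brozzler.{}.crawl_urls".format(site["id"]))
        completed_urls = conn.SimpleQueue("brozzler.{}.completed_urls".format(site["id"]))
        while True:
            try:
                url_msg = crawl_urls.get(block=True, timeout=30)
            except kombu.simple.Empty:
                break  # or explicitly disclaim the site, per the open question above
            crawl_url = url_msg.payload
            outlinks = browse_page(crawl_url["url"])  # browser work happens here
            # report completion plus extracted urls for hq to schedule
            completed_urls.put({"url": crawl_url["url"], "outlinks": outlinks})
            url_msg.ack()
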
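And the mirror image on the hq side, a guess at the commented-out
_consume_completed_url hook in bin/brozzler-hq: drain each site's
completed_urls queue and hand extracted links to the scheduler. The site_ids
helper, the scope check, and schedule_url (from the earlier sketch) are
assumed, not in this commit:

# hypothetical BrozzlerHQ method matching the commented-out hook
def _consume_completed_url(self):
    for site_id in self._db.site_ids():  # assumed helper: all known site ids
        q = self._conn.SimpleQueue("brozzler.{}.completed_urls".format(site_id))
        try:
            msg = q.get(block=False)
        except kombu.simple.Empty:
            continue
        completed = msg.payload
        for outlink in completed.get("outlinks", []):
            # scope check against the site definition would go here
            schedule_url(self._db.db_conn, site_id, outlink)
        msg.ack()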