starting work on brozzler crawl hq

Noah Levitt 2015-07-10 18:01:54 -07:00
parent fcc63b6675
commit 1d068f4f86
4 changed files with 199 additions and 2 deletions

bin/brozzler-add-site (new file, 31 lines)
@@ -0,0 +1,31 @@
#!/usr/bin/env python
# vim: set sw=4 et:

import argparse
import os
import sys
import logging
import umbra
import kombu

arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
        description="brozzler-add-site - register site to crawl with brozzler-hq",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('urls', metavar='URL', nargs='+', help='seed URL(s) of sites to crawl')
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
        help='URL identifying the amqp server to talk to')
arg_parser.add_argument("-v", "--verbose", dest="log_level",
        action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
        version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])

logging.basicConfig(stream=sys.stdout, level=args.log_level,
        format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")

with kombu.Connection(args.amqp_url) as conn:
    q = conn.SimpleQueue("brozzler.sites.new")
    for url in args.urls:
        q.put({"seed": url})

bin/brozzler-hq (new file, 113 lines)
@@ -0,0 +1,113 @@
#!/usr/bin/env python
# vim: set sw=4 et:

import argparse
import os
import sys
import logging
import umbra
import surt
import sqlite3
import time
import kombu
import kombu.simple
import json

arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
        description="brozzler-hq - headquarters of distributed brozzler crawl",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument("-d", "--db", dest="db_file", default="./brozzler-hq-0.db",
        help="sqlite3 database filename; if the file does not exist, it will be created")
arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
        help='URL identifying the amqp server to talk to')
arg_parser.add_argument("-v", "--verbose", dest="log_level",
        action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
        version="umbra {} - {}".format(umbra.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])

logging.basicConfig(stream=sys.stdout, level=args.log_level,
        format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
class BrozzlerHQDb:
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, db_file="./brozzler-hq-0.db"):
        self.db_file = db_file
        self.db_conn = sqlite3.connect(db_file)
        self._create_tables()

    def _create_tables(self):
        cursor = self.db_conn.cursor()
        cursor.executescript("""
                create table if not exists brozzler_sites (
                    id integer primary key,
                    site_json text
                );
                create table if not exists brozzler_urls (
                    id integer primary key,
                    site_id integer,
                    priority integer,
                    in_progress boolean,
                    canon_url varchar(4000),
                    crawl_url_json text
                );
                create index if not exists brozzler_urls_priority on brozzler_urls (priority);
                create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
                """)
        self.db_conn.commit()

    def new_site(self, site_dict):
        site_json = json.dumps(site_dict, separators=(',', ':'))
        cursor = self.db_conn.cursor()
        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
        self.db_conn.commit()
        return cursor.lastrowid
class BrozzlerHQ:
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", db=None):
        self.amqp_url = amqp_url
        self._conn = kombu.Connection(amqp_url)
        self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new")
        self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed")
        if db is not None:
            self._db = db
        else:
            self._db = BrozzlerHQDb()

    def run(self):
        try:
            while True:
                self._new_site()
                # self._consume_completed_url()
                # self._feed_crawl_urls()
                time.sleep(0.5)
        finally:
            self._conn.close()

    def _new_site(self):
        try:
            msg = self._new_sites_q.get(block=False)
            new_site_dict = msg.payload
            site_id = self._db.new_site(new_site_dict)
            new_site_dict["id"] = site_id
            self._schedule_seed(new_site_dict["seed"])
            self._unclaimed_sites_q.put(new_site_dict)
            msg.ack()
            self.logger.info("new site {}".format(new_site_dict))
        except kombu.simple.Empty:
            pass

    def _schedule_seed(self, seed_url):
        # self._db.schedule_url(
        pass

logging.info("brozzler-hq starting")
db = BrozzlerHQDb(db_file=args.db_file)
hq = BrozzlerHQ(amqp_url=args.amqp_url, db=db)
hq.run()
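
Only the new-site handoff is wired up in run() so far, and it can be exercised end to end. A minimal sketch, assuming a local rabbitmq reachable with the default guest account and the two classes above in scope; the throwaway db path and printed output are illustrative, not part of the commit:

import kombu

amqp_url = "amqp://guest:guest@localhost:5672/%2f"

# register a seed the same way bin/brozzler-add-site does
with kombu.Connection(amqp_url) as conn:
    conn.SimpleQueue("brozzler.sites.new").put({"seed": "http://example.com/"})

# run one poll cycle by hand instead of hq.run()'s endless loop
hq = BrozzlerHQ(amqp_url=amqp_url, db=BrozzlerHQDb(db_file="/tmp/hq-test.db"))
try:
    hq._new_site()   # pops sites.new, assigns a db id, feeds sites.unclaimed
finally:
    hq._conn.close()

# a worker would now see the site, with its database id attached
with kombu.Connection(amqp_url) as conn:
    msg = conn.SimpleQueue("brozzler.sites.unclaimed").get(timeout=5)
    print(msg.payload)   # e.g. {'seed': 'http://example.com/', 'id': 1}
    msg.ack()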

bin/crawl-url (modified)
@@ -13,7 +13,7 @@ import surt
 import signal
 
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
-        description='browse-url - open urls in chrome/chromium and run behaviors',
+        description='crawl-url - browse urls, follow links',
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URL(s) to browse')
 arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
@@ -21,7 +21,7 @@ arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60',
 arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
         help='executable to use to invoke chrome')
 arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
-        help='Max number of chrome instances simultaneously browsing pages')
+        help='max number of chrome instances simultaneously browsing pages')
 arg_parser.add_argument('-v', '--verbose', dest='log_level',
         action="store_const", default=logging.INFO, const=logging.DEBUG)
 arg_parser.add_argument('--version', action='version',

hq-notes.txt (new file, 53 lines)
@@ -0,0 +1,53 @@
possible architecture of brozzler-hq
====================================

keeps queues in an rdbms
    because rows are easy to update, to index on priority and on
    canonicalized url, and to inspect by hand
    initially sqlite

-- sqlite3 syntax
create table brozzler_sites (
    id integer primary key,
    -- claimed boolean,
    site_json text
    -- data_limit integer, -- bytes
    -- time_limit integer, -- seconds
    -- page_limit integer
);

create table brozzler_urls (
    id integer primary key,
    site_id integer,
    priority integer,
    in_progress boolean,
    canon_url varchar(4000),
    crawl_url_json text
);
create index brozzler_urls_priority on brozzler_urls (priority);
create index brozzler_urls_canon_url on brozzler_urls (canon_url);
create index brozzler_urls_site_id on brozzler_urls (site_id);

feeds rabbitmq:
- json payloads
- queue per site brozzler.{site_id}.crawl_urls (one way the feed side could
  look is sketched below)
- queue of unclaimed sites brozzler.sites.unclaimed
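
a sketch of what the commented-out _feed_crawl_urls step in bin/brozzler-hq
might look like against the schema above; the function names, the pop_url
helper, and storing in_progress as 0/1 are assumptions, not part of the commit:

import json
import kombu

def pop_url(db_conn, site_id):
    # claim the highest-priority url for this site that isn't already in progress
    cursor = db_conn.cursor()
    cursor.execute("""
            select id, crawl_url_json from brozzler_urls
            where site_id = ? and in_progress = 0
            order by priority desc limit 1""", (site_id,))
    row = cursor.fetchone()
    if row is None:
        return None
    cursor.execute("update brozzler_urls set in_progress = 1 where id = ?", (row[0],))
    db_conn.commit()
    return json.loads(row[1])

def feed_crawl_urls(amqp_conn, db_conn, site_id):
    # top up the site's queue from the rdbms, one url per call
    url_dict = pop_url(db_conn, site_id)
    if url_dict is not None:
        amqp_conn.SimpleQueue("brozzler.{}.crawl_urls".format(site_id)).put(url_dict)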

reads from rabbitmq:
- queue of new sites brozzler.sites.new
- queue per site brozzler.{site_id}.completed_urls (see the sketch after this
  list)
  * json blob fed to this queue includes urls extracted to schedule
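
the read side could pair with the schedule_url helper that the
"# self._db.schedule_url(" stub in bin/brozzler-hq alludes to; the "outlinks"
payload key and the function names here are assumptions:

import json
import surt
import kombu.simple

def schedule_url(db_conn, site_id, url, priority=0):
    # store a discovered url, canonicalized with surt so the canon_url
    # index can serve scope checks and deduplication later
    cursor = db_conn.cursor()
    cursor.execute(
            "insert into brozzler_urls (site_id, priority, in_progress, canon_url, crawl_url_json) "
            "values (?, ?, 0, ?, ?)",
            (site_id, priority, surt.surt(url), json.dumps({"url": url})))
    db_conn.commit()

def consume_completed_urls(amqp_conn, db_conn, site_id):
    # drain brozzler.{site_id}.completed_urls, scheduling the extracted outlinks
    q = amqp_conn.SimpleQueue("brozzler.{}.completed_urls".format(site_id))
    try:
        while True:
            msg = q.get(block=False)
            for out_url in msg.payload.get("outlinks", []):
                schedule_url(db_conn, site_id, out_url)
            msg.ack()
    except kombu.simple.Empty:
        pass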

open question: does brozzler-hq consider a site unclaimed if
brozzler.{site_id}.crawl_urls has not been read in some amount of time, or do
workers need to explicitly disclaim sites?

brozzler-worker (sketched below):
- decides if it can run a new browser
- if so, reads a site from brozzler.sites.unclaimed
  - site includes scope definition, crawl job info, ...
- starts browser
- reads urls from brozzler.{site_id}.crawl_urls
- after each url (or every n urls?), feeds brozzler.{site_id}.completed_urls
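
a sketch of the worker loop those bullets describe; browse_url() is a
stand-in for the chrome-driving machinery (not part of this commit), and the
timeouts are arbitrary:

import kombu
import kombu.simple

def browse_url(url):
    # stand-in for driving a browser and extracting links
    return []

def worker_loop(amqp_url="amqp://guest:guest@localhost:5672/%2f"):
    with kombu.Connection(amqp_url) as conn:
        # claim a site
        msg = conn.SimpleQueue("brozzler.sites.unclaimed").get(timeout=30)
        site = msg.payload
        msg.ack()

        crawl_q = conn.SimpleQueue("brozzler.{}.crawl_urls".format(site["id"]))
        completed_q = conn.SimpleQueue("brozzler.{}.completed_urls".format(site["id"]))
        try:
            while True:
                url_msg = crawl_q.get(timeout=30)
                crawl_url = url_msg.payload
                outlinks = browse_url(crawl_url["url"])
                # report completion after each url, per the notes above
                completed_q.put({"url": crawl_url["url"], "outlinks": outlinks})
                url_msg.ack()
        except kombu.simple.Empty:
            # no urls for a while -- site may be finished, or we should disclaim it
            pass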