goodbye sqlite and rabbitmq, hello rethinkdb

Noah Levitt 2015-08-18 21:44:54 +00:00
parent e6fbf0e2e9
commit a878730e02
14 changed files with 345 additions and 508 deletions

View File

@@ -2,8 +2,8 @@ brozzler
 ========
 "browser" | "crawler" = "brozzler"
-Brozzler is a distributed web crawler that uses a real browser (chrome or
-chromium) to fetch pages and embedded urls and to extract links. It also
+Brozzler is a distributed web crawler (爬虫) that uses a real browser (chrome
+or chromium) to fetch pages and embedded urls and to extract links. It also
 uses [youtube-dl](https://github.com/rg3/youtube-dl) to enhance media capture
 capabilities.

View File

@@ -6,7 +6,6 @@ import os
 import sys
 import logging
 import brozzler
-import kombu
 import re
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env python
# vim: set sw=4 et:
import argparse
import os
import sys
import logging
import brozzler
import brozzler.hq
import signal
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
description="brozzler-hq - headquarters of distributed brozzler crawl",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument("-d", "--db", dest="db_file", default="./brozzler-hq.db",
help="sqlite3 database filename; if the file does not exist, it will be created")
arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
help='URL identifying the amqp server to talk to')
arg_parser.add_argument("-v", "--verbose", dest="log_level",
action="store_const", default=logging.INFO, const=logging.DEBUG)
arg_parser.add_argument("--version", action="version",
version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__)))
args = arg_parser.parse_args(args=sys.argv[1:])
logging.basicConfig(stream=sys.stdout, level=args.log_level,
format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
def sigterm(signum, frame):
raise brozzler.ShutdownRequested("shutdown requested (caught SIGTERM)")
def sigint(signum, frame):
raise brozzler.ShutdownRequested("shutdown requested (caught SIGINT)")
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigint)
logging.info("brozzler-hq starting")
db = brozzler.hq.BrozzlerHQDb(db_file=args.db_file)
hq = brozzler.hq.BrozzlerHQ(amqp_url=args.amqp_url, db=db)
try:
hq.run()
except brozzler.ShutdownRequested as e:
logging.info("{}".format(e))

View File

@@ -6,7 +6,6 @@ import os
 import sys
 import logging
 import brozzler
-import kombu
 import yaml
 import json
@@ -14,8 +13,8 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description="brozzler-new-job - queue new job with brozzler",
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 arg_parser.add_argument('job_conf_file', metavar='JOB_CONF_FILE', help='brozzler job configuration file in yaml')
-arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
-        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument("--db", dest="db", default="localhost",
+        help="comma-separated list of RethinkDB server addresses, e.g. db0.example.com,db0.example.com:39015,db1.example.com")
 arg_parser.add_argument("-v", "--verbose", dest="log_level",
         action="store_const", default=logging.INFO, const=logging.DEBUG)
 arg_parser.add_argument("--version", action="version",
@@ -60,6 +59,7 @@ for seed_conf in seeds:
     if "warcprox_meta" in merged_conf:
         warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':'))
         extra_headers = {"Warcprox-Meta":warcprox_meta}
     site = brozzler.Site(seed=merged_conf["url"],
             scope=merged_conf.get("scope"),
             time_limit=merged_conf.get("time_limit"),
@@ -69,10 +69,7 @@ for seed_conf in seeds:
             extra_headers=extra_headers)
     sites.append(site)
-with kombu.Connection(args.amqp_url) as conn:
-    q = conn.SimpleQueue("brozzler.sites.new")
-    for site in sites:
-        d = site.to_dict()
-        logging.info("feeding amqp queue %s with %s", repr(q.queue.name), repr(d))
-        q.put(d)
+db = brozzler.BrozzlerRethinkDb(args.db.split(","))
+for site in sites:
+    brozzler.new_site(db, site)

View File

@@ -6,15 +6,14 @@ import os
 import sys
 import logging
 import brozzler
-import kombu
 import re
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
-        description="brozzler-add-site - register site to crawl with brozzler-hq",
+        description="brozzler-new-site - register site to brozzle",
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 arg_parser.add_argument('seed', metavar='SEED', help='seed url')
-arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
-        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument("--db", dest="db", default="localhost",
+        help="comma-separated list of RethinkDB server addresses, e.g. db0.example.com,db0.example.com:39015,db1.example.com")
 arg_parser.add_argument("--proxy", dest="proxy", default=None, help="http proxy for this site")
 arg_parser.add_argument("--time-limit", dest="time_limit", default=None, help="time limit in seconds for this site")
 arg_parser.add_argument("-H", "--extra-header", action="append",
@@ -44,11 +43,6 @@ site = brozzler.Site(seed=args.seed, proxy=args.proxy,
         enable_warcprox_features=args.enable_warcprox_features,
         extra_headers=extra_headers)
-with kombu.Connection(args.amqp_url) as conn:
-    q = conn.SimpleQueue("brozzler.sites.new")
-    # XXX check url syntax?
-    d = site.to_dict()
-    logging.info("""feeding amqp queue "{}" with {}""".format(q.queue.name, d))
-    q.put(d)
+db = brozzler.BrozzlerRethinkDb(args.db.split(","))
+brozzler.new_site(db, site)

View File

@@ -11,12 +11,11 @@ import time
 import signal
 import pprint
 import traceback
-import brozzler.worker
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f',
-        help='URL identifying the amqp server to talk to')
+arg_parser.add_argument("--db", dest="db", default="localhost",
+        help="comma-separated list of RethinkDB server addresses, e.g. db0.example.org,db0.example.org:39015,db1.example.org")
 arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
         help='executable to use to invoke chrome')
 arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
@@ -29,8 +28,6 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
-# the way we're using it, amqp is too verbose at debug level
-logging.getLogger("amqp").setLevel(logging.INFO)
 def sigterm(signum, frame):
     raise brozzler.ShutdownRequested('shutdown requested (caught SIGTERM)')
@@ -52,8 +49,8 @@ signal.signal(signal.SIGQUIT, dump_state)
 signal.signal(signal.SIGTERM, sigterm)
 signal.signal(signal.SIGINT, sigint)
-worker = brozzler.worker.BrozzlerWorker(amqp_url=args.amqp_url,
-        max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
+db = brozzler.BrozzlerRethinkDb(args.db.split(","))
+worker = brozzler.BrozzlerWorker(db, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe)
 worker.start()

View File

@@ -1,8 +1,10 @@
 import json as _json
-from brozzler.browser import Browser, BrowserPool
+import logging as _logging
 from brozzler.site import Page, Site
-from brozzler.hq import BrozzlerHQ
 from brozzler.worker import BrozzlerWorker
+from brozzler.robots import is_permitted_by_robots
+from brozzler.db import BrozzlerRethinkDb
+from brozzler.browser import Browser, BrowserPool
 def _read_version():
     import os
@@ -13,9 +15,17 @@ def _read_version():
 version = _read_version()
+# XXX don't know if these should be restricted; right now, only needed for
+# rethinkdb "between" query
+MAX_PRIORITY = 1000000000
+MIN_PRIORITY = -1000000000
 class ShutdownRequested(Exception):
     pass
+class NothingToClaim(Exception):
+    pass
 class ReachedLimit(Exception):
     def __init__(self, http_error=None, warcprox_meta=None, http_payload=None):
         if http_error:
@@ -34,4 +44,17 @@ class ReachedLimit(Exception):
     def __str__(self):
         return self.__repr__()
+def new_site(db, site):
+    _logging.info("new site {}".format(site))
+    db.new_site(site)
+    try:
+        if is_permitted_by_robots(site, site.seed):
+            page = Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000)
+            db.new_page(page)
+        else:
+            _logging.warn("seed url {} is blocked by robots.txt".format(site.seed))
+    except ReachedLimit as e:
+        site.note_limit_reached(e)
+        db.update_site(site)
 # vim: set sw=4 et:

View File

@@ -40,9 +40,12 @@ class BrowserPool:
         self.logger.info("browser ports: {}".format([browser.chrome_port for browser in self._available]))
     def acquire(self):
-        """Returns browser from pool if available, raises KeyError otherwise."""
+        """Returns browser from pool if available, raises NoBrowsersAvailable otherwise."""
         with self._lock:
-            browser = self._available.pop()
+            try:
+                browser = self._available.pop()
+            except KeyError:
+                raise NoBrowsersAvailable()
             self._in_use.add(browser)
             return browser
@@ -55,6 +58,9 @@ class BrowserPool:
         for browser in self._in_use:
             browser.abort_browse_page()
+class NoBrowsersAvailable(Exception):
+    pass
 class BrowsingException(Exception):
     pass
@@ -400,7 +406,7 @@ class Chrome:
         # XXX select doesn't work on windows
         def readline_nonblock(f):
             buf = b""
-            while (len(buf) == 0 or buf[-1] != 0xa) and select.select([f],[],[],0.5)[0]:
+            while not self._shutdown.is_set() and (len(buf) == 0 or buf[-1] != 0xa) and select.select([f],[],[],0.5)[0]:
                 buf += f.read(1)
             return buf
@@ -432,25 +438,28 @@ class Chrome:
         self.chrome_process.terminate()
         first_sigterm = last_sigterm = time.time()
-        while time.time() - first_sigterm < timeout_sec:
-            time.sleep(0.5)
+        try:
+            while time.time() - first_sigterm < timeout_sec:
+                time.sleep(0.5)
                 status = self.chrome_process.poll()
                 if status is not None:
                     if status == 0:
                         self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status))
                     else:
                         self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status))
                     return
                 # sometimes a hung chrome process will terminate on repeated sigterms
                 if time.time() - last_sigterm > 10:
                     self.chrome_process.terminate()
                     last_sigterm = time.time()
             self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec))
             self.chrome_process.kill()
-            self._out_reader_thread.join()
             status = self.chrome_process.wait()
             self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status))
+        finally:
+            self._out_reader_thread.join()
+            self.chrome_process = None
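A minimal sketch (not part of the commit) of the pool contract after this change: callers catch the new NoBrowsersAvailable instead of a bare KeyError, and still hand the browser back when done. The pool size and chrome executable below are arbitrary.

import brozzler
import brozzler.browser

pool = brozzler.BrowserPool(1, chrome_exe="chromium-browser")
try:
    browser = pool.acquire()   # raises NoBrowsersAvailable if every browser is in use
except brozzler.browser.NoBrowsersAvailable:
    browser = None             # caller backs off and retries later, as the worker does

if browser:
    try:
        pass                   # browser.start(...), browser.browse_page(...), etc.
    finally:
        pool.release(browser)  # always return the browser to the pool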

brozzler/db.py (new file, 120 lines)
View File

@@ -0,0 +1,120 @@
# vim: set sw=4 et:

import logging
import brozzler
import rethinkdb

r = rethinkdb

class UnexpectedDbResult(Exception):
    pass

class BrozzlerRethinkDb:
    logger = logging.getLogger(__module__ + "." + __qualname__)

    def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3):
        self.servers = servers
        self.db = db
        self.shards = shards
        self.replicas = replicas
        self._conn = self._connect(servers[0]) # XXX round robin
        try:
            tables = r.db(self.db).table_list().run(self._conn)
            for tbl in "sites", "pages":
                if not tbl in tables:
                    raise Exception("rethinkdb database {} exists but does not have table {}".format(repr(self.db), repr(tbl)))
        except rethinkdb.errors.ReqlOpFailedError as e:
            self.logger.info("rethinkdb database %s does not exist, initializing", repr(self.db))
            self._init_db()

    def _connect(self, server):
        self.logger.info("connecting to rethinkdb at %s", server)
        try:
            host, port = server.split(":")
            return r.connect(host=host, port=port)
        except ValueError:
            return r.connect(host=server)

    # def _round_robin_connection(self):
    #     while True:
    #         for server in self.servers:
    #             try:
    #                 host, port = server.split(":")
    #                 conn = r.connect(host=host, port=port)
    #             except ValueError:
    #                 conn = r.connect(host=server)

    def _init_db(self):
        r.db_create(self.db).run(self._conn)
        # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(self._conn)
        r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(self._conn)
        r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(self._conn)
        r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["claimed"], r.row["brozzle_count"], r.row["priority"]]).run(self._conn)
        self.logger.info("created database %s with tables 'sites' and 'pages'", self.db)

    def _vet_result(self, result, **kwargs):
        self.logger.debug("vetting expected=%s result=%s", kwargs, result)
        # {'replaced': 0, 'errors': 0, 'skipped': 0, 'inserted': 1, 'deleted': 0, 'generated_keys': ['292859c1-4926-4b27-9d87-b2c367667058'], 'unchanged': 0}
        for k in ["replaced", "errors", "skipped", "inserted", "deleted", "unchanged"]:
            if k in kwargs:
                expected = kwargs[k]
            else:
                expected = 0
            if isinstance(expected, list):
                if result.get(k) not in kwargs[k]:
                    raise UnexpectedDbResult("expected {} to be one of {} in {}".format(repr(k), expected, result))
            else:
                if result.get(k) != expected:
                    raise UnexpectedDbResult("expected {} to be {} in {}".format(repr(k), expected, result))

    def new_site(self, site):
        self.logger.info("inserting into 'sites' table %s", site)
        result = r.db(self.db).table("sites").insert(site.to_dict()).run(self._conn)
        self._vet_result(result, inserted=1)
        site.id = result["generated_keys"][0]

    def update_site(self, site):
        self.logger.debug("updating 'sites' table entry %s", site)
        result = r.db(self.db).table("sites").get(site.id).update(site.to_dict()).run(self._conn)
        self._vet_result(result, replaced=1)

    def update_page(self, page):
        self.logger.debug("updating 'pages' table entry %s", page)
        result = r.db(self.db).table("pages").get(page.id).update(page.to_dict()).run(self._conn)
        self._vet_result(result, replaced=[0,1], unchanged=[0,1])

    def new_page(self, page):
        self.logger.debug("inserting into 'pages' table %s", page)
        result = r.db(self.db).table("pages").insert(page.to_dict()).run(self._conn)
        self._vet_result(result, inserted=1)

    def claim_site(self):
        # XXX keep track of aggregate priority and prioritize sites accordingly?
        result = r.db(self.db).table("sites").filter({"claimed":False,"status":"ACTIVE"}).limit(1).update({"claimed":True},return_changes=True).run(self._conn)
        self._vet_result(result, replaced=[0,1])
        if result["replaced"] == 1:
            return brozzler.Site(**result["changes"][0]["new_val"])
        else:
            raise brozzler.NothingToClaim

    def claim_page(self, site):
        result = (r.db(self.db).table("pages")
                .between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,False,0,brozzler.MAX_PRIORITY], index="priority_by_site")
                .order_by(index=r.desc("priority_by_site")).limit(1)
                .update({"claimed":True},return_changes=True).run(self._conn))
        self._vet_result(result, replaced=[0,1])
        if result["replaced"] == 1:
            return brozzler.Page(**result["changes"][0]["new_val"])
        else:
            raise brozzler.NothingToClaim

    def has_outstanding_pages(self, site):
        cursor = r.db(self.db).table("pages").between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,True,0,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run(self._conn)
        return len(list(cursor)) > 0

    def get_page(self, page):
        result = r.db(self.db).table("pages").get(page.id).run(self._conn)
        if result:
            return brozzler.Page(**result)
        else:
            return None
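For orientation, a minimal end-to-end sketch (not part of the commit) of how these pieces are meant to fit together, assuming a RethinkDB server on localhost; the seed url is hypothetical. brozzler.new_site() inserts the site and its seed page, and a worker then claims the site and drains its pages through the priority_by_site index.

import brozzler

db = brozzler.BrozzlerRethinkDb(["localhost"])    # creates the db, tables and index on first run

site = brozzler.Site(seed="http://example.com/")  # hypothetical seed
brozzler.new_site(db, site)                       # site row plus, if robots.txt allows, a priority-1000 seed page

try:
    claimed_site = db.claim_site()                # atomically flips claimed False -> True on one ACTIVE site
except brozzler.NothingToClaim:
    claimed_site = None

if claimed_site:
    while True:
        try:
            page = db.claim_page(claimed_site)    # highest-priority unclaimed, not-yet-brozzled page for this site
        except brozzler.NothingToClaim:
            break
        # ... brozzle the page with a browser here ...
        page.brozzle_count += 1
        page.claimed = False
        db.update_page(page)
    claimed_site.claimed = False
    db.update_site(claimed_site)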

View File

@@ -1,294 +0,0 @@
# vim: set sw=4 et:
import json
import logging
import brozzler
import sqlite3
import time
import kombu
import kombu.simple
import reppy.cache
import requests
class BrozzlerHQDb:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, db_file="./brozzler-hq.db"):
self._conn = sqlite3.connect(db_file)
self._create_tables()
def _create_tables(self):
cursor = self._conn.cursor()
cursor.executescript("""
create table if not exists brozzler_sites (
id integer primary key,
status varchar(100) default 'ACTIVE',
site_json text
);
create table if not exists brozzler_pages (
id integer primary key,
site_id integer,
priority integer,
brozzle_count integer default 0,
in_progress boolean,
canon_url varchar(4000),
page_json text
);
create index if not exists brozzler_pages_priority on brozzler_pages (priority desc);
create index if not exists brozzler_pages_site_id on brozzler_pages (site_id);
""")
self._conn.commit()
def pop_page(self, site_id):
cursor = self._conn.cursor()
cursor.execute("select id, priority, page_json from brozzler_pages where site_id=? and not in_progress and brozzle_count=0 order by priority desc limit 1", (site_id,))
row = cursor.fetchone()
if row:
(id, priority, page_json) = row
new_priority = priority - 2000
cursor.execute("update brozzler_pages set priority=?, in_progress=1 where id=?", (new_priority, id))
self._conn.commit()
d = json.loads(page_json)
d["id"] = id
return d
else:
return None
def completed(self, page):
cursor = self._conn.cursor()
cursor.execute("update brozzler_pages set in_progress=0, brozzle_count=brozzle_count+1 where id=?", (page.id,))
self._conn.commit()
def new_site(self, site):
cursor = self._conn.cursor()
cursor.execute("insert into brozzler_sites (site_json) values (?)", (site.to_json(),))
self._conn.commit()
return cursor.lastrowid
def update_site(self, site):
cursor = self._conn.cursor()
if site.reached_limit:
self.logger.info("setting status=FINISHED_REACHED_LIMIT because site.reached_limit=%s", site.reached_limit)
cursor.execute("update brozzler_sites set status=?, site_json=? where id=?", ("FINISHED_REACHED_LIMIT", site.to_json(), site.id))
else:
cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id))
self._conn.commit()
def schedule_page(self, page, priority=0):
cursor = self._conn.cursor()
cursor.execute("insert into brozzler_pages (site_id, priority, canon_url, page_json, in_progress) values (?, ?, ?, ?, 0)",
(page.site_id, priority, page.canon_url(), page.to_json()))
self._conn.commit()
def sites(self):
cursor = self._conn.cursor()
cursor.execute("select id, site_json from brozzler_sites where status not like 'FINISHED%'")
while True:
row = cursor.fetchone()
if row is None:
break
site_dict = json.loads(row[1])
site_dict["id"] = row[0]
yield brozzler.Site(**site_dict)
def update_page(self, page):
cursor = self._conn.cursor()
# CREATE TABLE brozzler_pages ( id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), page_json text
cursor.execute("select id, priority, page_json from brozzler_pages where site_id=? and canon_url=?", (page.site_id, page.canon_url()))
row = cursor.fetchone()
if row:
# (id, priority, existing_page) = row
new_priority = page.calc_priority() + row[1]
existing_page = brozzler.Page(**json.loads(row[2]))
existing_page.hops_from_seed = min(page.hops_from_seed, existing_page.hops_from_seed)
cursor.execute("update brozzler_pages set priority=?, page_json=? where id=?", (new_priority, existing_page.to_json(), row[0]))
self._conn.commit()
else:
raise KeyError("page not in brozzler_pages site_id={} canon_url={}".format(page.site_id, page.canon_url()))
def in_progress_pages(self, site):
cursor = self._conn.cursor()
cursor.execute("select id, page_json from brozzler_pages where site_id = ? and in_progress", (site.id,))
pages = []
for row in cursor.fetchall():
(id, page_json) = row
page = brozzler.Page(**json.loads(page_json))
page.id = id
pages.append(page)
if len(pages) > 1:
self.logger.error("more than one page in progress for site?! shouldn't happen, violates politeness policy... site={}: pages={}".format(site, pages))
return pages
def set_status(self, site, status):
cursor = self._conn.cursor()
cursor.execute("update brozzler_sites set status=? where id=?", (status, site.id,))
self._conn.commit()
def get_status(self, site):
cursor = self._conn.cursor()
cursor.execute("select status from brozzler_sites where id=?", (site.id,))
row = cursor.fetchone()
if row:
return row[0]
else:
raise KeyError("site not in brozzler_sites id={}".format(site.id,))
class BrozzlerHQ:
logger = logging.getLogger(__module__ + "." + __qualname__)
def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", db=None):
self.amqp_url = amqp_url
self._conn = kombu.Connection(amqp_url)
self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new")
self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed")
self._disclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.disclaimed")
if db != None:
self._db = db
else:
self._db = BrozzlerHQDb()
self._robots_caches = {} # {site_id:reppy.cache.RobotsCache}
def _robots_cache(self, site):
class SessionRaiseOn420(requests.Session):
def get(self, url, **kwargs):
res = super().get(url, **kwargs)
if res.status_code == 420 and 'warcprox-meta' in res.headers:
raise brozzler.ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text)
else:
return res
if not site.id in self._robots_caches:
req_sesh = SessionRaiseOn420()
req_sesh.verify = False # ignore cert errors
if site.proxy:
proxie = "http://{}".format(site.proxy)
req_sesh.proxies = {"http":proxie,"https":proxie}
if site.extra_headers:
req_sesh.headers.update(site.extra_headers)
self._robots_caches[site.id] = reppy.cache.RobotsCache(session=req_sesh)
return self._robots_caches[site.id]
def is_permitted_by_robots(self, site, url):
if site.ignore_robots:
return True
try:
result = self._robots_cache(site).allowed(url, "brozzler")
return result
except BaseException as e:
if isinstance(e, reppy.exceptions.ServerError) and isinstance(e.args[0], brozzler.ReachedLimit):
raise e.args[0]
else:
self.logger.error("problem with robots.txt for %s: %s", url, repr(e))
return False
def run(self):
try:
while True:
self._new_site()
self._consume_completed_page()
self._feed_pages()
self._disclaimed_site()
time.sleep(0.5)
finally:
self._conn.close()
def _disclaimed_site(self):
try:
msg = self._disclaimed_sites_q.get(block=False)
self.logger.info("msg.payload=%s", msg.payload)
site = brozzler.Site(**msg.payload)
self.logger.info("site=%s", site)
self._db.update_site(site)
msg.ack()
self.logger.info("received disclaimed site {}".format(site))
status = self._db.get_status(site)
if not status.startswith("FINISHED"):
self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name))
self._unclaimed_sites_q.put(site.to_dict())
else:
self.logger.info("disclaimed site is %s %s", status, site)
except kombu.simple.Empty:
pass
def _new_site(self):
try:
msg = self._new_sites_q.get(block=False)
site = brozzler.Site(**msg.payload)
msg.ack()
self.logger.info("new site {}".format(site))
site_id = self._db.new_site(site)
site.id = site_id
try:
if self.is_permitted_by_robots(site, site.seed):
page = brozzler.Page(site.seed, site_id=site.id, hops_from_seed=0)
self._db.schedule_page(page, priority=1000)
self._unclaimed_sites_q.put(site.to_dict())
else:
self.logger.warn("seed url {} is blocked by robots.txt".format(site.seed))
except brozzler.ReachedLimit as e:
site.note_limit_reached(e)
self._db.update_site(site)
except kombu.simple.Empty:
pass
def _finished(self, site):
self.logger.info("site FINISHED! {}".format(site))
self._db.set_status(site, "FINISHED")
def _feed_pages(self):
for site in self._db.sites():
q = self._conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
if len(q) == 0:
page = self._db.pop_page(site.id)
if page:
self.logger.info("feeding {} to {}".format(page, q.queue.name))
q.put(page)
elif not self._db.in_progress_pages(site):
self._finished(site)
def _scope_and_schedule_outlinks(self, site, parent_page):
counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
if parent_page.outlinks:
for url in parent_page.outlinks:
if site.is_in_scope(url, parent_page):
if self.is_permitted_by_robots(site, url):
child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
try:
self._db.update_page(child_page)
counts["updated"] += 1
except KeyError:
self._db.schedule_page(child_page, priority=child_page.calc_priority())
counts["added"] += 1
else:
counts["blocked"] += 1
else:
counts["rejected"] += 1
self.logger.info("{} new links added, {} existing links updated, {} links rejected, {} links blocked by robots from {}".format(
counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page))
def _consume_completed_page(self):
for site in self._db.sites():
q = self._conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id))
try:
msg = q.get(block=False)
completed_page = brozzler.Page(**msg.payload)
msg.ack()
self._db.completed(completed_page)
if completed_page.redirect_url and completed_page.hops_from_seed == 0:
site.note_seed_redirect(completed_page.redirect_url)
self._db.update_site(site)
self._scope_and_schedule_outlinks(site, completed_page)
except kombu.simple.Empty:
pass

brozzler/robots.py (new file, 43 lines)
View File

@@ -0,0 +1,43 @@
# vim: set sw=4 et:

import json
import logging
import brozzler
import reppy.cache
import requests

_robots_caches = {} # {site_id:reppy.cache.RobotsCache}

def _robots_cache(site):
    class SessionRaiseOn420(requests.Session):
        def get(self, url, *args, **kwargs):
            res = super().get(url, *args, **kwargs)
            if res.status_code == 420 and 'warcprox-meta' in res.headers:
                raise ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text)
            else:
                return res

    if not site.id in _robots_caches:
        req_sesh = SessionRaiseOn420()
        req_sesh.verify = False # ignore cert errors
        if site.proxy:
            proxie = "http://{}".format(site.proxy)
            req_sesh.proxies = {"http":proxie,"https":proxie}
        if site.extra_headers:
            req_sesh.headers.update(site.extra_headers)
        _robots_caches[site.id] = reppy.cache.RobotsCache(session=req_sesh)

    return _robots_caches[site.id]

def is_permitted_by_robots(site, url):
    if site.ignore_robots:
        return True
    try:
        result = _robots_cache(site).allowed(url, "brozzler")
        return result
    except BaseException as e:
        if isinstance(e, reppy.exceptions.ServerError) and isinstance(e.args[0], brozzler.ReachedLimit):
            raise e.args[0]
        else:
            logging.error("problem with robots.txt for %s: %s", url, repr(e), exc_info=True)
            return False

View File

@@ -4,13 +4,26 @@ import surt
 import json
 import logging
 import brozzler
+import hashlib
-class Site:
+class BaseDictable:
+    def to_dict(self):
+        d = dict(vars(self))
+        for k in vars(self):
+            if k.startswith("_") or d[k] is None:
+                del d[k]
+        return d
+    def to_json(self):
+        return json.dumps(self.to_dict(), separators=(',', ':'))
+class Site(BaseDictable):
     logger = logging.getLogger(__module__ + "." + __qualname__)
     def __init__(self, seed, id=None, scope=None, proxy=None,
             ignore_robots=False, time_limit=None, extra_headers=None,
-            enable_warcprox_features=False, reached_limit=None):
+            enable_warcprox_features=False, reached_limit=None, status="ACTIVE",
+            claimed=False):
         self.seed = seed
         self.id = id
         self.proxy = proxy
@@ -19,6 +32,8 @@ class Site:
         self.extra_headers = extra_headers
         self.time_limit = time_limit
         self.reached_limit = reached_limit
+        self.status = status
+        self.claimed = bool(claimed)
         self.scope = scope or {}
         if not "surt" in self.scope:
@@ -44,6 +59,7 @@ class Site:
                     e.warcprox_meta["reached-limit"], self.reached_limit)
         else:
             self.reached_limit = e.warcprox_meta["reached-limit"]
+            self.status = "FINISHED_REACHED_LIMIT"
     def is_in_scope(self, url, parent_page=None):
         if parent_page and "max_hops" in self.scope and parent_page.hops_from_seed >= self.scope["max_hops"]:
@@ -62,25 +78,26 @@ class Site:
             self.logger.warn("""problem parsing url "{}" """.format(url))
             return False
-    def to_dict(self):
-        d = dict(vars(self))
-        for k in vars(self):
-            if k.startswith("_"):
-                del d[k]
-        return d
-    def to_json(self):
-        return json.dumps(self.to_dict(), separators=(',', ':'))
-class Page:
-    def __init__(self, url, id=None, site_id=None, hops_from_seed=0, outlinks=None, redirect_url=None):
-        self.id = id
+class Page(BaseDictable):
+    def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0):
         self.site_id = site_id
         self.url = url
         self.hops_from_seed = hops_from_seed
-        self._canon_hurl = None
-        self.outlinks = outlinks
         self.redirect_url = redirect_url
+        self.claimed = bool(claimed)
+        self.brozzle_count = brozzle_count
+        self._canon_hurl = None
+        if priority is not None:
+            self.priority = priority
+        else:
+            self.priority = self._calc_priority()
+        if id is not None:
+            self.id = id
+        else:
+            digest_this = "site_id:{},canon_url:{}".format(self.site_id, self.canon_url())
+            self.id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest()
     def __repr__(self):
         return """Page(url={},site_id={},hops_from_seed={})""".format(
@@ -89,10 +106,12 @@ class Page:
     def note_redirect(self, url):
         self.redirect_url = url
-    def calc_priority(self):
+    def _calc_priority(self):
         priority = 0
         priority += max(0, 10 - self.hops_from_seed)
         priority += max(0, 6 - self.canon_url().count("/"))
+        priority = max(priority, brozzler.MIN_PRIORITY)
+        priority = min(priority, brozzler.MAX_PRIORITY)
         return priority
     def canon_url(self):
@@ -101,20 +120,3 @@ class Page:
             surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
         return self._canon_hurl.geturl()
-    def to_dict(self):
-        d = dict(vars(self))
-        for k in vars(self):
-            if k.startswith("_"):
-                del d[k]
-        if self.outlinks is not None and not isinstance(self.outlinks, list):
-            outlinks = []
-            outlinks.extend(self.outlinks)
-            d["outlinks"] = outlinks
-        return d
-    def to_json(self):
-        return json.dumps(self.to_dict(), separators=(',', ':'))
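As a worked illustration of the new priority and id logic (standalone arithmetic, not part of the commit): a page one hop from the seed whose canonical url is http://example.com/a/b/ gets priority max(0, 10-1) + max(0, 6-5) = 10, well inside the MIN_PRIORITY/MAX_PRIORITY clamp, and its id is now derived from site_id and canonical url so the same link rediscovered later maps to the same "pages" row.

import hashlib

# mirrors Page._calc_priority() for a hypothetical page
hops_from_seed = 1
canon_url = "http://example.com/a/b/"                    # 5 "/" characters
priority = 0
priority += max(0, 10 - hops_from_seed)                  # 9
priority += max(0, 6 - canon_url.count("/"))             # 1
priority = max(-1000000000, min(priority, 1000000000))   # clamp to [MIN_PRIORITY, MAX_PRIORITY]
assert priority == 10

# mirrors the new deterministic Page.id
site_id = "4f0e4b2e-0000-0000-0000-000000000000"          # hypothetical rethinkdb-generated site id
digest_this = "site_id:{},canon_url:{}".format(site_id, canon_url)
page_id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest()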

View File

@@ -6,7 +6,6 @@ import brozzler
 import threading
 import time
 import signal
-import kombu
 import youtube_dl
 import urllib.request
 import json
@@ -14,9 +13,8 @@ import json
 class BrozzlerWorker:
     logger = logging.getLogger(__module__ + "." + __qualname__)
-    def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f",
-            max_browsers=1, chrome_exe="chromium-browser"):
-        self._amqp_url = amqp_url
+    def __init__(self, db, max_browsers=1, chrome_exe="chromium-browser"):
+        self._db = db
         self._max_browsers = max_browsers
         self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
                 chrome_exe=chrome_exe, ignore_cert_errors=True)
@@ -44,31 +42,25 @@ class BrozzlerWorker:
         ## os.environ["http_proxy"] = "http://{}".format(site.proxy)
         return youtube_dl.YoutubeDL(ydl_opts)
-    def _next_page(self, site):
-        """Raises kombu.simple.Empty if queue is empty"""
-        with kombu.Connection(self._amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
-            msg = q.get(block=True, timeout=0.5)
-            page_dict = msg.payload
-            page = brozzler.Page(**page_dict)
-            msg.ack()
-            return page
     def _completed_page(self, site, page):
-        with kombu.Connection(self._amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id))
-            self.logger.info("feeding {} to {}".format(page, q.queue.name))
-            q.put(page.to_dict())
+        page.brozzle_count += 1
+        page.claimed = False
+        # XXX set priority?
+        self._db.update_page(page)
+        if page.redirect_url and page.hops_from_seed == 0:
+            site.note_seed_redirect(page.redirect_url)
+            self._db.update_site(site)
     def _disclaim_site(self, site, page=None):
-        with kombu.Connection(self._amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.disclaimed".format(site.id))
-            self.logger.info("feeding {} to {}".format(site, q.queue.name))
-            q.put(site.to_dict())
-            if page:
-                q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
-                self.logger.info("feeding unfinished page {} to {}".format(page, q.queue.name))
-                q.put(page.to_dict())
+        self.logger.info("disclaiming %s", site)
+        site.claimed = False
+        if not page and not self._db.has_outstanding_pages(site):
+            self.logger.info("site FINISHED! %s", site)
+            site.status = "FINISHED"
+        self._db.update_site(site)
+        if page:
+            page.claimed = False
+            self._db.update_page(page)
     def _putmeta(self, warcprox_address, url, content_type, payload, extra_headers=None):
         headers = {"Content-Type":content_type}
@@ -110,6 +102,29 @@ class BrozzlerWorker:
             else:
                 raise
+    def _scope_and_schedule_outlinks(self, site, parent_page, outlinks):
+        counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
+        if outlinks:
+            for url in outlinks:
+                if site.is_in_scope(url, parent_page):
+                    if brozzler.is_permitted_by_robots(site, url):
+                        new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
+                        existing_child_page = self._db.get_page(new_child_page)
+                        if existing_child_page:
+                            existing_child_page.priority += new_child_page.priority
+                            self._db.update_page(existing_child_page)
+                            counts["updated"] += 1
+                        else:
+                            self._db.new_page(new_child_page)
+                            counts["added"] += 1
+                    else:
+                        counts["blocked"] += 1
+                else:
+                    counts["rejected"] += 1
+        self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s",
+                counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)
     def brozzle_page(self, browser, ydl, site, page):
         def on_screenshot(screenshot_png):
             if site.proxy and site.enable_warcprox_features:
@@ -126,10 +141,10 @@ class BrozzlerWorker:
         except:
             self.logger.error("youtube_dl raised exception on {}".format(page), exc_info=True)
-        page.outlinks = browser.browse_page(page.url,
-                extra_headers=site.extra_headers,
-                on_screenshot=on_screenshot,
+        outlinks = browser.browse_page(page.url,
+                extra_headers=site.extra_headers, on_screenshot=on_screenshot,
                 on_url_change=page.note_redirect)
+        self._scope_and_schedule_outlinks(site, page, outlinks)
     def _brozzle_site(self, browser, ydl, site):
         start = time.time()
@@ -137,71 +152,48 @@ class BrozzlerWorker:
         try:
             browser.start(proxy=site.proxy)
             while not self._shutdown_requested.is_set() and time.time() - start < 300:
-                try:
-                    page = self._next_page(site)
-                    self.brozzle_page(browser, ydl, site, page)
-                    self._completed_page(site, page)
-                    page = None
-                except kombu.simple.Empty:
-                    # if some timeout reached, re-raise?
-                    pass
-        # except kombu.simple.Empty:
-        #     self.logger.info("finished {} (queue is empty)".format(site))
+                page = self._db.claim_page(site)
+                self.brozzle_page(browser, ydl, site, page)
+                self._completed_page(site, page)
+                page = None
+        except brozzler.NothingToClaim:
+            self.logger.info("no pages left for site %s", site)
         except brozzler.ReachedLimit as e:
             site.note_limit_reached(e)
         except brozzler.browser.BrowsingAborted:
             self.logger.info("{} shut down".format(browser))
+        except:
+            self.logger.critical("unexpected exception", exc_info=True)
         finally:
             self.logger.info("finished session brozzling site, stopping browser and disclaiming site")
             browser.stop()
             self._disclaim_site(site, page)
             self._browser_pool.release(browser)
-    def _claim_site(self, q):
-        """Reads message from SimpleQueue q, fires off thread to brozzle the
-        site. Raises KeyError if no browsers are available, kombu.simple.Empty
-        if queue is empty."""
-        browser = self._browser_pool.acquire()
-        try:
-            msg = q.get(block=True, timeout=0.5)
-            site = brozzler.Site(**msg.payload)
-            msg.ack()
-            self.logger.info("brozzling site {}".format(site))
-            ydl = self._youtube_dl(site)
-            th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
-                    name="BrowsingThread-{}".format(site.seed))
-            th.start()
-        except:
-            self._browser_pool.release(browser)
-            raise
     def run(self):
         latest_state = None
         while not self._shutdown_requested.is_set():
             try:
-                # XXX too much connecting and disconnecting from rabbitmq
-                with kombu.Connection(self._amqp_url) as conn:
-                    q = conn.SimpleQueue("brozzler.sites.unclaimed")
-                    q_empty = False
-                    if len(q) > 0:
-                        try:
-                            self._claim_site(q)
-                        except kombu.simple.Empty:
-                            q_empty = True
-                        except KeyError:
-                            if latest_state != "browsers-busy":
-                                self.logger.info("all {} browsers are busy".format(self._max_browsers))
-                                latest_state = "browsers-busy"
-                    else:
-                        q_empty = True
-                    if q_empty:
-                        if latest_state != "no-unclaimed-sites":
-                            # self.logger.info("no unclaimed sites to browse")
-                            latest_state = "no-unclaimed-sites"
-                    time.sleep(0.5)
-            except OSError as e:
-                self.logger.warn("continuing after i/o exception (from rabbitmq?)", exc_info=True)
+                browser = self._browser_pool.acquire()
+                try:
+                    site = self._db.claim_site()
+                    self.logger.info("brozzling site %s", site)
+                    ydl = self._youtube_dl(site)
+                    th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
+                            name="BrowsingThread-{}".format(site.seed))
+                    th.start()
+                except:
+                    self._browser_pool.release(browser)
+                    raise
+            except brozzler.browser.NoBrowsersAvailable:
+                if latest_state != "browsers-busy":
+                    self.logger.info("all %s browsers are busy", self._max_browsers)
+                    latest_state = "browsers-busy"
+            except brozzler.NothingToClaim:
+                if latest_state != "no-unclaimed-sites":
+                    self.logger.info("no unclaimed sites to browse")
+                    latest_state = "no-unclaimed-sites"
+            time.sleep(0.5)
     def start(self):
         th = threading.Thread(target=self.run, name="BrozzlerWorker")

View File

@@ -1,4 +1,3 @@
-kombu
 argparse
 PyYAML
 git+https://github.com/ikreymer/surt.git@py3
@@ -8,3 +7,4 @@ git+https://github.com/seomoz/reppy.git
 requests
 # websocket-client
 git+https://github.com/nlevitt/websocket-client.git@tweaks
+rethinkdb