diff --git a/README.md b/README.md index 7d77886..5ad836f 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,8 @@ brozzler ======== "browser" | "crawler" = "brozzler" -Brozzler is a distributed web crawler that uses a real browser (chrome or -chromium) to fetch pages and embedded urls and to extract links. It also +Brozzler is a distributed web crawler (爬虫) that uses a real browser (chrome +or chromium) to fetch pages and embedded urls and to extract links. It also uses [youtube-dl](https://github.com/rg3/youtube-dl) to enhance media capture capabilities. diff --git a/bin/brozzle-page b/bin/brozzle-page index fd961e2..de3209e 100755 --- a/bin/brozzle-page +++ b/bin/brozzle-page @@ -6,7 +6,6 @@ import os import sys import logging import brozzler -import kombu import re arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), diff --git a/bin/brozzler-hq b/bin/brozzler-hq deleted file mode 100755 index e83d695..0000000 --- a/bin/brozzler-hq +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import argparse -import os -import sys -import logging -import brozzler -import brozzler.hq -import signal - -arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), - description="brozzler-hq - headquarters of distributed brozzler crawl", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument("-d", "--db", dest="db_file", default="./brozzler-hq.db", - help="sqlite3 database filename; if the file does not exist, it will be created") -arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') -arg_parser.add_argument("-v", "--verbose", dest="log_level", - action="store_const", default=logging.INFO, const=logging.DEBUG) -arg_parser.add_argument("--version", action="version", - version="brozzler {} - {}".format(brozzler.version, os.path.basename(__file__))) -args = arg_parser.parse_args(args=sys.argv[1:]) - -logging.basicConfig(stream=sys.stdout, level=args.log_level, - format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s") - -def sigterm(signum, frame): - raise brozzler.ShutdownRequested("shutdown requested (caught SIGTERM)") -def sigint(signum, frame): - raise brozzler.ShutdownRequested("shutdown requested (caught SIGINT)") - -signal.signal(signal.SIGTERM, sigterm) -signal.signal(signal.SIGINT, sigint) - -logging.info("brozzler-hq starting") - -db = brozzler.hq.BrozzlerHQDb(db_file=args.db_file) -hq = brozzler.hq.BrozzlerHQ(amqp_url=args.amqp_url, db=db) - -try: - hq.run() -except brozzler.ShutdownRequested as e: - logging.info("{}".format(e)) - diff --git a/bin/brozzler-new-job b/bin/brozzler-new-job index b785a73..f949a8d 100755 --- a/bin/brozzler-new-job +++ b/bin/brozzler-new-job @@ -6,7 +6,6 @@ import os import sys import logging import brozzler -import kombu import yaml import json @@ -14,8 +13,8 @@ arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description="brozzler-new-job - queue new job with brozzler", formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('job_conf_file', metavar='JOB_CONF_FILE', help='brozzler job configuration file in yaml') -arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') +arg_parser.add_argument("--db", dest="db", default="localhost", + help="comma-separated list of 
RethinkDB server addresses, e.g. db0.example.com,db0.example.com:39015,db1.example.com") arg_parser.add_argument("-v", "--verbose", dest="log_level", action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument("--version", action="version", @@ -60,6 +59,7 @@ for seed_conf in seeds: if "warcprox_meta" in merged_conf: warcprox_meta = json.dumps(merged_conf["warcprox_meta"], separators=(',', ':')) extra_headers = {"Warcprox-Meta":warcprox_meta} + site = brozzler.Site(seed=merged_conf["url"], scope=merged_conf.get("scope"), time_limit=merged_conf.get("time_limit"), @@ -69,10 +69,7 @@ for seed_conf in seeds: extra_headers=extra_headers) sites.append(site) -with kombu.Connection(args.amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.new") - for site in sites: - d = site.to_dict() - logging.info("feeding amqp queue %s with %s", repr(q.queue.name), repr(d)) - q.put(d) +db = brozzler.BrozzlerRethinkDb(args.db.split(",")) +for site in sites: + brozzler.new_site(db, site) diff --git a/bin/brozzler-new-site b/bin/brozzler-new-site index 2386206..0c90122 100755 --- a/bin/brozzler-new-site +++ b/bin/brozzler-new-site @@ -6,15 +6,14 @@ import os import sys import logging import brozzler -import kombu import re arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), - description="brozzler-add-site - register site to crawl with brozzler-hq", + description="brozzler-new-site - register site to brozzle", formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument('seed', metavar='SEED', help='seed url') -arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') +arg_parser.add_argument("--db", dest="db", default="localhost", + help="comma-separated list of RethinkDB server addresses, e.g. db0.example.com,db0.example.com:39015,db1.example.com") arg_parser.add_argument("--proxy", dest="proxy", default=None, help="http proxy for this site") arg_parser.add_argument("--time-limit", dest="time_limit", default=None, help="time limit in seconds for this site") arg_parser.add_argument("-H", "--extra-header", action="append", @@ -44,11 +43,6 @@ site = brozzler.Site(seed=args.seed, proxy=args.proxy, enable_warcprox_features=args.enable_warcprox_features, extra_headers=extra_headers) -with kombu.Connection(args.amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.new") - # XXX check url syntax? - d = site.to_dict() - logging.info("""feeding amqp queue "{}" with {}""".format(q.queue.name, d)) - q.put(d) - +db = brozzler.BrozzlerRethinkDb(args.db.split(",")) +brozzler.new_site(db, site) diff --git a/bin/brozzler-worker b/bin/brozzler-worker index 439d714..184d4e2 100755 --- a/bin/brozzler-worker +++ b/bin/brozzler-worker @@ -11,12 +11,11 @@ import time import signal import pprint import traceback -import brozzler.worker arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--amqp-url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', - help='URL identifying the amqp server to talk to') +arg_parser.add_argument("--db", dest="db", default="localhost", + help="comma-separated list of RethinkDB server addresses, e.g. 
db0.example.org,db0.example.org:39015,db1.example.org") arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser', help='executable to use to invoke chrome') arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1', @@ -29,8 +28,6 @@ args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -# the way we're using it, amqp is too verbose at debug level -logging.getLogger("amqp").setLevel(logging.INFO) def sigterm(signum, frame): raise brozzler.ShutdownRequested('shutdown requested (caught SIGTERM)') @@ -52,8 +49,8 @@ signal.signal(signal.SIGQUIT, dump_state) signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) -worker = brozzler.worker.BrozzlerWorker(amqp_url=args.amqp_url, - max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) +db = brozzler.BrozzlerRethinkDb(args.db.split(",")) +worker = brozzler.BrozzlerWorker(db, max_browsers=int(args.max_browsers), chrome_exe=args.chrome_exe) worker.start() diff --git a/brozzler/__init__.py b/brozzler/__init__.py index d550eed..bfac5f9 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -1,8 +1,10 @@ import json as _json -from brozzler.browser import Browser, BrowserPool +import logging as _logging from brozzler.site import Page, Site -from brozzler.hq import BrozzlerHQ from brozzler.worker import BrozzlerWorker +from brozzler.robots import is_permitted_by_robots +from brozzler.db import BrozzlerRethinkDb +from brozzler.browser import Browser, BrowserPool def _read_version(): import os @@ -13,9 +15,17 @@ def _read_version(): version = _read_version() +# XXX don't know if these should be restricted; right now, only needed for +# rethinkdb "between" query +MAX_PRIORITY = 1000000000 +MIN_PRIORITY = -1000000000 + class ShutdownRequested(Exception): pass +class NothingToClaim(Exception): + pass + class ReachedLimit(Exception): def __init__(self, http_error=None, warcprox_meta=None, http_payload=None): if http_error: @@ -34,4 +44,17 @@ class ReachedLimit(Exception): def __str__(self): return self.__repr__() +def new_site(db, site): + _logging.info("new site {}".format(site)) + db.new_site(site) + try: + if is_permitted_by_robots(site, site.seed): + page = Page(site.seed, site_id=site.id, hops_from_seed=0, priority=1000) + db.new_page(page) + else: + _logging.warn("seed url {} is blocked by robots.txt".format(site.seed)) + except ReachedLimit as e: + site.note_limit_reached(e) + db.update_site(site) + # vim: set sw=4 et: diff --git a/brozzler/browser.py b/brozzler/browser.py index 1fec6b0..5423405 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -40,9 +40,12 @@ class BrowserPool: self.logger.info("browser ports: {}".format([browser.chrome_port for browser in self._available])) def acquire(self): - """Returns browser from pool if available, raises KeyError otherwise.""" + """Returns browser from pool if available, raises NoBrowsersAvailable otherwise.""" with self._lock: - browser = self._available.pop() + try: + browser = self._available.pop() + except KeyError: + raise NoBrowsersAvailable() self._in_use.add(browser) return browser @@ -55,6 +58,9 @@ class BrowserPool: for browser in self._in_use: browser.abort_browse_page() +class NoBrowsersAvailable(Exception): + pass + class BrowsingException(Exception): pass @@ -400,7 +406,7 @@ class Chrome: # XXX select doesn't work on 
windows def readline_nonblock(f): buf = b"" - while (len(buf) == 0 or buf[-1] != 0xa) and select.select([f],[],[],0.5)[0]: + while not self._shutdown.is_set() and (len(buf) == 0 or buf[-1] != 0xa) and select.select([f],[],[],0.5)[0]: buf += f.read(1) return buf @@ -432,25 +438,28 @@ class Chrome: self.chrome_process.terminate() first_sigterm = last_sigterm = time.time() - while time.time() - first_sigterm < timeout_sec: - time.sleep(0.5) + try: + while time.time() - first_sigterm < timeout_sec: + time.sleep(0.5) - status = self.chrome_process.poll() - if status is not None: - if status == 0: - self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status)) - else: - self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status)) - return + status = self.chrome_process.poll() + if status is not None: + if status == 0: + self.logger.info("chrome pid {} exited normally".format(self.chrome_process.pid, status)) + else: + self.logger.warn("chrome pid {} exited with nonzero status {}".format(self.chrome_process.pid, status)) + return - # sometimes a hung chrome process will terminate on repeated sigterms - if time.time() - last_sigterm > 10: - self.chrome_process.terminate() - last_sigterm = time.time() + # sometimes a hung chrome process will terminate on repeated sigterms + if time.time() - last_sigterm > 10: + self.chrome_process.terminate() + last_sigterm = time.time() - self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec)) - self.chrome_process.kill() - self._out_reader_thread.join() - status = self.chrome_process.wait() - self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status)) + self.logger.warn("chrome pid {} still alive {} seconds after sending SIGTERM, sending SIGKILL".format(self.chrome_process.pid, timeout_sec)) + self.chrome_process.kill() + status = self.chrome_process.wait() + self.logger.warn("chrome pid {} reaped (status={}) after killing with SIGKILL".format(self.chrome_process.pid, status)) + finally: + self._out_reader_thread.join() + self.chrome_process = None diff --git a/brozzler/db.py b/brozzler/db.py new file mode 100644 index 0000000..b062052 --- /dev/null +++ b/brozzler/db.py @@ -0,0 +1,120 @@ +# vim: set sw=4 et: + +import logging +import brozzler +import rethinkdb +r = rethinkdb + +class UnexpectedDbResult(Exception): + pass + +class BrozzlerRethinkDb: + logger = logging.getLogger(__module__ + "." 
+ __qualname__) + + def __init__(self, servers=["localhost"], db="brozzler", shards=3, replicas=3): + self.servers = servers + self.db = db + self.shards = shards + self.replicas = replicas + + self._conn = self._connect(servers[0]) # XXX round robin + try: + tables = r.db(self.db).table_list().run(self._conn) + for tbl in "sites", "pages": + if not tbl in tables: + raise Exception("rethinkdb database {} exists but does not have table {}".format(repr(self.db), repr(tbl))) + except rethinkdb.errors.ReqlOpFailedError as e: + self.logger.info("rethinkdb database %s does not exist, initializing", repr(self.db)) + self._init_db() + + def _connect(self, server): + self.logger.info("connecting to rethinkdb at %s", server) + try: + host, port = server.split(":") + return r.connect(host=host, port=port) + except ValueError: + return r.connect(host=server) + + # def _round_robin_connection(self): + # while True: + # for server in self.servers: + # try: + # host, port = server.split(":") + # conn = r.connect(host=host, port=port) + # except ValueError: + # conn = r.connect(host=server) + + def _init_db(self): + r.db_create(self.db).run(self._conn) + # r.db("test").table_create("jobs", shards=self.shards, replicas=self.replicas).run(self._conn) + r.db(self.db).table_create("sites", shards=self.shards, replicas=self.replicas).run(self._conn) + r.db(self.db).table_create("pages", shards=self.shards, replicas=self.replicas).run(self._conn) + r.db(self.db).table("pages").index_create("priority_by_site", [r.row["site_id"], r.row["claimed"], r.row["brozzle_count"], r.row["priority"]]).run(self._conn) + self.logger.info("created database %s with tables 'sites' and 'pages'", self.db) + + def _vet_result(self, result, **kwargs): + self.logger.debug("vetting expected=%s result=%s", kwargs, result) + # {'replaced': 0, 'errors': 0, 'skipped': 0, 'inserted': 1, 'deleted': 0, 'generated_keys': ['292859c1-4926-4b27-9d87-b2c367667058'], 'unchanged': 0} + for k in ["replaced", "errors", "skipped", "inserted", "deleted", "unchanged"]: + if k in kwargs: + expected = kwargs[k] + else: + expected = 0 + if isinstance(expected, list): + if result.get(k) not in kwargs[k]: + raise UnexpectedDbResult("expected {} to be one of {} in {}".format(repr(k), expected, result)) + else: + if result.get(k) != expected: + raise UnexpectedDbResult("expected {} to be {} in {}".format(repr(k), expected, result)) + + def new_site(self, site): + self.logger.info("inserting into 'sites' table %s", site) + result = r.db(self.db).table("sites").insert(site.to_dict()).run(self._conn) + self._vet_result(result, inserted=1) + site.id = result["generated_keys"][0] + + def update_site(self, site): + self.logger.debug("updating 'sites' table entry %s", site) + result = r.db(self.db).table("sites").get(site.id).update(site.to_dict()).run(self._conn) + self._vet_result(result, replaced=1) + + def update_page(self, page): + self.logger.debug("updating 'pages' table entry %s", page) + result = r.db(self.db).table("pages").get(page.id).update(page.to_dict()).run(self._conn) + self._vet_result(result, replaced=[0,1], unchanged=[0,1]) + + def new_page(self, page): + self.logger.debug("inserting into 'pages' table %s", page) + result = r.db(self.db).table("pages").insert(page.to_dict()).run(self._conn) + self._vet_result(result, inserted=1) + + def claim_site(self): + # XXX keep track of aggregate priority and prioritize sites accordingly? 
+ result = r.db(self.db).table("sites").filter({"claimed":False,"status":"ACTIVE"}).limit(1).update({"claimed":True},return_changes=True).run(self._conn) + self._vet_result(result, replaced=[0,1]) + if result["replaced"] == 1: + return brozzler.Site(**result["changes"][0]["new_val"]) + else: + raise brozzler.NothingToClaim + + def claim_page(self, site): + result = (r.db(self.db).table("pages") + .between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,False,0,brozzler.MAX_PRIORITY], index="priority_by_site") + .order_by(index=r.desc("priority_by_site")).limit(1) + .update({"claimed":True},return_changes=True).run(self._conn)) + self._vet_result(result, replaced=[0,1]) + if result["replaced"] == 1: + return brozzler.Page(**result["changes"][0]["new_val"]) + else: + raise brozzler.NothingToClaim + + def has_outstanding_pages(self, site): + cursor = r.db(self.db).table("pages").between([site.id,False,0,brozzler.MIN_PRIORITY], [site.id,True,0,brozzler.MAX_PRIORITY], index="priority_by_site").limit(1).run(self._conn) + return len(list(cursor)) > 0 + + def get_page(self, page): + result = r.db(self.db).table("pages").get(page.id).run(self._conn) + if result: + return brozzler.Page(**result) + else: + return None diff --git a/brozzler/hq.py b/brozzler/hq.py deleted file mode 100644 index 73d47c7..0000000 --- a/brozzler/hq.py +++ /dev/null @@ -1,294 +0,0 @@ -# vim: set sw=4 et: - -import json -import logging -import brozzler -import sqlite3 -import time -import kombu -import kombu.simple -import reppy.cache -import requests - -class BrozzlerHQDb: - logger = logging.getLogger(__module__ + "." + __qualname__) - - def __init__(self, db_file="./brozzler-hq.db"): - self._conn = sqlite3.connect(db_file) - self._create_tables() - - def _create_tables(self): - cursor = self._conn.cursor() - cursor.executescript(""" - create table if not exists brozzler_sites ( - id integer primary key, - status varchar(100) default 'ACTIVE', - site_json text - ); - - create table if not exists brozzler_pages ( - id integer primary key, - site_id integer, - priority integer, - brozzle_count integer default 0, - in_progress boolean, - canon_url varchar(4000), - page_json text - ); - create index if not exists brozzler_pages_priority on brozzler_pages (priority desc); - create index if not exists brozzler_pages_site_id on brozzler_pages (site_id); - """) - self._conn.commit() - - def pop_page(self, site_id): - cursor = self._conn.cursor() - cursor.execute("select id, priority, page_json from brozzler_pages where site_id=? 
and not in_progress and brozzle_count=0 order by priority desc limit 1", (site_id,)) - row = cursor.fetchone() - if row: - (id, priority, page_json) = row - new_priority = priority - 2000 - cursor.execute("update brozzler_pages set priority=?, in_progress=1 where id=?", (new_priority, id)) - self._conn.commit() - - d = json.loads(page_json) - d["id"] = id - return d - else: - return None - - def completed(self, page): - cursor = self._conn.cursor() - cursor.execute("update brozzler_pages set in_progress=0, brozzle_count=brozzle_count+1 where id=?", (page.id,)) - self._conn.commit() - - def new_site(self, site): - cursor = self._conn.cursor() - cursor.execute("insert into brozzler_sites (site_json) values (?)", (site.to_json(),)) - self._conn.commit() - return cursor.lastrowid - - def update_site(self, site): - cursor = self._conn.cursor() - if site.reached_limit: - self.logger.info("setting status=FINISHED_REACHED_LIMIT because site.reached_limit=%s", site.reached_limit) - cursor.execute("update brozzler_sites set status=?, site_json=? where id=?", ("FINISHED_REACHED_LIMIT", site.to_json(), site.id)) - else: - cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id)) - self._conn.commit() - - def schedule_page(self, page, priority=0): - cursor = self._conn.cursor() - cursor.execute("insert into brozzler_pages (site_id, priority, canon_url, page_json, in_progress) values (?, ?, ?, ?, 0)", - (page.site_id, priority, page.canon_url(), page.to_json())) - self._conn.commit() - - def sites(self): - cursor = self._conn.cursor() - cursor.execute("select id, site_json from brozzler_sites where status not like 'FINISHED%'") - while True: - row = cursor.fetchone() - if row is None: - break - site_dict = json.loads(row[1]) - site_dict["id"] = row[0] - yield brozzler.Site(**site_dict) - - def update_page(self, page): - cursor = self._conn.cursor() - # CREATE TABLE brozzler_pages ( id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), page_json text - cursor.execute("select id, priority, page_json from brozzler_pages where site_id=? and canon_url=?", (page.site_id, page.canon_url())) - row = cursor.fetchone() - if row: - # (id, priority, existing_page) = row - new_priority = page.calc_priority() + row[1] - existing_page = brozzler.Page(**json.loads(row[2])) - existing_page.hops_from_seed = min(page.hops_from_seed, existing_page.hops_from_seed) - - cursor.execute("update brozzler_pages set priority=?, page_json=? where id=?", (new_priority, existing_page.to_json(), row[0])) - self._conn.commit() - else: - raise KeyError("page not in brozzler_pages site_id={} canon_url={}".format(page.site_id, page.canon_url())) - - def in_progress_pages(self, site): - cursor = self._conn.cursor() - cursor.execute("select id, page_json from brozzler_pages where site_id = ? and in_progress", (site.id,)) - - pages = [] - for row in cursor.fetchall(): - (id, page_json) = row - page = brozzler.Page(**json.loads(page_json)) - page.id = id - pages.append(page) - - if len(pages) > 1: - self.logger.error("more than one page in progress for site?! shouldn't happen, violates politeness policy... site={}: pages={}".format(site, pages)) - - return pages - - def set_status(self, site, status): - cursor = self._conn.cursor() - cursor.execute("update brozzler_sites set status=? 
where id=?", (status, site.id,)) - self._conn.commit() - - def get_status(self, site): - cursor = self._conn.cursor() - cursor.execute("select status from brozzler_sites where id=?", (site.id,)) - row = cursor.fetchone() - if row: - return row[0] - else: - raise KeyError("site not in brozzler_sites id={}".format(site.id,)) - -class BrozzlerHQ: - logger = logging.getLogger(__module__ + "." + __qualname__) - - def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", db=None): - self.amqp_url = amqp_url - self._conn = kombu.Connection(amqp_url) - self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new") - self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed") - self._disclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.disclaimed") - if db != None: - self._db = db - else: - self._db = BrozzlerHQDb() - self._robots_caches = {} # {site_id:reppy.cache.RobotsCache} - - def _robots_cache(self, site): - class SessionRaiseOn420(requests.Session): - def get(self, url, **kwargs): - res = super().get(url, **kwargs) - if res.status_code == 420 and 'warcprox-meta' in res.headers: - raise brozzler.ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text) - else: - return res - - if not site.id in self._robots_caches: - req_sesh = SessionRaiseOn420() - req_sesh.verify = False # ignore cert errors - if site.proxy: - proxie = "http://{}".format(site.proxy) - req_sesh.proxies = {"http":proxie,"https":proxie} - if site.extra_headers: - req_sesh.headers.update(site.extra_headers) - self._robots_caches[site.id] = reppy.cache.RobotsCache(session=req_sesh) - - return self._robots_caches[site.id] - - def is_permitted_by_robots(self, site, url): - if site.ignore_robots: - return True - try: - result = self._robots_cache(site).allowed(url, "brozzler") - return result - except BaseException as e: - if isinstance(e, reppy.exceptions.ServerError) and isinstance(e.args[0], brozzler.ReachedLimit): - raise e.args[0] - else: - self.logger.error("problem with robots.txt for %s: %s", url, repr(e)) - return False - - def run(self): - try: - while True: - self._new_site() - self._consume_completed_page() - self._feed_pages() - self._disclaimed_site() - time.sleep(0.5) - finally: - self._conn.close() - - def _disclaimed_site(self): - try: - msg = self._disclaimed_sites_q.get(block=False) - self.logger.info("msg.payload=%s", msg.payload) - site = brozzler.Site(**msg.payload) - self.logger.info("site=%s", site) - self._db.update_site(site) - msg.ack() - self.logger.info("received disclaimed site {}".format(site)) - - status = self._db.get_status(site) - if not status.startswith("FINISHED"): - self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name)) - self._unclaimed_sites_q.put(site.to_dict()) - else: - self.logger.info("disclaimed site is %s %s", status, site) - except kombu.simple.Empty: - pass - - def _new_site(self): - try: - msg = self._new_sites_q.get(block=False) - site = brozzler.Site(**msg.payload) - msg.ack() - - self.logger.info("new site {}".format(site)) - site_id = self._db.new_site(site) - site.id = site_id - - try: - if self.is_permitted_by_robots(site, site.seed): - page = brozzler.Page(site.seed, site_id=site.id, hops_from_seed=0) - self._db.schedule_page(page, priority=1000) - self._unclaimed_sites_q.put(site.to_dict()) - else: - self.logger.warn("seed url {} is blocked by robots.txt".format(site.seed)) - except brozzler.ReachedLimit as e: - site.note_limit_reached(e) - 
self._db.update_site(site) - except kombu.simple.Empty: - pass - - def _finished(self, site): - self.logger.info("site FINISHED! {}".format(site)) - self._db.set_status(site, "FINISHED") - - def _feed_pages(self): - for site in self._db.sites(): - q = self._conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id)) - if len(q) == 0: - page = self._db.pop_page(site.id) - if page: - self.logger.info("feeding {} to {}".format(page, q.queue.name)) - q.put(page) - elif not self._db.in_progress_pages(site): - self._finished(site) - - def _scope_and_schedule_outlinks(self, site, parent_page): - counts = {"added":0,"updated":0,"rejected":0,"blocked":0} - if parent_page.outlinks: - for url in parent_page.outlinks: - if site.is_in_scope(url, parent_page): - if self.is_permitted_by_robots(site, url): - child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1) - try: - self._db.update_page(child_page) - counts["updated"] += 1 - except KeyError: - self._db.schedule_page(child_page, priority=child_page.calc_priority()) - counts["added"] += 1 - else: - counts["blocked"] += 1 - else: - counts["rejected"] += 1 - - self.logger.info("{} new links added, {} existing links updated, {} links rejected, {} links blocked by robots from {}".format( - counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page)) - - def _consume_completed_page(self): - for site in self._db.sites(): - q = self._conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id)) - try: - msg = q.get(block=False) - completed_page = brozzler.Page(**msg.payload) - msg.ack() - self._db.completed(completed_page) - if completed_page.redirect_url and completed_page.hops_from_seed == 0: - site.note_seed_redirect(completed_page.redirect_url) - self._db.update_site(site) - self._scope_and_schedule_outlinks(site, completed_page) - except kombu.simple.Empty: - pass - diff --git a/brozzler/robots.py b/brozzler/robots.py new file mode 100644 index 0000000..84daab7 --- /dev/null +++ b/brozzler/robots.py @@ -0,0 +1,43 @@ +# vim: set sw=4 et: + +import json +import logging +import brozzler +import reppy.cache +import requests + +_robots_caches = {} # {site_id:reppy.cache.RobotsCache} +def _robots_cache(site): + class SessionRaiseOn420(requests.Session): + def get(self, url, *args, **kwargs): + res = super().get(url, *args, **kwargs) + if res.status_code == 420 and 'warcprox-meta' in res.headers: + raise ReachedLimit(warcprox_meta=json.loads(res.headers['warcprox-meta']), http_payload=res.text) + else: + return res + + if not site.id in _robots_caches: + req_sesh = SessionRaiseOn420() + req_sesh.verify = False # ignore cert errors + if site.proxy: + proxie = "http://{}".format(site.proxy) + req_sesh.proxies = {"http":proxie,"https":proxie} + if site.extra_headers: + req_sesh.headers.update(site.extra_headers) + _robots_caches[site.id] = reppy.cache.RobotsCache(session=req_sesh) + + return _robots_caches[site.id] + +def is_permitted_by_robots(site, url): + if site.ignore_robots: + return True + try: + result = _robots_cache(site).allowed(url, "brozzler") + return result + except BaseException as e: + if isinstance(e, reppy.exceptions.ServerError) and isinstance(e.args[0], brozzler.ReachedLimit): + raise e.args[0] + else: + logging.error("problem with robots.txt for %s: %s", url, repr(e), exc_info=True) + return False + diff --git a/brozzler/site.py b/brozzler/site.py index 612c62a..9f062dc 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -4,13 +4,26 @@ import surt import json 
import logging import brozzler +import hashlib -class Site: +class BaseDictable: + def to_dict(self): + d = dict(vars(self)) + for k in vars(self): + if k.startswith("_") or d[k] is None: + del d[k] + return d + + def to_json(self): + return json.dumps(self.to_dict(), separators=(',', ':')) + +class Site(BaseDictable): logger = logging.getLogger(__module__ + "." + __qualname__) def __init__(self, seed, id=None, scope=None, proxy=None, ignore_robots=False, time_limit=None, extra_headers=None, - enable_warcprox_features=False, reached_limit=None): + enable_warcprox_features=False, reached_limit=None, status="ACTIVE", + claimed=False): self.seed = seed self.id = id self.proxy = proxy @@ -19,6 +32,8 @@ class Site: self.extra_headers = extra_headers self.time_limit = time_limit self.reached_limit = reached_limit + self.status = status + self.claimed = bool(claimed) self.scope = scope or {} if not "surt" in self.scope: @@ -44,6 +59,7 @@ class Site: e.warcprox_meta["reached-limit"], self.reached_limit) else: self.reached_limit = e.warcprox_meta["reached-limit"] + self.status = "FINISHED_REACHED_LIMIT" def is_in_scope(self, url, parent_page=None): if parent_page and "max_hops" in self.scope and parent_page.hops_from_seed >= self.scope["max_hops"]: @@ -62,25 +78,26 @@ class Site: self.logger.warn("""problem parsing url "{}" """.format(url)) return False - def to_dict(self): - d = dict(vars(self)) - for k in vars(self): - if k.startswith("_"): - del d[k] - return d - - def to_json(self): - return json.dumps(self.to_dict(), separators=(',', ':')) - -class Page: - def __init__(self, url, id=None, site_id=None, hops_from_seed=0, outlinks=None, redirect_url=None): - self.id = id +class Page(BaseDictable): + def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0): self.site_id = site_id self.url = url self.hops_from_seed = hops_from_seed - self._canon_hurl = None - self.outlinks = outlinks self.redirect_url = redirect_url + self.claimed = bool(claimed) + self.brozzle_count = brozzle_count + self._canon_hurl = None + + if priority is not None: + self.priority = priority + else: + self.priority = self._calc_priority() + + if id is not None: + self.id = id + else: + digest_this = "site_id:{},canon_url:{}".format(self.site_id, self.canon_url()) + self.id = hashlib.sha1(digest_this.encode("utf-8")).hexdigest() def __repr__(self): return """Page(url={},site_id={},hops_from_seed={})""".format( @@ -89,10 +106,12 @@ class Page: def note_redirect(self, url): self.redirect_url = url - def calc_priority(self): + def _calc_priority(self): priority = 0 priority += max(0, 10 - self.hops_from_seed) priority += max(0, 6 - self.canon_url().count("/")) + priority = max(priority, brozzler.MIN_PRIORITY) + priority = min(priority, brozzler.MAX_PRIORITY) return priority def canon_url(self): @@ -101,20 +120,3 @@ class Page: surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl) return self._canon_hurl.geturl() - def to_dict(self): - d = dict(vars(self)) - - for k in vars(self): - if k.startswith("_"): - del d[k] - - if self.outlinks is not None and not isinstance(self.outlinks, list): - outlinks = [] - outlinks.extend(self.outlinks) - d["outlinks"] = outlinks - - return d - - def to_json(self): - return json.dumps(self.to_dict(), separators=(',', ':')) - diff --git a/brozzler/worker.py b/brozzler/worker.py index 89368e6..98ae5f1 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -6,7 +6,6 @@ import brozzler import threading import time 
import signal -import kombu import youtube_dl import urllib.request import json @@ -14,9 +13,8 @@ import json class BrozzlerWorker: logger = logging.getLogger(__module__ + "." + __qualname__) - def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2f", - max_browsers=1, chrome_exe="chromium-browser"): - self._amqp_url = amqp_url + def __init__(self, db, max_browsers=1, chrome_exe="chromium-browser"): + self._db = db self._max_browsers = max_browsers self._browser_pool = brozzler.browser.BrowserPool(max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True) @@ -44,31 +42,25 @@ class BrozzlerWorker: ## os.environ["http_proxy"] = "http://{}".format(site.proxy) return youtube_dl.YoutubeDL(ydl_opts) - def _next_page(self, site): - """Raises kombu.simple.Empty if queue is empty""" - with kombu.Connection(self._amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id)) - msg = q.get(block=True, timeout=0.5) - page_dict = msg.payload - page = brozzler.Page(**page_dict) - msg.ack() - return page - def _completed_page(self, site, page): - with kombu.Connection(self._amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id)) - self.logger.info("feeding {} to {}".format(page, q.queue.name)) - q.put(page.to_dict()) + page.brozzle_count += 1 + page.claimed = False + # XXX set priority? + self._db.update_page(page) + if page.redirect_url and page.hops_from_seed == 0: + site.note_seed_redirect(page.redirect_url) + self._db.update_site(site) def _disclaim_site(self, site, page=None): - with kombu.Connection(self._amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.disclaimed".format(site.id)) - self.logger.info("feeding {} to {}".format(site, q.queue.name)) - q.put(site.to_dict()) - if page: - q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id)) - self.logger.info("feeding unfinished page {} to {}".format(page, q.queue.name)) - q.put(page.to_dict()) + self.logger.info("disclaiming %s", site) + site.claimed = False + if not page and not self._db.has_outstanding_pages(site): + self.logger.info("site FINISHED! 
%s", site) + site.status = "FINISHED" + self._db.update_site(site) + if page: + page.claimed = False + self._db.update_page(page) def _putmeta(self, warcprox_address, url, content_type, payload, extra_headers=None): headers = {"Content-Type":content_type} @@ -110,6 +102,29 @@ class BrozzlerWorker: else: raise + def _scope_and_schedule_outlinks(self, site, parent_page, outlinks): + counts = {"added":0,"updated":0,"rejected":0,"blocked":0} + if outlinks: + for url in outlinks: + if site.is_in_scope(url, parent_page): + if brozzler.is_permitted_by_robots(site, url): + new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1) + existing_child_page = self._db.get_page(new_child_page) + if existing_child_page: + existing_child_page.priority += new_child_page.priority + self._db.update_page(existing_child_page) + counts["updated"] += 1 + else: + self._db.new_page(new_child_page) + counts["added"] += 1 + else: + counts["blocked"] += 1 + else: + counts["rejected"] += 1 + + self.logger.info("%s new links added, %s existing links updated, %s links rejected, %s links blocked by robots from %s", + counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page) + def brozzle_page(self, browser, ydl, site, page): def on_screenshot(screenshot_png): if site.proxy and site.enable_warcprox_features: @@ -126,10 +141,10 @@ class BrozzlerWorker: except: self.logger.error("youtube_dl raised exception on {}".format(page), exc_info=True) - page.outlinks = browser.browse_page(page.url, - extra_headers=site.extra_headers, - on_screenshot=on_screenshot, + outlinks = browser.browse_page(page.url, + extra_headers=site.extra_headers, on_screenshot=on_screenshot, on_url_change=page.note_redirect) + self._scope_and_schedule_outlinks(site, page, outlinks) def _brozzle_site(self, browser, ydl, site): start = time.time() @@ -137,71 +152,48 @@ class BrozzlerWorker: try: browser.start(proxy=site.proxy) while not self._shutdown_requested.is_set() and time.time() - start < 300: - try: - page = self._next_page(site) - self.brozzle_page(browser, ydl, site, page) - self._completed_page(site, page) - page = None - except kombu.simple.Empty: - # if some timeout reached, re-raise? - pass - # except kombu.simple.Empty: - # self.logger.info("finished {} (queue is empty)".format(site)) + page = self._db.claim_page(site) + self.brozzle_page(browser, ydl, site, page) + self._completed_page(site, page) + page = None + except brozzler.NothingToClaim: + self.logger.info("no pages left for site %s", site) except brozzler.ReachedLimit as e: site.note_limit_reached(e) except brozzler.browser.BrowsingAborted: self.logger.info("{} shut down".format(browser)) + except: + self.logger.critical("unexpected exception", exc_info=True) finally: self.logger.info("finished session brozzling site, stopping browser and disclaiming site") browser.stop() self._disclaim_site(site, page) self._browser_pool.release(browser) - def _claim_site(self, q): - """Reads message from SimpleQueue q, fires off thread to brozzle the - site. 
Raises KeyError if no browsers are available, kombu.simple.Empty - if queue is empty.""" - browser = self._browser_pool.acquire() - try: - msg = q.get(block=True, timeout=0.5) - site = brozzler.Site(**msg.payload) - msg.ack() - self.logger.info("brozzling site {}".format(site)) - ydl = self._youtube_dl(site) - th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site), - name="BrowsingThread-{}".format(site.seed)) - th.start() - except: - self._browser_pool.release(browser) - raise - def run(self): latest_state = None while not self._shutdown_requested.is_set(): try: - # XXX too much connecting and disconnecting from rabbitmq - with kombu.Connection(self._amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.unclaimed") - q_empty = False - if len(q) > 0: - try: - self._claim_site(q) - except kombu.simple.Empty: - q_empty = True - except KeyError: - if latest_state != "browsers-busy": - self.logger.info("all {} browsers are busy".format(self._max_browsers)) - latest_state = "browsers-busy" - else: - q_empty = True - - if q_empty: - if latest_state != "no-unclaimed-sites": - # self.logger.info("no unclaimed sites to browse") - latest_state = "no-unclaimed-sites" - time.sleep(0.5) - except OSError as e: - self.logger.warn("continuing after i/o exception (from rabbitmq?)", exc_info=True) + browser = self._browser_pool.acquire() + try: + site = self._db.claim_site() + self.logger.info("brozzling site %s", site) + ydl = self._youtube_dl(site) + th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site), + name="BrowsingThread-{}".format(site.seed)) + th.start() + except: + self._browser_pool.release(browser) + raise + except brozzler.browser.NoBrowsersAvailable: + if latest_state != "browsers-busy": + self.logger.info("all %s browsers are busy", self._max_browsers) + latest_state = "browsers-busy" + except brozzler.NothingToClaim: + if latest_state != "no-unclaimed-sites": + self.logger.info("no unclaimed sites to browse") + latest_state = "no-unclaimed-sites" + time.sleep(0.5) def start(self): th = threading.Thread(target=self.run, name="BrozzlerWorker") diff --git a/requirements.txt b/requirements.txt index 016c3e2..794a4ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -kombu argparse PyYAML git+https://github.com/ikreymer/surt.git@py3 @@ -8,3 +7,4 @@ git+https://github.com/seomoz/reppy.git requests # websocket-client git+https://github.com/nlevitt/websocket-client.git@tweaks +rethinkdb
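
For reference, a minimal sketch of registering a seed through the new RethinkDB-backed API, mirroring what bin/brozzler-new-site does after this change. It assumes the brozzler package from this change is installed and that RethinkDB is reachable at the listed addresses; the seed URL and server names are placeholders.

    # sketch only: equivalent of running bin/brozzler-new-site with default options
    import brozzler

    # BrozzlerRethinkDb initializes the "brozzler" database (the "sites" and
    # "pages" tables and the "priority_by_site" index) if it does not exist yet
    db = brozzler.BrozzlerRethinkDb("db0.example.com,db1.example.com".split(","))

    # extra options (proxy, time_limit, scope, extra_headers, ...) mirror the
    # bin/brozzler-new-site command line flags
    site = brozzler.Site(seed="http://example.com/")

    # new_site() inserts the site, checks robots.txt for the seed, and schedules
    # the seed page at priority 1000 (see brozzler/__init__.py in the diff above)
    brozzler.new_site(db, site)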
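
The trickiest piece of brozzler/db.py is claim_page(), which leans on the compound priority_by_site index so that claiming a page is a single atomic update rather than a read-then-write. Below is a standalone sketch of the same query, assuming a RethinkDB server on localhost and the schema created by BrozzlerRethinkDb._init_db(); claim_page_sketch() and the site id are illustrative, not part of brozzler.

    import rethinkdb
    r = rethinkdb

    MIN_PRIORITY = -1000000000
    MAX_PRIORITY = 1000000000

    def claim_page_sketch(conn, site_id):
        # priority_by_site indexes [site_id, claimed, brozzle_count, priority], so
        # between() pins site_id, claimed=False and brozzle_count=0 while ranging
        # over all priorities, and order_by(desc) picks the highest-priority page
        result = (r.db("brozzler").table("pages")
                .between([site_id, False, 0, MIN_PRIORITY],
                         [site_id, False, 0, MAX_PRIORITY],
                         index="priority_by_site")
                .order_by(index=r.desc("priority_by_site"))
                .limit(1)
                # update() is atomic per document; return_changes=True hands back
                # the row that was just claimed, so two workers can't grab the
                # same page
                .update({"claimed": True}, return_changes=True)
                .run(conn))
        if result["replaced"] == 1:
            return result["changes"][0]["new_val"]
        return None  # BrozzlerRethinkDb raises brozzler.NothingToClaim here

    if __name__ == "__main__":
        conn = r.connect(host="localhost")
        print(claim_page_sketch(conn, "some-site-id"))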
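
brozzler/site.py also replaces the old autoincrement page ids with a deterministic sha1 of the site id and the canonicalized URL, which is what lets the worker's _scope_and_schedule_outlinks() look up an already-known outlink by primary key (get_page) and bump its priority instead of inserting a duplicate row. A sketch of the id derivation, using a raw URL where brozzler would first canonicalize with surt:

    import hashlib

    def page_id_sketch(site_id, canon_url):
        # same digest recipe as Page.__init__ in the diff above
        digest_this = "site_id:{},canon_url:{}".format(site_id, canon_url)
        return hashlib.sha1(digest_this.encode("utf-8")).hexdigest()

    # a given (site, canonical url) pair always hashes to the same primary key
    print(page_id_sketch("some-site-id", "http://example.com/about/"))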