crawling outlinks not totally working

Noah Levitt 2015-07-11 02:29:19 -07:00
parent 56a7bb7306
commit 1fb336cb2e
3 changed files with 92 additions and 47 deletions

View File

@@ -12,6 +12,7 @@ import time
 import kombu
 import kombu.simple
 import json
+import umbra.hq

 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description="brozzler-hq - headquarters of distributed brozzler crawl",
@@ -73,10 +74,9 @@ class BrozzlerHQDb:
         else:
             return None

-    def new_site(self, site_dict):
-        site_json = json.dumps(site_dict, separators=(',', ':'))
+    def new_site(self, site):
         cursor = self._conn.cursor()
-        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
+        cursor.execute("insert into brozzler_sites (site_json) values (?)", (site.to_json(),))
         self._conn.commit()
         return cursor.lastrowid
@@ -95,7 +95,23 @@ class BrozzlerHQDb:
                 break
             site_dict = json.loads(row[1])
             site_dict["id"] = row[0]
-            yield site_dict
+            yield umbra.hq.Site(**site_dict)
+
+    def update_crawl_url(self, crawl_url):
+        cursor = self._conn.cursor()
+        # CREATE TABLE brozzler_urls ( id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), crawl_url_json text
+        cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id=? and canon_url=?", (crawl_url.site_id, crawl_url.canonical()))
+        row = cursor.fetchone()
+        if row:
+            # (id, priority, existing_crawl_url) = row
+            new_priority = crawl_url.calc_priority() + row[1]
+            existing_crawl_url = CrawlUrl(**json.loads(row[2]))
+            existing_crawl_url.hops_from_seed = min(crawl_url.hops_from_seed, existing_crawl_url.hops_from_seed)
+            cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?", (new_priority, existing_crawl_url.to_json(), row["id"]))
+            self._conn.commit()
+        else:
+            raise KeyError("crawl url not in brozzler_urls site_id={} url={}".format(crawl_url.site_id, crawl_url.canonical()))

 class BrozzlerHQ:
     logger = logging.getLogger(__module__ + "." + __qualname__)
@@ -114,7 +130,7 @@ class BrozzlerHQ:
         try:
             while True:
                 self._new_site()
-                # self._consume_completed_url()
+                self._consume_completed_url()
                 self._feed_crawl_urls()
                 time.sleep(0.5)
         finally:
@@ -123,13 +139,16 @@ class BrozzlerHQ:
     def _new_site(self):
         try:
             msg = self._new_sites_q.get(block=False)
-            new_site_dict = msg.payload
-            site_id = self._db.new_site(new_site_dict)
-            new_site_dict["id"] = site_id
-            self._schedule_seed(site_id, new_site_dict["seed"])
-            self._unclaimed_sites_q.put(new_site_dict)
+            new_site = umbra.hq.Site(**msg.payload)
             msg.ack()
-            self.logger.info("new site {}".format(new_site_dict))
+
+            site_id = self._db.new_site(new_site)
+            new_site.id = site_id
+            self._schedule_seed(site_id, new_site.seed)
+            self._unclaimed_sites_q.put(new_site.to_dict())
+            self.logger.info("new site {}".format(new_site))
         except kombu.simple.Empty:
             pass
@@ -139,13 +158,33 @@ class BrozzlerHQ:
     def _feed_crawl_urls(self):
         for site in self._db.sites():
-            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site["id"]))
+            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
             if len(q) == 0:
-                url = self._db.pop_url(site["id"])
+                url = self._db.pop_url(site.id)
                 if url:
                     self.logger.info("feeding {} to {}".format(url, q.queue.name))
                     q.put(url)

+    def _consume_completed_url(self):
+        for site in self._db.sites():
+            q = self._conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
+            try:
+                msg = q.get(block=False)
+                completed_url = umbra.CrawlUrl(**msg.payload)
+                msg.ack()
+                self.logger.info("adding outlinks from {} outlinks={}".format(completed_url, completed_url.outlinks))
+                if completed_url.outlinks:
+                    for url in completed_url.outlinks:
+                        crawl_url = umbra.CrawlUrl(url, site_id=site.id, hops_from_seed=completed_url.hops_from_seed+1)
+                        try:
+                            self._db.update_crawl_url(crawl_url)
+                        except KeyError:
+                            self._db.schedule_url(crawl_url, priority=crawl_url.calc_priority())
+            except kombu.simple.Empty:
+                pass

 logging.info("brozzler-hq starting")

 db = BrozzlerHQDb(db_file=args.db_file)
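
The new update_crawl_url(), together with the KeyError fallback to schedule_url() in _consume_completed_url(), amounts to an insert-or-bump pattern keyed on a site's canonical URL: an outlink seen again gets its priority raised and its hops_from_seed lowered to the shortest path found so far. A minimal standalone sketch of that idea, assuming the brozzler_urls schema from the CREATE TABLE comment above, skipping SURT canonicalization, and folding the update and the schedule paths into one illustrative helper (none of these names are brozzler's actual API):

    import json
    import sqlite3

    def priority_for(url, hops_from_seed):
        # same shape as CrawlUrl.calc_priority(): fewer hops from the seed and
        # fewer path segments mean a higher priority
        return max(0, 10 - hops_from_seed) + max(0, 6 - url.count("/"))

    def add_or_bump(conn, site_id, url, hops_from_seed):
        cursor = conn.cursor()
        cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id=? and canon_url=?",
                (site_id, url))
        row = cursor.fetchone()
        if row:
            # already scheduled: accumulate priority, keep the shortest hop count
            existing = json.loads(row[2])
            existing["hops_from_seed"] = min(existing["hops_from_seed"], hops_from_seed)
            cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?",
                    (row[1] + priority_for(url, hops_from_seed), json.dumps(existing), row[0]))
        else:
            # first sighting of this canonical url for this site
            crawl_url_json = json.dumps({"url": url, "site_id": site_id, "hops_from_seed": hops_from_seed})
            cursor.execute("insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json) values (?, ?, ?, ?)",
                    (site_id, priority_for(url, hops_from_seed), url, crawl_url_json))
        conn.commit()

    conn = sqlite3.connect(":memory:")
    conn.execute("create table brozzler_urls (id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), crawl_url_json text)")
    add_or_bump(conn, 1, "http://example.com/a/", 1)
    add_or_bump(conn, 1, "http://example.com/a/", 2)   # second sighting bumps the priority
    print(conn.execute("select canon_url, priority from brozzler_urls").fetchall())

Running it prints a single row for http://example.com/a/ whose priority is the sum of the two sightings' scores (11 + 10 = 21), with hops_from_seed kept at 1.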

View File

@@ -11,6 +11,7 @@ import time
 import surt
 import signal
 import kombu
+from umbra import hq

 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description='crawl-url - browse urls, follow links',
@@ -30,35 +31,26 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')

-class Site:
-    def __init__(self, id, seed):
-        self.id = id
-        self.seed = seed
-        self.scope_surt = surt.surt(seed, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
-
-    def is_in_scope(self, url):
-        surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
-        return surtt.startswith(self.scope_surt)
-
-    def next_url(self):
-        """Raises kombu.simple.Empty if queue is empty"""
-        with kombu.Connection(args.amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(self.id))
-            msg = q.get(block=True, timeout=0.5)
-            crawl_url_dict = msg.payload
-            crawl_url = umbra.CrawlUrl(**crawl_url_dict)
-            msg.ack()
-            return crawl_url
-
-    def completed(self, crawl_url):
-        with kombu.Connection(args.amqp_url) as conn:
-            q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(self.id))
-            q.put(crawl_url.to_json())
-
 browsers = set()
 browsers_lock = threading.Lock()

 shutdown_requested = threading.Event()

+def next_url(site):
+    """Raises kombu.simple.Empty if queue is empty"""
+    with kombu.Connection(args.amqp_url) as conn:
+        q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
+        msg = q.get(block=True, timeout=0.5)
+        crawl_url_dict = msg.payload
+        crawl_url = umbra.CrawlUrl(**crawl_url_dict)
+        msg.ack()
+        return crawl_url
+
+def completed(site, crawl_url):
+    with kombu.Connection(args.amqp_url) as conn:
+        q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
+        logging.info("putting {} on queue {}".format(crawl_url, q.queue.name))
+        q.put(crawl_url.to_dict())

 def brozzle_site(site, chrome_port):
     with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser:
@@ -67,10 +59,10 @@ def brozzle_site(site, chrome_port):
         try:
             while not shutdown_requested.is_set():
                 try:
-                    crawl_url = site.next_url()
+                    crawl_url = next_url(site)
                     logging.info("crawling {}".format(crawl_url))
                     crawl_url.outlinks = browser.browse_page(crawl_url.url)
-                    site.completed(crawl_url)
+                    completed(site, crawl_url)
                 except kombu.simple.Empty:
                     # if some timeout reached, raise
                     pass
@@ -93,6 +85,7 @@ def sigint(signum, frame):
 signal.signal(signal.SIGTERM, sigterm)
 signal.signal(signal.SIGINT, sigint)

+latest_state = None
 chrome_port = 9200
 try:
     while True:
@@ -101,16 +94,21 @@ try:
                 q = conn.SimpleQueue("brozzler.sites.unclaimed")
                 try:
                     msg = q.get(block=True, timeout=0.5)
-                    site_dict = msg.payload
-                    site = Site(**site_dict)
+                    site = hq.Site(**msg.payload)
+                    logging.info("browsing site {}".format(site))
                     msg.ack()
                     th = threading.Thread(target=lambda: brozzle_site(site, chrome_port),
                             name="BrowsingThread-{}".format(site.scope_surt))
                     th.start()
                     chrome_port += 1
                 except kombu.simple.Empty:
-                    pass
+                    if latest_state != "no-unclaimed-sites":
+                        logging.info("no unclaimed sites to browse")
+                        latest_state = "no-unclaimed-sites"
         else:
+            if latest_state != "browsers-busy":
+                logging.info("all {} browsers are busy, not looking for unclaimed sites".format(args.max_browsers))
+                latest_state = "browsers-busy"
             time.sleep(0.5)
 except ShutdownRequested as e:
     logging.info("shutting down browsers")
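
The claim/crawl/complete loop above rides on two per-site kombu queues, brozzler.sites.<id>.crawl_urls and brozzler.sites.<id>.completed_urls. A toy round trip over those queues, using kombu's in-memory transport so it runs without a broker (the payload dict only mimics the shape of CrawlUrl.to_dict(); it is not the real class):

    import kombu
    import kombu.simple

    site_id = 1
    with kombu.Connection("memory://") as conn:
        crawl_q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site_id))
        completed_q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site_id))

        # what brozzler-hq's _feed_crawl_urls() would do
        crawl_q.put({"url": "http://example.com/", "site_id": site_id, "hops_from_seed": 0})

        # what the worker's next_url()/completed() pair would do
        msg = crawl_q.get(block=True, timeout=0.5)
        crawl_url = msg.payload
        msg.ack()
        crawl_url["outlinks"] = ["http://example.com/about"]   # stand-in for browser.browse_page()
        completed_q.put(crawl_url)

        # what _consume_completed_url() would pick up on the hq side
        done = completed_q.get(block=True, timeout=0.5)
        print(done.payload["outlinks"])
        done.ack()

        crawl_q.close()
        completed_q.close()

With a real AMQP broker the same SimpleQueue calls apply; only the connection URL changes.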

View File

@@ -10,26 +10,34 @@ class CrawlUrl:
         self.url = url
         self.hops_from_seed = hops_from_seed
         self._canon_hurl = None
-        self.outlinks = None
+        self.outlinks = outlinks

     def __repr__(self):
         return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
                 self.url, self.site_id, self.hops_from_seed)

+    def calc_priority(self):
+        priority = 0
+        priority += max(0, 10 - self.hops_from_seed)
+        priority += max(0, 6 - self.canonical().count("/"))
+        return priority
+
     def canonical(self):
         if self._canon_hurl is None:
             self._canon_hurl = surt.handyurl.parse(self.url)
             surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
         return self._canon_hurl.geturl()

-    def to_json(self):
+    def to_dict(self):
         if self.outlinks is not None and not isinstance(self.outlinks, list):
             outlinks = []
             outlinks.extend(self.outlinks)
         else:
             outlinks = self.outlinks
-        d = dict(id=self.id, site_id=self.site_id, url=self.url,
+
+        return dict(id=self.id, site_id=self.site_id, url=self.url,
                 hops_from_seed=self.hops_from_seed, outlinks=outlinks)
-        return json.dumps(d, separators=(',', ':'))
+
+    def to_json(self):
+        return json.dumps(self.to_dict(), separators=(',', ':'))
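
The scoring that calc_priority() introduces is easy to sanity-check by hand: up to 10 points for being close to the seed plus up to 6 points for a shallow path. A tiny sketch applying the same formula to bare URL strings (CrawlUrl.canonical() would normally SURT-canonicalize the URL first, which can change the slash count):

    def calc_priority(canon_url, hops_from_seed):
        # mirrors CrawlUrl.calc_priority() above, but on a plain string
        priority = 0
        priority += max(0, 10 - hops_from_seed)         # closeness to the seed
        priority += max(0, 6 - canon_url.count("/"))    # shallowness of the path
        return priority

    print(calc_priority("http://example.com/", 0))               # seed: 10 + 3 = 13
    print(calc_priority("http://example.com/a/b/c.html", 2))     # 8 + 1 = 9
    print(calc_priority("http://example.com/a/b/c/d/e/f/", 12))  # deep and far: 0 + 0 = 0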