submit outlinks to hq

This commit is contained in:
Noah Levitt 2015-07-10 21:31:41 -07:00
parent fd99764baa
commit 56a7bb7306
2 changed files with 29 additions and 11 deletions

View File

@ -40,7 +40,6 @@ class Site:
surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
return surtt.startswith(self.scope_surt) return surtt.startswith(self.scope_surt)
# should this be a generator function?
def next_url(self): def next_url(self):
"""Raises kombu.simple.Empty if queue is empty""" """Raises kombu.simple.Empty if queue is empty"""
with kombu.Connection(args.amqp_url) as conn: with kombu.Connection(args.amqp_url) as conn:
@ -51,22 +50,32 @@ class Site:
msg.ack() msg.ack()
return crawl_url return crawl_url
def completed(self, crawl_url):
with kombu.Connection(args.amqp_url) as conn:
q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(self.id))
q.put(crawl_url.to_json())
browsers = set() browsers = set()
browsers_lock = threading.Lock() browsers_lock = threading.Lock()
# "browse" + "crawl" = "brozzle" shutdown_requested = threading.Event()
def brozzle_site(site, chrome_port): def brozzle_site(site, chrome_port):
with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser: with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser:
with browsers_lock: with browsers_lock:
browsers.add(browser) browsers.add(browser)
try: try:
while True: while not shutdown_requested.is_set():
crawl_url = site.next_url() try:
logging.info("crawling {}".format(crawl_url)) crawl_url = site.next_url()
outlinks = browser.browse_page(crawl_url.url) logging.info("crawling {}".format(crawl_url))
# site.submit(outlinks, hops_from_seed=crawl_url.hops_from_seed+1) crawl_url.outlinks = browser.browse_page(crawl_url.url)
except kombu.simple.Empty: site.completed(crawl_url)
logging.info("finished {} (queue is empty)".format(site)) except kombu.simple.Empty:
# if some timeout reached, raise
pass
# except kombu.simple.Empty:
# logging.info("finished {} (queue is empty)".format(site))
except umbra.browser.BrowsingAborted: except umbra.browser.BrowsingAborted:
logging.info("{} shut down") logging.info("{} shut down")
finally: finally:
@ -105,6 +114,7 @@ try:
time.sleep(0.5) time.sleep(0.5)
except ShutdownRequested as e: except ShutdownRequested as e:
logging.info("shutting down browsers") logging.info("shutting down browsers")
shutdown_requested.set()
with browsers_lock: with browsers_lock:
for browser in browsers: for browser in browsers:

View File

@ -4,12 +4,13 @@ import surt
import json import json
class CrawlUrl: class CrawlUrl:
def __init__(self, url, id=None, site_id=None, hops_from_seed=0): def __init__(self, url, id=None, site_id=None, hops_from_seed=0, outlinks=None):
self.id = id self.id = id
self.site_id = site_id self.site_id = site_id
self.url = url self.url = url
self.hops_from_seed = hops_from_seed self.hops_from_seed = hops_from_seed
self._canon_hurl = None self._canon_hurl = None
self.outlinks = None
def __repr__(self): def __repr__(self):
return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format( return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
@ -22,6 +23,13 @@ class CrawlUrl:
return self._canon_hurl.geturl() return self._canon_hurl.geturl()
def to_json(self): def to_json(self):
d = dict(id=self.id, site_id=self.site_id, url=self.url, hops_from_seed=self.hops_from_seed) if self.outlinks is not None and not isinstance(self.outlinks, list):
outlinks = []
outlinks.extend(self.outlinks)
else:
outlinks = self.outlinks
d = dict(id=self.id, site_id=self.site_id, url=self.url,
hops_from_seed=self.hops_from_seed, outlinks=outlinks)
return json.dumps(d, separators=(',', ':')) return json.dumps(d, separators=(',', ':'))