change terminology CrawlUrl => Page, since that better represents what it means in brozzler and differentiates it from heritrix

Noah Levitt 2015-07-16 18:39:29 -07:00
parent d2650a2547
commit a54e60dbaf
4 changed files with 82 additions and 82 deletions


@@ -1,5 +1,5 @@
from brozzler.browser import Browser, BrowserPool
from brozzler.site import CrawlUrl, Site
from brozzler.site import Page, Site
from brozzler.hq import BrozzlerHQ
def _read_version():
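
The import line above is the visible edge of the rename: code that used to import CrawlUrl now imports Page, with the same constructor signature. A small illustration, assuming brozzler is importable (the printed repr is the one defined further down in this diff):

    # before this commit:
    #   from brozzler.site import CrawlUrl
    #   crawl_url = CrawlUrl("http://example.com/", site_id=1, hops_from_seed=0)
    # after:
    from brozzler.site import Page

    page = Page("http://example.com/", site_id=1, hops_from_seed=0)
    print(page)  # Page(url="http://example.com/",site_id=1,hops_from_seed=0)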


@@ -23,38 +23,38 @@ class BrozzlerHQDb:
site_json text
);
create table if not exists brozzler_urls (
create table if not exists brozzler_pages (
id integer primary key,
site_id integer,
priority integer,
in_progress boolean,
canon_url varchar(4000),
crawl_url_json text
page_json text
);
create index if not exists brozzler_urls_priority on brozzler_urls (priority desc);
create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
create index if not exists brozzler_pages_priority on brozzler_pages (priority desc);
create index if not exists brozzler_pages_site_id on brozzler_pages (site_id);
""")
self._conn.commit()
def pop_url(self, site_id):
def pop_page(self, site_id):
cursor = self._conn.cursor()
cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id = ? and not in_progress order by priority desc limit 1", (site_id,))
cursor.execute("select id, priority, page_json from brozzler_pages where site_id = ? and not in_progress order by priority desc limit 1", (site_id,))
row = cursor.fetchone()
if row:
(id, priority, crawl_url_json) = row
(id, priority, page_json) = row
new_priority = priority - 2000
cursor.execute("update brozzler_urls set priority=?, in_progress=1 where id=?", (new_priority, id))
cursor.execute("update brozzler_pages set priority=?, in_progress=1 where id=?", (new_priority, id))
self._conn.commit()
d = json.loads(crawl_url_json)
d = json.loads(page_json)
d["id"] = id
return d
else:
return None
def completed(self, crawl_url):
def completed(self, page):
cursor = self._conn.cursor()
cursor.execute("update brozzler_urls set in_progress=0 where id=?", (crawl_url.id,))
cursor.execute("update brozzler_pages set in_progress=0 where id=?", (page.id,))
self._conn.commit()
def new_site(self, site):
@@ -68,10 +68,10 @@ class BrozzlerHQDb:
cursor.execute("update brozzler_sites set site_json=? where id=?", (site.to_json(), site.id))
self._conn.commit()
def schedule_url(self, crawl_url, priority=0):
def schedule_page(self, page, priority=0):
cursor = self._conn.cursor()
cursor.execute("insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json, in_progress) values (?, ?, ?, ?, 0)",
(crawl_url.site_id, priority, crawl_url.canonical(), crawl_url.to_json()))
cursor.execute("insert into brozzler_pages (site_id, priority, canon_url, page_json, in_progress) values (?, ?, ?, ?, 0)",
(page.site_id, priority, page.canonical(), page.to_json()))
self._conn.commit()
def sites(self):
@@ -85,21 +85,21 @@ class BrozzlerHQDb:
site_dict["id"] = row[0]
yield brozzler.Site(**site_dict)
def update_crawl_url(self, crawl_url):
def update_page(self, page):
cursor = self._conn.cursor()
# CREATE TABLE brozzler_urls ( id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), crawl_url_json text
cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id=? and canon_url=?", (crawl_url.site_id, crawl_url.canonical()))
# CREATE TABLE brozzler_pages ( id integer primary key, site_id integer, priority integer, in_progress boolean, canon_url varchar(4000), page_json text
cursor.execute("select id, priority, page_json from brozzler_pages where site_id=? and canon_url=?", (page.site_id, page.canonical()))
row = cursor.fetchone()
if row:
# (id, priority, existing_crawl_url) = row
new_priority = crawl_url.calc_priority() + row[1]
existing_crawl_url = brozzler.CrawlUrl(**json.loads(row[2]))
existing_crawl_url.hops_from_seed = min(crawl_url.hops_from_seed, existing_crawl_url.hops_from_seed)
# (id, priority, existing_page) = row
new_priority = page.calc_priority() + row[1]
existing_page = brozzler.Page(**json.loads(row[2]))
existing_page.hops_from_seed = min(page.hops_from_seed, existing_page.hops_from_seed)
cursor.execute("update brozzler_urls set priority=?, crawl_url_json=? where id=?", (new_priority, existing_crawl_url.to_json(), row[0]))
cursor.execute("update brozzler_pages set priority=?, page_json=? where id=?", (new_priority, existing_page.to_json(), row[0]))
self._conn.commit()
else:
raise KeyError("crawl url not in brozzler_urls site_id={} url={}".format(crawl_url.site_id, crawl_url.canonical()))
raise KeyError("page not in brozzler_pages site_id={} canon_url={}".format(page.site_id, page.canonical()))
class BrozzlerHQ:
logger = logging.getLogger(__module__ + "." + __qualname__)
@@ -118,8 +118,8 @@ class BrozzlerHQ:
try:
while True:
self._new_site()
self._consume_completed_url()
self._feed_crawl_urls()
self._consume_completed_page()
self._feed_pages()
time.sleep(0.5)
finally:
self._conn.close()
@@ -135,35 +135,35 @@ class BrozzlerHQ:
new_site.id = site_id
if new_site.is_permitted_by_robots(new_site.seed):
crawl_url = brozzler.CrawlUrl(new_site.seed, site_id=new_site.id, hops_from_seed=0)
self._db.schedule_url(crawl_url, priority=1000)
page = brozzler.Page(new_site.seed, site_id=new_site.id, hops_from_seed=0)
self._db.schedule_page(page, priority=1000)
self._unclaimed_sites_q.put(new_site.to_dict())
else:
self.logger.warn("seed url {} is blocked by robots.txt".format(new_site.seed))
except kombu.simple.Empty:
pass
def _feed_crawl_urls(self):
def _feed_pages(self):
for site in self._db.sites():
q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
q = self._conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
if len(q) == 0:
url = self._db.pop_url(site.id)
if url:
self.logger.info("feeding {} to {}".format(url, q.queue.name))
q.put(url)
page = self._db.pop_page(site.id)
if page:
self.logger.info("feeding {} to {}".format(page, q.queue.name))
q.put(page)
def _scope_and_schedule_outlinks(self, site, parent_url):
def _scope_and_schedule_outlinks(self, site, parent_page):
counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
if parent_url.outlinks:
for url in parent_url.outlinks:
if parent_page.outlinks:
for url in parent_page.outlinks:
if site.is_in_scope(url):
if site.is_permitted_by_robots(url):
crawl_url = brozzler.CrawlUrl(url, site_id=site.id, hops_from_seed=parent_url.hops_from_seed+1)
child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
try:
self._db.update_crawl_url(crawl_url)
self._db.update_page(child_page)
counts["updated"] += 1
except KeyError:
self._db.schedule_url(crawl_url, priority=crawl_url.calc_priority())
self._db.schedule_page(child_page, priority=child_page.calc_priority())
counts["added"] += 1
else:
counts["blocked"] += 1
@@ -171,20 +171,20 @@ class BrozzlerHQ:
counts["rejected"] += 1
self.logger.info("{} new links added, {} existing links updated, {} links rejected, {} links blocked by robots from {}".format(
counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_url))
counts["added"], counts["updated"], counts["rejected"], counts["blocked"], parent_page))
def _consume_completed_url(self):
def _consume_completed_page(self):
for site in self._db.sites():
q = self._conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
q = self._conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id))
try:
msg = q.get(block=False)
completed_url = brozzler.CrawlUrl(**msg.payload)
completed_page = brozzler.Page(**msg.payload)
msg.ack()
self._db.completed(completed_url)
if completed_url.redirect_url and completed_url.hops_from_seed == 0:
site.note_seed_redirect(completed_url.redirect_url)
self._db.completed(completed_page)
if completed_page.redirect_url and completed_page.hops_from_seed == 0:
site.note_seed_redirect(completed_page.redirect_url)
self._db.update_site(site)
self._scope_and_schedule_outlinks(site, completed_url)
self._scope_and_schedule_outlinks(site, completed_page)
except kombu.simple.Empty:
pass
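
For readers following the schema change, the renamed table and the schedule/pop queries above can be exercised on their own with Python's sqlite3 module. This is a standalone sketch of those statements, not the BrozzlerHQDb class itself; the sample page dict is made up for illustration:

    import json
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table if not exists brozzler_pages (
            id integer primary key,
            site_id integer,
            priority integer,
            in_progress boolean,
            canon_url varchar(4000),
            page_json text
        );
        create index if not exists brozzler_pages_priority on brozzler_pages (priority desc);
        create index if not exists brozzler_pages_site_id on brozzler_pages (site_id);
    """)

    # schedule_page(): insert a page with a starting priority
    conn.execute(
        "insert into brozzler_pages (site_id, priority, canon_url, page_json, in_progress) values (?, ?, ?, ?, 0)",
        (1, 1000, "http://example.com/", json.dumps({"url": "http://example.com/", "hops_from_seed": 0})))

    # pop_page(): take the highest-priority page that is not in progress,
    # then lower its priority and mark it in progress
    row = conn.execute(
        "select id, priority, page_json from brozzler_pages "
        "where site_id = ? and not in_progress order by priority desc limit 1", (1,)).fetchone()
    if row:
        page_id, priority, page_json = row
        conn.execute("update brozzler_pages set priority=?, in_progress=1 where id=?",
                     (priority - 2000, page_id))
        conn.commit()
        print(json.loads(page_json))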


@@ -62,7 +62,7 @@ class Site:
def to_json(self):
return json.dumps(self.to_dict(), separators=(',', ':'))
class CrawlUrl:
class Page:
def __init__(self, url, id=None, site_id=None, hops_from_seed=0, outlinks=None, redirect_url=None):
self.id = id
self.site_id = site_id
@@ -73,7 +73,7 @@ class CrawlUrl:
self.redirect_url = redirect_url
def __repr__(self):
return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
return """Page(url="{}",site_id={},hops_from_seed={})""".format(
self.url, self.site_id, self.hops_from_seed)
def note_redirect(self, url):
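
Page objects cross process boundaries as plain dicts: the worker code below calls page.to_dict() before putting a page on a queue, and the receiving side rebuilds it with brozzler.Page(**payload). A rough sketch of that round trip, assuming to_dict() returns the constructor keyword arguments (the calls in this diff imply this, though the method body is not shown):

    from brozzler.site import Page

    page = Page("http://example.com/about.html", site_id=1, hops_from_seed=2)
    payload = page.to_dict()  # what gets put on the AMQP queue as the message body
    copy = Page(**payload)    # what the other end reconstructs from msg.payload
    print(copy)               # repr shows url, site_id and hops_from_seed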


@@ -43,32 +43,32 @@ class BrozzlerWorker:
## os.environ["http_proxy"] = "http://{}".format(site.proxy)
return youtube_dl.YoutubeDL(ydl_opts)
def _next_url(self, site):
def _next_page(self, site):
"""Raises kombu.simple.Empty if queue is empty"""
with kombu.Connection(self._amqp_url) as conn:
q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
msg = q.get(block=True, timeout=0.5)
crawl_url_dict = msg.payload
crawl_url = brozzler.CrawlUrl(**crawl_url_dict)
page_dict = msg.payload
page = brozzler.Page(**page_dict)
msg.ack()
return crawl_url
return page
def _completed_url(self, site, crawl_url):
def _completed_page(self, site, page):
with kombu.Connection(self._amqp_url) as conn:
q = conn.SimpleQueue("brozzler.sites.{}.completed_urls".format(site.id))
self.logger.info("putting {} on queue {}".format(crawl_url, q.queue.name))
q.put(crawl_url.to_dict())
q = conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id))
self.logger.info("putting {} on queue {}".format(page, q.queue.name))
q.put(page.to_dict())
def _disclaim_site(self, site, crawl_url=None):
def _disclaim_site(self, site, page=None):
# XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed"
with kombu.Connection(self._amqp_url) as conn:
q = conn.SimpleQueue("brozzler.sites.unclaimed".format(site.id))
self.logger.info("putting {} on queue {}".format(site, q.queue.name))
q.put(site.to_dict())
if crawl_url:
q = conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site.id))
self.logger.info("putting unfinished url {} on queue {}".format(crawl_url, q.queue.name))
q.put(crawl_url.to_dict())
if page:
q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
self.logger.info("putting unfinished page {} on queue {}".format(page, q.queue.name))
q.put(page.to_dict())
def _putmeta(self, warcprox_address, url, content_type, payload):
request = urllib.request.Request(url, method="PUTMETA",
@@ -86,14 +86,14 @@ class BrozzlerWorker:
except urllib.error.HTTPError as e:
self.logger.warn("""got "{} {}" response on warcprox PUTMETA request (expected 204)""".format(e.getcode(), e.info()))
def _try_youtube_dl(self, ydl, site, crawl_url):
def _try_youtube_dl(self, ydl, site, page):
try:
self.logger.info("trying youtube-dl on {}".format(crawl_url))
info = ydl.extract_info(crawl_url.url)
self.logger.info("trying youtube-dl on {}".format(page))
info = ydl.extract_info(page.url)
if site.proxy and site.enable_warcprox_features:
info_json = json.dumps(info, sort_keys=True, indent=4)
self.logger.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(crawl_url))
self._putmeta(warcprox_address, site.proxy, url=crawl_url.url,
self.logger.info("sending PUTMETA request to warcprox with youtube-dl json for {}".format(page))
self._putmeta(warcprox_address=site.proxy, url=page.url,
content_type="application/vnd.youtube-dl_formats+json;charset=utf-8",
payload=info_json.encode("utf-8"))
except BaseException as e:
@@ -102,31 +102,31 @@ class BrozzlerWorker:
else:
raise
def _brozzle_page(self, browser, ydl, site, crawl_url):
def _brozzle_page(self, browser, ydl, site, page):
def on_screenshot(screenshot_png):
if site.proxy and site.enable_warcprox_features:
self.logger.info("sending PUTMETA request to warcprox with screenshot for {}".format(crawl_url))
self._putmeta(warcprox_address=site.proxy, url=crawl_url.url,
self.logger.info("sending PUTMETA request to warcprox with screenshot for {}".format(page))
self._putmeta(warcprox_address=site.proxy, url=page.url,
content_type="image/png", payload=screenshot_png)
self.logger.info("brozzling {}".format(crawl_url))
self._try_youtube_dl(ydl, site, crawl_url)
self.logger.info("brozzling {}".format(page))
self._try_youtube_dl(ydl, site, page)
crawl_url.outlinks = browser.browse_page(crawl_url.url,
page.outlinks = browser.browse_page(page.url,
on_screenshot=on_screenshot,
on_url_change=crawl_url.note_redirect)
on_url_change=page.note_redirect)
def _brozzle_site(self, browser, ydl, site):
start = time.time()
crawl_url = None
page = None
try:
browser.start(proxy=site.proxy)
while not self._shutdown_requested.is_set() and time.time() - start < 60:
try:
crawl_url = self._next_url(site)
self._brozzle_page(browser, ydl, site, crawl_url)
self._completed_url(site, crawl_url)
crawl_url = None
page = self._next_page(site)
self._brozzle_page(browser, ydl, site, page)
self._completed_page(site, page)
page = None
except kombu.simple.Empty:
# if some timeout reached, re-raise?
pass
@@ -136,7 +136,7 @@ class BrozzlerWorker:
self.logger.info("{} shut down".format(browser))
finally:
browser.stop()
self._disclaim_site(site, crawl_url)
self._disclaim_site(site, page)
self._browser_pool.release(browser)
def run(self):
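
The queue names follow the rename as well: a worker pulls from brozzler.sites.<site_id>.pages and reports to brozzler.sites.<site_id>.completed_pages. A minimal round trip with kombu's SimpleQueue against those names looks roughly like this (a sketch only; the AMQP URL and a running broker are assumed and are not part of this diff):

    import kombu

    amqp_url = "amqp://guest:guest@localhost:5672/%2f"  # assumed broker URL
    site_id = 1

    with kombu.Connection(amqp_url) as conn:
        q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site_id))
        # roughly what BrozzlerHQ._feed_pages() puts on the queue: a page dict
        q.put({"url": "http://example.com/", "site_id": site_id, "hops_from_seed": 0})
        # roughly what the worker's _next_page() does: get, rebuild, ack
        msg = q.get(block=True, timeout=0.5)  # raises kombu.simple.Empty if nothing is waiting
        print(msg.payload)
        msg.ack()
        q.close()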