mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-06-21 05:14:22 -04:00
rewrite frontier.scope_and_schedule_outlinks() to use batch rethinkdb queries, because we have witnessed the method running for hours(!)
This commit is contained in:
parent
d904daea9c
commit
bdc0badec3
2 changed files with 87 additions and 44 deletions
|
@ -268,52 +268,95 @@ class RethinkDbFrontier:
|
||||||
{"start":doublethink.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
site.save()
|
site.save()
|
||||||
|
|
||||||
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
|
def _scope_and_enforce_robots(self, site, parent_page, outlinks):
|
||||||
decisions = {"accepted":set(),"blocked":set(),"rejected":set()}
|
'''
|
||||||
counts = {"added":0,"updated":0,"rejected":0,"blocked":0}
|
Returns tuple (
|
||||||
|
set of in scope urls (uncanonicalized) accepted by robots policy,
|
||||||
|
set of in scope urls (canonicalized) blocked by robots policy,
|
||||||
|
set of out-of-scope urls (canonicalized)).
|
||||||
|
'''
|
||||||
|
in_scope = set()
|
||||||
|
blocked = set()
|
||||||
|
out_of_scope = set()
|
||||||
for url in outlinks or []:
|
for url in outlinks or []:
|
||||||
|
url_for_scoping = urlcanon.semantic(url)
|
||||||
|
url_for_crawling = urlcanon.whatwg(url)
|
||||||
|
urlcanon.canon.remove_fragment(url_for_crawling)
|
||||||
|
if site.is_in_scope(url_for_scoping, parent_page=parent_page):
|
||||||
|
if brozzler.is_permitted_by_robots(site, str(url_for_crawling)):
|
||||||
|
in_scope.add(url)
|
||||||
|
else:
|
||||||
|
blocked.add(str(url_for_crawling))
|
||||||
|
else:
|
||||||
|
out_of_scope.add(str(url_for_crawling))
|
||||||
|
return in_scope, blocked, out_of_scope
|
||||||
|
|
||||||
|
def _build_fresh_pages(self, site, parent_page, urls):
|
||||||
|
'''
|
||||||
|
Returns a dict of page_id => brozzler.Page.
|
||||||
|
'''
|
||||||
|
pages = {}
|
||||||
|
for url in urls:
|
||||||
url_for_scoping = urlcanon.semantic(url)
|
url_for_scoping = urlcanon.semantic(url)
|
||||||
url_for_crawling = urlcanon.whatwg(url)
|
url_for_crawling = urlcanon.whatwg(url)
|
||||||
hashtag = (url_for_crawling.hash_sign
|
hashtag = (url_for_crawling.hash_sign
|
||||||
+ url_for_crawling.fragment).decode('utf-8')
|
+ url_for_crawling.fragment).decode('utf-8')
|
||||||
urlcanon.canon.remove_fragment(url_for_crawling)
|
urlcanon.canon.remove_fragment(url_for_crawling)
|
||||||
if site.is_in_scope(url_for_scoping, parent_page=parent_page):
|
|
||||||
if brozzler.is_permitted_by_robots(site, str(url_for_crawling)):
|
|
||||||
if not url_for_scoping.surt().startswith(
|
if not url_for_scoping.surt().startswith(
|
||||||
site.scope["surt"].encode("utf-8")):
|
site.scope['surt'].encode('utf-8')):
|
||||||
hops_off_surt = parent_page.hops_off_surt + 1
|
hops_off_surt = parent_page.hops_off_surt + 1
|
||||||
else:
|
else:
|
||||||
hops_off_surt = 0
|
hops_off_surt = 0
|
||||||
new_child_page = brozzler.Page(self.rr, {
|
page = brozzler.Page(self.rr, {
|
||||||
'url': str(url_for_crawling),
|
'url': str(url_for_crawling),
|
||||||
'site_id': site.id, 'job_id': site.job_id,
|
'site_id': site.id,
|
||||||
'hops_from_seed': parent_page.hops_from_seed+1,
|
'job_id': site.job_id,
|
||||||
|
'hops_from_seed': parent_page.hops_from_seed + 1,
|
||||||
'via_page_id': parent_page.id,
|
'via_page_id': parent_page.id,
|
||||||
'hops_off_surt': hops_off_surt})
|
'hops_off_surt': hops_off_surt,
|
||||||
existing_child_page = brozzler.Page.load(
|
'hashtags': []})
|
||||||
self.rr, new_child_page.id)
|
if page.id in pages:
|
||||||
if existing_child_page:
|
pages[page.id].priority += page.priority
|
||||||
existing_child_page.priority += new_child_page.priority
|
page = pages[page.id]
|
||||||
if hashtag and existing_child_page.hashtags:
|
|
||||||
hashtags = set(existing_child_page.hashtags)
|
|
||||||
hashtags.add(hashtag)
|
|
||||||
existing_child_page.hashtags = list(hashtags)
|
|
||||||
elif hashtag:
|
|
||||||
existing_child_page.hashtags = [hashtag]
|
|
||||||
existing_child_page.save()
|
|
||||||
counts["updated"] += 1
|
|
||||||
else:
|
else:
|
||||||
|
pages[page.id] = page
|
||||||
if hashtag:
|
if hashtag:
|
||||||
new_child_page.hashtags = [hashtag,]
|
page.hashtags = list(set(page.hashtags + [hashtag]))
|
||||||
new_child_page.save()
|
return pages
|
||||||
counts["added"] += 1
|
|
||||||
decisions["accepted"].add(str(url_for_crawling))
|
def scope_and_schedule_outlinks(self, site, parent_page, outlinks):
|
||||||
|
decisions = {'accepted':set(),'blocked':set(),'rejected':set()}
|
||||||
|
counts = {'added':0,'updated':0,'rejected':0,'blocked':0}
|
||||||
|
|
||||||
|
in_scope, blocked, out_of_scope = self._scope_and_enforce_robots(
|
||||||
|
site, parent_page, outlinks)
|
||||||
|
decisions['blocked'] = blocked
|
||||||
|
decisions['rejected'] = out_of_scope
|
||||||
|
counts['blocked'] += len(blocked)
|
||||||
|
counts['rejected'] += len(out_of_scope)
|
||||||
|
|
||||||
|
fresh_pages = self._build_fresh_pages(site, parent_page, in_scope)
|
||||||
|
|
||||||
|
# get existing pages from rethinkdb
|
||||||
|
results = self.rr.table('pages').get_all(*fresh_pages.keys()).run()
|
||||||
|
pages = {doc['id']: brozzler.Page(self.rr, doc) for doc in results}
|
||||||
|
|
||||||
|
# build list of pages to save, consisting of new pages, and existing
|
||||||
|
# pages updated with higher priority and new hashtags
|
||||||
|
for fresh_page in fresh_pages.values():
|
||||||
|
decisions['accepted'].add(fresh_page.url)
|
||||||
|
if fresh_page.id in pages:
|
||||||
|
page = pages[fresh_page.id]
|
||||||
|
page.hashtags = list(set((page.hashtags or [])
|
||||||
|
+ fresh_page.hashtags))
|
||||||
|
page.priority += fresh_page.priority
|
||||||
|
counts['updated'] += 1
|
||||||
else:
|
else:
|
||||||
counts["blocked"] += 1
|
pages[fresh_page.id] = fresh_page
|
||||||
decisions["blocked"].add(str(url_for_crawling))
|
counts['added'] += 1
|
||||||
else:
|
|
||||||
counts["rejected"] += 1
|
result = self.rr.table('pages').insert(
|
||||||
decisions["rejected"].add(str(url_for_crawling))
|
pages.values(), conflict='replace').run()
|
||||||
|
|
||||||
parent_page.outlinks = {}
|
parent_page.outlinks = {}
|
||||||
for k in decisions:
|
for k in decisions:
|
||||||
|
@ -321,10 +364,10 @@ class RethinkDbFrontier:
|
||||||
parent_page.save()
|
parent_page.save()
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"%s new links added, %s existing links updated, %s links "
|
'%s new links added, %s existing links updated, %s links '
|
||||||
"rejected, %s links blocked by robots from %s",
|
'rejected, %s links blocked by robots from %s',
|
||||||
counts["added"], counts["updated"], counts["rejected"],
|
counts['added'], counts['updated'], counts['rejected'],
|
||||||
counts["blocked"], parent_page)
|
counts['blocked'], parent_page)
|
||||||
|
|
||||||
def reached_limit(self, site, e):
|
def reached_limit(self, site, e):
|
||||||
self.logger.info("reached_limit site=%s e=%s", site, e)
|
self.logger.info("reached_limit site=%s e=%s", site, e)
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -32,7 +32,7 @@ def find_package_data(package):
|
||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='brozzler',
|
name='brozzler',
|
||||||
version='1.1b11.dev250',
|
version='1.1b11.dev251',
|
||||||
description='Distributed web crawling with browsers',
|
description='Distributed web crawling with browsers',
|
||||||
url='https://github.com/internetarchive/brozzler',
|
url='https://github.com/internetarchive/brozzler',
|
||||||
author='Noah Levitt',
|
author='Noah Levitt',
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue