mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 00:29:53 -05:00
basic of site/seed crawling with scoping
This commit is contained in:
parent
92ea701987
commit
783794ca37
108
bin/crawl-url
108
bin/crawl-url
@ -6,9 +6,10 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
import umbra
|
import umbra
|
||||||
import umbra.frontier
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
import sortedcontainers
|
||||||
|
import surt
|
||||||
|
|
||||||
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
|
arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
|
||||||
description='browse-url - open urls in chrome/chromium and run behaviors',
|
description='browse-url - open urls in chrome/chromium and run behaviors',
|
||||||
@ -29,28 +30,101 @@ args = arg_parser.parse_args(args=sys.argv[1:])
|
|||||||
logging.basicConfig(stream=sys.stdout, level=args.log_level,
|
logging.basicConfig(stream=sys.stdout, level=args.log_level,
|
||||||
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s')
|
||||||
|
|
||||||
frontier = umbra.frontier.Frontier()
|
class CrawlUrl:
|
||||||
for url in args.urls:
|
def __init__(self, url, priority=1):
|
||||||
frontier.schedule(umbra.frontier.CrawlUrl(url, priority=1000))
|
self.url = url
|
||||||
|
self._surt = None
|
||||||
|
self.set_priority(priority)
|
||||||
|
|
||||||
def frontier_schedule(urls):
|
def set_priority(self, priority):
|
||||||
logging.info("scheduling {} urls".format(len(urls)))
|
# priority_key is both a sortable priority (higher value is higher
|
||||||
|
# priority) and a unique hash key
|
||||||
|
self.priority_key = (priority << 32) | (hash(self.surt) & (2**32 - 1))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def surt(self):
|
||||||
|
if self._surt is None:
|
||||||
|
self._surt = surt.surt(self.url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
|
||||||
|
return self._surt
|
||||||
|
|
||||||
|
@property
|
||||||
|
def priority(self):
|
||||||
|
return self.priority_key >> 32
|
||||||
|
|
||||||
|
class CrawlUrlQueue:
|
||||||
|
def __init__(self):
|
||||||
|
# {priority_key:CrawlUrl}
|
||||||
|
self._pq = sortedcontainers.SortedDict()
|
||||||
|
# {surt:CrawlUrl}
|
||||||
|
self._urls = {}
|
||||||
|
self.aggregate_priority = 0
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
assert len(self._urls) == len(self._pq)
|
||||||
|
return len(self._urls)
|
||||||
|
|
||||||
|
def schedule(self, crawl_url):
|
||||||
|
self.aggregate_priority += crawl_url.priority
|
||||||
|
|
||||||
|
try:
|
||||||
|
old_priority_key = self._urls.pop(crawl_url.surt).priority_key
|
||||||
|
old_crawl_url = self._pq.pop(old_priority_key)
|
||||||
|
|
||||||
|
# XXX dumb calculation of new priority, may not belong here
|
||||||
|
crawl_url.set_priority(crawl_url.priority + old_crawl_url.priority)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._urls[crawl_url.surt] = crawl_url
|
||||||
|
self._pq[crawl_url.priority_key] = crawl_url
|
||||||
|
|
||||||
|
def pop(self):
|
||||||
|
res0 = self._pq.popitem(last=True)[1]
|
||||||
|
res1 = self._urls.pop(res0.surt)
|
||||||
|
assert res0 is res1
|
||||||
|
|
||||||
|
self.aggregate_priority -= res0.priority
|
||||||
|
|
||||||
|
return res0
|
||||||
|
|
||||||
|
class Site:
|
||||||
|
"""A seed url, scope definition, and prioritized url queue."""
|
||||||
|
def __init__(self, seed_url):
|
||||||
|
self.seed = CrawlUrl(seed_url, priority=1000)
|
||||||
|
|
||||||
|
self.q = CrawlUrlQueue()
|
||||||
|
self.q.schedule(self.seed)
|
||||||
|
|
||||||
|
def is_in_scope(self, url):
|
||||||
|
surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True)
|
||||||
|
return surtt.startswith(self.seed.surt)
|
||||||
|
|
||||||
|
def submit(self, urls):
|
||||||
for url in urls:
|
for url in urls:
|
||||||
frontier.schedule(umbra.frontier.CrawlUrl(url))
|
if self.is_in_scope(url):
|
||||||
|
logging.debug("accepted {}".format(url))
|
||||||
|
site.q.schedule(CrawlUrl(url))
|
||||||
|
else:
|
||||||
|
logging.info("rejected {}".format(url))
|
||||||
|
|
||||||
def crawl_from_frontier(i):
|
# "browse" + "crawl" = "brozzle"
|
||||||
with umbra.Browser(chrome_port=9200 + i, chrome_exe=args.chrome_exe) as browser:
|
def brozzle_site(site, chrome_port):
|
||||||
|
with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
crawl_url = frontier.pop()
|
crawl_url = site.q.pop()
|
||||||
browser.browse_page(crawl_url.url, on_outlinks=frontier_schedule)
|
outlinks = browser.browse_page(crawl_url.url)
|
||||||
|
site.submit(outlinks)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
time.sleep(0.5)
|
break
|
||||||
|
|
||||||
for i in range(int(args.max_browsers)):
|
chrome_port = 9200
|
||||||
th = threading.Thread(target=lambda: crawl_from_frontier(i),
|
for seed_url in args.urls:
|
||||||
name="BrowsingThread{}".format(i))
|
site = Site(seed_url)
|
||||||
|
|
||||||
|
th = threading.Thread(target=lambda: brozzle_site(site, chrome_port),
|
||||||
|
name="BrowsingThread-{}".format(site.seed.surt))
|
||||||
th.start()
|
th.start()
|
||||||
|
|
||||||
while True:
|
chrome_port += 1
|
||||||
time.sleep(0.5)
|
|
||||||
|
6
requirements.txt
Normal file
6
requirements.txt
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
kombu
|
||||||
|
websocket-client-py3==0.13.1
|
||||||
|
argparse
|
||||||
|
PyYAML
|
||||||
|
sortedcontainers
|
||||||
|
git+https://github.com/ikreymer/surt.git@py3
|
2
setup.py
2
setup.py
@ -32,7 +32,6 @@ setuptools.setup(name='umbra',
|
|||||||
license='Apache License 2.0',
|
license='Apache License 2.0',
|
||||||
packages=['umbra'],
|
packages=['umbra'],
|
||||||
package_data={'umbra':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']},
|
package_data={'umbra':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']},
|
||||||
install_requires=['kombu', 'websocket-client-py3==0.13.1', 'argparse', 'PyYAML', 'sortedcontainers'],
|
|
||||||
scripts=glob.glob('bin/*'),
|
scripts=glob.glob('bin/*'),
|
||||||
zip_safe=False,
|
zip_safe=False,
|
||||||
classifiers=[
|
classifiers=[
|
||||||
@ -40,5 +39,6 @@ setuptools.setup(name='umbra',
|
|||||||
'Environment :: Console',
|
'Environment :: Console',
|
||||||
'License :: OSI Approved :: Apache Software License',
|
'License :: OSI Approved :: Apache Software License',
|
||||||
'Programming Language :: Python :: 3.3',
|
'Programming Language :: Python :: 3.3',
|
||||||
|
'Programming Language :: Python :: 3.4',
|
||||||
'Topic :: System :: Archiving',
|
'Topic :: System :: Archiving',
|
||||||
])
|
])
|
||||||
|
@ -96,11 +96,13 @@ class Browser:
|
|||||||
def abort_browse_page(self):
|
def abort_browse_page(self):
|
||||||
self._abort_browse_page = True
|
self._abort_browse_page = True
|
||||||
|
|
||||||
def browse_page(self, url, on_request=None, on_screenshot=None, on_outlinks=None):
|
def browse_page(self, url, on_request=None, on_screenshot=None):
|
||||||
"""Synchronously loads a page, takes a screenshot, and runs behaviors.
|
"""Synchronously loads a page, takes a screenshot, and runs behaviors.
|
||||||
|
|
||||||
Raises BrowsingException if browsing the page fails in a non-critical
|
Raises BrowsingException if browsing the page fails in a non-critical
|
||||||
way.
|
way.
|
||||||
|
|
||||||
|
Returns extracted outlinks.
|
||||||
"""
|
"""
|
||||||
self.url = url
|
self.url = url
|
||||||
self.on_request = on_request
|
self.on_request = on_request
|
||||||
@ -108,9 +110,8 @@ class Browser:
|
|||||||
self.on_screenshot = on_screenshot
|
self.on_screenshot = on_screenshot
|
||||||
self._waiting_on_screenshot_msg_id = None
|
self._waiting_on_screenshot_msg_id = None
|
||||||
|
|
||||||
self.on_outlinks = on_outlinks
|
|
||||||
self._waiting_on_outlinks_msg_id = None
|
self._waiting_on_outlinks_msg_id = None
|
||||||
self._got_outlinks = False
|
self._outlinks = None
|
||||||
|
|
||||||
self._websock = websocket.WebSocketApp(self._websocket_url,
|
self._websock = websocket.WebSocketApp(self._websocket_url,
|
||||||
on_open=self._visit_page, on_message=self._handle_message)
|
on_open=self._visit_page, on_message=self._handle_message)
|
||||||
@ -126,7 +127,7 @@ class Browser:
|
|||||||
while True:
|
while True:
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
if self._browse_interval_func():
|
if self._browse_interval_func():
|
||||||
return
|
return self._outlinks
|
||||||
finally:
|
finally:
|
||||||
if self._websock and self._websock.sock and self._websock.sock.connected:
|
if self._websock and self._websock.sock and self._websock.sock.connected:
|
||||||
try:
|
try:
|
||||||
@ -149,7 +150,7 @@ class Browser:
|
|||||||
if not self._websock or not self._websock.sock or not self._websock.sock.connected:
|
if not self._websock or not self._websock.sock or not self._websock.sock.connected:
|
||||||
raise BrowsingException("websocket closed, did chrome die? {}".format(self._websocket_url))
|
raise BrowsingException("websocket closed, did chrome die? {}".format(self._websocket_url))
|
||||||
elif self._behavior != None and self._behavior.is_finished():
|
elif self._behavior != None and self._behavior.is_finished():
|
||||||
if self._got_outlinks:
|
if self._outlinks:
|
||||||
self.logger.info("got outlinks, finished url={}".format(self.url))
|
self.logger.info("got outlinks, finished url={}".format(self.url))
|
||||||
return True
|
return True
|
||||||
elif not self._waiting_on_outlinks_msg_id:
|
elif not self._waiting_on_outlinks_msg_id:
|
||||||
@ -225,10 +226,8 @@ class Browser:
|
|||||||
self._behavior.start()
|
self._behavior.start()
|
||||||
elif message["id"] == self._waiting_on_outlinks_msg_id:
|
elif message["id"] == self._waiting_on_outlinks_msg_id:
|
||||||
self.logger.debug("got outlinks message={}".format(message))
|
self.logger.debug("got outlinks message={}".format(message))
|
||||||
self._got_outlinks = True
|
|
||||||
# {'result': {'wasThrown': False, 'result': {'value': 'https://archive-it.org/cgi-bin/dedup-test/change_every_second https://archive-it.org/cgi-bin/dedup-test/change_every_minute https://archive-it.org/cgi-bin/dedup-test/change_every_10minutes https://archive-it.org/cgi-bin/dedup-test/change_every_hour https://archive-it.org/cgi-bin/dedup-test/change_every_day https://archive-it.org/cgi-bin/dedup-test/change_every_month https://archive-it.org/cgi-bin/dedup-test/change_every_year https://archive-it.org/cgi-bin/dedup-test/change_never http://validator.w3.org/check?uri=referer', 'type': 'string'}}, 'id': 32}
|
# {'result': {'wasThrown': False, 'result': {'value': 'https://archive-it.org/cgi-bin/dedup-test/change_every_second https://archive-it.org/cgi-bin/dedup-test/change_every_minute https://archive-it.org/cgi-bin/dedup-test/change_every_10minutes https://archive-it.org/cgi-bin/dedup-test/change_every_hour https://archive-it.org/cgi-bin/dedup-test/change_every_day https://archive-it.org/cgi-bin/dedup-test/change_every_month https://archive-it.org/cgi-bin/dedup-test/change_every_year https://archive-it.org/cgi-bin/dedup-test/change_never http://validator.w3.org/check?uri=referer', 'type': 'string'}}, 'id': 32}
|
||||||
if self.on_outlinks:
|
self._outlinks = frozenset(message["result"]["result"]["value"].split(" "))
|
||||||
self.on_outlinks(frozenset(message["result"]["result"]["value"].split(" ")))
|
|
||||||
elif self._behavior and self._behavior.is_waiting_on_result(message["id"]):
|
elif self._behavior and self._behavior.is_waiting_on_result(message["id"]):
|
||||||
self._behavior.notify_of_result(message)
|
self._behavior.notify_of_result(message)
|
||||||
# elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"):
|
# elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"):
|
||||||
|
@ -1,72 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
# vim: set sw=4 et:
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import sys
|
|
||||||
import urllib.parse
|
|
||||||
import sortedcontainers
|
|
||||||
import threading
|
|
||||||
|
|
||||||
class CrawlUrl:
|
|
||||||
def __init__(self, url, priority=1):
|
|
||||||
self.url = url
|
|
||||||
self.set_priority(priority)
|
|
||||||
self._netloc = None
|
|
||||||
|
|
||||||
def set_priority(self, priority):
|
|
||||||
# priority_key is both a sortable priority (higher value is higher
|
|
||||||
# priority) and a unique hash key
|
|
||||||
self.priority_key = (priority << 32) | (hash(self.url) & (2**32 - 1))
|
|
||||||
|
|
||||||
def get_priority(self):
|
|
||||||
return self.priority_key >> 32
|
|
||||||
|
|
||||||
@property
|
|
||||||
def host(self):
|
|
||||||
if self._netloc is None:
|
|
||||||
self._netloc = urllib.parse.urlsplit(self.url)[1]
|
|
||||||
return self._netloc
|
|
||||||
|
|
||||||
class Frontier:
|
|
||||||
def __init__(self):
|
|
||||||
# {url:CrawlUrl}
|
|
||||||
self._urls = {}
|
|
||||||
|
|
||||||
# {host:SortedDict{priority_key:CrawlUrl}}
|
|
||||||
self._queues_by_host = {}
|
|
||||||
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
def schedule(self, crawl_url):
|
|
||||||
with self._lock:
|
|
||||||
try:
|
|
||||||
old_priority_key = self._urls.pop(crawl_url.url).priority_key
|
|
||||||
old_crawl_url = self._queues_by_host[crawl_url.host].pop(old_priority_key)
|
|
||||||
|
|
||||||
# XXX very dumb calculation of new priority, probably doesn't belong here
|
|
||||||
crawl_url.set_priority(crawl_url.get_priority() + old_crawl_url.get_priority())
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
self._urls[crawl_url.url] = crawl_url
|
|
||||||
if crawl_url.host not in self._queues_by_host:
|
|
||||||
self._queues_by_host[crawl_url.host] = sortedcontainers.SortedDict()
|
|
||||||
self._queues_by_host[crawl_url.host][crawl_url.priority_key] = crawl_url
|
|
||||||
|
|
||||||
def pop(self, host=None):
|
|
||||||
with self._lock:
|
|
||||||
if not host or host not in self._queues_by_host:
|
|
||||||
# XXX should prioritize queues, this picks one at random
|
|
||||||
for h in self._queues_by_host:
|
|
||||||
host = h
|
|
||||||
break
|
|
||||||
|
|
||||||
result = self._queues_by_host[host].popitem(last=True)[1]
|
|
||||||
if len(self._queues_by_host[host]) == 0:
|
|
||||||
del self._queues_by_host[host]
|
|
||||||
|
|
||||||
result2 = self._urls.pop(result.url)
|
|
||||||
assert result2 is result
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user