diff --git a/bin/crawl-url b/bin/crawl-url index 4b6c7d1..4b27a56 100755 --- a/bin/crawl-url +++ b/bin/crawl-url @@ -6,9 +6,10 @@ import os import sys import logging import umbra -import umbra.frontier import threading import time +import sortedcontainers +import surt arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__), description='browse-url - open urls in chrome/chromium and run behaviors', @@ -29,28 +30,101 @@ args = arg_parser.parse_args(args=sys.argv[1:]) logging.basicConfig(stream=sys.stdout, level=args.log_level, format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') -frontier = umbra.frontier.Frontier() -for url in args.urls: - frontier.schedule(umbra.frontier.CrawlUrl(url, priority=1000)) +class CrawlUrl: + def __init__(self, url, priority=1): + self.url = url + self._surt = None + self.set_priority(priority) -def frontier_schedule(urls): - logging.info("scheduling {} urls".format(len(urls))) - for url in urls: - frontier.schedule(umbra.frontier.CrawlUrl(url)) + def set_priority(self, priority): + # priority_key is both a sortable priority (higher value is higher + # priority) and a unique hash key + self.priority_key = (priority << 32) | (hash(self.surt) & (2**32 - 1)) -def crawl_from_frontier(i): - with umbra.Browser(chrome_port=9200 + i, chrome_exe=args.chrome_exe) as browser: + @property + def surt(self): + if self._surt is None: + self._surt = surt.surt(self.url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) + return self._surt + + @property + def priority(self): + return self.priority_key >> 32 + +class CrawlUrlQueue: + def __init__(self): + # {priority_key:CrawlUrl} + self._pq = sortedcontainers.SortedDict() + # {surt:CrawlUrl} + self._urls = {} + self.aggregate_priority = 0 + + def __len__(self): + assert len(self._urls) == len(self._pq) + return len(self._urls) + + def schedule(self, crawl_url): + self.aggregate_priority += crawl_url.priority + + try: + old_priority_key = self._urls.pop(crawl_url.surt).priority_key + old_crawl_url = self._pq.pop(old_priority_key) + + # XXX dumb calculation of new priority, may not belong here + crawl_url.set_priority(crawl_url.priority + old_crawl_url.priority) + except KeyError: + pass + + self._urls[crawl_url.surt] = crawl_url + self._pq[crawl_url.priority_key] = crawl_url + + def pop(self): + res0 = self._pq.popitem(last=True)[1] + res1 = self._urls.pop(res0.surt) + assert res0 is res1 + + self.aggregate_priority -= res0.priority + + return res0 + +class Site: + """A seed url, scope definition, and prioritized url queue.""" + def __init__(self, seed_url): + self.seed = CrawlUrl(seed_url, priority=1000) + + self.q = CrawlUrlQueue() + self.q.schedule(self.seed) + + def is_in_scope(self, url): + surtt = surt.surt(url, canonicalizer=surt.GoogleURLCanonicalizer, trailing_comma=True) + return surtt.startswith(self.seed.surt) + + def submit(self, urls): + for url in urls: + if self.is_in_scope(url): + logging.debug("accepted {}".format(url)) + site.q.schedule(CrawlUrl(url)) + else: + logging.info("rejected {}".format(url)) + +# "browse" + "crawl" = "brozzle" +def brozzle_site(site, chrome_port): + with umbra.Browser(chrome_port=chrome_port, chrome_exe=args.chrome_exe) as browser: while True: try: - crawl_url = frontier.pop() - browser.browse_page(crawl_url.url, on_outlinks=frontier_schedule) + crawl_url = site.q.pop() + outlinks = browser.browse_page(crawl_url.url) + site.submit(outlinks) except KeyError: - time.sleep(0.5) + break -for i in range(int(args.max_browsers)): - th = threading.Thread(target=lambda: crawl_from_frontier(i), - name="BrowsingThread{}".format(i)) +chrome_port = 9200 +for seed_url in args.urls: + site = Site(seed_url) + + th = threading.Thread(target=lambda: brozzle_site(site, chrome_port), + name="BrowsingThread-{}".format(site.seed.surt)) th.start() -while True: - time.sleep(0.5) + chrome_port += 1 + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..992922e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +kombu +websocket-client-py3==0.13.1 +argparse +PyYAML +sortedcontainers +git+https://github.com/ikreymer/surt.git@py3 diff --git a/setup.py b/setup.py index 6c78b07..8af8679 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,6 @@ setuptools.setup(name='umbra', license='Apache License 2.0', packages=['umbra'], package_data={'umbra':['behaviors.d/*.js*', 'behaviors.yaml', 'version.txt']}, - install_requires=['kombu', 'websocket-client-py3==0.13.1', 'argparse', 'PyYAML', 'sortedcontainers'], scripts=glob.glob('bin/*'), zip_safe=False, classifiers=[ @@ -40,5 +39,6 @@ setuptools.setup(name='umbra', 'Environment :: Console', 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', 'Topic :: System :: Archiving', ]) diff --git a/umbra/browser.py b/umbra/browser.py index f294b15..cc518e2 100644 --- a/umbra/browser.py +++ b/umbra/browser.py @@ -96,11 +96,13 @@ class Browser: def abort_browse_page(self): self._abort_browse_page = True - def browse_page(self, url, on_request=None, on_screenshot=None, on_outlinks=None): + def browse_page(self, url, on_request=None, on_screenshot=None): """Synchronously loads a page, takes a screenshot, and runs behaviors. Raises BrowsingException if browsing the page fails in a non-critical way. + + Returns extracted outlinks. """ self.url = url self.on_request = on_request @@ -108,9 +110,8 @@ class Browser: self.on_screenshot = on_screenshot self._waiting_on_screenshot_msg_id = None - self.on_outlinks = on_outlinks self._waiting_on_outlinks_msg_id = None - self._got_outlinks = False + self._outlinks = None self._websock = websocket.WebSocketApp(self._websocket_url, on_open=self._visit_page, on_message=self._handle_message) @@ -126,7 +127,7 @@ class Browser: while True: time.sleep(0.5) if self._browse_interval_func(): - return + return self._outlinks finally: if self._websock and self._websock.sock and self._websock.sock.connected: try: @@ -149,7 +150,7 @@ class Browser: if not self._websock or not self._websock.sock or not self._websock.sock.connected: raise BrowsingException("websocket closed, did chrome die? {}".format(self._websocket_url)) elif self._behavior != None and self._behavior.is_finished(): - if self._got_outlinks: + if self._outlinks: self.logger.info("got outlinks, finished url={}".format(self.url)) return True elif not self._waiting_on_outlinks_msg_id: @@ -225,10 +226,8 @@ class Browser: self._behavior.start() elif message["id"] == self._waiting_on_outlinks_msg_id: self.logger.debug("got outlinks message={}".format(message)) - self._got_outlinks = True # {'result': {'wasThrown': False, 'result': {'value': 'https://archive-it.org/cgi-bin/dedup-test/change_every_second https://archive-it.org/cgi-bin/dedup-test/change_every_minute https://archive-it.org/cgi-bin/dedup-test/change_every_10minutes https://archive-it.org/cgi-bin/dedup-test/change_every_hour https://archive-it.org/cgi-bin/dedup-test/change_every_day https://archive-it.org/cgi-bin/dedup-test/change_every_month https://archive-it.org/cgi-bin/dedup-test/change_every_year https://archive-it.org/cgi-bin/dedup-test/change_never http://validator.w3.org/check?uri=referer', 'type': 'string'}}, 'id': 32} - if self.on_outlinks: - self.on_outlinks(frozenset(message["result"]["result"]["value"].split(" "))) + self._outlinks = frozenset(message["result"]["result"]["value"].split(" ")) elif self._behavior and self._behavior.is_waiting_on_result(message["id"]): self._behavior.notify_of_result(message) # elif "method" in message and message["method"] in ("Network.dataReceived", "Network.responseReceived", "Network.loadingFinished"): diff --git a/umbra/frontier.py b/umbra/frontier.py deleted file mode 100644 index 4d6957b..0000000 --- a/umbra/frontier.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python -# vim: set sw=4 et: - -import logging -import sys -import urllib.parse -import sortedcontainers -import threading - -class CrawlUrl: - def __init__(self, url, priority=1): - self.url = url - self.set_priority(priority) - self._netloc = None - - def set_priority(self, priority): - # priority_key is both a sortable priority (higher value is higher - # priority) and a unique hash key - self.priority_key = (priority << 32) | (hash(self.url) & (2**32 - 1)) - - def get_priority(self): - return self.priority_key >> 32 - - @property - def host(self): - if self._netloc is None: - self._netloc = urllib.parse.urlsplit(self.url)[1] - return self._netloc - -class Frontier: - def __init__(self): - # {url:CrawlUrl} - self._urls = {} - - # {host:SortedDict{priority_key:CrawlUrl}} - self._queues_by_host = {} - - self._lock = threading.Lock() - - def schedule(self, crawl_url): - with self._lock: - try: - old_priority_key = self._urls.pop(crawl_url.url).priority_key - old_crawl_url = self._queues_by_host[crawl_url.host].pop(old_priority_key) - - # XXX very dumb calculation of new priority, probably doesn't belong here - crawl_url.set_priority(crawl_url.get_priority() + old_crawl_url.get_priority()) - except KeyError: - pass - - self._urls[crawl_url.url] = crawl_url - if crawl_url.host not in self._queues_by_host: - self._queues_by_host[crawl_url.host] = sortedcontainers.SortedDict() - self._queues_by_host[crawl_url.host][crawl_url.priority_key] = crawl_url - - def pop(self, host=None): - with self._lock: - if not host or host not in self._queues_by_host: - # XXX should prioritize queues, this picks one at random - for h in self._queues_by_host: - host = h - break - - result = self._queues_by_host[host].popitem(last=True)[1] - if len(self._queues_by_host[host]) == 0: - del self._queues_by_host[host] - - result2 = self._urls.pop(result.url) - assert result2 is result - - return result -