rudimentary crawling in parallel with multiple browsers

Noah Levitt 2015-07-08 18:50:18 -07:00
parent 32abfcac8a
commit 92ea701987
2 changed files with 48 additions and 31 deletions

@@ -7,6 +7,8 @@ import sys
 import logging
 import umbra
 import umbra.frontier
+import threading
+import time
 
 arg_parser = argparse.ArgumentParser(prog=os.path.basename(__file__),
         description='browse-url - open urls in chrome/chromium and run behaviors',
@@ -16,6 +18,8 @@ arg_parser.add_argument('-w', '--browser-wait', dest='browser_wait', default='60
         help='seconds to wait for browser initialization')
 arg_parser.add_argument('-e', '--executable', dest='chrome_exe', default='chromium-browser',
         help='executable to use to invoke chrome')
+arg_parser.add_argument('-n', '--max-browsers', dest='max_browsers', default='1',
+        help='max number of chrome instances simultaneously browsing pages')
 arg_parser.add_argument('-v', '--verbose', dest='log_level',
         action="store_const", default=logging.INFO, const=logging.DEBUG)
 arg_parser.add_argument('--version', action='version',
@@ -30,15 +34,23 @@ for url in args.urls:
     frontier.schedule(umbra.frontier.CrawlUrl(url, priority=1000))
 
 def frontier_schedule(urls):
-    logging.info("adding {} urls to frontier".format(len(urls)))
+    logging.info("scheduling {} urls".format(len(urls)))
     for url in urls:
         frontier.schedule(umbra.frontier.CrawlUrl(url))
 
-with umbra.Browser(chrome_exe=args.chrome_exe) as browser:
-    try:
+def crawl_from_frontier(i):
+    with umbra.Browser(chrome_port=9200 + i, chrome_exe=args.chrome_exe) as browser:
         while True:
-            crawl_url = frontier.pop()
-            browser.browse_page(crawl_url.url, on_outlinks=frontier_schedule)
-    except IndexError:
-        logging.info("finished, frontier is empty")
+            try:
+                crawl_url = frontier.pop()
+                browser.browse_page(crawl_url.url, on_outlinks=frontier_schedule)
+            except KeyError:
+                time.sleep(0.5)
+
+for i in range(int(args.max_browsers)):
+    th = threading.Thread(target=crawl_from_frontier, args=(i,),
+            name="BrowsingThread{}".format(i))
+    th.start()
+
+while True:
+    time.sleep(0.5)
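
The new code is a simple worker pool: each thread owns one chrome instance on its own debug port (9200 + i) and spins on the shared frontier, sleeping briefly whenever the frontier is momentarily empty. A minimal standalone sketch of the same idiom, with queue.Queue standing in for umbra.frontier.Frontier (an assumption for illustration only; the real pop() raises KeyError when empty, where queue.Queue raises queue.Empty):

    import queue
    import threading
    import time

    frontier = queue.Queue()            # stand-in for umbra.frontier.Frontier
    frontier.put('http://example.com/')

    def crawl_from_frontier(i):
        # each worker would own the browser listening on port 9200 + i
        while True:
            try:
                url = frontier.get(block=False)
                print('browser {} would browse {}'.format(i, url))
            except queue.Empty:
                # nothing queued right now; outlinks may still arrive
                time.sleep(0.5)

    for i in range(3):
        th = threading.Thread(target=crawl_from_frontier, args=(i,),
                name='BrowsingThread{}'.format(i))
        th.daemon = True                # let the process exit without joining
        th.start()

    time.sleep(2)                       # stand-in for the main keep-alive loop

Passing i through args=(i,) gives each thread its own copy of the counter; a bare target=lambda: crawl_from_frontier(i) would capture the loop variable late, and two threads could end up contending for the same chrome port.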


@@ -5,6 +5,7 @@ import logging
 import sys
 import urllib.parse
 import sortedcontainers
+import threading
 
 class CrawlUrl:
     def __init__(self, url, priority=1):
@@ -29,39 +30,43 @@ class CrawlUrl:
 
 class Frontier:
     def __init__(self):
         # {url:CrawlUrl}
-        self.urls = {}
+        self._urls = {}
 
         # {host:SortedDict{priority_key:CrawlUrl}}
-        self.queues_by_host = {}
+        self._queues_by_host = {}
+
+        self._lock = threading.Lock()
 
     def schedule(self, crawl_url):
-        try:
-            old_priority_key = self.urls.pop(crawl_url.url).priority_key
-            old_crawl_url = self.queues_by_host[crawl_url.host].pop(old_priority_key)
+        with self._lock:
+            try:
+                old_priority_key = self._urls.pop(crawl_url.url).priority_key
+                old_crawl_url = self._queues_by_host[crawl_url.host].pop(old_priority_key)
 
-            # XXX very dumb calculation of new priority, probably doesn't belong here
-            crawl_url.set_priority(crawl_url.get_priority() + old_crawl_url.get_priority())
-        except KeyError:
-            pass
+                # XXX very dumb calculation of new priority, probably doesn't belong here
+                crawl_url.set_priority(crawl_url.get_priority() + old_crawl_url.get_priority())
+            except KeyError:
+                pass
 
-        self.urls[crawl_url.url] = crawl_url
-        if crawl_url.host not in self.queues_by_host:
-            self.queues_by_host[crawl_url.host] = sortedcontainers.SortedDict()
-        self.queues_by_host[crawl_url.host][crawl_url.priority_key] = crawl_url
+            self._urls[crawl_url.url] = crawl_url
+            if crawl_url.host not in self._queues_by_host:
+                self._queues_by_host[crawl_url.host] = sortedcontainers.SortedDict()
+            self._queues_by_host[crawl_url.host][crawl_url.priority_key] = crawl_url
 
     def pop(self, host=None):
-        if not host or host not in self.queues_by_host:
-            # XXX should prioritize queues, this picks one at random
-            for h in self.queues_by_host:
-                host = h
-                break
+        with self._lock:
+            if not host or host not in self._queues_by_host:
+                # XXX should prioritize queues, this picks one at random
+                for h in self._queues_by_host:
+                    host = h
+                    break
 
-        result = self.queues_by_host[host].popitem(last=True)[1]
-        if len(self.queues_by_host[host]) == 0:
-            del self.queues_by_host[host]
+            result = self._queues_by_host[host].popitem(last=True)[1]
+            if len(self._queues_by_host[host]) == 0:
+                del self._queues_by_host[host]
 
-        result2 = self.urls.pop(result.url)
-        assert result2 is result
+            result2 = self._urls.pop(result.url)
+            assert result2 is result
 
-        return result
+            return result
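
For context, a hedged usage sketch of the now-locked frontier (urls are placeholders; it assumes CrawlUrl and Frontier behave exactly as shown in this diff, including pop() surfacing KeyError once every per-host queue is drained):

    import umbra.frontier

    frontier = umbra.frontier.Frontier()
    frontier.schedule(umbra.frontier.CrawlUrl('http://example.com/', priority=1000))
    frontier.schedule(umbra.frontier.CrawlUrl('http://example.com/about'))

    # scheduling a url that is already enqueued merges priorities under the lock
    frontier.schedule(umbra.frontier.CrawlUrl('http://example.com/about'))

    try:
        while True:
            # pops from an arbitrary host's queue, highest priority_key first
            crawl_url = frontier.pop()
            print('popped', crawl_url.url)
    except KeyError:
        # the same signal the browsing threads poll on
        print('frontier is empty')

Both schedule() and pop() take the same lock, so the read-modify-write that merges a duplicate url's priority and the three-structure update in pop() each appear atomic to the browsing threads.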