From 8aa1e6715a6af244eeb40aca166aa08e3fecabc2 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Fri, 10 Jul 2015 20:12:33 -0700
Subject: [PATCH] feed seed url to the crawl url queue

---
 bin/brozzler-hq | 88 +++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 75 insertions(+), 13 deletions(-)

diff --git a/bin/brozzler-hq b/bin/brozzler-hq
index cb88058..974c312 100644
--- a/bin/brozzler-hq
+++ b/bin/brozzler-hq
@@ -29,17 +29,37 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
 
+class CrawlUrl:
+    def __init__(self, url, id=None, site_id=None, hops_from_seed=0):
+        self.id = id
+        self.site_id = site_id
+        self.url = url
+        self.hops_from_seed = hops_from_seed
+        self._canon_hurl = None
+
+    def __repr__(self):
+        return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
+                self.url, self.site_id, self.hops_from_seed)
+
+    def canonical(self):
+        if self._canon_hurl is None:
+            self._canon_hurl = surt.handyurl.parse(self.url)
+            surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
+        return self._canon_hurl.geturl()
+
+    def to_json(self):
+        d = dict(id=self.id, site_id=self.site_id, url=self.url, hops_from_seed=self.hops_from_seed)
+        return json.dumps(d, separators=(',', ':'))
+
 class BrozzlerHQDb:
     logger = logging.getLogger(__module__ + "." + __qualname__)
 
     def __init__(self, db_file="./brozzler-hq-0.db"):
-        self.db_file = db_file
-        self.db_conn = sqlite3.connect(db_file)
-
+        self._conn = sqlite3.connect(db_file)
         self._create_tables()
 
     def _create_tables(self):
-        cursor = self.db_conn.cursor()
+        cursor = self._conn.cursor()
         cursor.executescript("""
                 create table if not exists brozzler_sites (
                     id integer primary key,
@@ -54,18 +74,51 @@ class BrozzlerHQDb:
                     canon_url varchar(4000),
                     crawl_url_json text
                 );
-                create index if not exists brozzler_urls_priority on brozzler_urls (priority);
+                create index if not exists brozzler_urls_priority on brozzler_urls (priority desc);
                 create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
                 """)
-        self.db_conn.commit()
+        self._conn.commit()
+
+    def pop_url(self, site_id):
+        cursor = self._conn.cursor()
+        cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id = ? and not in_progress order by priority desc limit 1", (site_id,))
+        row = cursor.fetchone()
+        if row:
+            (id, priority, crawl_url_json) = row
+            new_priority = priority - 2000
+            cursor.execute("update brozzler_urls set priority=?, in_progress=1 where id=?", (new_priority, id))
+            self._conn.commit()
+
+            d = json.loads(crawl_url_json)
+            d["id"] = id
+            return d
+        else:
+            return None
 
     def new_site(self, site_dict):
         site_json = json.dumps(site_dict, separators=(',', ':'))
-        cursor = self.db_conn.cursor()
+        cursor = self._conn.cursor()
         cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
-        self.db_conn.commit()
+        self._conn.commit()
         return cursor.lastrowid
 
+    def schedule_url(self, crawl_url, priority=0):
+        cursor = self._conn.cursor()
+        cursor.execute("insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json, in_progress) values (?, ?, ?, ?, 0)",
+                (crawl_url.site_id, priority, crawl_url.canonical(), crawl_url.to_json()))
+        self._conn.commit()
+
+    def sites(self):
+        cursor = self._conn.cursor()
+        cursor.execute("select id, site_json from brozzler_sites")
+        while True:
+            row = cursor.fetchone()
+            if row is None:
+                break
+            site_dict = json.loads(row[1])
+            site_dict["id"] = row[0]
+            yield site_dict
+
 class BrozzlerHQ:
     logger = logging.getLogger(__module__ + "." + __qualname__)
 
@@ -84,7 +137,7 @@ class BrozzlerHQ:
             while True:
                 self._new_site()
                 # self._consume_completed_url()
-                # self._feed_crawl_urls()
+                self._feed_crawl_urls()
                 time.sleep(0.5)
         finally:
             self._conn.close()
@@ -95,18 +148,27 @@ class BrozzlerHQ:
             new_site_dict = msg.payload
             site_id = self._db.new_site(new_site_dict)
             new_site_dict["id"] = site_id
-            self._schedule_seed(new_site_dict["seed"])
+            self._schedule_seed(site_id, new_site_dict["seed"])
             self._unclaimed_sites_q.put(new_site_dict)
             msg.ack()
             self.logger.info("new site {}".format(new_site_dict))
         except kombu.simple.Empty:
             pass
 
-    def _schedule_seed(self, seed_url):
-        pass
-        # self._db.schedule_url(
+    def _schedule_seed(self, site_id, seed_url):
+        crawl_url = CrawlUrl(seed_url, site_id=site_id, hops_from_seed=0)
+        self._db.schedule_url(crawl_url, priority=1000)
+
+    def _feed_crawl_urls(self):
+        for site in self._db.sites():
+            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site["id"]))
+            if len(q) == 0:
+                url = self._db.pop_url(site["id"])
+                if url:
+                    q.put(url)
 
 logging.info("brozzler-hq starting")
+
 db = BrozzlerHQDb(db_file=args.db_file)
 hq = BrozzlerHQ(amqp_url=args.amqp_url, db=db)
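
For review convenience, a hypothetical smoke test of the new scheduling round-trip, minus the AMQP side. This sketch is not part of the patch: it assumes CrawlUrl and BrozzlerHQDb from the patched bin/brozzler-hq are in scope (e.g. pasted into a Python session, since the script itself is not importable), that the surt package is installed, and that ./hq-test.db is a throwaway file.

    import os

    db = BrozzlerHQDb(db_file="./hq-test.db")
    site_id = db.new_site({"seed": "http://example.com/"})

    # what _schedule_seed now does: seed urls enter the queue at priority 1000
    seed = CrawlUrl("http://example.com/", site_id=site_id, hops_from_seed=0)
    db.schedule_url(seed, priority=1000)

    # pop_url hands back the highest-priority url that is not in_progress,
    # flags it in_progress, and knocks its priority down by 2000
    url = db.pop_url(site_id)
    assert url["url"] == "http://example.com/"
    assert url["hops_from_seed"] == 0

    # the only row is now flagged in_progress, so nothing is eligible
    assert db.pop_url(site_id) is None

    os.remove("./hq-test.db")

The effect visible here: pop_url both flags the row in_progress and drops its priority by 2000, so a url handed off to the queue cannot be selected again, and _feed_crawl_urls keeps each site's brozzler.sites.{id}.crawl_urls queue primed with at most one url at a time.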