diff --git a/brozzler/hq.py b/brozzler/hq.py index a07a940..8943160 100644 --- a/brozzler/hq.py +++ b/brozzler/hq.py @@ -124,6 +124,15 @@ class BrozzlerHQDb: cursor.execute("update brozzler_sites set status=? where id=?", (status, site.id,)) self._conn.commit() + def get_status(self, site): + cursor = self._conn.cursor() + cursor.execute("select status from brozzler_sites where id=?", (site.id,)) + row = cursor.fetchone() + if row: + return row[0] + else: + raise KeyError("site not in brozzler_sites id={}".format(site.id,)) + class BrozzlerHQ: logger = logging.getLogger(__module__ + "." + __qualname__) @@ -132,6 +141,7 @@ class BrozzlerHQ: self._conn = kombu.Connection(amqp_url) self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new") self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed") + self._disclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.disclaimed") if db != None: self._db = db else: @@ -143,10 +153,27 @@ class BrozzlerHQ: self._new_site() self._consume_completed_page() self._feed_pages() + self._disclaimed_site() time.sleep(0.5) finally: self._conn.close() + def _disclaimed_site(self): + try: + msg = self._disclaimed_sites_q.get(block=False) + site = brozzler.Site(**msg.payload) + msg.ack() + self.logger.info("received disclaimed site {}".format(site)) + + status = self._db.get_status(site) + if status != "FINISHED": + self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name)) + self._unclaimed_sites_q.put(site.to_dict()) + else: + self.logger.info("disclaimed site is FINISHED {}".format(site)) + except kombu.simple.Empty: + pass + def _new_site(self): try: msg = self._new_sites_q.get(block=False) diff --git a/brozzler/site.py b/brozzler/site.py index feea078..c38f878 100644 --- a/brozzler/site.py +++ b/brozzler/site.py @@ -3,7 +3,6 @@ import surt import json import logging -import urllib.robotparser import requests import reppy.cache diff --git a/brozzler/worker.py b/brozzler/worker.py index 6cd7e73..f6cf4a0 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -57,18 +57,17 @@ class BrozzlerWorker: def _completed_page(self, site, page): with kombu.Connection(self._amqp_url) as conn: q = conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id)) - self.logger.info("putting {} on queue {}".format(page, q.queue.name)) + self.logger.info("feeding {} to {}".format(page, q.queue.name)) q.put(page.to_dict()) def _disclaim_site(self, site, page=None): - # XXX maybe should put on "disclaimed" queue and hq should put back on "unclaimed" with kombu.Connection(self._amqp_url) as conn: - q = conn.SimpleQueue("brozzler.sites.unclaimed".format(site.id)) - self.logger.info("putting {} on queue {}".format(site, q.queue.name)) + q = conn.SimpleQueue("brozzler.sites.disclaimed".format(site.id)) + self.logger.info("feeding {} to {}".format(site, q.queue.name)) q.put(site.to_dict()) if page: q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id)) - self.logger.info("putting unfinished page {} on queue {}".format(page, q.queue.name)) + self.logger.info("feeding unfinished page {} to {}".format(page, q.queue.name)) q.put(page.to_dict()) def _putmeta(self, warcprox_address, url, content_type, payload, extra_headers=None):