new queue for disclaimed sites so that sites that are finished don't get picked up again off of the unclaimed queue

This commit is contained in:
Noah Levitt 2015-07-23 01:21:23 +00:00
parent 6e6fd5dc2c
commit 4dacc0b087
3 changed files with 31 additions and 6 deletions

View File

@ -124,6 +124,15 @@ class BrozzlerHQDb:
cursor.execute("update brozzler_sites set status=? where id=?", (status, site.id,))
self._conn.commit()
def get_status(self, site):
    """Return the status recorded for ``site`` in ``brozzler_sites``.

    The row is looked up by ``site.id``.

    Raises:
        KeyError: if ``brozzler_sites`` has no row with that id.
    """
    cur = self._conn.cursor()
    cur.execute("select status from brozzler_sites where id=?", (site.id,))
    found = cur.fetchone()
    # Guard clause: no row means this site id is not in the table.
    if not found:
        raise KeyError("site not in brozzler_sites id={}".format(site.id))
    return found[0]
class BrozzlerHQ:
logger = logging.getLogger(__module__ + "." + __qualname__)
@ -132,6 +141,7 @@ class BrozzlerHQ:
self._conn = kombu.Connection(amqp_url)
self._new_sites_q = self._conn.SimpleQueue("brozzler.sites.new")
self._unclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.unclaimed")
self._disclaimed_sites_q = self._conn.SimpleQueue("brozzler.sites.disclaimed")
if db != None:
self._db = db
else:
@ -143,10 +153,27 @@ class BrozzlerHQ:
self._new_site()
self._consume_completed_page()
self._feed_pages()
self._disclaimed_site()
time.sleep(0.5)
finally:
self._conn.close()
def _disclaimed_site(self):
    """Process at most one message from the disclaimed-sites queue.

    Takes one message off ``self._disclaimed_sites_q`` without blocking
    and returns immediately if the queue is empty. The site's status is
    read from the database: a site that is not ``"FINISHED"`` is put on
    ``self._unclaimed_sites_q``; a ``"FINISHED"`` site is only logged.
    """
    # Keep the try body minimal: only the non-blocking get signals an
    # empty queue.
    try:
        msg = self._disclaimed_sites_q.get(block=False)
    except kombu.simple.Empty:
        return

    site = brozzler.Site(**msg.payload)
    msg.ack()
    self.logger.info("received disclaimed site {}".format(site))

    # get_status raises KeyError for a site that is not in the database.
    # The message is already acked at this point, so log and drop it
    # instead of letting the exception escape into the caller's loop.
    try:
        status = self._db.get_status(site)
    except KeyError:
        self.logger.warning("disclaimed site is not in the database, dropping it {}".format(site))
        return

    if status == "FINISHED":
        self.logger.info("disclaimed site is FINISHED {}".format(site))
        return

    self.logger.info("feeding disclaimed site {} back to {}".format(site, self._unclaimed_sites_q.queue.name))
    self._unclaimed_sites_q.put(site.to_dict())
def _new_site(self):
try:
msg = self._new_sites_q.get(block=False)

View File

@ -3,7 +3,6 @@
import surt
import json
import logging
import urllib.robotparser
import requests
import reppy.cache

View File

@ -57,18 +57,17 @@ class BrozzlerWorker:
def _completed_page(self, site, page):
    """Put a completed page on the site's completed-pages queue.

    Opens a connection to ``self._amqp_url`` for the duration of the call
    and puts ``page.to_dict()`` on the queue named
    ``brozzler.sites.<site.id>.completed_pages``.
    """
    with kombu.Connection(self._amqp_url) as conn:
        q = conn.SimpleQueue("brozzler.sites.{}.completed_pages".format(site.id))
        # Log the hand-off once, using the "feeding X to Y" wording.
        self.logger.info("feeding {} to {}".format(page, q.queue.name))
        q.put(page.to_dict())
def _disclaim_site(self, site, page=None):
    """Give up a site by putting it on the ``brozzler.sites.disclaimed`` queue.

    Opens a connection to ``self._amqp_url`` for the duration of the call
    and puts ``site.to_dict()`` on the disclaimed queue. If ``page`` is
    given (truthy), ``page.to_dict()`` is also put on the site's own
    ``brozzler.sites.<site.id>.pages`` queue.
    """
    with kombu.Connection(self._amqp_url) as conn:
        # The queue name is a fixed literal; it has no placeholder to format.
        q = conn.SimpleQueue("brozzler.sites.disclaimed")
        self.logger.info("feeding {} to {}".format(site, q.queue.name))
        q.put(site.to_dict())
        if page:
            q = conn.SimpleQueue("brozzler.sites.{}.pages".format(site.id))
            self.logger.info("feeding unfinished page {} to {}".format(page, q.queue.name))
            q.put(page.to_dict())
def _putmeta(self, warcprox_address, url, content_type, payload, extra_headers=None):