mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-04-20 23:56:34 -04:00
refactor to simplify run()
This commit is contained in:
parent
5c701abb36
commit
8366bd2d66
@ -149,38 +149,51 @@ class BrozzlerWorker:
|
||||
self._disclaim_site(site, page)
|
||||
self._browser_pool.release(browser)
|
||||
|
||||
def _claim_site(self, q):
|
||||
"""Reads message from SimpleQueue q, fires off thread to brozzle the
|
||||
site. Raises KeyError if no browsers are available, kombu.simple.Empty
|
||||
if queue is empty."""
|
||||
browser = self._browser_pool.acquire()
|
||||
try:
|
||||
msg = q.get(block=True, timeout=0.5)
|
||||
site = brozzler.Site(**msg.payload)
|
||||
msg.ack()
|
||||
self.logger.info("brozzling site {}".format(site))
|
||||
ydl = self._youtube_dl(site)
|
||||
th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
|
||||
name="BrowsingThread-{}".format(site.scope_surt))
|
||||
th.start()
|
||||
except:
|
||||
self._browser_pool.release(browser)
|
||||
raise
|
||||
|
||||
def run(self):
|
||||
latest_state = None
|
||||
while not self._shutdown_requested.is_set():
|
||||
with kombu.Connection(self._amqp_url) as conn:
|
||||
q = conn.SimpleQueue("brozzler.sites.unclaimed")
|
||||
q_empty = False
|
||||
if len(q) > 0:
|
||||
try:
|
||||
browser = self._browser_pool.acquire()
|
||||
try:
|
||||
# XXX too much connecting and disconnecting from rabbitmq
|
||||
with kombu.Connection(self._amqp_url) as conn:
|
||||
q = conn.SimpleQueue("brozzler.sites.unclaimed")
|
||||
q_empty = False
|
||||
if len(q) > 0:
|
||||
try:
|
||||
msg = q.get(block=True, timeout=0.5)
|
||||
site = brozzler.Site(**msg.payload)
|
||||
msg.ack() # XXX ack only after browsing finished? kinda complicated
|
||||
self.logger.info("brozzling site {}".format(site))
|
||||
ydl = self._youtube_dl(site)
|
||||
th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
|
||||
name="BrowsingThread-{}".format(site.scope_surt))
|
||||
th.start()
|
||||
self._claim_site(q)
|
||||
except kombu.simple.Empty:
|
||||
q_empty = True
|
||||
except KeyError:
|
||||
if latest_state != "browsers-busy":
|
||||
self.logger.info("all {} browsers are busy".format(self._max_browsers))
|
||||
latest_state = "browsers-busy"
|
||||
else:
|
||||
q_empty = True
|
||||
except KeyError:
|
||||
if latest_state != "browsers-busy":
|
||||
self.logger.info("all {} browsers are busy".format(self._max_browsers))
|
||||
latest_state = "browsers-busy"
|
||||
else:
|
||||
q_empty = True
|
||||
|
||||
if q_empty:
|
||||
if latest_state != "no-unclaimed-sites":
|
||||
self.logger.info("no unclaimed sites to browse")
|
||||
latest_state = "no-unclaimed-sites"
|
||||
time.sleep(0.5)
|
||||
if q_empty:
|
||||
if latest_state != "no-unclaimed-sites":
|
||||
self.logger.info("no unclaimed sites to browse")
|
||||
latest_state = "no-unclaimed-sites"
|
||||
time.sleep(0.5)
|
||||
except OSError as e:
|
||||
self.logger.warn("continuing after i/o exception (from rabbitmq?)", exc_info=True)
|
||||
|
||||
def start(self):
|
||||
th = threading.Thread(target=self.run, name="BrozzlerWorker")
|
||||
|
Loading…
x
Reference in New Issue
Block a user