diff --git a/brozzler/worker.py b/brozzler/worker.py
index 1049295..95b6cda 100644
--- a/brozzler/worker.py
+++ b/brozzler/worker.py
@@ -149,38 +149,55 @@ class BrozzlerWorker:
             self._disclaim_site(site, page)
             self._browser_pool.release(browser)
 
+    def _claim_site(self, q):
+        """Reads message from SimpleQueue q, fires off thread to brozzle the
+        site. Raises KeyError if no browsers are available, kombu.simple.Empty
+        if queue is empty."""
+        browser = self._browser_pool.acquire()
+        try:
+            msg = q.get(block=True, timeout=0.5)
+            site = brozzler.Site(**msg.payload)
+            msg.ack()
+            self.logger.info("brozzling site {}".format(site))
+            ydl = self._youtube_dl(site)
+            th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
+                    name="BrowsingThread-{}".format(site.scope_surt))
+            th.start()
+        except:
+            self._browser_pool.release(browser)
+            raise
+
     def run(self):
+        """Polls the unclaimed-sites queue until shutdown is requested,
+        claiming a site whenever the queue is non-empty."""
         latest_state = None
         while not self._shutdown_requested.is_set():
-            with kombu.Connection(self._amqp_url) as conn:
-                q = conn.SimpleQueue("brozzler.sites.unclaimed")
-                q_empty = False
-                if len(q) > 0:
-                    try:
-                        browser = self._browser_pool.acquire()
+            try:
+                # XXX too much connecting and disconnecting from rabbitmq
+                with kombu.Connection(self._amqp_url) as conn:
+                    q = conn.SimpleQueue("brozzler.sites.unclaimed")
+                    q_empty = False
+                    if len(q) > 0:
                         try:
-                            msg = q.get(block=True, timeout=0.5)
-                            site = brozzler.Site(**msg.payload)
-                            msg.ack() # XXX ack only after browsing finished? kinda complicated
-                            self.logger.info("brozzling site {}".format(site))
-                            ydl = self._youtube_dl(site)
-                            th = threading.Thread(target=lambda: self._brozzle_site(browser, ydl, site),
-                                    name="BrowsingThread-{}".format(site.scope_surt))
-                            th.start()
+                            self._claim_site(q)
                         except kombu.simple.Empty:
                             q_empty = True
-                    except KeyError:
-                        if latest_state != "browsers-busy":
-                            self.logger.info("all {} browsers are busy".format(self._max_browsers))
-                            latest_state = "browsers-busy"
-                else:
-                    q_empty = True
+                        except KeyError:
+                            if latest_state != "browsers-busy":
+                                self.logger.info("all {} browsers are busy".format(self._max_browsers))
+                                latest_state = "browsers-busy"
+                    else:
+                        q_empty = True
 
-                if q_empty:
-                    if latest_state != "no-unclaimed-sites":
-                        self.logger.info("no unclaimed sites to browse")
-                        latest_state = "no-unclaimed-sites"
-                    time.sleep(0.5)
+                    if q_empty:
+                        if latest_state != "no-unclaimed-sites":
+                            self.logger.info("no unclaimed sites to browse")
+                            latest_state = "no-unclaimed-sites"
+                        time.sleep(0.5)
+            except OSError:
+                self.logger.warning("continuing after i/o exception (from rabbitmq?)", exc_info=True)
+                # back off so an unreachable broker does not turn this into a busy loop
+                time.sleep(0.5)
 
     def start(self):
         th = threading.Thread(target=self.run, name="BrozzlerWorker")