From 6306c1669863fe72b26d85408efca908bf8b082f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 23 Jun 2014 17:18:27 -0700 Subject: [PATCH] kill -HUP to immediately close and reopen amqp consumer connection --- bin/umbra | 3 ++- umbra/controller.py | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/bin/umbra b/bin/umbra index 8817e74..63da89d 100755 --- a/bin/umbra +++ b/bin/umbra @@ -78,7 +78,6 @@ if __name__ == "__main__": logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs))) - signal.signal(signal.SIGQUIT, dump_state) class ShutdownRequested(Exception): pass @@ -88,6 +87,8 @@ if __name__ == "__main__": def sigint(signum, frame): raise ShutdownRequested('shutdown requested (caught SIGINT)') + signal.signal(signal.SIGQUIT, dump_state) + signal.signal(signal.SIGHUP, controller.reconnect) signal.signal(signal.SIGTERM, sigterm) signal.signal(signal.SIGINT, sigint) diff --git a/umbra/controller.py b/umbra/controller.py index 644e357..713d39b 100644 --- a/umbra/controller.py +++ b/umbra/controller.py @@ -60,6 +60,8 @@ class AmqpBrowserController: self._exchange = kombu.Exchange(name=self.exchange_name, type='direct', durable=True) + self._reconnect_requested = False + self._producer = None self._producer_lock = threading.Lock() with self._producer_lock: @@ -80,12 +82,16 @@ class AmqpBrowserController: self._browser_pool.shutdown_now() self._consumer_thread.join() + def reconnect(self, *args, **kwargs): + self._reconnect_requested = True + self._browser_pool.shutdown_now() + def _wait_for_and_browse_urls(self, conn, consumer, timeout): start = time.time() browser = None consumer.qos(prefetch_count=1) - while not self._consumer_stop.is_set() and time.time() - start < timeout: + while not self._consumer_stop.is_set() and time.time() - start < timeout and not self._reconnect_requested: try: browser = self._browser_pool.acquire() # raises KeyError if none available browser.start() @@ -101,7 +107,7 @@ class AmqpBrowserController: except socket.timeout: pass - if self._consumer_stop.is_set() or time.time() - start >= timeout: + if self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested: browser.stop() self._browser_pool.release(browser) break @@ -138,6 +144,7 @@ class AmqpBrowserController: while not self._consumer_stop.is_set(): try: self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url)) + self._reconnect_requested = False with kombu.Connection(self.amqp_url) as conn: with conn.Consumer(url_queue) as consumer: self._wait_for_and_browse_urls(conn, consumer, timeout=RECONNECT_AFTER_SECONDS)