mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-04-19 23:35:54 -04:00
kill -HUP to immediately close and reopen amqp consumer connection
This commit is contained in:
parent
02c054c284
commit
6306c16698
@ -78,7 +78,6 @@ if __name__ == "__main__":
|
||||
|
||||
logging.warn("dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)))
|
||||
|
||||
signal.signal(signal.SIGQUIT, dump_state)
|
||||
|
||||
class ShutdownRequested(Exception):
|
||||
pass
|
||||
@ -88,6 +87,8 @@ if __name__ == "__main__":
|
||||
def sigint(signum, frame):
|
||||
raise ShutdownRequested('shutdown requested (caught SIGINT)')
|
||||
|
||||
signal.signal(signal.SIGQUIT, dump_state)
|
||||
signal.signal(signal.SIGHUP, controller.reconnect)
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
signal.signal(signal.SIGINT, sigint)
|
||||
|
||||
|
@ -60,6 +60,8 @@ class AmqpBrowserController:
|
||||
self._exchange = kombu.Exchange(name=self.exchange_name, type='direct',
|
||||
durable=True)
|
||||
|
||||
self._reconnect_requested = False
|
||||
|
||||
self._producer = None
|
||||
self._producer_lock = threading.Lock()
|
||||
with self._producer_lock:
|
||||
@ -80,12 +82,16 @@ class AmqpBrowserController:
|
||||
self._browser_pool.shutdown_now()
|
||||
self._consumer_thread.join()
|
||||
|
||||
def reconnect(self, *args, **kwargs):
|
||||
self._reconnect_requested = True
|
||||
self._browser_pool.shutdown_now()
|
||||
|
||||
def _wait_for_and_browse_urls(self, conn, consumer, timeout):
|
||||
start = time.time()
|
||||
browser = None
|
||||
consumer.qos(prefetch_count=1)
|
||||
|
||||
while not self._consumer_stop.is_set() and time.time() - start < timeout:
|
||||
while not self._consumer_stop.is_set() and time.time() - start < timeout and not self._reconnect_requested:
|
||||
try:
|
||||
browser = self._browser_pool.acquire() # raises KeyError if none available
|
||||
browser.start()
|
||||
@ -101,7 +107,7 @@ class AmqpBrowserController:
|
||||
except socket.timeout:
|
||||
pass
|
||||
|
||||
if self._consumer_stop.is_set() or time.time() - start >= timeout:
|
||||
if self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested:
|
||||
browser.stop()
|
||||
self._browser_pool.release(browser)
|
||||
break
|
||||
@ -138,6 +144,7 @@ class AmqpBrowserController:
|
||||
while not self._consumer_stop.is_set():
|
||||
try:
|
||||
self.logger.info("connecting to amqp exchange={} at {}".format(self._exchange.name, self.amqp_url))
|
||||
self._reconnect_requested = False
|
||||
with kombu.Connection(self.amqp_url) as conn:
|
||||
with conn.Consumer(url_queue) as consumer:
|
||||
self._wait_for_and_browse_urls(conn, consumer, timeout=RECONNECT_AFTER_SECONDS)
|
||||
|
Loading…
x
Reference in New Issue
Block a user