diff --git a/umbra/umbra.py b/umbra/umbra.py index c3d4403..ce3642c 100755 --- a/umbra/umbra.py +++ b/umbra/umbra.py @@ -143,19 +143,29 @@ class Umbra: self.amqp_thread.join() def consume_amqp(self): - self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) - url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) - self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) - with Connection(self.amqp_url) as conn: - self.producer = conn.Producer(serializer='json') - self.producer_lock = threading.Lock() - with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: - import socket - while not self.amqp_stop.is_set(): - try: - conn.drain_events(timeout=0.5) - except socket.timeout: - pass + while not self.amqp_stop.is_set(): + try: + self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True) + url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange) + self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url)) + with Connection(self.amqp_url) as conn: + if self.producer_lock is None: + self.producer_lock = threading.Lock() + with self.producer_lock: + self.producer = conn.Producer(serializer='json') + with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer: + import socket + while not self.amqp_stop.is_set(): + import random + if random.randrange(0, 30) == 0: + raise BaseException("test exception!") + try: + conn.drain_events(timeout=0.5) + except socket.timeout: + pass + except BaseException as e: + self.logger.error("amqp exception {}".format(e)) + self.logger.error("attempting to reopen amqp connection") def fetch_url(self, body, message): client_id = body['clientId']