mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 08:39:59 -05:00
ARI-3814 try to recover from rabbitmq communication problems
This commit is contained in:
parent
4e72cbae58
commit
aacb886b62
@ -143,19 +143,29 @@ class Umbra:
|
||||
self.amqp_thread.join()
|
||||
|
||||
def consume_amqp(self):
|
||||
while not self.amqp_stop.is_set():
|
||||
try:
|
||||
self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
|
||||
url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
|
||||
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
|
||||
with Connection(self.amqp_url) as conn:
|
||||
self.producer = conn.Producer(serializer='json')
|
||||
if self.producer_lock is None:
|
||||
self.producer_lock = threading.Lock()
|
||||
with self.producer_lock:
|
||||
self.producer = conn.Producer(serializer='json')
|
||||
with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer:
|
||||
import socket
|
||||
while not self.amqp_stop.is_set():
|
||||
import random
|
||||
if random.randrange(0, 30) == 0:
|
||||
raise BaseException("test exception!")
|
||||
try:
|
||||
conn.drain_events(timeout=0.5)
|
||||
except socket.timeout:
|
||||
pass
|
||||
except BaseException as e:
|
||||
self.logger.error("amqp exception {}".format(e))
|
||||
self.logger.error("attempting to reopen amqp connection")
|
||||
|
||||
def fetch_url(self, body, message):
|
||||
client_id = body['clientId']
|
||||
|
Loading…
x
Reference in New Issue
Block a user