mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-07-26 08:15:43 -04:00
Merge pull request #19 from nlevitt/ari-3814
ARI-3814 try to recover from rabbitmq communication problems
This commit is contained in:
commit
e7353fbb4b
1 changed files with 20 additions and 13 deletions
|
@ -143,12 +143,16 @@ class Umbra:
|
||||||
self.amqp_thread.join()
|
self.amqp_thread.join()
|
||||||
|
|
||||||
def consume_amqp(self):
|
def consume_amqp(self):
|
||||||
|
while not self.amqp_stop.is_set():
|
||||||
|
try:
|
||||||
self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
|
self.umbra_exchange = Exchange(name='umbra', type='direct', durable=True)
|
||||||
url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
|
url_queue = Queue('urls', routing_key='url', exchange=self.umbra_exchange)
|
||||||
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
|
self.logger.info("connecting to amqp exchange={} at {}".format(self.umbra_exchange.name, self.amqp_url))
|
||||||
with Connection(self.amqp_url) as conn:
|
with Connection(self.amqp_url) as conn:
|
||||||
self.producer = conn.Producer(serializer='json')
|
if self.producer_lock is None:
|
||||||
self.producer_lock = threading.Lock()
|
self.producer_lock = threading.Lock()
|
||||||
|
with self.producer_lock:
|
||||||
|
self.producer = conn.Producer(serializer='json')
|
||||||
with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer:
|
with conn.Consumer(url_queue, callbacks=[self.fetch_url]) as consumer:
|
||||||
import socket
|
import socket
|
||||||
while not self.amqp_stop.is_set():
|
while not self.amqp_stop.is_set():
|
||||||
|
@ -156,6 +160,9 @@ class Umbra:
|
||||||
conn.drain_events(timeout=0.5)
|
conn.drain_events(timeout=0.5)
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
pass
|
pass
|
||||||
|
except BaseException as e:
|
||||||
|
self.logger.error("amqp exception {}".format(e))
|
||||||
|
self.logger.error("attempting to reopen amqp connection")
|
||||||
|
|
||||||
def fetch_url(self, body, message):
|
def fetch_url(self, body, message):
|
||||||
client_id = body['clientId']
|
client_id = body['clientId']
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue