diff --git a/bin/drain-queue b/bin/drain-queue new file mode 100755 index 0000000..ad9fd24 --- /dev/null +++ b/bin/drain-queue @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# vim: set sw=4 et: +import os +import sys +import argparse +import logging +import socket +from kombu import Connection, Exchange, Queue + +arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), + description='drain-queue - consume messages from AMQP queue', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the AMQP server to talk to') +arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') +arg_parser.add_argument('--queue', dest='amqp_queue', default='urls', + help='AMQP queue name') +arg_parser.add_argument('-n', '--no-ack', dest='no_ack', action="store_const", + default=False, const=True, help="leave messages on the queue (default: remove them from the queue)") +arg_parser.add_argument('-r', '--run-forever', dest='run_forever', action="store_const", + default=False, const=True, help="run forever, waiting for new messages to appear on the queue (default: exit when all messages in the queue have been consumed)") +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) +args = arg_parser.parse_args(args=sys.argv[1:]) + +logging.basicConfig(stream=sys.stderr, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +def print_and_maybe_ack(body, message): + print(body) + if not args.no_ack: + message.ack() + +exchange = Exchange(args.amqp_exchange, 'direct', durable=True) +queue = Queue(args.amqp_queue, exchange=exchange) +try: + with Connection(args.amqp_url) as conn: + with conn.Consumer(queue, callbacks=[print_and_maybe_ack]) as consumer: + while True: + try: + conn.drain_events(timeout=0.5) + except socket.timeout: + if not args.run_forever: + logging.debug("exiting, no messages left on the queue") + break +except KeyboardInterrupt: + logging.debug("exiting, stopped by user") diff --git a/bin/dump_queue.py b/bin/dump_queue.py deleted file mode 100755 index 9e31619..0000000 --- a/bin/dump_queue.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python -from json import dumps, loads -import os,sys,argparse, urllib.request, urllib.error, urllib.parse -import websocket -import time -import uuid -import logging -import threading -from kombu import Connection, Exchange, Queue -logging.basicConfig(level=logging.INFO) - -umbra_exchange = Exchange('umbra', 'direct', durable=True) -requests_queue = Queue('requests', exchange=umbra_exchange) -def print_and_ack(body, message): - print(body['url']) - message.ack() - -with Connection(sys.argv[1] if len(sys.argv) > 1 else "amqp://guest:guest@localhost:5672//") as conn: - with conn.Consumer(requests_queue, callbacks=[print_and_ack]) as consumer: - while True: - conn.drain_events() diff --git a/bin/load_url.py b/bin/queue-url similarity index 52% rename from bin/load_url.py rename to bin/queue-url index ac87cef..2e4d01c 100755 --- a/bin/load_url.py +++ b/bin/queue-url @@ -9,22 +9,30 @@ import logging import threading from kombu import Connection, Exchange, Queue -logging.basicConfig(stream=sys.stdout, level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') - arg_parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]), - description='load_url.py - send url to umbra via amqp', + description='queue-url - send url to umbra via amqp', formatter_class=argparse.ArgumentDefaultsHelpFormatter) -arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672//', - help='URL identifying the amqp server to talk to') +arg_parser.add_argument('-u', '--url', dest='amqp_url', default='amqp://guest:guest@localhost:5672/%2f', + help='URL identifying the AMQP server to talk to') +arg_parser.add_argument('--exchange', dest='amqp_exchange', default='umbra', + help='AMQP exchange name') +arg_parser.add_argument('--routing-key', dest='amqp_routing_key', default='url', + help='AMQP routing key') arg_parser.add_argument('-i', '--client-id', dest='client_id', default='load_url.0', help='client id - included in the json payload with each url; umbra uses this value as the routing key to send requests back to') +arg_parser.add_argument('-v', '--verbose', dest='log_level', + action="store_const", default=logging.INFO, const=logging.DEBUG) arg_parser.add_argument('urls', metavar='URL', nargs='+', help='URLs to send to umbra') args = arg_parser.parse_args(args=sys.argv[1:]) -umbra_exchange = Exchange('umbra', 'direct', durable=True) +logging.basicConfig(stream=sys.stdout, level=args.log_level, + format='%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s') + +exchange = Exchange(args.amqp_exchange, 'direct', durable=True) with Connection(args.amqp_url) as conn: producer = conn.Producer(serializer='json') for url in args.urls: - producer.publish({'url': url, 'metadata': {}, 'clientId': args.client_id}, 'url', exchange=umbra_exchange) - + payload = {'url': url, 'metadata': {}, 'clientId': args.client_id} + logging.info("sending to amqp url={} exchange={} routing_key={} -- {}".format(args.amqp_url, args.amqp_exchange, args.amqp_routing_key, payload)) + producer.publish(payload, routing_key=args.amqp_routing_key, exchange=exchange) + diff --git a/setup.py b/setup.py index f0fd904..ae5c4ab 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,7 @@ # vim: set sw=4 et: import setuptools +import glob setuptools.setup(name='umbra', version='0.1', @@ -13,7 +14,7 @@ setuptools.setup(name='umbra', packages=['umbra'], package_data={'umbra':['behaviors.d/*.js']}, install_requires=['kombu', 'websocket-client-py3==0.13.1','argparse'], - scripts=['bin/umbra', 'bin/load_url.py', 'bin/dump_queue.py'], + scripts=glob.glob("bin/*"), zip_safe=False, classifiers=[ 'Development Status :: 3 - Alpha Development Status',