Files
robot-shop/payment/rabbitmq.py

66 lines
2.3 KiB
Python
Raw Normal View History

2018-01-26 16:01:39 +00:00
import json
import pika
import os
2018-01-26 16:01:39 +00:00
2019-08-24 06:10:08 +02:00
from cfenv import AppEnv
2018-01-26 16:01:39 +00:00
class Publisher:
    """Publish JSON messages to the ``robot-shop`` RabbitMQ exchange.

    The broker URI comes from the Cloud Foundry service binding
    ``dispatch_queue`` when ``VCAP_SERVICES`` is present in the environment;
    otherwise it is assembled from the ``AMQP_HOST``, ``AMQP_PORT``,
    ``AMQP_USER`` and ``AMQP_PWD`` environment variables.

    No connection is opened by the constructor; it is established lazily on
    the first call to :meth:`publish`.
    """

    EXCHANGE = 'robot-shop'
    TYPE = 'direct'
    ROUTING_KEY = 'orders'

    def __init__(self, logger):
        """Resolve the broker URI and prepare connection parameters.

        Args:
            logger: Object exposing an ``info(msg)`` method.

        Raises:
            RuntimeError: Running on Cloud Foundry but the ``dispatch_queue``
                service binding is not present.
        """
        self._logger = logger

        if 'VCAP_SERVICES' in os.environ:
            self._logger.info('Cloud Foundry detected')

            binding_name = 'dispatch_queue'
            amqp_service = AppEnv().get_service(binding_name=binding_name)
            # Only claim the binding was found after actually checking it,
            # and fail with a message that names the missing binding.
            if amqp_service is None:
                raise RuntimeError(
                    "Service binding '{binding_name}' not found".format(
                        binding_name=binding_name))
            self._logger.info(
                "Service binding '{binding_name}' found".format(
                    binding_name=binding_name))

            self._uri = amqp_service.credentials.get('uri')
        else:
            self._uri = self._default_uri()

        self._params = pika.URLParameters(self._uri)
        self._conn = None
        self._channel = None

    @staticmethod
    def _default_uri():
        """Build the AMQP URI from ``AMQP_*`` environment variables."""
        return 'amqp://{user}:{pwd}@{host}:{port}/{vhost}'.format(
            host=os.getenv('AMQP_HOST', 'rabbitmq'),
            port=os.getenv('AMQP_PORT', '5672'),
            # The default "/" virtual host must be URI-encoded in the path.
            vhost='%2F',
            user=os.getenv('AMQP_USER', 'guest'),
            pwd=os.getenv('AMQP_PWD', 'guest'))

    def _connect(self):
        """Open a connection and channel unless both are already usable."""
        conn_alive = self._conn is not None and not self._conn.is_closed
        channel_alive = (self._channel is not None
                         and not self._channel.is_closed)
        if conn_alive and channel_alive:
            return

        # The channel may have died while the connection is still open;
        # close the old connection so it is not leaked when replaced below.
        if self._conn is not None and self._conn.is_open:
            try:
                self._conn.close()
            except pika.exceptions.AMQPError:
                self._logger.info('failed to close stale broker connection')

        self._conn = pika.BlockingConnection(self._params)
        self._channel = self._conn.channel()
        self._channel.exchange_declare(exchange=self.EXCHANGE,
                                       exchange_type=self.TYPE,
                                       durable=True)
        self._logger.info('connected to broker')

    def _publish(self, msg, headers):
        """Send ``msg`` as a JSON body on the current channel.

        Args:
            msg: JSON-serialisable payload.
            headers: Value passed as the AMQP message ``headers`` property.
        """
        self._channel.basic_publish(
            exchange=self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            properties=pika.BasicProperties(headers=headers),
            body=json.dumps(msg).encode())
        self._logger.info('message sent')

    def publish(self, msg, headers):
        """Publish ``msg``, reconnecting if necessary.

        If the broker connection turns out to be closed while sending, one
        reconnect and one retry are attempted; an error on the retry
        propagates to the caller.

        Args:
            msg: JSON-serialisable payload.
            headers: Value passed as the AMQP message ``headers`` property.
        """
        # _connect() is a no-op when the connection and channel are usable.
        self._connect()
        try:
            self._publish(msg, headers)
        except pika.exceptions.ConnectionClosed:
            self._logger.info('reconnecting to queue')
            self._connect()
            self._publish(msg, headers)

    def close(self):
        """Close the broker connection if one is currently open."""
        if self._conn and self._conn.is_open:
            self._logger.info('closing queue connection')
            self._conn.close()