2018-01-26 16:01:39 +00:00
|
|
|
import json
|
|
|
|
import pika
|
2018-04-14 22:48:30 -05:00
|
|
|
import os
|
2018-01-26 16:01:39 +00:00
|
|
|
|
|
|
|
class Publisher:
|
2018-04-16 15:19:38 +01:00
|
|
|
HOST = os.getenv('AMQP_HOST', 'rabbitmq')
|
2018-01-26 16:01:39 +00:00
|
|
|
VIRTUAL_HOST = '/'
|
|
|
|
EXCHANGE='robot-shop'
|
|
|
|
TYPE='direct'
|
|
|
|
ROUTING_KEY = 'orders'
|
|
|
|
|
|
|
|
def __init__(self, logger):
|
|
|
|
self._logger = logger
|
|
|
|
self._params = pika.connection.ConnectionParameters(
|
|
|
|
host=self.HOST,
|
|
|
|
virtual_host=self.VIRTUAL_HOST,
|
|
|
|
credentials=pika.credentials.PlainCredentials('guest', 'guest'))
|
|
|
|
self._conn = None
|
|
|
|
self._channel = None
|
|
|
|
|
|
|
|
def _connect(self):
|
2019-10-10 13:32:20 +02:00
|
|
|
if not self._conn or self._conn.is_closed or self._channel is None or self._channel.is_closed:
|
2018-01-26 16:01:39 +00:00
|
|
|
self._conn = pika.BlockingConnection(self._params)
|
|
|
|
self._channel = self._conn.channel()
|
2018-01-29 17:40:12 +00:00
|
|
|
self._channel.exchange_declare(exchange=self.EXCHANGE, exchange_type=self.TYPE, durable=True)
|
2018-01-26 16:01:39 +00:00
|
|
|
self._logger.info('connected to broker')
|
|
|
|
|
2018-01-31 17:27:17 +00:00
|
|
|
def _publish(self, msg, headers):
|
2018-01-26 16:01:39 +00:00
|
|
|
self._channel.basic_publish(exchange=self.EXCHANGE,
|
|
|
|
routing_key=self.ROUTING_KEY,
|
2018-01-31 17:27:17 +00:00
|
|
|
properties=pika.BasicProperties(headers=headers),
|
2018-01-26 16:01:39 +00:00
|
|
|
body=json.dumps(msg).encode())
|
|
|
|
self._logger.info('message sent')
|
|
|
|
|
|
|
|
#Publish msg, reconnecting if necessary.
|
2018-01-31 17:27:17 +00:00
|
|
|
def publish(self, msg, headers):
|
2019-10-10 13:32:20 +02:00
|
|
|
if self._channel is None or self._channel.is_closed or self._conn is None or self._conn.is_closed:
|
2018-01-26 16:01:39 +00:00
|
|
|
self._connect()
|
|
|
|
try:
|
2018-01-31 17:27:17 +00:00
|
|
|
self._publish(msg, headers)
|
2019-04-24 14:38:31 +01:00
|
|
|
except (pika.exceptions.ConnectionClosed, pika.exceptions.StreamLostError):
|
2018-01-26 16:01:39 +00:00
|
|
|
self._logger.info('reconnecting to queue')
|
|
|
|
self._connect()
|
2018-01-31 17:27:17 +00:00
|
|
|
self._publish(msg, headers)
|
2018-01-26 16:01:39 +00:00
|
|
|
|
|
|
|
def close(self):
|
|
|
|
if self._conn and self._conn.is_open:
|
|
|
|
self._logger.info('closing queue connection')
|
|
|
|
self._conn.close()
|
|
|
|
|