tracing tweaks

This commit is contained in:
Steve Waterworth
2018-01-31 17:27:17 +00:00
parent e778bd7f5b
commit 3f5034a102
6 changed files with 98 additions and 23 deletions

View File

@@ -24,22 +24,23 @@ class Publisher:
self._channel.exchange_declare(exchange=self.EXCHANGE, exchange_type=self.TYPE, durable=True)
self._logger.info('connected to broker')
def _publish(self, msg):
def _publish(self, msg, headers):
self._channel.basic_publish(exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY,
properties=pika.BasicProperties(headers=headers),
body=json.dumps(msg).encode())
self._logger.info('message sent')
#Publish msg, reconnecting if necessary.
def publish(self, msg):
def publish(self, msg, headers):
if self._channel is None:
self._connect()
try:
self._publish(msg)
self._publish(msg, headers)
except pika.exceptions.ConnectionClosed:
self._logger.info('reconnecting to queue')
self._connect()
self._publish(msg)
self._publish(msg, headers)
def close(self):
if self._conn and self._conn.is_open: