Skip to content

Commit

Permalink
issue-8 - Initial working version of Artemis broker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
fgiorgetti committed Dec 4, 2018
1 parent 42eec54 commit ab51f82
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions messaging_components/brokers/artemis/artemis.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def create_address(self, address: Address):
:return:
"""
client = self._get_client()
client.create_address(address.name, address.routing_type.name)
routing_type = self._get_routing_type(address.routing_type)
return client.create_address(address.name, routing_type)

def create_queue(self, queue: Queue, address: Address, durable: bool = True):
"""
Expand All @@ -69,7 +70,9 @@ def create_queue(self, queue: Queue, address: Address, durable: bool = True):
:return:
"""
client = self._get_client()
client.create_queue(address.name, queue.name, durable, queue.routing_type.name)
if queue.routing_type == RoutingType.BOTH:
raise ValueError('Queues can only use ANYCAST or MULTICAST routing type')
return client.create_queue(address.name, queue.name, durable, queue.routing_type.name)

def delete_address(self, name: str, force: bool = False):
"""
Expand All @@ -79,7 +82,7 @@ def delete_address(self, name: str, force: bool = False):
:return:
"""
client = self._get_client()
client.delete_address(name, force)
return client.delete_address(name, force)

def delete_queue(self, name: str, remove_consumers: bool = False):
"""
Expand All @@ -89,7 +92,7 @@ def delete_queue(self, name: str, remove_consumers: bool = False):
:return:
"""
client = self._get_client()
client.delete_queue(name, remove_consumers)
return client.delete_queue(name, remove_consumers)

def _refresh_addresses_and_queues(self):
"""
Expand Down Expand Up @@ -125,7 +128,7 @@ def _refresh_addresses_and_queues(self):
else:
# Parsing returned addresses
for addr_info in addresses_result.data:
logging.debug("Address found: %s" % addr_info['name'])
logging.debug("Address found: %s - routingType: %s" % (addr_info['name'], addr_info['routingTypes']))
address = Address(name=addr_info['name'],
routing_type=RoutingType.from_value(addr_info['routingTypes']))
addresses_dict[address.name] = address
Expand All @@ -137,7 +140,7 @@ def _refresh_addresses_and_queues(self):
else:
# Parsing returned queues
for queue_info in queues_result.data:
logging.debug("Queue found: %s" % queue_info['name'])
logging.debug("Queue found: %s - routingType: %s" % (queue_info['name'], queue_info['routingType']))
routing_type = RoutingType.from_value(queue_info['routingType'])
address = addresses_dict[queue_info['address']]
queue = Queue(name=queue_info['name'],
Expand All @@ -160,3 +163,14 @@ def _get_client(self):
client = ArtemisJolokiaClient(self.broker_name, self.node.get_ip(), self.web_port,
self.user, self.password)
return client

def _get_routing_type(self, routing_type: RoutingType) -> str:
"""
Returns the routing type str value, based on expected values on the broker.
:param routing_type:
:return:
"""
if routing_type == RoutingType.BOTH:
return 'ANYCAST, MULTICAST'
return routing_type.name

0 comments on commit ab51f82

Please sign in to comment.