Zato integrates with Microsoft Azure Service Bus, letting you receive messages from Azure queues and send messages to them in your Python services.
After adding a connection, such as the one below, your services will be able to receive and send messages without any additional configuration or programming.

Assuming you have an Azure Service Bus namespace and a queue created, you need to create a Shared Access Policy to authenticate Zato with Azure Service Bus. You can create one at the namespace level (applies to all queues) or at the queue level (applies only to that queue).
For namespace-level policy:
Give the policy a name, e.g. MyPolicyName, and note that it will later become your username in the connection string

The connection string will look like the one in the screenshot below, and you'll need to extract a few values from it:

The protocol (amqps), the host (e.g. mycompany-dev.servicebus.windows.net) and port 5671

In Zato, you create channels that will listen for messages from the bus. To send messages to queues, you create an outgoing connection. Both will rely on the connection details extracted in the previous step.
A single channel listens for messages from a single queue. An outgoing connection, on the other hand, can be used to send messages to any queue by its name.
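As a rough illustration of the extraction step, the sketch below splits a connection string into the values a connection needs. It assumes the usual Azure layout of Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=... and is not part of Zato: the function name parse_connection_string and the sample key are made up for this example, and treating the shared access key as the password is an assumption.

```python
from urllib.parse import urlparse

def parse_connection_string(conn_string):
    """Split an Azure Service Bus connection string into connection details."""
    parts = {}
    for item in conn_string.strip().split(';'):
        if not item:
            continue  # Skip the empty piece left by a trailing semicolon
        # Split on the first '=' only, because keys are base64 and may end with '='
        name, _, value = item.partition('=')
        parts[name] = value

    return {
        'protocol': 'amqps',
        'host': urlparse(parts['Endpoint']).hostname,
        'port': 5671,
        'username': parts['SharedAccessKeyName'],
        # Assumption: the shared access key serves as the password
        'password': parts['SharedAccessKey'],
    }

# Hypothetical connection string, not a real key
sample = (
    'Endpoint=sb://mycompany-dev.servicebus.windows.net/;'
    'SharedAccessKeyName=MyPolicyName;'
    'SharedAccessKey=abc123='
)
details = parse_connection_string(sample)
print(details['host'], details['username'])
```

The protocol and port are fixed values in this sketch because the text above gives them as amqps and 5671; only the host, policy name and key vary between namespaces.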

Service demo.input-logger is a useful one because it logs all the messages it receives, so you can send some test messages to your queue, and it will log everything as soon as it receives them.

Once you have configured a channel, Zato will automatically invoke your service for each message received from the Azure queue.
The message payload is available in self.request.payload.
The underlying message object is available in self.request.raw_request, and that lets you control whether you want to acknowledge or reject a given message.

When you receive a message from Azure Service Bus, you control what happens to it:
msg.ack() - Acknowledges the message. This removes it from the queue permanently. Use this when you have successfully processed the message.
msg.reject() - Rejects the message. This releases it back to the queue so it can be delivered again. Use this when processing failed and you want to receive the message later to try again.
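The choice between the two calls often comes down to whether processing raised an exception. Below is a minimal sketch of that pattern, written so it can run outside Zato. FakeMessage is a hypothetical stand-in for the object found in self.request.raw_request, and handle_message is an illustrative helper, not a Zato API.

```python
class FakeMessage:
    """Hypothetical stand-in that only records which method was called."""

    def __init__(self):
        self.outcome = None

    def ack(self):
        self.outcome = 'acked'

    def reject(self):
        self.outcome = 'rejected'

def handle_message(msg, payload, process):
    """Acknowledge msg if process(payload) succeeds, reject it otherwise."""
    try:
        process(payload)
    except Exception:
        msg.reject()  # Back to the queue for another delivery attempt
        return False
    msg.ack()  # Processed successfully, remove it from the queue
    return True

def read_flight_no(payload):
    # Raises KeyError when the field is missing
    return payload['flight_no']

ok_msg = FakeMessage()
handle_message(ok_msg, {'flight_no': 'LH123'}, read_flight_no)

bad_msg = FakeMessage()
handle_message(bad_msg, {}, read_flight_no)

print(ok_msg.outcome, bad_msg.outcome)
```

Calling ack only after the processing step has returned means a failure halfway through never removes a message that was not fully handled.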
A minimal service that receives a message and acknowledges it:

from zato.server.service import Service

class FlightStatusHandler(Service):

    def handle(self):

        # Get the message from Azure Service Bus
        msg = self.request.raw_request

        # Log what we received
        self.logger.info('Flight status update: %s', self.request.payload)

        # Acknowledge the message - removes it from the queue
        msg.ack()

In this example, an airport operations system receives flight status updates from Azure Service Bus and forwards them to a flight information display system (FIDS) via a REST API. If the update fails, the message is rejected so it can be retried.

from zato.server.service import Service

class ProcessFlightUpdate(Service):

    def handle(self):

        # The raw message object for ack/reject
        msg = self.request.raw_request

        # The parsed payload
        data = self.request.payload

        try:
            # Extract flight information
            flight_no = data['flight_no']
            status = data['status']
            gate = data.get('gate')

            # Update the flight information display system
            conn = self.out.rest['FlightDisplayAPI'].conn
            conn.post(self.cid, {
                'flight': flight_no,
                'status': status,
                'gate': gate
            })

            # Success - acknowledge the message
            msg.ack()

        except Exception as e:
            self.logger.error('Failed to process flight %s: %s', data.get('flight_no'), e)

            # Processing failed - reject so it can be retried
            msg.reject()

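One thing to keep in mind with reject-and-retry is that a message which can never be processed, for example one without flight_no, will fail the same way on every redelivery. A minimal sketch of checking the payload up front follows. The field names come from the example above, while missing_fields and REQUIRED_FIELDS are hypothetical names. What to do with an invalid message (log and acknowledge it, or route it elsewhere) is a design decision left open here.

```python
REQUIRED_FIELDS = ('flight_no', 'status')

def missing_fields(data, required=REQUIRED_FIELDS):
    """Return the required keys that are absent from the payload, in order."""
    if not isinstance(data, dict):
        # A payload that is not a dictionary has none of the fields
        return list(required)
    return [name for name in required if name not in data]

complete = missing_fields({'flight_no': 'LH123', 'status': 'delayed'})
incomplete = missing_fields({'gate': 'B12'})

print(complete)
print(incomplete)
```

A service could call such a helper before the try block and treat a non-empty result differently from a failed REST call, since only the latter is likely to succeed on a later attempt.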
This service processes baggage scan events from Azure Service Bus. Each time a bag is scanned at a checkpoint (check-in, security, loading, etc.), the event is sent to a central baggage tracking API.

from zato.server.service import Service

class BaggageEventHandler(Service):

    def handle(self):

        msg = self.request.raw_request
        data = self.request.payload

        event_type = data['event']
        bag_tag = data['bag_tag']
        location = data['location']

        self.logger.info('Baggage %s: %s at %s', event_type, bag_tag, location)

        # Notify the baggage tracking system
        conn = self.out.rest['BaggageTrackingAPI'].conn
        conn.post(self.cid, {
            'tag': bag_tag,
            'event': event_type,
            'location': location,
            'timestamp': data['timestamp']
        })

        # Done processing
        msg.ack()

This service handles gate assignment requests. It checks if the requested gate is available before assigning it. If the gate is occupied, the message is rejected and will be retried later when the gate might be free.

from zato.server.service import Service

class GateAssignmentHandler(Service):

    def handle(self):

        msg = self.request.raw_request
        data = self.request.payload

        flight_no = data['flight_no']
        gate = data['gate']

        # Check if the gate is available
        conn = self.out.rest['GateManagementAPI'].conn
        response = conn.get(self.cid, params={'gate': gate})

        if response.data['available']:

            # Gate is free - assign it
            conn.post(self.cid, {
                'flight': flight_no,
                'gate': gate,
                'action': 'assign'
            })
            msg.ack()

        else:
            # Gate is occupied - reject and retry later
            self.logger.warning('Gate %s not available for flight %s', gate, flight_no)
            msg.reject()

Use the outgoing AMQP connection to send messages to Azure Service Bus queues.
A minimal service that sends a message to an Azure queue:

from zato.server.service import Service

class SendFlightAlert(Service):

    def handle(self):
        msg = {'flight_no': 'LH123', 'status': 'delayed', 'delay_minutes': 30}
        self.out.amqp.send(msg, 'Azure Queue Outgoing', 'flight-alerts')

This service publishes boarding notifications to Azure Service Bus when a flight is ready for boarding. Other systems (mobile apps, display boards, airline staff devices) subscribe to this queue.

from zato.server.service import Service

class PublishBoardingNotification(Service):

    def handle(self):

        data = self.request.payload

        notification = {
            'flight_no': data['flight_no'],
            'gate': data['gate'],
            'boarding_time': data['boarding_time'],
            'status': 'now_boarding',
            'zones': data.get('zones', ['all'])
        }

        self.out.amqp.send(notification, 'Azure Queue Outgoing', 'boarding-notifications')

        self.logger.info('Published boarding notification for %s at gate %s',
            data['flight_no'], data['gate'])

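Keeping the message-building step in a plain function makes it easy to check outside Zato. The sketch below mirrors the dictionary built in the service above. The function name build_boarding_notification and the sample values are hypothetical and used only for illustration.

```python
def build_boarding_notification(data):
    """Build the message body from the incoming request data."""
    return {
        'flight_no': data['flight_no'],
        'gate': data['gate'],
        'boarding_time': data['boarding_time'],
        'status': 'now_boarding',
        # Fall back to boarding all zones when the caller does not list any
        'zones': data.get('zones', ['all']),
    }

# Sample input without a 'zones' key
request = {'flight_no': 'LH123', 'gate': 'B12', 'boarding_time': '14:30'}
notification = build_boarding_notification(request)

# Sample input that names specific zones
zoned = build_boarding_notification(dict(request, zones=['1', '2']))

print(notification['zones'], zoned['zones'])
```

With this split, the handle method would only need to call the function and pass the result to self.out.amqp.send, as in the service above.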
This service sends alerts when baggage requires attention - for example, when a bag is loaded onto the wrong aircraft or when a passenger's bag needs to be offloaded due to a no-show.

from zato.server.service import Service

class SendBaggageAlert(Service):

    def handle(self):

        data = self.request.payload

        alert = {
            'alert_type': data['alert_type'],
            'bag_tag': data['bag_tag'],
            'flight_no': data['flight_no'],
            'location': data['location'],
            'action_required': data['action_required'],
            'priority': data.get('priority', 'normal')
        }

        self.out.amqp.send(alert, 'Azure Queue Outgoing', 'baggage-alerts')

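Because an outgoing connection can send to any queue by its name, the three sending examples differ only in the last argument. One possible way to keep those queue names in a single place is sketched below. The queue names are the ones used above, whereas the QUEUES mapping, its keys and the queue_for helper are hypothetical.

```python
# Hypothetical message kinds mapped to the queue names used in the examples
QUEUES = {
    'flight_alert': 'flight-alerts',
    'boarding': 'boarding-notifications',
    'baggage_alert': 'baggage-alerts',
}

def queue_for(kind):
    """Return the queue name for a message kind, failing loudly on typos."""
    try:
        return QUEUES[kind]
    except KeyError:
        raise ValueError(f'No queue configured for message kind: {kind!r}') from None

print(queue_for('baggage_alert'))
```

A service could then pass queue_for('baggage_alert') as the queue name instead of repeating the string literal in several places.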