Schedule a demo

Azure Service Bus

Zato integrates with Microsoft Azure Service Bus, letting you receive and send messages from Azure queues in your Python services.

After adding a connection, such as the one below, your services will be able to receive and send messages without any additional configuration or programming.

Getting credentials from Azure

Assuming you have an Azure Service Bus namespace and a queue created, you need to create a Shared Access Policy to authenticate Zato with Azure Service Bus. You can create one at the namespace level (applies to all queues) or at the queue level (applies only to that queue).

For namespace-level policy:

  1. Open your Service Bus namespace
  2. In the left menu, click Shared access policies
  3. Click + Add
  4. Enter a policy name, e.g. MyPolicyName, and note that it will later become your username in the connection string
  5. Select the permissions:
  6. Listen - to receive messages
  7. Send - to send messages
  8. Click Create

Getting the connection details

  1. Click on your newly created policy and select Show AMQP connection string
  2. Copy the primary connection string, the secondary one is not used

Extracting information from the AMQP connection string

The string will look like this in the screenshot below, and you'll need to extract a couple of things from it:

  1. Protocol - must be amqps
  2. Username - your policy name
  3. Password - password for that policy
  4. Address - your namespace hostname (e.g. mycompany-dev.servicebus.windows.net) and port 5671
  5. "?verify=verify_none" - ignore this, it's not used by Zato

Configuring Zato

In Zato, you create channels that will listen for messages from the bus. To send messages to queues, you create an outgoing connection. Both will rely on the connection details extracted in the previous step.

A single channel listens for messages from a single queue. An outgoing connection, on the other hand, can be used to send messages to any queue by its name.

Channels to receive messages

  1. Go to ConnectionsChannelsAMQP
  2. Click Create a new channel
  3. Fill in the details as in the screenshot below, and click OK

Service demo.input-logger is a useful one because it logs all the messages it receives, so you can send some test messages to your queue, and it will log everything as soon as it receives them.

Outgoing connections to send messages

  1. Go to ConnectionsAMQPOutgoing
  2. Click Create a new outgoing connection
  3. Fill in the details as in the screenshot below, and click OK

Next steps

Once you have configured a channel, Zato will automatically invoke your service for each message received from the Azure queue.

  • The message body (e.g. JSON or other business data) is available in self.request.payload.
  • You access the underlying AMQP message via self.request.raw_request, and that lets you control whether you want to acknowledge or reject a given message.

Acknowledging and rejecting messages

When you receive a message from Azure Service Bus, you control what happens to it:

  • msg.ack() - Acknowledges the message. This removes it from the queue permanently. Use this when you have successfully processed the message.

  • msg.reject() - Rejects the message. This releases it back to the queue so it can be delivered again. Use this when processing failed and you want to receive the message later to try again.

Basic example

A minimal service that receives a message and acknowledges it:

from zato.server.service import Service

class FlightStatusHandler(Service):

    def handle(self):

        # Get the message from Azure Service Bus
        msg = self.request.raw_request

        # Log what we received
        self.logger.info('Flight status update: %s', self.request.payload)

        # Acknowledge the message - removes it from the queue
        msg.ack()

Updating flight information displays

In this example, an airport operations system receives flight status updates from Azure Service Bus and forwards them to a flight information display system (FIDS) via REST API. If the update fails, the message is rejected so it can be retried.

from zato.server.service import Service

class ProcessFlightUpdate(Service):

    def handle(self):

        # The raw message object for ack/reject
        msg = self.request.raw_request

        # The parsed payload
        data = self.request.payload

        try:
            # Extract flight information
            flight_no = data['flight_no']
            status = data['status']
            gate = data.get('gate')

            # Update the flight information display system
            conn = self.out.rest['FlightDisplayAPI'].conn
            conn.post(self.cid, {
                'flight': flight_no,
                'status': status,
                'gate': gate
            })

            # Success - acknowledge the message
            msg.ack()

        except Exception as e:
            self.logger.error('Failed to process flight %s: %s', data.get('flight_no'), e)

            # Processing failed - reject so it can be retried
            msg.reject()

Tracking baggage movements

This service processes baggage scan events from Azure Service Bus. Each time a bag is scanned at a checkpoint (check-in, security, loading, etc.), the event is sent to a central baggage tracking API.

from zato.server.service import Service

class BaggageEventHandler(Service):

    def handle(self):

        msg = self.request.raw_request
        data = self.request.payload

        event_type = data['event']
        bag_tag = data['bag_tag']
        location = data['location']

        self.logger.info('Baggage %s: %s at %s', event_type, bag_tag, location)

        # Notify the baggage tracking system
        conn = self.out.rest['BaggageTrackingAPI'].conn
        conn.post(self.cid, {
            'tag': bag_tag,
            'event': event_type,
            'location': location,
            'timestamp': data['timestamp']
        })

        # Done processing
        msg.ack()

Assigning gates to flights

This service handles gate assignment requests. It checks if the requested gate is available before assigning it. If the gate is occupied, the message is rejected and will be retried later when the gate might be free.

from zato.server.service import Service

class GateAssignmentHandler(Service):

    def handle(self):

        msg = self.request.raw_request
        data = self.request.payload

        flight_no = data['flight_no']
        gate = data['gate']

        # Check if the gate is available
        conn = self.out.rest['GateManagementAPI'].conn
        response = conn.get(self.cid, params={'gate': gate})

        if response.data['available']:
            # Gate is free - assign it
            conn.post(self.cid, {
                'flight': flight_no,
                'gate': gate,
                'action': 'assign'
            })
            msg.ack()
        else:
            # Gate is occupied - reject and retry later
            self.logger.warning('Gate %s not available for flight %s', gate, flight_no)
            msg.reject()

Next steps

Use the outgoing AMQP connection to send messages to Azure Service Bus queues.

Basic example

A minimal service that sends a message to an Azure queue:

from zato.server.service import Service

class SendFlightAlert(Service):

    def handle(self):
        msg = {'flight_no': 'LH123', 'status': 'delayed', 'delay_minutes': 30}
        self.out.amqp.send(msg, 'Azure Queue Outgoing', 'flight-alerts')

Publishing boarding notifications

This service publishes boarding notifications to Azure Service Bus when a flight is ready for boarding. Other systems (mobile apps, display boards, airline staff devices) subscribe to this queue.

from zato.server.service import Service

class PublishBoardingNotification(Service):

    def handle(self):

        data = self.request.payload

        notification = {
            'flight_no': data['flight_no'],
            'gate': data['gate'],
            'boarding_time': data['boarding_time'],
            'status': 'now_boarding',
            'zones': data.get('zones', ['all'])
        }

        self.out.amqp.send(notification, 'Azure Queue Outgoing', 'boarding-notifications')

        self.logger.info('Published boarding notification for %s at gate %s',
            data['flight_no'], data['gate'])

Sending baggage alerts

This service sends alerts when baggage requires attention - for example, when a bag is loaded onto the wrong aircraft or when a passenger's bag needs to be offloaded due to a no-show.

from zato.server.service import Service

class SendBaggageAlert(Service):

    def handle(self):

        data = self.request.payload

        alert = {
            'alert_type': data['alert_type'],
            'bag_tag': data['bag_tag'],
            'flight_no': data['flight_no'],
            'location': data['location'],
            'action_required': data['action_required'],
            'priority': data.get('priority', 'normal')
        }

        self.out.amqp.send(alert, 'Azure Queue Outgoing', 'baggage-alerts')

Next steps


Schedule a meaningful demo

Book a demo with an expert who will help you build meaningful systems that match your ambitions

"For me, Zato Source is the only technology partner to help with operational improvements."

— John Adams
Program Manager of Channel Enablement at Keysight