Publish/subscribe - Python API

Overview

Zato's publish/subscribe functionality is accessible from Python code in two distinct ways, each suited to different integration scenarios.

  • Services running within the Zato platform can use the built-in self.publish method to send messages directly to topics without any HTTP overhead. This approach provides the most efficient path for broadcasting events from your integration layer to external systems and applications that subscribe to those topics.

  • External Python applications, on the other hand, interact with the pub/sub system through the REST API. Using standard libraries such as requests, these applications can publish messages and retrieve messages from their queues over HTTP. This approach enables any Python application - whether it's a web application, a background worker, or a microservice - to participate in your event-driven architecture.

Zato services

Publishing messages

The simplest way to publish a message from a Zato service is to call self.pubsub.publish with a topic name and the data to send. The system automatically generates all required metadata including message ID, timestamps, and expiration time.

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # Prepare topic and data
        topic_name = 'demo.1'
        data = {
            'order_id': 12345,
            'status': 'completed'
        }

        # Publish a message to a topic
        self.pubsub.publish(topic_name, data)

Publishing with metadata

When you need more control over message delivery and identification, you can provide custom metadata:

  • msg_id - specify your own message identifier for tracking purposes
  • priority - controls message ordering (higher values are delivered first)
  • expiration - sets how long the message remains available (in seconds)
  • cid - correlation ID that ties the published message to the current service invocation for end-to-end tracing
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # Prepare topic and data
        topic_name = 'demo.1'
        data = {
            'order_id': 12345,
            'status': 'completed'
        }

        # Prepare metadata
        msg_id = 'my-custom-msg-id'
        priority = 7
        expiration = 3600

        # Publish a message with custom metadata
        self.pubsub.publish(
            topic_name,
            data,
            msg_id=msg_id,
            cid=self.cid,
            priority=priority,
            expiration=expiration,
        )

External Python applications

Publishing messages

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

topic_name = 'demo.1'
url = f'{base_url}/pubsub/topic/{topic_name}'

request = {
    'data': {
        'order_id': 12345,
        'status': 'completed'
    }
}

response = requests.post(url, json=request, auth=auth)
result = response.json()

logger.info('Response: %s', result)

is_ok = result['is_ok']
msg_id = result['msg_id']

logger.info('Published: %s', is_ok)
logger.info('Message ID: %s', msg_id)

Getting messages from queues

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

url = f'{base_url}/pubsub/messages/get'

response = requests.post(url, auth=auth)
result = response.json()

logger.info('Response: %s', result)

message_count = result['message_count']
messages = result['messages']

suffix = 'message' if message_count == 1 else 'messages'
logger.info('Retrieved %s %s', message_count, suffix)

for message in messages:
    meta = message['meta']
    data = message['data']

    topic_name = meta['topic_name']
    priority = meta['priority']

    logger.info('Topic: %s', topic_name)
    logger.info('Data: %s', data)
    logger.info('Priority: %s', priority)
    logger.info('---')

Complete example

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO
from time import sleep

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)
topic_name = 'demo.1'

# Publish a message
publish_url = f'{base_url}/pubsub/topic/{topic_name}'
request = {
    'data': {
        'order_id': 12345,
        'status': 'completed'
    }
}
response = requests.post(publish_url, json=request, auth=auth)
result = response.json()

logger.info('Response: %s', result)

msg_id = result['msg_id']
logger.info('Published: %s', msg_id)

# Wait a moment for message to be routed
sleep(0.5)

# Get messages
get_url = f'{base_url}/pubsub/messages/get'
response = requests.post(get_url, auth=auth)
result = response.json()

logger.info('Response: %s', result)

message_count = result['message_count']
messages = result['messages']

suffix = 'message' if message_count == 1 else 'messages'
logger.info('Retrieved %s %s', message_count, suffix)
for message in messages:
    data = message['data']
    logger.info('Data: %s', data)

The result will be:

Published: zpsm.20251011-171049-9451-66b0c25a2d900fb5f
Retrieved 1 message
Data: {'order_id': 12345, 'status': 'completed'}

Error handling

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

url = f'{base_url}/pubsub/topic/demo.1'
request = {
    'data': 'Order processed'
}

response = requests.post(url, json=request, auth=auth)
result = response.json()

logger.info('Response: %s', result)

is_ok = result['is_ok']

if is_ok:
    msg_id = result['msg_id']
    logger.info('Success: %s', msg_id)
else:
    details = result['details']
    logger.info('Error: %s', details)

Getting started with EDA

Using EDA