If you're new to the publish/subscribe pattern, start with What is publish/subscribe?
Zato's publish/subscribe functionality is accessible from Python code in two distinct ways, each suited to different integration scenarios.
Services running within the Zato platform can use the built-in self.publish method to send messages directly to topics without any HTTP overhead. This approach provides the most efficient path for broadcasting events from your integration layer to external systems and applications that subscribe to those topics.
External Python applications, on the other hand, interact with the pub/sub system through the REST API. Using standard libraries such as requests, these applications can publish messages and retrieve messages from their queues over HTTP. This approach enables any Python application - whether it's a web application, a background worker, or a microservice - to participate in your event-driven architecture.
The simplest way to publish a message from a Zato service is to call self.publish with a topic name and the data to send. The method returns a PublishResult object whose msg_id attribute contains the unique identifier assigned to the message. The system automatically generates all required metadata including timestamps and expiration time.
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # Prepare topic and data
        topic_name = 'demo.1'
        data = {
            'order_id': 12345,
            'status': 'completed'
        }

        # Publish a message to a topic
        result = self.publish(topic_name, data)

        # Log the message ID
        self.logger.info('Published msg_id: %s', result.msg_id)
When you need more control over message delivery and identification, you can provide custom metadata:
* priority - controls message ordering (higher values are delivered first)
* expiration - sets how long the message remains available (in seconds)
* cid - correlation ID that ties the published message to the current service invocation for end-to-end tracing
* in_reply_to - message ID this message is a reply to
* ext_client_id - external client identifier for tracing
* publisher - publisher name override
* pub_time - custom publish timestamp (ISO format)

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # Prepare topic and data
        topic_name = 'demo.1'
        data = {
            'order_id': 12345,
            'status': 'completed'
        }

        # Prepare metadata
        priority = 7
        expiration = 3600

        # Publish a message with custom metadata
        result = self.publish(
            topic_name,
            data,
            cid=self.cid,
            priority=priority,
            expiration=expiration,
        )

        # Log the message ID
        self.logger.info('Published msg_id: %s', result.msg_id)
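To make the relationship between pub_time and expiration concrete, here is a minimal standalone sketch of the arithmetic. It assumes the expiration is counted in seconds from the publish time; the helper name expiration_time and the sample timestamp are hypothetical, and this is not how Zato itself is implemented.

```python
from datetime import datetime, timedelta

def expiration_time(pub_time_iso: str, expiration_seconds: int) -> str:
    """Return the ISO timestamp at which a message would stop being available.

    Assumption: expiration is measured from the publish time.
    """
    pub_time = datetime.fromisoformat(pub_time_iso)
    return (pub_time + timedelta(seconds=expiration_seconds)).isoformat()

# Hypothetical publish time combined with the 3600-second expiration used above
expires_at = expiration_time('2025-10-11T17:10:49+00:00', 3600)
print(expires_at)
```

With an expiration of 3600, a message published at 17:10:49 would remain available until 18:10:49 under this assumption.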
Instead of building a dict explicitly, you can pass payload fields directly as keyword arguments. The system automatically separates payload keys from metadata keys (like priority or expiration) - non-metadata keys form the payload dict that the subscriber receives.
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # Publish with inline kwargs - the subscriber will receive
        # a dict payload of {'order_id': 123, 'status': 'pending'}
        result = self.publish('demo.1', order_id=123, status='pending')

        # Log the message ID
        self.logger.info('Published msg_id: %s', result.msg_id)
Metadata and payload keys can be mixed freely:
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # Here, priority and expiration are consumed as metadata
        # while order_id and status become the payload dict
        result = self.publish(
            'demo.1',
            order_id=123,
            status='pending',
            priority=7,
            expiration=3600,
        )

        # Log the message ID
        self.logger.info('Published msg_id: %s', result.msg_id)
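The separation of payload keys from metadata keys can be pictured with a small standalone sketch. The function split_publish_kwargs and the METADATA_KEYS set are hypothetical names used only for illustration; the set is taken from the metadata fields listed earlier, and whether Zato uses exactly this set or this logic internally is an assumption.

```python
# Metadata names taken from the list earlier in this section.
# Assumption: any keyword not in this set is treated as payload.
METADATA_KEYS = frozenset({
    'priority', 'expiration', 'cid', 'in_reply_to',
    'ext_client_id', 'publisher', 'pub_time',
})

def split_publish_kwargs(**kwargs):
    """Split keyword arguments into (payload, metadata) dicts."""
    payload = {}
    metadata = {}
    for key, value in kwargs.items():
        target = metadata if key in METADATA_KEYS else payload
        target[key] = value
    return payload, metadata

payload, metadata = split_publish_kwargs(
    order_id=123,
    status='pending',
    priority=7,
    expiration=3600,
)

print(payload)   # The dict a subscriber would receive
print(metadata)  # The delivery options
```

One practical consequence of this kind of split is that a payload field cannot share a name with a metadata key when passed inline; in that case, building the dict explicitly avoids the ambiguity.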
When a service publishes a Model (dataclass) instance, the system automatically serializes it and stores its class name. On the subscriber side, the original Model class is reconstructed - the subscriber receives the same Model type, not a plain dict.
# -*- coding: utf-8 -*-

# stdlib
from dataclasses import dataclass

# Zato
from zato.common.marshal_.api import Model
from zato.server.service import Service

@dataclass(init=False)
class OrderEvent(Model):
    order_id: str = ''
    status: str = ''
    amount: float = 0.0

class PublisherService(Service):

    def handle(self):

        # Build a Model instance ..
        event = OrderEvent()
        event.order_id = 'ORD-001'
        event.status = 'completed'
        event.amount = 99.95

        # .. and publish it. The subscriber will receive
        # an OrderEvent instance, not a plain dict.
        result = self.publish('orders.events', event)

        # Log the message ID
        self.logger.info('Published msg_id: %s', result.msg_id)
The subscribing service receives a fully reconstructed Model:
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class SubscriberService(Service):

    def handle(self):

        # self.request.raw_request is an OrderEvent instance
        event = self.request.raw_request

        self.logger.info('Order: %s', event.order_id)
        self.logger.info('Status: %s', event.status)
        self.logger.info('Amount: %s', event.amount)
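The idea of storing a class name next to the serialized fields and rebuilding the original type on the receiving side can be sketched with plain dataclasses. This is a conceptual illustration only: the registry, the envelope layout (class_name and fields) and the serialize and deserialize helpers are hypothetical and do not reflect Zato's actual wire format or its Model class.

```python
import json
from dataclasses import asdict, dataclass

# Hypothetical registry mapping class names to classes
MODEL_REGISTRY = {}

def register(cls):
    """Record a class under its name so it can be looked up when deserializing."""
    MODEL_REGISTRY[cls.__name__] = cls
    return cls

@register
@dataclass
class OrderEvent:
    order_id: str = ''
    status: str = ''
    amount: float = 0.0

def serialize(model) -> str:
    """Store the class name alongside the instance's fields."""
    envelope = {'class_name': type(model).__name__, 'fields': asdict(model)}
    return json.dumps(envelope)

def deserialize(raw: str):
    """Rebuild an instance of the original class from the envelope."""
    envelope = json.loads(raw)
    cls = MODEL_REGISTRY[envelope['class_name']]
    return cls(**envelope['fields'])

event = OrderEvent(order_id='ORD-001', status='completed', amount=99.95)
restored = deserialize(serialize(event))

print(type(restored).__name__)
print(restored == event)
```

Because the class name travels with the data, the receiving side needs access to the same class definition, which in this sketch means both sides share the registry.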
To publish a message from an external application, send a POST request with the message data to the topic's REST endpoint:

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'

username = 'user.1'
password = 'password.1'
auth = (username, password)

topic_name = 'demo.1'
url = f'{base_url}/pubsub/topic/{topic_name}'

request = {
    'data': {
        'order_id': 12345,
        'status': 'completed'
    }
}

response = requests.post(url, json=request, auth=auth)
result = response.json()
logger.info('Response: %s', result)

is_ok = result['is_ok']
msg_id = result['msg_id']

logger.info('Published: %s', is_ok)
logger.info('Message ID: %s', msg_id)
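For readers who want to see what the publish call puts on the wire, the sketch below builds an equivalent request with only the standard library, without sending it. Passing a (username, password) tuple as auth to requests results in HTTP Basic authentication, which is what the Authorization header here reproduces. The helper name build_publish_request is hypothetical, and URL-quoting the topic name is a defensive assumption rather than a documented requirement.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request

def build_publish_request(base_url, topic_name, data, username, password):
    """Build (but do not send) a POST request that publishes data to a topic."""
    url = f'{base_url}/pubsub/topic/{quote(topic_name, safe="")}'
    body = json.dumps({'data': data}).encode('utf-8')

    # HTTP Basic auth: base64 of "username:password"
    token = base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('ascii')
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Basic {token}',
    }
    return Request(url, data=body, headers=headers, method='POST')

req = build_publish_request(
    'http://localhost:11223',
    'demo.1',
    {'order_id': 12345, 'status': 'completed'},
    'user.1',
    'password.1',
)

print(req.get_method(), req.full_url)
print(req.data.decode('utf-8'))
```

The resulting object can be passed to urllib.request.urlopen to send it, although in most applications the requests version shown above is more convenient.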
To retrieve the messages waiting in the application's queue, send a POST request to the messages endpoint:

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'

username = 'user.1'
password = 'password.1'
auth = (username, password)

url = f'{base_url}/pubsub/messages/get'

response = requests.post(url, auth=auth)
result = response.json()
logger.info('Response: %s', result)

message_count = result['message_count']
messages = result['messages']

suffix = 'message' if message_count == 1 else 'messages'
logger.info('Retrieved %s %s', message_count, suffix)

for message in messages:

    meta = message['meta']
    data = message['data']

    topic_name = meta['topic_name']
    priority = meta['priority']

    logger.info('Topic: %s', topic_name)
    logger.info('Data: %s', data)
    logger.info('Priority: %s', priority)
    logger.info('---')
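Once messages have been retrieved, a client often needs to organize them before processing. The sketch below groups messages by topic and orders each group by priority, highest first. It assumes only the response shape used in the example above (messages, each with meta.topic_name, meta.priority and data); the helper name group_by_topic and the sample response are hypothetical.

```python
from collections import defaultdict

def group_by_topic(result):
    """Group retrieved messages by topic, highest priority first within each topic."""
    grouped = defaultdict(list)
    for message in result['messages']:
        grouped[message['meta']['topic_name']].append(message)

    # sort() is stable, so messages of equal priority keep their original order
    for messages in grouped.values():
        messages.sort(key=lambda message: message['meta']['priority'], reverse=True)

    return dict(grouped)

# Hypothetical response, shaped like the one logged above
sample = {
    'message_count': 3,
    'messages': [
        {'meta': {'topic_name': 'demo.1', 'priority': 5}, 'data': {'order_id': 1}},
        {'meta': {'topic_name': 'demo.2', 'priority': 5}, 'data': {'order_id': 2}},
        {'meta': {'topic_name': 'demo.1', 'priority': 7}, 'data': {'order_id': 3}},
    ],
}

for topic_name, messages in group_by_topic(sample).items():
    order_ids = [message['data']['order_id'] for message in messages]
    print(topic_name, order_ids)
```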
The following example combines both operations: it publishes a message to a topic and then retrieves it from the queue:

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO
from time import sleep

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'

username = 'user.1'
password = 'password.1'
auth = (username, password)

topic_name = 'demo.1'

# Publish a message
publish_url = f'{base_url}/pubsub/topic/{topic_name}'

request = {
    'data': {
        'order_id': 12345,
        'status': 'completed'
    }
}

response = requests.post(publish_url, json=request, auth=auth)
result = response.json()
logger.info('Response: %s', result)

msg_id = result['msg_id']
logger.info('Published: %s', msg_id)

# Wait a moment for message to be routed
sleep(0.5)

# Get messages
get_url = f'{base_url}/pubsub/messages/get'

response = requests.post(get_url, auth=auth)
result = response.json()
logger.info('Response: %s', result)

message_count = result['message_count']
messages = result['messages']

suffix = 'message' if message_count == 1 else 'messages'
logger.info('Retrieved %s %s', message_count, suffix)

for message in messages:
    data = message['data']
    logger.info('Data: %s', data)
The output, shown here without the timestamps and the raw Response lines, will be similar to:
Published: zpsm.20251011-171049-9451-66b0c25a2d900fb5f
Retrieved 1 message
Data: {'order_id': 12345, 'status': 'completed'}
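A fixed half-second pause works for a demo, but a client that cannot know how long routing takes may prefer to retry a few times. The following is a minimal sketch of such a polling loop, not a documented Zato feature: the helper poll_for_messages, its parameters and the fake fetch function are all hypothetical. The fetch callable stands in for the HTTP call so the example runs without a server.

```python
from time import sleep

def poll_for_messages(fetch, max_attempts=5, delay=0.5, sleep_func=sleep):
    """Call fetch() until it returns at least one message or attempts run out.

    fetch must return a dict with 'message_count' and 'messages' keys,
    like the response of the get-messages call above.
    """
    for attempt in range(1, max_attempts + 1):
        result = fetch()
        if result['message_count'] > 0:
            return result['messages']
        # Do not sleep after the final attempt
        if attempt < max_attempts:
            sleep_func(delay)
    return []

# Hypothetical stand-in for the HTTP call: two empty responses, then one message
_responses = iter([
    {'message_count': 0, 'messages': []},
    {'message_count': 0, 'messages': []},
    {'message_count': 1, 'messages': [{'data': {'order_id': 12345, 'status': 'completed'}}]},
])

# sleep_func is replaced with a no-op so the demo finishes immediately
messages = poll_for_messages(lambda: next(_responses), sleep_func=lambda _: None)
print(messages)
```

In a real client, fetch could be a function that returns requests.post(get_url, auth=auth).json(), and sleep_func can be left at its default.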
The publish response contains an is_ok flag. When it is false, the details field describes what went wrong:

# -*- coding: utf-8 -*-

# stdlib
import logging
from logging import getLogger, INFO

# requests
import requests

logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

base_url = 'http://localhost:11223'

username = 'user.1'
password = 'password.1'
auth = (username, password)

url = f'{base_url}/pubsub/topic/demo.1'

request = {
    'data': 'Order processed'
}

response = requests.post(url, json=request, auth=auth)
result = response.json()
logger.info('Response: %s', result)

is_ok = result['is_ok']

if is_ok:
    msg_id = result['msg_id']
    logger.info('Success: %s', msg_id)
else:
    details = result['details']
    logger.info('Error: %s', details)
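In larger applications it can be convenient to turn a failed publish into an exception instead of checking is_ok at every call site. The sketch below shows one way to do that. PublishError and get_msg_id are hypothetical names, and the sample responses are made up for illustration; only the is_ok, msg_id and details keys come from the example above.

```python
class PublishError(Exception):
    """Raised when a publish response reports a failure."""

def get_msg_id(result):
    """Return the message ID from a publish response or raise PublishError."""
    if result.get('is_ok'):
        return result['msg_id']
    raise PublishError(result.get('details', 'No details provided'))

# Hypothetical responses, using only the keys shown in the example above
ok_response = {'is_ok': True, 'msg_id': 'zpsm.20251011-171049-9451-66b0c25a2d900fb5f'}
error_response = {'is_ok': False, 'details': 'Sample error details'}

print(get_msg_id(ok_response))

try:
    get_msg_id(error_response)
except PublishError as e:
    print('Publish failed:', e)
```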