From the perspective of Zato service authors, publish/subscribe offers a few methods for publishing and receiving messages:
Method | Notes |
---|---|
self.pubsub.publish | Sends a message to a destination. The destination can be a topic name or service name. |
self.pubsub.get_messages | Receives all outstanding messages from input queue by subscription key |
self.pubsub.subscribe | Subscribes to a topic, returning subscription key on output |
self.pubsub.resume_wsx_subscription | Invoked on behalf of WebSocket clients to resume delivery of messages for an already existing subscription key after clients reconnect to Zato |
Note that get_messages expects a subscription key. A topic may have multiple subscriptions, each with its own queue, so the method needs to know whose queue to return messages from, and it is the subscription key that identifies each queue.
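The relationship between topics, subscription keys and queues can be pictured with a toy in-memory model. This is only a sketch to illustrate the idea, not how Zato is implemented: the ToyBroker class, the `demo.` key prefix and the assumption that reading drains the queue are all made up for this example.

```python
from collections import defaultdict
from uuid import uuid4


class ToyBroker:
    """Toy in-memory model (hypothetical, not Zato's implementation)."""

    def __init__(self):
        # Topic name -> subscription keys created for that topic
        self.topic_subs = defaultdict(list)
        # Subscription key -> that subscription's own message queue
        self.queues = {}

    def subscribe(self, topic_name):
        # The 'demo.' prefix is invented for this sketch
        sub_key = 'demo.' + uuid4().hex
        self.topic_subs[topic_name].append(sub_key)
        self.queues[sub_key] = []
        return sub_key

    def publish(self, topic_name, data):
        # Each subscription to the topic gets its own copy of the message
        for sub_key in self.topic_subs[topic_name]:
            self.queues[sub_key].append(data)

    def get_messages(self, topic_name, sub_key):
        if sub_key not in self.topic_subs[topic_name]:
            raise KeyError('No such subscription for topic')
        # Assumption of this sketch: reading empties the queue
        messages, self.queues[sub_key] = self.queues[sub_key], []
        return messages


broker = ToyBroker()
key_a = broker.subscribe('/zato/demo/sample')
key_b = broker.subscribe('/zato/demo/sample')
broker.publish('/zato/demo/sample', 'hello')

# Two subscriptions to one topic, two independent queues
first_read = broker.get_messages('/zato/demo/sample', key_a)
second_read = broker.get_messages('/zato/demo/sample', key_a)
other_read = broker.get_messages('/zato/demo/sample', key_b)
```

Reading with key_a does not affect what key_b still has waiting, which is why the topic name alone would not be enough to pick a queue.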
Security-wise, subscribe checks whether the endpoint given to it on input has the correct permissions to the topics that it is about to subscribe to.
On the other hand, publish and get_messages assume that the service that executes them has already carried out any necessary input validation and authentication or authorization checks, i.e. the methods should be used in trusted code paths only because they potentially allow access to arbitrary topics and subscriptions.
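One way to keep publish inside a trusted code path is to validate the topic name before calling it. The sketch below is an illustration only: the allow-list, is_topic_allowed and publish_checked are hypothetical names, and the pubsub argument stands for a service's self.pubsub object.

```python
from fnmatch import fnmatchcase

# Hypothetical allow-list of topic patterns this service may publish to
ALLOWED_TOPIC_PATTERNS = ('/zato/demo/*',)


def is_topic_allowed(topic_name, patterns=ALLOWED_TOPIC_PATTERNS):
    # Case-sensitive shell-style matching against each allowed pattern
    return any(fnmatchcase(topic_name, pattern) for pattern in patterns)


def publish_checked(pubsub, topic_name, data):
    # Reject topics outside the allow-list before touching pub/sub at all
    if not is_topic_allowed(topic_name):
        raise ValueError('Topic not allowed: {!r}'.format(topic_name))
    return pubsub.publish(topic_name, data=data)
```

Inside a service's handle method, this would be invoked as publish_checked(self.pubsub, topic_name, data), with the topic name typically coming from user input.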
self.pubsub.publish(topic_name, **kwargs)
Publishes a message to input topic. Except for topic_name, all parameters are passed in as keyword arguments and all are optional, e.g. message ID does not have to be provided on input unless there is an actual need for it.
data
: str - Business data being published

msg_id
: str - Message ID - callers must ensure this is globally unique (such as UUID4)

has_gd
: bool - Indicates if message should be covered by Guaranteed Delivery or not

priority
: int - Message priority, 1-9 (1=min)

expiration
: int - Message expiration in milliseconds

mime_type
: str - MIME type of input data

correl_id
: str - Correlation ID, if message is part of a broader series of messages

in_reply_to
: str - Message ID of a message this message is a response to, if any

ext_client_id
: str - An arbitrary string uniquely identifying the client application, or its instance, on whose behalf the method is called

ext_pub_time
: str - When the message was sent, as interpreted from the perspective of the calling application

Returns
: str - Message ID, either automatically assigned or taken from input if it was provided

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class PublishSample(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'
            data = 'My data'
            priority = 9

            # Publish the message
            msg_id = self.pubsub.publish(topic_name, data=data, priority=priority)

            # Log the message ID received
            self.logger.info('Message ID is `%s`', msg_id)
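When more of the optional parameters are needed, it can help to assemble and validate them in one place before publishing. The helper below is a sketch under assumptions: build_publish_kwargs is a hypothetical name, and the default priority of 5 and the text/plain MIME type are choices made for this example rather than documented defaults.

```python
from uuid import uuid4


def build_publish_kwargs(data, priority=5, expiration_ms=None, correl_id=None, msg_id=None):
    # Priority must fall in the documented 1-9 range (1=min)
    if not 1 <= priority <= 9:
        raise ValueError('priority must be between 1 and 9')

    kwargs = {
        'data': data,
        'priority': priority,
        'mime_type': 'text/plain',
        # Callers are responsible for global uniqueness, hence UUID4
        'msg_id': msg_id or uuid4().hex,
    }

    # Optional parameters are added only if the caller provided them
    if expiration_ms is not None:
        kwargs['expiration'] = expiration_ms
    if correl_id is not None:
        kwargs['correl_id'] = correl_id

    return kwargs


# Expire the message after one minute (expiration is in milliseconds)
kwargs = build_publish_kwargs('My data', priority=9, expiration_ms=60000)
```

In a service, the result would be passed on as self.pubsub.publish(topic_name, **kwargs).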
self.pubsub.get_messages(topic_name, sub_key)
Returns all messages enqueued for input sub_key which must be associated with an existing subscription for input topic_name.
topic_name
: str - Name of a topic that messages were published to

sub_key
: str - Subscription key to return messages for

Returns
: A list of dicts, each representing a single message

    # -*- coding: utf-8 -*-

    from zato.server.service import Service

    class GetMessages(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'
            sub_key = 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f'

            # Get all messages available ..
            messages = self.pubsub.get_messages(topic_name, sub_key)

            # .. and log them all.
            self.logger.info('Messages `%s`', messages)
    INFO - Messages `[
      {
        'delivery_count': 0,
        'has_gd': False,
        'server_name': 'server1',
        'topic_name': '/zato/demo/sample',
        'pub_time_iso': '2024-07-04T19:58:08.020493',
        'priority': 5,
        'expiration_time_iso': '2086-07-22T23:12:15.020493',
        'expiration': 2147483647000,
        'server_pid': 19218,
        'size': 27,
        'data': 'This is a sample message #2',
        'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
        'mime_type': 'text/plain'
      },
      {
        'delivery_count': 0,
        'has_gd': False,
        'server_name': 'server1',
        'topic_name': '/zato/demo/sample',
        'pub_time_iso': '2024-07-04T19:58:01.144177',
        'priority': 5,
        'expiration_time_iso': '2086-07-22T23:12:08.144177',
        'expiration': 2147483647000,
        'server_pid': 19218,
        'size': 24,
        'data': 'This is a sample message',
        'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
        'mime_type': 'text/plain'
      }]`
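Because each message is a plain dict, the returned list can be reordered or filtered with ordinary Python before processing. The sketch below sorts by priority and then by publication time. The order_for_processing helper is a hypothetical name, and the sample dicts are trimmed copies of the output above.

```python
def order_for_processing(messages):
    # Highest priority first (9 is the maximum). Within equal priority,
    # oldest first: ISO-8601 timestamps in one format sort as strings.
    return sorted(messages, key=lambda msg: (-msg['priority'], msg['pub_time_iso']))


# Trimmed copies of the messages from the log output
messages = [
    {'priority': 5, 'pub_time_iso': '2024-07-04T19:58:08.020493',
     'data': 'This is a sample message #2'},
    {'priority': 5, 'pub_time_iso': '2024-07-04T19:58:01.144177',
     'data': 'This is a sample message'},
]

ordered = order_for_processing(messages)
payloads = [msg['data'] for msg in ordered]
```

Both sample messages share priority 5, so the one published first ends up at the front of the list.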
self.pubsub.subscribe(topic_name, **kwargs)
Subscribes an endpoint to input topic_name.
For non-WebSocket endpoints (e.g. REST), the endpoint that is about to be subscribed must be provided in the endpoint_name input parameter.
For WebSocket endpoints, the subscription will be made for the endpoint that the current WebSocket channel points to.
On output, a subscription key is returned. It is a unique token pointing to this particular subscription. Subscription keys should be treated as secrets and must not be shared.
Note that all permission checks apply: the endpoints used must have permissions that allow them to subscribe to the input topic_name.
topic_name
: str - Name of topic to subscribe to

endpoint_name
: str - Endpoint to subscribe to the topic (required for non-WebSocket subscriptions)

use_current_wsx
: bool - Indicates that currently used WebSocket channel should be used as the basis for finding the endpoint to subscribe to topic_name (required for WebSocket subscriptions only)

service
: object - Always equal to self (required for WebSocket subscriptions only)

Returns
: str - Subscription key

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class SubscribeREST(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'
            endpoint_name = 'My REST Endpoint'

            # Subscribe an explicitly named endpoint ..
            sub_key = self.pubsub.subscribe(topic_name, endpoint_name=endpoint_name)

            # .. and log the subscription key received.
            self.logger.info('Received sub_key `%s`', sub_key)
To subscribe the current WebSocket channel instead:

    # -*- coding: utf-8 -*-

    from zato.server.service import Service

    class SubscribeWebSocket(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'

            # Subscribe current WebSocket ..
            sub_key = self.pubsub.subscribe(topic_name, use_current_wsx=True, service=self)

            # .. and log the subscription key received.
            self.logger.info('Received sub_key `%s`', sub_key)
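The two calling conventions can also be wrapped in one small function that picks the right keyword arguments. This is a sketch only: subscribe_endpoint is a hypothetical helper, and pubsub stands for a service's self.pubsub object.

```python
def subscribe_endpoint(pubsub, topic_name, service, endpoint_name=None):
    # Non-WebSocket endpoints (e.g. REST) must be named explicitly
    if endpoint_name:
        return pubsub.subscribe(topic_name, endpoint_name=endpoint_name)

    # Otherwise, subscribe the endpoint behind the current WebSocket channel
    return pubsub.subscribe(topic_name, use_current_wsx=True, service=service)
```

From a service, a REST subscription would be subscribe_endpoint(self.pubsub, topic_name, self, 'My REST Endpoint') and a WebSocket one subscribe_endpoint(self.pubsub, topic_name, self).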
self.pubsub.resume_wsx_subscription(sub_key, service)
Resumes delivery for input sub_key to a previously created subscription of a WebSocket client that reconnected after having its TCP-level connection dropped (e.g. after closing a web browser's window).
Input data is validated and sub_key is checked to have been issued to the same endpoint that the calling WebSocket channel is associated with. If the check fails, an exception is raised (callers of this method should expect type Exception).
sub_key
: str - Subscription key to resume delivery for

service
: object - Service that invokes the method (i.e. self)

Returns
: (None)

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class ResumeWSXSubscription(Service):
        def handle(self):

            # Prepare input
            sub_key = 'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f'

            # Resume delivery for that key,
            # if there is no exception, it means that everything went fine
            self.pubsub.resume_wsx_subscription(sub_key, self)
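Since a failed resume is signalled with an exception, callers may want to decide up front what happens next. The sketch below falls back to creating a new subscription for the current WebSocket channel. That fallback is a design choice assumed for this example, not something Zato prescribes, and resume_or_resubscribe is a hypothetical helper.

```python
def resume_or_resubscribe(pubsub, service, topic_name, sub_key):
    try:
        # Returns None on success and raises Exception otherwise
        pubsub.resume_wsx_subscription(sub_key, service)
    except Exception:
        # Assumed fallback: start over with a new subscription,
        # which is still subject to the usual permission checks
        return pubsub.subscribe(topic_name, use_current_wsx=True, service=service)

    # The existing key remains valid
    return sub_key
```

The function always returns the subscription key that the client should use from now on, either the one it already had or a newly issued one.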