Prerequisite reading: Pub/sub architecture.
From the perspective of Zato service authors, publish/subscribe offers a few methods for publishing and receiving messages:
Method | Notes |
---|---|
self.pubsub.publish | Sends a message to a destination. The destination can be a topic name or service name. |
self.pubsub.subscribe | Subscribes to a topic, returning subscription key on output |
self.pubsub.get_messages | Receives all outstanding messages from input queue by subscription key |
self.pubsub.resume_wsx_subscription | Invoked on behalf of WebSocket clients to resume delivery of messages for an already existing subscription key after clients reconnect to Zato |
Note that get_messages expects a subscription key. There may be multiple subscriptions for each topic, so the method needs to know whose queue to return messages from, and it is the subscription key that points to each queue.
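To illustrate why a topic name alone is not enough, here is a toy in-memory model of the relationship between topics, subscription keys and queues. It is only a sketch for building intuition and is not how Zato implements pub/sub internally; the class ToyBroker, its attributes and the `toy.sub.N` key format are all made up for this example.

```python
from collections import defaultdict, deque


class ToyBroker:
    """A toy model: one queue per subscription key (not Zato's implementation)."""

    def __init__(self):
        self._topic_subs = defaultdict(list)  # topic_name -> list of sub_keys
        self._queues = {}                     # sub_key -> queue of messages
        self._counter = 0

    def subscribe(self, topic_name):
        # Each subscription gets its own key and its own, initially empty, queue
        self._counter += 1
        sub_key = 'toy.sub.{}'.format(self._counter)
        self._topic_subs[topic_name].append(sub_key)
        self._queues[sub_key] = deque()
        return sub_key

    def publish(self, topic_name, data):
        # A copy of the message goes to the queue of every current subscription
        for sub_key in self._topic_subs[topic_name]:
            self._queues[sub_key].append(data)

    def get_messages(self, topic_name, sub_key):
        # The topic name alone is ambiguous, the sub_key selects the queue
        if sub_key not in self._topic_subs[topic_name]:
            raise KeyError('No such subscription for this topic')
        return list(self._queues[sub_key])


topic_name = '/zato/demo/sample'
broker = ToyBroker()

key_a = broker.subscribe(topic_name)
broker.publish(topic_name, 'first')

# The second subscriber joins later, so its queue never receives 'first'
key_b = broker.subscribe(topic_name)
broker.publish(topic_name, 'second')

messages_a = broker.get_messages(topic_name, key_a)
messages_b = broker.get_messages(topic_name, key_b)
```

Both subscriptions are for the same topic, yet their queues differ, which is why the subscription key has to be given on input.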
Security-wise, methods publish and subscribe check whether the endpoints given to them on input have the correct permissions for the topics that they are about to publish or subscribe to.
On the other hand, get_messages assumes that the service that executes it has already carried out any necessary input validation and authentication or authorization checks, i.e. the method should be used only in trusted code paths because it potentially allows access to arbitrary topics and subscriptions.
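One way for a service to carry out such a check itself before calling get_messages is sketched below. The helper get_messages_for_caller, the allowed_sub_keys mapping and the caller identifiers are hypothetical names used only for illustration, and StubPubSub merely stands in for self.pubsub so that the sketch runs outside of Zato.

```python
def get_messages_for_caller(pubsub, allowed_sub_keys, caller_id, topic_name, sub_key):
    """Call get_messages only if caller_id may use sub_key.

    allowed_sub_keys is a hypothetical dict of caller_id -> set of sub_keys
    that the service author maintains, it is not part of Zato's API.
    """
    if sub_key not in allowed_sub_keys.get(caller_id, set()):
        raise PermissionError('Caller is not allowed to use this subscription key')
    return pubsub.get_messages(topic_name, sub_key)


class StubPubSub:
    """Stand-in for self.pubsub, returns a fixed message for any input."""

    def get_messages(self, topic_name, sub_key):
        return [{'topic_name': topic_name, 'data': 'This is a sample message'}]


pubsub = StubPubSub()
allowed_sub_keys = {'client-1': {'zpsk.rest.6b0b83ebea3f6e6a64e8e73f'}}

# Allowed: client-1 owns this subscription key
messages = get_messages_for_caller(
    pubsub, allowed_sub_keys, 'client-1',
    '/zato/demo/sample', 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f')

# Not allowed: client-2 has no subscription keys assigned
try:
    get_messages_for_caller(
        pubsub, allowed_sub_keys, 'client-2',
        '/zato/demo/sample', 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f')
    was_rejected = False
except PermissionError:
    was_rejected = True
```

Inside a real service, self.pubsub would be passed in place of the stub.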
self.pubsub.publish(topic_name, **kwargs)
Publishes a message to the input topic. Apart from topic_name, all parameters are passed in as keyword arguments and all are optional, e.g. a message ID does not have to be provided on input unless there is an actual need.
data
: str - Business data being published

msg_id
: str - Message ID - callers must ensure this is globally unique (such as UUID4)

has_gd
: bool - Indicates if message should be covered by Guaranteed Delivery or not

priority
: int - Message priority, 1-9 (1=min)

expiration
: int - Message expiration in milliseconds

mime_type
: str - MIME type of input data

correl_id
: str - Correlation ID, if message is part of a broader series of messages

in_reply_to
: str - Message ID of a message this message is a response to, if any

ext_client_id
: str - An arbitrary string uniquely identifying the client application, or its instance, on whose behalf the method is called

ext_pub_time
: str - When the message was sent, as interpreted from the perspective of the calling application

Returns
: str - Message ID, either automatically assigned or taken from input if it was provided

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class PublishSample(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'
            data = 'My data'
            priority = 9

            # Publish the message
            msg_id = self.pubsub.publish(topic_name, data=data, priority=priority)

            # Log the message ID received
            self.logger.info('Message ID is `%s`', msg_id)
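When a caller does need to set some of the optional parameters, it can help to prepare them in one place. The sketch below builds a dictionary of keyword arguments using only the parameter names documented above. The helper build_publish_kwargs and its expiration_seconds argument are hypothetical and not part of Zato; the range check simply mirrors the documented 1-9 priority range.

```python
from uuid import uuid4


def build_publish_kwargs(data, priority=None, expiration_seconds=None, correl_id=None):
    """Build keyword arguments for self.pubsub.publish (hypothetical helper)."""
    kwargs = {
        'data': data,
        # The caller is responsible for uniqueness, a UUID4 is one way to get it
        'msg_id': uuid4().hex,
    }

    if priority is not None:
        # The documented range is 1-9, with 1 being the lowest priority
        if not 1 <= priority <= 9:
            raise ValueError('priority must be between 1 and 9')
        kwargs['priority'] = priority

    if expiration_seconds is not None:
        # The expiration parameter is expressed in milliseconds
        kwargs['expiration'] = int(expiration_seconds * 1000)

    if correl_id is not None:
        kwargs['correl_id'] = correl_id

    return kwargs


kwargs = build_publish_kwargs('My data', priority=9, expiration_seconds=30)

# Inside a service's handle method, the result could then be used as:
# msg_id = self.pubsub.publish('/zato/demo/sample', **kwargs)
```

Leaving optional keys out of the dictionary, rather than passing None, keeps the call equivalent to not providing them at all.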
self.pubsub.subscribe(topic_name, **kwargs)
Subscribes an endpoint to input topic_name.
For non-WebSocket endpoints (e.g. REST, SOAP or AMQP), the endpoint that is about to be subscribed must be provided in the input endpoint_name parameter.
For WebSocket endpoints, the subscription will be made for the endpoint that the current WebSocket channel is pointing to.
On output, a subscription key is returned, which is a unique token pointing to this particular subscription. Subscription keys should be treated as secrets and must not be shared.
Note that all the permission checks apply and the endpoints used must have correct permissions that allow them to subscribe to input topic_name.
topic_name
: str - Name of topic to subscribe to

endpoint_name
: str - Endpoint to subscribe to the topic (required for non-WebSocket subscriptions)

use_current_wsx
: bool - Indicates that the currently used WebSocket channel should be used as the basis for finding the endpoint to subscribe to topic_name (required for WebSocket subscriptions only)

service
: object - Always equal to self (required for WebSocket subscriptions only)

Returns
: str - Subscription key

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class SubscribeREST(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'
            endpoint_name = 'My REST Endpoint'

            # Subscribe an explicitly named endpoint ..
            sub_key = self.pubsub.subscribe(topic_name, endpoint_name=endpoint_name)

            # .. and log the subscription key received.
            self.logger.info('Received sub_key `%s`', sub_key)

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class SubscribeWebSocket(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'

            # Subscribe current WebSocket ..
            sub_key = self.pubsub.subscribe(topic_name, use_current_wsx=True, service=self)

            # .. and log the subscription key received.
            self.logger.info('Received sub_key `%s`', sub_key)

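Because subscription keys should be treated as secrets, production code may prefer not to write them to logs in full, unlike the demonstration services above. A minimal sketch of one possible approach follows; mask_sub_key is a hypothetical helper, not a Zato function, and how many characters to leave visible is an arbitrary choice.

```python
def mask_sub_key(sub_key, visible=4):
    """Replace all but the last `visible` characters of sub_key with asterisks."""
    if len(sub_key) <= visible:
        # Too short to reveal anything safely, mask everything
        return '*' * len(sub_key)
    return '*' * (len(sub_key) - visible) + sub_key[-visible:]


masked = mask_sub_key('zpsk.rest.6b0b83ebea3f6e6a64e8e73f')

# Inside a service, the masked value could be logged instead of the key itself:
# self.logger.info('Received sub_key `%s`', mask_sub_key(sub_key))
```

The trailing characters are kept only so that log entries about the same subscription can still be correlated by eye.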
self.pubsub.get_messages(topic_name, sub_key)
Returns all messages enqueued for input sub_key which must be associated with an existing subscription for input topic_name.
topic_name
: str - Name of a topic that messages were published to

sub_key
: str - Subscription key to return messages for

Returns
: A list of dicts, each representing a single message

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class GetMessages(Service):
        def handle(self):

            # Prepare input
            topic_name = '/zato/demo/sample'
            sub_key = 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f'

            # Get all messages available ..
            messages = self.pubsub.get_messages(topic_name, sub_key)

            # .. and log them all.
            self.logger.info('Messages `%s`', messages)

    INFO - Messages `[
      {
        'delivery_count': 0,
        'has_gd': False,
        'server_name': 'server1',
        'topic_name': '/zato/demo/sample',
        'pub_time_iso': '2022-07-04T19:58:08.020493',
        'priority': 5,
        'expiration_time_iso': '2086-07-22T23:12:15.020493',
        'expiration': 2147483647000,
        'server_pid': 19218,
        'size': 27,
        'data': 'This is a sample message #2',
        'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
        'mime_type': 'text/plain'
      },
      {
        'delivery_count': 0,
        'has_gd': False,
        'server_name': 'server1',
        'topic_name': '/zato/demo/sample',
        'pub_time_iso': '2022-07-04T19:58:01.144177',
        'priority': 5,
        'expiration_time_iso': '2086-07-22T23:12:08.144177',
        'expiration': 2147483647000,
        'server_pid': 19218,
        'size': 24,
        'data': 'This is a sample message',
        'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
        'mime_type': 'text/plain'
      }]`

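Since each message is a plain dict, the returned list can be processed with regular Python tools. As an illustration, the sketch below orders messages so that higher priorities come first and, within the same priority, older publications come first. The function order_for_processing is a hypothetical helper, the input is trimmed to a few of the keys shown in the sample output, and the ordering assumes that all pub_time_iso values use the same format so that they compare correctly as strings.

```python
def order_for_processing(messages):
    """Highest priority first (9 is the maximum), then oldest publication first."""
    return sorted(messages, key=lambda msg: (-msg['priority'], msg['pub_time_iso']))


# Messages trimmed to the keys that this sketch needs
messages = [
    {
        'pub_time_iso': '2022-07-04T19:58:08.020493',
        'priority': 5,
        'data': 'This is a sample message #2',
    },
    {
        'pub_time_iso': '2022-07-04T19:58:01.144177',
        'priority': 5,
        'data': 'This is a sample message',
    },
]

ordered = order_for_processing(messages)
ordered_data = [msg['data'] for msg in ordered]
```

Both sample messages have the same priority, so the one published first ends up at the front of the result.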
self.pubsub.resume_wsx_subscription(sub_key, service)
Resumes delivery for input sub_key to a previously created subscription of a WebSocket client that reconnected after having its TCP-level connection dropped (e.g. after closing a web browser's window).
Input data is validated and sub_key is checked to have been issued to the same endpoint that the calling WebSocket channel is associated with - if this is not fulfilled, an exception is raised (type Exception should be expected by callers of this method).
sub_key
: str - Subscription key to resume delivery for

service
: object - Service that invokes the method (i.e. self)

Returns
: (None)

    # -*- coding: utf-8 -*-

    # Zato
    from zato.server.service import Service

    class ResumeWSXSubscription(Service):
        def handle(self):

            # Prepare input
            sub_key = 'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f'

            # Resume delivery for that key,
            # if there is no exception, it means that everything went fine
            self.pubsub.resume_wsx_subscription(sub_key, self)
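Given that a failed validation is signalled with an exception of type Exception, callers may want to catch it explicitly. Below is a minimal sketch of that pattern. The helper try_resume is hypothetical, and StubPubSub only imitates the documented behavior (return nothing on success, raise on failure) so that the example can run outside of Zato.

```python
import logging


def try_resume(pubsub, sub_key, service, logger):
    """Return True if delivery was resumed, False if the call raised an exception."""
    try:
        pubsub.resume_wsx_subscription(sub_key, service)
    except Exception as e:
        logger.warning('Could not resume delivery: %s', e)
        return False
    return True


class StubPubSub:
    """Stand-in for self.pubsub, it accepts only the sub_keys it was given."""

    def __init__(self, known_sub_keys):
        self.known_sub_keys = known_sub_keys

    def resume_wsx_subscription(self, sub_key, service):
        if sub_key not in self.known_sub_keys:
            raise Exception('Subscription key was not accepted')


logger = logging.getLogger('resume-demo')
pubsub = StubPubSub({'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f'})

resumed = try_resume(pubsub, 'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f', None, logger)
rejected = try_resume(pubsub, 'zpsk.wsx.unknown', None, logger)
```

Inside a real service, the call would be try_resume(self.pubsub, sub_key, self, self.logger).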