Publish/subscribe - Python API

Overview

From the perspective of Zato service authors, publish/subscribe offers a few methods for publications and reception of messages:

MethodNotes
self.pubsub.publishSends a message to a destination. The destination can be a topic name or service name.
self.pubsub.get_messagesReceives all outstanding messages from input queue by subscription key
self.pubsub.subscribeSubscribes to a topic, returning subscription key on output
self.pubsub.resume_wsx_subscriptionInvoked on behalf of WebSocket clients to resume delivery of messages for an already existing subscription key after clients reconnect to Zato

Note that get_messages expects a subscription key - this is because there may be multiple subscriptions for each topic so the method needs to know from whose queue to return the messages and it is subscription key that points to each queue.

Security-wise, method subscribe check if the endpoint given to it on input has correct permissions to topics that the endpoint is about to subscribe to.

On the other hand, publish and get_messages assume that the service that execute them has already carried out any necessary input validation and authentication or authorization checks, i.e. the methods should be used in trusted code paths because they potentially allow one to access any arbitrary topics and subscriptions.

self.pubsub.publish

self.pubsub.publish(topic_name, **kwargs)

Publishes a message to input topic. Except for topic_name, all parameters are passed in as keyword arguments and all are optional, e.g. message ID does not have to be provided on input unless there is an actual need for it.

  • data: str - Business data being published
  • msg_id: str - Message ID - callers must ensure this is globally unique (such as UUID4)
  • has_gd: bool - Indicates if message should be covered by Guaranteed Delivery or not
  • priority: int - Message priority, 1-9 (1=min)
  • expiration: int - Message expiration in milliseconds
  • mime_type: str - MIME type of input data
  • correl_id: str - Correlation ID, if message is part of a broader series of messages
  • in_reply_to: str - Message ID of a message this message is a response to, if any
  • ext_client_id: str - An arbitrary string uniquely identifying the client application, or its instance, on whose behalf the method is called
  • ext_pub_time: str - When the message was sent, as interpreted from the perspective of the calling application
  • Returns: str - Message ID, either automatically assigned or taken from input if it was provided
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class PublishSample(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        data = 'My data'
        priority = 9

        # Publish the message
        msg_id = self.pubsub.publish(topic_name, data=data, priority=priority)

        # Log the message ID received
        self.logger.info('Message ID is `%s`', msg_id)
INFO - Message ID is `zpsmc53c2e078eafd1c476b49623`

self.pubsub.get_messages

self.pubsub.get_messages(topic_name, sub_key)

Returns all messages enqueued for input sub_key which must be associated with an existing subscription for input topic_name.

  • topic_name: str - Name of a topic that messages were published to
  • sub_key: str - Subscription key to return messages for
  • Returns: A list of dicts, each representing a single message
# -*- coding: utf-8 -*-

from zato.server.service import Service

class GetMessages(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        sub_key = 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f'

        # Get all messages available ..
        messages = self.pubsub.get_messages(topic_name, sub_key)

        # .. and log them all.
        self.logger.info('Messages `%s`', messages)
INFO - Messages `[
  {
    'delivery_count': 0,
    'has_gd': False,
    'server_name': 'server1',
    'topic_name': '/zato/demo/sample',
    'pub_time_iso': '2024-07-04T19:58:08.020493',
    'priority': 5,
    'expiration_time_iso': '2086-07-22T23:12:15.020493',
    'expiration': 2147483647000,
    'server_pid': 19218,
    'size': 27,
    'data': 'This is a sample message #2',
    'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
    'mime_type': 'text/plain'},
  {
    'delivery_count': 0,
    'has_gd': False,
    'server_name': 'server1',
    'topic_name': '/zato/demo/sample',
    'pub_time_iso': '2024-07-04T19:58:01.144177',
    'priority': 5,
    'expiration_time_iso': '2086-07-22T23:12:08.144177',
    'expiration': 2147483647000,
    'server_pid': 19218,
    'size': 24,
    'data': 'This is a sample message',
    'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
    'mime_type': 'text/plain'
  }]`

self.pubsub.subscribe

self.pubsub.subscribe(topic_name, **kwargs)

Subscribes an endpoint to input topic_name.

For non-WebSocket endpoints (e.g. REST), the endpoint that is about to be subscribed must be provided in input endpoint_name parameter.

For WebSocket endpoints, the subscription will be made for the endpoint that current WebSocket channel is pointing to.

On output, subscription key is returned which is a unique token pointing to this particular subscription. Subscription keys should be treated as secrets and must not be shared.

Note that all the permission checks apply and endpoints used must have correct permissions that allow them to subscribe to input topic_name.

  • topic_name: str - Name of topic to subscribe to
  • endpoint_name: str - Endpoint to subscribe to the topic (required for non-WebSocket subscriptions)
  • use_current_wsx: bool - Indicates that currently used WebSocket channel should be used as the basis for finding the endpoint to subscribe to topic_name (required for WebSocket subscriptions only)
  • service: object - Always equal to self (required for WebSocket subscriptions only)
  • Returns: str - Subscription key

How to subscribe non-WebSocket endpoints

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class SubscribeWebSocket(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        endpoint_name = 'My REST Endpoint'

        # Subscribe an explicitly named endpoint..
        sub_key = self.pubsub.subscribe(topic_name, endpoint_name=endpoint_name)

        # .. and log the subscription key received.
        self.logger.info('Received sub_key `%s`', sub_key)
INFO - Received sub_key `zpsk.rest.a01be4755a87d6fd24276929`

How to subscribe WebSockets

# -*- coding: utf-8 -*-

from zato.server.service import Service

class SubscribeWebSocket(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'

        # Subscribe current WebSocket ..
        sub_key = self.pubsub.subscribe(topic_name, use_current_wsx=True, service=self)

        # .. and log the subscription key received.
        self.logger.info('Received sub_key `%s`', sub_key)
INFO - Received sub_key `zpsk.wsx.d552ea2b0f2ff7a3815f9282`

self.pubsub.resume_wsx_subscription

self.pubsub.resume_wsx_subscription(sub_key, service)

Resumes delivery for input sub_key to a previously created subscription of a WebSocket client that reconnected after having its TCP-level connection dropped (e.g. after closing a web browser's window).

Input data is validated and sub_key is checked to have been issued to the same endpoint that the calling WebSocket channel is associated with - if this is not fulfilled, an exception is raised (type Exception should be expected by callers of this method).

  • sub_key: str - Subscription key to resume delivery for
  • service: object - Service that invokes the method (i.e. self)
  • Returns: (None)
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class ResumeWSXSubscription(Service):
    def handle(self):

        # Prepare input
        sub_key = 'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f'

        # Resume delivery for that key,
        # if there is no exception, it means that everything went fine
        self.pubsub.resume_wsx_subscription(sub_key, self)