Pub/sub - API - Zato services

Preliminary materials: Pub/sub architecture.

Overview

From the perspective of Zato service authors, publish/subscribe offers a few methods for publications and reception of messages:

Method Notes
self.pubsub.publish Sends a message to input topic
self.pubsub.subscribe Subscribes to a topic, returning subscription key on output
self.pubsub.get_messages Receives all outstanding messages from input queue by subscription key
self.pubsub.resume_wsx_subscription Invoked on behalf of WebSocket clients to resume delivery of messages for an already existing subscription key after clients reconnect to Zato

Note that get_messages expects a subscription key - this is because there may be multiple subscriptions for each topic so the method needs to know from whose queue to return the messages and it is subscription key that points to each queue.

Security-wise, methods publish and subscribe check if the endpoints given to them on input have correct permissions to topics that the endpoints are about to publish or subscribe to.

On the other hand, get_messages assumes that the service that execute it has already carried out any necessary input validation and authentication or authorization checks, i.e. the method should be used in trusted code paths because it potentially allows one to access any arbitrary topics and subscriptions.

self.pubsub.publish

self.pubsub.publish(topic_name, **kwargs)

Publishes a message to input topic. All parameters are passed in as keyword arguments and all are optional, e.g. message ID does not have to be provided on input unless there is an actual need.

Parameters:
  • data (str) – Business data being published
  • msg_id (str) – Message ID - callers must ensure this is globally unique (such as UUID4)
  • has_gd (bool) – Indicates if message should be covered by Guaranteed Delivery or not
  • priority (int) – Message priority, 1-9 (1=min)
  • expiration (int) – Message expiration in milliseconds
  • mime_type (str) – MIME type of input data
  • correl_id (str) – Correlation ID, if message is part of a broader series of messages
  • in_reply_to (str) – Message ID of a message this message is a response to, if any
  • ext_client_id (str) – An arbitrary string uniquely identifying the client application, or its instance, on whose behalf the method is called
  • ext_pub_time (str) – When the message was sent, as interpreted from the perspective of the calling application
Returns:

Message ID, either automatically assigned or taken from input if it was provided

Return type:

str

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

from zato.server.service import Service

class PublishSample(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        data = 'My data'
        priority = 9

        # Publish the message
        msg_id = self.pubsub.publish(topic_name, data=data, priority=priority)

        # Log the message ID received
        self.logger.info('Message ID is `%s`', msg_id)
INFO - Message ID is `zpsmc53c2e078eafd1c476b49623`
../../_images/python-publish.png

self.pubsub.subscribe

self.pubsub.subscribe(topic_name, **kwargs)

Subscribes an endpoint to input topic_name.

For non-WebSocket endpoints (e.g. REST, SOAP or AMQP), the endpoint that is about to be subscribed must be provided in input endpoint_name parameter.

For WebSocket endpoints, the subscription will be made for the endpoint that current WebSocket channel is pointing to.

On output, subscription key is returned which is a unique token pointing to this particular subscription. Subscription keys should be treated as secrets and must not be shared.

Note that all the permission checks apply and endpoints used must have correct permissions that allow them to subscribe to input topic_name.

Parameters:
  • topic_name (str) -- Name of topic to subscribe to
  • endpoint_name (str) -- Endpoint to subscribe to the topic (required for non-WebSocket subscriptions)
  • use_current_wsx (bool) -- Indicates that currently used WebSocket channel should be used as the basis for finding the endpoint to subscribe to topic_name (required for WebSocket subscriptions only)
  • service (object) -- Always equal to self (required for WebSocket subscriptions only)
Returns:

Subscription key

Return type:

str

How to subscribe non-WebSocket endpoints

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

from zato.server.service import Service

class SubscribeWebSocket(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        endpoint_name = 'My REST Endpoint'

        # Subscribe an explicitly named endpoint..
        sub_key = self.pubsub.subscribe(topic_name, endpoint_name=endpoint_name)

        # .. and log the subscription key received.
        self.logger.info('Received sub_key `%s`', sub_key)
INFO - Received sub_key `zpsk.rest.a01be4755a87d6fd24276929`

How to subscribe WebSockets

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

from zato.server.service import Service

class SubscribeWebSocket(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'

        # Subscribe current WebSocket ..
        sub_key = self.pubsub.subscribe(topic_name, use_current_wsx=True, service=self)

        # .. and log the subscription key received.
        self.logger.info('Received sub_key `%s`', sub_key)
INFO - Received sub_key `zpsk.websockets.d552ea2b0f2ff7a3815f9282`

self.pubsub.get_messages

self.pubsub.get_messages(topic_name, sub_key)

Returns all messages enqueued for input sub_key which must be associated with an existing subscription for input topic_name.

Parameters:
  • topic_name (str) -- Name of a topic that messages were published to
  • sub_key (str) -- Subscription key to return messages for
Returns:

A list of messages, if any - each element in the list is a dictionary containing data and metadata about the message

Return type:

list of dicts

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

from zato.server.service import Service

class GetMessages(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        sub_key = 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f'

        # Get all messages available ..
        messages = self.pubsub.get_messages(topic_name, sub_key)

        # .. and log them all.
        self.logger.info('Messages `%s`', messages)
INFO - Messages `[
  {
   u'delivery_count': 0,
   u'has_gd': False,
   u'server_name': u'server1',
   u'topic_name': u'/zato/demo/sample',
   u'pub_time_iso': u'2018-07-04T19:58:08.020493',
   u'priority': 5,
   u'expiration_time_iso': u'2086-07-22T23:12:15.020493',
   u'expiration': 2147483647000,
   u'server_pid': 19218,
   u'size': 27,
   u'data': u'This is a sample message #2',
   u'sub_key': u'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
   u'mime_type': u'text/plain'},
  {
    u'delivery_count': 0,
    u'has_gd': False,
    u'server_name': u'server1',
    u'topic_name': u'/zato/demo/sample',
    u'pub_time_iso': u'2018-07-04T19:58:01.144177',
    u'priority': 5,
    u'expiration_time_iso': u'2086-07-22T23:12:08.144177',
    u'expiration': 2147483647000,
    u'server_pid': 19218,
    u'size': 24,
    u'data': u'This is a sample message',
    u'sub_key': u'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
    u'mime_type': u'text/plain'
  }]`

self.pubsub.resume_wsx_subscription

self.pubsub.resume_wsx_subscription(sub_key, service)

Resumes delivery for input sub_key to a previously created subscription of a WebSocket client that reconnected after having its TCP-level connection dropped (e.g. after closing a web browser's window).

Input data is validated and sub_key is checked to have been issued to the same endpoint that the calling WebSocket channel is associated with - if this is not fulfilled, an exception is raised (type Exception should be caught).

Parameters:
  • sub_key (str) -- Subscription key to resume delivery for
  • service (object) -- Service that invokes the method (i.e. self)
Returns:

None

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

from zato.server.service import Service

class ResumeWSXSubscription(Service):
    def handle(self):

        # Prepare input
        sub_key = 'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f'

        # Resume delivery for that key,
        # if there is no exception, it means that everything went fine
        self.pubsub.resume_wsx_subscription(sub_key, self)