API publish/subscribe between Zato services

One of the additions in the upcoming Zato 3.2 release is an extension to its publish/subscribe mechanism that lets services publish messages directly to other services. Let's check how to use it and how it compares to other means of invoking one's API services.

How does it work?

In your Zato service, you can publish a message to any other service as below. Simply point self.pubsub.publish to the target service by its name and the target will receive your message.

Here, we have two services - my.source publishes a message to my.target:

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

# ############################################################################

# For code completion
if 0:
    from zato.common.pubsub import PubSubMessage
    PubSubMessage = PubSubMessage # For flake8

# ############################################################################

class MySource(Service):
    name = 'my.source'

    def handle(self):
        self.pubsub.publish('my.target', data={'abc':'Hello World'})

# ############################################################################

class MyTarget(Service):
    name = 'my.target'

    def handle(self):

        # Our message read from a topic ..
        msg = self.request.raw_request # type: PubSubMessage

        # .. let's log its contents now.
        self.logger.info('My message is %s', msg.data)

# ############################################################################

In server.log:

INFO - My message is {'abc': 'Hello World'}

This looks straightforward, and using it is, but there is a lot going on under the hood:

  • When you publish a message to a service, a topic and subscriptions are created automatically for the target service

  • The message that you publish is stored in a persistent database

  • The message is then delivered to your service asynchronously

  • Now comes the first crucial point: if your service raises an exception for any reason, the message is understood not to have been delivered safely and Zato will retry its delivery

  • Another key point is that, because the message is kept in persistent storage, it will still be available for Zato to deliver even if you stop all of your servers. When they start again, they will enqueue any undelivered messages to repeat their delivery
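
The store-first, retry-on-exception behaviour described above can be modelled in a few lines of plain Python. This is only a sketch of the idea, not Zato's implementation: DurableStore, deliver_all and the max_attempts limit are hypothetical names and assumptions made for illustration, and a deque stands in for the persistent database.

```python
from collections import deque


class DurableStore:
    """Hypothetical stand-in for the persistent database behind a topic."""

    def __init__(self):
        self.pending = deque()  # messages saved but not yet delivered
        self.attempts = {}      # message ID -> number of delivery attempts

    def publish(self, msg_id, data):
        # The message is stored first; delivery happens later.
        self.pending.append((msg_id, data))
        self.attempts[msg_id] = 0

    def deliver_all(self, handler, max_attempts=5):
        # max_attempts is an assumption of this sketch, it only keeps the loop finite.
        while self.pending:
            msg_id, data = self.pending[0]
            self.attempts[msg_id] += 1
            try:
                handler(data)
            except Exception:
                # An exception means "not delivered", so the message stays enqueued.
                if self.attempts[msg_id] >= max_attempts:
                    break
                continue
            # Only a successful delivery removes the message from the store.
            self.pending.popleft()


received = []
failures_left = [2]  # the target fails twice before it succeeds


def flaky_target(data):
    if failures_left[0] > 0:
        failures_left[0] -= 1
        raise RuntimeError('Target temporarily unavailable')
    received.append(data)


store = DurableStore()
store.publish('msg-1', {'abc': 'Hello World'})
store.deliver_all(flaky_target)
```

Here the target raises an exception twice and accepts the message on the third attempt, at which point it is removed from the queue.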

In other words, Zato services can participate in publish/subscribe just like other endpoints, e.g. REST, SOAP, AMQP or WebSockets.

For instance, we can use Zato Dashboard to check the topic - what its current depth is, what the last message was, or to browse messages enqueued for the subscriber service.

Let's compare pub/sub with other methods of communication between services.

How does it differ from self.invoke?

You use self.invoke to invoke another service synchronously, within the same operating system process. If the target service raises an error, you get a live Python exception object in the source service.

How does it differ from self.invoke_async?

Using self.invoke_async lets you send a message to another service asynchronously, which is similar to what self.pubsub.publish can do. However, self.invoke_async operates in RAM only.

On the one hand, this means that it is much more efficient than publish/subscribe, which has to save each message in a database first.

Yet, on the other hand, this means that if a server sending a message to a service using self.invoke_async is shut down, the message is lost irrevocably because it only ever exists in RAM.
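
The difference between RAM-only and persistent messages can be illustrated with a simulated restart. The sketch below is plain Python and does not use any Zato API: RamQueue and DiskQueue are hypothetical classes, and an SQLite file in a temporary directory stands in for the pub/sub database.

```python
import os
import sqlite3
import tempfile


class RamQueue:
    """Messages live only in this object, that is, only in the process's memory."""

    def __init__(self):
        self.messages = []

    def send(self, data):
        self.messages.append(data)


class DiskQueue:
    """Messages are committed to an SQLite file as soon as they are sent."""

    def __init__(self, path):
        self.conn = sqlite3.connect(path)
        self.conn.execute('CREATE TABLE IF NOT EXISTS msg (data TEXT)')
        self.conn.commit()

    def send(self, data):
        self.conn.execute('INSERT INTO msg (data) VALUES (?)', (data,))
        self.conn.commit()

    def pending(self):
        return [row[0] for row in self.conn.execute('SELECT data FROM msg')]

    def close(self):
        self.conn.close()


db_path = os.path.join(tempfile.mkdtemp(), 'queue.db')

# Before the "shutdown", both queues accept one message.
ram = RamQueue()
ram.send('Hello World')
disk = DiskQueue(db_path)
disk.send('Hello World')
disk.close()

# Simulated restart: in-memory state is gone, the file is still there.
ram = RamQueue()
disk = DiskQueue(db_path)

ram_after_restart = ram.messages
disk_after_restart = disk.pending()
disk.close()
```

After the simulated restart, the RAM-only queue is empty whereas the file-backed one still holds the message and can deliver it.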

How does it differ from self.patterns?

Several integration patterns can be accessed through self.patterns - fan-out/fan-in, parallel execution, invoke/retry and async invocation with a callback.

All of them can be used to form and support complex integration scenarios and what all of them have in common is that they work on messages that exist in RAM only, whereas publish/subscribe uses persistent storage.
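
To make one of these patterns more concrete, here is a rough, Zato-independent model of fan-out/fan-in built on Python's standard concurrent.futures module. It is not the self.patterns API; fan_out_fan_in, get_client and get_invoices are hypothetical names used only to show the shape of the pattern. Note that all requests and responses are ordinary in-memory objects.

```python
from concurrent.futures import ThreadPoolExecutor


def get_client(client_id):
    # Hypothetical target no. 1
    return {'client_id': client_id, 'name': 'Client {}'.format(client_id)}


def get_invoices(client_id):
    # Hypothetical target no. 2
    return {'client_id': client_id, 'invoices': [101, 102]}


def fan_out_fan_in(targets, payload):
    with ThreadPoolExecutor(max_workers=len(targets)) as pool:
        # Fan-out: submit the same payload to every target in parallel ..
        futures = {name: pool.submit(func, payload) for name, func in targets.items()}
        # .. fan-in: wait for all of them and collect the responses by name.
        return {name: future.result() for name, future in futures.items()}


targets = {'get.client': get_client, 'get.invoices': get_invoices}
responses = fan_out_fan_in(targets, 123)
```

If the process stops while the futures are still running, both the payload and any partial responses are gone, which is the trade-off that the RAM-only patterns make.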

When to choose which?

As usual, there is no single choice that covers all needs, but several guidelines apply:

  • Use self.invoke if you need immediate feedback (an exception) if the service that you invoke fails for any reason. For instance, get-like services (GetClient, GetInvoice etc.) typically fall into this category.

  • Use self.invoke_async if you need asynchronous invocations and it is acceptable that a message can be dropped in case of a server restart. This works when the initial application or service can always retry the transmission.

  • Use self.patterns if you need to build more advanced scenarios but keep in mind that all the messages exist in RAM only.

  • Use self.pubsub.publish if you need the highest degree of durability and the sender cannot retransmit a lost message; all messages are always saved in a database before they are delivered, which means that they are not lost if servers restart.

Finally, keep in mind that you can always mix them all. A service can invoke other services using, for instance, parallel invoke, which in turn publishes messages to other services that, for their part, invoke more services using self.invoke. Interactions of this kind are entirely possible too.