Pub/sub - API - Services Zato

Lecture prérequise : Architecture Pub/sub.

Vue d'ensemble

Du point de vue des auteurs de services Zato, l'architecture publish/subscribe offre quelques méthodes pour la publication et la réception de messages:

Méthode Notes
self.pubsub.publish Envoie un message à une destination. La destination peut être un nom de sujet ou un nom de service.
self.pubsub.subscribe S'abonne à un sujet et renvoie la clé d'abonnement à la sortie.
self.pubsub.get_messages Reçoit tous les messages en suspens de la file d'attente d'entrée par clé d'abonnement.
self.pubsub.resume_wsx_subscription Appelé au nom des clients WebSocket pour reprendre la livraison des messages pour une clé d'abonnement déjà existante après que les clients se soient reconnectés à Zato.

Notez que get_messages attend une clé d'abonnement - c'est parce qu'il peut y avoir plusieurs abonnements pour chaque sujet, donc la méthode doit savoir à partir de quelle file d'attente retourner les messages et c'est la clé d'abonnement qui pointe vers chaque file d'attente.

Du point de vue de la sécurité, les méthodes publish et subscribe vérifient si les endpoints qui leur sont donnés en entrée ont les permissions correctes pour les sujets que les endpoints sont sur le point de publier ou de s'abonner.

D'autre part, get_messages suppose que le service qui l'exécute a déjà effectué toutes les validations d'entrée nécessaires et les contrôles d'authentification ou d'autorisation, c'est-à-dire que la méthode doit être utilisée dans des chemins de code de confiance car elle permet potentiellement d'accéder à n'importe quels sujets et abonnements arbitraires.

self.pubsub.publish

self.pubsub.publish(topic_name, **kwargs)

Publie un message sur le sujet d'entrée. Tous les paramètres sont passés en tant qu'arguments de mots-clés et tous sont facultatifs, par exemple, l'identifiant du message ne doit pas être fourni en entrée, sauf en cas de besoin réel.

  • data : str - Business data à publier.
  • msg_id : str - ID du message - les appelants doivent s'assurer qu'il est unique au monde (tel que UUID4).
  • has_gd : bool - Indique si le message doit être couvert par la garantie de livraison ou non.
  • priority : int - Priorité du message, 1-9 (1=min)
  • expiration : int - Expiration du message en millisecondes
  • mime_type : str - Type MIME des données d'entrée
  • correl_id : str - ID de corrélation, si le message fait partie d'une série plus large de messages
  • in_reply_to : str - ID du message auquel ce message est une réponse, le cas échéant.
  • ext_client_id : str - Une chaîne arbitraire identifiant de manière unique l'application client, ou son instance, pour laquelle la méthode est appelée.
  • ext_pub_time : str - Date à laquelle l
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class PublishSample(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        data = 'My data'
        priority = 9

        # Publish the message
        msg_id = self.pubsub.publish(topic_name, data=data, priority=priority)

        # Log the message ID received
        self.logger.info('Message ID is `%s`', msg_id)
INFO - Message ID is `zpsmc53c2e078eafd1c476b49623`

self.pubsub.subscribe

self.pubsub.subscribe(topic_name, **kwargs)

Souscrit un endpoint à l'entrée topic_name.

Pour les endpoints non-WebSocket (par exemple REST, SOAP ou AMQP), l'endpoint qui est sur le point d'être abonné doit être fourni dans le paramètre d'entrée endpoint_name.

Pour les points d'extrémité WebSocket, l'abonnement sera effectué pour l'endpoint vers lequel le canal WebSocket actuel pointe vers.

En sortie, la clé d'abonnement est retournée, qui est un token unique pointant vers cet abonnement particulier. Les clés d'abonnement doivent être traitées comme des secrets et ne doivent pas être partagées.

Notez que tous les contrôles de permission s'appliquent et que les endpoints utilisés doivent avoir des permissions correctes qui leur permettent de s'abonner à l'entrée topic_name.

  • topic_name : str - Nom du sujet auquel s'abonner.
  • endpoint_name : str - Endpoint pour s'abonner au sujet (requis pour les abonnements non-WebSocket)
  • use_current_wsx : bool - Indique que le canal WebSocket actuellement utilisé doit être utilisé comme base pour trouver l'endpoint pour s'abonner à topic_name (requis pour les abonnements WebSocket uniquement)
  • `service : object - Toujours égal à self (requis pour les abonnements WebSocket uniquement)
  • Returns : str - Clé d'abonnement

Comment s'abonner à des endpoints non WebSocket

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class SubscribeWebSocket(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        endpoint_name = 'My REST Endpoint'

        # Subscribe an explicitly named endpoint..
        sub_key = self.pubsub.subscribe(topic_name, endpoint_name=endpoint_name)

        # .. and log the subscription key received.
        self.logger.info('Received sub_key `%s`', sub_key)
INFO - Received sub_key `zpsk.rest.a01be4755a87d6fd24276929`

Comment s'abonner à des WebSockets

# -*- coding: utf-8 -*-

from zato.server.service import Service

class SubscribeWebSocket(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'

        # Subscribe current WebSocket ..
        sub_key = self.pubsub.subscribe(topic_name, use_current_wsx=True, service=self)

        # .. and log the subscription key received.
        self.logger.info('Received sub_key `%s`', sub_key)
INFO - Received sub_key `zpsk.wsx.d552ea2b0f2ff7a3815f9282`

self.pubsub.get_messages

self.pubsub.get_messages(topic_name, sub_key)

Retourne tous les messages mis en file d'attente pour l'entrée sub_key qui doit être associée à un abonnement existant pour l'entrée topic_name.

  • topic_name : str - Nom du sujet pour lequel les messages ont été publiés.
  • sub_key : str - Clé d'abonnement pour laquelle les messages doivent être retournés.
  • Returns : Une liste de dicts, chacun représentant un seul message.
# -*- coding: utf-8 -*-

from zato.server.service import Service

class GetMessages(Service):
    def handle(self):

        # Prepare input
        topic_name = '/zato/demo/sample'
        sub_key = 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f'

        # Get all messages available ..
        messages = self.pubsub.get_messages(topic_name, sub_key)

        # .. and log them all.
        self.logger.info('Messages `%s`', messages)
INFO - Messages `[
  {
    'delivery_count': 0,
    'has_gd': False,
    'server_name': 'server1',
    'topic_name': '/zato/demo/sample',
    'pub_time_iso': '2022-07-04T19:58:08.020493',
    'priority': 5,
    'expiration_time_iso': '2086-07-22T23:12:15.020493',
    'expiration': 2147483647000,
    'server_pid': 19218,
    'size': 27,
    'data': 'This is a sample message #2',
    'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
    'mime_type': 'text/plain'},
  {
    'delivery_count': 0,
    'has_gd': False,
    'server_name': 'server1',
    'topic_name': '/zato/demo/sample',
    'pub_time_iso': '2022-07-04T19:58:01.144177',
    'priority': 5,
    'expiration_time_iso': '2086-07-22T23:12:08.144177',
    'expiration': 2147483647000,
    'server_pid': 19218,
    'size': 24,
    'data': 'This is a sample message',
    'sub_key': 'zpsk.rest.6b0b83ebea3f6e6a64e8e73f',
    'mime_type': 'text/plain'
  }]`

self.pubsub.resume_wsx_subscription

self.pubsub.resume_wsx_subscription(sub_key, service)

Reprend la livraison pour l'entrée sub_key à un abonnement précédemment créé d'un client WebSocket qui s'est reconnecté après que sa connexion au niveau TCP ait été abandonnée (par exemple, après la fermeture de la fenêtre d'un navigateur Web).

Les données d'entrée sont validées et sub_key est vérifié pour avoir été émis au même endpoint que le canal WebSocket appelant est associé - si ce n'est pas le cas, une exception est levée (le type Exception doit être attendu par les appelants de cette méthode).

  • sub_key : str - Clé d'abonnement à reprendre la livraison pour
  • service : object - Service qui invoque la méthode (i.e. self)
  • Returns : (None)
# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class ResumeWSXSubscription(Service):
    def handle(self):

        # Prepare input
        sub_key = 'zpsk.wsx.6b0b83ebea3f6e6a64e8e73f'

        # Resume delivery for that key,
        # if there is no exception, it means that everything went fine
        self.pubsub.resume_wsx_subscription(sub_key, self)

Sujets connexes