Modèles: Fan-out/Fan-in

Vue d'ensemble

Fan-out/fan-in permet aux services d'invoquer un nombre arbitraire d'autres services en fournissant un ensemble de services de rappel (callback) à exécuter une fois que tous les services cibles sont terminés.

En option, des rappels supplémentaires peuvent être invoqués chaque fois qu'un service de la liste cible originale termine son exécution.

Les services cibles s'exécutent en parallèle, de manière asynchrone, et ne sont pas informés de la progression de l'exécution des autres services. Tous les rappels configurés s'exécutent également de manière asynchrone sans se bloquer mutuellement.

Le schéma ci-dessous illustre la fonctionnalité dans toute sa portée. Un service source émet des ventilations vers deux services cibles, chacun avec une liste de ses callbacks d'achèvement. Lorsque les deux services cibles terminent leur exécution, le flux de traitement s'étend vers une liste finale de rappels. Ainsi, les callbacks finaux peuvent être certains que toutes les cibles ont déjà été invoquées.

Exemples d'utilisation et API

Usage courant

Dans le cas le plus courant, l'utilisation sera la suivante. Un dictionnaire contenant les cibles et leurs requêtes ainsi qu'une liste de callbacks est fourni à self.patterns.fanout.invoke. Toutes les cibles sont invoquées en arrière-plan et, une fois qu'elles sont toutes terminées, chacun des callbacks est invoqué.

from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # A dictionary of services to invoke along with requests they receive
        targets = {
            'service1': {'hello':'from-fan-out1'},
            'service2': {'hello':'from-fan-out2'},
        }

        # Callbacks to invoke when both services above finish
        callbacks = ['my.callback1', 'my.callback2']

        # Call's Correlation ID is returned on output
        cid = self.patterns.fanout.invoke(targets, callbacks)

Callbacks d'achèvement sur cible

Ici, des callbacks supplémentaires sont exécutés après l'achèvement de chacune des cibles. Ils s'ajoutent aux rappels finaux qui sont exécutés, que les callbacks de cibles soient utilisés ou non.

Notez que les callbacks de cible sont toujours les mêmes pour chaque cible, il n'y a aucun moyen de spécifier des rappels de cible distincts pour les cibles individuellement. Cependant, les callbacks peuvent distinguer les services cibles grâce aux métadonnées qu'ils reçoivent en entrée et qui contiennent le nom de la cible.

from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # A dictionary of services to invoke along with requests they receive
        targets = {
            'service1': {'hello':'from-fan-out1'},
            'service2': {'hello':'from-fan-out2'},
        }

        # Callbacks to invoke each time one of the services above completes
        target_callbacks = ['my.target.callback1', 'my.target.callback2']

        # Callbacks to invoke when both services above finish
        final_callbacks = ['my.callback1', 'my.callback2']

        # Call's Correlation ID is returned on output
        cid = self.patterns.fanout.invoke(targets, final_callbacks, target_callbacks)

ID de corrélation définis par l'utilisateur

Les utilisateurs peuvent fournir leurs propres ID de corrélation pour invoquer les services. Il est de la responsabilité de l'utilisateur de s'assurer que les ID sont suffisamment uniques et qu'il n'y aura pas de doublons, si cela se produit : les résultats sont indéfinis.

from zato.server.service import Service

class MyService(Service):

    def handle(self):

        # A dictionary of services to invoke along with requests they receive
        targets = {
            'service1': {'hello':'from-fan-out1'},
            'service2': {'hello':'from-fan-out2'},
        }

        # Callbacks to invoke when both services above finish
        callbacks = ['my.callback1', 'my.callback2']

        # User-provided Correlation ID
        cid = '2963-2645-9715-1719'

        # CID returned is the one received on input
        cid = self.patterns.fanout.invoke(targets, callbacks, cid=cid)

Création de callbacks

Un callback est un service Zato ordinaire qui est invoqué à des moments spécifiques d'un appel fan-out/fan-in. Le même service peut servir de callback final et de callback cible. Chaque callback est invoqué de manière asynchrone, indépendamment de tout autre callback défini.

Lors de l'exécution, l'attribut self.channel de chaque callback sera défini comme suit FANOUT_ON_FINAL ou FANOUT_ON_TARGET.

En entrée, l'attribut self.request.payload d'une callback finale sera un dictionnaire Python dont le contenu aura le format suivant :

  {
   'source': 'fanout1.my-service',
   'req_ts_utc': '2025-01-19 19:55:46',
   'on_target': '',
   'on_final': ['my.cb1', 'zato.helpers.input-logger'],
   'data':
    {'my.service1': {
      'source': 'fanout1.my-service',
      'target': 'my.service1',
      'response': 'My response',
      'req_ts_utc': '2025-01-19 19:55:45',
      'resp_ts_utc': '2025-01-19 19:55:46',
      'ok': True,
      'exception': None,
      'cid': '63e7c3a2ba616badecb125ce'},
    'my.service2': {
      'source': 'fanout1.my-service',
      'target': 'my.service2',
      'response': 'My response2',
      'req_ts_utc': '2025-01-19 19:55:45',
      'resp_ts_utc': '2025-01-19 19:55:46',
      'ok': True,
      'exception': None,
      'cid': '9ffc1e5d79f6616b2dfb2c35'}}}
Clé Notes
source Nom du service d'origine émettant l'appel de fan-out/fan-in
req_ts_utc Quand l'appel de sortie initial a-t-il été exécuté, en UTC
on_target Une liste, le cas échéant, des rappels à exécuter chaque fois qu'un service cible se termine.
on_final Une liste de callbacks à invoquer lorsque toutes les cibles sont terminées.
data Un dictionnaire de données décrivant les réponses des services cibles avec comme clé le nom des cibles.

Le dictionnaire 'data' aura les clés suivantes :

  • source : nom du service d'origine émettant l'appel de fan-out/fan-in (répété)
  • target : nom d'un service, l'une des cibles (répété)
  • response : réponse produite par le service.
  • req_ts_utc : horodatage de la demande en UTC (répété)
  • resp_ts_utc : date de création de la réponse, en UTC
  • ok : Vrai/Faux selon que l'appel a réussi (n'a pas soulevé d'exception) ou non.
  • exception : traceback de l'exception sous forme de chaîne, si ok n'est pas True
  • cid : ID de corrélation, le même que celui que le service source a reçu en sortie de self.patterns.fanout.invoke.

Les callbacks ciblés reçoivent self.request.payload égal à une entrée individuelle dans la clé 'data' ci-dessus:

  {
    'source': 'fanout1.my-service',
    'target': 'my.service1',
    'response': 'My response',
    'req_ts_utc': '2025-01-19 19:55:45',
    'resp_ts_utc': '2025-01-19 19:55:46',
    'ok': True,
    'exception': None,
    'cid': 'c6722658d6520777849dee0'
  }

Similitudes avec l'exécution parallèle

La fonctionnalité est très similaire au modèle Parallel Execution : la principale différence étant uniquement que dans Fan-out/Fan-in, ce sont les rappels finaux qui sont requis tandis que ceux sur cible sont facultatif alors que dans l'exécution parallèle, il n'y a pas de rappels finaux pendant que la cible est requise.