
Orchestrating multiple APIs

Real integration projects rarely involve just one API. You may need to fetch data from an HR system, check an access control database, update a ticket tracker, and combine everything into a single response. This is orchestration: coordinating multiple API calls to accomplish one business task.

In Zato, orchestration happens through self.invoke(). Each call to self.invoke() runs another service, which might call an external API or do local processing. This keeps your orchestration logic clean and your API-specific code isolated in adapter services.

Calling multiple APIs sequentially

Fetch data from one API, then use it to call another:

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class GetEmployeeWithAccessCard(Service):

    # The 'name' attribute uniquely identifies this service
    name = 'employee.get-with-access-card'

    # The 'input' attribute declares expected request parameters
    input = 'employee_id'

    def handle(self):

        # Access parameters declared in 'input' above
        employee_id = self.request.input.employee_id

        employee = self.invoke('hr.employee.get', employee_id=employee_id)

        # When a service returns a model, access its fields with dot notation
        access_card = self.invoke('access.card.get', email=employee.email)

        # Build the response
        has_access_card = access_card is not None

        # Payload is automatically serialized to JSON
        self.response.payload = {
            'employee': employee,
            'has_access_card': has_access_card,
            'access_card': access_card
        }

Expose this on a channel at /api/employee/{employee_id} and test:

curl http://localhost:11223/api/employee/EMP-001

The response combines data from two different backend systems into one JSON object.
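Because the orchestration logic depends only on service names, the same sequence can be exercised outside a Zato server by swapping in a stand-in for self.invoke. The following is a minimal sketch in plain Python, not Zato code: `REGISTRY`, `invoke`, the two stub functions and the sample values are all hypothetical and exist only to illustrate the chaining.

```python
from types import SimpleNamespace

# Hypothetical stand-ins for adapter services. In Zato, each of these
# would be its own Service class, reached by name through self.invoke().
def hr_employee_get(employee_id):
    return SimpleNamespace(id=employee_id, email='jane@example.com')

def access_card_get(email):
    return SimpleNamespace(card_id='CARD-7', email=email)

# Hypothetical name-to-callable lookup used by the toy invoke() below
REGISTRY = {
    'hr.employee.get': hr_employee_get,
    'access.card.get': access_card_get,
}

def invoke(name, **kwargs):
    """Toy replacement for self.invoke(): find a service by name and call it."""
    return REGISTRY[name](**kwargs)

def get_employee_with_access_card(invoke, employee_id):
    # Step 1: fetch the employee from the first system
    employee = invoke('hr.employee.get', employee_id=employee_id)

    # Step 2: use a field from the first response to query the second system
    access_card = invoke('access.card.get', email=employee.email)

    return {
        'employee_id': employee.id,
        'has_access_card': access_card is not None,
        'card_id': access_card.card_id if access_card else None,
    }

result = get_employee_with_access_card(invoke, 'EMP-001')
```

Passing `invoke` in as an argument keeps the sequencing logic free of any server dependency, which makes it straightforward to unit test.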

Aggregating data from multiple sources

Combine data from several independent APIs:

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class GetCustomerDashboard(Service):
    name = 'customer.dashboard.get'

    input = 'customer_id'

    def handle(self):

        # Access declared input parameters
        customer_id = self.request.input.customer_id

        # Each call runs a service synchronously and waits for response
        profile = self.invoke('crm.customer.get', customer_id=customer_id)
        orders = self.invoke('orders.list', customer_id=customer_id)
        tickets = self.invoke('support.tickets.list', customer_id=customer_id)
        billing = self.invoke('billing.get', customer_id=customer_id)

        # Count open tickets
        open_tickets = [item for item in tickets if item.status == 'open']
        open_ticket_count = len(open_tickets)

        # Build customer info
        customer = {
            'id': customer_id,
            'name': profile.name,
            'email': profile.email,
            'since': profile.created_at
        }

        # Build summary
        summary = {
            'total_orders': len(orders),
            'open_tickets': open_ticket_count,
            'account_balance': billing.amount
        }

        # Aggregate into dashboard response
        self.response.payload = {
            'customer': customer,
            'summary': summary,
            'recent_orders': orders[:5],
            'recent_tickets': tickets[:3]
        }

Expose on a channel at /api/customer/{customer_id}/dashboard:

curl http://localhost:11223/api/customer/CUST-123/dashboard

Your client gets a complete dashboard in one request instead of making four separate API calls.
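The aggregation step is plain data manipulation, so one option is to factor it into a function that receives the already-fetched responses. This is a sketch only, assuming that tickets expose a `status` attribute and billing exposes `amount`, as in the service above; `build_summary` and the sample data are hypothetical.

```python
from types import SimpleNamespace

def build_summary(orders, tickets, billing):
    """Build the dashboard summary from responses that were already fetched."""
    # Count only the tickets that are still open
    open_ticket_count = sum(1 for item in tickets if item.status == 'open')

    return {
        'total_orders': len(orders),
        'open_tickets': open_ticket_count,
        'account_balance': billing.amount,
    }

# Hypothetical sample responses, shaped like the ones used in the service
sample_orders = [SimpleNamespace(id=n) for n in range(7)]
sample_tickets = [
    SimpleNamespace(status='open'),
    SimpleNamespace(status='closed'),
    SimpleNamespace(status='open'),
]
sample_billing = SimpleNamespace(amount=125.5)

summary = build_summary(sample_orders, sample_tickets, sample_billing)
```

Keeping the calculation separate from the four invocations means it can be checked with sample data, without any backend being available.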

Conditional orchestration

Call different APIs based on business logic:

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class ProcessTrainingCompletion(Service):
    name = 'training.completion.process'

    # Multiple input parameters are declared as a tuple
    input = 'course_id', 'user_id'

    def handle(self):

        # Each declared input becomes an attribute on self.request.input
        course_id = self.request.input.course_id
        user_id = self.request.input.user_id

        # You can pass keyword arguments to the target service
        course = self.invoke('lms.course.get', id=course_id)
        completion = self.invoke('lms.completion.get', course_id=course_id, user_id=user_id)

        # Response objects have attributes matching their model fields
        notifications = course.notify_on_completion
        jira_done_transition = 31

        # Conditional logic determines which downstream services to call
        if 'HR' in notifications:
            self.invoke('hr.training.record', employee_id=completion.employee_id, course=course)
            # You can use the standard Python logger
            self.logger.info(f'Recorded training in HR for {user_id}')

        if 'Jira' in notifications:
            ticket_id = completion.access_request_ticket
            if ticket_id:
                transition_id = jira_done_transition
                self.invoke('jira.transition', ticket_id=ticket_id, transition_id=transition_id)
                self.logger.info(f'Transitioned Jira ticket {ticket_id}')

        self.response.payload = {'status': 'processed'}
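As the number of notification targets grows, a chain of if statements can be replaced with a lookup table that maps each target to a handler. The sketch below is plain Python and assumes that `notify_on_completion` is a list of target names; `HANDLERS`, `dispatch_notifications` and the handler functions are hypothetical names, and `invoke` stands in for self.invoke.

```python
# Transition ID taken from the service above
JIRA_DONE_TRANSITION = 31

def notify_hr(invoke, course, completion):
    invoke('hr.training.record', employee_id=completion.employee_id, course=course)

def notify_jira(invoke, course, completion):
    ticket_id = completion.access_request_ticket
    # Only transition a ticket if the completion actually references one
    if ticket_id:
        invoke('jira.transition', ticket_id=ticket_id, transition_id=JIRA_DONE_TRANSITION)

# Hypothetical mapping of notification target to its handler
HANDLERS = {
    'HR': notify_hr,
    'Jira': notify_jira,
}

def dispatch_notifications(invoke, course, completion):
    """Run the handler for each known target and return the targets handled."""
    handled = []
    for target in course.notify_on_completion:
        handler = HANDLERS.get(target)
        if handler is None:
            # Unknown targets are skipped rather than treated as errors
            continue
        handler(invoke, course, completion)
        handled.append(target)
    return handled
```

Adding a new target then means adding one handler and one dictionary entry, leaving the dispatch loop unchanged.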

Fan-out pattern

When you need to sync data to multiple systems or gather data from independent sources, fan out the calls. Each target system gets the same input data.

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class SyncAllSystems(Service):
    name = 'sync.all-systems'

    def handle(self):

        # You can pass complex objects like lists to other services
        employees = self.invoke('hr.employees.active')

        # Fan out the same data to multiple target systems
        lms_result = self.invoke('lms.sync', employees=employees)
        access_result = self.invoke('access.sync', employees=employees)
        directory_result = self.invoke('directory.sync', employees=employees)

        # Response models have typed attributes for easy access
        self.response.payload = {
            'employees_processed': len(employees),
            'lms_synced': lms_result.count,
            'access_synced': access_result.count,
            'directory_synced': directory_result.count
        }
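In the service above the three sync calls run one after another, so an exception from the first target would prevent the remaining ones from being called. If each target should be attempted regardless, the calls can be wrapped so that failures are collected instead of raised. This is a minimal sketch in plain Python; `fan_out` is a hypothetical helper and `invoke` stands in for self.invoke.

```python
def fan_out(invoke, targets, **kwargs):
    """Send the same keyword arguments to every target, isolating failures.

    Returns a (results, errors) pair of dictionaries keyed by target name.
    """
    results = {}
    errors = {}

    for name in targets:
        try:
            results[name] = invoke(name, **kwargs)
        except Exception as e:
            # Record the failure and carry on with the next target
            errors[name] = str(e)

    return results, errors
```

Inside a service this might be called as `fan_out(self.invoke, ['lms.sync', 'access.sync', 'directory.sync'], employees=employees)`, with the `errors` dictionary included in the response so the caller can see which systems were not updated.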

Error handling in orchestration

When orchestrating multiple APIs, decide which failures are fatal and which are acceptable. Core data might be required, while enrichments can be skipped if unavailable.

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class GetEmployeeFullProfile(Service):
    name = 'employee.profile.full'

    input = 'employee_id'

    def handle(self):

        # Access declared input parameters
        employee_id = self.request.input.employee_id

        # self.invoke raises an exception on errors. The profile is required,
        # so a failure here is left to propagate and fail the whole request.
        profile = self.invoke('hr.employee.get', employee_id=employee_id)

        result = {
            'employee_id': employee_id,
            'profile': profile
        }

        # Wrap optional enrichments in try/except to handle exceptions
        try:
            response = self.invoke('lms.courses', email=profile.email)
            result['training'] = response
        except Exception as e:
            self.logger.warning(f'Training data unavailable: {e}')
            result['training'] = None

        try:
            response = self.invoke('access.history', employee_id=employee_id)
            result['access_history'] = response
        except Exception as e:
            self.logger.warning(f'Access history unavailable: {e}')
            result['access_history'] = None

        # The final payload includes whatever data was available
        self.response.payload = result
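When several enrichments are optional, the repeated try/except blocks can be folded into one small helper. The following is a sketch under the same assumptions as the service above, namely that the invoke call raises an exception on failure; `invoke_optional` is a hypothetical helper, and `invoke` stands in for self.invoke.

```python
import logging

logger = logging.getLogger(__name__)

def invoke_optional(invoke, name, *, default=None, **kwargs):
    """Call a service whose data is optional, returning a default on failure."""
    try:
        return invoke(name, **kwargs)
    except Exception as e:
        # Optional data: log the problem and fall back to the default
        logger.warning('%s unavailable: %s', name, e)
        return default
```

With this in place, each enrichment becomes a single line, for example `result['training'] = invoke_optional(self.invoke, 'lms.courses', email=profile.email)`, while required calls keep using self.invoke directly so that their failures still propagate.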
