Real integration projects rarely involve just one API. You need to fetch data from an HR system, check an access control database, update a ticket tracker, and combine everything into a single response. This is orchestration - coordinating multiple API calls to accomplish a business task.
In Zato, orchestration happens through self.invoke(). Each call to self.invoke() runs another service, which might call an external API or do local processing. This keeps your orchestration logic clean and your API-specific code isolated in adapter services.
Fetch data from one API, then use it to call another:
# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class GetEmployeeWithAccessCard(Service):
# The 'name' attribute uniquely identifies this service
name = 'employee.get-with-access-card'
# The 'input' attribute declares expected request parameters
input = 'employee_id'
def handle(self):
# Access parameters declared in 'input' above
employee_id = self.request.input.employee_id
employee = self.invoke('hr.employee.get', employee_id=employee_id)
# When a service returns a model, access its fields with dot notation
access_card = self.invoke('access.card.get', email=employee.email)
# Build the response
has_access_card = access_card is not None
# Payload is automatically serialized to JSON
self.response.payload = {
'employee': employee,
'has_access_card': has_access_card,
'access_card': access_card
}
Expose this on a channel at /api/employee/{employee_id} and test:
The response combines data from two different backend systems into one JSON object.
Combine data from several independent APIs:
# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class GetCustomerDashboard(Service):
name = 'customer.dashboard.get'
input = 'customer_id'
def handle(self):
# Access declared input parameters
customer_id = self.request.input.customer_id
# Each call runs a service synchronously and waits for response
profile = self.invoke('crm.customer.get', customer_id=customer_id)
orders = self.invoke('orders.list', customer_id=customer_id)
tickets = self.invoke('support.tickets.list', customer_id=customer_id)
billing = self.invoke('billing.get', customer_id=customer_id)
# Count open tickets
open_tickets = [item for item in tickets if item.status == 'open']
open_ticket_count = len(open_tickets)
# Build customer info
customer = {
'id': customer_id,
'name': profile.name,
'email': profile.email,
'since': profile.created_at
}
# Build summary
summary = {
'total_orders': len(orders),
'open_tickets': open_ticket_count,
'account_balance': billing.amount
}
# Aggregate into dashboard response
self.response.payload = {
'customer': customer,
'summary': summary,
'recent_orders': orders[:5],
'recent_tickets': tickets[:3]
}
Expose on a channel at /api/customer/{customer_id}/dashboard:
Your client gets a complete dashboard in one request instead of making four separate API calls.
Call different APIs based on business logic:
# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class ProcessTrainingCompletion(Service):
name = 'training.completion.process'
# Multiple input parameters are declared as a tuple
input = 'course_id', 'user_id'
def handle(self):
# Each declared input becomes an attribute on self.request.input
course_id = self.request.input.course_id
user_id = self.request.input.user_id
# You can pass keyword arguments to the target service
course = self.invoke('lms.course.get', id=course_id)
completion = self.invoke('lms.completion.get', course_id=course_id, user_id=user_id)
# Response objects have attributes matching their model fields
notifications = course.notify_on_completion
jira_done_transition = 31
# Conditional logic determines which downstream services to call
if 'HR' in notifications:
self.invoke('hr.training.record', employee_id=completion.employee_id, course=course)
# You can use the standard Python logger
self.logger.info(f'Recorded training in HR for {user_id}')
if 'Jira' in notifications:
ticket_id = completion.access_request_ticket
if ticket_id:
transition_id = jira_done_transition
self.invoke('jira.transition', ticket_id=ticket_id, transition_id=transition_id)
self.logger.info(f'Transitioned Jira ticket')
self.response.payload = {'status': 'processed'}
When you need to sync data to multiple systems or gather data from independent sources, fan out the calls. Each target system gets the same input data.
# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class SyncAllSystems(Service):
name = 'sync.all-systems'
def handle(self):
# You can pass complex objects like lists to other services
employees = self.invoke('hr.employees.active')
# Fan out the same data to multiple target systems
lms_result = self.invoke('lms.sync', employees=employees)
access_result = self.invoke('access.sync', employees=employees)
directory_result = self.invoke('directory.sync', employees=employees)
# Response models have typed attributes for easy access
self.response.payload = {
'employees_processed': len(employees),
'lms_synced': lms_result.count,
'access_synced': access_result.count,
'directory_synced': directory_result.count
}
When orchestrating multiple APIs, decide which failures are fatal and which are acceptable. Core data might be required, while enrichments can be skipped if unavailable.
# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class GetEmployeeFullProfile(Service):
name = 'employee.profile.full'
input = 'employee_id'
def handle(self):
# Access declared input parameters
employee_id = self.request.input.employee_id
# Invoke raises exceptions on errors - catch them for optional data
profile = self.invoke('hr.employee.get', employee_id=employee_id)
result = {
'employee_id': employee_id,
'profile': profile
}
# Wrap optional enrichments in try/except to handle exceptions
try:
response = self.invoke('lms.courses', email=profile.email)
result['training'] = response
except Exception as e:
self.logger.warning(f'Training data unavailable: {e}')
result['training'] = None
try:
response = self.invoke('access.history', employee_id=employee_id)
result['access_history'] = response
except Exception as e:
self.logger.warning(f'Access history unavailable: {e}')
result['access_history'] = None
# The final payload includes whatever data was available
self.response.payload = result