Many integrations need to read from or write to databases - fetching reference data, logging audit records, or calling stored procedures that encapsulate business logic. Zato provides outgoing SQL connections that you configure once and use across all your services.
The pattern is similar to REST connections: get a connection by name, create a session, execute queries, and map results to your response format. Always close sessions in a finally block to prevent connection leaks.
Create outgoing SQL connections in Dashboard first; the services below look up each connection by the name you give it there:

# -*- coding: utf-8 -*-
from http import HTTPStatus
from zato.server.service import Service
class GetCompanyGuarantors(Service):
    """List company guarantors returned by a stored procedure.

    Executes ``sammi.spGetCompanyGuarantors`` over the ``CompanyDatabase``
    outgoing SQL connection and responds with ``{'guarantors': [...]}``,
    one dict per returned row.

    Any exception raised while querying or mapping rows is logged and
    reported to the caller as HTTP 500 with the error text in the payload.
    """
    name = 'companies.guarantors.list'

    def handle(self):
        # Name of the outgoing SQL connection, as configured in Dashboard
        connection_name = 'CompanyDatabase'

        # The stored procedure this service invokes
        procedure_call = "EXEC sammi.spGetCompanyGuarantors"

        # Look the connection up by name and open a session on it
        session = self.outgoing.sql.get(connection_name).session()

        try:
            rows = session.execute(procedure_call)

            # Translate database column names into response field names
            self.response.payload = {
                'guarantors': [
                    {
                        'company': record['CompanyName'],
                        'company_id': record['CompanyIdentifier'],
                        'guarantor_name': record['GuarantorName'],
                        'guarantor_ssn': record['GuarantorSSN'],
                        'guarantor_email': record['GuarantorEmail'],
                        'guarantor_phone': record['GuarantorPhone'],
                    }
                    for record in rows
                ]
            }
        except Exception as exc:
            self.logger.error(f'Database error: {exc}')
            self.response.status_code = HTTPStatus.INTERNAL_SERVER_ERROR
            self.response.payload = {'error': str(exc)}
        finally:
            # Release the session on both the success and the error path
            if hasattr(session, 'close'):
                session.close()
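Expose the service on a channel at /api/companies/guarantors and invoke it to test.
The response is JSON whose keys are the field names mapped in the service (for example `company`, `guarantor_name`), not the raw database column names.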
# -*- coding: utf-8 -*-
from zato.server.service import Service
class GetEmployeeByDepartment(Service):
    """Return the employees of one department via a stored procedure.

    Input:
        department_id: identifier passed to the procedure as ``@DeptID``.

    Response payload:
        ``department_id`` (echoed from input), ``employees`` (one dict per
        returned row with ``id``, ``name``, ``email`` and ``hire_date``)
        and ``count`` (number of employees returned).

    Exceptions are not caught here; they propagate to the caller after
    the session has been closed.
    """
    name = 'employees.by-department'
    input = 'department_id'

    def handle(self):
        department_id = self.request.input.department_id

        # Bind the department ID as a query parameter rather than formatting
        # it into the SQL text, so caller-supplied input cannot alter the
        # statement that is executed (SQL injection).
        stored_proc = "EXEC hr.spGetEmployeesByDepartment @DeptID = :department_id"
        params = {'department_id': department_id}

        conn = self.outgoing.sql.get('HRDatabase')
        session = conn.session()

        try:
            result = session.execute(stored_proc, params)

            employees = []
            for row in result:
                hire_date = row['HireDate']
                employees.append({
                    'id': row['EmployeeID'],
                    'name': row['FullName'],
                    'email': row['Email'],
                    # Dates are serialized as ISO 8601 strings; an empty
                    # HireDate is reported as None.
                    'hire_date': hire_date.isoformat() if hire_date else None,
                })

            self.response.payload = {
                'department_id': department_id,
                'employees': employees,
                'count': len(employees),
            }
        finally:
            # Always release the session, whether or not the query succeeded
            session.close()
# -*- coding: utf-8 -*-
from zato.server.service import Service
class SearchProducts(Service):
    """Search products by name with optional category and price filters.

    Input:
        query: substring matched against the product name (``LIKE %query%``).
        category: optional exact category filter.
        min_price: optional inclusive lower price bound.
        max_price: optional inclusive upper price bound.

    Response payload:
        ``{'products': [...]}``, one dict per matching row, ordered by name.

    All values are passed to the database as bound parameters. Exceptions
    are not caught here; they propagate after the session has been closed.
    """
    name = 'products.search'
    input = 'query', '-category', '-min_price', '-max_price'

    def handle(self):
        query = self.request.input.query
        category = self.request.input.category
        min_price = self.request.input.min_price
        max_price = self.request.input.max_price

        # Base statement; optional filters are appended below
        sql = (
            "\n"
            "SELECT ProductID, Name, Category, Price, InStock\n"
            "FROM Products\n"
            "WHERE Name LIKE :query\n"
        )
        params = {'query': f'%{query}%'}

        if category:
            sql += " AND Category = :category"
            params['category'] = category

        # A numeric bound of 0 is a legitimate filter, so it must not be
        # dropped by a plain truthiness check; missing or empty values
        # are still skipped.
        if min_price or min_price == 0:
            sql += " AND Price >= :min_price"
            params['min_price'] = min_price

        if max_price or max_price == 0:
            sql += " AND Price <= :max_price"
            params['max_price'] = max_price

        sql += " ORDER BY Name"

        conn = self.outgoing.sql.get('ProductDatabase')
        session = conn.session()

        try:
            result = session.execute(sql, params)

            products = []
            for row in result:
                price = row['Price']
                products.append({
                    'id': row['ProductID'],
                    'name': row['Name'],
                    'category': row['Category'],
                    # A NULL price is reported as None instead of raising
                    # from float(None).
                    'price': float(price) if price is not None else None,
                    'in_stock': row['InStock'],
                })

            self.response.payload = {'products': products}
        finally:
            # Always release the session, whether or not the query succeeded
            session.close()
# -*- coding: utf-8 -*-
from http import HTTPStatus
from zato.server.service import Service
class CreateAuditLog(Service):
    """Insert one row into the ``AuditLog`` table.

    Input:
        action, user_id, details: values stored in the corresponding
        columns; the timestamp is set by the database (``GETDATE()``).

    On success the response is HTTP 201 with ``{'status': 'logged'}``.
    On any exception the transaction is rolled back, the error is logged
    and the response is HTTP 500 with the error text in the payload.
    """
    name = 'audit.log.create'
    input = 'action', 'user_id', 'details'

    def handle(self):
        request = self.request.input

        # Values bound to the named placeholders in the statement below
        values = {
            'action': request.action,
            'user_id': request.user_id,
            'details': request.details,
        }

        insert_stmt = (
            "\n"
            "INSERT INTO AuditLog (Action, UserID, Details, Timestamp)\n"
            "VALUES (:action, :user_id, :details, GETDATE())\n"
        )

        db_session = self.outgoing.sql.get('AuditDatabase').session()

        try:
            db_session.execute(insert_stmt, values)
            db_session.commit()

            self.response.status_code = HTTPStatus.CREATED
            self.response.payload = {'status': 'logged'}
        except Exception as exc:
            # Undo the failed write before reporting the failure
            db_session.rollback()
            self.logger.error(f'Failed to create audit log: {exc}')
            self.response.status_code = HTTPStatus.INTERNAL_SERVER_ERROR
            self.response.payload = {'error': str(exc)}
        finally:
            # Release the session on both the success and the error path
            db_session.close()
# -*- coding: utf-8 -*-
from dataclasses import dataclass
from zato.server.service import Service
@dataclass
class Guarantor:
    """A single guarantor record.

    Every field is a string that defaults to ``''``, so an instance can
    be created empty and filled in attribute by attribute.
    """

    # Company the guarantor belongs to
    company: str = ''
    company_id: str = ''

    # The guarantor's own details
    name: str = ''
    ssn: str = ''
    email: str = ''
    phone: str = ''
class GetGuarantors(Service):
    """List company guarantors, modelling each row as a ``Guarantor``.

    Executes ``sammi.spGetCompanyGuarantors`` over the ``CompanyDatabase``
    outgoing SQL connection and responds with ``{'guarantors': [...]}``,
    where each item is the dict form of one ``Guarantor``.

    Exceptions are not caught here; they propagate after the session has
    been closed.
    """
    name = 'guarantors.list'

    def handle(self):
        session = self.outgoing.sql.get('CompanyDatabase').session()

        try:
            rows = session.execute("EXEC sammi.spGetCompanyGuarantors")

            # Build one typed record per returned row
            records = [
                Guarantor(
                    company=row['CompanyName'],
                    company_id=row['CompanyIdentifier'],
                    name=row['GuarantorName'],
                    ssn=row['GuarantorSSN'],
                    email=row['GuarantorEmail'],
                    phone=row['GuarantorPhone'],
                )
                for row in rows
            ]

            # Convert to dicts for JSON response
            self.response.payload = {
                'guarantors': [vars(record) for record in records]
            }
        finally:
            # Always release the session, whether or not the query succeeded
            session.close()
