Schedule a demo

SQL and stored procedures

Many integrations need to read from or write to databases: fetching reference data, logging audit records, or calling stored procedures that encapsulate business logic. Zato provides outgoing SQL connections that you configure once and then use across all your services.

The pattern is similar to REST connections: get a connection by name, create a session, execute queries, and map results to your response format. Always close sessions in a finally block to prevent connection leaks.

Configuring database connections

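Create outgoing SQL connections in Dashboard. The name you give a connection there is the name your services later pass to self.outgoing.sql.get, as in the examples below.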

Executing stored procedures

# -*- coding: utf-8 -*-

from http import HTTPStatus
from zato.server.service import Service

class GetCompanyGuarantors(Service):
    """Return the guarantors of all companies as a JSON-ready payload.

    Calls the ``sammi.spGetCompanyGuarantors`` stored procedure through the
    ``CompanyDatabase`` outgoing SQL connection and maps each result row to a
    dictionary with API-facing key names.

    On success the payload is ``{'guarantors': [...]}``. On any failure the
    error is logged with its traceback, the status code is set to 500 and a
    generic error payload is returned.
    """
    name = 'companies.guarantors.list'

    def handle(self):

        # Stored procedure to call
        stored_proc = "EXEC sammi.spGetCompanyGuarantors"

        # Connection name from Dashboard
        conn_name = 'CompanyDatabase'

        # No session exists yet, so there is nothing to close if acquiring
        # the connection itself fails.
        session = None

        try:
            # Acquire the connection inside the try block so that connection
            # errors go through the same handler as query errors.
            conn = self.outgoing.sql.get(conn_name)
            session = conn.session()

            # Execute the stored procedure
            result = session.execute(stored_proc)

            # Map database column names to response field names.
            # NOTE(review): the payload includes guarantor_ssn - confirm the
            # channel exposing this service is suitably access-controlled.
            guarantors = [
                {
                    'company': row['CompanyName'],
                    'company_id': row['CompanyIdentifier'],
                    'guarantor_name': row['GuarantorName'],
                    'guarantor_ssn': row['GuarantorSSN'],
                    'guarantor_email': row['GuarantorEmail'],
                    'guarantor_phone': row['GuarantorPhone'],
                }
                for row in result
            ]

            self.response.payload = {'guarantors': guarantors}

        except Exception as e:
            # Full details go to the server log only; driver messages can
            # contain SQL text or schema names that callers should not see.
            self.logger.exception('Database error: %s', e)
            self.response.status_code = HTTPStatus.INTERNAL_SERVER_ERROR
            self.response.payload = {'error': 'Database error'}

        finally:
            if session is not None:
                session.close()

Expose the service on a REST channel at /api/companies/guarantors and test it:

curl http://localhost:11223/api/companies/guarantors

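The response is a JSON document whose keys are the field names mapped in the service (for example company and guarantor_name), not the column names used in your database.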

Stored procedures with parameters

# -*- coding: utf-8 -*-

from zato.server.service import Service

class GetEmployeeByDepartment(Service):
    """Return the employees of one department.

    Input:
        department_id: identifier passed to the stored procedure as @DeptID.

    Calls ``hr.spGetEmployeesByDepartment`` through the ``HRDatabase``
    outgoing SQL connection. The payload echoes the department id and
    contains the list of employees and their count.
    """
    name = 'employees.by-department'

    input = 'department_id'

    def handle(self):

        department_id = self.request.input.department_id

        # The value comes straight from the request, so it is sent as a bound
        # parameter rather than interpolated into the SQL text. This prevents
        # SQL injection through department_id.
        stored_proc = "EXEC hr.spGetEmployeesByDepartment @DeptID = :department_id"
        params = {'department_id': department_id}

        conn = self.outgoing.sql.get('HRDatabase')
        session = conn.session()

        try:
            result = session.execute(stored_proc, params)

            employees = [
                {
                    'id': row['EmployeeID'],
                    'name': row['FullName'],
                    'email': row['Email'],
                    # HireDate may be empty; serialise it only when present
                    'hire_date': row['HireDate'].isoformat() if row['HireDate'] else None,
                }
                for row in result
            ]

            self.response.payload = {
                'department_id': department_id,
                'employees': employees,
                'count': len(employees),
            }

        finally:
            # Always release the session, even if the query failed
            session.close()

Raw SQL queries

# -*- coding: utf-8 -*-

from zato.server.service import Service

class SearchProducts(Service):
    """Search products by name with optional category and price filters.

    Input:
        query: substring matched against the product name with LIKE.
        category: optional exact category to filter on.
        min_price: optional lower price bound (inclusive).
        max_price: optional upper price bound (inclusive).

    The SQL text is assembled from fixed fragments only; every user-supplied
    value is passed as a bound parameter. The payload is
    ``{'products': [...]}``, ordered by product name.
    """
    name = 'products.search'

    input = 'query', '-category', '-min_price', '-max_price'

    def handle(self):

        query = self.request.input.query
        category = self.request.input.category
        min_price = self.request.input.min_price
        max_price = self.request.input.max_price

        # Values that mean "this optional input was not provided". A plain
        # truthiness test would also discard a numeric 0, which is a valid
        # price bound (e.g. max_price=0 to list free products).
        not_given = (None, '')

        # Build query dynamically
        sql = """
            SELECT ProductID, Name, Category, Price, InStock
            FROM Products
            WHERE Name LIKE :query
        """
        params = {'query': f'%{query}%'}

        if category:
            sql += " AND Category = :category"
            params['category'] = category

        if min_price not in not_given:
            sql += " AND Price >= :min_price"
            params['min_price'] = min_price

        if max_price not in not_given:
            sql += " AND Price <= :max_price"
            params['max_price'] = max_price

        sql += " ORDER BY Name"

        conn = self.outgoing.sql.get('ProductDatabase')
        session = conn.session()

        try:
            result = session.execute(sql, params)

            products = [
                {
                    'id': row['ProductID'],
                    'name': row['Name'],
                    'category': row['Category'],
                    # Convert the price to float for the JSON response;
                    # a NULL price is passed through as None instead of
                    # raising TypeError.
                    'price': float(row['Price']) if row['Price'] is not None else None,
                    'in_stock': row['InStock'],
                }
                for row in result
            ]

            self.response.payload = {'products': products}

        finally:
            # Always release the session, even if the query failed
            session.close()

Insert and update operations

# -*- coding: utf-8 -*-

from http import HTTPStatus
from zato.server.service import Service

class CreateAuditLog(Service):
    """Insert one row into the AuditLog table.

    Input:
        action: name of the action being audited.
        user_id: identifier of the user who performed it.
        details: free-form details stored with the record.

    On success the transaction is committed, the status code is 201 and the
    payload is ``{'status': 'logged'}``. On failure the transaction is rolled
    back, the error is logged with its traceback, the status code is 500 and
    a generic error payload is returned.
    """
    name = 'audit.log.create'

    input = 'action', 'user_id', 'details'

    def handle(self):

        action = self.request.input.action
        user_id = self.request.input.user_id
        details = self.request.input.details

        sql = """
            INSERT INTO AuditLog (Action, UserID, Details, Timestamp)
            VALUES (:action, :user_id, :details, GETDATE())
        """

        params = {
            'action': action,
            'user_id': user_id,
            'details': details,
        }

        # No session exists yet, so there is nothing to roll back or close
        # if acquiring the connection itself fails.
        session = None

        try:
            # Acquire the connection inside the try block so that connection
            # errors go through the same handler as query errors.
            conn = self.outgoing.sql.get('AuditDatabase')
            session = conn.session()

            session.execute(sql, params)
            session.commit()

            self.response.status_code = HTTPStatus.CREATED
            self.response.payload = {'status': 'logged'}

        except Exception as e:
            # Undo the partial transaction only if a session was opened
            if session is not None:
                session.rollback()

            # Full details go to the server log only; driver messages can
            # contain SQL text or schema names that callers should not see.
            self.logger.exception('Failed to create audit log: %s', e)
            self.response.status_code = HTTPStatus.INTERNAL_SERVER_ERROR
            self.response.payload = {'error': 'Failed to create audit log'}

        finally:
            if session is not None:
                session.close()

Mapping results to models

# -*- coding: utf-8 -*-

from dataclasses import dataclass
from zato.server.service import Service

@dataclass
class Guarantor:
    """Model of a single company guarantor.

    Every field is a string defaulting to an empty string, so an instance
    can be created without arguments and populated afterwards.
    """
    # Name of the company
    company: str = ''
    # Identifier of the company
    company_id: str = ''
    # Name of the guarantor
    name: str = ''
    # Guarantor's SSN (presumably a social security number - TODO confirm)
    ssn: str = ''
    # Guarantor's email address
    email: str = ''
    # Guarantor's phone number
    phone: str = ''

class GetGuarantors(Service):
    """Return company guarantors, mapped through the Guarantor model.

    Calls the ``sammi.spGetCompanyGuarantors`` stored procedure through the
    ``CompanyDatabase`` outgoing SQL connection, builds one ``Guarantor`` per
    result row and returns them as ``{'guarantors': [...]}`` with each model
    converted to a dictionary.
    """
    name = 'guarantors.list'

    def handle(self):

        # Imported locally because the module-level import only brings in
        # `dataclass`.
        from dataclasses import asdict

        conn = self.outgoing.sql.get('CompanyDatabase')
        session = conn.session()

        try:
            result = session.execute("EXEC sammi.spGetCompanyGuarantors")

            # Build each model in a single step from the row's columns
            guarantors = [
                Guarantor(
                    company=row['CompanyName'],
                    company_id=row['CompanyIdentifier'],
                    name=row['GuarantorName'],
                    ssn=row['GuarantorSSN'],
                    email=row['GuarantorEmail'],
                    phone=row['GuarantorPhone'],
                )
                for row in result
            ]

            # Convert to dicts for JSON response. asdict is the documented
            # dataclass API and does not depend on instances having __dict__.
            self.response.payload = {
                'guarantors': [asdict(g) for g in guarantors]
            }

        finally:
            # Always release the session, even if the query failed
            session.close()

Connection list in Dashboard

Learn more