Cloud storage

Integration workflows often need to store files - audit logs, generated reports, workflow state, or documents received from external systems. Cloud storage services like Azure Blob Storage and AWS S3 provide reliable, scalable storage that your services can access through their SDKs.

Credentials go in Zato's configuration, not in your code. Your services focus on the business logic of what to store and when.
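
For illustration, settings such as self.config.azure.storage.account_url, which the services below read, could come from config entries along these lines. This is a hypothetical sketch; the actual file name and section layout depend on how your environment organizes its configuration:

# Hypothetical layout - adjust names and values to your deployment
[azure.storage]
account_url = https://example.blob.core.windows.net
account_key = <your-account-key>
container_name = example-container

[aws]
access_key_id = <your-access-key-id>
secret_access_key = <your-secret-access-key>
region = eu-central-1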

Azure Blob Storage

Uploading blobs

# -*- coding: utf-8 -*-

import html
import xmltodict
from azure.storage.blob import BlobServiceClient
from zato.server.service import Service

class UploadBlob(Service):
    name = 'azure.blob.upload'

    input = 'folder', 'filename', 'content'

    def handle(self):

        folder = self.request.input.folder
        filename = self.request.input.filename
        content = self.request.input.content

        # Get credentials from config
        account_url = self.config.azure.storage.account_url
        account_key = self.config.azure.storage.account_key
        container_name = self.config.azure.storage.container_name

        # Initialize the client
        blob_service = BlobServiceClient(
            account_url=account_url,
            credential=account_key
        )

        # Get container and blob clients
        container = blob_service.get_container_client(container_name)
        blob = container.get_blob_client(f'{folder}/{filename}')

        try:
            blob.upload_blob(content)

            self.response.payload = {
                'status': 'uploaded',
                'path': f'{folder}/{filename}'
            }
            self.logger.info(f'{filename} uploaded to Azure Blob Storage')

        except Exception as e:
            # Parse Azure error response
            error_str = html.unescape(str(e))
            xml_start = error_str.find('<?xml')
            xml_end = error_str.rfind('</Error>')

            if xml_start >= 0 and xml_end > xml_start:
                xml_content = error_str[xml_start:xml_end + len('</Error>')]
                error_response = xmltodict.parse(xml_content)
                self.logger.error(f'Azure error: {error_response["Error"]["Code"]}')
                self.response.payload = error_response
            else:
                self.logger.error(f'Upload failed: {e}')
                self.response.payload = {'error': str(e)}
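
By default, upload_blob raises a ResourceExistsError if a blob with the same name already exists. If replacing existing blobs is the behavior you want, pass the overwrite flag:

# Replace any existing blob of the same name instead of raising ResourceExistsError
blob.upload_blob(content, overwrite=True)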

Downloading blobs

# -*- coding: utf-8 -*-

from azure.storage.blob import BlobServiceClient
from zato.server.service import Service

class DownloadBlob(Service):
    name = 'azure.blob.download'

    input = 'folder', 'filename'

    def handle(self):

        folder = self.request.input.folder
        filename = self.request.input.filename

        account_url = self.config.azure.storage.account_url
        account_key = self.config.azure.storage.account_key
        container_name = self.config.azure.storage.container_name

        blob_service = BlobServiceClient(
            account_url=account_url,
            credential=account_key
        )

        container = blob_service.get_container_client(container_name)
        blob = container.get_blob_client(f'{folder}/{filename}')

        # Download blob content
        download_stream = blob.download_blob()
        content = download_stream.readall()

        self.response.payload = content
        self.response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'
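
If the blob does not exist, download_blob raises an exception. To return an explicit error instead, one option is to catch the SDK's ResourceNotFoundError - a sketch:

from azure.core.exceptions import ResourceNotFoundError

# Inside handle(), replacing the plain download above:
try:
    download_stream = blob.download_blob()
    content = download_stream.readall()
except ResourceNotFoundError:
    # No such blob - return an explicit error instead of an unhandled exception
    self.response.payload = {'error': f'Not found: {folder}/{filename}'}
    return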

Listing blobs

# -*- coding: utf-8 -*-

from azure.storage.blob import BlobServiceClient
from zato.server.service import Service

class ListBlobs(Service):
    name = 'azure.blob.list'

    input = '-prefix'

    def handle(self):

        prefix = self.request.input.prefix or ''

        account_url = self.config.azure.storage.account_url
        account_key = self.config.azure.storage.account_key
        container_name = self.config.azure.storage.container_name

        blob_service = BlobServiceClient(
            account_url=account_url,
            credential=account_key
        )

        container = blob_service.get_container_client(container_name)

        blobs = []
        for blob in container.list_blobs(name_starts_with=prefix):
            blobs.append({
                'name': blob.name,
                'size': blob.size,
                'last_modified': blob.last_modified.isoformat()
            })

        self.response.payload = {'blobs': blobs, 'count': len(blobs)}
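
Other services can then invoke it by name, the same way the workflow example below invokes the upload service. A sketch, assuming self.invoke returns the service's dict payload and with an illustrative prefix value:

# Invoke the listing service from another service; the prefix is illustrative
response = self.invoke('azure.blob.list', prefix='reports/2025')

for item in response['blobs']:
    self.logger.info('Found %s (%s bytes)', item['name'], item['size'])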

AWS S3

Uploading to S3

# -*- coding: utf-8 -*-

import boto3
from zato.server.service import Service

class UploadToS3(Service):
    name = 'aws.s3.upload'

    input = 'bucket', 'key', 'content'

    def handle(self):

        bucket = self.request.input.bucket
        key = self.request.input.key
        content = self.request.input.content

        # Get credentials from config
        s3 = boto3.client(
            's3',
            aws_access_key_id=self.config.aws.access_key_id,
            aws_secret_access_key=self.config.aws.secret_access_key,
            region_name=self.config.aws.region
        )

        s3.put_object(Bucket=bucket, Key=key, Body=content)

        self.response.payload = {
            'status': 'uploaded',
            'bucket': bucket,
            'key': key
        }
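
Unlike the Azure example above, this service lets errors propagate. boto3 signals failures by raising botocore's ClientError, so structured error reporting could look like this sketch:

from botocore.exceptions import ClientError

# Inside handle(), wrapping the upload:
try:
    s3.put_object(Bucket=bucket, Key=key, Body=content)
except ClientError as e:
    # e.response carries the S3 error code, e.g. NoSuchBucket or AccessDenied
    error_code = e.response['Error']['Code']
    self.logger.error(f'S3 error: {error_code}')
    self.response.payload = {'error': error_code}
    return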

Downloading from S3

# -*- coding: utf-8 -*-

import boto3
from zato.server.service import Service

class DownloadFromS3(Service):
    name = 'aws.s3.download'

    input = 'bucket', 'key'

    def handle(self):

        bucket = self.request.input.bucket
        key = self.request.input.key

        s3 = boto3.client(
            's3',
            aws_access_key_id=self.config.aws.access_key_id,
            aws_secret_access_key=self.config.aws.secret_access_key,
            region_name=self.config.aws.region
        )

        response = s3.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read()

        filename = key.split('/')[-1]

        self.response.payload = content
        self.response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'
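
get_object reads the whole body into memory, which is fine for small files. For larger objects, boto3's managed transfer helper can stream the download straight to disk instead - a sketch with the same client and an illustrative local path:

# Stream a large object to a local file instead of holding it in memory
with open('/tmp/large-file.bin', 'wb') as f:
    s3.download_fileobj(bucket, key, f)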

Storing workflow data

Use cloud storage to persist data between workflow steps:

# -*- coding: utf-8 -*-

import json
from datetime import datetime
from zato.server.service import Service

class StoreWorkflowData(Service):
    name = 'workflow.data.store'

    input = 'workflow_id', 'step', 'data'

    def handle(self):

        workflow_id = self.request.input.workflow_id
        step = self.request.input.step
        data = self.request.input.data

        # Create blob content
        blob_content = json.dumps({
            'workflow_id': workflow_id,
            'step': step,
            'timestamp': datetime.now().isoformat(),
            'data': data
        }, ensure_ascii=False).encode('utf-8')

        # Store in cloud
        folder = f'workflows/{datetime.now().date()}'
        filename = f'{workflow_id}-{step}.json'

        self.invoke('azure.blob.upload',
            folder=folder,
            filename=filename,
            content=blob_content)

        self.response.payload = {'stored': f'{folder}/{filename}'}
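
A later workflow step can read the data back by invoking the download service. A minimal sketch, assuming self.invoke returns the blob's bytes as the payload and that the caller passes the same date the data was stored under:

# -*- coding: utf-8 -*-

import json
from zato.server.service import Service

class LoadWorkflowData(Service):
    name = 'workflow.data.load'

    input = 'workflow_id', 'step', 'date'

    def handle(self):

        workflow_id = self.request.input.workflow_id
        step = self.request.input.step
        date = self.request.input.date

        # Mirror the folder and filename convention used when storing
        folder = f'workflows/{date}'
        filename = f'{workflow_id}-{step}.json'

        # Assumes the download service returns the raw blob bytes
        content = self.invoke('azure.blob.download',
            folder=folder,
            filename=filename)

        self.response.payload = json.loads(content)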
