Integration workflows often need to store files - audit logs, generated reports, workflow state, or documents received from external systems. Cloud storage services like Azure Blob Storage and AWS S3 provide reliable, scalable storage that your services can access through their SDKs.
Credentials go in Zato's configuration, not in your code. Your services focus on the business logic of what to store and when.
# -*- coding: utf-8 -*-
import html
import xmltodict
from azure.storage.blob import BlobServiceClient
from zato.server.service import Service
class UploadBlob(Service):
name = 'azure.blob.upload'
input = 'folder', 'filename', 'content'
def handle(self):
folder = self.request.input.folder
filename = self.request.input.filename
content = self.request.input.content
# Get credentials from config
account_url = self.config.azure.storage.account_url
account_key = self.config.azure.storage.account_key
container_name = self.config.azure.storage.container_name
# Initialize the client
blob_service = BlobServiceClient(
account_url=account_url,
credential=account_key
)
# Get container and blob clients
container = blob_service.get_container_client(container_name)
blob = container.get_blob_client(f'{folder}/{filename}')
try:
blob.upload_blob(content)
self.response.payload = {
'status': 'uploaded',
'path': f'{folder}/{filename}'
}
self.logger.info(f'{filename} uploaded to Azure Blob Storage')
except Exception as e:
# Parse Azure error response
error_str = html.unescape(str(e))
xml_start = error_str.find("<?xml")
xml_end = error_str.rfind("</Error>") + len("</Error>")
if xml_start >= 0 and xml_end > xml_start:
xml_content = error_str[xml_start:xml_end]
error_response = xmltodict.parse(xml_content)
self.logger.error(f'Azure error: {error_response["Error"]["Code"]}')
self.response.payload = error_response
else:
self.logger.error(f'Upload failed: {e}')
self.response.payload = {'error': str(e)}
# -*- coding: utf-8 -*-
from azure.storage.blob import BlobServiceClient
from zato.server.service import Service
class DownloadBlob(Service):
name = 'azure.blob.download'
input = 'folder', 'filename'
def handle(self):
folder = self.request.input.folder
filename = self.request.input.filename
account_url = self.config.azure.storage.account_url
account_key = self.config.azure.storage.account_key
container_name = self.config.azure.storage.container_name
blob_service = BlobServiceClient(
account_url=account_url,
credential=account_key
)
container = blob_service.get_container_client(container_name)
blob = container.get_blob_client(f'{folder}/{filename}')
# Download blob content
download_stream = blob.download_blob()
content = download_stream.readall()
self.response.payload = content
self.response.headers['Content-Disposition'] = f'attachment; filename={filename}'
# -*- coding: utf-8 -*-
from azure.storage.blob import BlobServiceClient
from zato.server.service import Service
class ListBlobs(Service):
name = 'azure.blob.list'
input = '-prefix'
def handle(self):
prefix = self.request.input.prefix or ''
account_url = self.config.azure.storage.account_url
account_key = self.config.azure.storage.account_key
container_name = self.config.azure.storage.container_name
blob_service = BlobServiceClient(
account_url=account_url,
credential=account_key
)
container = blob_service.get_container_client(container_name)
blobs = []
for blob in container.list_blobs(name_starts_with=prefix):
blobs.append({
'name': blob.name,
'size': blob.size,
'last_modified': blob.last_modified.isoformat()
})
self.response.payload = {'blobs': blobs, 'count': len(blobs)}
# -*- coding: utf-8 -*-
import boto3
from zato.server.service import Service
class UploadToS3(Service):
name = 'aws.s3.upload'
input = 'bucket', 'key', 'content'
def handle(self):
bucket = self.request.input.bucket
key = self.request.input.key
content = self.request.input.content
# Get credentials from config
s3 = boto3.client(
's3',
aws_access_key_id=self.config.aws.access_key_id,
aws_secret_access_key=self.config.aws.secret_access_key,
region_name=self.config.aws.region
)
s3.put_object(Bucket=bucket, Key=key, Body=content)
self.response.payload = {
'status': 'uploaded',
'bucket': bucket,
'key': key
}
# -*- coding: utf-8 -*-
import boto3
from zato.server.service import Service
class DownloadFromS3(Service):
name = 'aws.s3.download'
input = 'bucket', 'key'
def handle(self):
bucket = self.request.input.bucket
key = self.request.input.key
s3 = boto3.client(
's3',
aws_access_key_id=self.config.aws.access_key_id,
aws_secret_access_key=self.config.aws.secret_access_key,
region_name=self.config.aws.region
)
response = s3.get_object(Bucket=bucket, Key=key)
content = response['Body'].read()
filename = key.split('/')[-1]
self.response.payload = content
self.response.headers['Content-Disposition'] = f'attachment; filename={filename}'
Use cloud storage to persist data between workflow steps:
# -*- coding: utf-8 -*-
import json
from datetime import datetime
from zato.server.service import Service
class StoreWorkflowData(Service):
name = 'workflow.data.store'
input = 'workflow_id', 'step', 'data'
def handle(self):
workflow_id = self.request.input.workflow_id
step = self.request.input.step
data = self.request.input.data
# Create blob content
blob_content = json.dumps({
'workflow_id': workflow_id,
'step': step,
'timestamp': datetime.now().isoformat(),
'data': data
}, ensure_ascii=False).encode('utf-8')
# Store in cloud
folder = f'workflows/{datetime.now().date()}'
filename = f'{workflow_id}-{step}.json'
self.invoke('azure.blob.upload',
folder=folder,
filename=filename,
content=blob_content)
self.response.payload = {'stored': f'{folder}/{filename}'}