Zato 3.0 will sport a completely rewritten AMQP subsystem and the changes are nothing short of exciting. Put simply, everything is faster, leaner and even more robust.

Synchronous channels

AMQP channels are now synchronous which means that an AMQP broker will wait for a Zato service to confirm that the message was received or it was not. In earlier Zato versions, AMQP channels worked in an implicit always-acknowledge mode. Now each message taken off an AMQP queue can be explicitly acknowledged or rejected, as below:

from zato.server.service import Service

class MyService(Service):

    def handle(self):
        if 'foo' in self.request.payload:
            self.request.amqp.ack()
        else:
            self.request.amqp.reject()

Message details

Channels have access to all metadata regarding an incoming message, for instance:

from zato.server.service import Service

class MyService(Service):

    def handle(self):
        self.logger.info(self.request.amqp.msg)

Screenshots

Connection pools

Previously, a single connection pool was shared by all channels and outgoing connections to a single AMQP broker.

Now each channel or outgoing connection may have its own pool, managed independently, which lets Zato servers use multiple CPUs in AMQP connections to improve performance - the underlying framework for connection pools was redesigned to achieve drastically reduced RAM usage - from megabytes down to kilobytes for each pool.

Channels:

Screenshots

Outgoing connections:

Screenshots

Correlating connections and consumers

When using AMQP GUIs, it was already possible to easily find Zato connections but the idea has been extended and now each connection, channel and consumer carry accounting information on which Zato component is the source or consumer of a message, including remote hostname, process ID, thread/greenlet ID and consumer tag prefix:

Screenshots

Screenshots

Get it from GitHub

A preview version of Zato 3.0 can be already installed from source code - all the new AMQP features are in the default branch, called 'main'. And remember to always feel free to get in touch - leave feedback in the forum or contact support if you need anything!

This is a preview of an upcoming feature in Zato 3.0 that will let one generate and distribute specifications for API services.

Introduction

Let's consider the following two modules with three services related to customers and customer cases. The implementation is left out so as to focus on I/O only.

# api1.py

from zato.server.service import Service

namespace = 'my.api.customer'

class GetCustomer(Service):
    """ Returns basic information about a customer.
    """
    name = 'get-customer'

    class SimpleIO:
        input_required = ('cust_id',)
        input_optional = ('cust_type', 'segment',)
        output_required = ('name', 'is_active', 'region')
        output_optional = ('last_seen',)

    def handle(self):
        pass # Skip implementation

class UpdateCustomer(Service):
    """ Updates basic information about a customer.
    """
    name = 'update-customer'

    class SimpleIO:
        input_required = ('name', 'is_active', 'region')
        output_required = ('result',)

    def handle(self):
        pass # Skip implementation
# api2.py

from zato.server.service import Service

namespace = 'my.api.customer-cases'

class GetCustomerCase(Service):
    """ Returns basic information about an individual case opened by a customer.
    """
    name = 'get-customer-cases'

    class SimpleIO:
        input_required = ('case_id',)
        output_required = ('cust_id', 'is_open', 'status', 'last_contact_date',)
        output_optional = ('supervisor_id', 'region_id')

    def handle(self):
        pass # Skip implementation

Screenshots

In Zato 3.0, and already available if installed from source, a specification for such services will look like below:

Screenshot

Screenshot

Screenshot

Screenshot

Key points

  • Services can be grouped into namespaces
  • For each service its documentation, as extracted from docstrings, is provided
  • Each service can optionally export information what other services it invokes, i.e. depends on
  • Input/Output for each service is based on SimpleIO
  • All parameters are automatically converted to correct types, e.g. is_active is a boolean, cust_id an integer
  • Brand-specific information and CSS can be applied to customize the output - here 'My API spec' is only a generic name that can be replaced with anything else
  • The API specification is output using a service and REST channels thus it can be secured as required just like any other channel in Zato

Future plans

While for now the output is the documentation, the underlying backend is already capable to automatically generate data fpr other API specification formats such as Swagger/OpenAPI, RAML or WSDL - this is exactly what the next Zato version will do, it will generate specs in several formats for easy consumption by API clients.

Overview

While JSON or SOAP-wrapped messages are ubiquitous in HTTP-based APIs, there are times when unformatted messages are of advantage.

This article will explain how to create Zato endpoints accessing such data and to top it off, will also introduce the concept of outgoing FTP connections through which requests can be stored at remote FTP resources.

Code

Hot-deploy the following service onto a Zato cluster.

The key point is self.request.raw_request - this is how a programmer can access input data as it was received by Zato prior to its de-serialization from JSON or XML.

Note that this attribute will be available even if parsing took place, that is, one can always access raw request no matter what data format was used on input.

# Zato
from zato.common.util import fs_safe_now
from zato.server.service import Service

class FTPHandler(Service):
    """ Stores anything that is given on input at a remote FTP server
    pointed to by an outgoing connection called 'my.ftp'.
    """
    def handle(self):

        # Obtain handle to an FTP connection by its name
        conn = self.outgoing.ftp.get('my.ftp')

        # Create file name in a format that is safe for file-systems,
        # i.e. no colons or whitespace.
        file_name = '/home/api/request.{}'.format(fs_safe_now())

        # Store file on FTP
        conn.setcontents(file_name, self.request.raw_request)

Configuration

Now that the code is deployed, we need configuration to plug it into the whole framework. Use web-admin to create two objects per screenshots below.

One is an HTTP channel that exposes the deployed service through an HTTP channel - note that 'Data format' was left blank so as not to require JSON or XML on input.

Screenshot

The other is a definition of the FTP server that the service will store files in.

Screenshot

Don't forget to set password for the newly configured FTP connection to log in with in run-time.

Screenshot

As always with Zato, one's code never accesses any resources directly, everything is routed through named objects such as self.outgoing.ftp['my.ftp'] that actually point to a required resource. In this manner, if anything needs to be reconfigured, it can be done on fly, without changes to code and without server restarts.

Testing

With code deployed and configuration created, we can now upload sample data from command line and confirm it's made its way to an FTP server.

Screenshot

And sure enough - the file is there!

Screenshot

Screenshot

Summary

Accessing raw requests in Zato services is trivially easy. Likewise, making use of FTP connections is a matter of a single line of code, while hot-deployment and configuration mean that changes introduced in web-admin are immediately reflected all throughout a Zato cluster.

Want more?

Overview

One of fundamental principles of programming with Zato is that one's services are typically insulated from inner workings of underlying data formats or security schemes - after all, why bother with mundane tasks such as authentication or authorization, it should be the platform's job whereas user services should rather focus on their own job.

Conversely, once a security definition is created in web-admin, it can be applied in multiple places without requiring any changes to Python code.

All of that applies to a newly added feature of Zato that lets one keep API secrets in Vault and this blog post introduces key concepts behind it.

Screenshot

What is Vault?

Vault is a dedicated tool to store and make use of secrets that offers support for common authentication and authorization related workflows, such as secure storage and broad or fine-grained secrets management as well as policies describing what action a given holder of a secret can perform.

The system is open to customizations and makes for a great companion to Zato in letting one easily express who can access what, for how long, and under what conditions.

Vault needs to be installed separately to Zato. This post assumes that there is a Vault instance running on http://localhost:49517, already unsealed, using the configuration as below.

Essentially, this is a development server but unlike the Vault's default dev server, this one keeps data on disk instead of RAM.

disable_mlock = true

backend "file" {
  path = "./vault-dev.db"
}

listener "tcp" {
  address = "127.0.0.1:49517"
  tls_disable = 1
}

Creating Vault connections

The easiest way to create a new connection is through web-admin, simply fill out the form as below and a new connection to Vault will be created.

Screenshot

  • Name - an arbitrary name of the connection
  • URL - where Vault can be found
  • Token - Vault token, note that it should be possible for this token to look up tokens and credentials of incoming requests
  • Default authentication method - which authentication backend to use default to authenticate requests, can be Token, Username/password or GitHub. LDAP is coming up soon.
  • Service - a service that can be invoked if no default method was defined - the service can extract credentials from request and indicate what method to use.
  • Timeout - how long to wait for responses from Vault
  • TLS options - whether TLS connections should be verified and if so, using what CA certs unless default ones should be employed. Also, an optional client TLS key and certificate can be uploaded so that Zato itself authenticates with Vault using this key/cert pair.

Attaching Vault connections to channels

Authentication with Vault works with regular HTTP REST or SOAP channels as well as with WebSockets - pick the newly created Vault connection from the list, click OK, and this is it, your channels will now be secured by Vault.

If you have already existing channels, you can swap out their current security definitions for Vault-based ones and things will just work without server restarts.

Screenshot

Screenshot

Policies

The above is everything that is needed to define API endpoints backed by Vault which will now on behalf of Zato authenticate users according to what was provided on input when invoking a service.

However, a complementary aspect is that of authorization - i.e., assuming that in the incoming request there were valid credentials and the service can be invoked at all, we can do one better, go a step further, and define Vault policies to express business relations between objects exchanged in API calls. This will let one store rules in a central place accessible to any API endpoint.

This part of the Zato-Vault interface is still under active development - watch this space for more information!

Overview

This blog post introduces the Zato scheduler - a feature of Zato that lets one configure API services and background jobs to be run at selected intervals and according to specified execution plans without any programming needed.

Future posts will introduce all the details whereas this one gently describes major features - overall architecture, one-time jobs, interval-based jobs, cron-style jobs and public API to manage jobs from custom tools.

Architecture

In Zato, scheduler jobs are one of channels that a service can be invoked from - that is, the same service can be mounted on a REST, SOAP, AMQP, ZeroMQ or any other channel in addition to being triggered from the scheduler. No changes in code are required to achieve it.

Screenshot

For instance, let's consider the code below of a hypothetical service that downloads billings from an FTP resource and sends them off to a REST endpoint:

from zato.server.service import Service

class BillingHandler(Service):
    def handle(self):

        # Download data ..
        ftp = self.outgoing.ftp.get('Billings')
        contents = ftp.getcontents('/data/current.csv')

        # .. and send it to its recipient.
        self.outgoing.plain_http['ERP'].post(self.cid, contents)

Nowhere in the service is any reference embedded as to how it will be invoked and this is the crucial part of Zato design - services only focus on doing their jobs not on how to make themselves available from one channel or another.

Thus, even if a service to bulk transfer billings initially will likely be invoked from the scheduler only, there is nothing preventing it from being triggered by a REST call or from command line as needed.

Or perhaps a message sent to an AMQP should trigger it - that is fine as well and the service will not need be changed to accomodate it.

Working with scheduler jobs

There are three types of scheduler jobs:

  • one-time jobs - great if a service should be invoked at a specific time but it does not need to be repeated further
  • interval-based jobs - let one specify how often to invoke a given service (e.g. once in four weeks, twice an hour, five times a minute) as well as when to stop it so as form complex plans such as 'After two weeks from now, invoke this service twice an hour but do it twelve times only'.
  • cron-style jobs - work similar to interval-based ones but use syntax of Cron so 00 3-6 * * 1-5 will mean 'run the service each full hour from 3am to 6am but only Monday to Friday (i.e. excluding weekends)'

Note that job definitions always survive cluster restarts - this means that if you fully shut down a whole cluster of Zato servers then all jobs will continue to execute once the server is back. However, any jobs missed during the downtime will not be re-scheduled.

When a job is being triggered, its target service can receive extra data that may be possibly needed for that service to perform its tasks - this data is completely opaque to Zato and can be in any format, JSON, XML, YAML, plain text, anything.

If a job should not be scheduled anymore - be it because it was a one-time job or because it reached its execution limit, it becomes inactive rather than being deleted.

Such an inactive job still is available in web-admin and can be made active again, possibly with a different schedule plan. On the other hand, actually deleting a job deletes it permanently.

Screenshot

Screenshot

Screenshot

Screenshot

Full public API is available to manage jobs either through REST or SOAP calls as well as from other services directly in Python, such as below:

from zato.common import SCHEDULER

class JobManager(Service):

    def handle(self):

        # Create a sample job that will trigger one of built-in test services

        self.invoke('zato.scheduler.job.create', {
            'cluster_id': self.server.cluster_id,
            'is_active': True,
            'name': 'My Sample',
            'service': 'zato.helpers.input-logger',
            'job_type': SCHEDULER.JOB_TYPE.INTERVAL_BASED,
            'seconds': 2,
        })

Stay tuned for more!

This was just the first installment that introduced core concepts behind the Zato scheduler - coming up are details of how to work with each kind of the jobs, their API and how to efficiently manage their definitions in source code repositories.