An integral part of Zato, its scalable, service-oriented scheduler makes it possible to execute high-level API integration processes as background tasks. The scheduler runs periodic jobs which in turn trigger services, and services are what is used to integrate systems.

Integration process

In this article we will check how to use the scheduler with three kinds of jobs: one-time, interval-based and Cron-style ones.

Sample integration process

What we want to achieve is a sample yet fairly common use-case:

  • Periodically consult a remote REST endpoint for new data
  • Store data found in Redis
  • Push data found as an e-mail attachment

Instead of, or in addition to, Redis or e-mail, we could use SQL and SMS, or MongoDB and AMQP or anything else - Redis and e-mail are just example technologies frequently used in data synchronisation processes that we use to highlight the workings of the scheduler.

No matter the input and output channels, the scheduler always works the same way - a definition of a job is created and the job's underlying service is invoked according to the schedule. It is then up to the service to perform all the actions required in a given integration process.

Python code

Our integration service will read as below:

# -*- coding: utf-8 -*-

# Zato
from zato.common.api import SMTPMessage
from zato.server.service import Service

class SyncData(Service):
    name = 'api.scheduler.sync'

    def handle(self):

        # Which REST outgoing connection to use
        rest_out_name = 'My Data Source'

        # Which SMTP connection to send an email through
        smtp_out_name = 'My SMTP'

        # Who the recipient of the email will be
        smtp_to = 'hello@example.com'

        # Who to put on CC
        smtp_cc = 'hello.cc@example.com'

        # Now, let's get the new data from a remote endpoint ..

        # .. get a REST connection by name ..
        rest_conn = self.out.plain_http[rest_out_name].conn

        # .. download newest data ..
        data = rest_conn.get(self.cid).text

        # .. construct a new e-mail message ..
        message = SMTPMessage()
        message.subject = 'New data'
        message.body = 'Check attached data'

        # .. add recipients ..
        message.to = smtp_to
        message.cc = smtp_cc

        # .. attach the new data to the message ..
        message.attach('my.data.txt', data)

        # .. get an SMTP connection by name ..
        smtp_conn = self.email.smtp[smtp_out_name].conn

        # .. send the e-mail message with newest data ..
        smtp_conn.send(message)

        # .. and now store the data in Redis.
        self.kvdb.conn.set('newest.data', data)

Now, we just need to make it run periodically in the background.

Mind the timezone

In the next steps, we will use web-admin to configure new jobs for the scheduler.

Keep in mind that any date and time that you enter in web-admin is always interpreted to be in your web-admin user's timezone and this applies to the scheduler too - by default the timezone is UTC. You can change it by clicking Settings and picking the right timezone to make sure that the scheduled jobs run as expected.

It does not matter what timezone your Zato servers are in - they may be in different ones than the user that is configuring the jobs.
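
To illustrate why the user's timezone matters, the sketch below converts a start time entered in a user's timezone to UTC. It is plain Python rather than Zato code, and the fixed one-hour offset as well as the date are assumptions made only for this example.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the web-admin user works in a timezone one hour ahead of UTC
user_tz = timezone(timedelta(hours=1))

# The start time as typed in by the user, interpreted in the user's timezone
start_local = datetime(2021, 1, 15, 9, 0, tzinfo=user_tz)

# The very same moment expressed in UTC
start_utc = start_local.astimezone(timezone.utc)

print(start_utc.isoformat())
```

A job entered as starting at 09:00 by this user corresponds to 08:00 UTC, which is why a mismatch between the configured and the actual timezone shifts the schedule.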

User settings

Endpoint definitions

First, let's use web-admin to define the endpoints that the service uses. Note that Redis does not need an explicit declaration because it is always available under "self.kvdb" in each service.

  • Configuring outgoing REST APIs

Outgoing REST connections menu

Outgoing REST connections form

  • Configuring SMTP e-mail

Outgoing SMTP e-mail connections menu

Outgoing SMTP e-mail connections form

Now, we can move on to the actual scheduler jobs.

Three types of jobs

To cover different integration needs, three types of jobs are available:

  • One-time - fires once only at a specific date and time and then never runs again
  • Interval-based - for periodic processes, can use any combination of weeks, days, hours, minutes and seconds for the interval
  • Cron-style - similar to interval-based but uses the syntax of Cron for its configuration

Creating a new scheduler job

One-time

Select one-time if the job should not be repeated after it runs once.

Creating a new one-time scheduler job

Interval-based

Select interval-based if the job should be repeated periodically. Note that such a job will by default run indefinitely but you can also specify after how many times it should stop, letting you express concepts such as "Execute once per hour but for the next seven days".
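
As a rough illustration of how an interval and a repeat count combine, the plain-Python sketch below works out the run times for "once per hour for seven days". The function name `run_times` and the start date are hypothetical, and the sketch assumes that the first run happens at the start time itself.

```python
from datetime import datetime, timedelta

def run_times(start, interval, repeats):
    """Return the moments at which an interval-based job would fire."""
    return [start + interval * idx for idx in range(repeats)]

# Once per hour for seven days means 7 * 24 = 168 runs
interval = timedelta(hours=1)
repeats = int(timedelta(days=7) / interval)

times = run_times(datetime(2021, 1, 1, 0, 0), interval, repeats)

print(repeats)
print(times[0], times[-1])
```

In other words, the number to enter as the repeat count for this scenario would be 168.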

Creating a new interval-based scheduler job

Cron-style

Select cron-style if you are already familiar with the syntax of Cron or if you have some Cron tasks that you would like to migrate to Zato.
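
For readers less familiar with the syntax of Cron, here is a simplified matcher for the classic five-field format (minute, hour, day of month, month, day of week). It is only an illustration of the syntax, not how Zato evaluates such jobs, and it deliberately ignores details of real Cron such as names of months or the special handling of day-of-month combined with day-of-week.

```python
from datetime import datetime

def field_matches(expr, value, minimum=0):
    """Check one Cron field; supports '*', '*/n', 'a-b', numbers and comma lists."""
    for part in expr.split(','):
        if part == '*':
            return True
        if part.startswith('*/'):
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif '-' in part:
            low, high = part.split('-')
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(expression, moment):
    """Return True if a five-field Cron expression matches the given datetime."""
    minute, hour, day, month, weekday = expression.split()

    # Cron counts weekdays from Sunday=0, Python's weekday() from Monday=0
    cron_weekday = (moment.weekday() + 1) % 7

    return (
        field_matches(minute, moment.minute) and
        field_matches(hour, moment.hour) and
        field_matches(day, moment.day, minimum=1) and
        field_matches(month, moment.month, minimum=1) and
        field_matches(weekday, cron_weekday)
    )

# Every 15 minutes, between 9:00 and 17:59, Monday to Friday
expression = '*/15 9-17 * * 1-5'

print(cron_matches(expression, datetime(2021, 1, 4, 9, 30)))
print(cron_matches(expression, datetime(2021, 1, 9, 9, 30)))
```

The first date is a Monday and the second one a Saturday, so only the first one matches the expression.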

Creating a new Cron-style scheduler job

Running jobs manually

At times, it is convenient to run a job on demand, no matter what its schedule is and regardless of what type a particular job is. Web-admin always lets you execute a job directly. Simply find the job in the listing, click "Execute" and it will run immediately.

Extra context

It is very often useful to provide additional context data to a service that the scheduler runs - to achieve it, simply enter any arbitrary value in the "Extra" field when creating or editing a job in web-admin.

Afterwards, that information will be available as self.request.raw_request in the service's handle method.
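
Since the "Extra" field accepts any value, it is up to the service to decide how to interpret it. The sketch below assumes, purely for illustration, that the field was filled in with a JSON object; the helper `parse_extra` and the keys used are hypothetical.

```python
import json

def parse_extra(raw_request):
    """Turn the contents of a job's "Extra" field into a dict.

    Assumption: the field contains a JSON object. An empty field
    results in an empty dict.
    """
    if isinstance(raw_request, bytes):
        raw_request = raw_request.decode('utf8')

    raw_request = raw_request.strip()
    return json.loads(raw_request) if raw_request else {}

# Inside a service, this could be called as parse_extra(self.request.raw_request)
extra = parse_extra('{"source": "crm", "max_items": 50}')

print(extra['max_items'])
```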

Reusability

There is nothing else required - all is done and the service will run in accordance with a job's schedule.

Yet, before concluding, observe that our integration service is completely reusable - there is nothing scheduler-specific in it despite the fact that we currently run it from the scheduler.

We could now invoke the service from command line. Or we could mount it on a REST, AMQP or WebSocket channel, or trigger it from any other channel - exactly the same Python code will run in exactly the same fashion, without any new programming effort needed.

Enabling rate-limiting in Zato means that access to Zato-based APIs can be throttled per endpoint, user or service - including options to make limits apply to specific IP addresses only - and if limits are exceeded within a selected period of time, the invocation will fail. Let's check how to use it all.

Where and when limits apply

Rate-limiting aware objects in Zato

API rate limiting works on several levels and the configuration is always checked in the order below, which follows from the narrowest, most specific parts of the system (endpoints), through users which may apply to multiple endpoints, up to services which in turn may be used by both multiple endpoints and users.

  • First, per-endpoint limits
  • Then, per-user limits
  • Finally, per-service limits

When a request arrives through an endpoint, that endpoint's rate limiting configuration is checked. If the limit is already reached for the IP address or network of the calling application, the request is rejected.

Next, if there is any user associated with the endpoint, that account's rate limits are checked in the same manner and, similarly, if they are reached, the request is rejected.

Finally, if the endpoint's underlying service is configured to do so, it also checks if its invocation limits are not exceeded, rejecting the message accordingly if they are.
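
The order of checks described above can be sketched in a few lines of plain Python. This is not Zato's implementation - the function `check_limits` and the dictionaries it takes on input are hypothetical and serve only to show that the first level whose limit is reached rejects the request.

```python
def check_limits(counters, limits):
    """Return the name of the first level that rejects a request or None if it may proceed.

    counters - how many requests were already made on each level
    limits   - the maximum allowed on each level (a missing key means no limit)
    """
    for level in ('endpoint', 'user', 'service'):
        limit = limits.get(level)
        if limit is None:
            continue
        if counters.get(level, 0) >= limit:
            return level
    return None

limits = {'endpoint': 5, 'user': 1000, 'service': 10000}

# The user's limit is reached even though the endpoint's one is not
print(check_limits({'endpoint': 3, 'user': 1000, 'service': 10}, limits))

# Nothing is reached, so the request can be processed
print(check_limits({'endpoint': 3, 'user': 10, 'service': 10}, limits))
```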

Note that the three levels are distinct yet they overlap in what they allow one to achieve.

For instance, it is possible to have the same user credentials be used in multiple endpoints and express ideas such as "Allow this and that user to invoke my APIs 1,000 requests/day but limit each endpoint to at most 5 requests/minute no matter which user".

Moreover, because limits can be set on services, it is possible to make it even more flexible, e.g. "Let this service be invoked at most 10,000 requests/hour, no matter which user it is, with particular users being able to invoke at most 500 requests/minute, no matter which service, topping it off with separate limits for REST vs. SOAP vs. JSON-RPC endpoints, depending on what application is invoking the endpoints". That lets one conveniently express advanced scenarios that often occur in practical situations.

Also, observe that API rate limiting applies to REST, SOAP and JSON-RPC endpoints only; it is not used with other API endpoints, such as AMQP, IBM MQ, SAP, task scheduler or any other technologies. However, per-service limits work no matter which endpoint the service is invoked with and they will work with endpoints such as WebSockets, ZeroMQ or any other.

Lastly, limits pertain to incoming requests only - any outgoing ones, from Zato to external resources, are not covered by them.

Per-IP restrictions

The architecture is made even more versatile thanks to the fact that for each object - endpoint, user or service - different limits can be configured depending on the caller's IP address.

This adds yet another dimension and allows one to express ideas commonly witnessed in API-based projects, such as:

  • External applications, depending on their IP addresses, can have their own limits
  • Internal users, e.g. employees of the company using VPN, may have higher limits if their addresses are in the 172.x.x.x range
  • For performance testing purposes, access to Zato from a few selected hosts may have no limits at all

IP-based limits are an integral part of the mechanism and work hand in hand with it - they do not rule out per-endpoint, user or service limits. In fact, for each such object, multiple IP-based limits can be set independently, thus allowing for the highest degree of flexibility.
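
To make the idea of IP-based entries more tangible, the sketch below uses Python's standard ipaddress module to find which configured entries apply to a given caller. The configuration format, the networks and the limits are all hypothetical; in particular, 172.16.0.0/12 is used here as one possible reading of "the 172.x.x.x range". How matching entries are then evaluated is left out on purpose.

```python
from ipaddress import ip_address, ip_network

# Hypothetical configuration: pairs of (network, limit)
ip_limits = [
    (ip_network('0.0.0.0/0'), '100/m'),        # Any IPv4 caller
    (ip_network('172.16.0.0/12'), '1000/m'),   # Internal users, e.g. via VPN
]

def limits_for(caller_ip):
    """Return, in the order of their definition, all limits that apply to the caller."""
    address = ip_address(caller_ip)
    return [limit for network, limit in ip_limits if address in network]

print(limits_for('172.20.1.1'))
print(limits_for('8.8.8.8'))
```

An internal caller matches both entries whereas an external one matches only the generic entry.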

Exact or approximate

Rate limits come in two types:

  • Exact
  • Approximate

Exact rate limits are just that, exact - they ensure that a limit is not exceeded at all, not even by a single request.

Approximate limits may let a very small number of requests exceed the limit, with the benefit being that approximate limits are faster to check than exact ones.

When to use which type depends on a particular project:

  • In some projects, it does not really matter if callers have a limit of 1,000 requests/minute or 1,005 requests/minute because the difference is too tiny to make a business impact. Approximate limits work best in this case.

  • In other projects, there may be requirements that the limit never be exceeded no matter the circumstances. Use exact limits here.
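
One way to picture an exact limit is a counter that is only ever read and updated under a lock, so that concurrent requests cannot slip past it. The class below is a minimal sketch of that idea with a fixed time window. It is not Zato's implementation, the name `ExactLimiter` is hypothetical and the current time is passed in explicitly to keep the example deterministic.

```python
import threading

class ExactLimiter:
    """Allows at most max_requests within each period of period_seconds."""

    def __init__(self, max_requests, period_seconds):
        self.max_requests = max_requests
        self.period_seconds = period_seconds
        self.window_start = None
        self.count = 0
        self.lock = threading.Lock()

    def allow(self, now):
        """Return True if a request arriving at time `now` (in seconds) may proceed."""
        with self.lock:

            # Start a new window if there is none yet or the current one has expired
            if self.window_start is None or now - self.window_start >= self.period_seconds:
                self.window_start = now
                self.count = 0

            # The limit is already reached - reject without counting the request
            if self.count >= self.max_requests:
                return False

            self.count += 1
            return True

limiter = ExactLimiter(max_requests=2, period_seconds=60)

print([limiter.allow(now) for now in (0, 1, 2, 60)])
```

The cost of exactness is that every request has to acquire the lock, which hints at why a more relaxed, approximate check can be faster.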

Python code and web-admin

Alright, let's check how to define the limits in Zato web-admin. We will use the sample service below:

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class Sample(Service):
    name = 'api.sample'

    def handle(self):

        # Return a simple string on response
        self.response.payload = 'Hello there!\n'

Now, in web-admin, we will configure limits - separately for the service, a new user and a new REST API channel (endpoint).

Configuring rate limits for service

Configuring rate limits for user

Configuring rate limits for REST channel

Points of interest:

  • Configuration for each type of object is independent - within the same invocation some limits may be exact, some may be approximate
  • There can be multiple configuration entries for each object
  • A unit of time is "m", "h" or "d", depending on whether the limit is per minute, hour or day, respectively
  • All limits within the same configuration are checked in the order of their definition which is why the most generic ones should be listed first
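
The notation with units of time can be read as "this many requests per that many seconds". Below is a small sketch that parses a definition such as "100/m" accordingly; the helper `parse_limit` is hypothetical and covers only the three units mentioned above.

```python
# Seconds in each unit of time
UNIT_SECONDS = {'m': 60, 'h': 3600, 'd': 86400}

def parse_limit(definition):
    """Parse a limit such as '100/m' into (max_requests, period_in_seconds)."""
    count, unit = definition.split('/')
    return int(count), UNIT_SECONDS[unit]

print(parse_limit('100/m'))
print(parse_limit('1000/d'))
```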

Testing it out

Now, all that is left is to invoke the service from curl.

As long as limits are not reached, a business response is returned:

$ curl http://my.user:password@localhost:11223/api/sample
Hello there!
$

But if a limit is reached, the caller receives an error message with the 429 HTTP status.

$ curl -v http://my.user:password@localhost:11223/api/sample
*   Trying 127.0.0.1...

...

< HTTP/1.1 429 Too Many Requests
< Server: Zato
< X-Zato-CID: b8053d68612d626d338b02

...

{"zato_env":{"result":"ZATO_ERROR","cid":"b8053d68612d626d338b02eb",
 "details":"Error 429 Too Many Requests"}}
$

Note that the caller never knows what the limit was - that information is saved in Zato server logs along with other details so that API authors can correlate what callers get with the very rate limiting definition that prevented them from accessing the service.

zato.common.rate_limiting.common.RateLimitReached: Max. rate limit of 100/m reached;
from:`10.74.199.53`, network:`*`; last_from:`127.0.0.1;
last_request_time_utc:`2020-11-22T15:30:41.943794;
last_cid:`5f4f1ef65490a23e5c37eda1`; (cid:b8053d68612d626d338b02)
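
On the caller's side, the piece of information worth keeping is the correlation ID (CID) because this is what lets API authors find the matching entry in server logs. A minimal sketch of extracting it from the error response shown earlier follows; the function name `extract_cid` is hypothetical.

```python
import json

# The response body as returned along with the 429 status
body = """
{"zato_env":{"result":"ZATO_ERROR","cid":"b8053d68612d626d338b02eb",
 "details":"Error 429 Too Many Requests"}}
"""

def extract_cid(response_body):
    """Return the correlation ID found in a Zato error response."""
    return json.loads(response_body)['zato_env']['cid']

print(extract_cid(body))
```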

And this is it - we have created a new API rate limiting definition in Zato and tested it out successfully!

Cloud-based connections are a staple of modern API integrations - this article shows how, in just a few lines of Python code, Zato makes their usage easy, convenient and extremely effective.

Architecture

Overview

As per the diagram, in this article we will integrate REST and FTP based resources with Dropbox but it needs to be emphasised that exactly the same code would work with other protocols.

REST and FTP are just the most popular ways to upload data that can be delivered to Dropbox but if the source files were made available via AMQP, SAP or other applications - everything would be the same.

Similarly, in place of Dropbox we could use services based on AWS, Azure, OpenStack or other cloud providers - the same logic, approach and patterns would continue to work.

Also, speaking of Dropbox, for simplicity we will focus on file uploads here but the full Dropbox API is available to Zato services, so anything that Dropbox offers is available in Zato too.

Layers

We will use two layers in the solution:

  • Channel services - receive data from external resources (here, REST and FTP) and deliver it to the Dropbox connector
  • Dropbox connector - receives data from channels and uploads it to Dropbox

The separation of concerns lets us easily add new channels as needs arise without having to modify the layer that connects to Dropbox.

In this way, the solution can be extended at any time - if one day we need to add SFTP, no changes to any already existing part will be required.
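
Stripped of all Zato and Dropbox details, the two layers can be sketched as plain functions. Everything in the example is a stand-in: a dictionary plays the role of Dropbox and the function names are hypothetical. The point is only that channels know about the connector while the connector knows nothing about channels.

```python
# A stand-in for Dropbox: file name -> data
uploaded = {}

def dropbox_connector(file_name, data):
    """The connector layer - it does not care where the data comes from."""
    uploaded[file_name] = data

def rest_channel(query_params, payload):
    """A channel layer - extracts input from a REST request and hands it over."""
    dropbox_connector(query_params['file_name'], payload)

def ftp_channel(files):
    """Another channel layer - hands over each file found on an FTP server."""
    for file_name, data in files.items():
        dropbox_connector(file_name, data)

rest_channel({'file_name': 'a.txt'}, b'from REST')
ftp_channel({'b.txt': b'from FTP'})

print(sorted(uploaded))
```

Adding a new channel means adding one more function that calls the connector, with no changes to the existing ones.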

First, we will create a Dropbox connector into which we can plug channel services.

Dropbox connector - web-admin

Let's create a new Dropbox connection definition in Zato web-admin first. Click Cloud -> Dropbox and fill out the form as below, remembering to click "Change token" afterwards.

Note that the "User agent" field is required - this is part of metadata that Dropbox will accept. You can use it, for instance, to indicate whether you are connecting to Dropbox from a test vs. production environment.

Web-admin menu - Dropbox

Web-admin - Dropbox connection creation form

Dropbox connector - Python

Here is the Python code that acts as the Dropbox connector. Note a few interesting points:

  • It does not care where its input comes from. It just receives data. This is crucial because it means we can add any kind of a channel and the actual connector will continue to work without any interruptions.

  • The connector focuses on business functionality only - it is only web-admin that specifies what the connection details are, e.g. the connector itself just sends data and does not even deal with details as low-level as security tokens.

  • The underlying client object is an instance of dropbox.Dropbox from the official Python SDK

After hot-deploying the file, the service will be available as api.connector.dropbox.

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

# Code completion imports
if 0:
    from dropbox import Dropbox
    from zato.server.generic.api.cloud_dropbox import CloudDropbox

class DropboxConnector(Service):
    """ Receives data to be uploaded to Dropbox.
    """
    name = 'api.connector.dropbox'

    class SimpleIO:
        input_required = 'file_name', 'data'

    def handle(self):

        # Connection to use
        conn_name = 'My Connection'

        # Get the connection object
        conn = self.cloud.dropbox[conn_name].conn # type: CloudDropbox

        # Get the underlying Dropbox client
        client = conn.client # type: Dropbox

        # Upload the file received
        client.files_upload(self.request.input.data, self.request.input.file_name)
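
One detail to be aware of is that the Dropbox API generally expects upload paths to begin with a slash, so if file names arrive without one, it may be worth normalising them before they reach the client. The helper below is a hypothetical sketch of such a step - the name `to_dropbox_path` and the `base_dir` parameter are assumptions, not part of Zato or the Dropbox SDK.

```python
import posixpath

def to_dropbox_path(file_name, base_dir='/'):
    """Build an absolute Dropbox-style path out of a file name received on input."""

    # Keep only the last path component so that input cannot point outside of base_dir
    safe_name = posixpath.basename(file_name)

    if not safe_name:
        raise ValueError('No file name in `{}`'.format(file_name))

    return posixpath.join(base_dir, safe_name)

print(to_dropbox_path('my-file.txt'))
print(to_dropbox_path('my-file.txt', '/incoming'))
```

The result could then be given to the client instead of the file name as received.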

REST channel

Now, let's add a service to accept REST-based file transfers - it will be a thin layer that will extract data from the HTTP payload to hand it over to the already existing connector service. The service can be added in the same file as the connector or it can be in a separate file - it is up to you, it will work the same way. Below, it is in its own file.

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class APIChannelRESTUpload(Service):
    """ Receives data to be uploaded to Dropbox.
    """
    name = 'api.channel.rest.upload'

    def handle(self):

        # File name as a query parameter
        file_name = self.request.http.params.file_name

        # The whole data uploaded
        data = self.request.raw_request

        # Invoke the Dropbox connector with our input
        self.invoke('api.connector.dropbox', {
            'file_name': file_name,
            'data': data
        })

Having uploaded the REST channel service, we need to create an actual REST channel for it. In web-admin, go to Connections -> Channels -> REST and fill out the form.

Web-admin menu - Dropbox

Web-admin - Dropbox connection creation form

First tests

At this point, we can already test it all out. Let's use curl to POST data to Zato. Afterwards, we confirm in Dropbox that a new file was created as expected.

Note that we use POST to send the input file which is why we need the file_name query parameter too.

$ curl -XPOST --data-binary \
    @/path/to/my-file.txt \
    "http://localhost:11223/api/upload?file_name=my-file.txt"
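
If you prefer to test from Python rather than curl, an equivalent request can be built with the standard library alone. The sketch below only constructs the request so that it can be inspected without a running server; the helper `build_upload_request` is hypothetical and the address is the same one that curl uses above.

```python
from urllib.parse import urlencode
from urllib.request import Request

def build_upload_request(file_name, data, address='http://localhost:11223/api/upload'):
    """Build a POST request with the file name in the query string and data in the body."""
    query = urlencode({'file_name': file_name})
    return Request(address + '?' + query, data=data, method='POST')

request = build_upload_request('my-file.txt', b'Hello')

print(request.get_method(), request.full_url)

# To actually send it, pass the request to urllib.request.urlopen
```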

Web-admin - Dropbox listing with a file uploaded

FTP connection definition

Having made sure that the connector delivers its files through REST, let's focus on FTP, first creating a new FTP connection definition in web-admin.

Web-admin menu - FTP

Web-admin - FTP connection creation form

FTP service

We need some Python code now - it will connect to the FTP server, list all files in a specific directory and send them all to the Dropbox connector.

The connector will not even notice that the files do not come from REST this time, it will simply accept them on input like previously.

# -*- coding: utf-8 -*-

# stdlib
import os

# Zato
from zato.server.service import Service

class APIChannelSchedulerUpload(Service):
    """ Receives data to be uploaded to Dropbox.
    """
    name = 'api.channel.scheduler.upload'

    def handle(self):

        # Get a handle to an FTP connection
        conn = self.outgoing.ftp.get('My FTP Connection')

        # Directory to find the files in
        file_dir = '/'

        # List all files ..
        for file_name in conn.listdir(file_dir):

            # .. construct the full path ..
            full_path = os.path.join(file_dir, file_name)

            # .. download each file ..
            data = conn.getbytes(full_path)

            # .. send it to the Dropbox connector ..
            self.invoke('api.connector.dropbox', {
                'file_name': file_name,
                'data': data
            })

            # .. and delete the file from the FTP server.
            conn.remove(full_path)

More tests

We have an FTP connection and we have a service - after uploading some files to the FTP server, we can test the new service now using web-admin. Navigate to Services -> List services -> Choose "api.channel.scheduler.upload" -> Invoker and click Submit.

This will invoke the service directly, without a need for any channel. Next, we can list recent additions in Dropbox to confirm that the files were uploaded, which means that the service connected to FTP, the files were downloaded and the Dropbox connector delivered them successfully.

Web-admin - Dropbox listing with several files uploaded

Scheduler

Invoking the service from web-admin is good but we would like to automate the process of transferring data from FTP to Dropbox - for that, we will create a new job in the scheduler. In web-admin, go to Scheduler and create a new interval-based job.

Note, however, that the job's start date will be sent to the scheduler using your user's preferred timezone. By default it is set to UTC so make sure that you set it to another one if your current timezone is not UTC - go to Settings and pick the correct timezone.

Web-admin - User settings

Web-admin - Change one's timezone

Now, on to the creation of a new job. Note that if this was a real integration project, the interval would probably be set to a more realistic one, e.g. if you batch transfer PDF invoices then doing it once a minute or twice an hour would probably suffice.

Web-admin menu - Scheduler

Web-admin - Scheduler, creating a new job

Going even further

We have just concluded the process - in a few steps we connected REST, FTP and Dropbox. Moreover, it was done in an extensible way. Should a business need arise, there is nothing preventing us from adding more data sources.

Not only that, if one day we need to add more data sinks, e.g. S3 or SQL, we could do it in the same easy way. Or we could publish messages to topics and guaranteed delivery queues for Zato to manage the whole delivery life-cycle - there are no limits whatsoever.

This integration example was but a tiny part of what Zato is capable of. To learn more about the platform - do visit the documentation and read the friendly tutorial that will get you started in no time.

Zato and Mac logo

The next Zato release will offer a native Mac installer while for now an installation from source is needed - read on for details on how to set up Zato today using Homebrew.

The fundamental idea behind supporting non-Linux environments is that of making it easier for developers to work on their API services before the code is shipped to Linux test and production environments. That is, Linux is the final destination for code but that should not prevent one from using a non-Linux system during development.

One aspect to keep in mind is that the Mac version is still a technology preview - it is not a stable release yet and there may be some changes to core Zato before the final release is published. Be sure to keep your source updated.

Source installation

Make sure that you have macOS High Sierra 10.13 or newer

% sw_vers -productVersion
10.13.6
%

If it is not available in the system yet, install Homebrew

% brew -v
Homebrew 2.4.13-67-gf943af3
Homebrew/homebrew-core (git revision 5a20f; last commit 2020-08-23)
%

Clone Zato from GitHub:

% git clone https://github.com/zatosource/zato && cd ./zato

Run the Zato source installer as below - it is the only command needed and the resulting Zato binaries will be kept in the same directory that you cloned the repository to

% ./code/install.sh -p python3

The installation may take 10-20 minutes, depending on your Internet connection and whether additional Homebrew packages need to be installed or not

After the installer finishes, confirm the Zato version installed

% ./code/bin/zato --version
Zato 3.1+rev.078992724-py3.8.5
%

Creating a quickstart environment

Prepare an empty directory

% mkdir -p ~/env/qs-1

Create a quickstart cluster

% ./code/bin/zato quickstart create ~/env/qs-1/ sqlite localhost 6379 --servers 1

The command will quickly set up a fully working Zato environment in your ~/env/qs-1 directory - this is a complete cluster, with a server, load-balancer, scheduler and web-admin GUI, all ready to use.

A few screenshots

Screenshot

Screenshot

This is it. If you have not done it before, you are ready to follow the main tutorial using your Mac now!

Zato startup callable objects are a means through which arbitrary Python functions or classes can be invoked when a server is booting up in order to influence its configuration or setup, even before any service is invoked.

This technique offers customisation possibilities in addition to the standard configuration options set before a server starts - read on to learn details.

Startup lifecycle

When a Zato server is starting, it goes through several stages. Most of them are internal details but two of them are exposed to users. Their names are FS_CONFIG_ONLY and AFTER_STARTED.

  • FS_CONFIG_ONLY - one of the earliest stages during which only server configuration files exist.

    No connections to any resources are opened yet, no services are deployed, the server does not even have a connection to its configuration databases established - the only pieces of configuration in existence are server config files.

    During this stage it is possible to make broad, low-level changes to configuration, such as reading configuration options from external endpoints, as in the first example below.

  • AFTER_STARTED - one of the latest stages in the process during which all of the configuration is already read in.

    All services are deployed and most HTTP-based connections are started. Notably, connections started in connectors, e.g. Odoo or WebSockets, may not be started yet.

    This stage is useful if access to services is needed, as in the second example which invokes an internal service to create a new cache definition.

How to configure startup callable objects

A startup callable is a regular Python function or a Python class placed on a server's PYTHONPATH. For instance, if Zato is installed to /opt/zato/current, placing a Python module my_startup.py in, depending on one's Zato version, /opt/zato/current/extlib or /opt/zato/current/zato_extra_paths will make it appear on PYTHONPATH.

Next, add a dotted name of the callable to server.conf, under the [misc] stanza. For instance, if a function called "update_fs_config" is in a module named "my_startup.py", the configuration will look like below:

[misc]
...
startup_callable=my_startup.update_fs_config
...

Note that the key "startup_callable" can be a comma-separated list of callable objects to invoke, such as:

[misc]
...
startup_callable=my_startup.update_fs_config, my_startup.create_cache
...

Each callable will be invoked in the order defined in the key and each will need to decide whether it is interested in a particular phase or not.
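
To show what a comma-separated list of dotted names amounts to, the sketch below resolves such a value into Python callables using importlib. This is not how Zato itself loads startup callables - the function `resolve_callables` is hypothetical and, so that the example runs anywhere, it resolves names from the standard library rather than from my_startup.py.

```python
from importlib import import_module

def resolve_callables(value):
    """Resolve a comma-separated list of dotted names into callables, keeping their order."""
    out = []

    for name in value.split(','):
        name = name.strip()
        if not name:
            continue

        # Everything up to the last dot is the module, the rest is the object in it
        module_name, object_name = name.rsplit('.', 1)
        out.append(getattr(import_module(module_name), object_name))

    return out

callables = resolve_callables('json.dumps, os.path.basename')

print([func.__name__ for func in callables])
```

Note that whitespace around the names does not matter, which matches the format of the second server.conf example above.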

Python code

The only part left now is to create a callable - this can be anything in Python that can be invoked and the example below uses two functions, already registered in server.conf above.

The code consists of the usual imports, including ones for type completion hints, followed by actual callable functions.

Note that each callable is given a PhaseCtx object on input - this is an object that describes the current stage, the current phase the server is going through.

A callable will filter out stages it is not interested in. In this case, each callable is interested in a single one only but there is nothing preventing a callable from handling more than one.

Note that there are no limits to what a callable can do - it has access to all the Python libraries that a server has on PYTHONPATH, and, depending on the stage, to all the services already deployed, all of which means that there are virtually no limits to customisation choices of the startup process.

# -*- coding: utf-8 -*-

# Requests
import requests

# Zato
from zato.common import CACHE, SERVER_STARTUP
from zato.common.util import as_bool

# Type completion imports
if 0:
    from zato.server.base.parallel import ParallelServer
    from zato.server.startup_callable import PhaseCtx

    ParallelServer = ParallelServer
    PhaseCtx = PhaseCtx

def update_fs_config(ctx):
    """ This callable has access only to file-system based configuration files.
    """
    # type: (PhaseCtx)

    if ctx.phase == SERVER_STARTUP.PHASE.FS_CONFIG_ONLY:

        # server.conf
        server_config = ctx.kwargs['server_config']

        # pickup.conf
        pickup_config = ctx.kwargs['pickup_config']

        # simple-io.conf
        sio_config = ctx.kwargs['sio_config']

        # sso.conf
        sso_config = ctx.kwargs['sso_config']

        # Base directory the server was installed to
        base_dir = ctx.kwargs['base_dir']

        # Consult a remote resource and check
        # if Cassandra connections should be enabled
        response = requests.get('https://example.com/')

        # Convert to a bool object
        is_enabled = as_bool(response.text)

        # Update the server configuration in place
        server_config.component_enabled.cassandra = is_enabled

def create_cache(ctx):
    """ This is the callable that is invoked when the server is started.
    """
    # type: (PhaseCtx)

    if ctx.phase == SERVER_STARTUP.PHASE.AFTER_STARTED:

        # Server object where all our services are deployed to already
        server = ctx.kwargs['server'] # type: ParallelServer

        # Service to invoke
        service = 'zato.cache.builtin.create'

        # Input data to the service
        request = {
            'cluster_id': server.cluster_id,
            'name': 'my.cache',
            'is_active': True,
            'is_default': False,
            'max_size': 10000,
            'max_item_size': 10000,
            'extend_expiry_on_get': True,
            'extend_expiry_on_set': True,
            'sync_method': CACHE.SYNC_METHOD.IN_BACKGROUND.id,
            'persistent_storage': CACHE.PERSISTENT_STORAGE.NO_PERSISTENT_STORAGE.id,
            'cache_type': CACHE.TYPE.BUILTIN,
        }

        # Create a cache object by invoking the service
        server.invoke(service, request)
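
Finally, here is a sketch of a single callable that reacts to more than one phase. To keep it runnable outside of a Zato server, the phase names are plain strings standing in for the SERVER_STARTUP.PHASE constants and the context object is simulated with SimpleNamespace - both are assumptions made for the example, as is the function name `track_phases`.

```python
from types import SimpleNamespace

# Stand-ins for SERVER_STARTUP.PHASE.FS_CONFIG_ONLY and AFTER_STARTED
FS_CONFIG_ONLY = 'fs-config-only'
AFTER_STARTED = 'after-started'

# Phases that the callable has reacted to so far
seen = []

def track_phases(ctx):
    """ Reacts to two phases and ignores any other.
    """
    if ctx.phase in (FS_CONFIG_ONLY, AFTER_STARTED):
        seen.append(ctx.phase)

# Simulate a server going through its phases, including one we are not interested in
for phase in (FS_CONFIG_ONLY, 'some-other-phase', AFTER_STARTED):
    track_phases(SimpleNamespace(phase=phase, kwargs={}))

print(seen)
```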