No programming is needed to consume messages from Kafka topics. Create a Kafka channel in Dashboard, point it to a topic, and a service of your choice will be invoked for each message received.
The message payload will be available in self.request.raw_request.
from zato.server.service import Service

class MyService(Service):
    def handle(self):

        # The raw Kafka message payload
        data = self.request.raw_request

        # Log what was received
        self.logger.info('Kafka message: %s', data)
Create a Kafka outgoing connection in Dashboard and use self.kafka to publish messages.
The connection is looked up by name, and .send() accepts strings, bytes, dicts, lists, or any other JSON-serializable object.
from zato.server.service import Service

class MyService(Service):
    def handle(self):

        # Get a connection by name
        conn = self.kafka['my-publisher']

        # Send a string
        conn.send('Hello from Zato')
When you pass a dict or list, it is automatically serialized to JSON before publishing.
from zato.server.service import Service

class MyService(Service):
    def handle(self):

        conn = self.kafka['my-publisher']
        conn.send({
            'event': 'order.created',
            'order_id': '12345',
            'customer': 'Acme Corp',
        })
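To see what such a message looks like on the wire, the dict-to-JSON step can be reproduced with Python's standard json module. This is only an illustrative sketch of the serialization, not Zato's actual internals, and the exact encoder settings Zato applies are not shown here:

```python
import json

# A payload like the one passed to conn.send above
payload = {
    'event': 'order.created',
    'order_id': '12345',
    'customer': 'Acme Corp',
}

# Dicts and lists are turned into JSON text before publishing
message = json.dumps(payload)
print(message)
```

Round-tripping the result through json.loads returns the original dict, which is why any JSON-serializable object can be sent this way.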
A common pattern is to consume a message from one topic, process it and publish the result to another.
from zato.server.service import Service

class MyService(Service):
    def handle(self):

        # Incoming message from a Kafka channel
        data = self.request.raw_request
        self.logger.info('Processing: %s', data)

        # Forward to another topic through a different outgoing connection
        conn = self.kafka['processed-events']
        conn.send(data)
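The processing step in the middle can be any Python code. As a purely hypothetical sketch, assuming the incoming payload is a JSON string, a transform might parse it, add a marker field, and return the enriched dict for publishing. The enrich_order function and the processed flag are invented names for illustration, not part of Zato's API:

```python
import json

def enrich_order(raw):
    # raw is the incoming payload as a JSON string
    order = json.loads(raw)

    # Add a hypothetical marker before forwarding
    order['processed'] = True
    return order

result = enrich_order('{"order_id": "12345"}')
print(result)  # {'order_id': '12345', 'processed': True}
```

Because .send() serializes dicts automatically, the returned value can be passed to the outgoing connection as-is.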