Schedule a demo

Palo Alto integration examples

Panorama's API returns XML responses and uses an asynchronous job pattern for log queries: you submit a query, receive a job ID, and poll that job until it completes. The response to the final poll contains the results.

Querying logs

Submit a log query and handle the async job pattern.

# -*- coding: utf-8 -*-

import time
import xmltodict
from zato.server.service import Service

class GetTrafficLogs(Service):
    """Zato service that submits a traffic log query to Panorama, waits for
    the resulting job to finish and returns the mapped log entries.

    Input: `query` (required) and `nlogs` (optional, falls back to 10).
    Response payload: `{'count': ..., 'entries': [...]}`.
    """
    name = 'security.paloalto.traffic-logs'

    input = 'query', '-nlogs'

    def handle(self):
        """Submit the query, poll the job and build the response payload.

        Raises:
            Exception: 'Query timed out' if the job has not reported status
                'FIN' after 40 polls spaced 3 seconds apart.
        """

        # Outgoing REST connection named 'Panorama' (defined in Dashboard)
        panorama = self.out.rest['Panorama'].conn

        # A missing (or otherwise falsy) nlogs falls back to 10
        max_logs = self.request.input.nlogs or 10

        # Authentication header sent with every request
        auth_headers = {'Authorization': 'Basic YOUR_BASE64_CREDENTIALS'}

        # Step 1: submit the query; the reply carries the job ID
        submit_params = {
            'type': 'log',
            'log-type': 'traffic',
            'query': self.request.input.query,
            'nlogs': max_logs
        }
        submit_response = panorama.get(self.cid, submit_params, headers=auth_headers)
        submitted = xmltodict.parse(submit_response.data)
        job_id = submitted['response']['result']['job']

        # Step 2: poll the job, at most 40 times, sleeping before each poll
        for _ in range(40):
            time.sleep(3)

            poll_params = {
                'type': 'log',
                'action': 'get',
                'job-id': job_id
            }
            poll_response = panorama.get(self.cid, poll_params, headers=auth_headers)
            job_result = xmltodict.parse(poll_response.data)['response']['result']

            if job_result['job']['status'] == 'FIN':
                break
        else:
            # The loop ran out of attempts without ever seeing 'FIN'
            raise Exception('Query timed out')

        # Step 3: the response to the last poll also holds the logs
        logs = job_result['log']['logs']

        self.response.payload = {
            'count': logs['@count'],
            'entries': self._map_entries(logs.get('entry', []))
        }

    def _map_entries(self, entries):
        """Return a list of trimmed-down dicts, one per log entry.

        `entries` may be falsy (gives an empty list), a single entry
        (treated as a one-element list) or a list of entries.
        """

        if not entries:
            return []

        # A lone entry arrives as a single object rather than a list
        rows = entries if isinstance(entries, list) else [entries]

        mapped = []
        for row in rows:
            mapped.append({
                'action': row['action'],
                'app': row['app'],
                'src': row['src'],
                'dst': row['dst'],
                'port': row['dport'],
                'time': row['receive_time']
            })
        return mapped

Threat monitoring

Query threat logs to detect security incidents.

# -*- coding: utf-8 -*-

import xmltodict
from zato.server.service import Service

class GetThreatLogs(Service):
    """Zato service that queries Panorama threat logs and returns them as a
    list of simplified threat records.

    Input: `query` (optional, falls back to '(subtype neq file)') and
    `nlogs` (optional, falls back to 50).
    Response payload: `{'threats': [...]}`.
    """
    name = 'security.paloalto.threat-logs'

    input = '-query', '-nlogs'

    def handle(self):
        """Submit the threat log query, wait for the job and map the entries.

        Raises:
            Exception: 'Query timed out' if the job does not finish in time
                (propagated from `_wait_and_fetch`).
        """

        conn = self.out.rest['Panorama'].conn
        headers = {'Authorization': 'Basic YOUR_BASE64_CREDENTIALS'}

        # Default query for recent threats
        query = self.request.input.query or '(subtype neq file)'
        nlogs = self.request.input.nlogs or 50

        # Submit query
        params = {
            'type': 'log',
            'log-type': 'threat',
            'query': query,
            'nlogs': nlogs
        }

        response = conn.get(self.cid, params, headers=headers)
        result = xmltodict.parse(response.data)
        job_id = result['response']['result']['job']

        # Poll the job until it reports completion, then take its entries
        entries = self._wait_and_fetch(conn, job_id, headers)

        self.response.payload = {
            'threats': [
                {
                    'threat_id': e['threatid'],
                    'type': e['subtype'],
                    'action': e['action'],
                    'src': e['src'],
                    'dst': e['dst'],
                    'app': e['app'],
                    'time': e['receive_time']
                }
                for e in entries
            ]
        }

    def _wait_and_fetch(self, conn, job_id, headers):
        """Poll job `job_id` until its status is 'FIN' and return its entries.

        Polls at most 40 times, sleeping 3 seconds before each poll.

        Returns:
            A list of log entries (possibly empty); a single entry is
            wrapped in a one-element list.

        Raises:
            Exception: 'Query timed out' if the job never reports 'FIN'.
                Returning an empty list here would make a timed-out query
                indistinguishable from a query that matched no threats.
        """
        import time

        for _ in range(40):
            time.sleep(3)
            params = {'type': 'log', 'action': 'get', 'job-id': job_id}
            response = conn.get(self.cid, params, headers=headers)
            result = xmltodict.parse(response.data)

            if result['response']['result']['job']['status'] == 'FIN':
                entries = result['response']['result']['log']['logs'].get('entry', [])
                # A lone entry is parsed as a single object, not a list
                if not isinstance(entries, list):
                    entries = [entries] if entries else []
                return entries

        raise Exception('Query timed out')

URL logging

Track URL access patterns and policy violations.

# -*- coding: utf-8 -*-

from zato.server.service import Service

class GetUrlLogs(Service):
    """Zato service that queries Panorama URL logs and returns them as a
    list of simplified records.

    Input: `query` (optional, falls back to an empty string) and `nlogs`
    (optional, falls back to 100).
    Response payload: `{'url_logs': [...]}`.
    """
    name = 'security.paloalto.url-logs'

    input = '-query', '-nlogs'

    def handle(self):
        """Run the URL log query and map each entry into the payload.

        Raises:
            Exception: 'Query timed out' if the job does not finish in time
                (propagated from `_query_logs`).
        """

        conn = self.out.rest['Panorama'].conn
        headers = {'Authorization': 'Basic YOUR_BASE64_CREDENTIALS'}

        query = self.request.input.query or ''
        nlogs = self.request.input.nlogs or 100

        # Submit the query and poll its job until the entries are available
        entries = self._query_logs(conn, 'url', query, nlogs, headers)

        self.response.payload = {
            'url_logs': [
                {
                    'url': e['misc'],
                    'category': e['url_category_list'],
                    'action': e['action'],
                    'src': e['src'],
                    'dst': e['dst'],
                    'app': e['app'],
                    'time': e['receive_time']
                }
                for e in entries
            ]
        }

    def _query_logs(self, conn, log_type, query, nlogs, headers):
        """Submit a log query of `log_type` and return the finished job's entries.

        After submitting, polls the job at most 40 times, sleeping 3 seconds
        before each poll, until its status is 'FIN'.

        Returns:
            A list of log entries (possibly empty); a single entry is
            wrapped in a one-element list.

        Raises:
            Exception: 'Query timed out' if the job never reports 'FIN'.
                Returning an empty list here would make a timed-out query
                indistinguishable from a query that matched nothing.
        """
        import time
        import xmltodict

        params = {
            'type': 'log',
            'log-type': log_type,
            'query': query,
            'nlogs': nlogs
        }

        # The reply to the submission carries the job ID to poll
        response = conn.get(self.cid, params, headers=headers)
        result = xmltodict.parse(response.data)
        job_id = result['response']['result']['job']

        for _ in range(40):
            time.sleep(3)
            status_params = {'type': 'log', 'action': 'get', 'job-id': job_id}
            response = conn.get(self.cid, status_params, headers=headers)
            result = xmltodict.parse(response.data)

            if result['response']['result']['job']['status'] == 'FIN':
                entries = result['response']['result']['log']['logs'].get('entry', [])
                # A lone entry is parsed as a single object, not a list
                if not isinstance(entries, list):
                    entries = [entries] if entries else []
                return entries

        raise Exception('Query timed out')

Log entry mappers

Create reusable mappers for different log types.

# -*- coding: utf-8 -*-

class TrafficLogMapper:
    """Maps a traffic log entry onto a dict with a fixed set of output keys."""

    def map(self, entry):
        """Return selected fields of `entry` under their output names.

        Every source field is read with `entry[...]`, so a field missing
        from `entry` raises KeyError.
        """
        # (output key, source key) pairs, listed in output order
        fields = (
            ('action', 'action'),
            ('app', 'app'),
            ('src', 'src'),
            ('dst', 'dst'),
            ('port', 'dport'),
            ('from_zone', 'from'),
            ('to_zone', 'to'),
            ('decrypted', 'flag-proxy'),
            ('device', 'device_name'),
            ('time', 'receive_time'),
        )
        return {out_key: entry[src_key] for out_key, src_key in fields}


class ThreatLogMapper:
    """Maps a threat log entry onto a dict with a fixed set of output keys."""

    def map(self, entry):
        """Return selected fields of `entry` under their output names.

        Every source field is read with `entry[...]`, so a field missing
        from `entry` raises KeyError.
        """
        # Output keys in the order they appear in the result
        output_keys = (
            'threat_id', 'type', 'action', 'src', 'dst',
            'port', 'app', 'device', 'time',
        )

        # Output keys whose source field has a different name; the rest
        # are read from the field of the same name
        source_names = {
            'threat_id': 'threatid',
            'type': 'subtype',
            'port': 'dport',
            'device': 'device_name',
            'time': 'receive_time',
        }

        mapped = {}
        for key in output_keys:
            mapped[key] = entry[source_names.get(key, key)]
        return mapped


class DecryptionLogMapper:
    """Maps a decryption log entry onto a dict with a fixed set of output keys."""

    def map(self, entry):
        """Return selected fields of `entry` under their output names.

        `cn` and `sni` are read with `entry.get(...)` and come back as None
        when absent; every other field is read with `entry[...]`, so a
        missing one raises KeyError.
        """
        # Optional fields first, so they lead the output
        mapped = {
            'cn': entry.get('cn'),
            'sni': entry.get('sni'),
        }

        # (output key, source key) pairs for the required fields
        required = (
            ('error', 'error'),
            ('src', 'src'),
            ('dst', 'dst'),
            ('port', 'dport'),
            ('app', 'app'),
            ('device', 'device_name'),
            ('time', 'receive_time'),
        )
        for out_key, src_key in required:
            mapped[out_key] = entry[src_key]

        return mapped

Learn more