Panorama's API returns XML responses and uses an async job pattern for queries. You submit a query, poll for completion, then fetch results.
Submit a log query and handle the async job pattern.
# -*- coding: utf-8 -*-
import time
import xmltodict
from zato.server.service import Service
class GetTrafficLogs(Service):
name = 'security.paloalto.traffic-logs'
input = 'query', '-nlogs'
def handle(self):
# Connection defined in Dashboard
conn = self.out.rest['Panorama'].conn
# Default to 10 logs if not specified
nlogs = self.request.input.nlogs or 10
# Headers for authentication
headers = {'Authorization': 'Basic YOUR_BASE64_CREDENTIALS'}
# Submit the log query
params = {
'type': 'log',
'log-type': 'traffic',
'query': self.request.input.query,
'nlogs': nlogs
}
response = conn.get(self.cid, params, headers=headers)
result = xmltodict.parse(response.data)
# Get the job ID
job_id = result['response']['result']['job']
# Poll until complete
job_status = ''
attempts = 0
while job_status != 'FIN' and attempts < 40:
time.sleep(3)
attempts += 1
status_params = {
'type': 'log',
'action': 'get',
'job-id': job_id
}
status_response = conn.get(self.cid, status_params, headers=headers)
status_result = xmltodict.parse(status_response.data)
job_status = status_result['response']['result']['job']['status']
if job_status != 'FIN':
raise Exception('Query timed out')
# Fetch the results
logs = status_result['response']['result']['log']['logs']
count = logs['@count']
entries = logs.get('entry', [])
self.response.payload = {
'count': count,
'entries': self._map_entries(entries)
}
def _map_entries(self, entries):
if not entries:
return []
# Handle single entry (not a list)
if not isinstance(entries, list):
entries = [entries]
return [
{
'action': e['action'],
'app': e['app'],
'src': e['src'],
'dst': e['dst'],
'port': e['dport'],
'time': e['receive_time']
}
for e in entries
]
Query threat logs to detect security incidents.
# -*- coding: utf-8 -*-
import xmltodict
from zato.server.service import Service
class GetThreatLogs(Service):
name = 'security.paloalto.threat-logs'
input = '-query', '-nlogs'
def handle(self):
conn = self.out.rest['Panorama'].conn
headers = {'Authorization': 'Basic YOUR_BASE64_CREDENTIALS'}
# Default query for recent threats
query = self.request.input.query or '(subtype neq file)'
nlogs = self.request.input.nlogs or 50
# Submit query
params = {
'type': 'log',
'log-type': 'threat',
'query': query,
'nlogs': nlogs
}
response = conn.get(self.cid, params, headers=headers)
result = xmltodict.parse(response.data)
job_id = result['response']['result']['job']
# Wait for completion (simplified - use polling in production)
entries = self._wait_and_fetch(conn, job_id, headers)
self.response.payload = {
'threats': [
{
'threat_id': e['threatid'],
'type': e['subtype'],
'action': e['action'],
'src': e['src'],
'dst': e['dst'],
'app': e['app'],
'time': e['receive_time']
}
for e in entries
]
}
def _wait_and_fetch(self, conn, job_id, headers):
import time
for _ in range(40):
time.sleep(3)
params = {'type': 'log', 'action': 'get', 'job-id': job_id}
response = conn.get(self.cid, params, headers=headers)
result = xmltodict.parse(response.data)
if result['response']['result']['job']['status'] == 'FIN':
entries = result['response']['result']['log']['logs'].get('entry', [])
if not isinstance(entries, list):
entries = [entries] if entries else []
return entries
return []
Track URL access patterns and policy violations.
# -*- coding: utf-8 -*-
from zato.server.service import Service
class GetUrlLogs(Service):
name = 'security.paloalto.url-logs'
input = '-query', '-nlogs'
def handle(self):
conn = self.out.rest['Panorama'].conn
headers = {'Authorization': 'Basic YOUR_BASE64_CREDENTIALS'}
query = self.request.input.query or ''
nlogs = self.request.input.nlogs or 100
# Fetch URL logs using the pattern above
entries = self._query_logs(conn, 'url', query, nlogs, headers)
self.response.payload = {
'url_logs': [
{
'url': e['misc'],
'category': e['url_category_list'],
'action': e['action'],
'src': e['src'],
'dst': e['dst'],
'app': e['app'],
'time': e['receive_time']
}
for e in entries
]
}
def _query_logs(self, conn, log_type, query, nlogs, headers):
import time
import xmltodict
params = {
'type': 'log',
'log-type': log_type,
'query': query,
'nlogs': nlogs
}
response = conn.get(self.cid, params, headers=headers)
result = xmltodict.parse(response.data)
job_id = result['response']['result']['job']
for _ in range(40):
time.sleep(3)
status_params = {'type': 'log', 'action': 'get', 'job-id': job_id}
response = conn.get(self.cid, status_params, headers=headers)
result = xmltodict.parse(response.data)
if result['response']['result']['job']['status'] == 'FIN':
entries = result['response']['result']['log']['logs'].get('entry', [])
if not isinstance(entries, list):
entries = [entries] if entries else []
return entries
return []
Create reusable mappers for different log types.
# -*- coding: utf-8 -*-
class TrafficLogMapper:
def map(self, entry):
return {
'action': entry['action'],
'app': entry['app'],
'src': entry['src'],
'dst': entry['dst'],
'port': entry['dport'],
'from_zone': entry['from'],
'to_zone': entry['to'],
'decrypted': entry['flag-proxy'],
'device': entry['device_name'],
'time': entry['receive_time']
}
class ThreatLogMapper:
def map(self, entry):
return {
'threat_id': entry['threatid'],
'type': entry['subtype'],
'action': entry['action'],
'src': entry['src'],
'dst': entry['dst'],
'port': entry['dport'],
'app': entry['app'],
'device': entry['device_name'],
'time': entry['receive_time']
}
class DecryptionLogMapper:
def map(self, entry):
return {
'cn': entry.get('cn'),
'sni': entry.get('sni'),
'error': entry['error'],
'src': entry['src'],
'dst': entry['dst'],
'port': entry['dport'],
'app': entry['app'],
'device': entry['device_name'],
'time': entry['receive_time']
}