Files
openclaw-workspace/skills/unifi/unifi_monitor.py
2026-04-11 09:45:12 -05:00

249 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
UniFi Network Monitor
Comprehensive monitoring for UniFi Controller
"""
import requests
import urllib3
import json
import os
from datetime import datetime, timedelta
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class UniFiMonitor:
    """Poll a UniFi Controller over HTTPS and render a network status report.

    All requests are sent with ``verify=False`` (TLS certificate checks are
    disabled) and authenticate with the cookies captured by :meth:`login`.
    Every fetch method is best-effort: on failure it prints an ``[ERROR]``
    line and returns an empty container instead of raising.
    """

    # Per-request timeout in seconds, shared by every HTTP call.
    REQUEST_TIMEOUT = 10
    # Event subsystems that are reported as alerts.
    ALERT_SUBSYSTEMS = ('wlan', 'wan', 'lan')

    def __init__(self, controller_url, username, password, site_id="default"):
        """Store connection settings; no network traffic happens here.

        Args:
            controller_url: Base URL of the controller (scheme, host, port).
            username: Account name sent to the login endpoint.
            password: Password sent to the login endpoint.
            site_id: Site identifier interpolated into the API paths.
        """
        self.controller_url = controller_url
        self.username = username
        self.password = password
        self.site_id = site_id
        self.cookies = None
        self.authenticated = False

    def login(self):
        """Authenticate with the UniFi Controller.

        Returns:
            True when the login request succeeded and the session cookies were
            stored, False otherwise (the error is printed, not raised).
        """
        url = f"{self.controller_url}/api/auth/login"
        credentials = {
            "username": self.username,
            "password": self.password,
        }
        try:
            response = requests.post(
                url, json=credentials, verify=False, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
        except Exception as e:
            # Drop any stale session so a failed re-login cannot leave the
            # monitor looking authenticated.
            self.cookies = None
            self.authenticated = False
            print(f"[ERROR] Login failed: {e}")
            return False
        self.cookies = response.cookies
        self.authenticated = True
        return True

    def _fetch_stat(self, endpoint, label, default):
        """GET ``stat/<endpoint>`` for the site and return its ``data`` field.

        Returns ``default`` when not authenticated, when the request or JSON
        decoding fails (an ``[ERROR]`` line naming ``label`` is printed), or
        when the payload has no ``data`` key.
        """
        if not self.authenticated:
            return default
        url = (
            f"{self.controller_url}/proxy/network/api/s/"
            f"{self.site_id}/stat/{endpoint}"
        )
        try:
            response = requests.get(
                url, cookies=self.cookies, verify=False, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.json().get('data', default)
        except Exception as e:
            # Deliberately broad: the report should degrade, not crash.
            print(f"[ERROR] Failed to get {label}: {e}")
            return default

    def get_clients(self):
        """Return the connected clients (``stat/sta``), or ``[]`` on failure."""
        return self._fetch_stat('sta', 'clients', [])

    def get_devices(self):
        """Return the UniFi devices (``stat/device``), or ``[]`` on failure."""
        return self._fetch_stat('device', 'devices', [])

    def get_health(self):
        """Return the raw health payload (``stat/health``), or ``{}`` on failure.

        NOTE(review): the controller is expected to answer with a list of
        per-subsystem dicts rather than a dict -- confirm against your
        controller version. Use :meth:`_wan_status` to read it safely.
        """
        return self._fetch_stat('health', 'health', {})

    def get_alerts(self, hours=24):
        """Return up to 10 recent events from the wlan/wan/lan subsystems.

        Args:
            hours: Size of the look-back window, in hours.

        Returns:
            The first 10 matching events, in the order the controller returned
            them, or ``[]`` when unauthenticated or the request failed.
        """
        events = self._fetch_stat('event', 'alerts', [])
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = []
        for event in events or []:
            if not isinstance(event, dict):
                continue
            if event.get('subsystem') not in self.ALERT_SUBSYSTEMS:
                continue
            try:
                # 'time' is divided by 1000, i.e. treated as epoch milliseconds.
                event_time = datetime.fromtimestamp((event.get('time') or 0) / 1000)
            except (TypeError, ValueError, OverflowError, OSError):
                # One malformed timestamp must not discard every other alert.
                continue
            if event_time > cutoff:
                recent.append(event)
        return recent[:10]

    def get_bandwidth_stats(self, devices=None):
        """Sum the per-device byte counters and convert them to GiB.

        Args:
            devices: Optional device list previously returned by
                :meth:`get_devices`. When omitted the devices are fetched,
                which costs one extra HTTP request.

        Returns:
            Dict with ``rx_gb``, ``tx_gb`` and ``total_gb``, each rounded to
            two decimals.
        """
        if devices is None:
            devices = self.get_devices()
        total_rx = 0
        total_tx = 0
        for device in devices:
            # NOTE(review): assumes the counters live under device['stat'] --
            # confirm against your controller version.
            stats = device.get('stat') or {}
            total_rx += stats.get('rx_bytes') or 0
            total_tx += stats.get('tx_bytes') or 0
        total_rx_gb = total_rx / (1024 ** 3)
        total_tx_gb = total_tx / (1024 ** 3)
        return {
            'rx_gb': round(total_rx_gb, 2),
            'tx_gb': round(total_tx_gb, 2),
            'total_gb': round(total_rx_gb + total_tx_gb, 2),
        }

    @staticmethod
    def _wan_status(health):
        """Extract the WAN status from either shape of health payload.

        Accepts a dict keyed by subsystem (``{'wan': {'status': ...}}``) or a
        list of dicts each carrying ``subsystem`` and ``status`` keys. Returns
        ``'Unknown'`` when no WAN entry is found.
        """
        if isinstance(health, dict):
            wan = health.get('wan', {})
            if isinstance(wan, dict):
                return wan.get('status', 'Unknown')
            return 'Unknown'
        if isinstance(health, (list, tuple)):
            for entry in health:
                if isinstance(entry, dict) and entry.get('subsystem') == 'wan':
                    return entry.get('status', 'Unknown')
        return 'Unknown'

    @staticmethod
    def _client_name(client):
        """Return the client's name, then hostname, then ``'Unknown'``."""
        return client.get('name') or client.get('hostname') or 'Unknown'

    def _format_clients(self, clients):
        """Render the wireless (max 10) and wired (max 5) client sections."""
        wireless = [c for c in clients if not c.get('is_wired', False)]
        wired = [c for c in clients if c.get('is_wired', False)]
        parts = []
        if wireless:
            parts.append(f"### [WIFI] Wireless ({len(wireless)} clients)\n\n")
            for idx, client in enumerate(wireless[:10], 1):
                ip = client.get('ip', 'N/A')
                mac = client.get('mac', 'N/A')
                signal = client.get('signal', 0)
                ap_name = client.get('ap_name', 'Unknown AP')
                parts.append(f"{idx}. **{self._client_name(client)}**\n")
                parts.append(f" - IP: {ip} | MAC: {mac}\n")
                parts.append(f" - Signal: {signal} dBm | AP: {ap_name}\n\n")
        if wired:
            parts.append(f"### [WIRED] Wired ({len(wired)} clients)\n\n")
            for idx, client in enumerate(wired[:5], 1):
                ip = client.get('ip', 'N/A')
                parts.append(f"{idx}. **{self._client_name(client)}** - {ip}\n")
        return "".join(parts)

    @staticmethod
    def _format_devices(devices):
        """Render the device section (first 10 devices)."""
        parts = [f"\n## [DEVICES] UniFi Devices ({len(devices)} total)\n\n"]
        for device in devices[:10]:
            name = device.get('name', device.get('mac', 'Unknown'))
            model = device.get('model', 'Unknown')
            status = "[OK] Online" if device.get('state') == 1 else "[DOWN] Offline"
            uptime = device.get('uptime') or 0
            # Uptime is split as seconds into whole days and remaining hours.
            uptime_str = f"{uptime // 86400}d {(uptime % 86400) // 3600}h"
            parts.append(
                f"- **{name}** ({model}) - {status} - Uptime: {uptime_str}\n"
            )
        return "".join(parts)

    @staticmethod
    def _format_alerts(alerts):
        """Render the alert section (first 5 alerts) or the no-alerts banner."""
        if not alerts:
            return "\n## [OK] No Alerts (24h)\n\n"
        parts = [f"\n## [ALERTS] Recent Alerts ({len(alerts)} in 24h)\n\n"]
        for alert in alerts[:5]:
            msg = alert.get('msg', 'Unknown alert')
            stamp = datetime.fromtimestamp(
                (alert.get('time') or 0) / 1000
            ).strftime('%H:%M')
            parts.append(f"- [{stamp}] {msg}\n")
        return "".join(parts)

    def generate_report(self, output_format="text"):
        """Log in, collect all data and render a network report.

        Args:
            output_format: ``"json"`` for a JSON document; any other value
                produces the Markdown-style text report.

        Returns:
            The report as a string, or an ``[ERROR]`` string when login fails.
        """
        if not self.login():
            return "[ERROR] Failed to authenticate with UniFi Controller"

        clients = self.get_clients()
        devices = self.get_devices()
        health = self.get_health()
        alerts = self.get_alerts(hours=24)
        # Reuse the device list already fetched instead of requesting it again.
        bandwidth = self.get_bandwidth_stats(devices)
        wan_status = self._wan_status(health)
        now = datetime.now()

        if output_format == "json":
            report = {
                'timestamp': now.isoformat(),
                'summary': {
                    'connected_clients': len(clients),
                    'total_devices': len(devices),
                    'alerts_24h': len(alerts),
                    'wan_status': wan_status,
                },
                'clients': clients,
                'devices': devices,
                'bandwidth': bandwidth,
                'alerts': alerts,
            }
            return json.dumps(report, indent=2)

        header = (
            f"# UniFi Network Report - {now.strftime('%Y-%m-%d %H:%M')}\n"
            "## [STATS] Network Overview\n"
            "| Metric | Value |\n"
            "|--------|-------|\n"
            f"| Connected Clients | {len(clients)} |\n"
            f"| UniFi Devices | {len(devices)} |\n"
            f"| WAN Status | {wan_status} |\n"
            f"| Alerts (24h) | {len(alerts)} |\n"
            "## [NET] Bandwidth Usage (All Time)\n"
            "| Direction | Usage |\n"
            "|-----------|-------|\n"
            f"| Download (RX) | {bandwidth['rx_gb']} GB |\n"
            f"| Upload (TX) | {bandwidth['tx_gb']} GB |\n"
            f"| **Total** | **{bandwidth['total_gb']} GB** |\n"
            "## [CLIENTS] Connected Clients\n"
        )
        return "".join([
            header,
            self._format_clients(clients),
            self._format_devices(devices),
            self._format_alerts(alerts),
        ])
def main():
    """Build a monitor from environment settings and print the text report.

    Reads ``UNIFI_URL`` and ``UNIFI_USER`` (both have defaults) and
    ``UNIFI_PASS`` (required). The password has no built-in fallback so that
    no credential is stored in the source tree.

    Returns:
        The text report, or an ``[ERROR]`` string when ``UNIFI_PASS`` is not
        set or authentication fails. The same string is printed to stdout.
    """
    controller_url = os.getenv('UNIFI_URL', 'https://192.168.0.39:8443')
    username = os.getenv('UNIFI_USER', 'corey')
    password = os.getenv('UNIFI_PASS')

    if not password:
        # Fail loudly instead of falling back to a password committed in code.
        report = "[ERROR] UNIFI_PASS environment variable is not set"
    else:
        monitor = UniFiMonitor(controller_url, username, password)
        report = monitor.generate_report(output_format="text")

    print(report)
    return report


if __name__ == "__main__":
    main()