#!/usr/bin/env python3
"""
UniFi Network Monitor
Comprehensive monitoring for UniFi Controller
"""

import json
import os
from datetime import datetime, timedelta

import requests
import urllib3

# Every request in this module is sent with verify=False, so silence the
# warning urllib3 would otherwise emit on each call.
# NOTE(review): presumably the controller uses a self-signed certificate;
# enable verification if it has a trusted one.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Seconds to wait for the controller before a request is abandoned.
REQUEST_TIMEOUT = 10

# Event subsystems that get_alerts() treats as alerts.
ALERT_SUBSYSTEMS = frozenset({'wlan', 'wan', 'lan'})


class UniFiMonitor:
    """Cookie-authenticated client that queries a UniFi controller and
    renders a network report.

    Attributes set by ``__init__``:
        controller_url: Base URL of the controller.
        username / password: Credentials sent by ``login()``.
        site_id: Site name used in the ``/api/s/<site>/`` URL segment.
        cookies: Cookie jar from the last successful login, else ``None``.
        authenticated: ``True`` only after a successful ``login()``.
    """

    def __init__(self, controller_url, username, password, site_id="default"):
        self.controller_url = controller_url
        self.username = username
        self.password = password
        self.site_id = site_id
        self.cookies = None
        self.authenticated = False

    def _base_url(self):
        """Return the controller URL without a trailing slash.

        Avoids a double slash when the configured URL ends with ``/``.
        """
        return str(self.controller_url).rstrip('/')

    def _fetch_data(self, endpoint, label):
        """GET a site-scoped endpoint and return its ``data`` member.

        Args:
            endpoint: Path below ``/proxy/network/api/s/<site>/``,
                e.g. ``"stat/sta"``.
            label: Noun used in the error message ("clients", "devices", ...).

        Returns:
            The value stored under ``data`` in the JSON response, or ``None``
            when not authenticated, when the request or JSON decoding fails,
            or when the response is not a JSON object. Failures other than
            "not authenticated" are reported on stdout.
        """
        if not self.authenticated:
            return None

        url = f"{self._base_url()}/proxy/network/api/s/{self.site_id}/{endpoint}"
        try:
            response = requests.get(
                url,
                cookies=self.cookies,
                verify=False,
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors on older requests versions.
            print(f"[ERROR] Failed to get {label}: {e}")
            return None

        if not isinstance(payload, dict):
            print(f"[ERROR] Failed to get {label}: unexpected response format")
            return None
        return payload.get('data')

    def login(self):
        """Authenticate with UniFi Controller.

        Posts the credentials to ``/api/auth/login`` and stores the response
        cookies for later requests.

        Returns:
            ``True`` on success. On failure the error is printed, any previous
            session state is cleared, and ``False`` is returned.
        """
        url = f"{self._base_url()}/api/auth/login"
        data = {
            "username": self.username,
            "password": self.password,
        }

        try:
            response = requests.post(
                url, json=data, verify=False, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"[ERROR] Login failed: {e}")
            # Do not keep claiming a session that a re-login just failed to renew.
            self.cookies = None
            self.authenticated = False
            return False

        self.cookies = response.cookies
        self.authenticated = True
        return True

    def get_clients(self):
        """Get all connected clients.

        Returns:
            The ``data`` payload of ``stat/sta``; ``[]`` when unauthenticated,
            on error, or when the payload is empty/null.
        """
        return self._fetch_data('stat/sta', 'clients') or []

    def get_devices(self):
        """Get all UniFi devices (APs, switches, gateways).

        Returns:
            The ``data`` payload of ``stat/device``; ``[]`` when
            unauthenticated, on error, or when the payload is empty/null.
        """
        return self._fetch_data('stat/device', 'devices') or []

    def get_health(self):
        """Get network health status.

        Returns:
            The ``data`` payload of ``stat/health`` exactly as the controller
            sent it, or ``{}`` when unauthenticated, on error, or when the
            payload is empty/null.

            NOTE(review): the payload shape is not fixed by this module; it
            may be a list of per-subsystem dicts rather than a mapping. Use
            ``_wan_status`` instead of calling ``.get`` on the result.
        """
        return self._fetch_data('stat/health', 'health') or {}

    @staticmethod
    def _event_time(event):
        """Convert an event's millisecond ``time`` field to a local datetime.

        Returns ``None`` when the field is not a usable timestamp. A missing
        field is treated as 0 (the epoch), matching the original default.
        """
        try:
            return datetime.fromtimestamp(event.get('time', 0) / 1000)
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    @staticmethod
    def _wan_status(health):
        """Extract the WAN status string from a health payload.

        Accepts a mapping keyed by subsystem (``{'wan': {'status': ...}}``)
        or a list of dicts each carrying ``subsystem`` and ``status`` keys.
        Returns ``'Unknown'`` when no WAN status can be found.
        """
        wan = {}
        if isinstance(health, dict):
            wan = health.get('wan', {})
        elif isinstance(health, list):
            # presumably each entry describes one subsystem; verify against
            # the controller's actual stat/health response.
            for entry in health:
                if isinstance(entry, dict) and entry.get('subsystem') == 'wan':
                    wan = entry
                    break

        if not isinstance(wan, dict):
            return 'Unknown'
        return wan.get('status', 'Unknown')

    def get_alerts(self, hours=24, limit=10):
        """Get recent alerts.

        Args:
            hours: Only events newer than this many hours are kept.
            limit: Maximum number of alerts returned (default 10, the
                previously hard-coded cap).

        Returns:
            Up to ``limit`` events, in the order the controller returned them,
            whose ``subsystem`` is one of ``wlan``/``wan``/``lan`` and whose
            ``time`` lies within the window. ``[]`` when unauthenticated or
            on error.
        """
        events = self._fetch_data('stat/event', 'alerts') or []
        cutoff = datetime.now() - timedelta(hours=hours)

        alerts = []
        for event in events:
            if not isinstance(event, dict):
                continue
            if event.get('subsystem') not in ALERT_SUBSYSTEMS:
                continue
            # A single malformed timestamp must not discard the other alerts.
            event_time = self._event_time(event)
            if event_time is not None and event_time > cutoff:
                alerts.append(event)

        return alerts[:limit]

    def get_bandwidth_stats(self, devices=None):
        """Get bandwidth statistics.

        Sums ``stat.rx_bytes`` and ``stat.tx_bytes`` over all devices.

        Args:
            devices: Optional device list already obtained from
                ``get_devices()``. When omitted the devices are fetched here,
                which costs one extra request.

        Returns:
            Dict with ``rx_gb``, ``tx_gb`` and ``total_gb``, in GiB
            (bytes / 1024**3) rounded to two decimals.
        """
        if devices is None:
            devices = self.get_devices()

        total_rx = 0
        total_tx = 0
        for device in devices:
            # Tolerate a missing or null 'stat' member and null counters.
            stats = device.get('stat') or {}
            total_rx += stats.get('rx_bytes') or 0
            total_tx += stats.get('tx_bytes') or 0

        # Convert to GB
        total_rx_gb = total_rx / (1024**3)
        total_tx_gb = total_tx / (1024**3)

        return {
            'rx_gb': round(total_rx_gb, 2),
            'tx_gb': round(total_tx_gb, 2),
            'total_gb': round(total_rx_gb + total_tx_gb, 2),
        }

    def generate_report(self, output_format="text"):
        """Generate comprehensive network report.

        Logs in, collects clients, devices, health, alerts and bandwidth,
        then renders them.

        Args:
            output_format: ``"json"`` for a JSON document; any other value
                produces the markdown-style text report.

        Returns:
            The report as a string, or an ``[ERROR]`` string when login fails.
        """
        if not self.login():
            return "[ERROR] Failed to authenticate with UniFi Controller"

        # Collect data
        clients = self.get_clients()
        devices = self.get_devices()
        health = self.get_health()
        alerts = self.get_alerts(hours=24)
        # Reuse the device list instead of fetching stat/device a second time.
        bandwidth = self.get_bandwidth_stats(devices)
        wan_status = self._wan_status(health)
        now = datetime.now()

        if output_format == "json":
            report = {
                'timestamp': now.isoformat(),
                'summary': {
                    'connected_clients': len(clients),
                    'total_devices': len(devices),
                    'alerts_24h': len(alerts),
                    'wan_status': wan_status,
                },
                'clients': clients,
                'devices': devices,
                'bandwidth': bandwidth,
                'alerts': alerts,
            }
            return json.dumps(report, indent=2)

        return self._render_text_report(
            now, clients, devices, wan_status, alerts, bandwidth
        )

    def _render_text_report(self, now, clients, devices, wan_status, alerts,
                            bandwidth):
        """Render the collected data as the markdown-style text report."""
        parts = [f"""# UniFi Network Report - {now.strftime('%Y-%m-%d %H:%M')}

## [STATS] Network Overview

| Metric | Value |
|--------|-------|
| Connected Clients | {len(clients)} |
| UniFi Devices | {len(devices)} |
| WAN Status | {wan_status} |
| Alerts (24h) | {len(alerts)} |

## [NET] Bandwidth Usage (All Time)

| Direction | Usage |
|-----------|-------|
| Download (RX) | {bandwidth['rx_gb']} GB |
| Upload (TX) | {bandwidth['tx_gb']} GB |
| **Total** | **{bandwidth['total_gb']} GB** |

## [CLIENTS] Connected Clients

"""]

        # Group clients by connection type
        wired = [c for c in clients if c.get('is_wired', False)]
        wireless = [c for c in clients if not c.get('is_wired', False)]

        if wireless:
            parts.append(f"### [WIFI] Wireless ({len(wireless)} clients)\n\n")
            # Only the first 10 wireless clients are listed.
            for idx, client in enumerate(wireless[:10], 1):
                name = client.get('name') or client.get('hostname') or 'Unknown'
                ip = client.get('ip', 'N/A')
                mac = client.get('mac', 'N/A')
                signal = client.get('signal', 0)
                ap_name = client.get('ap_name', 'Unknown AP')
                parts.append(f"{idx}. **{name}**\n")
                parts.append(f" - IP: {ip} | MAC: {mac}\n")
                parts.append(f" - Signal: {signal} dBm | AP: {ap_name}\n\n")

        if wired:
            parts.append(f"### [WIRED] Wired ({len(wired)} clients)\n\n")
            # Only the first 5 wired clients are listed.
            for idx, client in enumerate(wired[:5], 1):
                name = client.get('name') or client.get('hostname') or 'Unknown'
                ip = client.get('ip', 'N/A')
                parts.append(f"{idx}. **{name}** - {ip}\n")

        # Network Devices
        parts.append(f"\n## [DEVICES] UniFi Devices ({len(devices)} total)\n\n")
        for device in devices[:10]:
            name = device.get('name') or device.get('mac') or 'Unknown'
            model = device.get('model', 'Unknown')
            status = "[OK] Online" if device.get('state') == 1 else "[DOWN] Offline"
            uptime = device.get('uptime') or 0  # seconds; null treated as 0
            uptime_str = f"{uptime // 86400}d {(uptime % 86400) // 3600}h"
            parts.append(
                f"- **{name}** ({model}) - {status} - Uptime: {uptime_str}\n"
            )

        # Alerts
        if alerts:
            parts.append(
                f"\n## [ALERTS] Recent Alerts ({len(alerts)} in 24h)\n\n"
            )
            for alert in alerts[:5]:
                msg = alert.get('msg', 'Unknown alert')
                when = self._event_time(alert)
                stamp = when.strftime('%H:%M') if when is not None else '--:--'
                parts.append(f"- [{stamp}] {msg}\n")
        else:
            parts.append("\n## [OK] No Alerts (24h)\n\n")

        return "".join(parts)


def main():
    """Main entry point for cron jobs.

    Reads ``UNIFI_URL``, ``UNIFI_USER`` and ``UNIFI_PASS`` from the
    environment, prints the text report and returns it.

    The password has no built-in fallback: credentials must not live in
    source control, so ``UNIFI_PASS`` is required and an ``[ERROR]`` string
    is printed and returned when it is missing.
    """
    controller_url = os.getenv('UNIFI_URL', 'https://192.168.0.39:8443')
    username = os.getenv('UNIFI_USER', 'corey')
    password = os.getenv('UNIFI_PASS')

    if not password:
        report = "[ERROR] UNIFI_PASS environment variable is not set"
        print(report)
        return report

    monitor = UniFiMonitor(controller_url, username, password)
    report = monitor.generate_report(output_format="text")
    print(report)
    return report


if __name__ == "__main__":
    main()