# File viewer metadata: 196 lines, 5.5 KiB, Python.
"""Periodic latency monitor for controls network devices."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Iterable, Optional
|
|
|
|
import mysql.connector
|
|
|
|
# Tunables are read from the environment once, when the module is imported.
# Both values must parse as integers or int() raises ValueError at import.
PING_TIMEOUT_MS = int(os.getenv("LATENCY_PING_TIMEOUT_MS", "2000"))
POLL_INTERVAL_SECONDS = int(os.getenv("LATENCY_POLL_INTERVAL", "15"))

# Keyword arguments unpacked into mysql.connector.connect().
# NOTE(review): the "user" and "password" fallbacks are literal credentials
# committed to source. Because a fallback is always present, the
# "credentials are not set" check in the script entry point can never see a
# missing password. Consider dropping the fallbacks and rotating the secret.
DB_CONFIG = dict(
    host=os.getenv("CONTROLS_DB_HOST", "localhost"),
    user=os.getenv("CONTROLS_DB_USER", "corey"),
    password=os.getenv("CONTROLS_DB_PASSWORD", "41945549"),
    database=os.getenv("CONTROLS_DB_NAME", "controls"),
    port=int(os.getenv("CONTROLS_DB_PORT", "3306")),
    autocommit=False,  # record_latency() commits explicitly after each insert
    connection_timeout=5,
)
|
|
|
|
# Status strings stored in the "status" column of monitoring_latency_log.
STATUS_UP = "up"  # ping exited 0 and a round-trip time was parsed
STATUS_DOWN = "down"  # ping exited with a non-zero return code
STATUS_UNKNOWN = "unknown"  # ping exited 0 but no round-trip time was found
|
|
|
|
|
|
@dataclass
class Device:
    """One monitored device, built from a row of ``monitoring_devices``."""

    device_id: int  # the row's ``id`` column
    host: str  # address handed to ``ping``
    label: str  # name shown in console output
|
|
|
|
|
|
@dataclass
class PingResult:
    """Outcome of a single ping attempt against one device."""

    device: Device  # the device that was pinged
    status: str  # one of the STATUS_* constants
    latency_ms: Optional[int]  # None when no round-trip time was obtained
    checked_at: datetime  # when the check ran; monitor_once() passes UTC
|
|
|
|
|
|
@contextmanager
def mysql_connection():
    """Open a MySQL connection from ``DB_CONFIG`` and close it on exit.

    Yields:
        The connection returned by ``mysql.connector.connect(**DB_CONFIG)``.

    Raises:
        Exception: whatever ``connect`` raises, re-raised after being logged
            to stderr. Exceptions raised inside the ``with`` body propagate
            unchanged and are not reported as connection failures.
    """
    print("About to connect to MySQL...")  # Debug
    # Only the connect call is guarded, so errors raised later inside the
    # caller's ``with`` body are not mislabelled as connection errors.
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
    except Exception as e:
        print(f"Error connecting to MySQL: {e}", file=sys.stderr)
        raise
    print("Connected to MySQL!")  # Debug

    try:
        yield connection
    finally:
        # Closing stays best-effort: a failure here must not mask an
        # exception already propagating from the ``with`` body.
        try:
            connection.close()
        except Exception as close_err:
            print(f"Error closing MySQL connection: {close_err}", file=sys.stderr)
|
|
|
|
|
|
def fetch_devices(connection) -> Iterable[Device]:
    """Yield every active device from ``monitoring_devices``, ordered by id.

    Args:
        connection: an open connection whose ``cursor(dictionary=True)``
            returns rows as dicts keyed by column name.

    Yields:
        One ``Device`` per row where ``is_active = 1``.

    Raises:
        Exception: any error raised by the cursor while executing the query
            or fetching rows; the cursor is closed before it propagates.
    """
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute(
            "SELECT id, host, label FROM monitoring_devices WHERE is_active = 1 ORDER BY id"
        )
        for row in cursor:
            yield Device(device_id=row["id"], host=row["host"], label=row["label"])
    finally:
        # Runs on exhaustion, on error, and when the generator is closed
        # early, so the cursor is never left open.
        cursor.close()
|
|
|
|
|
|
def ping_device(host: str) -> tuple[str, Optional[int]]:
    """Send one ping to *host* and classify the outcome.

    Args:
        host: address or hostname passed to the ``ping`` executable.

    Returns:
        ``(STATUS_UP, latency_ms)`` when ping exits 0 and a round-trip time
        is found in its output; ``(STATUS_DOWN, None)`` when ping exits
        non-zero or does not finish within the hard time limit;
        ``(STATUS_UNKNOWN, None)`` when ping exits 0 but no time is found.

    Raises:
        OSError: if the ``ping`` executable cannot be started.
    """
    # NOTE(review): "-n 1 -w <ms>" and the "time=12ms" / "time<1ms" pattern
    # below match Windows ping. Other platforms' ping use different flags
    # and output, so confirm the deployment target.
    command = ["ping", "-n", "1", "-w", str(PING_TIMEOUT_MS), host]

    # Without a timeout subprocess.run() waits indefinitely if the child
    # never exits, which would stall the whole monitor. Allow ping's own
    # reply timeout plus a few seconds of grace.
    hard_limit_seconds = PING_TIMEOUT_MS / 1000 + 5
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=False,
            timeout=hard_limit_seconds,
        )
    except subprocess.TimeoutExpired:
        return STATUS_DOWN, None

    if completed.returncode != 0:
        return STATUS_DOWN, None

    output = completed.stdout + completed.stderr
    match = re.search(r"time[=<]([0-9]+)ms", output)
    if match:
        return STATUS_UP, int(match.group(1))

    # Exit code 0 without a parsable time: reachability cannot be decided.
    return STATUS_UNKNOWN, None
|
|
|
|
|
|
def record_latency(connection, result: PingResult) -> None:
    """Insert one ping result into ``monitoring_latency_log`` and commit.

    Args:
        connection: an open database connection providing ``cursor()`` and
            ``commit()``.
        result: the ping outcome; ``device.device_id``, ``latency_ms``,
            ``status`` and ``checked_at`` are written as one row.

    Raises:
        Exception: any error raised by ``execute`` or ``commit``; the cursor
            is closed before the error propagates.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(
            (
                "INSERT INTO monitoring_latency_log"
                " (device_id, latency_ms, status, checked_at)"
                " VALUES (%s, %s, %s, %s)"
            ),
            (
                result.device.device_id,
                result.latency_ms,
                result.status,
                result.checked_at,
            ),
        )
        # DB_CONFIG disables autocommit, so each row is committed explicitly.
        connection.commit()
    finally:
        # Close the cursor even when the insert or commit fails.
        cursor.close()
|
|
|
|
|
|
def monitor_once() -> None:
    """Run one poll cycle: load active devices, ping each, log each result.

    Nothing is raised to the caller. A failure while fetching devices ends
    the cycle; a failure while writing one result is reported and the loop
    moves on to the next device. Any other error is printed to stderr.
    """
    print("monitor_once() called")  # Debug
    try:
        print("Opening DB connection...")
        with mysql_connection() as conn:
            print("DB connection opened.")

            # Materialise the device list first so the query cursor is done
            # before any inserts run on the same connection.
            try:
                targets = list(fetch_devices(conn))
                print(f"Fetched {len(targets)} devices")  # Add this line
            except Exception as fetch_err:
                print(f"Error fetching devices: {fetch_err}", file=sys.stderr)
                return

            if not targets:
                print("No active devices found in monitoring_devices table.")
                return

            for target in targets:
                print(f"Pinging device: {target.label} ({target.host})")  # Debug
                status, latency = ping_device(target.host)
                outcome = PingResult(
                    target,
                    status,
                    latency,
                    datetime.now(timezone.utc),
                )

                # A failed insert must not stop the remaining devices.
                try:
                    record_latency(conn, outcome)
                except Exception as log_err:
                    print(f"Error logging latency: {log_err}", file=sys.stderr)

                if latency is None:
                    latency_display = "n/a"
                else:
                    latency_display = f"{latency} ms"
                stamp = outcome.checked_at.isoformat()
                print(
                    f"[{stamp}] {target.label}"
                    f" ({target.host}) -> {status} ({latency_display})"
                )
    except mysql.connector.Error as error:
        print(f"MySQL error: {error}", file=sys.stderr)
    except Exception as error:  # pylint: disable=broad-except
        print(f"Unexpected error: {error}", file=sys.stderr)
|
|
|
|
|
|
def run_forever() -> None:
    """Run ``monitor_once`` every ``POLL_INTERVAL_SECONDS`` until signalled.

    Installs handlers for SIGINT and SIGTERM that set a stop flag. The flag
    is checked after each poll cycle and repeatedly during the wait between
    cycles, so shutdown does not have to wait out a full poll interval.
    """
    stop_requested = False

    def _signal_handler(signum, frame):  # noqa: D401, ANN001, ANN202
        nonlocal stop_requested
        stop_requested = True
        print(f"Received signal {signum}; shutting down...")

    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)

    # Longest single sleep; bounds how long shutdown can lag behind a signal.
    sleep_slice_seconds = 0.5

    while not stop_requested:
        monitor_once()

        # time.sleep() resumes with the remaining time after a signal handler
        # returns normally, so one long sleep would delay shutdown by up to
        # the whole interval. Sleep in short slices and re-check the flag.
        wake_at = time.monotonic() + POLL_INTERVAL_SECONDS
        while not stop_requested:
            remaining = wake_at - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, sleep_slice_seconds))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: validate credentials, then poll once or forever.
    print("Script started")  # Debug

    has_user = bool(DB_CONFIG.get("user"))
    has_password = DB_CONFIG.get("password") is not None
    if not (has_user and has_password):
        message = (
            "Database credentials are not set."
            " Provide CONTROLS_DB_USER and CONTROLS_DB_PASSWORD env vars."
        )
        print(message, file=sys.stderr)
        sys.exit(1)

    # LATENCY_RUN_ONCE=1 runs a single cycle; any other value loops.
    run_once = os.environ.get("LATENCY_RUN_ONCE") == "1"
    entry_point = monitor_once if run_once else run_forever
    entry_point()
|