Files
2026-05-26 21:08:07 -06:00

153 lines
5.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.api.credentials import SNMP_CREDENTIAL_TYPE
from app.auth.dependencies import require_role
from app.core.secrets import decrypt_secret
from app.db.session import get_db
from app.models import Credential, User
from app.schemas.core import (
SnmpDiscoveredInterfaceRead,
SnmpDiscoveryItemRead,
SnmpDiscoveryRead,
SnmpDiscoveryRequest,
)
from app.services.snmp import DiscoveredSnmpDevice, SnmpCredential, SnmpDiscoveryError, discover_snmp_device
# Router for the discovery endpoints; every route registered on it is served
# under the "/discovery" prefix and tagged "discovery".
router = APIRouter(prefix="/discovery", tags=["discovery"])
@router.post("/snmp", response_model=SnmpDiscoveryRead)
def discover_snmp(
    payload: SnmpDiscoveryRequest,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> SnmpDiscoveryRead:
    """Run an SNMP discovery against ``payload.host`` using a stored profile.

    The credential profile named by ``payload.credential_profile_id`` is
    loaded from the database, its community string is decrypted, and the
    connection settings stored in the profile's ``extra`` mapping (``port``,
    ``timeout_seconds``, ``retries``) are applied before probing the host.
    The route is guarded by the ``require_role("admin")`` dependency.

    Args:
        payload: Request body carrying the host, the credential profile id
            and the asset type passed on to the discovery service.
        _: Authenticated user resolved by the role dependency (unused).
        db: Database session used to look up the credential profile.

    Returns:
        The discovery result converted to its API read schema.

    Raises:
        HTTPException: 404 when the profile is missing or is not an SNMP
            profile; 400 when the profile's version is not "2c" or the
            decrypted community string is empty; 502 when the discovery
            service raises ``SnmpDiscoveryError``.
    """
    profile_id = payload.credential_profile_id
    profile = db.get(Credential, profile_id)
    is_snmp_profile = profile is not None and profile.credential_type == SNMP_CREDENTIAL_TYPE
    if not is_snmp_profile:
        raise HTTPException(status_code=404, detail="SNMP credential profile not found")

    # Work on a copy so the stored profile mapping is never touched.
    settings = dict(profile.extra or {})

    # A profile with no (or an empty) version is treated as SNMPv2c.
    snmp_version = str(settings.get("version") or "2c")
    if snmp_version != "2c":
        raise HTTPException(status_code=400, detail="SNMP discovery currently supports SNMPv2c profiles")

    community = decrypt_secret(profile.encrypted_secret)
    if not community:
        raise HTTPException(status_code=400, detail="SNMP credential profile has no usable community string")

    def _int_setting(key: str, default: int) -> int:
        # Any falsy stored value (missing, None, 0, "") falls back to the default.
        return int(settings.get(key) or default)

    port = _int_setting("port", 161)
    timeout_seconds = _int_setting("timeout_seconds", 5)
    retries = _int_setting("retries", 1)
    credential = SnmpCredential(
        community=community,
        port=port,
        timeout_seconds=timeout_seconds,
        retries=retries,
    )

    try:
        discovered = discover_snmp_device(payload.host, credential, asset_type=payload.asset_type)
    except SnmpDiscoveryError as exc:
        # Only a generic message goes into the response; the cause stays chained.
        raise HTTPException(status_code=502, detail="SNMP discovery failed") from exc
    else:
        return _discovery_to_read(profile_id, discovered)
def _discovery_to_read(credential_profile_id: int, discovered: DiscoveredSnmpDevice) -> SnmpDiscoveryRead:
    """Convert a discovery service result into the API response schema.

    Args:
        credential_profile_id: Id of the credential profile used for the
            discovery; echoed back in the response.
        discovered: Device data returned by the discovery service.

    Returns:
        A ``SnmpDiscoveryRead`` carrying the device fields, one interface
        entry per discovered interface, and the list of monitorable items
        produced by ``_monitorable_items``.
    """
    interface_reads = []
    for iface in discovered.interfaces:
        interface_reads.append(
            SnmpDiscoveredInterfaceRead(
                index=iface.index,
                name=iface.name,
                label=iface.label,
                description=iface.description,
                admin_status=iface.admin_status,
                oper_status=iface.oper_status,
                speed_bps=iface.speed_bps,
            )
        )

    return SnmpDiscoveryRead(
        host=discovered.host,
        credential_profile_id=credential_profile_id,
        profile_key=discovered.profile_key,
        profile_name=discovered.profile_name,
        capabilities=discovered.capabilities,
        device_name=discovered.device_name,
        description=discovered.description,
        uptime_seconds=discovered.uptime_seconds,
        interfaces=interface_reads,
        monitorable_items=_monitorable_items(discovered),
    )
def _monitorable_items(discovered: DiscoveredSnmpDevice) -> list[SnmpDiscoveryItemRead]:
    """List everything on the discovered device that can be monitored.

    The result is ordered as follows:

    1. A "Device uptime" item, only when the device reported an uptime.
    2. One item per entry in ``discovered.health_items``, copied field by field.
    3. For each interface, three items in this order: status, traffic,
       and errors/discards.

    Args:
        discovered: Device data returned by the discovery service.

    Returns:
        The monitorable items in the order described above.
    """
    uptime = discovered.uptime_seconds
    if uptime is None:
        uptime_items = []
    else:
        uptime_items = [
            SnmpDiscoveryItemRead(
                item_id="device.uptime",
                item_type="device_uptime",
                group="Device Health",
                label="Device uptime",
                unit="seconds",
                # Shown in a compact human-readable form rather than raw seconds.
                current_value=_format_duration(uptime),
            )
        ]

    health_items = [
        SnmpDiscoveryItemRead(
            item_id=entry.item_id,
            item_type=entry.item_type,
            group=entry.group,
            label=entry.label,
            unit=entry.unit,
            current_value=entry.current_value,
        )
        for entry in discovered.health_items
    ]

    interface_items = []
    for iface in discovered.interfaces:
        label = iface.label
        group_name = f"Interface {label}"
        id_prefix = f"interface.{iface.index}"

        interface_items.append(
            SnmpDiscoveryItemRead(
                item_id=f"{id_prefix}.status",
                item_type="interface_status",
                group=group_name,
                label=f"{label} status",
                current_value=_interface_status_value(iface.admin_status, iface.oper_status),
            )
        )
        interface_items.append(
            SnmpDiscoveryItemRead(
                item_id=f"{id_prefix}.traffic",
                item_type="interface_traffic",
                group=group_name,
                label=f"{label} traffic",
                unit="bps",
                # Placeholder text; no rate is computed during discovery.
                current_value="Rate after first check",
            )
        )
        # The errors/discards item is created without a current value.
        interface_items.append(
            SnmpDiscoveryItemRead(
                item_id=f"{id_prefix}.errors",
                item_type="interface_errors",
                group=group_name,
                label=f"{label} errors and discards",
                unit="count",
            )
        )

    return [*uptime_items, *health_items, *interface_items]
def _format_duration(seconds: int | None) -> str | None:
if seconds is None:
return None
days = seconds // 86_400
hours = (seconds % 86_400) // 3_600
minutes = (seconds % 3_600) // 60
if days:
return f"{days}d {hours}h"
if hours:
return f"{hours}h {minutes}m"
return f"{minutes}m"
def _interface_status_value(admin_status: str | None, oper_status: str | None) -> str | None:
if admin_status and oper_status:
return f"admin {admin_status}, oper {oper_status}"
return admin_status or oper_status