222 lines
7.7 KiB
Python
222 lines
7.7 KiB
Python
from fastapi.testclient import TestClient
|
|
|
|
from app.core.secrets import encrypt_secret
|
|
from app.models import Credential
|
|
from app.services.snmp import (
|
|
ENT_PHYSICAL_NAME,
|
|
ENT_PHY_SENSOR_TYPE,
|
|
ENT_PHY_SENSOR_VALUE,
|
|
HR_PROCESSOR_LOAD,
|
|
HR_STORAGE_ALLOCATION_UNITS,
|
|
HR_STORAGE_DESCR,
|
|
HR_STORAGE_FIXED_DISK,
|
|
HR_STORAGE_RAM,
|
|
HR_STORAGE_SIZE,
|
|
HR_STORAGE_TYPE,
|
|
HR_STORAGE_USED,
|
|
SYS_DESCR,
|
|
SYS_NAME,
|
|
SYS_UPTIME,
|
|
DiscoveredSnmpDevice,
|
|
DiscoveredSnmpInterface,
|
|
SnmpCredential,
|
|
SnmpDiscoveryError,
|
|
discover_snmp_device,
|
|
)
|
|
|
|
|
|
def test_snmp_discovery_uses_profile_and_returns_friendly_results(client: TestClient, db_session, monkeypatch) -> None:
|
|
profile = Credential(
|
|
name="Core Switch",
|
|
credential_type="snmp",
|
|
encrypted_secret=encrypt_secret("private-community"),
|
|
extra={"version": "2c", "port": 1161, "timeout_seconds": 4, "retries": 2},
|
|
)
|
|
db_session.add(profile)
|
|
db_session.commit()
|
|
calls: list[tuple[str, SnmpCredential]] = []
|
|
|
|
def fake_discover(host: str, credential: SnmpCredential) -> DiscoveredSnmpDevice:
|
|
calls.append((host, credential))
|
|
return DiscoveredSnmpDevice(
|
|
host=host,
|
|
device_name="core-sw-1",
|
|
description="Core switch",
|
|
uptime_seconds=12345,
|
|
interfaces=[
|
|
DiscoveredSnmpInterface(
|
|
index=1,
|
|
name="Gi1/0/1",
|
|
description="Uplink",
|
|
admin_status="up",
|
|
oper_status="up",
|
|
speed_bps=1_000_000_000,
|
|
)
|
|
],
|
|
)
|
|
|
|
monkeypatch.setattr("app.api.discovery.discover_snmp_device", fake_discover)
|
|
|
|
response = client.post("/discovery/snmp", json={"host": "192.0.2.10", "credential_profile_id": profile.id})
|
|
|
|
assert response.status_code == 200
|
|
assert calls == [
|
|
(
|
|
"192.0.2.10",
|
|
SnmpCredential(community="private-community", port=1161, timeout_seconds=4, retries=2),
|
|
)
|
|
]
|
|
body = response.json()
|
|
assert body["host"] == "192.0.2.10"
|
|
assert body["credential_profile_id"] == profile.id
|
|
assert body["profile_key"] == "generic_snmp"
|
|
assert body["profile_name"] == "Generic SNMP"
|
|
assert body["capabilities"] == {}
|
|
assert body["device_name"] == "core-sw-1"
|
|
assert body["description"] == "Core switch"
|
|
assert body["uptime_seconds"] == 12345
|
|
assert body["interfaces"] == [
|
|
{
|
|
"index": 1,
|
|
"name": "Gi1/0/1",
|
|
"description": "Uplink",
|
|
"admin_status": "up",
|
|
"oper_status": "up",
|
|
"speed_bps": 1_000_000_000,
|
|
}
|
|
]
|
|
assert body["monitorable_items"] == [
|
|
{
|
|
"item_id": "device.uptime",
|
|
"item_type": "device_uptime",
|
|
"group": "Device Health",
|
|
"label": "Device uptime",
|
|
"unit": "seconds",
|
|
},
|
|
{
|
|
"item_id": "interface.1.status",
|
|
"item_type": "interface_status",
|
|
"group": "Interface Gi1/0/1",
|
|
"label": "Gi1/0/1 status",
|
|
"unit": None,
|
|
},
|
|
{
|
|
"item_id": "interface.1.traffic",
|
|
"item_type": "interface_traffic",
|
|
"group": "Interface Gi1/0/1",
|
|
"label": "Gi1/0/1 traffic",
|
|
"unit": "bps",
|
|
},
|
|
{
|
|
"item_id": "interface.1.errors",
|
|
"item_type": "interface_errors",
|
|
"group": "Interface Gi1/0/1",
|
|
"label": "Gi1/0/1 errors and discards",
|
|
"unit": "count",
|
|
},
|
|
]
|
|
assert "private-community" not in response.text
|
|
assert "1.3.6" not in response.text
|
|
|
|
|
|
def test_snmp_profile_mapping_discovers_standard_health_items(monkeypatch) -> None:
|
|
class FakeClient:
|
|
def __init__(self, host: str, credential: SnmpCredential) -> None:
|
|
self.host = host
|
|
self.credential = credential
|
|
|
|
def get_many(self, _oids):
|
|
return {
|
|
SYS_NAME: "edge-router",
|
|
SYS_DESCR: "Linux edge-router net-snmp",
|
|
SYS_UPTIME: 10_000,
|
|
}
|
|
|
|
def walk(self, base_oid, max_items=128):
|
|
values = {
|
|
HR_PROCESSOR_LOAD: {(*HR_PROCESSOR_LOAD, 196608): 17},
|
|
HR_STORAGE_TYPE: {
|
|
(*HR_STORAGE_TYPE, 1): HR_STORAGE_RAM,
|
|
(*HR_STORAGE_TYPE, 31): HR_STORAGE_FIXED_DISK,
|
|
},
|
|
HR_STORAGE_DESCR: {
|
|
(*HR_STORAGE_DESCR, 1): "Physical memory",
|
|
(*HR_STORAGE_DESCR, 31): "/",
|
|
},
|
|
HR_STORAGE_ALLOCATION_UNITS: {
|
|
(*HR_STORAGE_ALLOCATION_UNITS, 1): 1024,
|
|
(*HR_STORAGE_ALLOCATION_UNITS, 31): 4096,
|
|
},
|
|
HR_STORAGE_SIZE: {
|
|
(*HR_STORAGE_SIZE, 1): 2048,
|
|
(*HR_STORAGE_SIZE, 31): 4096,
|
|
},
|
|
HR_STORAGE_USED: {
|
|
(*HR_STORAGE_USED, 1): 1024,
|
|
(*HR_STORAGE_USED, 31): 1024,
|
|
},
|
|
ENT_PHY_SENSOR_TYPE: {(*ENT_PHY_SENSOR_TYPE, 10): 8},
|
|
ENT_PHY_SENSOR_VALUE: {(*ENT_PHY_SENSOR_VALUE, 10): 310},
|
|
ENT_PHYSICAL_NAME: {(*ENT_PHYSICAL_NAME, 10): "Inlet"},
|
|
}
|
|
return values.get(base_oid, {})
|
|
|
|
monkeypatch.setattr("app.services.snmp.SnmpV2Client", FakeClient)
|
|
|
|
discovered = discover_snmp_device("192.0.2.20", SnmpCredential(community="private-community"))
|
|
|
|
assert discovered.profile_key == "net_snmp"
|
|
assert discovered.profile_name == "Net-SNMP Host Resources"
|
|
assert discovered.capabilities["cpu"] is True
|
|
assert discovered.capabilities["memory"] is True
|
|
assert discovered.capabilities["storage"] is True
|
|
assert discovered.capabilities["sensors"] is True
|
|
assert [(item.item_id, item.item_type, item.group, item.label, item.unit) for item in discovered.health_items] == [
|
|
("cpu.196608.load", "cpu_load", "Device Health", "CPU load", "%"),
|
|
("storage.1.memory", "memory_usage", "Device Health", "Memory used", "%"),
|
|
("storage.31.usage", "storage_usage", "Storage", "Disk / usage", "%"),
|
|
("sensor.10.value", "sensor_value", "Environmental", "Temperature Inlet", "C"),
|
|
]
|
|
|
|
|
|
def test_snmp_discovery_rejects_missing_profile(client: TestClient) -> None:
|
|
response = client.post("/discovery/snmp", json={"host": "192.0.2.10", "credential_profile_id": 999})
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_snmp_discovery_rejects_profile_without_secret(client: TestClient, db_session) -> None:
|
|
profile = Credential(
|
|
name="No Secret",
|
|
credential_type="snmp",
|
|
encrypted_secret=None,
|
|
extra={"version": "2c"},
|
|
)
|
|
db_session.add(profile)
|
|
db_session.commit()
|
|
|
|
response = client.post("/discovery/snmp", json={"host": "192.0.2.10", "credential_profile_id": profile.id})
|
|
|
|
assert response.status_code == 400
|
|
|
|
|
|
def test_snmp_discovery_reports_probe_failure(client: TestClient, db_session, monkeypatch) -> None:
|
|
profile = Credential(
|
|
name="Core Switch",
|
|
credential_type="snmp",
|
|
encrypted_secret=encrypt_secret("private-community"),
|
|
extra={"version": "2c"},
|
|
)
|
|
db_session.add(profile)
|
|
db_session.commit()
|
|
|
|
def fake_discover(host: str, credential: SnmpCredential) -> DiscoveredSnmpDevice:
|
|
raise SnmpDiscoveryError("timeout")
|
|
|
|
monkeypatch.setattr("app.api.discovery.discover_snmp_device", fake_discover)
|
|
|
|
response = client.post("/discovery/snmp", json={"host": "192.0.2.10", "credential_profile_id": profile.id})
|
|
|
|
assert response.status_code == 502
|
|
assert "private-community" not in response.text
|