Files
OrbitWard/worker/tests/test_snmp_collector.py
T
2026-05-26 17:43:33 -06:00

283 lines
10 KiB
Python

import unittest
from unittest.mock import patch
from app.collectors.snmp import (
IF_ADMIN_STATUS,
IF_HC_IN_OCTETS,
IF_HC_OUT_OCTETS,
IF_IN_DISCARDS,
IF_IN_ERRORS,
IF_OPER_STATUS,
IF_OUT_DISCARDS,
IF_OUT_ERRORS,
ENT_PHY_SENSOR_OPER_STATUS,
ENT_PHY_SENSOR_PRECISION,
ENT_PHY_SENSOR_SCALE,
ENT_PHY_SENSOR_TYPE,
ENT_PHY_SENSOR_VALUE,
HR_PROCESSOR_LOAD,
HR_STORAGE_ALLOCATION_UNITS,
HR_STORAGE_SIZE,
HR_STORAGE_USED,
SYS_UPTIME,
UCD_DSK_PERCENT,
UCD_LA_LOAD_INT,
UCD_MEM_AVAIL_REAL,
UCD_MEM_TOTAL_REAL,
SnmpCheckConfig,
_with_index,
run_snmp_check,
)
class SnmpCollectorTestCase(unittest.IsolatedAsyncioTestCase):
async def test_collects_device_uptime(self) -> None:
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {SYS_UPTIME: 123_400}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="device.uptime",
item_type="device_uptime",
)
)
assert result.status == "up"
assert result.message == "Device uptime is 1234 seconds"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("uptime_seconds", 1234.0, "seconds")
]
async def test_collects_cpu_load(self) -> None:
oid = _with_index(HR_PROCESSOR_LOAD, 196608)
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {oid: 42}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="cpu.196608.load",
item_type="cpu_load",
)
)
assert result.status == "up"
assert result.message == "CPU load is 42%"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("load_percent", 42.0, "%")
]
async def test_collects_linux_load_average(self) -> None:
oid = _with_index(UCD_LA_LOAD_INT, 1)
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {oid: 123}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="linux.load.1",
item_type="linux_load_average",
label="Load average 1 minute",
)
)
assert result.status == "up"
assert result.message == "Load average 1 minute is 1.23"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("load_average", 1.23, None)
]
async def test_collects_linux_memory_usage(self) -> None:
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {
UCD_MEM_TOTAL_REAL: 1000,
UCD_MEM_AVAIL_REAL: 250,
}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="linux.memory.real",
item_type="linux_memory_usage",
)
)
assert result.status == "up"
assert result.message == "Memory is 75.0% used"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("used_percent", 75.0, "%"),
("used_bytes", 768000.0, "bytes"),
("total_bytes", 1024000.0, "bytes"),
]
async def test_collects_linux_disk_usage(self) -> None:
oid = _with_index(UCD_DSK_PERCENT, 31)
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {oid: 81}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="linux.disk.31",
item_type="linux_disk_usage",
label="Disk / usage",
)
)
assert result.status == "up"
assert result.message == "Disk / usage is 81% used"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("used_percent", 81.0, "%")
]
async def test_collects_storage_usage(self) -> None:
oids = [
_with_index(HR_STORAGE_ALLOCATION_UNITS, 31),
_with_index(HR_STORAGE_SIZE, 31),
_with_index(HR_STORAGE_USED, 31),
]
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {
oids[0]: 4096,
oids[1]: 100,
oids[2]: 25,
}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="storage.31.usage",
item_type="storage_usage",
label="Disk / usage",
)
)
assert result.status == "up"
assert result.message == "Disk / usage is 25.0% used"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("used_percent", 25.0, "%"),
("used_bytes", 102400.0, "bytes"),
("total_bytes", 409600.0, "bytes"),
]
async def test_collects_sensor_value_and_status(self) -> None:
oids = [
_with_index(ENT_PHY_SENSOR_TYPE, 10),
_with_index(ENT_PHY_SENSOR_SCALE, 10),
_with_index(ENT_PHY_SENSOR_PRECISION, 10),
_with_index(ENT_PHY_SENSOR_VALUE, 10),
_with_index(ENT_PHY_SENSOR_OPER_STATUS, 10),
]
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {
oids[0]: 8,
oids[1]: 9,
oids[2]: 1,
oids[3]: 310,
oids[4]: 1,
}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="sensor.10.value",
item_type="sensor_value",
label="Temperature Inlet",
unit="C",
)
)
assert result.status == "up"
assert result.message == "Temperature Inlet is 31C; sensor status ok"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("sensor_value", 31.0, "C"),
("sensor_status", 1.0, None),
]
async def test_collects_interface_status(self) -> None:
admin_oid = _with_index(IF_ADMIN_STATUS, 7)
oper_oid = _with_index(IF_OPER_STATUS, 7)
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {admin_oid: 1, oper_oid: 2}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="interface.7.status",
item_type="interface_status",
)
)
assert result.status == "down"
assert result.message == "Interface admin up, operational down"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("admin_status", 1.0, None),
("oper_status", 2.0, None),
]
async def test_collects_interface_traffic_from_high_capacity_counters(self) -> None:
in_oid = _with_index(IF_HC_IN_OCTETS, 3)
out_oid = _with_index(IF_HC_OUT_OCTETS, 3)
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {in_oid: 123, out_oid: 456}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="interface.3.traffic",
item_type="interface_traffic",
)
)
assert result.status == "up"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("in_octets", 123.0, "bytes"),
("out_octets", 456.0, "bytes"),
]
async def test_collects_interface_errors_and_discards(self) -> None:
oids = [
_with_index(IF_IN_ERRORS, 5),
_with_index(IF_OUT_ERRORS, 5),
_with_index(IF_IN_DISCARDS, 5),
_with_index(IF_OUT_DISCARDS, 5),
]
with patch("app.collectors.snmp.SnmpV2Client") as client_class:
client_class.return_value.get_many.return_value = {
oids[0]: 1,
oids[1]: 2,
oids[2]: 3,
oids[3]: 4,
}
result = await run_snmp_check(
SnmpCheckConfig(
host="192.0.2.10",
community="private-community",
item_id="interface.5.errors",
item_type="interface_errors",
)
)
assert result.status == "up"
assert [(metric.name, metric.value, metric.unit) for metric in result.metrics] == [
("in_errors", 1.0, "count"),
("out_errors", 2.0, "count"),
("in_discards", 3.0, "count"),
("out_discards", 4.0, "count"),
]
if __name__ == "__main__":
unittest.main()