Files
OrbitWard/worker/tests/test_snmp_collector.py
T
2026-05-24 00:44:02 -06:00

118 lines
4.0 KiB
Python

import unittest
from unittest.mock import patch
from app.collectors.snmp import (
IF_ADMIN_STATUS,
IF_HC_IN_OCTETS,
IF_HC_OUT_OCTETS,
IF_IN_DISCARDS,
IF_IN_ERRORS,
IF_OPER_STATUS,
IF_OUT_DISCARDS,
IF_OUT_ERRORS,
SYS_UPTIME,
SnmpCheckConfig,
_with_index,
run_snmp_check,
)
class SnmpCollectorTestCase(unittest.IsolatedAsyncioTestCase):
    """Exercise ``run_snmp_check`` against a mocked ``SnmpV2Client``.

    Every test patches ``app.collectors.snmp.SnmpV2Client`` so that
    ``get_many`` on the constructed client returns a canned OID -> value
    mapping; no SNMP traffic is sent. The tests then check the status,
    message and metrics that the collector derives from those raw values.
    """

    async def _run_check(self, item_id: str, item_type: str, oid_values: dict):
        """Run one SNMP check with the client patched to return *oid_values*.

        Args:
            item_id: Value passed as ``SnmpCheckConfig.item_id``.
            item_type: Value passed as ``SnmpCheckConfig.item_type``.
            oid_values: Mapping handed back by the mocked ``get_many``.

        Returns:
            Whatever ``run_snmp_check`` returns for that configuration.
        """
        with patch("app.collectors.snmp.SnmpV2Client") as client_class:
            client_class.return_value.get_many.return_value = oid_values
            return await run_snmp_check(
                SnmpCheckConfig(
                    host="192.0.2.10",
                    community="private-community",
                    item_id=item_id,
                    item_type=item_type,
                )
            )

    @staticmethod
    def _metric_rows(result) -> list:
        """Flatten ``result.metrics`` into ``(name, value, unit)`` tuples."""
        return [(metric.name, metric.value, metric.unit) for metric in result.metrics]

    async def test_collects_device_uptime(self) -> None:
        """A raw uptime of 123400 is reported as 1234 seconds."""
        # The expected values pin a 100:1 scaling of the raw counter
        # (presumably sysUpTime TimeTicks, i.e. hundredths of a second).
        result = await self._run_check(
            "device.uptime",
            "device_uptime",
            {SYS_UPTIME: 123_400},
        )

        # unittest assertions (rather than bare ``assert``) survive
        # ``python -O`` and print both operands when they differ.
        self.assertEqual(result.status, "up")
        self.assertEqual(result.message, "Device uptime is 1234 seconds")
        self.assertEqual(
            self._metric_rows(result),
            [("uptime_seconds", 1234.0, "seconds")],
        )

    async def test_collects_interface_status(self) -> None:
        """Admin status 1 with oper status 2 yields a ``down`` result."""
        admin_oid = _with_index(IF_ADMIN_STATUS, 7)
        oper_oid = _with_index(IF_OPER_STATUS, 7)

        result = await self._run_check(
            "interface.7.status",
            "interface_status",
            {admin_oid: 1, oper_oid: 2},
        )

        self.assertEqual(result.status, "down")
        self.assertEqual(result.message, "Interface admin up, operational down")
        self.assertEqual(
            self._metric_rows(result),
            [
                ("admin_status", 1.0, None),
                ("oper_status", 2.0, None),
            ],
        )

    async def test_collects_interface_traffic_from_high_capacity_counters(self) -> None:
        """The HC in/out octet counters are surfaced as byte metrics."""
        in_oid = _with_index(IF_HC_IN_OCTETS, 3)
        out_oid = _with_index(IF_HC_OUT_OCTETS, 3)

        result = await self._run_check(
            "interface.3.traffic",
            "interface_traffic",
            {in_oid: 123, out_oid: 456},
        )

        self.assertEqual(result.status, "up")
        self.assertEqual(
            self._metric_rows(result),
            [
                ("in_octets", 123.0, "bytes"),
                ("out_octets", 456.0, "bytes"),
            ],
        )

    async def test_collects_interface_errors_and_discards(self) -> None:
        """Error and discard counters are surfaced as four count metrics."""
        oids = [
            _with_index(IF_IN_ERRORS, 5),
            _with_index(IF_OUT_ERRORS, 5),
            _with_index(IF_IN_DISCARDS, 5),
            _with_index(IF_OUT_DISCARDS, 5),
        ]

        result = await self._run_check(
            "interface.5.errors",
            "interface_errors",
            {
                oids[0]: 1,
                oids[1]: 2,
                oids[2]: 3,
                oids[3]: 4,
            },
        )

        self.assertEqual(result.status, "up")
        self.assertEqual(
            self._metric_rows(result),
            [
                ("in_errors", 1.0, "count"),
                ("out_errors", 2.0, "count"),
                ("in_discards", 3.0, "count"),
                ("out_discards", 4.0, "count"),
            ],
        )
# Run the test cases in this module with unittest's command-line runner when
# the file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()