import asyncio from dataclasses import dataclass, field import random import socket from time import perf_counter from typing import Any class SnmpCheckError(Exception): pass @dataclass(frozen=True) class SnmpMetricValue: name: str value: float unit: str | None = None @dataclass(frozen=True) class SnmpCheckConfig: host: str community: str item_id: str item_type: str label: str | None = None unit: str | None = None port: int = 161 timeout_seconds: float = 5.0 retries: int = 1 @dataclass(frozen=True) class SnmpCheckResult: status: str response_time_ms: int | None message: str metrics: list[SnmpMetricValue] = field(default_factory=list) SYS_UPTIME = (1, 3, 6, 1, 2, 1, 1, 3, 0) IF_ADMIN_STATUS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 7) IF_OPER_STATUS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 8) IF_IN_OCTETS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 10) IF_IN_DISCARDS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 13) IF_IN_ERRORS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 14) IF_OUT_OCTETS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 16) IF_OUT_DISCARDS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 19) IF_OUT_ERRORS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 20) IF_HC_IN_OCTETS = (1, 3, 6, 1, 2, 1, 31, 1, 1, 1, 6) IF_HC_OUT_OCTETS = (1, 3, 6, 1, 2, 1, 31, 1, 1, 1, 10) HR_PROCESSOR_LOAD = (1, 3, 6, 1, 2, 1, 25, 3, 3, 1, 2) HR_STORAGE_ALLOCATION_UNITS = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 4) HR_STORAGE_SIZE = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 5) HR_STORAGE_USED = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 6) ENT_PHY_SENSOR_TYPE = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 1) ENT_PHY_SENSOR_SCALE = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 2) ENT_PHY_SENSOR_PRECISION = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 3) ENT_PHY_SENSOR_VALUE = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 4) ENT_PHY_SENSOR_OPER_STATUS = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 5) UCD_LA_LOAD_INT = (1, 3, 6, 1, 4, 1, 2021, 10, 1, 5) UCD_MEM_TOTAL_REAL = (1, 3, 6, 1, 4, 1, 2021, 4, 5, 0) UCD_MEM_AVAIL_REAL = (1, 3, 6, 1, 4, 1, 2021, 4, 6, 0) UCD_DSK_PERCENT = (1, 3, 6, 1, 4, 1, 2021, 9, 1, 9) STATUS_LABELS = { 1: "up", 2: "down", 3: "testing", 4: "unknown", 5: "dormant", 6: "not present", 7: "lower layer down", } SENSOR_STATUS_LABELS = { 1: "ok", 2: "unavailable", 3: "nonoperational", } SENSOR_TYPE_UNITS = { 3: "V", 4: "V", 5: "A", 6: "W", 7: "Hz", 8: "C", 9: "%", 10: "rpm", 11: "m3/min", } async def run_snmp_check(config: SnmpCheckConfig) -> SnmpCheckResult: try: return await asyncio.to_thread(_run_snmp_check_sync, config) except (OSError, SnmpCheckError) as exc: return SnmpCheckResult(status="down", response_time_ms=None, message=f"SNMP check failed: {exc}") def _run_snmp_check_sync(config: SnmpCheckConfig) -> SnmpCheckResult: started = perf_counter() client = SnmpV2Client(config.host, config.community, config.port, config.timeout_seconds, config.retries) if config.item_type == "device_uptime": value = _int_value(client.get_many([SYS_UPTIME]).get(SYS_UPTIME)) response_time_ms = int((perf_counter() - started) * 1000) if value is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Device uptime was not reported") uptime_seconds = int(value / 100) return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message=f"Device uptime is {uptime_seconds} seconds", metrics=[SnmpMetricValue(name="uptime_seconds", value=float(uptime_seconds), unit="seconds")], ) if config.item_type == "cpu_load": processor_index = _item_index(config.item_id, "cpu") if processor_index is None: return SnmpCheckResult(status="down", response_time_ms=0, message="SNMP CPU item was not valid") oid = _with_index(HR_PROCESSOR_LOAD, processor_index) value = _int_value(client.get_many([oid]).get(oid)) response_time_ms = int((perf_counter() - started) * 1000) if value is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="CPU load was not reported") return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message=f"CPU load is {value}%", metrics=[SnmpMetricValue(name="load_percent", value=float(value), unit="%")], ) if config.item_type == "linux_load_average": load_index = _item_index(config.item_id, "linux", position=2) if load_index is None: return SnmpCheckResult(status="down", response_time_ms=0, message="SNMP Linux load item was not valid") oid = _with_index(UCD_LA_LOAD_INT, load_index) value = _int_value(client.get_many([oid]).get(oid)) response_time_ms = int((perf_counter() - started) * 1000) if value is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Linux load average was not reported") load_average = value / 100 label = config.label or "Load average" return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message=f"{label} is {load_average:.2f}", metrics=[SnmpMetricValue(name="load_average", value=load_average)], ) if config.item_type == "linux_memory_usage": values = client.get_many([UCD_MEM_TOTAL_REAL, UCD_MEM_AVAIL_REAL]) response_time_ms = int((perf_counter() - started) * 1000) total_kb = _int_value(values.get(UCD_MEM_TOTAL_REAL)) available_kb = _int_value(values.get(UCD_MEM_AVAIL_REAL)) if not total_kb or available_kb is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Linux memory was not reported") used_kb = total_kb - available_kb used_percent = (used_kb / total_kb) * 100 return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message=f"Memory is {used_percent:.1f}% used", metrics=[ SnmpMetricValue(name="used_percent", value=used_percent, unit="%"), SnmpMetricValue(name="used_bytes", value=float(used_kb * 1024), unit="bytes"), SnmpMetricValue(name="total_bytes", value=float(total_kb * 1024), unit="bytes"), ], ) if config.item_type == "linux_disk_usage": disk_index = _item_index(config.item_id, "linux", position=2) if disk_index is None: return SnmpCheckResult(status="down", response_time_ms=0, message="SNMP Linux disk item was not valid") oid = _with_index(UCD_DSK_PERCENT, disk_index) used_percent = _int_value(client.get_many([oid]).get(oid)) response_time_ms = int((perf_counter() - started) * 1000) if used_percent is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Linux disk usage was not reported") label = config.label or "Disk" return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message=f"{label} is {used_percent}% used", metrics=[SnmpMetricValue(name="used_percent", value=float(used_percent), unit="%")], ) if config.item_type in {"memory_usage", "storage_usage"}: storage_index = _item_index(config.item_id, "storage") if storage_index is None: return SnmpCheckResult(status="down", response_time_ms=0, message="SNMP storage item was not valid") oids = [ _with_index(HR_STORAGE_ALLOCATION_UNITS, storage_index), _with_index(HR_STORAGE_SIZE, storage_index), _with_index(HR_STORAGE_USED, storage_index), ] values = client.get_many(oids) response_time_ms = int((perf_counter() - started) * 1000) allocation_unit = _int_value(values.get(oids[0])) size = _int_value(values.get(oids[1])) used = _int_value(values.get(oids[2])) if not allocation_unit or not size or used is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Storage usage was not reported") total_bytes = float(size * allocation_unit) used_bytes = float(used * allocation_unit) used_percent = (used / size) * 100 label = config.label or ("Memory" if config.item_type == "memory_usage" else "Storage") return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message=f"{label} is {used_percent:.1f}% used", metrics=[ SnmpMetricValue(name="used_percent", value=used_percent, unit="%"), SnmpMetricValue(name="used_bytes", value=used_bytes, unit="bytes"), SnmpMetricValue(name="total_bytes", value=total_bytes, unit="bytes"), ], ) if config.item_type == "sensor_value": sensor_index = _item_index(config.item_id, "sensor") if sensor_index is None: return SnmpCheckResult(status="down", response_time_ms=0, message="SNMP sensor item was not valid") oids = [ _with_index(ENT_PHY_SENSOR_TYPE, sensor_index), _with_index(ENT_PHY_SENSOR_SCALE, sensor_index), _with_index(ENT_PHY_SENSOR_PRECISION, sensor_index), _with_index(ENT_PHY_SENSOR_VALUE, sensor_index), _with_index(ENT_PHY_SENSOR_OPER_STATUS, sensor_index), ] values = client.get_many(oids) response_time_ms = int((perf_counter() - started) * 1000) sensor_type = _int_value(values.get(oids[0])) scale = _int_value(values.get(oids[1])) precision = _int_value(values.get(oids[2])) raw_value = _int_value(values.get(oids[3])) oper_status = _int_value(values.get(oids[4])) if raw_value is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Sensor value was not reported") value = _scaled_sensor_value(raw_value, scale, precision) unit = config.unit or SENSOR_TYPE_UNITS.get(sensor_type or 0) status_label = SENSOR_STATUS_LABELS.get(oper_status or 1, f"status {oper_status}") status = "up" if oper_status in {None, 1} else "down" label = config.label or "Sensor" return SnmpCheckResult( status=status, response_time_ms=response_time_ms, message=f"{label} is {value:g}{unit or ''}; sensor status {status_label}", metrics=[ SnmpMetricValue(name="sensor_value", value=value, unit=unit), *([SnmpMetricValue(name="sensor_status", value=float(oper_status))] if oper_status is not None else []), ], ) interface_index = _interface_index(config.item_id) if interface_index is None: return SnmpCheckResult(status="down", response_time_ms=0, message="SNMP interface item was not valid") if config.item_type == "interface_status": oids = [_with_index(IF_ADMIN_STATUS, interface_index), _with_index(IF_OPER_STATUS, interface_index)] values = client.get_many(oids) response_time_ms = int((perf_counter() - started) * 1000) admin_value = _int_value(values.get(oids[0])) oper_value = _int_value(values.get(oids[1])) if admin_value is None or oper_value is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Interface status was not reported") admin_status = STATUS_LABELS.get(admin_value, f"status {admin_value}") oper_status = STATUS_LABELS.get(oper_value, f"status {oper_value}") status = "up" if admin_value == 1 and oper_value == 1 else "down" return SnmpCheckResult( status=status, response_time_ms=response_time_ms, message=f"Interface admin {admin_status}, operational {oper_status}", metrics=[ SnmpMetricValue(name="admin_status", value=float(admin_value)), SnmpMetricValue(name="oper_status", value=float(oper_value)), ], ) if config.item_type == "interface_traffic": oids = [ _with_index(IF_HC_IN_OCTETS, interface_index), _with_index(IF_HC_OUT_OCTETS, interface_index), _with_index(IF_IN_OCTETS, interface_index), _with_index(IF_OUT_OCTETS, interface_index), ] values = client.get_many(oids) response_time_ms = int((perf_counter() - started) * 1000) in_octets = _int_value(values.get(oids[0])) or _int_value(values.get(oids[2])) out_octets = _int_value(values.get(oids[1])) or _int_value(values.get(oids[3])) if in_octets is None and out_octets is None: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Interface traffic counters were not reported") metrics = [] if in_octets is not None: metrics.append(SnmpMetricValue(name="in_octets", value=float(in_octets), unit="bytes")) if out_octets is not None: metrics.append(SnmpMetricValue(name="out_octets", value=float(out_octets), unit="bytes")) return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message="Interface traffic counters collected", metrics=metrics, ) if config.item_type == "interface_errors": oids = [ _with_index(IF_IN_ERRORS, interface_index), _with_index(IF_OUT_ERRORS, interface_index), _with_index(IF_IN_DISCARDS, interface_index), _with_index(IF_OUT_DISCARDS, interface_index), ] values = client.get_many(oids) response_time_ms = int((perf_counter() - started) * 1000) metric_values = [ ("in_errors", _int_value(values.get(oids[0])), "count"), ("out_errors", _int_value(values.get(oids[1])), "count"), ("in_discards", _int_value(values.get(oids[2])), "count"), ("out_discards", _int_value(values.get(oids[3])), "count"), ] metrics = [SnmpMetricValue(name=name, value=float(value), unit=unit) for name, value, unit in metric_values if value is not None] if not metrics: return SnmpCheckResult(status="down", response_time_ms=response_time_ms, message="Interface error counters were not reported") return SnmpCheckResult( status="up", response_time_ms=response_time_ms, message="Interface error and discard counters collected", metrics=metrics, ) return SnmpCheckResult(status="down", response_time_ms=0, message=f"Unsupported SNMP item type: {config.item_type}") def _interface_index(item_id: str) -> int | None: return _item_index(item_id, "interface") def _item_index(item_id: str, expected_prefix: str, position: int = 1) -> int | None: parts = item_id.split(".") if len(parts) <= position or parts[0] != expected_prefix: return None try: return int(parts[position]) except ValueError: return None def _with_index(oid: tuple[int, ...], index: int) -> tuple[int, ...]: return (*oid, index) def _int_value(value: Any) -> int | None: if isinstance(value, int): return value return None def _scaled_sensor_value(raw_value: int, scale: int | None, precision: int | None) -> float: scale_multiplier = 10 ** ((scale or 9) - 9) precision_divisor = 10 ** (precision or 0) return float(raw_value * scale_multiplier / precision_divisor) class SnmpV2Client: def __init__(self, host: str, community: str, port: int, timeout_seconds: float, retries: int) -> None: self.host = host self.community = community self.port = port self.timeout_seconds = timeout_seconds self.retries = retries def get_many(self, oids: list[tuple[int, ...]]) -> dict[tuple[int, ...], Any]: return dict(self._request(0xA0, oids)) def _request(self, pdu_tag: int, oids: list[tuple[int, ...]]) -> list[tuple[tuple[int, ...], Any]]: request_id = random.randint(1, 2_147_483_647) packet = _encode_message(pdu_tag, request_id, self.community, oids) last_error: OSError | None = None for _ in range(self.retries + 1): try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.settimeout(self.timeout_seconds) sock.sendto(packet, (self.host, self.port)) response, _ = sock.recvfrom(65535) return _decode_response(response, request_id) except OSError as exc: last_error = exc raise SnmpCheckError(f"SNMP request failed for {self.host}") from last_error def _encode_message(pdu_tag: int, request_id: int, community: str, oids: list[tuple[int, ...]]) -> bytes: varbinds = b"".join(_sequence(_encode_oid(oid) + _tlv(0x05, b"")) for oid in oids) pdu = _tlv( pdu_tag, _encode_integer(request_id) + _encode_integer(0) + _encode_integer(0) + _sequence(varbinds), ) return _sequence(_encode_integer(1) + _tlv(0x04, community.encode("utf-8")) + pdu) def _sequence(value: bytes) -> bytes: return _tlv(0x30, value) def _tlv(tag: int, value: bytes) -> bytes: return bytes([tag]) + _encode_length(len(value)) + value def _encode_length(length: int) -> bytes: if length < 128: return bytes([length]) encoded = length.to_bytes((length.bit_length() + 7) // 8, "big") return bytes([0x80 | len(encoded)]) + encoded def _encode_integer(value: int) -> bytes: if value == 0: return _tlv(0x02, b"\x00") encoded = value.to_bytes((value.bit_length() + 7) // 8, "big") if encoded[0] & 0x80: encoded = b"\x00" + encoded return _tlv(0x02, encoded) def _encode_oid(oid: tuple[int, ...]) -> bytes: if len(oid) < 2: raise ValueError("OID must have at least two parts") body = bytes([oid[0] * 40 + oid[1]]) for part in oid[2:]: body += _encode_base128(part) return _tlv(0x06, body) def _encode_base128(value: int) -> bytes: chunks = [value & 0x7F] value >>= 7 while value: chunks.insert(0, 0x80 | (value & 0x7F)) value >>= 7 return bytes(chunks) def _decode_response(data: bytes, expected_request_id: int) -> list[tuple[tuple[int, ...], Any]]: tag, message_value, _ = _read_tlv(data, 0) if tag != 0x30: raise SnmpCheckError("SNMP response was not a sequence") offset = 0 _, _, offset = _read_tlv(message_value, offset) _, _, offset = _read_tlv(message_value, offset) pdu_tag, pdu_value, _ = _read_tlv(message_value, offset) if pdu_tag != 0xA2: raise SnmpCheckError("SNMP response was not a GetResponse") pdu_offset = 0 _, request_id_value, pdu_offset = _read_tlv(pdu_value, pdu_offset) if _decode_integer(request_id_value) != expected_request_id: raise SnmpCheckError("SNMP response request id did not match") _, error_status_value, pdu_offset = _read_tlv(pdu_value, pdu_offset) error_status = _decode_integer(error_status_value) _, _, pdu_offset = _read_tlv(pdu_value, pdu_offset) if error_status: raise SnmpCheckError(f"SNMP agent returned error status {error_status}") varbind_list_tag, varbind_list_value, _ = _read_tlv(pdu_value, pdu_offset) if varbind_list_tag != 0x30: raise SnmpCheckError("SNMP response did not include a varbind list") responses: list[tuple[tuple[int, ...], Any]] = [] varbind_offset = 0 while varbind_offset < len(varbind_list_value): varbind_tag, varbind_value, varbind_offset = _read_tlv(varbind_list_value, varbind_offset) if varbind_tag != 0x30: raise SnmpCheckError("SNMP response included an invalid varbind") oid_tag, oid_value, value_offset = _read_tlv(varbind_value, 0) if oid_tag != 0x06: raise SnmpCheckError("SNMP varbind did not include an object identifier") value_tag, value_value, _ = _read_tlv(varbind_value, value_offset) responses.append((_decode_oid(oid_value), _decode_value(value_tag, value_value))) return responses def _read_tlv(data: bytes, offset: int) -> tuple[int, bytes, int]: if offset >= len(data): raise SnmpCheckError("SNMP response ended unexpectedly") tag = data[offset] length, offset = _read_length(data, offset + 1) end = offset + length if end > len(data): raise SnmpCheckError("SNMP response length exceeded available data") return tag, data[offset:end], end def _read_length(data: bytes, offset: int) -> tuple[int, int]: first = data[offset] offset += 1 if first < 128: return first, offset byte_count = first & 0x7F if byte_count == 0: raise SnmpCheckError("SNMP response used indefinite length") return int.from_bytes(data[offset : offset + byte_count], "big"), offset + byte_count def _decode_integer(value: bytes) -> int: if not value: return 0 return int.from_bytes(value, "big", signed=bool(value[0] & 0x80)) def _decode_oid(value: bytes) -> tuple[int, ...]: if not value: raise SnmpCheckError("SNMP response included an empty object identifier") oid = [value[0] // 40, value[0] % 40] number = 0 for byte in value[1:]: number = (number << 7) | (byte & 0x7F) if not byte & 0x80: oid.append(number) number = 0 return tuple(oid) def _decode_value(tag: int, value: bytes) -> Any: if tag == 0x02: return _decode_integer(value) if tag in {0x41, 0x42, 0x43, 0x46}: return int.from_bytes(value, "big") if tag == 0x04: return value.decode("utf-8", errors="replace") if tag == 0x06: return ".".join(str(part) for part in _decode_oid(value)) if tag in {0x05, 0x80, 0x81, 0x82}: return None return value