from dataclasses import dataclass, field import random import re import socket from typing import Any class SnmpDiscoveryError(Exception): pass @dataclass(frozen=True) class SnmpCredential: community: str port: int = 161 timeout_seconds: int = 5 retries: int = 1 @dataclass(frozen=True) class DiscoveredSnmpInterface: index: int name: str label: str description: str | None admin_status: str | None oper_status: str | None speed_bps: int | None @dataclass(frozen=True) class DiscoveredSnmpHealthItem: item_id: str item_type: str group: str label: str unit: str | None = None @dataclass(frozen=True) class DiscoveredSnmpDevice: host: str device_name: str | None description: str | None uptime_seconds: int | None interfaces: list[DiscoveredSnmpInterface] profile_key: str = "generic_snmp" profile_name: str = "Generic SNMP" capabilities: dict[str, bool] = field(default_factory=dict) health_items: list[DiscoveredSnmpHealthItem] = field(default_factory=list) @dataclass(frozen=True) class InterfaceIdentity: label: str name: str description: str | None SYS_DESCR = (1, 3, 6, 1, 2, 1, 1, 1, 0) SYS_UPTIME = (1, 3, 6, 1, 2, 1, 1, 3, 0) SYS_NAME = (1, 3, 6, 1, 2, 1, 1, 5, 0) IF_DESCR = (1, 3, 6, 1, 2, 1, 2, 2, 1, 2) IF_SPEED = (1, 3, 6, 1, 2, 1, 2, 2, 1, 5) IF_ADMIN_STATUS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 7) IF_OPER_STATUS = (1, 3, 6, 1, 2, 1, 2, 2, 1, 8) IF_NAME = (1, 3, 6, 1, 2, 1, 31, 1, 1, 1, 1) HR_PROCESSOR_LOAD = (1, 3, 6, 1, 2, 1, 25, 3, 3, 1, 2) HR_STORAGE_TYPE = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 2) HR_STORAGE_DESCR = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 3) HR_STORAGE_ALLOCATION_UNITS = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 4) HR_STORAGE_SIZE = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 5) HR_STORAGE_USED = (1, 3, 6, 1, 2, 1, 25, 2, 3, 1, 6) ENT_PHYSICAL_DESCR = (1, 3, 6, 1, 2, 1, 47, 1, 1, 1, 1, 2) ENT_PHYSICAL_NAME = (1, 3, 6, 1, 2, 1, 47, 1, 1, 1, 1, 7) ENT_PHY_SENSOR_TYPE = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 1) ENT_PHY_SENSOR_SCALE = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 2) ENT_PHY_SENSOR_PRECISION = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 3) ENT_PHY_SENSOR_VALUE = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 4) ENT_PHY_SENSOR_OPER_STATUS = (1, 3, 6, 1, 2, 1, 99, 1, 1, 1, 5) UCD_LA_LOAD_INT = (1, 3, 6, 1, 4, 1, 2021, 10, 1, 5) UCD_MEM_TOTAL_REAL = (1, 3, 6, 1, 4, 1, 2021, 4, 5, 0) UCD_MEM_AVAIL_REAL = (1, 3, 6, 1, 4, 1, 2021, 4, 6, 0) UCD_DSK_PATH = (1, 3, 6, 1, 4, 1, 2021, 9, 1, 2) UCD_DSK_PERCENT = (1, 3, 6, 1, 4, 1, 2021, 9, 1, 9) HR_STORAGE_RAM = "1.3.6.1.2.1.25.2.1.2" HR_STORAGE_VIRTUAL_MEMORY = "1.3.6.1.2.1.25.2.1.3" HR_STORAGE_FIXED_DISK = "1.3.6.1.2.1.25.2.1.4" HR_STORAGE_REMOVABLE_DISK = "1.3.6.1.2.1.25.2.1.5" STATUS_LABELS = { 1: "up", 2: "down", 3: "testing", 4: "unknown", 5: "dormant", 6: "not present", 7: "lower layer down", } SENSOR_TYPE_LABELS = { 3: ("AC voltage", "V"), 4: ("DC voltage", "V"), 5: ("Current", "A"), 6: ("Power", "W"), 7: ("Frequency", "Hz"), 8: ("Temperature", "C"), 9: ("Humidity", "%"), 10: ("Fan speed", "rpm"), 11: ("Airflow", "m3/min"), 12: ("Sensor state", None), } @dataclass(frozen=True) class SnmpProfile: key: str name: str match_terms: tuple[str, ...] = () health_source: str = "standard" include_interfaces: bool = True def matches(self, system_text: str) -> bool: return any(term in system_text for term in self.match_terms) def discover_health_items(self, client: "SnmpV2Client") -> list[DiscoveredSnmpHealthItem]: if self.health_source == "linux": linux_items = _discover_linux_server_items(client) host_items = _discover_host_resource_items(client) if any(item.item_type == "linux_memory_usage" for item in linux_items): host_items = [item for item in host_items if item.item_type != "memory_usage"] return _deduplicate_items([*linux_items, *host_items, *_discover_sensor_items(client)]) return [*_discover_host_resource_items(client), *_discover_sensor_items(client)] SNMP_PROFILES = ( SnmpProfile("cisco_ios", "Cisco IOS SNMP", ("cisco", "ios")), SnmpProfile("mikrotik_routeros", "MikroTik RouterOS SNMP", ("mikrotik", "routeros")), SnmpProfile("proxmox_ve", "Proxmox VE Server", ("proxmox", "-pve", " pve "), health_source="linux"), SnmpProfile("linux_server", "Linux Server", ("linux", "net-snmp"), health_source="linux"), SnmpProfile("generic_snmp", "Generic SNMP"), ) def discover_snmp_device(host: str, credential: SnmpCredential, asset_type: str | None = None) -> DiscoveredSnmpDevice: client = SnmpV2Client(host, credential) system = client.get_many([SYS_NAME, SYS_DESCR, SYS_UPTIME]) profile = _select_profile(system, asset_type) interfaces = _discover_interfaces(client) if profile.include_interfaces else [] health_items = profile.discover_health_items(client) return DiscoveredSnmpDevice( host=host, device_name=_string_value(system.get(SYS_NAME)), description=_string_value(system.get(SYS_DESCR)), uptime_seconds=_timeticks_to_seconds(system.get(SYS_UPTIME)), interfaces=interfaces, profile_key=profile.key, profile_name=profile.name, capabilities=_capabilities(system, interfaces, health_items), health_items=health_items, ) def _select_profile(system: dict[tuple[int, ...], Any], asset_type: str | None = None) -> SnmpProfile: system_text = " ".join( value.lower() for value in [_string_value(system.get(SYS_NAME)), _string_value(system.get(SYS_DESCR))] if value ) normalized_asset_type = (asset_type or "").strip().lower() if normalized_asset_type == "server": proxmox_profile = _profile_by_key("proxmox_ve") if proxmox_profile.matches(system_text): return proxmox_profile return _profile_by_key("linux_server") if normalized_asset_type == "network_device": for profile in SNMP_PROFILES: if profile.include_interfaces and profile.match_terms and profile.matches(system_text): return profile return _profile_by_key("generic_snmp") for profile in SNMP_PROFILES: if profile.match_terms and profile.matches(system_text): return profile return SNMP_PROFILES[-1] def _profile_by_key(key: str) -> SnmpProfile: for profile in SNMP_PROFILES: if profile.key == key: return profile raise KeyError(key) def _capabilities( system: dict[tuple[int, ...], Any], interfaces: list[DiscoveredSnmpInterface], health_items: list[DiscoveredSnmpHealthItem], ) -> dict[str, bool]: item_types = {item.item_type for item in health_items} return { "system": any(system.get(oid) is not None for oid in [SYS_NAME, SYS_DESCR, SYS_UPTIME]), "interfaces": bool(interfaces), "interface_status": bool(interfaces), "interface_traffic": bool(interfaces), "interface_errors": bool(interfaces), "cpu": bool({"cpu_load", "linux_load_average"} & item_types), "memory": bool({"memory_usage", "linux_memory_usage"} & item_types), "storage": bool({"storage_usage", "linux_disk_usage"} & item_types), "sensors": "sensor_value" in item_types, "environmental": "sensor_value" in item_types, } def _discover_interfaces(client: "SnmpV2Client") -> list[DiscoveredSnmpInterface]: names = client.walk(IF_NAME) descriptions = client.walk(IF_DESCR) admin_statuses = client.walk(IF_ADMIN_STATUS) oper_statuses = client.walk(IF_OPER_STATUS) speeds = client.walk(IF_SPEED) indexes = sorted( { *_indexed_values(names).keys(), *_indexed_values(descriptions).keys(), *_indexed_values(admin_statuses).keys(), *_indexed_values(oper_statuses).keys(), *_indexed_values(speeds).keys(), } ) name_by_index = _indexed_values(names) description_by_index = _indexed_values(descriptions) admin_by_index = _indexed_values(admin_statuses) oper_by_index = _indexed_values(oper_statuses) speed_by_index = _indexed_values(speeds) interfaces: list[DiscoveredSnmpInterface] = [] for index in indexes: identity = _interface_identity(index, name_by_index.get(index), description_by_index.get(index)) interfaces.append( DiscoveredSnmpInterface( index=index, name=identity.name, label=identity.label, description=identity.description, admin_status=_status_label(admin_by_index.get(index)), oper_status=_status_label(oper_by_index.get(index)), speed_bps=_int_value(speed_by_index.get(index)), ) ) return interfaces def _interface_identity(index: int, name_value: Any, description_value: Any) -> InterfaceIdentity: raw_name = _string_value(name_value) raw_description = _string_value(description_value) name = raw_name or raw_description or f"Interface {index}" description = raw_description if raw_description and raw_description != name else None return InterfaceIdentity( label=_friendly_interface_label(name, raw_description, index), name=name, description=description, ) def _friendly_interface_label(name: str, description: str | None, index: int) -> str: value = _clean_interface_text(name) or _clean_interface_text(description or "") or f"Interface {index}" lower = value.lower() if lower in {"lo", "lo0", "loopback", "loopback0"}: return "Loopback" if lower.startswith("vlan"): suffix = _suffix_after_prefix(value, "vlan") return f"VLAN {suffix}" if suffix else "VLAN" if lower.startswith("bridge"): suffix = _suffix_after_prefix(value, "bridge") return f"Bridge {suffix}" if suffix else "Bridge" if lower.startswith("br-"): return f"Bridge {value[3:]}" if lower.startswith("bond"): suffix = _suffix_after_prefix(value, "bond") return f"Bond {suffix}" if suffix else "Bond" if lower.startswith("lag"): suffix = _suffix_after_prefix(value, "lag") return f"LAG {suffix}" if suffix else "LAG" if lower.startswith("sfp-sfpplus"): suffix = _suffix_after_prefix(value, "sfp-sfpplus") return f"SFP+ {suffix}" if suffix else "SFP+" if lower.startswith("sfpplus"): suffix = _suffix_after_prefix(value, "sfpplus") return f"SFP+ {suffix}" if suffix else "SFP+" if lower.startswith("sfp"): suffix = _suffix_after_prefix(value, "sfp") return f"SFP {suffix}" if suffix else "SFP" if lower.startswith("ethernet"): suffix = _suffix_after_prefix(value, "ethernet") return f"Ethernet {suffix}" if suffix else "Ethernet" if lower.startswith("ether"): suffix = _suffix_after_prefix(value, "ether") return f"Port {suffix}" if suffix else "Ethernet port" if lower.startswith("gi"): suffix = _suffix_after_prefix(value, "gi") return f"GigabitEthernet {suffix}" if suffix else "GigabitEthernet" if lower.startswith("te"): suffix = _suffix_after_prefix(value, "te") return f"TenGigabitEthernet {suffix}" if suffix else "TenGigabitEthernet" if lower.startswith("fa"): suffix = _suffix_after_prefix(value, "fa") return f"FastEthernet {suffix}" if suffix else "FastEthernet" if lower.startswith("ge-"): return f"GigabitEthernet {value[3:]}" if lower.startswith("xe-"): return f"TenGigabitEthernet {value[3:]}" if lower.startswith("et-"): return f"Ethernet {value[3:]}" return value def _clean_interface_text(value: str) -> str: cleaned = " ".join(value.strip().split()) if not cleaned: return "" if cleaned.startswith("Interface "): return cleaned return cleaned def _suffix_after_prefix(value: str, prefix: str) -> str: suffix = value[len(prefix) :].strip(" -_./") return re.sub(r"\s+", " ", suffix) def _discover_host_resource_items(client: "SnmpV2Client") -> list[DiscoveredSnmpHealthItem]: items: list[DiscoveredSnmpHealthItem] = [] processor_loads = _indexed_values(client.walk(HR_PROCESSOR_LOAD, max_items=256)) processor_indexes = sorted(index for index, value in processor_loads.items() if _int_value(value) is not None) for position, index in enumerate(processor_indexes, start=1): label = "CPU load" if len(processor_indexes) == 1 else f"CPU {position} load" items.append( DiscoveredSnmpHealthItem( item_id=f"cpu.{index}.load", item_type="cpu_load", group="Device Health", label=label, unit="%", ) ) storage_types = _indexed_values(client.walk(HR_STORAGE_TYPE, max_items=256)) descriptions = _indexed_values(client.walk(HR_STORAGE_DESCR, max_items=256)) allocation_units = _indexed_values(client.walk(HR_STORAGE_ALLOCATION_UNITS, max_items=256)) sizes = _indexed_values(client.walk(HR_STORAGE_SIZE, max_items=256)) used = _indexed_values(client.walk(HR_STORAGE_USED, max_items=256)) for index in sorted(storage_types): storage_type = _string_value(storage_types.get(index)) allocation_unit = _int_value(allocation_units.get(index)) size = _int_value(sizes.get(index)) used_blocks = _int_value(used.get(index)) if not storage_type or not allocation_unit or not size or used_blocks is None: continue description = _string_value(descriptions.get(index)) or f"Storage {index}" if storage_type in {HR_STORAGE_RAM, HR_STORAGE_VIRTUAL_MEMORY}: items.append( DiscoveredSnmpHealthItem( item_id=f"storage.{index}.memory", item_type="memory_usage", group="Device Health", label="Memory used", unit="%", ) ) elif storage_type in {HR_STORAGE_FIXED_DISK, HR_STORAGE_REMOVABLE_DISK}: if not _is_monitorable_storage_path(description): continue items.append( DiscoveredSnmpHealthItem( item_id=f"storage.{index}.usage", item_type="storage_usage", group="Storage", label=_storage_usage_label(description), unit="%", ) ) return _deduplicate_items(items) def _discover_linux_server_items(client: "SnmpV2Client") -> list[DiscoveredSnmpHealthItem]: items: list[DiscoveredSnmpHealthItem] = [] load_values = _indexed_values(client.walk(UCD_LA_LOAD_INT, max_items=16)) for index, label in [(1, "Load average 1 minute"), (2, "Load average 5 minutes"), (3, "Load average 15 minutes")]: if _int_value(load_values.get(index)) is not None: items.append( DiscoveredSnmpHealthItem( item_id=f"linux.load.{index}", item_type="linux_load_average", group="Server Health", label=label, ) ) memory = client.get_many([UCD_MEM_TOTAL_REAL, UCD_MEM_AVAIL_REAL]) if _int_value(memory.get(UCD_MEM_TOTAL_REAL)) and _int_value(memory.get(UCD_MEM_AVAIL_REAL)) is not None: items.append( DiscoveredSnmpHealthItem( item_id="linux.memory.real", item_type="linux_memory_usage", group="Server Health", label="Memory used", unit="%", ) ) disk_paths = _indexed_values(client.walk(UCD_DSK_PATH, max_items=256)) disk_percent = _indexed_values(client.walk(UCD_DSK_PERCENT, max_items=256)) for index in sorted(disk_paths): path = _string_value(disk_paths.get(index)) if not path or _int_value(disk_percent.get(index)) is None: continue items.append( DiscoveredSnmpHealthItem( item_id=f"linux.disk.{index}", item_type="linux_disk_usage", group="Storage", label=_storage_usage_label(path), unit="%", ) ) return _deduplicate_items(items) def _discover_sensor_items(client: "SnmpV2Client") -> list[DiscoveredSnmpHealthItem]: sensor_types = _indexed_values(client.walk(ENT_PHY_SENSOR_TYPE, max_items=256)) sensor_values = _indexed_values(client.walk(ENT_PHY_SENSOR_VALUE, max_items=256)) sensor_names = _indexed_values(client.walk(ENT_PHYSICAL_NAME, max_items=256)) sensor_descriptions = _indexed_values(client.walk(ENT_PHYSICAL_DESCR, max_items=256)) items: list[DiscoveredSnmpHealthItem] = [] for index in sorted(sensor_types): sensor_type = _int_value(sensor_types.get(index)) if sensor_type not in SENSOR_TYPE_LABELS or _int_value(sensor_values.get(index)) is None: continue kind, unit = SENSOR_TYPE_LABELS[sensor_type] name = _string_value(sensor_names.get(index)) or _string_value(sensor_descriptions.get(index)) label = kind if not name else f"{kind} {name}" items.append( DiscoveredSnmpHealthItem( item_id=f"sensor.{index}.value", item_type="sensor_value", group="Environmental", label=label, unit=unit, ) ) return items def _storage_usage_label(description: str) -> str: normalized = description.strip() if normalized in {"/", "/boot", "/home", "/var"}: return f"Disk {normalized} usage" if "disk" not in normalized.lower() and normalized.startswith("/"): return f"Disk {normalized} usage" return f"{normalized} usage" def _is_monitorable_storage_path(description: str) -> bool: normalized = description.strip() if not normalized.startswith("/"): return True ignored_exact = {"/dev", "/dev/shm", "/proc", "/run", "/sys", "/tmp"} ignored_prefixes = ( "/dev/", "/proc/", "/run/", "/sys/", ) return normalized not in ignored_exact and not normalized.startswith(ignored_prefixes) def _deduplicate_items(items: list[DiscoveredSnmpHealthItem]) -> list[DiscoveredSnmpHealthItem]: seen: set[tuple[str, str]] = set() deduplicated: list[DiscoveredSnmpHealthItem] = [] for item in items: key = (item.item_type, item.label) if key in seen: continue seen.add(key) deduplicated.append(item) return deduplicated def _indexed_values(values: dict[tuple[int, ...], Any]) -> dict[int, Any]: indexed: dict[int, Any] = {} for oid, value in values.items(): if oid: indexed[oid[-1]] = value return indexed def _string_value(value: Any) -> str | None: if value is None: return None if isinstance(value, bytes): return value.decode("utf-8", errors="replace") return str(value) def _int_value(value: Any) -> int | None: if isinstance(value, int): return value return None def _status_label(value: Any) -> str | None: if not isinstance(value, int): return None return STATUS_LABELS.get(value, f"status {value}") def _timeticks_to_seconds(value: Any) -> int | None: if not isinstance(value, int): return None return int(value / 100) class SnmpV2Client: def __init__(self, host: str, credential: SnmpCredential) -> None: self.host = host self.credential = credential def get_many(self, oids: list[tuple[int, ...]]) -> dict[tuple[int, ...], Any]: return dict(self._request(0xA0, oids)) def walk(self, base_oid: tuple[int, ...], max_items: int = 128) -> dict[tuple[int, ...], Any]: values: dict[tuple[int, ...], Any] = {} next_oid = base_oid for _ in range(max_items): response = self._request(0xA1, [next_oid]) if not response: break returned_oid, value = response[0] if not _oid_starts_with(returned_oid, base_oid): break values[returned_oid] = value next_oid = returned_oid return values def _request(self, pdu_tag: int, oids: list[tuple[int, ...]]) -> list[tuple[tuple[int, ...], Any]]: request_id = random.randint(1, 2_147_483_647) packet = _encode_message(pdu_tag, request_id, self.credential.community, oids) last_error: OSError | None = None for _ in range(self.credential.retries + 1): try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.settimeout(self.credential.timeout_seconds) sock.sendto(packet, (self.host, self.credential.port)) response, _ = sock.recvfrom(65535) return _decode_response(response, request_id) except OSError as exc: last_error = exc raise SnmpDiscoveryError(f"SNMP request failed for {self.host}") from last_error def _oid_starts_with(oid: tuple[int, ...], base_oid: tuple[int, ...]) -> bool: return oid[: len(base_oid)] == base_oid def _encode_message(pdu_tag: int, request_id: int, community: str, oids: list[tuple[int, ...]]) -> bytes: varbinds = b"".join(_sequence(_encode_oid(oid) + _tlv(0x05, b"")) for oid in oids) pdu = _tlv( pdu_tag, _encode_integer(request_id) + _encode_integer(0) + _encode_integer(0) + _sequence(varbinds), ) return _sequence(_encode_integer(1) + _tlv(0x04, community.encode("utf-8")) + pdu) def _sequence(value: bytes) -> bytes: return _tlv(0x30, value) def _tlv(tag: int, value: bytes) -> bytes: return bytes([tag]) + _encode_length(len(value)) + value def _encode_length(length: int) -> bytes: if length < 128: return bytes([length]) encoded = length.to_bytes((length.bit_length() + 7) // 8, "big") return bytes([0x80 | len(encoded)]) + encoded def _encode_integer(value: int) -> bytes: if value == 0: return _tlv(0x02, b"\x00") encoded = value.to_bytes((value.bit_length() + 7) // 8, "big") if encoded[0] & 0x80: encoded = b"\x00" + encoded return _tlv(0x02, encoded) def _encode_oid(oid: tuple[int, ...]) -> bytes: if len(oid) < 2: raise ValueError("OID must have at least two parts") body = bytes([oid[0] * 40 + oid[1]]) for part in oid[2:]: body += _encode_base128(part) return _tlv(0x06, body) def _encode_base128(value: int) -> bytes: chunks = [value & 0x7F] value >>= 7 while value: chunks.insert(0, 0x80 | (value & 0x7F)) value >>= 7 return bytes(chunks) def _decode_response(data: bytes, expected_request_id: int) -> list[tuple[tuple[int, ...], Any]]: tag, message_value, _ = _read_tlv(data, 0) if tag != 0x30: raise SnmpDiscoveryError("SNMP response was not a sequence") offset = 0 _, _, offset = _read_tlv(message_value, offset) _, _, offset = _read_tlv(message_value, offset) pdu_tag, pdu_value, offset = _read_tlv(message_value, offset) if pdu_tag != 0xA2: raise SnmpDiscoveryError("SNMP response was not a GetResponse") pdu_offset = 0 _, request_id_value, pdu_offset = _read_tlv(pdu_value, pdu_offset) if _decode_integer(request_id_value) != expected_request_id: raise SnmpDiscoveryError("SNMP response request id did not match") _, error_status_value, pdu_offset = _read_tlv(pdu_value, pdu_offset) error_status = _decode_integer(error_status_value) _, _, pdu_offset = _read_tlv(pdu_value, pdu_offset) if error_status: raise SnmpDiscoveryError(f"SNMP agent returned error status {error_status}") varbind_list_tag, varbind_list_value, _ = _read_tlv(pdu_value, pdu_offset) if varbind_list_tag != 0x30: raise SnmpDiscoveryError("SNMP response did not include a varbind list") responses: list[tuple[tuple[int, ...], Any]] = [] varbind_offset = 0 while varbind_offset < len(varbind_list_value): varbind_tag, varbind_value, varbind_offset = _read_tlv(varbind_list_value, varbind_offset) if varbind_tag != 0x30: raise SnmpDiscoveryError("SNMP response included an invalid varbind") oid_tag, oid_value, value_offset = _read_tlv(varbind_value, 0) if oid_tag != 0x06: raise SnmpDiscoveryError("SNMP varbind did not include an object identifier") value_tag, value_value, _ = _read_tlv(varbind_value, value_offset) responses.append((_decode_oid(oid_value), _decode_value(value_tag, value_value))) return responses def _read_tlv(data: bytes, offset: int) -> tuple[int, bytes, int]: if offset >= len(data): raise SnmpDiscoveryError("SNMP response ended unexpectedly") tag = data[offset] length, offset = _read_length(data, offset + 1) end = offset + length if end > len(data): raise SnmpDiscoveryError("SNMP response length exceeded available data") return tag, data[offset:end], end def _read_length(data: bytes, offset: int) -> tuple[int, int]: first = data[offset] offset += 1 if first < 128: return first, offset byte_count = first & 0x7F if byte_count == 0: raise SnmpDiscoveryError("SNMP response used indefinite length") return int.from_bytes(data[offset : offset + byte_count], "big"), offset + byte_count def _decode_integer(value: bytes) -> int: if not value: return 0 return int.from_bytes(value, "big", signed=bool(value[0] & 0x80)) def _decode_oid(value: bytes) -> tuple[int, ...]: if not value: raise SnmpDiscoveryError("SNMP response included an empty object identifier") oid = [value[0] // 40, value[0] % 40] number = 0 for byte in value[1:]: number = (number << 7) | (byte & 0x7F) if not byte & 0x80: oid.append(number) number = 0 return tuple(oid) def _decode_value(tag: int, value: bytes) -> Any: if tag == 0x02: return _decode_integer(value) if tag in {0x41, 0x42, 0x43, 0x46}: return int.from_bytes(value, "big") if tag == 0x04: return value.decode("utf-8", errors="replace") if tag == 0x06: return ".".join(str(part) for part in _decode_oid(value)) if tag in {0x05, 0x80, 0x81, 0x82}: return None return value