import asyncio import os import socket import struct from dataclasses import dataclass from time import perf_counter @dataclass(frozen=True) class NetworkCheckResult: status: str response_time_ms: int | None message: str @dataclass(frozen=True) class PingCheckConfig: host: str timeout_seconds: float = 5.0 @dataclass(frozen=True) class TcpCheckConfig: host: str port: int timeout_seconds: float = 5.0 async def run_ping_check(config: PingCheckConfig) -> NetworkCheckResult: try: response_time_ms = await asyncio.to_thread(_run_ping_check_sync, config.host, config.timeout_seconds) except PermissionError: return NetworkCheckResult(status="down", response_time_ms=None, message="ICMP ping requires raw socket permission") except TimeoutError: return NetworkCheckResult(status="down", response_time_ms=None, message="Ping timed out") except OSError as exc: return NetworkCheckResult(status="down", response_time_ms=None, message=f"Ping failed: {exc}") return NetworkCheckResult(status="up", response_time_ms=response_time_ms, message="Ping check passed") async def run_tcp_check(config: TcpCheckConfig) -> NetworkCheckResult: started = perf_counter() try: connection = asyncio.open_connection(config.host, config.port) reader, writer = await asyncio.wait_for(connection, timeout=config.timeout_seconds) writer.close() await writer.wait_closed() reader.feed_eof() except (TimeoutError, OSError) as exc: return NetworkCheckResult(status="down", response_time_ms=None, message=f"TCP connection failed: {exc}") response_time_ms = int((perf_counter() - started) * 1000) return NetworkCheckResult(status="up", response_time_ms=response_time_ms, message="TCP connection succeeded") def _run_ping_check_sync(host: str, timeout_seconds: float) -> int: address = _resolve_ipv4(host) identifier = os.getpid() & 0xFFFF sequence = 1 packet = _build_icmp_echo_request(identifier, sequence) with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as sock: sock.settimeout(timeout_seconds) started = perf_counter() sock.sendto(packet, (address, 0)) while True: response, _ = sock.recvfrom(1024) if _matches_icmp_echo_reply(response, identifier, sequence): return int((perf_counter() - started) * 1000) def _resolve_ipv4(host: str) -> str: results = socket.getaddrinfo(host, None, socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) if not results: raise OSError("Could not resolve an IPv4 address") return str(results[0][4][0]) def _build_icmp_echo_request(identifier: int, sequence: int) -> bytes: payload = b"OrbitalWard ping" header = struct.pack("!BBHHH", 8, 0, 0, identifier, sequence) checksum = _icmp_checksum(header + payload) header = struct.pack("!BBHHH", 8, 0, checksum, identifier, sequence) return header + payload def _matches_icmp_echo_reply(response: bytes, identifier: int, sequence: int) -> bool: if len(response) < 28: return False ip_header_length = (response[0] & 0x0F) * 4 icmp_header = response[ip_header_length : ip_header_length + 8] if len(icmp_header) < 8: return False icmp_type, _, _, reply_identifier, reply_sequence = struct.unpack("!BBHHH", icmp_header) return icmp_type == 0 and reply_identifier == identifier and reply_sequence == sequence def _icmp_checksum(data: bytes) -> int: if len(data) % 2: data += b"\x00" checksum = 0 for index in range(0, len(data), 2): checksum += (data[index] << 8) + data[index + 1] checksum = (checksum & 0xFFFF) + (checksum >> 16) return ~checksum & 0xFFFF