from dataclasses import dataclass from datetime import UTC, datetime import asyncio import socket import ssl from time import perf_counter from urllib.parse import urlparse import httpx from cryptography import x509 @dataclass(frozen=True) class WebsiteCheckConfig: url: str expected_status: int = 200 expected_text: str | None = None unexpected_text: str | None = None timeout_seconds: float = 10.0 check_tls_expiry: bool = False tls_warning_days: int = 30 @dataclass(frozen=True) class WebsiteCheckResult: status: str response_time_ms: int | None message: str @dataclass(frozen=True) class TlsExpiryResult: status: str response_time_ms: int message: str async def run_website_check(config: WebsiteCheckConfig) -> WebsiteCheckResult: started = perf_counter() tls_message: str | None = None if config.check_tls_expiry: tls_result = await check_tls_expiry(config.url, config.tls_warning_days, config.timeout_seconds) if tls_result.status != "up": return tls_result tls_message = tls_result.message try: async with httpx.AsyncClient(follow_redirects=True, timeout=config.timeout_seconds) as client: response = await client.get(config.url) except httpx.HTTPError as exc: return WebsiteCheckResult(status="down", response_time_ms=None, message=str(exc)) response_time_ms = int((perf_counter() - started) * 1000) if response.status_code != config.expected_status: return WebsiteCheckResult( status="down", response_time_ms=response_time_ms, message=f"Expected HTTP {config.expected_status}, got {response.status_code}", ) if config.expected_text and config.expected_text not in response.text: return WebsiteCheckResult( status="down", response_time_ms=response_time_ms, message="Expected text was not present", ) if config.unexpected_text and config.unexpected_text in response.text: return WebsiteCheckResult( status="down", response_time_ms=response_time_ms, message="Unexpected text was present", ) message = "Website check passed" if tls_message: message = f"{message}; {tls_message}" return WebsiteCheckResult(status="up", response_time_ms=response_time_ms, message=message) async def check_tls_expiry(url: str, warning_days: int, timeout_seconds: float) -> TlsExpiryResult: parsed_url = urlparse(url) if parsed_url.scheme != "https": return TlsExpiryResult(status="up", response_time_ms=0, message="TLS expiry check skipped for non-HTTPS target") if not parsed_url.hostname: return TlsExpiryResult(status="down", response_time_ms=0, message="TLS check could not determine the hostname") started = perf_counter() try: expires_at = await _get_certificate_expiry(parsed_url.hostname, parsed_url.port or 443, timeout_seconds) except (OSError, ssl.SSLError, ValueError) as exc: return TlsExpiryResult(status="down", response_time_ms=int((perf_counter() - started) * 1000), message=f"TLS certificate check failed: {exc}") response_time_ms = int((perf_counter() - started) * 1000) now = datetime.now(UTC) days_remaining = (expires_at - now).total_seconds() / 86400 days_remaining_text = str(max(0, int(days_remaining))) if days_remaining < 0: return TlsExpiryResult( status="down", response_time_ms=response_time_ms, message=f"TLS certificate expired on {expires_at.date().isoformat()}", ) if days_remaining <= warning_days: return TlsExpiryResult( status="warning", response_time_ms=response_time_ms, message=f"TLS certificate expires in {days_remaining_text} days on {expires_at.date().isoformat()}", ) return TlsExpiryResult( status="up", response_time_ms=response_time_ms, message=f"TLS certificate valid for {days_remaining_text} days until {expires_at.date().isoformat()}", ) async def _get_certificate_expiry(hostname: str, port: int, timeout_seconds: float) -> datetime: return await asyncio.to_thread(_get_certificate_expiry_sync, hostname, port, timeout_seconds) def _get_certificate_expiry_sync(hostname: str, port: int, timeout_seconds: float) -> datetime: context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((hostname, port), timeout=timeout_seconds) as sock: with context.wrap_socket(sock, server_hostname=hostname) as tls_sock: certificate_bytes = tls_sock.getpeercert(binary_form=True) if not certificate_bytes: raise ValueError("certificate did not include an expiry date") certificate = x509.load_der_x509_certificate(certificate_bytes) return certificate.not_valid_after_utc