3b75075426
Add optional TLS certificate expiry checks for website monitors and update product, package, environment, Docker, and documentation naming.
114 lines
4.3 KiB
Python
114 lines
4.3 KiB
Python
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
import asyncio
|
|
import socket
|
|
import ssl
|
|
from time import perf_counter
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
from cryptography import x509
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WebsiteCheckConfig:
|
|
url: str
|
|
expected_status: int = 200
|
|
expected_text: str | None = None
|
|
unexpected_text: str | None = None
|
|
timeout_seconds: float = 10.0
|
|
check_tls_expiry: bool = False
|
|
tls_warning_days: int = 30
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WebsiteCheckResult:
|
|
status: str
|
|
response_time_ms: int | None
|
|
message: str
|
|
|
|
|
|
async def run_website_check(config: WebsiteCheckConfig) -> WebsiteCheckResult:
|
|
started = perf_counter()
|
|
|
|
if config.check_tls_expiry:
|
|
tls_result = await check_tls_expiry(config.url, config.tls_warning_days, config.timeout_seconds)
|
|
if tls_result is not None:
|
|
return tls_result
|
|
|
|
try:
|
|
async with httpx.AsyncClient(follow_redirects=True, timeout=config.timeout_seconds) as client:
|
|
response = await client.get(config.url)
|
|
except httpx.HTTPError as exc:
|
|
return WebsiteCheckResult(status="down", response_time_ms=None, message=str(exc))
|
|
|
|
response_time_ms = int((perf_counter() - started) * 1000)
|
|
if response.status_code != config.expected_status:
|
|
return WebsiteCheckResult(
|
|
status="down",
|
|
response_time_ms=response_time_ms,
|
|
message=f"Expected HTTP {config.expected_status}, got {response.status_code}",
|
|
)
|
|
if config.expected_text and config.expected_text not in response.text:
|
|
return WebsiteCheckResult(
|
|
status="down",
|
|
response_time_ms=response_time_ms,
|
|
message="Expected text was not present",
|
|
)
|
|
if config.unexpected_text and config.unexpected_text in response.text:
|
|
return WebsiteCheckResult(
|
|
status="down",
|
|
response_time_ms=response_time_ms,
|
|
message="Unexpected text was present",
|
|
)
|
|
return WebsiteCheckResult(status="up", response_time_ms=response_time_ms, message="Website check passed")
|
|
|
|
|
|
async def check_tls_expiry(url: str, warning_days: int, timeout_seconds: float) -> WebsiteCheckResult | None:
|
|
parsed_url = urlparse(url)
|
|
if parsed_url.scheme != "https":
|
|
return None
|
|
if not parsed_url.hostname:
|
|
return WebsiteCheckResult(status="down", response_time_ms=None, message="TLS check could not determine the hostname")
|
|
|
|
started = perf_counter()
|
|
try:
|
|
expires_at = await _get_certificate_expiry(parsed_url.hostname, parsed_url.port or 443, timeout_seconds)
|
|
except (OSError, ssl.SSLError, ValueError) as exc:
|
|
return WebsiteCheckResult(status="down", response_time_ms=None, message=f"TLS certificate check failed: {exc}")
|
|
|
|
response_time_ms = int((perf_counter() - started) * 1000)
|
|
now = datetime.now(UTC)
|
|
days_remaining = (expires_at - now).total_seconds() / 86400
|
|
if days_remaining < 0:
|
|
return WebsiteCheckResult(
|
|
status="down",
|
|
response_time_ms=response_time_ms,
|
|
message=f"TLS certificate expired on {expires_at.date().isoformat()}",
|
|
)
|
|
if days_remaining <= warning_days:
|
|
return WebsiteCheckResult(
|
|
status="warning",
|
|
response_time_ms=response_time_ms,
|
|
message=f"TLS certificate expires in {int(days_remaining)} days on {expires_at.date().isoformat()}",
|
|
)
|
|
return None
|
|
|
|
|
|
async def _get_certificate_expiry(hostname: str, port: int, timeout_seconds: float) -> datetime:
|
|
return await asyncio.to_thread(_get_certificate_expiry_sync, hostname, port, timeout_seconds)
|
|
|
|
|
|
def _get_certificate_expiry_sync(hostname: str, port: int, timeout_seconds: float) -> datetime:
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
with socket.create_connection((hostname, port), timeout=timeout_seconds) as sock:
|
|
with context.wrap_socket(sock, server_hostname=hostname) as tls_sock:
|
|
certificate_bytes = tls_sock.getpeercert(binary_form=True)
|
|
|
|
if not certificate_bytes:
|
|
raise ValueError("certificate did not include an expiry date")
|
|
certificate = x509.load_der_x509_certificate(certificate_bytes)
|
|
return certificate.not_valid_after_utc
|