from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.core.secrets import encrypt_secret
from app.models import AlertRule, Asset, Credential, Monitor


def test_create_website_monitor_creates_asset_and_alert_rule(client: TestClient, db_session: Session) -> None:
    """POST /monitors/website with asset and alert creation enabled.

    Checks the response body, then the persisted Monitor, the Asset it
    points at, and the AlertRule attached to the monitor.
    """
    payload = {
        "name": "Example Site",
        "url": "https://example.com",
        "expected_status": 200,
        "expected_text": "Example Domain",
        "unexpected_text": None,
        "timeout_seconds": 7,
        "check_tls_expiry": True,
        "tls_warning_days": 45,
        "interval_seconds": 60,
        "create_asset": True,
        "alert_enabled": True,
        "alert_severity": "critical",
        "failure_threshold": 2,
    }

    response = client.post("/monitors/website", json=payload)

    assert response.status_code == 200
    created = response.json()
    assert created["name"] == "Example Site"
    assert created["monitor_type"] == "http"
    assert created["target"] == "https://example.com"
    returned_config = created["config"]
    assert returned_config["check_tls_expiry"] is True
    assert returned_config["tls_warning_days"] == 45

    # The monitor must be persisted and linked to a newly created asset.
    stored_monitor = db_session.get(Monitor, created["id"])
    assert stored_monitor is not None
    assert stored_monitor.asset_id is not None

    stored_asset = db_session.get(Asset, stored_monitor.asset_id)
    assert stored_asset is not None
    assert stored_asset.name == "Example Site"
    assert stored_asset.asset_type == "website"
    assert stored_asset.address == "https://example.com"

    # An alert rule for the monitor is expected alongside it.
    rule_query = select(AlertRule).where(AlertRule.monitor_id == stored_monitor.id)
    alert_rule = db_session.scalar(rule_query)
    assert alert_rule is not None
    assert alert_rule.name == "Example Site website failure"
    assert alert_rule.severity == "critical"
    assert alert_rule.condition == {"type": "status_not_up"}
    assert alert_rule.failure_threshold == 2
    # No cooldown is sent in the request; the test expects 300 here.
    assert alert_rule.cooldown_seconds == 300
    assert alert_rule.is_enabled is True


def test_create_website_monitor_can_skip_default_alert_rule(client: TestClient, db_session: Session) -> None:
    """With create_asset and alert_enabled both False, no asset link or alert rule exists."""
    payload = {
        "name": "Status Only",
        "url": "https://status.example.com",
        "interval_seconds": 120,
        "create_asset": False,
        "alert_enabled": False,
    }

    response = client.post("/monitors/website", json=payload)

    assert response.status_code == 200
    created = response.json()

    stored_monitor = db_session.get(Monitor, created["id"])
    assert stored_monitor is not None
    assert stored_monitor.asset_id is None

    rules_query = select(AlertRule).where(AlertRule.monitor_id == stored_monitor.id)
    assert db_session.scalars(rules_query).all() == []


def test_create_website_monitor_can_attach_existing_asset_without_default_alert(client: TestClient, db_session: Session) -> None:
    """An explicit asset_id is used even though create_asset is True.

    The monitor must point at the pre-existing asset, no alert rule may be
    created, and the Asset table must still hold only that one row.
    """
    existing_asset = Asset(
        name="Existing App",
        asset_type="application",
        address="app.example.com",
        status="unknown",
        extra={},
    )
    db_session.add(existing_asset)
    db_session.commit()

    payload = {
        "name": "Existing App HTTPS",
        "url": "https://app.example.com",
        "asset_id": existing_asset.id,
        "create_asset": True,
        "alert_enabled": False,
    }
    response = client.post("/monitors/website", json=payload)

    assert response.status_code == 200
    created = response.json()

    stored_monitor = db_session.get(Monitor, created["id"])
    assert stored_monitor is not None
    assert stored_monitor.asset_id == existing_asset.id

    rules_query = select(AlertRule).where(AlertRule.monitor_id == stored_monitor.id)
    assert db_session.scalars(rules_query).all() == []
    # No second asset may have been created for the monitor.
    assert db_session.scalars(select(Asset)).all() == [existing_asset]


def test_create_ping_monitor_rejects_missing_asset(client: TestClient) -> None:
    """POST /monitors/ping with an asset_id that was never inserted returns 404."""
    payload = {
        "name": "Missing Asset Ping",
        "host": "192.0.2.10",
        "asset_id": 999,
        "create_asset": False,
        "alert_enabled": False,
    }

    response = client.post("/monitors/ping", json=payload)

    assert response.status_code == 404


def test_create_tcp_monitor_can_attach_existing_asset(client: TestClient, db_session: Session) -> None:
    """POST /monitors/tcp links the monitor to the given asset and reports a host:port target."""
    router = Asset(
        name="Router",
        asset_type="network_device",
        address="192.0.2.1",
        status="unknown",
        extra={},
    )
    db_session.add(router)
    db_session.commit()

    payload = {
        "name": "Router SSH",
        "host": "192.0.2.1",
        "port": 22,
        "asset_id": router.id,
        "create_asset": False,
        "alert_enabled": False,
    }
    response = client.post("/monitors/tcp", json=payload)

    assert response.status_code == 200
    created = response.json()
    assert created["asset_id"] == router.id
    assert created["target"] == "192.0.2.1:22"


def test_create_snmp_monitors_from_discovery_attaches_asset_and_skips_alerts(client: TestClient, db_session: Session) -> None:
    """POST /monitors/snmp/from-discovery creates one monitor per selected item.

    Every returned monitor must reference the given asset, be of type
    "snmp", carry the requested interval, and have no alert rule.
    """
    switch = Asset(
        name="Core Switch",
        asset_type="network_device",
        address="192.0.2.10",
        status="unknown",
        extra={},
    )
    snmp_profile = Credential(
        name="Core Switch Read Only",
        credential_type="snmp",
        encrypted_secret=encrypt_secret("private-community"),
        extra={"version": "2c"},
    )
    db_session.add_all([switch, snmp_profile])
    db_session.commit()

    uptime_item = {
        "item_id": "device.uptime",
        "item_type": "device_uptime",
        "group": "Device Health",
        "label": "Device uptime",
        "unit": "seconds",
    }
    uplink_item = {
        "item_id": "interface.1.status",
        "item_type": "interface_status",
        "group": "Interface uplink",
        "label": "uplink status",
        "unit": None,
    }
    payload = {
        "host": "192.0.2.10",
        "asset_id": switch.id,
        "credential_profile_id": snmp_profile.id,
        "interval_seconds": 120,
        "selected_items": [uptime_item, uplink_item],
    }
    response = client.post("/monitors/snmp/from-discovery", json=payload)

    assert response.status_code == 200
    created = response.json()
    assert len(created) == 2

    returned_names = {entry["name"] for entry in created}
    assert returned_names == {"Core Switch Device uptime", "Core Switch uplink status"}
    assert all(entry["asset_id"] == switch.id for entry in created)
    assert all(entry["monitor_type"] == "snmp" for entry in created)
    assert all(entry["interval_seconds"] == 120 for entry in created)
    # NOTE(review): presumably guards against raw OIDs appearing in the
    # returned config -- confirm against the endpoint's contract.
    assert all("1.3.6" not in str(entry["config"]) for entry in created)

    created_ids = [entry["id"] for entry in created]
    rules_query = select(AlertRule).where(AlertRule.monitor_id.in_(created_ids))
    assert db_session.scalars(rules_query).all() == []


def test_create_snmp_monitors_rejects_missing_profile(client: TestClient, db_session: Session) -> None:
    """POST /monitors/snmp/from-discovery with a credential id that was never inserted returns 404."""
    switch = Asset(
        name="Core Switch",
        asset_type="network_device",
        address="192.0.2.10",
        status="unknown",
        extra={},
    )
    db_session.add(switch)
    db_session.commit()

    uptime_item = {
        "item_id": "device.uptime",
        "item_type": "device_uptime",
        "group": "Device Health",
        "label": "Device uptime",
    }
    payload = {
        "host": "192.0.2.10",
        "asset_id": switch.id,
        "credential_profile_id": 999,
        "selected_items": [uptime_item],
    }
    response = client.post("/monitors/snmp/from-discovery", json=payload)

    assert response.status_code == 404