3b75075426
Add optional TLS certificate expiry checks for website monitors and update product, package, environment, Docker, and documentation naming.
118 lines
4.1 KiB
Python
118 lines
4.1 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.auth.dependencies import get_current_user, require_role
|
|
from app.core.secrets import decrypt_secret, encrypt_secret
|
|
from app.db.session import get_db
|
|
from app.models import NotificationChannel, User
|
|
from app.schemas.core import NotificationChannelCreate, NotificationChannelRead, NotificationChannelUpdate
|
|
|
|
# Router for the notification-channel endpoints in this module; every route
# registered on it is served under the "/notifications/channels" prefix.
router = APIRouter(prefix="/notifications/channels", tags=["notifications"])
|
|
|
|
|
|
def _channel_to_read(channel: NotificationChannel) -> NotificationChannelRead:
    """Build the API read model for a stored notification channel.

    The channel's settings are copied (so the ORM object is not mutated) and a
    ``"username"`` entry of ``"OrbitalWard"`` is added when that key is absent.
    The encrypted secret is not included in the result; ``has_secret`` only
    reports whether a non-empty one is stored.
    """
    read_settings = dict(channel.settings or {})
    if "username" not in read_settings:
        read_settings["username"] = "OrbitalWard"

    secret_present = bool(channel.encrypted_secret)
    return NotificationChannelRead(
        id=channel.id,
        name=channel.name,
        channel_type=channel.channel_type,
        settings=read_settings,
        has_secret=secret_present,
        is_enabled=channel.is_enabled,
        created_at=channel.created_at,
        updated_at=channel.updated_at,
    )
|
|
|
|
|
|
@router.get("", response_model=list[NotificationChannelRead])
def list_channels(
    _: User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> list[NotificationChannelRead]:
    """Return every notification channel, ordered by name, as read models.

    Access is gated by the ``get_current_user`` dependency; the resolved user
    itself is not used.
    """
    by_name = select(NotificationChannel).order_by(NotificationChannel.name)
    stored = db.scalars(by_name).all()
    return list(map(_channel_to_read, stored))
|
|
|
|
|
|
@router.post("", response_model=NotificationChannelRead)
def create_channel(
    payload: NotificationChannelCreate,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> NotificationChannelRead:
    """Persist a new notification channel and return its read model.

    The payload's settings are copied and given a ``"username"`` of
    ``"OrbitalWard"`` when the caller did not supply one. The secret is passed
    through ``encrypt_secret`` before being stored on the row.

    Access is gated by the ``require_role("admin")`` dependency.
    """
    initial_settings = dict(payload.settings or {})
    if "username" not in initial_settings:
        initial_settings["username"] = "OrbitalWard"

    new_channel = NotificationChannel(
        name=payload.name,
        channel_type=payload.channel_type,
        settings=initial_settings,
        encrypted_secret=encrypt_secret(payload.secret),
        is_enabled=payload.is_enabled,
    )

    db.add(new_channel)
    db.commit()
    # Reload the row after the commit before building the response from it.
    db.refresh(new_channel)
    return _channel_to_read(new_channel)
|
|
|
|
|
|
@router.post("/{channel_id}/test")
def test_channel(
    channel_id: int,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> dict[str, str]:
    """Send a test notification through a stored channel.

    The channel's decrypted secret is used as the URL to POST to. The request
    body is JSON with a ``username`` (taken from the channel settings, falling
    back to ``"OrbitalWard"``) and a fixed ``text`` that names the channel.

    Access is gated by the ``require_role("admin")`` dependency.

    Returns:
        A ``{"status": "sent", "message": ...}`` mapping when the POST
        completed with a non-error status.

    Raises:
        HTTPException: 404 when no channel has ``channel_id``; 400 when the
            decrypted secret is empty or is not a URL httpx can parse; 502
            when the request fails or the response carries an error status.
    """
    channel = db.get(NotificationChannel, channel_id)
    if channel is None:
        raise HTTPException(status_code=404, detail="Notification channel not found")

    url = decrypt_secret(channel.encrypted_secret)
    if not url:
        raise HTTPException(status_code=400, detail="Notification channel has no usable secret URL")

    body = {
        "username": (channel.settings or {}).get("username") or "OrbitalWard",
        "text": f"OrbitalWard test notification for {channel.name}",
    }
    try:
        # timeout is in seconds; it bounds how long this request handler can block.
        response = httpx.post(url, json=body, timeout=10)
        response.raise_for_status()
    except httpx.InvalidURL as exc:
        # httpx.InvalidURL derives from Exception, not httpx.HTTPError, so the
        # handler below would not catch it and a malformed stored URL would
        # surface as an unhandled server error. Report it as a bad channel
        # configuration instead.
        raise HTTPException(status_code=400, detail="Notification channel has no usable secret URL") from exc
    except httpx.HTTPError as exc:
        # Covers transport failures (connect/timeout/...) and, via
        # raise_for_status(), 4xx/5xx responses from the target.
        raise HTTPException(status_code=502, detail="Notification test failed") from exc
    return {"status": "sent", "message": "Notification test sent"}
|
|
|
|
|
|
@router.patch("/{channel_id}", response_model=NotificationChannelRead)
def update_channel(
    channel_id: int,
    payload: NotificationChannelUpdate,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> NotificationChannelRead:
    """Apply a partial update to a notification channel and return its read model.

    Only fields the client actually sent are applied (``exclude_unset=True``).
    A ``secret`` field is never copied onto the row directly: a non-``None``
    value is encrypted into ``encrypted_secret``, and a ``None`` value leaves
    the stored secret untouched.

    Access is gated by the ``require_role("admin")`` dependency.

    Raises:
        HTTPException: 404 when no channel has ``channel_id``.
    """
    target = db.get(NotificationChannel, channel_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Notification channel not found")

    updates = payload.model_dump(exclude_unset=True)

    # Pull the secret out first so the generic attribute loop never sees it.
    if (new_secret := updates.pop("secret", None)) is not None:
        target.encrypted_secret = encrypt_secret(new_secret)

    for attr_name, new_value in updates.items():
        setattr(target, attr_name, new_value)

    db.commit()
    db.refresh(target)
    return _channel_to_read(target)
|
|
|
|
|
|
@router.delete("/{channel_id}", status_code=204)
def delete_channel(
    channel_id: int,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> None:
    """Delete the notification channel with the given id.

    Responds with status 204 on success. Access is gated by the
    ``require_role("admin")`` dependency.

    Raises:
        HTTPException: 404 when no channel has ``channel_id``.
    """
    if (doomed := db.get(NotificationChannel, channel_id)) is None:
        raise HTTPException(status_code=404, detail="Notification channel not found")

    db.delete(doomed)
    db.commit()
|