102 lines
3.6 KiB
Python
102 lines
3.6 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.auth.dependencies import get_current_user, require_role
|
|
from app.core.secrets import encrypt_secret
|
|
from app.db.session import get_db
|
|
from app.models import Credential, User
|
|
from app.schemas.core import SnmpCredentialProfileCreate, SnmpCredentialProfileRead, SnmpCredentialProfileUpdate
|
|
|
|
# Router for the credential endpoints; every route registered on it is
# served under the "/credentials" prefix and tagged "credentials".
router = APIRouter(prefix="/credentials", tags=["credentials"])
|
|
|
|
# Value of ``Credential.credential_type`` that marks a stored credential
# as an SNMP profile.
SNMP_CREDENTIAL_TYPE = "snmp"
|
|
|
|
|
|
def _snmp_profile_to_read(profile: Credential) -> SnmpCredentialProfileRead:
    """Build the API read model for a stored SNMP credential profile.

    SNMP settings are taken from ``profile.extra``. A missing setting falls
    back to a default: version ``"2c"``, port ``161``, timeout ``5`` seconds
    and ``1`` retry. The encrypted secret is not copied into the result;
    only ``has_secret`` reports whether one is stored.

    Args:
        profile: The credential row to convert.

    Returns:
        The read schema populated from the row and its ``extra`` mapping.
    """
    # Copy so that a ``None`` extra behaves like an empty mapping.
    extra = dict(profile.extra or {})

    # ``retries`` falls back to its default only when it is absent or empty.
    # A stored 0 is a legitimate retry count and must be reported as 0; a
    # plain ``or 1`` fallback would silently turn it into 1.
    stored_retries = extra.get("retries")
    retries = 1 if stored_retries in (None, "") else int(stored_retries)

    return SnmpCredentialProfileRead(
        id=profile.id,
        name=profile.name,
        credential_type=profile.credential_type,
        # For the settings below any falsy stored value (None, "", 0) is
        # treated as "not configured" and replaced by the default.
        version=str(extra.get("version") or "2c"),
        port=int(extra.get("port") or 161),
        timeout_seconds=int(extra.get("timeout_seconds") or 5),
        retries=retries,
        has_secret=bool(profile.encrypted_secret),
        created_at=profile.created_at,
        updated_at=profile.updated_at,
    )
|
|
|
|
|
|
@router.get("/snmp", response_model=list[SnmpCredentialProfileRead])
def list_snmp_profiles(
    _: User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> list[SnmpCredentialProfileRead]:
    """List all SNMP credential profiles, ordered by name.

    Args:
        _: Result of the ``get_current_user`` dependency; unused in the body.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        One read model per credential whose type is ``SNMP_CREDENTIAL_TYPE``.
    """
    is_snmp = Credential.credential_type == SNMP_CREDENTIAL_TYPE
    query = select(Credential).where(is_snmp).order_by(Credential.name)
    rows = db.scalars(query).all()
    return list(map(_snmp_profile_to_read, rows))
|
|
|
|
|
|
@router.post("/snmp", response_model=SnmpCredentialProfileRead)
def create_snmp_profile(
    payload: SnmpCredentialProfileCreate,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> SnmpCredentialProfileRead:
    """Create and persist a new SNMP credential profile.

    The community string is passed through ``encrypt_secret`` before being
    stored; the remaining SNMP settings are kept in the row's ``extra``
    mapping.

    Args:
        payload: Name, community string and SNMP settings for the profile.
        _: Result of the ``require_role("admin")`` dependency; unused.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The read model of the newly stored profile.
    """
    secret = encrypt_secret(payload.community)
    settings = {
        "version": payload.version,
        "port": payload.port,
        "timeout_seconds": payload.timeout_seconds,
        "retries": payload.retries,
    }
    record = Credential(
        name=payload.name,
        credential_type=SNMP_CREDENTIAL_TYPE,
        encrypted_secret=secret,
        extra=settings,
    )

    db.add(record)
    db.commit()
    # Reload the row before building the response from it.
    db.refresh(record)
    return _snmp_profile_to_read(record)
|
|
|
|
|
|
@router.patch("/snmp/{profile_id}", response_model=SnmpCredentialProfileRead)
def update_snmp_profile(
    profile_id: int,
    payload: SnmpCredentialProfileUpdate,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> SnmpCredentialProfileRead:
    """Apply a partial update to an existing SNMP credential profile.

    Only fields explicitly set in the request are considered. A non-null
    ``community`` replaces the encrypted secret, ``name`` replaces the
    profile name, and every other supplied field is merged into ``extra``.

    Args:
        profile_id: Primary key of the profile to update.
        payload: Fields to change.
        _: Result of the ``require_role("admin")`` dependency; unused.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The read model of the updated profile.

    Raises:
        HTTPException: 404 when no credential with this id exists or the
            credential is not an SNMP profile.
    """
    record = db.get(Credential, profile_id)
    is_snmp_profile = record is not None and record.credential_type == SNMP_CREDENTIAL_TYPE
    if not is_snmp_profile:
        raise HTTPException(status_code=404, detail="SNMP credential profile not found")

    # Only fields the client actually sent; unset fields are left out.
    updates = payload.model_dump(exclude_unset=True)

    new_community = updates.pop("community", None)
    if new_community is not None:
        record.encrypted_secret = encrypt_secret(new_community)

    if "name" in updates:
        record.name = updates.pop("name")

    # Whatever is left belongs in ``extra``. Assign a fresh dict rather
    # than mutating the stored one in place.
    merged_extra = dict(record.extra or {})
    merged_extra.update(updates)
    record.extra = merged_extra

    db.commit()
    db.refresh(record)
    return _snmp_profile_to_read(record)
|
|
|
|
|
|
@router.delete("/snmp/{profile_id}", status_code=204)
def delete_snmp_profile(
    profile_id: int,
    _: User = Depends(require_role("admin")),
    db: Session = Depends(get_db),
) -> None:
    """Delete an SNMP credential profile.

    Args:
        profile_id: Primary key of the profile to delete.
        _: Result of the ``require_role("admin")`` dependency; unused.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when no credential with this id exists or the
            credential is not an SNMP profile.
    """
    target = db.get(Credential, profile_id)
    is_snmp_profile = target is not None and target.credential_type == SNMP_CREDENTIAL_TYPE
    if not is_snmp_profile:
        raise HTTPException(status_code=404, detail="SNMP credential profile not found")

    db.delete(target)
    db.commit()
|