Add SNMP credential profiles

This commit is contained in:
Keith Smith
2026-05-23 20:11:09 -06:00
parent 19d4c6e603
commit 0cbc6b6ea8
10 changed files with 535 additions and 16 deletions
+101
View File
@@ -0,0 +1,101 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.auth.dependencies import get_current_user, require_role
from app.core.secrets import encrypt_secret
from app.db.session import get_db
from app.models import Credential, User
from app.schemas.core import SnmpCredentialProfileCreate, SnmpCredentialProfileRead, SnmpCredentialProfileUpdate
router = APIRouter(prefix="/credentials", tags=["credentials"])
SNMP_CREDENTIAL_TYPE = "snmp"
def _snmp_profile_to_read(profile: Credential) -> SnmpCredentialProfileRead:
extra = dict(profile.extra or {})
return SnmpCredentialProfileRead(
id=profile.id,
name=profile.name,
credential_type=profile.credential_type,
version=str(extra.get("version") or "2c"),
port=int(extra.get("port") or 161),
timeout_seconds=int(extra.get("timeout_seconds") or 5),
retries=int(extra.get("retries") or 1),
has_secret=bool(profile.encrypted_secret),
created_at=profile.created_at,
updated_at=profile.updated_at,
)
@router.get("/snmp", response_model=list[SnmpCredentialProfileRead])
def list_snmp_profiles(_: User = Depends(get_current_user), db: Session = Depends(get_db)) -> list[SnmpCredentialProfileRead]:
profiles = db.scalars(
select(Credential).where(Credential.credential_type == SNMP_CREDENTIAL_TYPE).order_by(Credential.name)
).all()
return [_snmp_profile_to_read(profile) for profile in profiles]
@router.post("/snmp", response_model=SnmpCredentialProfileRead)
def create_snmp_profile(
payload: SnmpCredentialProfileCreate,
_: User = Depends(require_role("admin")),
db: Session = Depends(get_db),
) -> SnmpCredentialProfileRead:
profile = Credential(
name=payload.name,
credential_type=SNMP_CREDENTIAL_TYPE,
encrypted_secret=encrypt_secret(payload.community),
extra={
"version": payload.version,
"port": payload.port,
"timeout_seconds": payload.timeout_seconds,
"retries": payload.retries,
},
)
db.add(profile)
db.commit()
db.refresh(profile)
return _snmp_profile_to_read(profile)
@router.patch("/snmp/{profile_id}", response_model=SnmpCredentialProfileRead)
def update_snmp_profile(
profile_id: int,
payload: SnmpCredentialProfileUpdate,
_: User = Depends(require_role("admin")),
db: Session = Depends(get_db),
) -> SnmpCredentialProfileRead:
profile = db.get(Credential, profile_id)
if profile is None or profile.credential_type != SNMP_CREDENTIAL_TYPE:
raise HTTPException(status_code=404, detail="SNMP credential profile not found")
changes = payload.model_dump(exclude_unset=True)
community = changes.pop("community", None)
if community is not None:
profile.encrypted_secret = encrypt_secret(community)
if "name" in changes:
profile.name = changes.pop("name")
next_extra = dict(profile.extra or {})
for field, value in changes.items():
next_extra[field] = value
profile.extra = next_extra
db.commit()
db.refresh(profile)
return _snmp_profile_to_read(profile)
@router.delete("/snmp/{profile_id}", status_code=204)
def delete_snmp_profile(
profile_id: int,
_: User = Depends(require_role("admin")),
db: Session = Depends(get_db),
) -> None:
profile = db.get(Credential, profile_id)
if profile is None or profile.credential_type != SNMP_CREDENTIAL_TYPE:
raise HTTPException(status_code=404, detail="SNMP credential profile not found")
db.delete(profile)
db.commit()
+2 -1
View File
@@ -6,7 +6,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.exc import SQLAlchemyError
from app.api import alerts, assets, auth, health, monitors, notifications
from app.api import alerts, assets, auth, credentials, health, monitors, notifications
from app.core.config import settings
from app.db.session import SessionLocal
from app.services.bootstrap import ensure_initial_admin
@@ -45,3 +45,4 @@ app.include_router(assets.router)
app.include_router(monitors.router)
app.include_router(alerts.router)
app.include_router(notifications.router)
app.include_router(credentials.router)
+32
View File
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Any
from typing import Literal
from pydantic import BaseModel, Field
@@ -179,3 +180,34 @@ class NotificationChannelRead(BaseModel):
updated_at: datetime
model_config = {"from_attributes": True}
class SnmpCredentialProfileCreate(BaseModel):
name: str = Field(min_length=1, max_length=160)
version: Literal["2c"] = "2c"
community: str = Field(min_length=1, max_length=255)
port: int = Field(default=161, ge=1, le=65535)
timeout_seconds: int = Field(default=5, ge=1, le=120)
retries: int = Field(default=1, ge=0, le=10)
class SnmpCredentialProfileUpdate(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=160)
version: Literal["2c"] | None = None
community: str | None = Field(default=None, min_length=1, max_length=255)
port: int | None = Field(default=None, ge=1, le=65535)
timeout_seconds: int | None = Field(default=None, ge=1, le=120)
retries: int | None = Field(default=None, ge=0, le=10)
class SnmpCredentialProfileRead(BaseModel):
id: int
name: str
credential_type: str
version: str
port: int
timeout_seconds: int
retries: int
has_secret: bool
created_at: datetime
updated_at: datetime
+125
View File
@@ -0,0 +1,125 @@
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.core.secrets import decrypt_secret
from app.models import Credential
def test_create_snmp_profile_encrypts_community_and_masks_secret(client: TestClient, db_session: Session) -> None:
response = client.post(
"/credentials/snmp",
json={
"name": "Core Switch Read Only",
"version": "2c",
"community": "public-read",
"port": 161,
"timeout_seconds": 7,
"retries": 2,
},
)
assert response.status_code == 200
body = response.json()
assert body["name"] == "Core Switch Read Only"
assert body["credential_type"] == "snmp"
assert body["version"] == "2c"
assert body["port"] == 161
assert body["timeout_seconds"] == 7
assert body["retries"] == 2
assert body["has_secret"] is True
assert "community" not in body
assert "encrypted_secret" not in body
profile = db_session.get(Credential, body["id"])
assert profile is not None
assert profile.credential_type == "snmp"
assert profile.encrypted_secret != "public-read"
assert decrypt_secret(profile.encrypted_secret) == "public-read"
assert profile.extra == {"version": "2c", "port": 161, "timeout_seconds": 7, "retries": 2}
def test_update_snmp_profile_without_community_preserves_saved_secret(client: TestClient, db_session: Session) -> None:
create_response = client.post(
"/credentials/snmp",
json={
"name": "Access Switch",
"community": "access-read",
},
)
profile_id = create_response.json()["id"]
original_secret = db_session.get(Credential, profile_id).encrypted_secret
update_response = client.patch(
f"/credentials/snmp/{profile_id}",
json={
"name": "Access Switches",
"timeout_seconds": 9,
"retries": 3,
},
)
assert update_response.status_code == 200
body = update_response.json()
assert body["name"] == "Access Switches"
assert body["timeout_seconds"] == 9
assert body["retries"] == 3
assert body["has_secret"] is True
assert "community" not in body
assert "encrypted_secret" not in body
profile = db_session.get(Credential, profile_id)
assert profile is not None
assert profile.encrypted_secret == original_secret
assert decrypt_secret(profile.encrypted_secret) == "access-read"
def test_update_snmp_profile_can_rotate_community(client: TestClient, db_session: Session) -> None:
create_response = client.post(
"/credentials/snmp",
json={
"name": "Router",
"community": "old-community",
},
)
profile_id = create_response.json()["id"]
update_response = client.patch(
f"/credentials/snmp/{profile_id}",
json={
"community": "new-community",
},
)
assert update_response.status_code == 200
assert update_response.json()["has_secret"] is True
profile = db_session.get(Credential, profile_id)
assert profile is not None
assert decrypt_secret(profile.encrypted_secret) == "new-community"
def test_list_and_delete_snmp_profiles(client: TestClient) -> None:
create_response = client.post(
"/credentials/snmp",
json={
"name": "Distribution Switch",
"community": "dist-read",
},
)
profile_id = create_response.json()["id"]
list_response = client.get("/credentials/snmp")
assert list_response.status_code == 200
profiles = list_response.json()
assert len(profiles) == 1
assert profiles[0]["id"] == profile_id
assert profiles[0]["has_secret"] is True
assert "community" not in profiles[0]
assert "encrypted_secret" not in profiles[0]
delete_response = client.delete(f"/credentials/snmp/{profile_id}")
assert delete_response.status_code == 204
list_after_delete_response = client.get("/credentials/snmp")
assert list_after_delete_response.status_code == 200
assert list_after_delete_response.json() == []