Add worker alert and notification tests

This commit is contained in:
Keith Smith
2026-05-23 19:50:13 -06:00
parent 68d5e0a705
commit 19d4c6e603
3 changed files with 175 additions and 4 deletions
+10 -1
View File
@@ -154,7 +154,10 @@ class Scheduler:
.order_by(Incident.opened_at.desc())
.limit(1)
)
if latest_incident is not None and latest_incident.opened_at + timedelta(seconds=rule.cooldown_seconds) > now:
if (
latest_incident is not None
and self._as_utc(latest_incident.opened_at) + timedelta(seconds=rule.cooldown_seconds) > now
):
return
incident = Incident(
@@ -221,6 +224,12 @@ class Scheduler:
response = await client.post(url, json={"username": username, "text": message})
response.raise_for_status()
@staticmethod
def _as_utc(value: datetime) -> datetime:
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
def _format_incident_message(self, incident: Incident, monitor: Monitor, event_type: str) -> str:
if event_type == "resolved":
title = f"RESOLVED: {monitor.name} recovered"