""" 주기적으로 실행되는 모니터링 작업 - Pod 상태 체크 (1분마다) → Discord + Gmail - 인증서 만료 임박 체크 (1일마다) → Gmail만 """ import os import asyncio from datetime import datetime, timezone from apscheduler.schedulers.asyncio import AsyncIOScheduler from notifier import notify_both, notify_email_only NAMESPACE = os.getenv("NAMESPACE", "web-portal") ALERT_CERT_DAYS = int(os.getenv("ALERT_CERT_DAYS", "30")) # 중복 알림 방지 캐시 _alerted_pods = set() _alerted_certs = set() # ── K8s 클라이언트 ──────────────────────────────────────── def get_k8s_clients(): try: from kubernetes import client, config try: config.load_incluster_config() except Exception: config.load_kube_config() return client.CoreV1Api(), client.CustomObjectsApi() except Exception as e: print(f"[MONITOR] K8s client init failed: {e}") return None, None # ── Pod 모니터링 → Discord + Gmail ─────────────────────── async def check_pods(): v1, _ = get_k8s_clients() if not v1: return try: pods = v1.list_namespaced_pod(namespace=NAMESPACE) for pod in pods.items: name = pod.metadata.name phase = pod.status.phase reason = "" if pod.status.container_statuses: for cs in pod.status.container_statuses: if cs.state.waiting and cs.state.waiting.reason: reason = cs.state.waiting.reason if cs.restart_count and cs.restart_count >= 5: reason = f"RestartCount={cs.restart_count}" is_unhealthy = ( phase in ("Failed", "Unknown") or reason in ("CrashLoopBackOff", "OOMKilled", "Error", "ImagePullBackOff") ) if is_unhealthy and name not in _alerted_pods: _alerted_pods.add(name) await notify_both( title="🚨 Pod 이상 감지", message=( f"네임스페이스: `{NAMESPACE}`\n" f"Pod: `{name}`\n" f"상태: `{phase}`\n" f"원인: `{reason or '알 수 없음'}`\n\n" f"즉시 확인이 필요합니다." ), color=0xe74c3c ) elif not is_unhealthy and name in _alerted_pods: _alerted_pods.discard(name) await notify_both( title="✅ Pod 복구됨", message=( f"네임스페이스: `{NAMESPACE}`\n" f"Pod: `{name}` 이 정상 상태로 복구되었습니다." ), color=0x2ecc71 ) except Exception as e: print(f"[MONITOR] Pod check error: {e}") # ── 인증서 만료 모니터링 → Gmail만 ─────────────────────── async def check_certificates(): _, custom = get_k8s_clients() if not custom: return try: namespaces = ["web-portal", "gitea", "argocd"] for ns in namespaces: try: certs = custom.list_namespaced_custom_object( group="cert-manager.io", version="v1", namespace=ns, plural="certificates" ) for cert in certs.get("items", []): name = cert["metadata"]["name"] not_after = cert.get("status", {}).get("notAfter", "") if not not_after: continue expiry = datetime.fromisoformat(not_after.replace("Z", "+00:00")) now = datetime.now(timezone.utc) days_left = (expiry - now).days alert_key = f"{ns}/{name}" if days_left <= ALERT_CERT_DAYS and alert_key not in _alerted_certs: _alerted_certs.add(alert_key) await notify_email_only( title="⚠️ 인증서 만료 임박", message=( f"네임스페이스: `{ns}`\n" f"인증서: `{name}`\n" f"만료까지: `{days_left}일 남음`\n" f"만료일: `{expiry.strftime('%Y-%m-%d')}`\n\n" f"cert-manager가 자동 갱신을 시도합니다.\n" f"갱신 실패 시 수동으로 확인하세요." ), color=0xf39c12 ) elif days_left > ALERT_CERT_DAYS and alert_key in _alerted_certs: _alerted_certs.discard(alert_key) except Exception: pass except Exception as e: print(f"[MONITOR] Certificate check error: {e}") # ── 스케줄러 시작 ───────────────────────────────────────── def start_scheduler(): scheduler = AsyncIOScheduler() scheduler.add_job(check_pods, "interval", minutes=1, id="pod_check") scheduler.add_job(check_certificates, "interval", hours=24, id="cert_check") scheduler.start() print("[MONITOR] Scheduler started (Pod: 1min / Cert: 24hr)") return scheduler