from fastapi import FastAPI, HTTPException, Depends, status from contextlib import asynccontextmanager from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional, List import psycopg2 import psycopg2.extras import bcrypt import jwt import os import secrets import string from datetime import datetime, timedelta from notifier import notify_both, notify_email_only, notify_discord_only from monitor import start_scheduler @asynccontextmanager async def lifespan(app: FastAPI): scheduler = start_scheduler() yield scheduler.shutdown() app = FastAPI(title="Web Portal API", lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey1234") ALGORITHM = "HS256" security = HTTPBearer() MAX_LOGIN_ATTEMPTS = 5 DB_CONFIG = { "host": os.getenv("DB_HOST", "postgres-service"), "port": int(os.getenv("DB_PORT", "5432")), "database": os.getenv("DB_NAME", "portaldb"), "user": os.getenv("DB_USER", "portaluser"), "password": os.getenv("DB_PASSWORD", "portalpass"), } def get_db(): conn = psycopg2.connect(**DB_CONFIG) try: yield conn finally: conn.close() def init_db(): conn = psycopg2.connect(**DB_CONFIG) cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, username VARCHAR(100) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, is_admin BOOLEAN DEFAULT FALSE, must_change_password BOOLEAN DEFAULT TRUE, login_attempts INTEGER DEFAULT 0, is_locked BOOLEAN DEFAULT FALSE, password_change_requested BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS webpages ( id SERIAL PRIMARY KEY, name VARCHAR(200) NOT NULL, url VARCHAR(500) NOT NULL, description TEXT, created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS user_webpage_access ( user_id INTEGER REFERENCES users(id) ON DELETE CASCADE, webpage_id INTEGER REFERENCES webpages(id) ON DELETE CASCADE, PRIMARY KEY (user_id, webpage_id) ); CREATE TABLE IF NOT EXISTS notify_config ( key VARCHAR(100) PRIMARY KEY, value TEXT NOT NULL, updated_at TIMESTAMP DEFAULT NOW() ); """) # 컬럼 추가 (기존 DB 마이그레이션) for col, definition in [ ("must_change_password", "BOOLEAN DEFAULT TRUE"), ("login_attempts", "INTEGER DEFAULT 0"), ("is_locked", "BOOLEAN DEFAULT FALSE"), ("password_change_requested", "BOOLEAN DEFAULT FALSE"), ]: try: cur.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col} {definition}") except Exception: pass # 기본 관리자 계정 (최초 로그인 시 비밀번호 변경 불필요) cur.execute("SELECT id FROM users WHERE username = 'admin'") if not cur.fetchone(): hashed = bcrypt.hashpw("admin1234".encode(), bcrypt.gensalt()).decode() cur.execute( "INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, TRUE, FALSE)", ("admin", hashed) ) # 테스트 일반 사용자 (최초 로그인 시 비밀번호 변경 필요) cur.execute("SELECT id FROM users WHERE username = 'user1'") if not cur.fetchone(): hashed = bcrypt.hashpw("user1234".encode(), bcrypt.gensalt()).decode() cur.execute( "INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, FALSE, TRUE)", ("user1", hashed) ) conn.commit() cur.close() conn.close() @app.on_event("startup") def startup(): import time for _ in range(10): try: init_db() print("DB initialized successfully") break except Exception as e: print(f"DB not ready, retrying... {e}") time.sleep(3) def create_token(user_id: int, username: str, is_admin: bool, must_change_password: bool): payload = { "sub": str(user_id), "username": username, "is_admin": is_admin, "must_change_password": must_change_password, "exp": datetime.utcnow() + timedelta(hours=8) } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): try: payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token") def require_admin(token=Depends(verify_token)): if not token.get("is_admin"): raise HTTPException(status_code=403, detail="Admin only") return token def generate_temp_password(length=10): chars = string.ascii_letters + string.digits return ''.join(secrets.choice(chars) for _ in range(length)) # ─── Auth ─────────────────────────────────────────────── class LoginRequest(BaseModel): username: str password: str @app.post("/api/auth/login") async def login(req: LoginRequest, conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM users WHERE username = %s", (req.username,)) user = cur.fetchone() if not user: raise HTTPException(status_code=401, detail="Invalid credentials") # 계정 잠금 확인 if user["is_locked"]: raise HTTPException(status_code=403, detail="Account locked. Please contact admin.") # 비밀번호 검증 if not bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()): attempts = (user["login_attempts"] or 0) + 1 locked = attempts >= MAX_LOGIN_ATTEMPTS cur.execute( "UPDATE users SET login_attempts=%s, is_locked=%s, password_change_requested=%s WHERE id=%s", (attempts, locked, locked, user["id"]) ) conn.commit() if locked: await notify_discord_only( title="🔒 계정 잠금 발생", message=( f"사용자: `{req.username}`\n" f"사유: 비밀번호 {MAX_LOGIN_ATTEMPTS}회 오류로 계정이 잠겼습니다.\n" f"관리자 페이지에서 잠금 해제 또는 임시 비밀번호를 발급해주세요." ), color=0xe74c3c ) raise HTTPException(status_code=403, detail="Account locked due to too many failed attempts. Please contact admin.") remaining = MAX_LOGIN_ATTEMPTS - attempts raise HTTPException(status_code=401, detail=f"Invalid credentials. {remaining} attempts remaining.") # 로그인 성공 - 실패 횟수 초기화 cur.execute("UPDATE users SET login_attempts=0 WHERE id=%s", (user["id"],)) conn.commit() token = create_token(user["id"], user["username"], user["is_admin"], user["must_change_password"]) return { "token": token, "username": user["username"], "is_admin": user["is_admin"], "must_change_password": user["must_change_password"] } @app.get("/api/auth/me") def me(token=Depends(verify_token)): return { "username": token["username"], "is_admin": token["is_admin"], "must_change_password": token.get("must_change_password", False) } # ─── 비밀번호 변경 (본인) ──────────────────────────────── class ChangePasswordRequest(BaseModel): current_password: str new_password: str @app.post("/api/auth/change-password") def change_password(req: ChangePasswordRequest, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) user_id = int(token["sub"]) cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) user = cur.fetchone() if not bcrypt.checkpw(req.current_password.encode(), user["password_hash"].encode()): raise HTTPException(status_code=400, detail="Current password is incorrect") if len(req.new_password) < 6: raise HTTPException(status_code=400, detail="New password must be at least 6 characters") hashed = bcrypt.hashpw(req.new_password.encode(), bcrypt.gensalt()).decode() cur.execute( "UPDATE users SET password_hash=%s, must_change_password=FALSE, login_attempts=0, is_locked=FALSE WHERE id=%s", (hashed, user_id) ) conn.commit() new_token = create_token(user["id"], user["username"], user["is_admin"], False) return {"ok": True, "token": new_token} # ─── My Pages ─────────────────────────────────────────── @app.get("/api/my-pages") def my_pages(token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) user_id = int(token["sub"]) if token["is_admin"]: cur.execute("SELECT * FROM webpages ORDER BY name") else: cur.execute(""" SELECT w.* FROM webpages w JOIN user_webpage_access ua ON w.id = ua.webpage_id WHERE ua.user_id = %s ORDER BY w.name """, (user_id,)) return cur.fetchall() # ─── Admin: Webpages CRUD ──────────────────────────────── @app.get("/api/admin/webpages") def list_webpages(token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM webpages ORDER BY name") return cur.fetchall() class WebpageCreate(BaseModel): name: str url: str description: Optional[str] = "" @app.post("/api/admin/webpages") def create_webpage(data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute( "INSERT INTO webpages (name, url, description) VALUES (%s, %s, %s) RETURNING *", (data.name, data.url, data.description) ) conn.commit() return cur.fetchone() @app.put("/api/admin/webpages/{page_id}") def update_webpage(page_id: int, data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute( "UPDATE webpages SET name=%s, url=%s, description=%s WHERE id=%s RETURNING *", (data.name, data.url, data.description, page_id) ) result = cur.fetchone() if not result: raise HTTPException(status_code=404, detail="Not found") conn.commit() return result @app.delete("/api/admin/webpages/{page_id}") def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor() cur.execute("DELETE FROM webpages WHERE id = %s", (page_id,)) conn.commit() return {"ok": True} # ─── Admin: Users ──────────────────────────────────────── @app.get("/api/admin/users") def list_users(token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute(""" SELECT id, username, is_admin, must_change_password, login_attempts, is_locked, password_change_requested, created_at FROM users ORDER BY username """) return cur.fetchall() class UserCreate(BaseModel): username: str password: str is_admin: bool = False @app.post("/api/admin/users") def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) hashed = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode() try: cur.execute( """INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, %s, TRUE) RETURNING id, username, is_admin""", (data.username, hashed, data.is_admin) ) conn.commit() return cur.fetchone() except psycopg2.errors.UniqueViolation: raise HTTPException(status_code=400, detail="Username already exists") @app.delete("/api/admin/users/{user_id}") def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor() cur.execute("DELETE FROM users WHERE id = %s AND username != 'admin'", (user_id,)) conn.commit() return {"ok": True} # ─── Admin: 비밀번호 변경 (관리자가 타 사용자) ────────────── class AdminPasswordChange(BaseModel): new_password: str @app.put("/api/admin/users/{user_id}/password") def admin_change_password(user_id: int, data: AdminPasswordChange, token=Depends(require_admin), conn=Depends(get_db)): if len(data.new_password) < 6: raise HTTPException(status_code=400, detail="Password must be at least 6 characters") cur = conn.cursor() hashed = bcrypt.hashpw(data.new_password.encode(), bcrypt.gensalt()).decode() cur.execute( """UPDATE users SET password_hash=%s, must_change_password=TRUE, login_attempts=0, is_locked=FALSE, password_change_requested=FALSE WHERE id=%s AND username != 'admin'""", (hashed, user_id) ) conn.commit() return {"ok": True} # ─── Admin: 임시 비밀번호 발급 ─────────────────────────── @app.post("/api/admin/users/{user_id}/reset-password") async def reset_password(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): temp_pw = generate_temp_password() hashed = bcrypt.hashpw(temp_pw.encode(), bcrypt.gensalt()).decode() cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT username FROM users WHERE id=%s", (user_id,)) user = cur.fetchone() cur.execute( """UPDATE users SET password_hash=%s, must_change_password=TRUE, login_attempts=0, is_locked=FALSE, password_change_requested=FALSE WHERE id=%s""", (hashed, user_id) ) conn.commit() if user: await notify_discord_only( title="🔑 임시 비밀번호 발급", message=( f"관리자 `{token['username']}` 이(가) 임시 비밀번호를 발급했습니다.\n" f"대상 사용자: `{user['username']}`" ), color=0x3498db ) return {"ok": True, "temp_password": temp_pw} # ─── Admin: 계정 잠금 해제 ─────────────────────────────── @app.post("/api/admin/users/{user_id}/unlock") def unlock_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor() cur.execute( "UPDATE users SET is_locked=FALSE, login_attempts=0 WHERE id=%s", (user_id,) ) conn.commit() return {"ok": True} # ─── Admin: User Access ────────────────────────────────── @app.get("/api/admin/users/{user_id}/pages") def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute(""" SELECT w.id, w.name, w.url, w.description, (ua.user_id IS NOT NULL) as has_access FROM webpages w LEFT JOIN user_webpage_access ua ON w.id = ua.webpage_id AND ua.user_id = %s ORDER BY w.name """, (user_id,)) return cur.fetchall() class AccessUpdate(BaseModel): webpage_ids: List[int] @app.put("/api/admin/users/{user_id}/pages") def update_user_pages(user_id: int, data: AccessUpdate, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor() cur.execute("DELETE FROM user_webpage_access WHERE user_id = %s", (user_id,)) for page_id in data.webpage_ids: cur.execute( "INSERT INTO user_webpage_access (user_id, webpage_id) VALUES (%s, %s) ON CONFLICT DO NOTHING", (user_id, page_id) ) conn.commit() return {"ok": True} # ─── Admin: 알림 설정 ──────────────────────────────────── class NotifyConfig(BaseModel): discord_webhook_url: Optional[str] = "" gmail_user: Optional[str] = "" gmail_app_password: Optional[str] = "" alert_email_to: Optional[str] = "" def get_notify_config_from_db(conn) -> dict: """DB에서 알림 설정 조회, 없으면 환경변수 fallback""" cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT key, value FROM notify_config") rows = cur.fetchall() db_config = {r["key"]: r["value"] for r in rows} return { "discord_webhook_url": db_config.get("discord_webhook_url", os.getenv("DISCORD_WEBHOOK_URL", "")), "gmail_user": db_config.get("gmail_user", os.getenv("GMAIL_USER", "")), "gmail_app_password": db_config.get("gmail_app_password", os.getenv("GMAIL_APP_PASSWORD", "")), "alert_email_to": db_config.get("alert_email_to", os.getenv("ALERT_EMAIL_TO", "")), } @app.get("/api/admin/notify-config") def get_notify_config(token=Depends(require_admin), conn=Depends(get_db)): cfg = get_notify_config_from_db(conn) # 비밀번호는 설정 여부만 반환 (보안) return { "discord_webhook_url": cfg["discord_webhook_url"], "gmail_user": cfg["gmail_user"], "gmail_app_password": "••••••••" if cfg["gmail_app_password"] else "", "alert_email_to": cfg["alert_email_to"], } @app.put("/api/admin/notify-config") def save_notify_config(data: NotifyConfig, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor() fields = { "discord_webhook_url": data.discord_webhook_url, "gmail_user": data.gmail_user, "alert_email_to": data.alert_email_to, } # 비밀번호가 마스킹값이면 업데이트 안 함 if data.gmail_app_password and data.gmail_app_password != "••••••••": fields["gmail_app_password"] = data.gmail_app_password for key, value in fields.items(): if value is not None: cur.execute(""" INSERT INTO notify_config (key, value, updated_at) VALUES (%s, %s, NOW()) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value, updated_at=NOW() """, (key, value)) conn.commit() # notifier 모듈 환경변수 동적 업데이트 import notifier cfg = get_notify_config_from_db(conn) notifier.DISCORD_WEBHOOK_URL = cfg["discord_webhook_url"] notifier.GMAIL_USER = cfg["gmail_user"] notifier.GMAIL_APP_PASSWORD = cfg["gmail_app_password"] notifier.ALERT_EMAIL_TO = cfg["alert_email_to"] return {"ok": True} @app.get("/api/admin/notify-test") async def notify_test(token=Depends(require_admin)): # Discord + Gmail (Pod 이상/복구 테스트) await notify_both( title="✅ [Discord+Gmail] 알림 테스트", message=( f"관리자 `{token['username']}` 이(가) 알림 테스트를 실행했습니다.\n" f"Pod 이상/복구 알림 채널입니다." ), color=0x2ecc71 ) # Gmail only (인증서 만료 테스트) await notify_email_only( title="✅ [Gmail 전용] 알림 테스트", message=( f"인증서 만료 임박 알림 채널입니다.\n" f"Gmail로만 발송됩니다." ), color=0xf39c12 ) # Discord only (계정 잠금/임시PW 테스트) await notify_discord_only( title="✅ [Discord 전용] 알림 테스트", message=( f"계정 잠금/임시 비밀번호 발급 알림 채널입니다.\n" f"Discord로만 발송됩니다." ), color=0x3498db ) return {"ok": True, "message": "채널별 알림 테스트 완료 (Discord+Gmail / Gmail전용 / Discord전용)"} @app.get("/health") def health(): return {"status": "ok"} # ─── 게시판 ────────────────────────────────────────────── class PostCreate(BaseModel): title: str content: str class ReplyCreate(BaseModel): content: str def init_board_db(): conn = psycopg2.connect(**DB_CONFIG) cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS board_posts ( id SERIAL PRIMARY KEY, title VARCHAR(300) NOT NULL, content TEXT NOT NULL, author_id INTEGER REFERENCES users(id) ON DELETE SET NULL, author_name VARCHAR(100) NOT NULL, created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS board_replies ( id SERIAL PRIMARY KEY, post_id INTEGER REFERENCES board_posts(id) ON DELETE CASCADE, content TEXT NOT NULL, author_id INTEGER REFERENCES users(id) ON DELETE SET NULL, author_name VARCHAR(100) NOT NULL, created_at TIMESTAMP DEFAULT NOW() ); """) conn.commit() cur.close() conn.close() @app.on_event("startup") def startup_board(): import time for _ in range(10): try: init_board_db() print("Board DB initialized") break except Exception as e: print(f"Board DB not ready... {e}") time.sleep(3) @app.get("/api/board/posts") def list_posts(token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT id, title, author_name, created_at FROM board_posts ORDER BY created_at DESC") return cur.fetchall() @app.post("/api/board/posts") def create_post(data: PostCreate, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute( "INSERT INTO board_posts (title, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *", (data.title, data.content, int(token["sub"]), token["username"]) ) conn.commit() return cur.fetchone() @app.get("/api/board/posts/{post_id}") def get_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM board_posts WHERE id = %s", (post_id,)) post = cur.fetchone() if not post: raise HTTPException(status_code=404, detail="Post not found") cur.execute("SELECT * FROM board_replies WHERE post_id = %s ORDER BY created_at ASC", (post_id,)) replies = cur.fetchall() return {"post": post, "replies": replies} @app.delete("/api/board/posts/{post_id}") def delete_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM board_posts WHERE id = %s", (post_id,)) post = cur.fetchone() if not post: raise HTTPException(status_code=404, detail="Post not found") if not token.get("is_admin") and post["author_id"] != int(token["sub"]): raise HTTPException(status_code=403, detail="Permission denied") cur.execute("DELETE FROM board_posts WHERE id = %s", (post_id,)) conn.commit() return {"ok": True} @app.post("/api/board/posts/{post_id}/replies") def create_reply(post_id: int, data: ReplyCreate, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute( "INSERT INTO board_replies (post_id, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *", (post_id, data.content, int(token["sub"]), token["username"]) ) conn.commit() return cur.fetchone() @app.delete("/api/board/replies/{reply_id}") def delete_reply(reply_id: int, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM board_replies WHERE id = %s", (reply_id,)) reply = cur.fetchone() if not reply: raise HTTPException(status_code=404, detail="Reply not found") if not token.get("is_admin") and reply["author_id"] != int(token["sub"]): raise HTTPException(status_code=403, detail="Permission denied") cur.execute("DELETE FROM board_replies WHERE id = %s", (reply_id,)) conn.commit() return {"ok": True} # ─── 공지사항 ────────────────────────────────────────────── class NoticeCreate(BaseModel): title: str content: str class NoticeReplyCreate(BaseModel): content: str def init_notice_db(): conn = psycopg2.connect(**DB_CONFIG) cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS notice_posts ( id SERIAL PRIMARY KEY, title VARCHAR(300) NOT NULL, content TEXT NOT NULL, author_id INTEGER REFERENCES users(id) ON DELETE SET NULL, author_name VARCHAR(100) NOT NULL, created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS notice_replies ( id SERIAL PRIMARY KEY, post_id INTEGER REFERENCES notice_posts(id) ON DELETE CASCADE, content TEXT NOT NULL, author_id INTEGER REFERENCES users(id) ON DELETE SET NULL, author_name VARCHAR(100) NOT NULL, created_at TIMESTAMP DEFAULT NOW() ); """) conn.commit() cur.close() conn.close() @app.on_event("startup") def startup_notice(): import time for _ in range(10): try: init_notice_db() print("Notice DB initialized") break except Exception as e: print(f"Notice DB not ready... {e}") time.sleep(3) @app.get("/api/notice/posts") def list_notice_posts(token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT id, title, author_name, created_at FROM notice_posts ORDER BY created_at DESC") return cur.fetchall() @app.post("/api/notice/posts") def create_notice_post(data: NoticeCreate, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute( "INSERT INTO notice_posts (title, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *", (data.title, data.content, int(token["sub"]), token["username"]) ) conn.commit() return cur.fetchone() @app.get("/api/notice/posts/{post_id}") def get_notice_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM notice_posts WHERE id = %s", (post_id,)) post = cur.fetchone() if not post: raise HTTPException(status_code=404, detail="Post not found") cur.execute("SELECT * FROM notice_replies WHERE post_id = %s ORDER BY created_at ASC", (post_id,)) replies = cur.fetchall() return {"post": post, "replies": replies} @app.delete("/api/notice/posts/{post_id}") def delete_notice_post(post_id: int, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor() cur.execute("DELETE FROM notice_posts WHERE id = %s", (post_id,)) conn.commit() return {"ok": True} @app.post("/api/notice/posts/{post_id}/replies") def create_notice_reply(post_id: int, data: NoticeReplyCreate, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute( "INSERT INTO notice_replies (post_id, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *", (post_id, data.content, int(token["sub"]), token["username"]) ) conn.commit() return cur.fetchone() @app.delete("/api/notice/replies/{reply_id}") def delete_notice_reply(reply_id: int, token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM notice_replies WHERE id = %s", (reply_id,)) reply = cur.fetchone() if not reply: raise HTTPException(status_code=404, detail="Reply not found") if not token.get("is_admin") and reply["author_id"] != int(token["sub"]): raise HTTPException(status_code=403, detail="Permission denied") cur.execute("DELETE FROM notice_replies WHERE id = %s", (reply_id,)) conn.commit() return {"ok": True}