Files
nginx-portal/backend/main.py
qorgh529 6379f8a526
Some checks failed
Build and Push Images / build-backend (push) Has been cancelled
fix: asyncio.create_task를 await로 수정
2026-04-27 18:48:06 +09:00

667 lines
26 KiB
Python
Executable File

from fastapi import FastAPI, HTTPException, Depends, status
from contextlib import asynccontextmanager
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List
import psycopg2
import psycopg2.extras
import bcrypt
import jwt
import os
import secrets
import string
from datetime import datetime, timedelta
from notifier import notify_both, notify_email_only, notify_discord_only
from monitor import start_scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's serving phase.

    FastAPI does not call ``@app.on_event("startup")`` handlers on an app that
    is constructed with ``lifespan=``, so the schema/seed initialisers that
    this module registers that way are invoked explicitly here.
    """
    # These module-level functions are defined further down in the file; the
    # names are resolved when the lifespan runs, i.e. after the module has been
    # fully imported. Each one only issues CREATE ... IF NOT EXISTS statements
    # and existence-checked inserts, so an extra invocation is harmless.
    for initialise in (startup, startup_board, startup_notice):
        initialise()

    scheduler = start_scheduler()
    try:
        yield
    finally:
        # Stop the scheduler even if the serving phase ends with an error.
        scheduler.shutdown()
# Application instance; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(title="Web Portal API", lifespan=lifespan)

# NOTE(review): every origin, method and header is allowed and credentials are
# enabled at the same time — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# JWT settings.
# NOTE(review): the fallback secret is a fixed string that is used whenever
# JWT_SECRET is unset — make sure the variable is always set outside local dev.
SECRET_KEY = os.environ.get("JWT_SECRET", "supersecretkey1234")
ALGORITHM = "HS256"
security = HTTPBearer()

# Number of consecutive wrong passwords after which `login` locks the account.
MAX_LOGIN_ATTEMPTS = 5

# Keyword arguments passed to psycopg2.connect(); each value can be overridden
# through the environment variable named in the lookup.
DB_CONFIG = dict(
    host=os.environ.get("DB_HOST", "postgres-service"),
    port=int(os.environ.get("DB_PORT", "5432")),
    database=os.environ.get("DB_NAME", "portaldb"),
    user=os.environ.get("DB_USER", "portaluser"),
    password=os.environ.get("DB_PASSWORD", "portalpass"),
)
def get_db():
    """Yield a new PostgreSQL connection and close it when the consumer is done.

    Used as a FastAPI dependency by the endpoints in this module: each use
    opens its own connection from ``DB_CONFIG``.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    try:
        yield connection
    finally:
        # Runs on normal completion and on errors alike.
        connection.close()
def _seed_user(cur, username, password, is_admin, must_change_password):
    """Insert *username* with a bcrypt-hashed *password* unless it already exists."""
    cur.execute("SELECT id FROM users WHERE username = %s", (username,))
    if cur.fetchone():
        return
    hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
    cur.execute(
        "INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, %s, %s)",
        (username, hashed, is_admin, must_change_password),
    )


def init_db():
    """Create the core tables, apply column migrations and seed default accounts.

    Everything runs in one transaction that is committed at the end. Any
    database error propagates to the caller, and the connection is always
    closed so that repeated attempts do not leak connections.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    is_admin BOOLEAN DEFAULT FALSE,
                    must_change_password BOOLEAN DEFAULT TRUE,
                    login_attempts INTEGER DEFAULT 0,
                    is_locked BOOLEAN DEFAULT FALSE,
                    password_change_requested BOOLEAN DEFAULT FALSE,
                    created_at TIMESTAMP DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS webpages (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(200) NOT NULL,
                    url VARCHAR(500) NOT NULL,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS user_webpage_access (
                    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
                    webpage_id INTEGER REFERENCES webpages(id) ON DELETE CASCADE,
                    PRIMARY KEY (user_id, webpage_id)
                );
            """)

            # Column additions (migration for databases created by older versions).
            # Errors are deliberately not swallowed: a failed statement aborts the
            # surrounding transaction, so every later statement would fail anyway
            # and only hide the real cause.
            migrations = (
                ("must_change_password", "BOOLEAN DEFAULT TRUE"),
                ("login_attempts", "INTEGER DEFAULT 0"),
                ("is_locked", "BOOLEAN DEFAULT FALSE"),
                ("password_change_requested", "BOOLEAN DEFAULT FALSE"),
            )
            for col, definition in migrations:
                # Identifiers come from the constant tuple above, never from input.
                cur.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col} {definition}")

            # Default administrator (no forced password change on first login).
            # NOTE(review): well-known default credentials — change after deployment.
            _seed_user(cur, "admin", "admin1234", True, False)
            # Test regular user (must change the password on first login).
            _seed_user(cur, "user1", "user1234", False, True)
        conn.commit()
    finally:
        conn.close()
@app.on_event("startup")
def startup():
    """Initialise the core schema, retrying while the database is unavailable.

    Calls ``init_db`` up to 10 times, sleeping 3 seconds after each failure,
    and reports explicitly when every attempt has failed.

    NOTE(review): FastAPI does not invoke ``on_event`` handlers on an app built
    with ``lifespan=``; confirm this function is actually reached at startup.
    """
    import time

    max_attempts = 10
    for _ in range(max_attempts):
        try:
            init_db()
        except Exception as e:  # any failure is treated as "database not ready yet"
            print(f"DB not ready, retrying... {e}")
            time.sleep(3)
        else:
            print("DB initialized successfully")
            break
    else:
        # Previously the loop ended silently, leaving no trace of the failure.
        print(f"DB initialization failed after {max_attempts} attempts; giving up")
def create_token(user_id: int, username: str, is_admin: bool, must_change_password: bool):
    """Return a signed JWT for the given user that expires after 8 hours.

    The user id is stored as a string in ``sub``; ``username``, ``is_admin``
    and ``must_change_password`` are embedded as additional claims. The token
    is signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Timezone-aware "now" replaces the deprecated naive datetime.utcnow();
    # the resulting expiry instant is the same.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=8)
    payload = {
        "sub": str(user_id),
        "username": username,
        "is_admin": is_admin,
        "must_change_password": must_change_password,
        "exp": expires_at,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode the bearer token and return its claims.

    Raises:
        HTTPException: 401 "Token expired" for an expired token, 401
            "Invalid token" for any other token validation failure.
    """
    raw_token = credentials.credentials
    try:
        claims = jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        # Handled before the generic invalid-token case so expiry gets its own message.
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
    return claims
def require_admin(token=Depends(verify_token)):
    """Return the verified token claims, rejecting non-admin callers with 403."""
    if token.get("is_admin"):
        return token
    raise HTTPException(status_code=403, detail="Admin only")
def generate_temp_password(length=10):
    """Return a random password of *length* ASCII letters and digits.

    Characters are drawn with the ``secrets`` module.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
# ─── Auth ───────────────────────────────────────────────
class LoginRequest(BaseModel):
    """Request body accepted by ``POST /api/auth/login``."""

    # Account name looked up in the users table.
    username: str
    # Plain-text password, compared against the stored bcrypt hash.
    password: str
@app.post("/api/auth/login")
async def login(req: LoginRequest, conn=Depends(get_db)):
    """Authenticate a user and return a JWT together with basic profile flags.

    A wrong password increments the account's failure counter; once it reaches
    ``MAX_LOGIN_ATTEMPTS`` the account is locked and a Discord notification is
    sent. A correct password resets the counter.

    Raises:
        HTTPException: 401 for an unknown user or a wrong password, 403 when
            the account is (or has just become) locked.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM users WHERE username = %s", (req.username,))
    user = cur.fetchone()
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Locked accounts are rejected before the password is checked.
    if user["is_locked"]:
        raise HTTPException(status_code=403, detail="Account locked. Please contact admin.")

    if bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()):
        # Successful login: reset the failure counter.
        cur.execute("UPDATE users SET login_attempts=0 WHERE id=%s", (user["id"],))
        conn.commit()
        token = create_token(user["id"], user["username"], user["is_admin"], user["must_change_password"])
        return {
            "token": token,
            "username": user["username"],
            "is_admin": user["is_admin"],
            "must_change_password": user["must_change_password"],
        }

    # Wrong password. The counter is incremented inside the database in a
    # single statement so that concurrent attempts cannot all start from the
    # same stale value and slip past the lockout threshold. As before, both
    # flags are written with the "locked" result of this attempt.
    cur.execute(
        """
        UPDATE users
           SET login_attempts = COALESCE(login_attempts, 0) + 1,
               is_locked = (COALESCE(login_attempts, 0) + 1 >= %s),
               password_change_requested = (COALESCE(login_attempts, 0) + 1 >= %s)
         WHERE id = %s
        RETURNING login_attempts, is_locked
        """,
        (MAX_LOGIN_ATTEMPTS, MAX_LOGIN_ATTEMPTS, user["id"]),
    )
    updated = cur.fetchone()
    conn.commit()
    if updated is None:
        # The row disappeared between the SELECT and the UPDATE.
        raise HTTPException(status_code=401, detail="Invalid credentials")

    if updated["is_locked"]:
        try:
            await notify_discord_only(
                title="🔒 계정 잠금 발생",
                message=(
                    f"사용자: `{req.username}`\n"
                    f"사유: 비밀번호 {MAX_LOGIN_ATTEMPTS}회 오류로 계정이 잠겼습니다.\n"
                    f"관리자 페이지에서 잠금 해제 또는 임시 비밀번호를 발급해주세요."
                ),
                color=0xe74c3c
            )
        except Exception as exc:
            # The lock is already committed; a notification failure must not
            # turn the 403 below into a 500.
            print(f"Account lock notification failed: {exc}")
        raise HTTPException(status_code=403, detail="Account locked due to too many failed attempts. Please contact admin.")

    remaining = MAX_LOGIN_ATTEMPTS - updated["login_attempts"]
    raise HTTPException(status_code=401, detail=f"Invalid credentials. {remaining} attempts remaining.")
@app.get("/api/auth/me")
def me(token=Depends(verify_token)):
    """Return the caller's identity as recorded in the token claims.

    ``must_change_password`` falls back to ``False`` when the claim is absent.
    """
    profile = {claim: token[claim] for claim in ("username", "is_admin")}
    profile["must_change_password"] = token.get("must_change_password", False)
    return profile
# ─── Password change (self-service) ──────────────────────
class ChangePasswordRequest(BaseModel):
    """Request body accepted by ``POST /api/auth/change-password``."""

    # Must match the password currently stored for the caller.
    current_password: str
    # Replacement password; the endpoint requires at least 6 characters.
    new_password: str
@app.post("/api/auth/change-password")
def change_password(req: ChangePasswordRequest, token=Depends(verify_token), conn=Depends(get_db)):
    """Change the caller's own password and issue a fresh token.

    On success the stored hash is replaced, ``must_change_password`` is
    cleared, the failure counter is reset and the account is unlocked. The
    returned token carries ``must_change_password=False``.

    Raises:
        HTTPException: 404 when the token's user no longer exists, 400 when
            the current password is wrong or the new one is shorter than 6
            characters.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    user_id = int(token["sub"])
    cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    user = cur.fetchone()
    if user is None:
        # A still-valid token can outlive its account (e.g. deleted by an
        # admin); answer cleanly instead of failing on a missing row.
        raise HTTPException(status_code=404, detail="User not found")
    if not bcrypt.checkpw(req.current_password.encode(), user["password_hash"].encode()):
        raise HTTPException(status_code=400, detail="Current password is incorrect")
    if len(req.new_password) < 6:
        raise HTTPException(status_code=400, detail="New password must be at least 6 characters")

    hashed = bcrypt.hashpw(req.new_password.encode(), bcrypt.gensalt()).decode()
    cur.execute(
        "UPDATE users SET password_hash=%s, must_change_password=FALSE, login_attempts=0, is_locked=FALSE WHERE id=%s",
        (hashed, user_id)
    )
    conn.commit()
    new_token = create_token(user["id"], user["username"], user["is_admin"], False)
    return {"ok": True, "token": new_token}
# ─── My Pages ───────────────────────────────────────────
@app.get("/api/my-pages")
def my_pages(token=Depends(verify_token), conn=Depends(get_db)):
    """Return the webpages visible to the caller, ordered by name.

    Admins get every webpage; other users get only the pages linked to them
    in ``user_webpage_access``.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    caller_id = int(token["sub"])
    if not token["is_admin"]:
        cur.execute(
            """
            SELECT w.* FROM webpages w
            JOIN user_webpage_access ua ON w.id = ua.webpage_id
            WHERE ua.user_id = %s ORDER BY w.name
            """,
            (caller_id,),
        )
        return cur.fetchall()
    cur.execute("SELECT * FROM webpages ORDER BY name")
    return cur.fetchall()
# ─── Admin: Webpages CRUD ────────────────────────────────
@app.get("/api/admin/webpages")
def list_webpages(token=Depends(require_admin), conn=Depends(get_db)):
    """Return every webpage row ordered by name (admin only)."""
    query = "SELECT * FROM webpages ORDER BY name"
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(query)
    return cur.fetchall()
class WebpageCreate(BaseModel):
    """Request body used when creating or updating a webpage entry."""

    # Display name of the page.
    name: str
    # Target address of the page.
    url: str
    # Free-text description; an empty string when omitted.
    description: Optional[str] = ""
@app.post("/api/admin/webpages")
def create_webpage(data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Insert a webpage and return the stored row (admin only)."""
    insert_sql = "INSERT INTO webpages (name, url, description) VALUES (%s, %s, %s) RETURNING *"
    values = (data.name, data.url, data.description)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(insert_sql, values)
    conn.commit()
    return cur.fetchone()
@app.put("/api/admin/webpages/{page_id}")
def update_webpage(page_id: int, data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Overwrite a webpage's fields and return the updated row (admin only).

    Raises:
        HTTPException: 404 when no webpage has the given id.
    """
    update_sql = "UPDATE webpages SET name=%s, url=%s, description=%s WHERE id=%s RETURNING *"
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(update_sql, (data.name, data.url, data.description, page_id))
    updated = cur.fetchone()
    if updated:
        conn.commit()
        return updated
    raise HTTPException(status_code=404, detail="Not found")
@app.delete("/api/admin/webpages/{page_id}")
def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete a webpage by id (admin only).

    Raises:
        HTTPException: 404 when no webpage has the given id, matching the
            behaviour of the update endpoint for the same resource.
    """
    cur = conn.cursor()
    cur.execute("DELETE FROM webpages WHERE id = %s", (page_id,))
    if cur.rowcount == 0:
        # Nothing was deleted; do not report success for a missing id.
        raise HTTPException(status_code=404, detail="Not found")
    conn.commit()
    return {"ok": True}
# ─── Admin: Users ────────────────────────────────────────
@app.get("/api/admin/users")
def list_users(token=Depends(require_admin), conn=Depends(get_db)):
    """Return all users ordered by username, without password hashes (admin only)."""
    query = """
        SELECT id, username, is_admin, must_change_password,
        login_attempts, is_locked, password_change_requested, created_at
        FROM users ORDER BY username
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(query)
    return cur.fetchall()
class UserCreate(BaseModel):
    """Request body accepted by ``POST /api/admin/users``."""

    # Login name; the users table enforces uniqueness.
    username: str
    # Initial plain-text password, hashed with bcrypt before storage.
    password: str
    # Whether the new account gets administrator rights.
    is_admin: bool = False
@app.post("/api/admin/users")
def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Create a user who must change the password on first login (admin only).

    Returns the new row's ``id``, ``username`` and ``is_admin``.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    password_hash = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode()
    insert_sql = """INSERT INTO users (username, password_hash, is_admin, must_change_password)
        VALUES (%s, %s, %s, TRUE) RETURNING id, username, is_admin"""
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    try:
        cur.execute(insert_sql, (data.username, password_hash, data.is_admin))
        conn.commit()
        return cur.fetchone()
    except psycopg2.errors.UniqueViolation:
        raise HTTPException(status_code=400, detail="Username already exists")
@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete a user by id (admin only); the ``admin`` account is never deleted.

    Raises:
        HTTPException: 404 when nothing was deleted, i.e. the id does not
            exist or belongs to the protected ``admin`` account.
    """
    cur = conn.cursor()
    cur.execute("DELETE FROM users WHERE id = %s AND username != 'admin'", (user_id,))
    if cur.rowcount == 0:
        # Previously a no-op delete was still reported as success.
        raise HTTPException(status_code=404, detail="User not found or cannot be deleted")
    conn.commit()
    return {"ok": True}
# ─── Admin: password change (admin sets another user's password) ───
class AdminPasswordChange(BaseModel):
    """Request body accepted by ``PUT /api/admin/users/{user_id}/password``."""

    # Replacement password; the endpoint requires at least 6 characters.
    new_password: str
@app.put("/api/admin/users/{user_id}/password")
def admin_change_password(user_id: int, data: AdminPasswordChange, token=Depends(require_admin), conn=Depends(get_db)):
    """Set another user's password (admin only).

    The target must change the password at next login; the failure counter,
    the lock and the password-change request flag are cleared. The ``admin``
    account is excluded by the query.

    Raises:
        HTTPException: 400 when the password is shorter than 6 characters,
            404 when no row was updated (unknown id or the ``admin`` account).
    """
    if len(data.new_password) < 6:
        raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
    cur = conn.cursor()
    hashed = bcrypt.hashpw(data.new_password.encode(), bcrypt.gensalt()).decode()
    cur.execute(
        """UPDATE users SET password_hash=%s, must_change_password=TRUE,
        login_attempts=0, is_locked=FALSE, password_change_requested=FALSE
        WHERE id=%s AND username != 'admin'""",
        (hashed, user_id)
    )
    if cur.rowcount == 0:
        # Previously a no-op update was still reported as success.
        raise HTTPException(status_code=404, detail="User not found or password cannot be changed")
    conn.commit()
    return {"ok": True}
# ─── Admin: issue temporary password ─────────────────────
@app.post("/api/admin/users/{user_id}/reset-password")
async def reset_password(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Replace a user's password with a random temporary one (admin only).

    The target must change the password at next login; the failure counter,
    the lock and the password-change request flag are cleared. A Discord
    notification is sent afterwards, and the temporary password is returned
    to the caller.

    NOTE(review): unlike the admin password-change endpoint, this one does not
    exclude the ``admin`` account — confirm that is intended.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT username FROM users WHERE id=%s", (user_id,))
    user = cur.fetchone()
    if not user:
        # Previously a temporary password was returned even though no
        # account had been updated.
        raise HTTPException(status_code=404, detail="User not found")

    temp_pw = generate_temp_password()
    hashed = bcrypt.hashpw(temp_pw.encode(), bcrypt.gensalt()).decode()
    cur.execute(
        """UPDATE users SET password_hash=%s, must_change_password=TRUE,
        login_attempts=0, is_locked=FALSE, password_change_requested=FALSE
        WHERE id=%s""",
        (hashed, user_id)
    )
    conn.commit()

    try:
        await notify_discord_only(
            title="🔑 임시 비밀번호 발급",
            message=(
                f"관리자 `{token['username']}` 이(가) 임시 비밀번호를 발급했습니다.\n"
                f"대상 사용자: `{user['username']}`"
            ),
            color=0x3498db
        )
    except Exception as exc:
        # The new password is already committed; if the notification failure
        # propagated, the caller would never receive the temporary password.
        print(f"Temporary password notification failed: {exc}")
    return {"ok": True, "temp_password": temp_pw}
# ─── Admin: unlock account ───────────────────────────────
@app.post("/api/admin/users/{user_id}/unlock")
def unlock_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Clear a user's lock and reset the failed-login counter (admin only).

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    cur = conn.cursor()
    cur.execute(
        "UPDATE users SET is_locked=FALSE, login_attempts=0 WHERE id=%s",
        (user_id,)
    )
    if cur.rowcount == 0:
        # Previously an unknown id was still reported as success.
        raise HTTPException(status_code=404, detail="User not found")
    conn.commit()
    return {"ok": True}
# ─── Admin: User Access ──────────────────────────────────
@app.get("/api/admin/users/{user_id}/pages")
def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Return every webpage with a ``has_access`` flag for the given user (admin only).

    Rows are ordered by webpage name; ``has_access`` is true when a matching
    ``user_webpage_access`` row exists.
    """
    query = """
        SELECT w.id, w.name, w.url, w.description,
        (ua.user_id IS NOT NULL) as has_access
        FROM webpages w
        LEFT JOIN user_webpage_access ua ON w.id = ua.webpage_id AND ua.user_id = %s
        ORDER BY w.name
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(query, (user_id,))
    return cur.fetchall()
class AccessUpdate(BaseModel):
    """Request body accepted by ``PUT /api/admin/users/{user_id}/pages``."""

    # Complete set of webpage ids the user should have access to.
    webpage_ids: List[int]
@app.put("/api/admin/users/{user_id}/pages")
def update_user_pages(user_id: int, data: AccessUpdate, token=Depends(require_admin), conn=Depends(get_db)):
    """Replace the set of webpages a user may access (admin only).

    Existing access rows for the user are deleted and one row per requested
    webpage id is inserted, all in a single transaction.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when any of the
            requested webpage ids does not exist.
    """
    cur = conn.cursor()

    # Validate up front: inserting a row for an unknown user or webpage would
    # otherwise fail on the foreign keys and surface as a server error.
    cur.execute("SELECT 1 FROM users WHERE id = %s", (user_id,))
    if cur.fetchone() is None:
        raise HTTPException(status_code=404, detail="User not found")

    page_ids = sorted(set(data.webpage_ids))
    if page_ids:
        cur.execute("SELECT id FROM webpages WHERE id = ANY(%s)", (page_ids,))
        known_ids = {row[0] for row in cur.fetchall()}
        unknown_ids = [page_id for page_id in page_ids if page_id not in known_ids]
        if unknown_ids:
            raise HTTPException(status_code=400, detail=f"Unknown webpage ids: {unknown_ids}")

    cur.execute("DELETE FROM user_webpage_access WHERE user_id = %s", (user_id,))
    cur.executemany(
        "INSERT INTO user_webpage_access (user_id, webpage_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
        [(user_id, page_id) for page_id in page_ids],
    )
    conn.commit()
    return {"ok": True}
@app.get("/api/admin/notify-test")
async def notify_test(token=Depends(require_admin)):
    """Send one test message through each notification helper (admin only).

    The helpers are awaited in order: ``notify_both``, ``notify_email_only``,
    ``notify_discord_only``.
    """
    # Discord + Gmail (pod failure/recovery test)
    both_message = (
        f"관리자 `{token['username']}` 이(가) 알림 테스트를 실행했습니다.\n"
        f"Pod 이상/복구 알림 채널입니다."
    )
    await notify_both(
        title="✅ [Discord+Gmail] 알림 테스트",
        message=both_message,
        color=0x2ecc71
    )

    # Gmail only (certificate expiry test)
    email_message = (
        "인증서 만료 임박 알림 채널입니다.\n"
        "Gmail로만 발송됩니다."
    )
    await notify_email_only(
        title="✅ [Gmail 전용] 알림 테스트",
        message=email_message,
        color=0xf39c12
    )

    # Discord only (account lock / temporary password test)
    discord_message = (
        "계정 잠금/임시 비밀번호 발급 알림 채널입니다.\n"
        "Discord로만 발송됩니다."
    )
    await notify_discord_only(
        title="✅ [Discord 전용] 알림 테스트",
        message=discord_message,
        color=0x3498db
    )

    summary = "채널별 알림 테스트 완료 (Discord+Gmail / Gmail전용 / Discord전용)"
    return {"ok": True, "message": summary}
@app.get("/health")
def health():
    """Return a constant payload; no authentication or database access."""
    return dict(status="ok")
# ─── Board ───────────────────────────────────────────────
class PostCreate(BaseModel):
    """Request body accepted by ``POST /api/board/posts``."""

    # Post title.
    title: str
    # Post body text.
    content: str
class ReplyCreate(BaseModel):
    """Request body accepted by ``POST /api/board/posts/{post_id}/replies``."""

    # Reply body text.
    content: str
def init_board_db():
    """Create the board tables (posts and replies) if they do not exist.

    The connection is closed even when a statement fails, so repeated
    attempts by a retrying caller do not leak connections.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS board_posts (
                    id SERIAL PRIMARY KEY,
                    title VARCHAR(300) NOT NULL,
                    content TEXT NOT NULL,
                    author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                    author_name VARCHAR(100) NOT NULL,
                    created_at TIMESTAMP DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS board_replies (
                    id SERIAL PRIMARY KEY,
                    post_id INTEGER REFERENCES board_posts(id) ON DELETE CASCADE,
                    content TEXT NOT NULL,
                    author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                    author_name VARCHAR(100) NOT NULL,
                    created_at TIMESTAMP DEFAULT NOW()
                );
            """)
        conn.commit()
    finally:
        conn.close()
@app.on_event("startup")
def startup_board():
    """Initialise the board schema, retrying while the database is unavailable.

    Calls ``init_board_db`` up to 10 times, sleeping 3 seconds after each
    failure, and reports explicitly when every attempt has failed.

    NOTE(review): FastAPI does not invoke ``on_event`` handlers on an app built
    with ``lifespan=``; confirm this function is actually reached at startup.
    """
    import time

    max_attempts = 10
    for _ in range(max_attempts):
        try:
            init_board_db()
        except Exception as e:  # any failure is treated as "database not ready yet"
            print(f"Board DB not ready... {e}")
            time.sleep(3)
        else:
            print("Board DB initialized")
            break
    else:
        # Previously the loop ended silently, leaving no trace of the failure.
        print(f"Board DB initialization failed after {max_attempts} attempts; giving up")
@app.get("/api/board/posts")
def list_posts(token=Depends(verify_token), conn=Depends(get_db)):
    """Return id, title, author and timestamp of every board post, newest first."""
    query = "SELECT id, title, author_name, created_at FROM board_posts ORDER BY created_at DESC"
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(query)
    return cur.fetchall()
@app.post("/api/board/posts")
def create_post(data: PostCreate, token=Depends(verify_token), conn=Depends(get_db)):
    """Create a board post authored by the caller and return the stored row."""
    author_id = int(token["sub"])
    author_name = token["username"]
    insert_sql = "INSERT INTO board_posts (title, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *"
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(insert_sql, (data.title, data.content, author_id, author_name))
    conn.commit()
    return cur.fetchone()
@app.get("/api/board/posts/{post_id}")
def get_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Return one board post together with its replies, oldest reply first.

    Raises:
        HTTPException: 404 when no post has the given id.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM board_posts WHERE id = %s", (post_id,))
    post = cur.fetchone()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    cur.execute("SELECT * FROM board_replies WHERE post_id = %s ORDER BY created_at ASC", (post_id,))
    return {"post": post, "replies": cur.fetchall()}
@app.delete("/api/board/posts/{post_id}")
def delete_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Delete a board post; allowed for admins and for the post's author.

    Raises:
        HTTPException: 404 when the post does not exist, 403 when the caller
            is neither an admin nor the author.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM board_posts WHERE id = %s", (post_id,))
    post = cur.fetchone()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    # Admin check comes first; ownership is only evaluated for non-admins.
    if not (token.get("is_admin") or post["author_id"] == int(token["sub"])):
        raise HTTPException(status_code=403, detail="Permission denied")

    cur.execute("DELETE FROM board_posts WHERE id = %s", (post_id,))
    conn.commit()
    return {"ok": True}
@app.post("/api/board/posts/{post_id}/replies")
def create_reply(post_id: int, data: ReplyCreate, token=Depends(verify_token), conn=Depends(get_db)):
    """Add a reply by the caller to a board post and return the stored row.

    Raises:
        HTTPException: 404 when the post does not exist.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Check the parent first: inserting a reply for a missing post would fail
    # on the foreign key and surface as a server error instead of a 404.
    cur.execute("SELECT 1 FROM board_posts WHERE id = %s", (post_id,))
    if cur.fetchone() is None:
        raise HTTPException(status_code=404, detail="Post not found")

    cur.execute(
        "INSERT INTO board_replies (post_id, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *",
        (post_id, data.content, int(token["sub"]), token["username"])
    )
    conn.commit()
    return cur.fetchone()
@app.delete("/api/board/replies/{reply_id}")
def delete_reply(reply_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Delete a board reply; allowed for admins and for the reply's author.

    Raises:
        HTTPException: 404 when the reply does not exist, 403 when the caller
            is neither an admin nor the author.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM board_replies WHERE id = %s", (reply_id,))
    reply = cur.fetchone()
    if not reply:
        raise HTTPException(status_code=404, detail="Reply not found")

    # Admin check comes first; ownership is only evaluated for non-admins.
    if not (token.get("is_admin") or reply["author_id"] == int(token["sub"])):
        raise HTTPException(status_code=403, detail="Permission denied")

    cur.execute("DELETE FROM board_replies WHERE id = %s", (reply_id,))
    conn.commit()
    return {"ok": True}
# ─── Notices ─────────────────────────────────────────────
class NoticeCreate(BaseModel):
    """Request body accepted by ``POST /api/notice/posts``."""

    # Notice title.
    title: str
    # Notice body text.
    content: str
class NoticeReplyCreate(BaseModel):
    """Request body accepted by ``POST /api/notice/posts/{post_id}/replies``."""

    # Reply body text.
    content: str
def init_notice_db():
    """Create the notice tables (posts and replies) if they do not exist.

    The connection is closed even when a statement fails, so repeated
    attempts by a retrying caller do not leak connections.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS notice_posts (
                    id SERIAL PRIMARY KEY,
                    title VARCHAR(300) NOT NULL,
                    content TEXT NOT NULL,
                    author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                    author_name VARCHAR(100) NOT NULL,
                    created_at TIMESTAMP DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS notice_replies (
                    id SERIAL PRIMARY KEY,
                    post_id INTEGER REFERENCES notice_posts(id) ON DELETE CASCADE,
                    content TEXT NOT NULL,
                    author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                    author_name VARCHAR(100) NOT NULL,
                    created_at TIMESTAMP DEFAULT NOW()
                );
            """)
        conn.commit()
    finally:
        conn.close()
@app.on_event("startup")
def startup_notice():
    """Initialise the notice schema, retrying while the database is unavailable.

    Calls ``init_notice_db`` up to 10 times, sleeping 3 seconds after each
    failure, and reports explicitly when every attempt has failed.

    NOTE(review): FastAPI does not invoke ``on_event`` handlers on an app built
    with ``lifespan=``; confirm this function is actually reached at startup.
    """
    import time

    max_attempts = 10
    for _ in range(max_attempts):
        try:
            init_notice_db()
        except Exception as e:  # any failure is treated as "database not ready yet"
            print(f"Notice DB not ready... {e}")
            time.sleep(3)
        else:
            print("Notice DB initialized")
            break
    else:
        # Previously the loop ended silently, leaving no trace of the failure.
        print(f"Notice DB initialization failed after {max_attempts} attempts; giving up")
@app.get("/api/notice/posts")
def list_notice_posts(token=Depends(verify_token), conn=Depends(get_db)):
    """Return id, title, author and timestamp of every notice, newest first."""
    query = "SELECT id, title, author_name, created_at FROM notice_posts ORDER BY created_at DESC"
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(query)
    return cur.fetchall()
@app.post("/api/notice/posts")
def create_notice_post(data: NoticeCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Create a notice authored by the calling admin and return the stored row."""
    author_id = int(token["sub"])
    author_name = token["username"]
    insert_sql = "INSERT INTO notice_posts (title, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *"
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(insert_sql, (data.title, data.content, author_id, author_name))
    conn.commit()
    return cur.fetchone()
@app.get("/api/notice/posts/{post_id}")
def get_notice_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Return one notice together with its replies, oldest reply first.

    Raises:
        HTTPException: 404 when no notice has the given id.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM notice_posts WHERE id = %s", (post_id,))
    post = cur.fetchone()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    cur.execute("SELECT * FROM notice_replies WHERE post_id = %s ORDER BY created_at ASC", (post_id,))
    return {"post": post, "replies": cur.fetchall()}
@app.delete("/api/notice/posts/{post_id}")
def delete_notice_post(post_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete a notice by id (admin only).

    Raises:
        HTTPException: 404 when no notice has the given id, matching the
            board post delete endpoint.
    """
    cur = conn.cursor()
    cur.execute("DELETE FROM notice_posts WHERE id = %s", (post_id,))
    if cur.rowcount == 0:
        # Nothing was deleted; do not report success for a missing id.
        raise HTTPException(status_code=404, detail="Post not found")
    conn.commit()
    return {"ok": True}
@app.post("/api/notice/posts/{post_id}/replies")
def create_notice_reply(post_id: int, data: NoticeReplyCreate, token=Depends(verify_token), conn=Depends(get_db)):
    """Add a reply by the caller to a notice and return the stored row.

    Raises:
        HTTPException: 404 when the notice does not exist.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Check the parent first: inserting a reply for a missing notice would fail
    # on the foreign key and surface as a server error instead of a 404.
    cur.execute("SELECT 1 FROM notice_posts WHERE id = %s", (post_id,))
    if cur.fetchone() is None:
        raise HTTPException(status_code=404, detail="Post not found")

    cur.execute(
        "INSERT INTO notice_replies (post_id, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *",
        (post_id, data.content, int(token["sub"]), token["username"])
    )
    conn.commit()
    return cur.fetchone()
@app.delete("/api/notice/replies/{reply_id}")
def delete_notice_reply(reply_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Delete a notice reply; allowed for admins and for the reply's author.

    Raises:
        HTTPException: 404 when the reply does not exist, 403 when the caller
            is neither an admin nor the author.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM notice_replies WHERE id = %s", (reply_id,))
    reply = cur.fetchone()
    if not reply:
        raise HTTPException(status_code=404, detail="Reply not found")

    # Admin check comes first; ownership is only evaluated for non-admins.
    if not (token.get("is_admin") or reply["author_id"] == int(token["sub"])):
        raise HTTPException(status_code=403, detail="Permission denied")

    cur.execute("DELETE FROM notice_replies WHERE id = %s", (reply_id,))
    conn.commit()
    return {"ok": True}