Files
nginx-portal/backend/main.py
qorgh529 68e9fc0a32
Some checks failed
Build and Push Images / build-backend (push) Has been cancelled
feat: 공지 탭 추가
2026-04-10 19:58:43 +09:00

606 lines
24 KiB
Python
Executable File

from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List
import psycopg2
import psycopg2.extras
import bcrypt
import jwt
import os
import secrets
import string
from datetime import datetime, timedelta
# Application object; every route in this module is registered on it.
app = FastAPI(title="Web Portal API")

# Allowed CORS origins come from CORS_ALLOW_ORIGINS (comma-separated).  The
# default stays "*" so deployments that do not set the variable behave exactly
# as before.
# NOTE(review): a wildcard origin together with allow_credentials=True is a
# permissive combination; set CORS_ALLOW_ORIGINS to the real frontend
# origin(s) in production.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Key used to sign and verify JWTs.  The fallback value is visible in the
# source, so anyone could forge tokens signed with it; warn loudly when the
# JWT_SECRET environment variable is missing or empty.
SECRET_KEY = os.getenv("JWT_SECRET")
if not SECRET_KEY:
    print("WARNING: JWT_SECRET is not set; using the insecure built-in default")
    SECRET_KEY = "supersecretkey1234"

# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Bearer-token extractor used as a dependency by verify_token.
security = HTTPBearer()

# Number of consecutive failed logins after which an account is locked.
MAX_LOGIN_ATTEMPTS = 5

# Keyword arguments for psycopg2.connect; each value can be overridden through
# the environment.
# NOTE(review): the default DB password is also hard-coded; make sure
# DB_PASSWORD is set in real deployments.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "postgres-service"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "database": os.getenv("DB_NAME", "portaldb"),
    "user": os.getenv("DB_USER", "portaluser"),
    "password": os.getenv("DB_PASSWORD", "portalpass"),
}
def get_db():
    """Yield a new PostgreSQL connection and close it when the caller is done.

    Written as a generator so it can be injected with ``Depends(get_db)``; the
    ``finally`` clause closes the connection even if the consumer raised.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    try:
        yield connection
    finally:
        connection.close()
def init_db():
    """Create the core tables, apply column migrations and seed default users.

    Every statement is idempotent (``IF NOT EXISTS`` / existence checks), so
    the function can be called again after a partial or failed run.

    Raises:
        psycopg2.Error: if connecting or any statement fails.  The connection
            is closed in every case so retries do not leak connections.
    """
    # Columns added after the first release (migration for existing DBs).
    migrations = [
        ("must_change_password", "BOOLEAN DEFAULT TRUE"),
        ("login_attempts", "INTEGER DEFAULT 0"),
        ("is_locked", "BOOLEAN DEFAULT FALSE"),
        ("password_change_requested", "BOOLEAN DEFAULT FALSE"),
    ]
    # (username, password, is_admin, must_change_password)
    # NOTE(review): these initial passwords are hard-coded in the source;
    # change them right after the first deployment.
    seed_accounts = [
        # Default administrator: no forced password change on first login.
        ("admin", "admin1234", True, False),
        # Test regular user: must change the password on first login.
        ("user1", "user1234", False, True),
    ]

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(100) UNIQUE NOT NULL,
                password_hash VARCHAR(255) NOT NULL,
                is_admin BOOLEAN DEFAULT FALSE,
                must_change_password BOOLEAN DEFAULT TRUE,
                login_attempts INTEGER DEFAULT 0,
                is_locked BOOLEAN DEFAULT FALSE,
                password_change_requested BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT NOW()
            );
            CREATE TABLE IF NOT EXISTS webpages (
                id SERIAL PRIMARY KEY,
                name VARCHAR(200) NOT NULL,
                url VARCHAR(500) NOT NULL,
                description TEXT,
                created_at TIMESTAMP DEFAULT NOW()
            );
            CREATE TABLE IF NOT EXISTS user_webpage_access (
                user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
                webpage_id INTEGER REFERENCES webpages(id) ON DELETE CASCADE,
                PRIMARY KEY (user_id, webpage_id)
            );
        """)

        for col, definition in migrations:
            # Errors are deliberately not swallowed here: a failed statement
            # aborts the PostgreSQL transaction, so ignoring it would only
            # make the next statement fail with a less helpful error.
            cur.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col} {definition}")

        for username, password, is_admin, must_change in seed_accounts:
            cur.execute("SELECT id FROM users WHERE username = %s", (username,))
            if cur.fetchone():
                continue  # account already exists; leave it untouched
            hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
            cur.execute(
                "INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, %s, %s)",
                (username, hashed, is_admin, must_change)
            )

        conn.commit()
        cur.close()
    finally:
        conn.close()
@app.on_event("startup")
def startup():
    """Initialise the core schema at startup, retrying while the DB comes up.

    Makes at most 10 attempts and sleeps 3 seconds after each failure.  If
    every attempt fails, nothing further is done.
    """
    import time

    attempt = 0
    while attempt < 10:
        attempt += 1
        try:
            init_db()
            print("DB initialized successfully")
        except Exception as e:
            print(f"DB not ready, retrying... {e}")
            time.sleep(3)
        else:
            break
def create_token(user_id: int, username: str, is_admin: bool, must_change_password: bool):
    """Build a signed JWT for the given user, valid for 8 hours.

    Args:
        user_id: Primary key of the user; stored as a string in ``sub``.
        username: Login name, copied into the ``username`` claim.
        is_admin: Copied into the ``is_admin`` claim.
        must_change_password: Copied into the ``must_change_password`` claim.

    Returns:
        The token produced by ``jwt.encode`` with SECRET_KEY and ALGORITHM.
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Timezone-aware "now" instead of the deprecated datetime.utcnow().
    expires_at = datetime.now(timezone.utc) + timedelta(hours=8)
    payload = {
        "sub": str(user_id),
        "username": username,
        "is_admin": is_admin,
        "must_change_password": must_change_password,
        "exp": expires_at,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode the bearer token and return its claims.

    Raises:
        HTTPException: 401 "Token expired" for an expired token, 401
            "Invalid token" for any other JWT validation failure.
    """
    raw_token = credentials.credentials
    try:
        return jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
    # The expired case is handled first so it gets its own message.
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
def require_admin(token=Depends(verify_token)):
    """Return the verified token claims, or raise 403 if not an admin token."""
    if token.get("is_admin"):
        return token
    raise HTTPException(status_code=403, detail="Admin only")
def generate_temp_password(length=10):
    """Return a random password of ``length`` ASCII letters and digits.

    Characters are drawn with ``secrets.choice``.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
# ─── Auth ───────────────────────────────────────────────
class LoginRequest(BaseModel):
    """Request body for ``POST /api/auth/login``."""

    # Account name looked up in the users table.
    username: str
    # Plain-text password, checked against the stored bcrypt hash.
    password: str
@app.post("/api/auth/login")
def login(req: LoginRequest, conn=Depends(get_db)):
    """Authenticate a user and return a JWT plus basic profile flags.

    A wrong password increments the account's failure counter; reaching
    MAX_LOGIN_ATTEMPTS locks the account and sets password_change_requested.
    A successful login resets the counter.

    Raises:
        HTTPException: 401 for an unknown user or wrong password, 403 when
            the account is (or has just become) locked.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute("SELECT * FROM users WHERE username = %s", (req.username,))
    user = cur.fetchone()
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Check for a locked account before verifying the password.
    if user["is_locked"]:
        raise HTTPException(status_code=403, detail="Account locked. Please contact admin.")

    # Verify the password.
    if not bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()):
        # Increment inside the UPDATE itself.  Computing the new count in
        # Python from the row read above would let concurrent wrong guesses
        # all read the same value and so bypass the lockout threshold.
        cur.execute(
            """
            UPDATE users
               SET login_attempts = COALESCE(login_attempts, 0) + 1,
                   is_locked = (COALESCE(login_attempts, 0) + 1 >= %s),
                   password_change_requested = (COALESCE(login_attempts, 0) + 1 >= %s)
             WHERE id = %s
            RETURNING login_attempts
            """,
            (MAX_LOGIN_ATTEMPTS, MAX_LOGIN_ATTEMPTS, user["id"])
        )
        updated = cur.fetchone()
        conn.commit()
        if not updated:
            # The row disappeared between the SELECT and the UPDATE.
            raise HTTPException(status_code=401, detail="Invalid credentials")
        attempts = updated["login_attempts"]
        if attempts >= MAX_LOGIN_ATTEMPTS:
            raise HTTPException(status_code=403, detail="Account locked due to too many failed attempts. Please contact admin.")
        remaining = MAX_LOGIN_ATTEMPTS - attempts
        raise HTTPException(status_code=401, detail=f"Invalid credentials. {remaining} attempts remaining.")

    # Login succeeded - reset the failure counter.
    cur.execute("UPDATE users SET login_attempts=0 WHERE id=%s", (user["id"],))
    conn.commit()
    token = create_token(user["id"], user["username"], user["is_admin"], user["must_change_password"])
    return {
        "token": token,
        "username": user["username"],
        "is_admin": user["is_admin"],
        "must_change_password": user["must_change_password"]
    }
@app.get("/api/auth/me")
def me(token=Depends(verify_token)):
    """Return the caller's profile as recorded in the token claims.

    ``must_change_password`` falls back to False when the claim is absent.
    """
    profile = {}
    profile["username"] = token["username"]
    profile["is_admin"] = token["is_admin"]
    profile["must_change_password"] = token.get("must_change_password", False)
    return profile
# ─── 비밀번호 변경 (본인) ────────────────────────────────
class ChangePasswordRequest(BaseModel):
    """Request body for ``POST /api/auth/change-password``."""

    # Must match the caller's stored password hash.
    current_password: str
    # Replacement password; the endpoint requires at least 6 characters.
    new_password: str
@app.post("/api/auth/change-password")
def change_password(req: ChangePasswordRequest, token=Depends(verify_token), conn=Depends(get_db)):
    """Change the caller's own password and issue a fresh token.

    The update also clears must_change_password, login_attempts and
    is_locked.  The new token carries ``must_change_password=False``.

    Raises:
        HTTPException: 404 if the token's user no longer exists, 400 if the
            current password is wrong or the new one is under 6 characters.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    user_id = int(token["sub"])
    cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    user = cur.fetchone()
    if not user:
        # The account was deleted after the token was issued; without this
        # guard the subscript below fails with a TypeError (HTTP 500).
        raise HTTPException(status_code=404, detail="User not found")
    if not bcrypt.checkpw(req.current_password.encode(), user["password_hash"].encode()):
        raise HTTPException(status_code=400, detail="Current password is incorrect")
    if len(req.new_password) < 6:
        raise HTTPException(status_code=400, detail="New password must be at least 6 characters")
    hashed = bcrypt.hashpw(req.new_password.encode(), bcrypt.gensalt()).decode()
    cur.execute(
        "UPDATE users SET password_hash=%s, must_change_password=FALSE, login_attempts=0, is_locked=FALSE WHERE id=%s",
        (hashed, user_id)
    )
    conn.commit()
    new_token = create_token(user["id"], user["username"], user["is_admin"], False)
    return {"ok": True, "token": new_token}
# ─── My Pages ───────────────────────────────────────────
@app.get("/api/my-pages")
def my_pages(token=Depends(verify_token), conn=Depends(get_db)):
    """List the webpages visible to the caller, ordered by name.

    Admin tokens get every webpage; other users get only the pages linked to
    them in user_webpage_access.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    user_id = int(token["sub"])
    if not token["is_admin"]:
        granted_pages_sql = (
            "\n"
            "SELECT w.* FROM webpages w\n"
            "JOIN user_webpage_access ua ON w.id = ua.webpage_id\n"
            "WHERE ua.user_id = %s ORDER BY w.name\n"
        )
        cur.execute(granted_pages_sql, (user_id,))
        return cur.fetchall()
    cur.execute("SELECT * FROM webpages ORDER BY name")
    return cur.fetchall()
# ─── Admin: Webpages CRUD ────────────────────────────────
@app.get("/api/admin/webpages")
def list_webpages(token=Depends(require_admin), conn=Depends(get_db)):
    """Return every webpage row, ordered by name (admin only)."""
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute("SELECT * FROM webpages ORDER BY name")
    pages = dict_cursor.fetchall()
    return pages
class WebpageCreate(BaseModel):
    """Request body for creating or updating a webpage entry."""

    # Display name of the page.
    name: str
    # Target URL of the page.
    url: str
    # Optional free-text description; defaults to an empty string.
    description: Optional[str] = ""
@app.post("/api/admin/webpages")
def create_webpage(data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Insert a webpage and return the stored row (admin only)."""
    insert_sql = "INSERT INTO webpages (name, url, description) VALUES (%s, %s, %s) RETURNING *"
    values = (data.name, data.url, data.description)
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute(insert_sql, values)
    conn.commit()
    # The RETURNING row is still available on the cursor after the commit.
    return dict_cursor.fetchone()
@app.put("/api/admin/webpages/{page_id}")
def update_webpage(page_id: int, data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Overwrite a webpage's fields and return the updated row (admin only).

    Raises:
        HTTPException: 404 when no webpage has the given id.
    """
    update_sql = "UPDATE webpages SET name=%s, url=%s, description=%s WHERE id=%s RETURNING *"
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute(update_sql, (data.name, data.url, data.description, page_id))
    updated_row = dict_cursor.fetchone()
    if updated_row:
        conn.commit()
        return updated_row
    raise HTTPException(status_code=404, detail="Not found")
@app.delete("/api/admin/webpages/{page_id}")
def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete a webpage by id (admin only).

    Responds with ``{"ok": True}`` whether or not a row matched.
    """
    cursor = conn.cursor()
    cursor.execute("DELETE FROM webpages WHERE id = %s", (page_id,))
    conn.commit()
    outcome = {"ok": True}
    return outcome
# ─── Admin: Users ────────────────────────────────────────
@app.get("/api/admin/users")
def list_users(token=Depends(require_admin), conn=Depends(get_db)):
    """Return all users ordered by username, without password hashes (admin only)."""
    users_sql = (
        "\n"
        "SELECT id, username, is_admin, must_change_password,\n"
        "login_attempts, is_locked, password_change_requested, created_at\n"
        "FROM users ORDER BY username\n"
    )
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute(users_sql)
    return dict_cursor.fetchall()
class UserCreate(BaseModel):
    """Request body for ``POST /api/admin/users``."""

    # Login name; the users table enforces uniqueness.
    username: str
    # Initial plain-text password; hashed with bcrypt before storage.
    password: str
    # Whether the new account has admin rights; defaults to a regular user.
    is_admin: bool = False
@app.post("/api/admin/users")
def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Create a user who must change their password on first login (admin only).

    Returns:
        The new row's ``id``, ``username`` and ``is_admin``.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    password_hash = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode()
    insert_sql = (
        "INSERT INTO users (username, password_hash, is_admin, must_change_password)\n"
        "VALUES (%s, %s, %s, TRUE) RETURNING id, username, is_admin"
    )
    try:
        dict_cursor.execute(insert_sql, (data.username, password_hash, data.is_admin))
        conn.commit()
        created = dict_cursor.fetchone()
        return created
    except psycopg2.errors.UniqueViolation:
        raise HTTPException(status_code=400, detail="Username already exists")
@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete a user by id (admin only); the 'admin' account is never deleted.

    Responds with ``{"ok": True}`` whether or not a row matched.
    """
    delete_sql = "DELETE FROM users WHERE id = %s AND username != 'admin'"
    cursor = conn.cursor()
    cursor.execute(delete_sql, (user_id,))
    conn.commit()
    return {"ok": True}
# ─── Admin: 비밀번호 변경 (관리자가 타 사용자) ──────────────
class AdminPasswordChange(BaseModel):
    """Request body for ``PUT /api/admin/users/{user_id}/password``."""

    # Password the admin is setting; the endpoint requires at least 6 characters.
    new_password: str
@app.put("/api/admin/users/{user_id}/password")
def admin_change_password(user_id: int, data: AdminPasswordChange, token=Depends(require_admin), conn=Depends(get_db)):
    """Set another user's password (admin only).

    The target must change the password at next login; the update also
    clears the lock, the failure counter and password_change_requested.  The
    'admin' account is excluded by the WHERE clause.

    Raises:
        HTTPException: 400 when the password is shorter than 6 characters.
    """
    too_short = len(data.new_password) < 6
    if too_short:
        raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
    cursor = conn.cursor()
    password_hash = bcrypt.hashpw(data.new_password.encode(), bcrypt.gensalt()).decode()
    update_sql = (
        "UPDATE users SET password_hash=%s, must_change_password=TRUE,\n"
        "login_attempts=0, is_locked=FALSE, password_change_requested=FALSE\n"
        "WHERE id=%s AND username != 'admin'"
    )
    cursor.execute(update_sql, (password_hash, user_id))
    conn.commit()
    return {"ok": True}
# ─── Admin: 임시 비밀번호 발급 ───────────────────────────
@app.post("/api/admin/users/{user_id}/reset-password")
def reset_password(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Issue a random temporary password for a user (admin only).

    The user must change it at next login; the update also clears the lock,
    the failure counter and password_change_requested.

    Returns:
        ``{"ok": True, "temp_password": <plain-text temporary password>}``.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    temp_pw = generate_temp_password()
    hashed = bcrypt.hashpw(temp_pw.encode(), bcrypt.gensalt()).decode()
    cur = conn.cursor()
    cur.execute(
        """UPDATE users SET password_hash=%s, must_change_password=TRUE,
        login_attempts=0, is_locked=FALSE, password_change_requested=FALSE
        WHERE id=%s""",
        (hashed, user_id)
    )
    if cur.rowcount == 0:
        # No row matched: do not hand back a password that belongs to nobody.
        raise HTTPException(status_code=404, detail="User not found")
    conn.commit()
    return {"ok": True, "temp_password": temp_pw}
# ─── Admin: 계정 잠금 해제 ───────────────────────────────
@app.post("/api/admin/users/{user_id}/unlock")
def unlock_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Clear a user's lock flag and failed-login counter (admin only).

    Responds with ``{"ok": True}`` whether or not a row matched.
    """
    unlock_sql = "UPDATE users SET is_locked=FALSE, login_attempts=0 WHERE id=%s"
    cursor = conn.cursor()
    cursor.execute(unlock_sql, (user_id,))
    conn.commit()
    return {"ok": True}
# ─── Admin: User Access ──────────────────────────────────
@app.get("/api/admin/users/{user_id}/pages")
def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """List every webpage with a ``has_access`` flag for the given user (admin only).

    The LEFT JOIN keeps pages the user has no access row for; those rows have
    ``has_access`` false.
    """
    access_sql = (
        "\n"
        "SELECT w.id, w.name, w.url, w.description,\n"
        "(ua.user_id IS NOT NULL) as has_access\n"
        "FROM webpages w\n"
        "LEFT JOIN user_webpage_access ua ON w.id = ua.webpage_id AND ua.user_id = %s\n"
        "ORDER BY w.name\n"
    )
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute(access_sql, (user_id,))
    return dict_cursor.fetchall()
class AccessUpdate(BaseModel):
    """Request body for ``PUT /api/admin/users/{user_id}/pages``."""

    # Complete set of webpage ids the user should have access to.
    webpage_ids: List[int]
@app.put("/api/admin/users/{user_id}/pages")
def update_user_pages(user_id: int, data: AccessUpdate, token=Depends(require_admin), conn=Depends(get_db)):
    """Replace a user's webpage access list with ``data.webpage_ids`` (admin only).

    Existing grants are deleted and the new ones inserted in one transaction,
    so a failure leaves the previous access list intact.

    Raises:
        HTTPException: 400 when the user id or any webpage id does not exist
            (foreign-key violation).
    """
    grants = [(user_id, page_id) for page_id in data.webpage_ids]
    cur = conn.cursor()
    try:
        cur.execute("DELETE FROM user_webpage_access WHERE user_id = %s", (user_id,))
        if grants:
            # ON CONFLICT DO NOTHING tolerates duplicate ids in the request.
            cur.executemany(
                "INSERT INTO user_webpage_access (user_id, webpage_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                grants
            )
        conn.commit()
    except psycopg2.IntegrityError:
        # Undo the DELETE as well, then report a client error instead of
        # letting the database exception surface as an HTTP 500.
        conn.rollback()
        raise HTTPException(status_code=400, detail="Unknown user or webpage id")
    return {"ok": True}
@app.get("/health")
def health():
    """Report that the API process is responding; no database access."""
    report = {"status": "ok"}
    return report
# ─── 게시판 ──────────────────────────────────────────────
class PostCreate(BaseModel):
    """Request body for ``POST /api/board/posts``."""

    # Post title.
    title: str
    # Post body text.
    content: str
class ReplyCreate(BaseModel):
    """Request body for ``POST /api/board/posts/{post_id}/replies``."""

    # Reply body text.
    content: str
def init_board_db():
    """Create the board_posts and board_replies tables if they do not exist.

    Raises:
        psycopg2.Error: if connecting or the DDL fails.  The connection is
            closed in every case so retries do not leak connections.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS board_posts (
                id SERIAL PRIMARY KEY,
                title VARCHAR(300) NOT NULL,
                content TEXT NOT NULL,
                author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                author_name VARCHAR(100) NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            );
            CREATE TABLE IF NOT EXISTS board_replies (
                id SERIAL PRIMARY KEY,
                post_id INTEGER REFERENCES board_posts(id) ON DELETE CASCADE,
                content TEXT NOT NULL,
                author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                author_name VARCHAR(100) NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            );
        """)
        conn.commit()
        cur.close()
    finally:
        conn.close()
@app.on_event("startup")
def startup_board():
    """Create the board tables at startup, retrying while the DB comes up.

    Makes at most 10 attempts and sleeps 3 seconds after each failure.  If
    every attempt fails, nothing further is done.
    """
    import time

    attempt = 0
    while attempt < 10:
        attempt += 1
        try:
            init_board_db()
            print("Board DB initialized")
        except Exception as e:
            print(f"Board DB not ready... {e}")
            time.sleep(3)
        else:
            break
@app.get("/api/board/posts")
def list_posts(token=Depends(verify_token), conn=Depends(get_db)):
    """Return board post summaries (no content), newest first."""
    summary_sql = "SELECT id, title, author_name, created_at FROM board_posts ORDER BY created_at DESC"
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute(summary_sql)
    return dict_cursor.fetchall()
@app.post("/api/board/posts")
def create_post(data: PostCreate, token=Depends(verify_token), conn=Depends(get_db)):
    """Create a board post authored by the caller and return the stored row."""
    insert_sql = "INSERT INTO board_posts (title, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *"
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # Author id and name are taken from the token claims.
    values = (data.title, data.content, int(token["sub"]), token["username"])
    dict_cursor.execute(insert_sql, values)
    conn.commit()
    return dict_cursor.fetchone()
@app.get("/api/board/posts/{post_id}")
def get_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Return one board post together with its replies, oldest reply first.

    Raises:
        HTTPException: 404 when the post does not exist.
    """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute("SELECT * FROM board_posts WHERE id = %s", (post_id,))
    found_post = dict_cursor.fetchone()
    if found_post:
        dict_cursor.execute("SELECT * FROM board_replies WHERE post_id = %s ORDER BY created_at ASC", (post_id,))
        return {"post": found_post, "replies": dict_cursor.fetchall()}
    raise HTTPException(status_code=404, detail="Post not found")
@app.delete("/api/board/posts/{post_id}")
def delete_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Delete a board post; allowed for admin tokens and the post's author.

    Raises:
        HTTPException: 404 when the post does not exist, 403 when the caller
            is neither an admin nor the author.
    """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute("SELECT * FROM board_posts WHERE id = %s", (post_id,))
    target = dict_cursor.fetchone()
    if not target:
        raise HTTPException(status_code=404, detail="Post not found")
    # Short-circuits for admins, so the author comparison only runs otherwise.
    allowed = token.get("is_admin") or target["author_id"] == int(token["sub"])
    if not allowed:
        raise HTTPException(status_code=403, detail="Permission denied")
    dict_cursor.execute("DELETE FROM board_posts WHERE id = %s", (post_id,))
    conn.commit()
    return {"ok": True}
@app.post("/api/board/posts/{post_id}/replies")
def create_reply(post_id: int, data: ReplyCreate, token=Depends(verify_token), conn=Depends(get_db)):
    """Add a reply by the caller to a board post and return the stored row.

    Raises:
        HTTPException: 404 when the post does not exist.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # Check the parent first: inserting a reply for a missing post would hit
    # the post_id foreign key and surface as an HTTP 500.
    cur.execute("SELECT id FROM board_posts WHERE id = %s", (post_id,))
    if not cur.fetchone():
        raise HTTPException(status_code=404, detail="Post not found")
    cur.execute(
        "INSERT INTO board_replies (post_id, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *",
        (post_id, data.content, int(token["sub"]), token["username"])
    )
    conn.commit()
    return cur.fetchone()
@app.delete("/api/board/replies/{reply_id}")
def delete_reply(reply_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Delete a board reply; allowed for admin tokens and the reply's author.

    Raises:
        HTTPException: 404 when the reply does not exist, 403 when the caller
            is neither an admin nor the author.
    """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute("SELECT * FROM board_replies WHERE id = %s", (reply_id,))
    target = dict_cursor.fetchone()
    if not target:
        raise HTTPException(status_code=404, detail="Reply not found")
    # Short-circuits for admins, so the author comparison only runs otherwise.
    allowed = token.get("is_admin") or target["author_id"] == int(token["sub"])
    if not allowed:
        raise HTTPException(status_code=403, detail="Permission denied")
    dict_cursor.execute("DELETE FROM board_replies WHERE id = %s", (reply_id,))
    conn.commit()
    return {"ok": True}
# ─── 공지사항 ──────────────────────────────────────────────
class NoticeCreate(BaseModel):
    """Request body for ``POST /api/notice/posts``."""

    # Notice title.
    title: str
    # Notice body text.
    content: str
class NoticeReplyCreate(BaseModel):
    """Request body for ``POST /api/notice/posts/{post_id}/replies``."""

    # Reply body text.
    content: str
def init_notice_db():
    """Create the notice_posts and notice_replies tables if they do not exist.

    Raises:
        psycopg2.Error: if connecting or the DDL fails.  The connection is
            closed in every case so retries do not leak connections.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS notice_posts (
                id SERIAL PRIMARY KEY,
                title VARCHAR(300) NOT NULL,
                content TEXT NOT NULL,
                author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                author_name VARCHAR(100) NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            );
            CREATE TABLE IF NOT EXISTS notice_replies (
                id SERIAL PRIMARY KEY,
                post_id INTEGER REFERENCES notice_posts(id) ON DELETE CASCADE,
                content TEXT NOT NULL,
                author_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                author_name VARCHAR(100) NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            );
        """)
        conn.commit()
        cur.close()
    finally:
        conn.close()
@app.on_event("startup")
def startup_notice():
    """Create the notice tables at startup, retrying while the DB comes up.

    Makes at most 10 attempts and sleeps 3 seconds after each failure.  If
    every attempt fails, nothing further is done.
    """
    import time

    attempt = 0
    while attempt < 10:
        attempt += 1
        try:
            init_notice_db()
            print("Notice DB initialized")
        except Exception as e:
            print(f"Notice DB not ready... {e}")
            time.sleep(3)
        else:
            break
@app.get("/api/notice/posts")
def list_notice_posts(token=Depends(verify_token), conn=Depends(get_db)):
    """Return notice summaries (no content), newest first."""
    summary_sql = "SELECT id, title, author_name, created_at FROM notice_posts ORDER BY created_at DESC"
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute(summary_sql)
    return dict_cursor.fetchall()
@app.post("/api/notice/posts")
def create_notice_post(data: NoticeCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Create a notice authored by the calling admin and return the stored row."""
    insert_sql = "INSERT INTO notice_posts (title, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *"
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # Author id and name are taken from the token claims.
    values = (data.title, data.content, int(token["sub"]), token["username"])
    dict_cursor.execute(insert_sql, values)
    conn.commit()
    return dict_cursor.fetchone()
@app.get("/api/notice/posts/{post_id}")
def get_notice_post(post_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Return one notice together with its replies, oldest reply first.

    Raises:
        HTTPException: 404 when the notice does not exist.
    """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute("SELECT * FROM notice_posts WHERE id = %s", (post_id,))
    found_post = dict_cursor.fetchone()
    if found_post:
        dict_cursor.execute("SELECT * FROM notice_replies WHERE post_id = %s ORDER BY created_at ASC", (post_id,))
        return {"post": found_post, "replies": dict_cursor.fetchall()}
    raise HTTPException(status_code=404, detail="Post not found")
@app.delete("/api/notice/posts/{post_id}")
def delete_notice_post(post_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete a notice by id (admin only).

    Responds with ``{"ok": True}`` whether or not a row matched.
    """
    cursor = conn.cursor()
    cursor.execute("DELETE FROM notice_posts WHERE id = %s", (post_id,))
    conn.commit()
    outcome = {"ok": True}
    return outcome
@app.post("/api/notice/posts/{post_id}/replies")
def create_notice_reply(post_id: int, data: NoticeReplyCreate, token=Depends(verify_token), conn=Depends(get_db)):
    """Add a reply by the caller to a notice and return the stored row.

    Raises:
        HTTPException: 404 when the notice does not exist.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # Check the parent first: inserting a reply for a missing notice would hit
    # the post_id foreign key and surface as an HTTP 500.
    cur.execute("SELECT id FROM notice_posts WHERE id = %s", (post_id,))
    if not cur.fetchone():
        raise HTTPException(status_code=404, detail="Post not found")
    cur.execute(
        "INSERT INTO notice_replies (post_id, content, author_id, author_name) VALUES (%s, %s, %s, %s) RETURNING *",
        (post_id, data.content, int(token["sub"]), token["username"])
    )
    conn.commit()
    return cur.fetchone()
@app.delete("/api/notice/replies/{reply_id}")
def delete_notice_reply(reply_id: int, token=Depends(verify_token), conn=Depends(get_db)):
    """Delete a notice reply; allowed for admin tokens and the reply's author.

    Raises:
        HTTPException: 404 when the reply does not exist, 403 when the caller
            is neither an admin nor the author.
    """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    dict_cursor.execute("SELECT * FROM notice_replies WHERE id = %s", (reply_id,))
    target = dict_cursor.fetchone()
    if not target:
        raise HTTPException(status_code=404, detail="Reply not found")
    # Short-circuits for admins, so the author comparison only runs otherwise.
    allowed = token.get("is_admin") or target["author_id"] == int(token["sub"])
    if not allowed:
        raise HTTPException(status_code=403, detail="Permission denied")
    dict_cursor.execute("DELETE FROM notice_replies WHERE id = %s", (reply_id,))
    conn.commit()
    return {"ok": True}