From 3f7166f89ec463bb7a12ee4a2e4907fa486e2bb2 Mon Sep 17 00:00:00 2001 From: qorgh529 Date: Fri, 10 Apr 2026 17:59:44 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EB=B9=84=EB=B0=80=EB=B2=88=ED=98=B8=20?= =?UTF-8?q?=EA=B4=80=EB=A6=AC=20=EA=B8=B0=EB=8A=A5=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/main.py | 160 ++++++++++++++++- frontend/index.html | 428 ++++++++++++++++++++++++++------------------ 2 files changed, 406 insertions(+), 182 deletions(-) diff --git a/backend/main.py b/backend/main.py index c8c2fa2..d2be17e 100755 --- a/backend/main.py +++ b/backend/main.py @@ -8,6 +8,8 @@ import psycopg2.extras import bcrypt import jwt import os +import secrets +import string from datetime import datetime, timedelta app = FastAPI(title="Web Portal API") @@ -23,6 +25,7 @@ app.add_middleware( SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey1234") ALGORITHM = "HS256" security = HTTPBearer() +MAX_LOGIN_ATTEMPTS = 5 DB_CONFIG = { "host": os.getenv("DB_HOST", "postgres-service"), @@ -48,6 +51,10 @@ def init_db(): username VARCHAR(100) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, is_admin BOOLEAN DEFAULT FALSE, + must_change_password BOOLEAN DEFAULT TRUE, + login_attempts INTEGER DEFAULT 0, + is_locked BOOLEAN DEFAULT FALSE, + password_change_requested BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS webpages ( @@ -63,18 +70,32 @@ def init_db(): PRIMARY KEY (user_id, webpage_id) ); """) + # 컬럼 추가 (기존 DB 마이그레이션) + for col, definition in [ + ("must_change_password", "BOOLEAN DEFAULT TRUE"), + ("login_attempts", "INTEGER DEFAULT 0"), + ("is_locked", "BOOLEAN DEFAULT FALSE"), + ("password_change_requested", "BOOLEAN DEFAULT FALSE"), + ]: + try: + cur.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col} {definition}") + except Exception: + pass + + # 기본 관리자 계정 (최초 로그인 시 비밀번호 변경 불필요) cur.execute("SELECT id FROM users WHERE username = 'admin'") if not cur.fetchone(): hashed = bcrypt.hashpw("admin1234".encode(), bcrypt.gensalt()).decode() cur.execute( - "INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, TRUE)", + "INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, TRUE, FALSE)", ("admin", hashed) ) + # 테스트 일반 사용자 (최초 로그인 시 비밀번호 변경 필요) cur.execute("SELECT id FROM users WHERE username = 'user1'") if not cur.fetchone(): hashed = bcrypt.hashpw("user1234".encode(), bcrypt.gensalt()).decode() cur.execute( - "INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, FALSE)", + "INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, FALSE, TRUE)", ("user1", hashed) ) conn.commit() @@ -93,11 +114,12 @@ def startup(): print(f"DB not ready, retrying... {e}") time.sleep(3) -def create_token(user_id: int, username: str, is_admin: bool): +def create_token(user_id: int, username: str, is_admin: bool, must_change_password: bool): payload = { "sub": str(user_id), "username": username, "is_admin": is_admin, + "must_change_password": must_change_password, "exp": datetime.utcnow() + timedelta(hours=8) } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) @@ -116,6 +138,11 @@ def require_admin(token=Depends(verify_token)): raise HTTPException(status_code=403, detail="Admin only") return token +def generate_temp_password(length=10): + chars = string.ascii_letters + string.digits + return ''.join(secrets.choice(chars) for _ in range(length)) + +# ─── Auth ─────────────────────────────────────────────── class LoginRequest(BaseModel): username: str password: str @@ -125,15 +152,77 @@ def login(req: LoginRequest, conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute("SELECT * FROM users WHERE username = %s", (req.username,)) user = cur.fetchone() - if not user or not bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()): + + if not user: raise HTTPException(status_code=401, detail="Invalid credentials") - token = create_token(user["id"], user["username"], user["is_admin"]) - return {"token": token, "username": user["username"], "is_admin": user["is_admin"]} + + # 계정 잠금 확인 + if user["is_locked"]: + raise HTTPException(status_code=403, detail="Account locked. Please contact admin.") + + # 비밀번호 검증 + if not bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()): + attempts = (user["login_attempts"] or 0) + 1 + locked = attempts >= MAX_LOGIN_ATTEMPTS + cur.execute( + "UPDATE users SET login_attempts=%s, is_locked=%s, password_change_requested=%s WHERE id=%s", + (attempts, locked, locked, user["id"]) + ) + conn.commit() + if locked: + raise HTTPException(status_code=403, detail="Account locked due to too many failed attempts. Please contact admin.") + remaining = MAX_LOGIN_ATTEMPTS - attempts + raise HTTPException(status_code=401, detail=f"Invalid credentials. {remaining} attempts remaining.") + + # 로그인 성공 - 실패 횟수 초기화 + cur.execute("UPDATE users SET login_attempts=0 WHERE id=%s", (user["id"],)) + conn.commit() + + token = create_token(user["id"], user["username"], user["is_admin"], user["must_change_password"]) + return { + "token": token, + "username": user["username"], + "is_admin": user["is_admin"], + "must_change_password": user["must_change_password"] + } @app.get("/api/auth/me") def me(token=Depends(verify_token)): - return {"username": token["username"], "is_admin": token["is_admin"]} + return { + "username": token["username"], + "is_admin": token["is_admin"], + "must_change_password": token.get("must_change_password", False) + } +# ─── 비밀번호 변경 (본인) ──────────────────────────────── +class ChangePasswordRequest(BaseModel): + current_password: str + new_password: str + +@app.post("/api/auth/change-password") +def change_password(req: ChangePasswordRequest, token=Depends(verify_token), conn=Depends(get_db)): + cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) + user_id = int(token["sub"]) + cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)) + user = cur.fetchone() + + if not bcrypt.checkpw(req.current_password.encode(), user["password_hash"].encode()): + raise HTTPException(status_code=400, detail="Current password is incorrect") + + if len(req.new_password) < 6: + raise HTTPException(status_code=400, detail="New password must be at least 6 characters") + + hashed = bcrypt.hashpw(req.new_password.encode(), bcrypt.gensalt()).decode() + cur.execute( + "UPDATE users SET password_hash=%s, must_change_password=FALSE, login_attempts=0, is_locked=FALSE WHERE id=%s", + (hashed, user_id) + ) + conn.commit() + + new_token = create_token(user["id"], user["username"], user["is_admin"], False) + return {"ok": True, "token": new_token} + +# ─── My Pages ─────────────────────────────────────────── @app.get("/api/my-pages") def my_pages(token=Depends(verify_token), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) @@ -148,6 +237,7 @@ def my_pages(token=Depends(verify_token), conn=Depends(get_db)): """, (user_id,)) return cur.fetchall() +# ─── Admin: Webpages CRUD ──────────────────────────────── @app.get("/api/admin/webpages") def list_webpages(token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) @@ -189,10 +279,15 @@ def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_ conn.commit() return {"ok": True} +# ─── Admin: Users ──────────────────────────────────────── @app.get("/api/admin/users") def list_users(token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) - cur.execute("SELECT id, username, is_admin, created_at FROM users ORDER BY username") + cur.execute(""" + SELECT id, username, is_admin, must_change_password, + login_attempts, is_locked, password_change_requested, created_at + FROM users ORDER BY username + """) return cur.fetchall() class UserCreate(BaseModel): @@ -206,7 +301,8 @@ def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get hashed = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode() try: cur.execute( - "INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, %s) RETURNING id, username, is_admin", + """INSERT INTO users (username, password_hash, is_admin, must_change_password) + VALUES (%s, %s, %s, TRUE) RETURNING id, username, is_admin""", (data.username, hashed, data.is_admin) ) conn.commit() @@ -221,6 +317,52 @@ def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db) conn.commit() return {"ok": True} +# ─── Admin: 비밀번호 변경 (관리자가 타 사용자) ────────────── +class AdminPasswordChange(BaseModel): + new_password: str + +@app.put("/api/admin/users/{user_id}/password") +def admin_change_password(user_id: int, data: AdminPasswordChange, token=Depends(require_admin), conn=Depends(get_db)): + if len(data.new_password) < 6: + raise HTTPException(status_code=400, detail="Password must be at least 6 characters") + cur = conn.cursor() + hashed = bcrypt.hashpw(data.new_password.encode(), bcrypt.gensalt()).decode() + cur.execute( + """UPDATE users SET password_hash=%s, must_change_password=TRUE, + login_attempts=0, is_locked=FALSE, password_change_requested=FALSE + WHERE id=%s AND username != 'admin'""", + (hashed, user_id) + ) + conn.commit() + return {"ok": True} + +# ─── Admin: 임시 비밀번호 발급 ─────────────────────────── +@app.post("/api/admin/users/{user_id}/reset-password") +def reset_password(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): + temp_pw = generate_temp_password() + hashed = bcrypt.hashpw(temp_pw.encode(), bcrypt.gensalt()).decode() + cur = conn.cursor() + cur.execute( + """UPDATE users SET password_hash=%s, must_change_password=TRUE, + login_attempts=0, is_locked=FALSE, password_change_requested=FALSE + WHERE id=%s""", + (hashed, user_id) + ) + conn.commit() + return {"ok": True, "temp_password": temp_pw} + +# ─── Admin: 계정 잠금 해제 ─────────────────────────────── +@app.post("/api/admin/users/{user_id}/unlock") +def unlock_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): + cur = conn.cursor() + cur.execute( + "UPDATE users SET is_locked=FALSE, login_attempts=0 WHERE id=%s", + (user_id,) + ) + conn.commit() + return {"ok": True} + +# ─── Admin: User Access ────────────────────────────────── @app.get("/api/admin/users/{user_id}/pages") def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)): cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) diff --git a/frontend/index.html b/frontend/index.html index 25d1d0b..8674065 100755 --- a/frontend/index.html +++ b/frontend/index.html @@ -6,89 +6,53 @@ Web Portal @@ -172,19 +123,27 @@

Web Portal

조직 내부 웹페이지 통합 포털

-
- - -
-
- - -
+
+
+ +
+
+

🔒 비밀번호 변경 필요

+

보안을 위해 비밀번호를 변경해야 합니다.
변경 후 서비스를 이용하실 수 있습니다.

+
⚠️ 초기 비밀번호는 반드시 변경해주세요.
+
+
+
+ +
+
+
+
@@ -197,10 +156,11 @@
+ -