feat: 비밀번호 관리 기능 추가
Some checks failed
Build and Push Images / build-backend (push) Has been cancelled
Some checks failed
Build and Push Images / build-backend (push) Has been cancelled
This commit is contained in:
160
backend/main.py
160
backend/main.py
@@ -8,6 +8,8 @@ import psycopg2.extras
|
||||
import bcrypt
|
||||
import jwt
|
||||
import os
|
||||
import secrets
|
||||
import string
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
app = FastAPI(title="Web Portal API")
|
||||
@@ -23,6 +25,7 @@ app.add_middleware(
|
||||
SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey1234")
|
||||
ALGORITHM = "HS256"
|
||||
security = HTTPBearer()
|
||||
MAX_LOGIN_ATTEMPTS = 5
|
||||
|
||||
DB_CONFIG = {
|
||||
"host": os.getenv("DB_HOST", "postgres-service"),
|
||||
@@ -48,6 +51,10 @@ def init_db():
|
||||
username VARCHAR(100) UNIQUE NOT NULL,
|
||||
password_hash VARCHAR(255) NOT NULL,
|
||||
is_admin BOOLEAN DEFAULT FALSE,
|
||||
must_change_password BOOLEAN DEFAULT TRUE,
|
||||
login_attempts INTEGER DEFAULT 0,
|
||||
is_locked BOOLEAN DEFAULT FALSE,
|
||||
password_change_requested BOOLEAN DEFAULT FALSE,
|
||||
created_at TIMESTAMP DEFAULT NOW()
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS webpages (
|
||||
@@ -63,18 +70,32 @@ def init_db():
|
||||
PRIMARY KEY (user_id, webpage_id)
|
||||
);
|
||||
""")
|
||||
# 컬럼 추가 (기존 DB 마이그레이션)
|
||||
for col, definition in [
|
||||
("must_change_password", "BOOLEAN DEFAULT TRUE"),
|
||||
("login_attempts", "INTEGER DEFAULT 0"),
|
||||
("is_locked", "BOOLEAN DEFAULT FALSE"),
|
||||
("password_change_requested", "BOOLEAN DEFAULT FALSE"),
|
||||
]:
|
||||
try:
|
||||
cur.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col} {definition}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 기본 관리자 계정 (최초 로그인 시 비밀번호 변경 불필요)
|
||||
cur.execute("SELECT id FROM users WHERE username = 'admin'")
|
||||
if not cur.fetchone():
|
||||
hashed = bcrypt.hashpw("admin1234".encode(), bcrypt.gensalt()).decode()
|
||||
cur.execute(
|
||||
"INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, TRUE)",
|
||||
"INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, TRUE, FALSE)",
|
||||
("admin", hashed)
|
||||
)
|
||||
# 테스트 일반 사용자 (최초 로그인 시 비밀번호 변경 필요)
|
||||
cur.execute("SELECT id FROM users WHERE username = 'user1'")
|
||||
if not cur.fetchone():
|
||||
hashed = bcrypt.hashpw("user1234".encode(), bcrypt.gensalt()).decode()
|
||||
cur.execute(
|
||||
"INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, FALSE)",
|
||||
"INSERT INTO users (username, password_hash, is_admin, must_change_password) VALUES (%s, %s, FALSE, TRUE)",
|
||||
("user1", hashed)
|
||||
)
|
||||
conn.commit()
|
||||
@@ -93,11 +114,12 @@ def startup():
|
||||
print(f"DB not ready, retrying... {e}")
|
||||
time.sleep(3)
|
||||
|
||||
def create_token(user_id: int, username: str, is_admin: bool):
|
||||
def create_token(user_id: int, username: str, is_admin: bool, must_change_password: bool):
|
||||
payload = {
|
||||
"sub": str(user_id),
|
||||
"username": username,
|
||||
"is_admin": is_admin,
|
||||
"must_change_password": must_change_password,
|
||||
"exp": datetime.utcnow() + timedelta(hours=8)
|
||||
}
|
||||
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
||||
@@ -116,6 +138,11 @@ def require_admin(token=Depends(verify_token)):
|
||||
raise HTTPException(status_code=403, detail="Admin only")
|
||||
return token
|
||||
|
||||
def generate_temp_password(length=10):
|
||||
chars = string.ascii_letters + string.digits
|
||||
return ''.join(secrets.choice(chars) for _ in range(length))
|
||||
|
||||
# ─── Auth ───────────────────────────────────────────────
|
||||
class LoginRequest(BaseModel):
|
||||
username: str
|
||||
password: str
|
||||
@@ -125,15 +152,77 @@ def login(req: LoginRequest, conn=Depends(get_db)):
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute("SELECT * FROM users WHERE username = %s", (req.username,))
|
||||
user = cur.fetchone()
|
||||
if not user or not bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()):
|
||||
|
||||
if not user:
|
||||
raise HTTPException(status_code=401, detail="Invalid credentials")
|
||||
token = create_token(user["id"], user["username"], user["is_admin"])
|
||||
return {"token": token, "username": user["username"], "is_admin": user["is_admin"]}
|
||||
|
||||
# 계정 잠금 확인
|
||||
if user["is_locked"]:
|
||||
raise HTTPException(status_code=403, detail="Account locked. Please contact admin.")
|
||||
|
||||
# 비밀번호 검증
|
||||
if not bcrypt.checkpw(req.password.encode(), user["password_hash"].encode()):
|
||||
attempts = (user["login_attempts"] or 0) + 1
|
||||
locked = attempts >= MAX_LOGIN_ATTEMPTS
|
||||
cur.execute(
|
||||
"UPDATE users SET login_attempts=%s, is_locked=%s, password_change_requested=%s WHERE id=%s",
|
||||
(attempts, locked, locked, user["id"])
|
||||
)
|
||||
conn.commit()
|
||||
if locked:
|
||||
raise HTTPException(status_code=403, detail="Account locked due to too many failed attempts. Please contact admin.")
|
||||
remaining = MAX_LOGIN_ATTEMPTS - attempts
|
||||
raise HTTPException(status_code=401, detail=f"Invalid credentials. {remaining} attempts remaining.")
|
||||
|
||||
# 로그인 성공 - 실패 횟수 초기화
|
||||
cur.execute("UPDATE users SET login_attempts=0 WHERE id=%s", (user["id"],))
|
||||
conn.commit()
|
||||
|
||||
token = create_token(user["id"], user["username"], user["is_admin"], user["must_change_password"])
|
||||
return {
|
||||
"token": token,
|
||||
"username": user["username"],
|
||||
"is_admin": user["is_admin"],
|
||||
"must_change_password": user["must_change_password"]
|
||||
}
|
||||
|
||||
@app.get("/api/auth/me")
|
||||
def me(token=Depends(verify_token)):
|
||||
return {"username": token["username"], "is_admin": token["is_admin"]}
|
||||
return {
|
||||
"username": token["username"],
|
||||
"is_admin": token["is_admin"],
|
||||
"must_change_password": token.get("must_change_password", False)
|
||||
}
|
||||
|
||||
# ─── 비밀번호 변경 (본인) ────────────────────────────────
|
||||
class ChangePasswordRequest(BaseModel):
|
||||
current_password: str
|
||||
new_password: str
|
||||
|
||||
@app.post("/api/auth/change-password")
|
||||
def change_password(req: ChangePasswordRequest, token=Depends(verify_token), conn=Depends(get_db)):
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
user_id = int(token["sub"])
|
||||
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
||||
user = cur.fetchone()
|
||||
|
||||
if not bcrypt.checkpw(req.current_password.encode(), user["password_hash"].encode()):
|
||||
raise HTTPException(status_code=400, detail="Current password is incorrect")
|
||||
|
||||
if len(req.new_password) < 6:
|
||||
raise HTTPException(status_code=400, detail="New password must be at least 6 characters")
|
||||
|
||||
hashed = bcrypt.hashpw(req.new_password.encode(), bcrypt.gensalt()).decode()
|
||||
cur.execute(
|
||||
"UPDATE users SET password_hash=%s, must_change_password=FALSE, login_attempts=0, is_locked=FALSE WHERE id=%s",
|
||||
(hashed, user_id)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
new_token = create_token(user["id"], user["username"], user["is_admin"], False)
|
||||
return {"ok": True, "token": new_token}
|
||||
|
||||
# ─── My Pages ───────────────────────────────────────────
|
||||
@app.get("/api/my-pages")
|
||||
def my_pages(token=Depends(verify_token), conn=Depends(get_db)):
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
@@ -148,6 +237,7 @@ def my_pages(token=Depends(verify_token), conn=Depends(get_db)):
|
||||
""", (user_id,))
|
||||
return cur.fetchall()
|
||||
|
||||
# ─── Admin: Webpages CRUD ────────────────────────────────
|
||||
@app.get("/api/admin/webpages")
|
||||
def list_webpages(token=Depends(require_admin), conn=Depends(get_db)):
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
@@ -189,10 +279,15 @@ def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_
|
||||
conn.commit()
|
||||
return {"ok": True}
|
||||
|
||||
# ─── Admin: Users ────────────────────────────────────────
|
||||
@app.get("/api/admin/users")
|
||||
def list_users(token=Depends(require_admin), conn=Depends(get_db)):
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute("SELECT id, username, is_admin, created_at FROM users ORDER BY username")
|
||||
cur.execute("""
|
||||
SELECT id, username, is_admin, must_change_password,
|
||||
login_attempts, is_locked, password_change_requested, created_at
|
||||
FROM users ORDER BY username
|
||||
""")
|
||||
return cur.fetchall()
|
||||
|
||||
class UserCreate(BaseModel):
|
||||
@@ -206,7 +301,8 @@ def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get
|
||||
hashed = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode()
|
||||
try:
|
||||
cur.execute(
|
||||
"INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, %s) RETURNING id, username, is_admin",
|
||||
"""INSERT INTO users (username, password_hash, is_admin, must_change_password)
|
||||
VALUES (%s, %s, %s, TRUE) RETURNING id, username, is_admin""",
|
||||
(data.username, hashed, data.is_admin)
|
||||
)
|
||||
conn.commit()
|
||||
@@ -221,6 +317,52 @@ def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)
|
||||
conn.commit()
|
||||
return {"ok": True}
|
||||
|
||||
# ─── Admin: 비밀번호 변경 (관리자가 타 사용자) ──────────────
|
||||
class AdminPasswordChange(BaseModel):
|
||||
new_password: str
|
||||
|
||||
@app.put("/api/admin/users/{user_id}/password")
|
||||
def admin_change_password(user_id: int, data: AdminPasswordChange, token=Depends(require_admin), conn=Depends(get_db)):
|
||||
if len(data.new_password) < 6:
|
||||
raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
|
||||
cur = conn.cursor()
|
||||
hashed = bcrypt.hashpw(data.new_password.encode(), bcrypt.gensalt()).decode()
|
||||
cur.execute(
|
||||
"""UPDATE users SET password_hash=%s, must_change_password=TRUE,
|
||||
login_attempts=0, is_locked=FALSE, password_change_requested=FALSE
|
||||
WHERE id=%s AND username != 'admin'""",
|
||||
(hashed, user_id)
|
||||
)
|
||||
conn.commit()
|
||||
return {"ok": True}
|
||||
|
||||
# ─── Admin: 임시 비밀번호 발급 ───────────────────────────
|
||||
@app.post("/api/admin/users/{user_id}/reset-password")
|
||||
def reset_password(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
|
||||
temp_pw = generate_temp_password()
|
||||
hashed = bcrypt.hashpw(temp_pw.encode(), bcrypt.gensalt()).decode()
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"""UPDATE users SET password_hash=%s, must_change_password=TRUE,
|
||||
login_attempts=0, is_locked=FALSE, password_change_requested=FALSE
|
||||
WHERE id=%s""",
|
||||
(hashed, user_id)
|
||||
)
|
||||
conn.commit()
|
||||
return {"ok": True, "temp_password": temp_pw}
|
||||
|
||||
# ─── Admin: 계정 잠금 해제 ───────────────────────────────
|
||||
@app.post("/api/admin/users/{user_id}/unlock")
|
||||
def unlock_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"UPDATE users SET is_locked=FALSE, login_attempts=0 WHERE id=%s",
|
||||
(user_id,)
|
||||
)
|
||||
conn.commit()
|
||||
return {"ok": True}
|
||||
|
||||
# ─── Admin: User Access ──────────────────────────────────
|
||||
@app.get("/api/admin/users/{user_id}/pages")
|
||||
def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
|
||||
Reference in New Issue
Block a user