Files
nginx-portal/backend/main.py
qorgh529 5e7e245858
Some checks failed
Build and Push Images / build-backend (push) Has been cancelled
init: web portal
2026-04-06 21:16:17 +09:00

254 lines
8.9 KiB
Python
Executable File

import os
import warnings
from datetime import datetime, timedelta, timezone
from typing import List, Optional

import bcrypt
import jwt
import psycopg2
import psycopg2.extras
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel
app = FastAPI(title="Web Portal API")

# Origins allowed to call the API from a browser. Set CORS_ALLOW_ORIGINS to a
# comma-separated list (e.g. "https://portal.example.com") to restrict them;
# when the variable is unset or empty every origin is allowed, exactly as
# before.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive -- confirm the deployment sets CORS_ALLOW_ORIGINS.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Fallback signing key used only when JWT_SECRET is not set. It is part of the
# source code, so tokens signed with it must not be trusted outside development.
_DEFAULT_JWT_SECRET = "supersecretkey1234"

# Key used to sign and verify the JWTs issued by this service.
SECRET_KEY = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if SECRET_KEY == _DEFAULT_JWT_SECRET:
    # Keep working (backward compatible) but make the insecure setup visible.
    warnings.warn(
        "JWT_SECRET is not set; falling back to the built-in development "
        "secret. Set JWT_SECRET to a strong random value in production.",
        RuntimeWarning,
    )

# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Bearer-credentials scheme; verify_token receives its result via Depends().
security = HTTPBearer()
# Keyword arguments handed to psycopg2.connect(). Every entry is read from the
# matching DB_* environment variable and falls back to the default shown here.
DB_CONFIG = dict(
    host=os.getenv("DB_HOST", "postgres-service"),
    port=int(os.getenv("DB_PORT", "5432")),
    database=os.getenv("DB_NAME", "portaldb"),
    user=os.getenv("DB_USER", "portaluser"),
    password=os.getenv("DB_PASSWORD", "portalpass"),
)
def get_db():
    """Yield a fresh database connection and close it afterwards.

    Written as a generator so it can be injected with ``Depends(get_db)``:
    the code after ``yield`` runs once the consumer is finished with the
    connection.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    try:
        yield connection
    finally:
        # Executed whether the consumer finished normally or raised.
        connection.close()
def _seed_user(cur, username, password, is_admin):
    """Insert *username* with a bcrypt hash of *password* unless it exists."""
    cur.execute("SELECT id FROM users WHERE username = %s", (username,))
    if cur.fetchone():
        return
    hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
    # ON CONFLICT keeps startup from failing when another instance inserts the
    # same account between the SELECT above and this INSERT.
    cur.execute(
        "INSERT INTO users (username, password_hash, is_admin) "
        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
        (username, hashed, is_admin),
    )


def init_db():
    """Create the schema and seed the default accounts.

    Creates the ``users``, ``webpages`` and ``user_webpage_access`` tables if
    they do not exist, then makes sure the ``admin`` and ``user1`` accounts are
    present. Everything is committed in one transaction.

    The connection is always closed, including when a statement fails, so the
    retry loop in ``startup`` does not accumulate open connections.

    NOTE(review): the seeded accounts use fixed, well-known passwords; they
    should be changed (or the seeding disabled) outside development.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    is_admin BOOLEAN DEFAULT FALSE,
                    created_at TIMESTAMP DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS webpages (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(200) NOT NULL,
                    url VARCHAR(500) NOT NULL,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS user_webpage_access (
                    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
                    webpage_id INTEGER REFERENCES webpages(id) ON DELETE CASCADE,
                    PRIMARY KEY (user_id, webpage_id)
                );
            """)
            _seed_user(cur, "admin", "admin1234", True)
            _seed_user(cur, "user1", "user1234", False)
        conn.commit()
    finally:
        conn.close()
@app.on_event("startup")
def startup():
    """Initialise the database at application start, retrying while it boots.

    Calls ``init_db`` up to 10 times, waiting 3 seconds between attempts. If
    every attempt fails the application still starts (unchanged behaviour),
    but the failure is now reported instead of passing silently.
    """
    import time

    max_attempts = 10
    for attempt in range(1, max_attempts + 1):
        try:
            init_db()
        except Exception as e:
            # Deliberately broad: any failure here is treated as "database not
            # ready yet" and retried.
            print(f"DB not ready, retrying... {e}")
            if attempt < max_attempts:
                # No point in waiting after the final attempt.
                time.sleep(3)
        else:
            print("DB initialized successfully")
            break
    else:
        # Loop finished without a successful init_db() call.
        print(f"DB initialization failed after {max_attempts} attempts")
def create_token(user_id: int, username: str, is_admin: bool):
    """Build a signed JWT for the given user.

    Args:
        user_id: Database id of the user; stored as a string in ``sub``.
        username: Stored in the ``username`` claim.
        is_admin: Stored in the ``is_admin`` claim.

    Returns:
        The token produced by ``jwt.encode``, signed with ``SECRET_KEY`` using
        ``ALGORITHM`` and expiring 8 hours after creation.
    """
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and naive.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=8)
    payload = {
        "sub": str(user_id),
        "username": username,
        "is_admin": is_admin,
        "exp": expires_at,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode the bearer token of the request and return its claims.

    Raises:
        HTTPException: 401 "Token expired" when the token has expired, or
            401 "Invalid token" for any other token error.
    """
    raw_token = credentials.credentials
    try:
        claims = jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
    return claims
def require_admin(token=Depends(verify_token)):
    """Return the verified token claims, but only for administrators.

    Raises:
        HTTPException: 403 "Admin only" when the ``is_admin`` claim is missing
            or falsy.
    """
    caller_is_admin = token.get("is_admin")
    if caller_is_admin:
        return token
    raise HTTPException(status_code=403, detail="Admin only")
class LoginRequest(BaseModel):
    # Request body of POST /api/auth/login.

    # Account name looked up in the users table.
    username: str
    # Plain-text password, compared against the stored bcrypt hash.
    password: str
@app.post("/api/auth/login")
def login(req: LoginRequest, conn=Depends(get_db)):
    """Authenticate a user and issue a token.

    Returns:
        A dict with the signed ``token`` plus the user's ``username`` and
        ``is_admin`` flag.

    Raises:
        HTTPException: 401 "Invalid credentials" when the user does not exist
            or the password does not match.
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            "SELECT id, username, password_hash, is_admin "
            "FROM users WHERE username = %s",
            (req.username,),
        )
        user = cur.fetchone()

    password_ok = False
    if user:
        try:
            password_ok = bcrypt.checkpw(
                req.password.encode(), user["password_hash"].encode()
            )
        except ValueError:
            # bcrypt rejects input it cannot process (for example a malformed
            # stored hash); treat that as a failed login, not a server error.
            password_ok = False
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_token(user["id"], user["username"], user["is_admin"])
    return {"token": token, "username": user["username"], "is_admin": user["is_admin"]}
@app.get("/api/auth/me")
def me(token=Depends(verify_token)):
    """Return the username and admin flag carried in the caller's token."""
    return {claim: token[claim] for claim in ("username", "is_admin")}
@app.get("/api/my-pages")
def my_pages(token=Depends(verify_token), conn=Depends(get_db)):
    """List the webpages visible to the caller, ordered by name.

    Administrators get every webpage; other users get only the pages linked
    to them in ``user_webpage_access``.
    """
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    caller_id = int(token["sub"])
    if not token["is_admin"]:
        cursor.execute("""
            SELECT w.* FROM webpages w
            JOIN user_webpage_access ua ON w.id = ua.webpage_id
            WHERE ua.user_id = %s ORDER BY w.name
        """, (caller_id,))
        return cursor.fetchall()
    cursor.execute("SELECT * FROM webpages ORDER BY name")
    return cursor.fetchall()
@app.get("/api/admin/webpages")
def list_webpages(token=Depends(require_admin), conn=Depends(get_db)):
    """Return every webpage, ordered by name (admin only)."""
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cursor.execute("SELECT * FROM webpages ORDER BY name")
    pages = cursor.fetchall()
    return pages
class WebpageCreate(BaseModel):
    # Request body used both to create and to update a webpage.

    # Display name of the page.
    name: str
    # Address of the page.
    url: str
    # Optional free text; defaults to an empty string when omitted.
    description: Optional[str] = ""
@app.post("/api/admin/webpages")
def create_webpage(data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Insert a webpage and return the stored row (admin only)."""
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    values = (data.name, data.url, data.description)
    cursor.execute(
        "INSERT INTO webpages (name, url, description) VALUES (%s, %s, %s) RETURNING *",
        values,
    )
    conn.commit()
    created = cursor.fetchone()
    return created
@app.put("/api/admin/webpages/{page_id}")
def update_webpage(page_id: int, data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Overwrite name, url and description of a webpage (admin only).

    Returns the updated row.

    Raises:
        HTTPException: 404 "Not found" when no webpage has id ``page_id``.
    """
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    values = (data.name, data.url, data.description, page_id)
    cursor.execute(
        "UPDATE webpages SET name=%s, url=%s, description=%s WHERE id=%s RETURNING *",
        values,
    )
    updated = cursor.fetchone()
    if updated:
        conn.commit()
        return updated
    # No row matched the id, so nothing is committed.
    raise HTTPException(status_code=404, detail="Not found")
@app.delete("/api/admin/webpages/{page_id}")
def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete the webpage with id ``page_id`` (admin only).

    Responds with ``{"ok": True}`` whether or not a row was removed.
    """
    cursor = conn.cursor()
    params = (page_id,)
    cursor.execute("DELETE FROM webpages WHERE id = %s", params)
    conn.commit()
    return dict(ok=True)
@app.get("/api/admin/users")
def list_users(token=Depends(require_admin), conn=Depends(get_db)):
    """Return id, username, admin flag and creation time of every user.

    Rows are ordered by username; password hashes are not selected.
    """
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cursor.execute("SELECT id, username, is_admin, created_at FROM users ORDER BY username")
    users = cursor.fetchall()
    return users
class UserCreate(BaseModel):
    # Request body of POST /api/admin/users.

    # Login name of the new account.
    username: str
    # Plain-text password; create_user stores only its bcrypt hash.
    password: str
    # Whether the new account is an administrator; off unless requested.
    is_admin: bool = False
@app.post("/api/admin/users")
def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Create a user account (admin only).

    Returns:
        The new row's ``id``, ``username`` and ``is_admin``.

    Raises:
        HTTPException: 400 when the password is longer than bcrypt can hash,
            or when the username is already taken.
    """
    # bcrypt only processes the first 72 bytes of a password; depending on the
    # library version longer input is silently truncated or rejected, so it is
    # refused up front with a clear message.
    max_password_bytes = 72
    password_bytes = data.password.encode()
    if len(password_bytes) > max_password_bytes:
        raise HTTPException(
            status_code=400,
            detail=f"Password must be at most {max_password_bytes} bytes",
        )
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode()

    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        try:
            cur.execute(
                "INSERT INTO users (username, password_hash, is_admin) "
                "VALUES (%s, %s, %s) RETURNING id, username, is_admin",
                (data.username, hashed, data.is_admin),
            )
            created = cur.fetchone()
            conn.commit()
        except psycopg2.errors.UniqueViolation:
            # Leave the connection in a clean state before reporting the error.
            conn.rollback()
            raise HTTPException(status_code=400, detail="Username already exists") from None
    return created
@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Delete the user with id ``user_id`` (admin only).

    The account named ``admin`` is excluded by the query and is never removed.
    Responds with ``{"ok": True}`` whether or not a row was deleted.
    """
    cursor = conn.cursor()
    params = (user_id,)
    cursor.execute("DELETE FROM users WHERE id = %s AND username != 'admin'", params)
    conn.commit()
    return dict(ok=True)
@app.get("/api/admin/users/{user_id}/pages")
def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """List every webpage with a flag telling whether the user may access it.

    Each row holds the page's id, name, url and description plus
    ``has_access``, which is true when ``user_webpage_access`` links the page
    to ``user_id``. Rows are ordered by page name (admin only).
    """
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    params = (user_id,)
    cursor.execute("""
        SELECT w.id, w.name, w.url, w.description,
        (ua.user_id IS NOT NULL) as has_access
        FROM webpages w
        LEFT JOIN user_webpage_access ua ON w.id = ua.webpage_id AND ua.user_id = %s
        ORDER BY w.name
    """, params)
    pages = cursor.fetchall()
    return pages
class AccessUpdate(BaseModel):
    # Request body of PUT /api/admin/users/{user_id}/pages.

    # Ids of the webpages the user should have access to after the update.
    webpage_ids: List[int]
@app.put("/api/admin/users/{user_id}/pages")
def update_user_pages(user_id: int, data: AccessUpdate, token=Depends(require_admin), conn=Depends(get_db)):
    """Replace the set of webpages a user may access (admin only).

    Existing grants of ``user_id`` are removed and one grant per id in
    ``data.webpage_ids`` is inserted, all in a single transaction.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 404 when the user or one of the webpages does not
            exist; the previous grants are left untouched in that case.
    """
    grants = [(user_id, page_id) for page_id in data.webpage_ids]
    with conn.cursor() as cur:
        try:
            cur.execute("DELETE FROM user_webpage_access WHERE user_id = %s", (user_id,))
            cur.executemany(
                "INSERT INTO user_webpage_access (user_id, webpage_id) "
                "VALUES (%s, %s) ON CONFLICT DO NOTHING",
                grants,
            )
            conn.commit()
        except psycopg2.errors.ForeignKeyViolation:
            # Unknown user_id or webpage id: undo the DELETE and report it to
            # the client instead of failing with a server error.
            conn.rollback()
            raise HTTPException(status_code=404, detail="User or webpage not found") from None
    return {"ok": True}
@app.get("/health")
def health():
    """Report that the service is running; needs no token and no database."""
    return dict(status="ok")