"""Web Portal API.

A small FastAPI service backed by PostgreSQL that lets administrators manage
users, webpages and which user may see which webpage. Authentication is done
with bcrypt-hashed passwords and HS256-signed JWT bearer tokens.
"""

import os
import time
from datetime import datetime, timedelta, timezone
from typing import Optional, List

import bcrypt
import jwt
import psycopg2
# psycopg2.errors is a submodule and must be imported explicitly so that
# psycopg2.errors.UniqueViolation / ForeignKeyViolation can be referenced.
import psycopg2.errors
import psycopg2.extras
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

app = FastAPI(title="Web Portal API")

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests. Kept unchanged for compatibility; consider
# restricting allow_origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# NOTE(review): the fallback secret is public knowledge (it is in the source).
# Always set JWT_SECRET in any real deployment.
SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey1234")
ALGORITHM = "HS256"
TOKEN_TTL = timedelta(hours=8)
security = HTTPBearer()

DB_CONFIG = {
    "host": os.getenv("DB_HOST", "postgres-service"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "database": os.getenv("DB_NAME", "portaldb"),
    "user": os.getenv("DB_USER", "portaluser"),
    "password": os.getenv("DB_PASSWORD", "portalpass"),
}

# Number of attempts / delay used while waiting for the database at startup.
DB_INIT_ATTEMPTS = 10
DB_INIT_DELAY_SECONDS = 3

SCHEMA_SQL = """
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        username VARCHAR(100) UNIQUE NOT NULL,
        password_hash VARCHAR(255) NOT NULL,
        is_admin BOOLEAN DEFAULT FALSE,
        created_at TIMESTAMP DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS webpages (
        id SERIAL PRIMARY KEY,
        name VARCHAR(200) NOT NULL,
        url VARCHAR(500) NOT NULL,
        description TEXT,
        created_at TIMESTAMP DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS user_webpage_access (
        user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
        webpage_id INTEGER REFERENCES webpages(id) ON DELETE CASCADE,
        PRIMARY KEY (user_id, webpage_id)
    );
"""


def get_db():
    """FastAPI dependency: yield a fresh connection and always close it."""
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        yield conn
    finally:
        conn.close()


def _dict_cursor(conn):
    """Return a cursor on *conn* whose rows behave like dictionaries."""
    return conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)


def _seed_user(cur, username: str, password: str, is_admin: bool) -> None:
    """Insert *username* with the given password unless it already exists."""
    cur.execute("SELECT id FROM users WHERE username = %s", (username,))
    if cur.fetchone():
        return
    hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
    cur.execute(
        "INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, %s)",
        (username, hashed, is_admin),
    )


def init_db():
    """Create the schema if needed and seed the default accounts.

    The connection is closed even when a statement fails, so repeated calls
    from the startup retry loop do not leak connections.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute(SCHEMA_SQL)
            # NOTE(review): well-known default passwords; change them after
            # the first deployment.
            _seed_user(cur, "admin", "admin1234", True)
            _seed_user(cur, "user1", "user1234", False)
        conn.commit()
    finally:
        conn.close()


@app.on_event("startup")
def startup():
    """Initialise the database, retrying while it is still coming up.

    Stays best-effort: if every attempt fails the application still starts,
    but the failure is reported instead of being silently dropped.
    """
    for _ in range(DB_INIT_ATTEMPTS):
        try:
            init_db()
            print("DB initialized successfully")
            break
        except Exception as e:  # boundary: any failure means "not ready yet"
            print(f"DB not ready, retrying... {e}")
            time.sleep(DB_INIT_DELAY_SECONDS)
    else:
        print(f"DB initialization failed after {DB_INIT_ATTEMPTS} attempts")


def create_token(user_id: int, username: str, is_admin: bool):
    """Return a signed JWT for the user that expires after TOKEN_TTL."""
    payload = {
        "sub": str(user_id),
        "username": username,
        "is_admin": is_admin,
        # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
        "exp": datetime.now(timezone.utc) + TOKEN_TTL,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """FastAPI dependency: decode the bearer token and return its payload.

    Raises:
        HTTPException: 401 when the token is expired or otherwise invalid.
    """
    try:
        return jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")


def require_admin(token=Depends(verify_token)):
    """FastAPI dependency: like verify_token, but reject non-admins with 403."""
    if not token.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin only")
    return token


def _password_matches(password: str, password_hash: str) -> bool:
    """Check *password* against a stored bcrypt hash.

    bcrypt raises ValueError for inputs it cannot process (for example a
    malformed stored hash); that is treated as a failed match rather than
    surfacing as a server error.
    """
    try:
        return bcrypt.checkpw(password.encode(), password_hash.encode())
    except ValueError:
        return False


class LoginRequest(BaseModel):
    username: str
    password: str


@app.post("/api/auth/login")
def login(req: LoginRequest, conn=Depends(get_db)):
    """Authenticate a user and return a bearer token plus basic profile."""
    with _dict_cursor(conn) as cur:
        cur.execute("SELECT * FROM users WHERE username = %s", (req.username,))
        user = cur.fetchone()
    if not user or not _password_matches(req.password, user["password_hash"]):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = create_token(user["id"], user["username"], user["is_admin"])
    return {"token": token, "username": user["username"], "is_admin": user["is_admin"]}


@app.get("/api/auth/me")
def me(token=Depends(verify_token)):
    """Return the username and admin flag stored in the caller's token."""
    return {"username": token["username"], "is_admin": token["is_admin"]}


@app.get("/api/my-pages")
def my_pages(token=Depends(verify_token), conn=Depends(get_db)):
    """List the webpages the caller may see (all of them for admins)."""
    user_id = int(token["sub"])
    with _dict_cursor(conn) as cur:
        if token["is_admin"]:
            cur.execute("SELECT * FROM webpages ORDER BY name")
        else:
            cur.execute(
                """
                SELECT w.* FROM webpages w
                JOIN user_webpage_access ua ON w.id = ua.webpage_id
                WHERE ua.user_id = %s
                ORDER BY w.name
                """,
                (user_id,),
            )
        return cur.fetchall()


@app.get("/api/admin/webpages")
def list_webpages(token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: list every webpage ordered by name."""
    with _dict_cursor(conn) as cur:
        cur.execute("SELECT * FROM webpages ORDER BY name")
        return cur.fetchall()


class WebpageCreate(BaseModel):
    name: str
    url: str
    description: Optional[str] = ""


@app.post("/api/admin/webpages")
def create_webpage(data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: create a webpage and return the stored row."""
    with _dict_cursor(conn) as cur:
        cur.execute(
            "INSERT INTO webpages (name, url, description) VALUES (%s, %s, %s) RETURNING *",
            (data.name, data.url, data.description),
        )
        created = cur.fetchone()
    conn.commit()
    return created


@app.put("/api/admin/webpages/{page_id}")
def update_webpage(page_id: int, data: WebpageCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: replace a webpage's fields; 404 when the id does not exist."""
    with _dict_cursor(conn) as cur:
        cur.execute(
            "UPDATE webpages SET name=%s, url=%s, description=%s WHERE id=%s RETURNING *",
            (data.name, data.url, data.description, page_id),
        )
        result = cur.fetchone()
    if not result:
        raise HTTPException(status_code=404, detail="Not found")
    conn.commit()
    return result


@app.delete("/api/admin/webpages/{page_id}")
def delete_webpage(page_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: delete a webpage. Succeeds even if the id does not exist."""
    with conn.cursor() as cur:
        cur.execute("DELETE FROM webpages WHERE id = %s", (page_id,))
    conn.commit()
    return {"ok": True}


@app.get("/api/admin/users")
def list_users(token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: list all users (without password hashes) ordered by username."""
    with _dict_cursor(conn) as cur:
        cur.execute("SELECT id, username, is_admin, created_at FROM users ORDER BY username")
        return cur.fetchall()


class UserCreate(BaseModel):
    username: str
    password: str
    is_admin: bool = False


@app.post("/api/admin/users")
def create_user(data: UserCreate, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: create a user and return its id, username and admin flag.

    Raises:
        HTTPException: 400 when the username is taken or bcrypt rejects the
            password.
    """
    try:
        hashed = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode()
    except ValueError:
        # bcrypt refuses some inputs (e.g. over-long passwords in recent
        # releases); report that as a client error instead of a 500.
        raise HTTPException(status_code=400, detail="Invalid password") from None
    try:
        with _dict_cursor(conn) as cur:
            cur.execute(
                "INSERT INTO users (username, password_hash, is_admin) VALUES (%s, %s, %s) RETURNING id, username, is_admin",
                (data.username, hashed, data.is_admin),
            )
            created = cur.fetchone()
        conn.commit()
        return created
    except psycopg2.errors.UniqueViolation:
        conn.rollback()  # leave the connection out of the aborted transaction
        raise HTTPException(status_code=400, detail="Username already exists") from None


@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: delete a user. The built-in 'admin' account is never deleted."""
    with conn.cursor() as cur:
        cur.execute("DELETE FROM users WHERE id = %s AND username != 'admin'", (user_id,))
    conn.commit()
    return {"ok": True}


@app.get("/api/admin/users/{user_id}/pages")
def get_user_pages(user_id: int, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: list every webpage with a has_access flag for the given user."""
    with _dict_cursor(conn) as cur:
        cur.execute(
            """
            SELECT w.id, w.name, w.url, w.description,
                   (ua.user_id IS NOT NULL) as has_access
            FROM webpages w
            LEFT JOIN user_webpage_access ua
                   ON w.id = ua.webpage_id AND ua.user_id = %s
            ORDER BY w.name
            """,
            (user_id,),
        )
        return cur.fetchall()


class AccessUpdate(BaseModel):
    webpage_ids: List[int]


@app.put("/api/admin/users/{user_id}/pages")
def update_user_pages(user_id: int, data: AccessUpdate, token=Depends(require_admin), conn=Depends(get_db)):
    """Admin: replace the set of webpages the user may access.

    The delete and the inserts run in one transaction, so a failure leaves the
    previous grants untouched.

    Raises:
        HTTPException: 404 when the user or one of the webpages does not exist.
    """
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM user_webpage_access WHERE user_id = %s", (user_id,))
            for page_id in data.webpage_ids:
                cur.execute(
                    "INSERT INTO user_webpage_access (user_id, webpage_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                    (user_id, page_id),
                )
        conn.commit()
    except psycopg2.errors.ForeignKeyViolation:
        conn.rollback()
        raise HTTPException(status_code=404, detail="User or webpage not found") from None
    return {"ok": True}


@app.get("/health")
def health():
    """Liveness probe; does not touch the database."""
    return {"status": "ok"}