Files
DavidSaylor 77b076c7d7 feat: add web UI with login, CRUD, admin, and API key management
- Add login page with JWT authentication
- Add dashboard with stats and quick actions
- Add links management page (full CRUD with search)
- Add collections management page
- Add API key management page with copy-to-clipboard
- Add admin user management page (admin only)
- Fix UUID type mismatches across all endpoints
- Add updated_at column to api_keys and audit_log in schema.sql
- Fix DB_PASSWORD default in docker-compose.yml
- Add PyJWT to requirements.txt
- Fix API docs URL (/docs instead of /api/docs)
- Improve JS error handling (show actual messages)
- Rewrite conftest.py with proper DB lifecycle management
- Add 42 new integration tests (84 total, all passing)
  - test_admin.py: 15 tests for admin endpoints
  - test_auth_extended.py: 9 tests for API key CRUD
  - test_tags.py: 12 tests for tag endpoints
  - test_sync.py: 6 tests for sync endpoints
2026-05-21 07:21:49 -05:00

314 lines
9.5 KiB
Python

"""
LinkSyncServer - Authentication Endpoints
"""
import hashlib
import os
import secrets
import uuid
from datetime import datetime, timedelta
from typing import List, Optional
import bcrypt
import jwt
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from config.settings import settings
from models.base import ApiKey, User, get_session
router = APIRouter(prefix="/api/auth", tags=["Authentication"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
class RegisterRequest(BaseModel):
username: str
email: EmailStr
password: str
is_admin: bool = False
class TokenResponse(BaseModel):
access_token: str
token_type: str
user: dict
class ApiKeyResponse(BaseModel):
api_key: str
key_id: str
name: str
expires_at: Optional[str] = None
def hash_password(password: str) -> str:
return bcrypt.hashpw(
password.encode("utf-8"),
bcrypt.gensalt(rounds=settings.BCRYPT_COST_FACTOR),
).decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode("utf-8"), hashed_password.encode("utf-8")
)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
return {"username": username, "role": payload.get("role", "user")}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
def require_admin(current_user: dict = Depends(get_current_user)) -> dict:
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return current_user
def get_db():
session = get_session()
try:
yield session
finally:
session.close()
@router.post("/register", response_model=dict)
async def register(data: RegisterRequest):
db = get_session()
try:
existing = db.query(User).filter(
(User.username == data.username) | (User.email == data.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Username or email already exists")
user = User(
username=data.username,
email=data.email,
password_hash=hash_password(data.password),
role="admin" if data.is_admin else "user",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return {
"message": "User registered successfully",
"user": user.to_dict(),
}
finally:
db.close()
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
db = get_session()
try:
if (
form_data.username == settings.ADMIN_USERNAME
and form_data.password == settings.ADMIN_PASSWORD
):
user = db.query(User).filter(User.username == settings.ADMIN_USERNAME).first()
if not user:
user = User(
username=settings.ADMIN_USERNAME,
email="admin@linksync.local",
password_hash=hash_password(settings.ADMIN_PASSWORD),
role="admin",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
token = create_access_token(
data={"sub": user.username, "role": user.role, "type": "access"}
)
return {
"access_token": token,
"token_type": "bearer",
"user": {"username": user.username, "role": user.role},
}
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid credentials")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account disabled")
token = create_access_token(
data={"sub": user.username, "role": user.role, "type": "access"}
)
return {
"access_token": token,
"token_type": "bearer",
"user": {"username": user.username, "role": user.role},
}
finally:
db.close()
@router.post("/logout")
async def logout():
return {"message": "Logged out successfully"}
@router.get("/api-keys", response_model=List[dict])
async def list_api_keys(
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_keys = db.query(ApiKey).filter(ApiKey.user_id == user.id).order_by(ApiKey.created_at.desc()).all()
return [
{
"id": str(key.id),
"key_id": str(key.id),
"name": key.name,
"is_active": key.is_active,
"expires_at": key.expires_at.isoformat() if key.expires_at else None,
"created_at": key.created_at.isoformat() if key.created_at else None,
}
for key in api_keys
]
finally:
db.close()
@router.post("/api-key", response_model=ApiKeyResponse)
async def create_api_key(
name: str = "default",
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
raw_key = secrets.token_urlsafe(64)
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_key = ApiKey(
user_id=user.id,
key_hash=key_hash,
name=name,
is_active=True,
)
db.add(api_key)
db.commit()
db.refresh(api_key)
return {
"api_key": raw_key,
"key_id": str(api_key.id),
"name": api_key.name,
"expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None,
}
finally:
db.close()
@router.get("/api-key/{key_id}")
async def get_api_key_info(
key_id: str,
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
try:
parsed_key_id = uuid.UUID(key_id)
except (ValueError, AttributeError):
raise HTTPException(status_code=404, detail="API key not found")
api_key = db.query(ApiKey).filter(
ApiKey.id == parsed_key_id, ApiKey.user_id == user.id
).first()
if not api_key:
raise HTTPException(status_code=404, detail="API key not found")
return {
"key_id": str(api_key.id),
"name": api_key.name,
"is_active": api_key.is_active,
"expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None,
"created_at": api_key.created_at.isoformat() if api_key.created_at else None,
}
finally:
db.close()
@router.delete("/api-key/{key_id}")
async def delete_api_key(
key_id: str,
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
try:
parsed_key_id = uuid.UUID(key_id)
except (ValueError, AttributeError):
raise HTTPException(status_code=404, detail="API key not found")
api_key = db.query(ApiKey).filter(
ApiKey.id == parsed_key_id, ApiKey.user_id == user.id
).first()
if not api_key:
raise HTTPException(status_code=404, detail="API key not found")
db.delete(api_key)
db.commit()
return {"message": "API key deleted successfully"}
finally:
db.close()
@router.get("/me")
async def get_current_user_info(current_user: dict = Depends(get_current_user)):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user.to_dict()
finally:
db.close()