- Add login page with JWT authentication
- Add dashboard with stats and quick actions
- Add links management page (full CRUD with search)
- Add collections management page
- Add API key management page with copy-to-clipboard
- Add admin user management page (admin only)
- Fix UUID type mismatches across all endpoints
- Add updated_at column to api_keys and audit_log in schema.sql
- Fix DB_PASSWORD default in docker-compose.yml
- Add PyJWT to requirements.txt
- Fix API docs URL (/docs instead of /api/docs)
- Improve JS error handling (show actual messages)
- Rewrite conftest.py with proper DB lifecycle management
- Add 42 new integration tests (84 total, all passing)
  - test_admin.py: 15 tests for admin endpoints
  - test_auth_extended.py: 9 tests for API key CRUD
  - test_tags.py: 12 tests for tag endpoints
  - test_sync.py: 6 tests for sync endpoints
204 lines
6.1 KiB
Python
204 lines
6.1 KiB
Python
"""
LinkSyncServer - Admin Endpoints
"""
|
|
|
|
import uuid
|
|
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
|
|
from api.endpoints.auth import hash_password, require_admin
|
|
from models.base import AuditLog, Bookmark, Collection, Tag, User, get_session
|
|
|
|
# Router carrying every admin endpoint in this module; all routes registered
# on it share the "/api/admin" prefix and the "Admin" tag.
router = APIRouter(tags=["Admin"], prefix="/api/admin")
|
|
|
|
|
|
class UserCreate(BaseModel):
    """Request body for creating a user through the admin API."""

    username: str
    email: EmailStr
    # Submitted in plaintext; create_user passes it through hash_password.
    password: str
    # The pattern restricts the value to exactly "admin" or "user".
    role: str = Field("user", pattern="^(admin|user)$")
    is_active: bool = Field(default=True)
|
|
|
|
|
|
class UserUpdate(BaseModel):
    """Request body for a partial user update; every field is optional."""

    email: Optional[EmailStr] = Field(default=None)
    # When given, the pattern restricts the value to "admin" or "user".
    role: Optional[str] = Field(default=None, pattern="^(admin|user)$")
    is_active: Optional[bool] = Field(default=None)
    # New plaintext password, if the password is being changed.
    password: Optional[str] = Field(default=None)
|
|
|
|
|
|
class SettingsUpdate(BaseModel):
    """Optional settings fields: a debug flag and a CORS-origins string.

    NOTE(review): no endpoint in the visible part of this module uses this
    model; confirm whether it is consumed elsewhere.
    """

    debug: Optional[bool] = Field(default=None)
    # Kept as a single string; the expected format is not shown here.
    cors_origins: Optional[str] = Field(default=None)
|
|
|
|
|
|
def parse_uuid(id_str: str) -> Optional[uuid.UUID]:
    """Convert *id_str* into a ``uuid.UUID`` without raising.

    Args:
        id_str: Usually the textual UUID taken from a path parameter. An
            existing ``uuid.UUID`` instance is accepted and returned as is.

    Returns:
        The parsed ``uuid.UUID``, or ``None`` when the value is not a valid
        UUID (malformed string, empty string, ``None`` or any other type).
    """
    if isinstance(id_str, uuid.UUID):
        return id_str
    # Anything that is neither a UUID nor a string cannot identify a row;
    # returning it unchanged would leak an unvalidated value into queries.
    if not isinstance(id_str, str):
        return None
    try:
        return uuid.UUID(id_str)
    except ValueError:
        # uuid.UUID raises ValueError for badly formed hexadecimal strings.
        return None
|
|
|
|
|
|
@router.get("/users", response_model=List[dict])
async def list_users(
    limit: int = Query(20, le=100, ge=1),
    offset: int = Query(0, ge=0),
    current_admin: dict = Depends(require_admin),
):
    """Return one page of users, most recently created first.

    Args:
        limit: Page size, between 1 and 100 (default 20).
        offset: Number of rows to skip; must be >= 0.
        current_admin: Result of the ``require_admin`` dependency; not read
            in the body.

    Returns:
        A list with one ``to_dict()`` result per user on the page.
    """
    session = get_session()
    try:
        newest_first = session.query(User).order_by(User.created_at.desc())
        page = newest_first.offset(offset).limit(limit).all()
        return [row.to_dict() for row in page]
    finally:
        # The session is always closed, including when the query raises.
        session.close()
|
|
|
|
|
|
@router.post("/users", response_model=dict, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    current_admin: dict = Depends(require_admin),
):
    """Create a user and return its ``to_dict()`` representation.

    Args:
        data: Validated creation payload.
        current_admin: Result of the ``require_admin`` dependency; not read
            in the body.

    Raises:
        HTTPException: 400 when a user with the same username or the same
            email already exists.
    """
    session = get_session()
    try:
        # A clash on either column is enough to reject the request.
        clash = (User.username == data.username) | (User.email == data.email)
        if session.query(User).filter(clash).first():
            raise HTTPException(status_code=400, detail="Username or email already exists")

        new_user = User(
            id=uuid.uuid4(),
            username=data.username,
            email=data.email,
            # Only the hash is stored, never the submitted password.
            password_hash=hash_password(data.password),
            role=data.role,
            is_active=data.is_active,
        )
        session.add(new_user)
        session.commit()
        # Reload the row so the returned dict reflects the committed state.
        session.refresh(new_user)
        return new_user.to_dict()
    finally:
        session.close()
|
|
|
|
|
|
@router.get("/users/{user_id}", response_model=dict)
async def get_user(
    user_id: str,
    current_admin: dict = Depends(require_admin),
):
    """Return a single user as a dict.

    Args:
        user_id: UUID of the user, as text from the URL path.
        current_admin: Result of the ``require_admin`` dependency; not read
            in the body.

    Raises:
        HTTPException: 404 when ``user_id`` is not a valid UUID or no user
            has that id.
    """
    session = get_session()
    try:
        lookup_id = parse_uuid(user_id)
        found = None
        # A malformed id skips the query and falls through to the same 404.
        if lookup_id is not None:
            found = session.query(User).filter(User.id == lookup_id).first()
        if not found:
            raise HTTPException(status_code=404, detail="User not found")
        return found.to_dict()
    finally:
        session.close()
|
|
|
|
|
|
@router.put("/users/{user_id}", response_model=dict)
async def update_user(
    user_id: str,
    data: UserUpdate,
    current_admin: dict = Depends(require_admin),
):
    """Apply a partial update to one user and return the updated record.

    Only fields that were sent with a non-null value are changed. A new
    password is hashed with ``hash_password`` and stored as ``password_hash``.

    Args:
        user_id: UUID of the user, as text from the URL path.
        data: Partial update payload.
        current_admin: Result of the ``require_admin`` dependency; not read
            in the body.

    Returns:
        The user's ``to_dict()`` representation after the commit.

    Raises:
        HTTPException: 404 when ``user_id`` is not a valid UUID or no user
            has that id; 400 when the new email belongs to another user.
    """
    db = get_session()
    try:
        parsed_id = parse_uuid(user_id)
        if parsed_id is None:
            raise HTTPException(status_code=404, detail="User not found")
        user = db.query(User).filter(User.id == parsed_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        # exclude_unset alone keeps fields sent as explicit JSON null, which
        # would call hash_password(None) or write None into email/role/
        # is_active. Nulls are treated the same as "field not provided".
        update_data = data.model_dump(exclude_unset=True, exclude_none=True)

        # Mirror create_user: reject an email already held by another user
        # instead of failing later at commit time.
        new_email = update_data.get("email")
        if new_email is not None:
            taken = (
                db.query(User)
                .filter(User.email == new_email, User.id != parsed_id)
                .first()
            )
            if taken:
                raise HTTPException(status_code=400, detail="Email already exists")

        if "password" in update_data:
            # Swap the plaintext for its hash under the model's column name.
            update_data["password_hash"] = hash_password(update_data.pop("password"))

        for field, value in update_data.items():
            setattr(user, field, value)

        db.commit()
        db.refresh(user)
        return user.to_dict()
    finally:
        db.close()
|
|
|
|
|
|
@router.delete("/users/{user_id}", response_model=dict)
async def delete_user(
    user_id: str,
    current_admin: dict = Depends(require_admin),
):
    """Delete one user and return a confirmation message.

    Args:
        user_id: UUID of the user, as text from the URL path.
        current_admin: Result of the ``require_admin`` dependency; its
            ``"username"`` entry is compared with the target user.

    Returns:
        A dict with a success message and the ``user_id`` exactly as received.

    Raises:
        HTTPException: 404 when ``user_id`` is not a valid UUID or no user
            has that id; 400 when the target is the calling admin.
    """
    session = get_session()
    try:
        target_id = parse_uuid(user_id)
        target = None
        # A malformed id skips the query and falls through to the same 404.
        if target_id is not None:
            target = session.query(User).filter(User.id == target_id).first()
        if not target:
            raise HTTPException(status_code=404, detail="User not found")

        # Admins may not remove their own account.
        is_self = target.username == current_admin.get("username")
        if is_self:
            raise HTTPException(status_code=400, detail="Cannot delete yourself")

        session.delete(target)
        session.commit()
        return {"message": "User deleted successfully", "deleted_id": user_id}
    finally:
        session.close()
|
|
|
|
|
|
@router.get("/stats", response_model=dict)
async def get_system_stats(current_admin: dict = Depends(require_admin)):
    """Return row counts for users, bookmarks, collections, tags and audit logs.

    Args:
        current_admin: Result of the ``require_admin`` dependency; not read
            in the body.

    Returns:
        A dict mapping each ``total_*`` key to the count of the matching model.
    """
    session = get_session()
    try:
        # Response key -> model whose rows are counted, in output order.
        counted_models = (
            ("total_users", User),
            ("total_bookmarks", Bookmark),
            ("total_collections", Collection),
            ("total_tags", Tag),
            ("total_audit_logs", AuditLog),
        )
        return {key: session.query(model).count() for key, model in counted_models}
    finally:
        session.close()
|
|
|
|
|
|
@router.get("/audit", response_model=List[dict])
async def get_audit_log(
    limit: int = Query(50, le=200, ge=1),
    offset: int = Query(0, ge=0),
    entity_type: Optional[str] = Query(None),
    action: Optional[str] = Query(None),
    current_admin: dict = Depends(require_admin),
):
    """Return one page of audit-log entries, newest first.

    Args:
        limit: Page size, between 1 and 200 (default 50).
        offset: Number of rows to skip; must be >= 0.
        entity_type: When non-empty, keep only entries with this entity type.
        action: When non-empty, keep only entries with this action.
        current_admin: Result of the ``require_admin`` dependency; not read
            in the body.

    Returns:
        A list of dicts, one per entry. Ids and the IP address are rendered
        as strings and ``created_at`` as ISO-8601 text; falsy values of the
        optional fields become ``None``.
    """

    def _text_or_none(value):
        # Stringify truthy values; anything falsy is reported as None.
        return str(value) if value else None

    session = get_session()
    try:
        audit_query = session.query(AuditLog)
        # Each filter applies only when its parameter is non-empty.
        for column_name, wanted in (("entity_type", entity_type), ("action", action)):
            if wanted:
                audit_query = audit_query.filter(getattr(AuditLog, column_name) == wanted)

        newest_first = audit_query.order_by(AuditLog.created_at.desc())
        entries = newest_first.offset(offset).limit(limit).all()

        return [
            {
                "id": str(entry.id),
                "user_id": _text_or_none(entry.user_id),
                "action": entry.action,
                "entity_type": entry.entity_type,
                "entity_id": _text_or_none(entry.entity_id),
                "old_value": entry.old_value,
                "new_value": entry.new_value,
                "ip_address": _text_or_none(entry.ip_address),
                "created_at": entry.created_at.isoformat() if entry.created_at else None,
            }
            for entry in entries
        ]
    finally:
        session.close()
|