Files
myworkspace/LinkSyncServer/api/endpoints/auth.py
DavidSaylor 09d30427f4 Complete LinkSyncServer and LinkSyncExtension implementation
LinkSyncServer:
- Fix app.py imports, add CORS middleware, lifespan events
- Create api/routes.py router aggregator
- Create config/settings.py for centralized configuration
- Rewrite models/base.py with proper relationships and serialization
- Rewrite all API endpoints with real DB integration (auth, links, collections, sync, queries, tags)
- Add admin endpoints (user management, stats, audit log)
- Complete query parser with recursive descent and proper precedence
- Complete query executor with set operations and field filters
- Set up Alembic migrations with initial schema
- Create web interface (templates, CSS, JS)
- Add 42 passing tests (auth, links, collections, queries)
- Add deploy.ps1 and deploy.sh scripts
- Update README with deployment workflow

LinkSyncExtension:
- Create utils/api.js (REST client with retries, auth, error handling)
- Create utils/sync.js (3 sync modes + conflict detection)
- Create utils/collection.js (collection management)
- Create utils/query-engine.js (client-side query parser)
- Rewrite background.js (sync loop, bookmark events, message routing)
- Rewrite popup.js (tabs, settings modal, notifications, CRUD)
- Update popup.html (tabbed interface, query builder, modal)
- Update popup.css (full redesign)
- Create content/content.js (page metadata extraction)
- Create options.html/js (dedicated settings page)
- Generate icons (48x48, 96x96)
- Update manifest.json (host permissions, content scripts, options)
- Create AGENTS.md
2026-05-19 13:21:26 -05:00

277 lines
8.2 KiB
Python

"""
LinkSyncServer - Authentication Endpoints
"""
import hashlib
import os
import secrets
from datetime import datetime, timedelta
from typing import Optional
import bcrypt
import jwt
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from config.settings import settings
from models.base import ApiKey, User, get_session
router = APIRouter(prefix="/api/auth", tags=["Authentication"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
class RegisterRequest(BaseModel):
username: str
email: EmailStr
password: str
is_admin: bool = False
class TokenResponse(BaseModel):
access_token: str
token_type: str
user: dict
class ApiKeyResponse(BaseModel):
api_key: str
key_id: str
name: str
expires_at: Optional[str] = None
def hash_password(password: str) -> str:
return bcrypt.hashpw(
password.encode("utf-8"),
bcrypt.gensalt(rounds=settings.BCRYPT_COST_FACTOR),
).decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode("utf-8"), hashed_password.encode("utf-8")
)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
return {"username": username, "role": payload.get("role", "user")}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
def require_admin(current_user: dict = Depends(get_current_user)) -> dict:
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return current_user
def get_db():
session = get_session()
try:
yield session
finally:
session.close()
@router.post("/register", response_model=dict)
async def register(data: RegisterRequest):
db = get_session()
try:
existing = db.query(User).filter(
(User.username == data.username) | (User.email == data.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Username or email already exists")
user = User(
username=data.username,
email=data.email,
password_hash=hash_password(data.password),
role="admin" if data.is_admin else "user",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return {
"message": "User registered successfully",
"user": user.to_dict(),
}
finally:
db.close()
@router.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
db = get_session()
try:
if (
form_data.username == settings.ADMIN_USERNAME
and form_data.password == settings.ADMIN_PASSWORD
):
user = db.query(User).filter(User.username == settings.ADMIN_USERNAME).first()
if not user:
user = User(
username=settings.ADMIN_USERNAME,
email="admin@linksync.local",
password_hash=hash_password(settings.ADMIN_PASSWORD),
role="admin",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
token = create_access_token(
data={"sub": user.username, "role": user.role, "type": "access"}
)
return {
"access_token": token,
"token_type": "bearer",
"user": {"username": user.username, "role": user.role},
}
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid credentials")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account disabled")
token = create_access_token(
data={"sub": user.username, "role": user.role, "type": "access"}
)
return {
"access_token": token,
"token_type": "bearer",
"user": {"username": user.username, "role": user.role},
}
finally:
db.close()
@router.post("/logout")
async def logout():
return {"message": "Logged out successfully"}
@router.post("/api-key", response_model=ApiKeyResponse)
async def create_api_key(
name: str = "default",
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
raw_key = secrets.token_urlsafe(64)
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_key = ApiKey(
user_id=user.id,
key_hash=key_hash,
name=name,
is_active=True,
)
db.add(api_key)
db.commit()
db.refresh(api_key)
return {
"api_key": raw_key,
"key_id": api_key.id,
"name": api_key.name,
"expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None,
}
finally:
db.close()
@router.get("/api-key/{key_id}")
async def get_api_key_info(
key_id: str,
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_key = db.query(ApiKey).filter(
ApiKey.id == key_id, ApiKey.user_id == user.id
).first()
if not api_key:
raise HTTPException(status_code=404, detail="API key not found")
return {
"key_id": api_key.id,
"name": api_key.name,
"is_active": api_key.is_active,
"expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None,
"created_at": api_key.created_at.isoformat() if api_key.created_at else None,
}
finally:
db.close()
@router.delete("/api-key/{key_id}")
async def delete_api_key(
key_id: str,
current_user: dict = Depends(get_current_user),
):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_key = db.query(ApiKey).filter(
ApiKey.id == key_id, ApiKey.user_id == user.id
).first()
if not api_key:
raise HTTPException(status_code=404, detail="API key not found")
db.delete(api_key)
db.commit()
return {"message": "API key deleted successfully"}
finally:
db.close()
@router.get("/me")
async def get_current_user_info(current_user: dict = Depends(get_current_user)):
db = get_session()
try:
user = db.query(User).filter(User.username == current_user["username"]).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user.to_dict()
finally:
db.close()