"""Endpoints for publishing agent-generated HTML and serving it back.

Submitted HTML is screened against a deny-list of active-content patterns,
wrapped into a full document when needed, written to
``settings.html_storage_dir`` under a random name, and tracked by an
``HTMLFile`` database row that carries its expiry.
"""

import logging
import re
import secrets
import tempfile
from datetime import datetime, timedelta, timezone
from html import escape
from pathlib import Path

from fastapi import APIRouter, Depends, Header, HTTPException, status
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.models import HTMLFile
from app.schemas import HTMLGenerateRequest, HTMLGenerateResponse

router = APIRouter(prefix="/html", tags=["html"])
logger = logging.getLogger(__name__)

# (compiled pattern, rejection reason) pairs checked by validate_html_safety().
# This is a best-effort deny-list; the Content-Security-Policy header below is
# the second line of defence when the stored page is served.
DANGEROUS_HTML_PATTERNS = (
    (re.compile(r"<\s*script\b", re.IGNORECASE), "script tags are not allowed"),
    (re.compile(r"<\s*iframe\b", re.IGNORECASE), "iframe tags are not allowed"),
    (
        re.compile(r"<\s*(?:object|embed|base)\b", re.IGNORECASE),
        "embedded active content is not allowed",
    ),
    (re.compile(r"<\s*form\b", re.IGNORECASE), "form tags are not allowed"),
    (
        re.compile(r"<\s*link\b", re.IGNORECASE),
        "external stylesheet or import tags are not allowed",
    ),
    (
        re.compile(r"<\s*meta\b[^>]*http-equiv\s*=\s*['\"]?\s*refresh", re.IGNORECASE),
        "automatic refresh or redirect is not allowed",
    ),
    (
        # Attributes may be separated by "/" or directly follow a closing
        # quote (e.g. <img/onerror=...> or <img src="x"onerror=...>), so
        # whitespace alone is not a sufficient prefix.
        re.compile(r"[\s/\"']on[a-z]+\s*=", re.IGNORECASE),
        "inline event handlers are not allowed",
    ),
    (re.compile(r"javascript\s*:", re.IGNORECASE), "javascript URLs are not allowed"),
)

CONTENT_SECURITY_POLICY = "; ".join(
    [
        "default-src 'none'",
        "img-src 'self' data: https:",
        "style-src 'unsafe-inline'",
        "font-src 'self' data: https:",
        "media-src https:",
        "script-src 'none'",
        "connect-src 'none'",
        "object-src 'none'",
        "base-uri 'none'",
        "form-action 'none'",
        "frame-ancestors 'none'",
    ]
)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Yields the same value as the deprecated ``datetime.utcnow()`` so that
    comparisons and arithmetic with the naive timestamps used elsewhere in
    this module keep working.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def build_response_headers() -> dict[str, str]:
    """Return the HTTP headers attached to served HTML content.

    The Content-Security-Policy header is omitted when
    ``settings.allow_unsafe_html`` is enabled.
    """
    headers = {
        "X-Content-Type-Options": "nosniff",
        "Referrer-Policy": "no-referrer",
        "Cache-Control": "public, max-age=300",
    }
    if not settings.allow_unsafe_html:
        headers["Content-Security-Policy"] = CONTENT_SECURITY_POLICY
    return headers


def require_api_key(
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
) -> None:
    """FastAPI dependency that enforces the ``X-API-Key`` header.

    Authentication is skipped entirely when ``settings.api_key`` is empty.

    Raises:
        HTTPException: 401 when a key is configured and the header is missing
            or does not match.
    """
    expected_key = settings.api_key
    if not expected_key:
        return

    supplied_key = x_api_key or ""
    # Constant-time comparison avoids leaking key prefixes through timing.
    # Comparing bytes also sidesteps compare_digest's ASCII-only restriction
    # on str arguments.
    keys_match = secrets.compare_digest(
        supplied_key.encode("utf-8"),
        expected_key.encode("utf-8"),
    )
    if not keys_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )


def build_content_url(unique_id: str) -> str:
    """Return the public URL that serves the stored HTML for *unique_id*."""
    return f"{settings.public_base_url}{settings.api_prefix}/html/{unique_id}/content"


def build_query_url(unique_id: str) -> str:
    """Return the public URL of the metadata endpoint for *unique_id*."""
    return f"{settings.public_base_url}{settings.api_prefix}/html/{unique_id}"


def generate_unique_id(db: Session) -> str:
    """Return a random alphanumeric id not yet used by any ``HTMLFile`` row.

    Raises:
        RuntimeError: if ten consecutive candidates all collide.
    """
    for _ in range(10):
        # Strip the two non-alphanumeric characters of the URL-safe alphabet.
        unique_id = secrets.token_urlsafe(12).replace("-", "").replace("_", "")
        already_taken = (
            db.query(HTMLFile.id).filter(HTMLFile.unique_id == unique_id).first()
        )
        if not already_taken:
            return unique_id
    raise RuntimeError("Unable to generate a unique id")


def build_html_document(raw_html: str, title: str | None) -> str:
    """Return *raw_html* as a complete HTML document.

    NOTE(review): in the copy of this file that was reviewed, the markup
    inside this function (the regex passed to ``re.search`` and most of the
    f-string template) had been stripped out. The detection regex, the
    early-return branch, the fallback title and the wrapper markup below are
    a minimal reconstruction -- confirm them against the original source
    before merging.
    """
    normalized_html = raw_html.strip()
    # Presumably: content that already is a full document is passed through.
    if re.search(r"<\s*html\b", normalized_html, re.IGNORECASE):
        return normalized_html

    # The title is caller-supplied text, so it is HTML-escaped before being
    # interpolated into the template.
    escaped_title = escape(title or "Generated HTML")
    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{escaped_title}</title>
</head>
<body>
{normalized_html}
</body>
</html>
"""


def validate_html_safety(html_content: str) -> None:
    """Reject HTML that matches any entry of ``DANGEROUS_HTML_PATTERNS``.

    The check is skipped when ``settings.allow_unsafe_html`` is enabled.

    Raises:
        HTTPException: 400 naming the first rule that matched.
    """
    if settings.allow_unsafe_html:
        return
    for pattern, message in DANGEROUS_HTML_PATTERNS:
        if pattern.search(html_content):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Unsafe HTML rejected: {message}",
            )


def write_html_file(target_path: Path, html_content: str) -> None:
    """Write *html_content* to *target_path* via a temp file and a rename.

    The content is first written to a temporary file in the destination
    directory and then moved over *target_path*, so readers never observe a
    partially written file.
    """
    target_path.parent.mkdir(parents=True, exist_ok=True)
    temporary_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            delete=False,
            dir=target_path.parent,
            suffix=".tmp",
        ) as temporary_file:
            # Record the path before writing so a failed write is still
            # cleaned up in the ``finally`` clause.
            temporary_path = Path(temporary_file.name)
            temporary_file.write(html_content)
        temporary_path.replace(target_path)
    finally:
        # After a successful replace the temp path no longer exists and this
        # is a no-op; after a failure it removes the leftover file.
        if temporary_path is not None:
            temporary_path.unlink(missing_ok=True)


def delete_stored_file(filename: str) -> None:
    """Remove *filename* from the HTML storage directory if it is present."""
    (settings.html_storage_dir / filename).unlink(missing_ok=True)


def cleanup_expired_files(db: Session) -> int:
    """Delete expired HTML files and their records; return how many were removed.

    Which records count as expired is decided by
    ``HTMLFile.list_expired_records`` (defined outside this module).
    """
    expired_records = HTMLFile.list_expired_records(
        db,
        settings.default_retention_days,
    )
    if not expired_records:
        return 0

    for record in expired_records:
        delete_stored_file(record.filename)
        db.delete(record)
    db.commit()
    return len(expired_records)


def get_record_or_404(unique_id: str, db: Session) -> HTMLFile:
    """Return the non-expired ``HTMLFile`` row for *unique_id*.

    A record found to be expired is deleted (file and row) on the spot.

    Raises:
        HTTPException: 404 when no record exists or the record has expired.
    """
    html_file = (
        db.query(HTMLFile)
        .filter(HTMLFile.unique_id == unique_id)
        .first()
    )
    if html_file is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="HTML file not found",
        )

    expires_at = html_file.resolved_expires_at(settings.default_retention_days)
    if expires_at <= _utcnow():
        delete_stored_file(html_file.filename)
        db.delete(html_file)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="HTML file has expired",
        )
    return html_file


def build_response(html_file: HTMLFile) -> HTMLGenerateResponse:
    """Build the API response model describing *html_file*."""
    return HTMLGenerateResponse(
        message="HTML file generated successfully",
        unique_id=html_file.unique_id,
        url=build_content_url(html_file.unique_id),
        query_url=build_query_url(html_file.unique_id),
        title=html_file.title,
        source=html_file.source,
        request_id=html_file.request_id,
        size_bytes=html_file.size_bytes or 0,
        created_at=html_file.created_at,
        expires_at=html_file.resolved_expires_at(settings.default_retention_days),
    )


def _stored_path_or_404(html_file: HTMLFile, db: Session) -> Path:
    """Return the on-disk path of *html_file*, dropping orphaned records.

    Raises:
        HTTPException: 404 when the file is missing from storage; the stale
            database record is deleted first.
    """
    file_path = settings.html_storage_dir / html_file.filename
    if not file_path.exists():
        db.delete(html_file)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="HTML file has been removed from storage",
        )
    return file_path


@router.post(
    "/publish",
    response_model=HTMLGenerateResponse,
    status_code=status.HTTP_201_CREATED,
    include_in_schema=False,
)
@router.post(
    "/generate",
    response_model=HTMLGenerateResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Generate and publish an HTML explanation page",
    description=(
        "Accepts agent-generated HTML, stores it with a unique random filename, "
        "and returns a direct access URL."
    ),
)
def generate_html(
    request: HTMLGenerateRequest,
    _: None = Depends(require_api_key),
    db: Session = Depends(get_db),
) -> HTMLGenerateResponse:
    """Validate, store and register a submitted HTML page.

    Expired files are purged first. The submitted HTML is then validated,
    wrapped into a document, written to storage and recorded in the database.

    Raises:
        HTTPException: 400 when the HTML is rejected as unsafe; 500 on any
            other failure (the written file, if any, is removed again).
    """
    html_path: Path | None = None
    try:
        deleted_count = cleanup_expired_files(db)
        if deleted_count > 0:
            logger.info("Deleted %s expired HTML files", deleted_count)

        validate_html_safety(request.html_content)

        unique_id = generate_unique_id(db)
        html_filename = f"{unique_id}.html"
        html_path = settings.html_storage_dir / html_filename

        html_document = build_html_document(request.html_content, request.title)
        expires_at = _utcnow() + timedelta(
            days=request.ttl_days or settings.default_retention_days
        )
        size_bytes = len(html_document.encode("utf-8"))
        write_html_file(html_path, html_document)

        html_file = HTMLFile(
            unique_id=unique_id,
            filename=html_filename,
            title=request.title,
            source=request.source,
            request_id=request.request_id,
            size_bytes=size_bytes,
            expires_at=expires_at,
        )
        db.add(html_file)
        db.commit()
        db.refresh(html_file)
        return build_response(html_file)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Failed to generate HTML file")
        db.rollback()
        # Do not leave a file on disk without a matching database record.
        if html_path is not None:
            html_path.unlink(missing_ok=True)
        # The traceback is in the log; the client gets a generic message so
        # internal paths or database errors are not disclosed.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to generate HTML file",
        ) from exc


@router.get(
    "/{unique_id}",
    response_model=HTMLGenerateResponse,
    summary="Query metadata for a generated HTML file",
)
def get_html_file(unique_id: str, db: Session = Depends(get_db)) -> HTMLGenerateResponse:
    """Return metadata for the stored HTML file identified by *unique_id*.

    Raises:
        HTTPException: 404 when the record is unknown, expired, or its file
            is missing from storage.
    """
    html_file = get_record_or_404(unique_id, db)
    _stored_path_or_404(html_file, db)
    return build_response(html_file)


@router.get(
    "/{unique_id}/content",
    summary="Serve the generated HTML content",
    response_description="The generated HTML page",
)
def get_html_content(unique_id: str, db: Session = Depends(get_db)) -> FileResponse:
    """Serve the stored HTML page identified by *unique_id*.

    Raises:
        HTTPException: 404 when the record is unknown, expired, or its file
            is missing from storage.
    """
    html_file = get_record_or_404(unique_id, db)
    file_path = _stored_path_or_404(html_file, db)
    return FileResponse(
        path=file_path,
        media_type="text/html",
        headers=build_response_headers(),
    )