Files
minimap1.1/backend/app/routers/html.py
2026-03-21 21:27:20 +08:00

333 lines
9.9 KiB
Python

import logging
import re
import secrets
import tempfile
from datetime import datetime, timedelta
from html import escape
from pathlib import Path
from fastapi import APIRouter, Depends, Header, HTTPException, status
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from app.config import settings
from app.database import get_db
from app.models import HTMLFile
from app.schemas import HTMLGenerateRequest, HTMLGenerateResponse
# Router collecting the HTML endpoints; every route declared on it is mounted
# under the "/html" prefix and tagged "html".
router = APIRouter(prefix="/html", tags=["html"])
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Denylist of (compiled pattern, rejection message) pairs. A submitted HTML
# string is rejected when any pattern is found anywhere in it. This is a
# best-effort textual filter, not an HTML parser, so it deliberately errs on the
# side of over-matching.
DANGEROUS_HTML_PATTERNS = (
    (re.compile(r"<\s*script\b", re.IGNORECASE), "script tags are not allowed"),
    (re.compile(r"<\s*iframe\b", re.IGNORECASE), "iframe tags are not allowed"),
    (
        re.compile(r"<\s*(?:object|embed|base)\b", re.IGNORECASE),
        "embedded active content is not allowed",
    ),
    (re.compile(r"<\s*form\b", re.IGNORECASE), "form tags are not allowed"),
    (
        re.compile(r"<\s*link\b", re.IGNORECASE),
        "external stylesheet or import tags are not allowed",
    ),
    (
        re.compile(r"<\s*meta\b[^>]*http-equiv\s*=\s*['\"]?\s*refresh", re.IGNORECASE),
        "automatic refresh or redirect is not allowed",
    ),
    # An attribute name can follow whitespace, a "/" (e.g. <svg/onload=...>) or
    # the closing quote of the previous attribute (e.g. <img src="x"onerror=...>),
    # so all three separators are accepted in front of "on...=".
    (
        re.compile(r"[\s/\"']on[a-z]+\s*=", re.IGNORECASE),
        "inline event handlers are not allowed",
    ),
    (re.compile(r"javascript\s*:", re.IGNORECASE), "javascript URLs are not allowed"),
)
# Individual Content-Security-Policy directives, in the order they are emitted.
_CSP_DIRECTIVES = (
    "default-src 'none'",
    "img-src 'self' data: https:",
    "style-src 'unsafe-inline'",
    "font-src 'self' data: https:",
    "media-src https:",
    "script-src 'none'",
    "connect-src 'none'",
    "object-src 'none'",
    "base-uri 'none'",
    "form-action 'none'",
    "frame-ancestors 'none'",
)
# Header value: the directives above joined with "; ".
CONTENT_SECURITY_POLICY = "; ".join(_CSP_DIRECTIVES)
def build_response_headers() -> dict[str, str]:
    """Return the HTTP headers to attach to a served HTML page.

    The Content-Security-Policy header is appended last, and only when
    ``settings.allow_unsafe_html`` is falsy.
    """
    if settings.allow_unsafe_html:
        csp_header = {}
    else:
        csp_header = {"Content-Security-Policy": CONTENT_SECURITY_POLICY}
    return {
        "X-Content-Type-Options": "nosniff",
        "Referrer-Policy": "no-referrer",
        "Cache-Control": "public, max-age=300",
        **csp_header,
    }
def require_api_key(x_api_key: str | None = Header(default=None, alias="X-API-Key")) -> None:
    """Reject the request unless the ``X-API-Key`` header matches the configured key.

    Authentication is skipped entirely when ``settings.api_key`` is empty or unset.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header, or ``None`` when
            the header is absent.

    Raises:
        HTTPException: 401 when a key is configured and the header is missing
            or does not match.
    """
    expected_key = settings.api_key
    if not expected_key:
        return
    # A missing header is treated as an empty key, which can never equal the
    # (non-empty) configured key.
    provided_key = x_api_key or ""
    # Constant-time comparison avoids leaking how many leading characters of a
    # guess were correct. Both sides are encoded to bytes because
    # compare_digest only accepts ASCII when given str.
    # NOTE(review): assumes settings.api_key is a str - confirm in app.config.
    keys_match = secrets.compare_digest(
        provided_key.encode("utf-8"),
        expected_key.encode("utf-8"),
    )
    if not keys_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )
def build_content_url(unique_id: str) -> str:
    """Return the public URL that serves the stored HTML page for ``unique_id``."""
    api_root = f"{settings.public_base_url}{settings.api_prefix}"
    return f"{api_root}/html/{unique_id}/content"
def build_query_url(unique_id: str) -> str:
    """Return the public URL of the metadata endpoint for ``unique_id``."""
    api_root = f"{settings.public_base_url}{settings.api_prefix}"
    return f"{api_root}/html/{unique_id}"
def generate_unique_id(db: Session) -> str:
    """Return a random identifier that no existing ``HTMLFile`` row uses.

    Up to ten candidates are tried. Each is a URL-safe token with the "-" and
    "_" characters removed.

    Raises:
        RuntimeError: If all ten candidates are already taken.
    """
    attempts_left = 10
    while attempts_left > 0:
        attempts_left -= 1
        token = secrets.token_urlsafe(12)
        candidate = token.replace("-", "").replace("_", "")
        existing = (
            db.query(HTMLFile.id)
            .filter(HTMLFile.unique_id == candidate)
            .first()
        )
        if existing:
            continue
        return candidate
    raise RuntimeError("Unable to generate a unique id")
def build_html_document(raw_html: str, title: str | None) -> str:
normalized_html = raw_html.strip()
if re.search(r"<!doctype\s+html|<html\b", normalized_html, re.IGNORECASE):
return normalized_html
escaped_title = escape(title or "知识点讲解")
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>{escaped_title}</title>
<style>
:root {{
color-scheme: light;
}}
* {{
box-sizing: border-box;
}}
body {{
margin: 0;
background: #f5f7fb;
color: #18202a;
font-family: "PingFang SC", "Microsoft YaHei", sans-serif;
line-height: 1.75;
}}
main {{
max-width: 960px;
margin: 0 auto;
padding: 32px 20px 48px;
}}
</style>
</head>
<body>
<main>
{normalized_html}
</main>
</body>
</html>
"""
def validate_html_safety(html_content: str) -> None:
    """Reject HTML that matches any entry of ``DANGEROUS_HTML_PATTERNS``.

    The check is skipped when ``settings.allow_unsafe_html`` is truthy.

    Raises:
        HTTPException: 400, carrying the message of the first matching pattern.
    """
    if settings.allow_unsafe_html:
        return
    first_violation = next(
        (
            reason
            for matcher, reason in DANGEROUS_HTML_PATTERNS
            if matcher.search(html_content)
        ),
        None,
    )
    if first_violation is None:
        return
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=f"Unsafe HTML rejected: {first_violation}",
    )
def write_html_file(target_path: Path, html_content: str) -> None:
    """Write ``html_content`` to ``target_path`` as UTF-8 via a temporary file.

    The content is first written to a ``.tmp`` file in the same directory and
    then moved over ``target_path`` with ``Path.replace``, so the target is
    only ever replaced by a fully written file. Missing parent directories
    are created.

    Args:
        target_path: Final location of the file.
        html_content: Text to store.

    Raises:
        OSError: If the directory, temporary file or final file cannot be
            written.
        UnicodeEncodeError: If ``html_content`` cannot be encoded as UTF-8.
    """
    target_path.parent.mkdir(parents=True, exist_ok=True)
    temporary_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            delete=False,
            dir=target_path.parent,
            suffix=".tmp",
        ) as temporary_file:
            # Record the path before writing: the file is created with
            # delete=False, so a failed write would otherwise leave it behind.
            temporary_path = Path(temporary_file.name)
            temporary_file.write(html_content)
        temporary_path.replace(target_path)
    finally:
        # After a successful replace the temporary name no longer exists and
        # this is a no-op; after a failure it removes the partial file.
        if temporary_path is not None:
            temporary_path.unlink(missing_ok=True)
def delete_stored_file(filename: str) -> None:
    """Remove ``filename`` from ``settings.html_storage_dir`` if it is present."""
    stored_path = settings.html_storage_dir / filename
    if not stored_path.exists():
        return
    stored_path.unlink(missing_ok=True)
def cleanup_expired_files(db: Session) -> int:
    """Delete expired HTML files and their database rows.

    Expired rows are obtained from ``HTMLFile.list_expired_records`` using the
    default retention period. For each one the stored file is removed and the
    row deleted, followed by a single commit.

    Returns:
        The number of expired records processed (0 when there were none, in
        which case nothing is committed).
    """
    retention_days = settings.default_retention_days
    stale_records = HTMLFile.list_expired_records(db, retention_days)
    if stale_records:
        for stale in stale_records:
            delete_stored_file(stale.filename)
            db.delete(stale)
        db.commit()
        return len(stale_records)
    return 0
def get_record_or_404(unique_id: str, db: Session) -> HTMLFile:
    """Load the ``HTMLFile`` row for ``unique_id`` or fail with a 404.

    A row whose resolved expiry time is not later than the current UTC time is
    purged on the spot: its stored file is removed, the row is deleted and the
    change committed.

    Raises:
        HTTPException: 404 when no row exists, or when the row has expired.
    """
    record = db.query(HTMLFile).filter(HTMLFile.unique_id == unique_id).first()
    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="HTML file not found",
        )

    expiry = record.resolved_expires_at(settings.default_retention_days)
    still_valid = expiry > datetime.utcnow()
    if still_valid:
        return record

    # Expired: drop both the file and its row before reporting the 404.
    delete_stored_file(record.filename)
    db.delete(record)
    db.commit()
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="HTML file has expired",
    )
def build_response(html_file: HTMLFile) -> HTMLGenerateResponse:
    """Build the API response model describing ``html_file``.

    ``size_bytes`` falls back to 0 when the stored value is falsy, and
    ``expires_at`` is resolved against the default retention period.
    """
    identifier = html_file.unique_id
    payload = {
        "message": "HTML file generated successfully",
        "unique_id": identifier,
        "url": build_content_url(identifier),
        "query_url": build_query_url(identifier),
        "title": html_file.title,
        "source": html_file.source,
        "request_id": html_file.request_id,
        "size_bytes": html_file.size_bytes or 0,
        "created_at": html_file.created_at,
        "expires_at": html_file.resolved_expires_at(settings.default_retention_days),
    }
    return HTMLGenerateResponse(**payload)
@router.post(
    "/generate",
    response_model=HTMLGenerateResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Generate and publish an HTML explanation page",
    description=(
        "Accepts agent-generated HTML, stores it with a unique random filename, "
        "and returns a direct access URL."
    ),
)
def generate_html(
    request: HTMLGenerateRequest,
    _: None = Depends(require_api_key),
    db: Session = Depends(get_db),
) -> HTMLGenerateResponse:
    """Validate, store and register a submitted HTML page.

    Steps, in order: purge expired files, validate the submitted HTML, pick a
    unique id, build the full document, write it to the storage directory and
    insert the matching ``HTMLFile`` row.

    Args:
        request: Submitted HTML plus optional title, source, request id and
            TTL in days (a falsy TTL selects the default retention period).
        _: Placeholder for the API-key dependency; its value is unused.
        db: Database session.

    Returns:
        The response model describing the stored page.

    Raises:
        HTTPException: Propagated unchanged when raised by a step (for example
            400 from HTML validation); 500 with a generic message for any
            other failure.
    """
    html_path: Path | None = None
    try:
        deleted_count = cleanup_expired_files(db)
        if deleted_count > 0:
            logger.info("Deleted %s expired HTML files", deleted_count)
        validate_html_safety(request.html_content)
        unique_id = generate_unique_id(db)
        html_filename = f"{unique_id}.html"
        html_path = settings.html_storage_dir / html_filename
        html_document = build_html_document(request.html_content, request.title)
        expires_at = datetime.utcnow() + timedelta(
            days=request.ttl_days or settings.default_retention_days
        )
        size_bytes = len(html_document.encode("utf-8"))
        write_html_file(html_path, html_document)
        html_file = HTMLFile(
            unique_id=unique_id,
            filename=html_filename,
            title=request.title,
            source=request.source,
            request_id=request.request_id,
            size_bytes=size_bytes,
            expires_at=expires_at,
        )
        db.add(html_file)
        db.commit()
        db.refresh(html_file)
        return build_response(html_file)
    except HTTPException:
        raise
    except Exception as exc:
        # The full traceback goes to the log; the client only gets a generic
        # message so internal details (SQL, filesystem paths) are not exposed.
        logger.exception("Failed to generate HTML file")
        db.rollback()
        # Do not leave a file behind that no database row points to.
        if html_path is not None:
            html_path.unlink(missing_ok=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to generate HTML file",
        ) from exc
@router.get(
    "/{unique_id}",
    response_model=HTMLGenerateResponse,
    summary="Query metadata for a generated HTML file",
)
def get_html_file(unique_id: str, db: Session = Depends(get_db)) -> HTMLGenerateResponse:
    """Return the metadata of the stored page identified by ``unique_id``.

    When the row exists but its file is missing from the storage directory,
    the row is deleted and committed before the 404 is raised.

    Raises:
        HTTPException: 404 when the record is unknown or expired, or when its
            file is no longer in storage.
    """
    record = get_record_or_404(unique_id, db)
    stored_path = settings.html_storage_dir / record.filename
    if stored_path.exists():
        return build_response(record)

    # The row points at a file that is gone: remove the row as well.
    db.delete(record)
    db.commit()
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="HTML file has been removed from storage",
    )
@router.get(
    "/{unique_id}/content",
    summary="Serve the generated HTML content",
    response_description="The generated HTML page",
)
def get_html_content(unique_id: str, db: Session = Depends(get_db)) -> FileResponse:
    """Serve the stored HTML page identified by ``unique_id``.

    The file is returned as ``text/html`` with the headers produced by
    ``build_response_headers``. When the row exists but its file is missing
    from the storage directory, the row is deleted and committed before the
    404 is raised.

    Raises:
        HTTPException: 404 when the record is unknown or expired, or when its
            file is no longer in storage.
    """
    record = get_record_or_404(unique_id, db)
    stored_path = settings.html_storage_dir / record.filename
    if stored_path.exists():
        return FileResponse(
            path=stored_path,
            media_type="text/html",
            headers=build_response_headers(),
        )

    # The row points at a file that is gone: remove the row as well.
    db.delete(record)
    db.commit()
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="HTML file has been removed from storage",
    )