# -*- coding: utf-8 -*- """M5 协作 Web 编辑器后端(FastAPI 单文件)。 少数人 + 共享口令;事件存 SQLite;校验/编译走 ir_core(与 CLI 同口径)。 起服务: pip install -r requirements.txt set STORY_WEB_PASSWORD=your-pass (默认 story) uvicorn app:app --host 0.0.0.0 --port 8787 浏览器打开 http://:8787 。 """ import datetime import io import json import os import sys import zipfile from fastapi import FastAPI, Request, Response from fastapi.responses import JSONResponse, FileResponse, StreamingResponse from fastapi.staticfiles import StaticFiles _HERE = os.path.dirname(os.path.abspath(__file__)) _AUTHORING = os.path.dirname(_HERE) # tools/event_authoring _PROJ = os.path.dirname(os.path.dirname(_AUTHORING)) # 项目根 sys.path.insert(0, _AUTHORING) import ir_core # noqa: E402 (tools/event_authoring/ir_core) import db # noqa: E402 _DICT_PATH = os.path.join(_AUTHORING, "ir_dictionary.json") # 点位集目录:容器内用 STORY_POINTSETS_DIR 指向挂载卷;本地默认指向项目 Assets。 _POINTSETS_DIR = os.environ.get("STORY_POINTSETS_DIR") or \ os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "PointSets") _STATIC_DIR = os.path.join(_HERE, "static") PASSWORD = os.environ.get("STORY_WEB_PASSWORD", "story") COOKIE = "story_auth" db.init_db() app = FastAPI(title="Story Event Web Editor (M5)") def _now(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # ---------- 鉴权中间件 ---------- @app.middleware("http") async def auth_guard(request: Request, call_next): path = request.url.path # 放行登录、静态资源、根 if path.startswith("/api/") and path != "/api/login": if request.cookies.get(COOKIE) != PASSWORD: return JSONResponse({"error": "未授权"}, status_code=401) return await call_next(request) # ---------- 鉴权 ---------- @app.post("/api/login") async def login(request: Request): body = await request.json() if body.get("password") != PASSWORD: return JSONResponse({"error": "口令错误"}, status_code=403) resp = JSONResponse({"ok": True}) resp.set_cookie(COOKIE, PASSWORD, httponly=True, samesite="lax", max_age=30 * 86400) return resp @app.post("/api/logout") async def logout(): resp = JSONResponse({"ok": True}) resp.delete_cookie(COOKIE) return resp # ---------- 词典 / 点位集(驱动前端下拉与试走) ---------- @app.get("/api/dictionary") async def dictionary(): with open(_DICT_PATH, encoding="utf-8") as f: return json.load(f) @app.get("/api/pointsets") async def pointsets(): out = {} if os.path.isdir(_POINTSETS_DIR): for fn in os.listdir(_POINTSETS_DIR): if fn.endswith(".points.json"): name = fn[: -len(".points.json")] try: with open(os.path.join(_POINTSETS_DIR, fn), encoding="utf-8") as f: ps = json.load(f) out[name] = { "mapId": ps.get("mapId", ""), "points": [p.get("name") for p in ps.get("points", [])], } except Exception as e: out[name] = {"error": str(e)} return out # ---------- 事件 CRUD ---------- @app.get("/api/events") async def events(status: str = "all"): return db.list_events(status) @app.get("/api/events/{group}") async def event_detail(group: str): d = db.get_event(group) if not d: return JSONResponse({"error": "不存在"}, status_code=404) return d @app.post("/api/import") async def import_events(request: Request): body = await request.json() by = body.get("by", "匿名") items = body.get("events", []) if isinstance(items, dict): # 容错:单个 IR items = [items] saved, errors = [], [] for ir in items: if not isinstance(ir, dict) or "id" not in ir: errors.append("缺少 id 字段的条目已跳过") continue db.upsert_event(ir, by, _now()) saved.append(ir["id"]) return {"saved": saved, "errors": errors} @app.put("/api/events/{group}") async def update_event(group: str, request: Request): body = await request.json() ir = body.get("ir") if not ir or ir.get("id") != group: return JSONResponse({"error": "ir.id 与 group 不一致"}, status_code=400) db.upsert_event(ir, body.get("by", "匿名"), _now(), notes=body.get("notes")) return {"ok": True, "updated_at": _now()} @app.post("/api/events/{group}/status") async def change_status(group: str, request: Request): body = await request.json() ok = db.set_status(group, body.get("status"), body.get("by", "匿名"), _now()) if not ok: return JSONResponse({"error": "事件不存在"}, status_code=404) return {"ok": True} # ---------- 校验 ---------- @app.post("/api/validate") async def validate(request: Request): body = await request.json() ir = body.get("ir") if not ir: return JSONResponse({"error": "缺少 ir"}, status_code=400) dic = ir_core.load_dictionary(_DICT_PATH) try: errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR) except Exception as e: return {"errors": ["[校验异常] %s" % e], "warnings": []} return {"errors": errs, "warnings": warns} # ---------- 导出(编译所有 confirmed -> zip) ---------- @app.post("/api/export") async def export_zip(): dic = ir_core.load_dictionary(_DICT_PATH) confirmed = db.confirmed_events() if not confirmed: return JSONResponse({"error": "没有 confirmed 事件可导出"}, status_code=422) # 校验门:任一 confirmed 有 error 即整体拒绝 report = {} blocked = False for group, ir in confirmed: errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR) report[group] = {"errors": errs, "warnings": warns} if errs: blocked = True if blocked: return JSONResponse({"error": "存在校验失败的 confirmed 事件,已拒绝导出", "report": report}, status_code=422) buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z: for group, ir in confirmed: rows = ir_core.compile_ir(ir, dic) z.writestr(group + ".events.json", json.dumps(rows, ensure_ascii=False, indent=2)) texts = ir_core.extract_texts(ir) tsv = ["# 简体中文(key,勿改)\t韩文(待译;繁体无需填,SGameText 自动转换)"] tsv += ["%s\t" % t for t in texts] z.writestr(group + ".i18n.tsv", "\n".join(tsv)) buf.seek(0) return StreamingResponse( buf, media_type="application/zip", headers={"Content-Disposition": 'attachment; filename="story_export.zip"'}) # ---------- 静态前端 ---------- @app.get("/") async def index(): return FileResponse(os.path.join(_STATIC_DIR, "index.html")) app.mount("/", StaticFiles(directory=_STATIC_DIR), name="static")