# -*- coding: utf-8 -*- """M5 协作 Web 编辑器后端(FastAPI 单文件)。 少数人,每人一把专属口令(口令即身份);事件存 SQLite;校验/编译走 ir_core(与 CLI 同口径)。 起服务: pip install -r requirements.txt set STORY_WEB_USERS=bia:口令A,ljl:口令B (未配置则拒绝启动) uvicorn app:app --host 0.0.0.0 --port 8787 浏览器打开 http://:8787 。 """ import asyncio import datetime import io import json import os import re import secrets import sys import time import zipfile from fastapi import FastAPI, Request, Response from fastapi.responses import JSONResponse, FileResponse, StreamingResponse, HTMLResponse from fastapi.staticfiles import StaticFiles _HERE = os.path.dirname(os.path.abspath(__file__)) _AUTHORING = os.path.dirname(_HERE) # tools/event_authoring _PROJ = os.path.dirname(os.path.dirname(_AUTHORING)) # 项目根 sys.path.insert(0, _AUTHORING) import ir_core # noqa: E402 (tools/event_authoring/ir_core) import db # noqa: E402 _DICT_PATH = os.path.join(_AUTHORING, "ir_dictionary.json") # 点位集目录:容器内用 STORY_POINTSETS_DIR 指向挂载卷;本地默认指向项目 Assets。 _POINTSETS_DIR = os.environ.get("STORY_POINTSETS_DIR") or \ os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "PointSets") # 场景俯视底图目录:Unity「剧情场景俯视抓拍」产出 {name}.png + {name}.shot.json(含覆盖的 map-local 范围)。 _SCENESHOTS_DIR = os.environ.get("STORY_SCENESHOTS_DIR") or \ os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "SceneShots") _STATIC_DIR = os.path.join(_HERE, "static") COOKIE = "story_auth" SESSION_DAYS = 30 # 免鉴权开关:仅本地开发用。设 STORY_WEB_NO_AUTH=1 时跳过登录(不要在线上设)。 # 线上不设此变量 → 行为不变,照常要口令。 NO_AUTH = (os.environ.get("STORY_WEB_NO_AUTH") or "").strip().lower() in ("1", "true", "yes", "on") def _load_users(): """解析 STORY_WEB_USERS="名字1:口令1,名字2:口令2"。返回 {口令: 名字}。 未配置/格式错/口令过短/口令重复 → 直接拒绝启动(宁可起不来,不可弱口令裸奔)。 口令即身份:登录只输口令,服务端按口令认人,updated_by 由服务端填写。 """ if NO_AUTH: return {} # 免鉴权模式(仅本地):无需口令,不校验 STORY_WEB_USERS raw = (os.environ.get("STORY_WEB_USERS") or "").strip() if not raw: sys.exit('[story-web] 未配置 STORY_WEB_USERS,拒绝启动。' '格式: STORY_WEB_USERS="bia:口令A,ljl:口令B"(每人口令≥8位且互不相同;' '旧的 STORY_WEB_PASSWORD 已废弃)') users = {} names = set() for part in raw.split(","): part = part.strip() if not part: continue if ":" not in part: sys.exit('[story-web] STORY_WEB_USERS 条目格式错误(应为 名字:口令): %r' % part) name, pw = part.split(":", 1) name, pw = name.strip(), pw.strip() if not name or not pw: sys.exit('[story-web] STORY_WEB_USERS 条目名字/口令为空: %r' % part) if len(pw) < 8: sys.exit('[story-web] 用户 %s 的口令不足 8 位,拒绝启动' % name) if pw in users: sys.exit('[story-web] 用户 %s 与 %s 口令相同(口令即身份,必须唯一)' % (name, users[pw])) if name in names: sys.exit('[story-web] 用户名重复: %s' % name) users[pw] = name names.add(name) return users USERS = _load_users() db.init_db() db.purge_sessions(time.time()) app = FastAPI(title="Story Event Web Editor (M5)") def _now(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # ---------- 鉴权中间件 ---------- def _session_user(request): """cookie 里的随机 token → 会话用户名;过期/不存在/已被移出口令表 → None。""" tok = request.cookies.get(COOKIE) if not tok: return None user = db.get_session_user(tok, time.time()) if user is not None and user not in USERS.values(): return None # 从 STORY_WEB_USERS 删掉的人,旧 token 立即失效(按人吊销) return user @app.middleware("http") async def auth_guard(request: Request, call_next): path = request.url.path if NO_AUTH: # 本地免鉴权:全部放行,署名记为「本地」 request.state.user = "本地" return await call_next(request) # 放行登录、静态资源、根 if path.startswith("/api/") and path != "/api/login": user = _session_user(request) if user is None: return JSONResponse({"error": "未授权"}, status_code=401) request.state.user = user return await call_next(request) # ---------- 鉴权 ---------- _fail_count = 0 # 连续失败计数(全局节流;内部小工具,无需按 IP 细分) @app.post("/api/login") async def login(request: Request): global _fail_count body = await request.json() pw = str(body.get("password") or "") user = None for p, n in USERS.items(): if secrets.compare_digest(pw, p): # 常量时间比较 user = n if user is None: _fail_count += 1 await asyncio.sleep(min(_fail_count, 5)) # 爆破节流:连错越多等越久 return JSONResponse({"error": "口令错误"}, status_code=403) _fail_count = 0 token = secrets.token_urlsafe(32) db.create_session(token, user, time.time() + SESSION_DAYS * 86400) resp = JSONResponse({"ok": True, "user": user}) resp.set_cookie(COOKIE, token, httponly=True, samesite="lax", max_age=SESSION_DAYS * 86400) return resp @app.post("/api/logout") async def logout(request: Request): tok = request.cookies.get(COOKIE) if tok: db.delete_session(tok) resp = JSONResponse({"ok": True}) resp.delete_cookie(COOKIE) return resp # ---------- 词典 / 点位集(驱动前端下拉与试走) ---------- @app.get("/api/dictionary") async def dictionary(): with open(_DICT_PATH, encoding="utf-8") as f: return json.load(f) def _load_shot(name): """读 {name}.shot.json(含俯视底图覆盖的 map-local 范围);图与 sidecar 都在才算有效。""" try: meta = os.path.join(_SCENESHOTS_DIR, name + ".shot.json") png = os.path.join(_SCENESHOTS_DIR, name + ".png") if not (os.path.isfile(meta) and os.path.isfile(png)): return None with open(meta, encoding="utf-8") as f: m = json.load(f) b = m.get("bounds") if not (isinstance(b, list) and len(b) == 4): return None # 缓存击穿:附 png mtime,图更新后前端能拿到新图 return {"url": "/sceneshot/" + name + ".png?v=" + str(int(os.path.getmtime(png))), "bounds": b, "w": m.get("w"), "h": m.get("h")} except Exception: return None @app.get("/api/pointsets") async def pointsets(): out = {} if os.path.isdir(_POINTSETS_DIR): for fn in os.listdir(_POINTSETS_DIR): if fn.endswith(".points.json"): name = fn[: -len(".points.json")] try: with open(os.path.join(_POINTSETS_DIR, fn), encoding="utf-8") as f: ps = json.load(f) pts = ps.get("points", []) out[name] = { "mapId": ps.get("mapId", ""), "points": [p.get("name") for p in pts], # 名字数组:兼容现有表单下拉 # 含坐标的锚点:白模演出预览用(map-local pos[x,y,z] + rot) "anchors": [ { "name": p.get("name"), "pos": p.get("pos") or [0, 0, 0], "rot": p.get("rot", 0), "kind": p.get("kind", ""), "npc": p.get("npc", ""), } for p in pts ], } shot = _load_shot(name) if shot: out[name]["shot"] = shot # 有正交俯视底图 → 场景/点位页叠真实场景 except Exception as e: out[name] = {"error": str(e)} return out @app.get("/sceneshot/{name}.png") async def sceneshot(name: str): # 防目录穿越:只认纯文件名 if not name or "/" in name or "\\" in name or ".." in name: return JSONResponse({"error": "bad name"}, status_code=400) p = os.path.join(_SCENESHOTS_DIR, name + ".png") if not os.path.isfile(p): return JSONResponse({"error": "not found"}, status_code=404) return FileResponse(p, media_type="image/png") # ---------- 事件 CRUD ---------- @app.get("/api/events") async def events(status: str = "all"): return db.list_events(status) @app.get("/api/events/{group}") async def event_detail(group: str): d = db.get_event(group) if not d: return JSONResponse({"error": "不存在"}, status_code=404) return d @app.post("/api/import") async def import_events(request: Request): body = await request.json() by = request.state.user # 改动者=会话身份,不信前端自报 items = body.get("events", []) if isinstance(items, dict): # 容错:单个 IR items = [items] saved, errors = [], [] for ir in items: if not isinstance(ir, dict) or "id" not in ir: errors.append("缺少 id 字段的条目已跳过") continue db.upsert_event(ir, by, _now()) saved.append(ir["id"]) return {"saved": saved, "errors": errors} @app.put("/api/events/{group}") async def update_event(group: str, request: Request): body = await request.json() ir = body.get("ir") if not ir or ir.get("id") != group: return JSONResponse({"error": "ir.id 与 group 不一致"}, status_code=400) db.upsert_event(ir, request.state.user, _now(), notes=body.get("notes")) return {"ok": True, "updated_at": _now()} @app.post("/api/events/{group}/status") async def change_status(group: str, request: Request): body = await request.json() ok = db.set_status(group, body.get("status"), request.state.user, _now()) if not ok: return JSONResponse({"error": "事件不存在"}, status_code=404) return {"ok": True} # ---------- 校验 ---------- @app.post("/api/validate") async def validate(request: Request): body = await request.json() ir = body.get("ir") if not ir: return JSONResponse({"error": "缺少 ir"}, status_code=400) dic = ir_core.load_dictionary(_DICT_PATH) try: errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR) except Exception as e: return {"errors": ["[校验异常] %s" % e], "warnings": []} return {"errors": errs, "warnings": warns} # ---------- 导出(编译所有 confirmed -> zip) ---------- @app.post("/api/export") async def export_zip(): dic = ir_core.load_dictionary(_DICT_PATH) confirmed = db.confirmed_events() if not confirmed: return JSONResponse({"error": "没有 confirmed 事件可导出"}, status_code=422) # 校验门:任一 confirmed 有 error 即整体拒绝。 # 同步做预编译探测——捕获 CompileError(含 P2 scene 导出 gate D3:含 scene 的事件暂不可导出), # 把编译失败也并入 report,避免 compile 抛异常变成 500。编译成功的结果缓存复用,不重复编译。 report = {} compiled = {} blocked = False for group, ir in confirmed: errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR) if not errs: try: compiled[group] = ir_core.compile_ir(ir, dic) except ir_core.CompileError as e: errs = errs + ["[编译失败] %s" % e] report[group] = {"errors": errs, "warnings": warns} if errs: blocked = True if blocked: return JSONResponse({"error": "存在校验/编译失败的 confirmed 事件,已拒绝导出", "report": report}, status_code=422) buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z: for group, ir in confirmed: rows = compiled[group] z.writestr(group + ".events.json", json.dumps(rows, ensure_ascii=False, indent=2)) texts = ir_core.extract_texts(ir) tsv = ["# 简体中文(key,勿改)\t韩文(待译;繁体无需填,SGameText 自动转换)"] tsv += ["%s\t" % t for t in texts] z.writestr(group + ".i18n.tsv", "\n".join(tsv)) buf.seek(0) return StreamingResponse( buf, media_type="application/zip", headers={"Content-Disposition": 'attachment; filename="story_export.zip"'}) # ---------- 静态前端 ---------- def _stamp_static_refs(html): """给 index.html 里引用的本地 js/css 加 ?v=<文件mtime> 破浏览器缓存。 只给改过的文件 bump 版本号(mtime 变才变)→ 改前端不必手动改版本、也不会再出现 新 html 配旧缓存 js 的崩溃。外链(//)与已带 ?查询串的引用跳过。""" def repl(m): attr, ref = m.group(1), m.group(2) if ref.startswith("http") or ref.startswith("//"): return m.group(0) fp = os.path.join(_STATIC_DIR, ref) if os.path.isfile(fp): return '%s="%s?v=%d"' % (attr, ref, int(os.path.getmtime(fp))) return m.group(0) return re.sub(r'(src|href)="([^"?]+\.(?:js|css))"', repl, html) @app.get("/") async def index(): with open(os.path.join(_STATIC_DIR, "index.html"), encoding="utf-8") as f: html = f.read() return HTMLResponse(_stamp_static_refs(html)) app.mount("/", StaticFiles(directory=_STATIC_DIR), name="static")