# -*- coding: utf-8 -*- """M5 协作 Web 编辑器后端(FastAPI 单文件)。 少数人,每人一把专属口令(口令即身份);事件存 SQLite;校验/编译走 ir_core(与 CLI 同口径)。 起服务: pip install -r requirements.txt set STORY_WEB_USERS=bia:口令A,ljl:口令B (未配置则拒绝启动) uvicorn app:app --host 0.0.0.0 --port 8787 浏览器打开 http://:8787 。 """ import asyncio import datetime import io import json import os import secrets import sys import time import zipfile from fastapi import FastAPI, Request, Response from fastapi.responses import JSONResponse, FileResponse, StreamingResponse from fastapi.staticfiles import StaticFiles _HERE = os.path.dirname(os.path.abspath(__file__)) _AUTHORING = os.path.dirname(_HERE) # tools/event_authoring _PROJ = os.path.dirname(os.path.dirname(_AUTHORING)) # 项目根 sys.path.insert(0, _AUTHORING) import ir_core # noqa: E402 (tools/event_authoring/ir_core) import db # noqa: E402 _DICT_PATH = os.path.join(_AUTHORING, "ir_dictionary.json") # 点位集目录:容器内用 STORY_POINTSETS_DIR 指向挂载卷;本地默认指向项目 Assets。 _POINTSETS_DIR = os.environ.get("STORY_POINTSETS_DIR") or \ os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "PointSets") _STATIC_DIR = os.path.join(_HERE, "static") COOKIE = "story_auth" SESSION_DAYS = 30 # 免鉴权开关:仅本地开发用。设 STORY_WEB_NO_AUTH=1 时跳过登录(不要在线上设)。 # 线上不设此变量 → 行为不变,照常要口令。 NO_AUTH = (os.environ.get("STORY_WEB_NO_AUTH") or "").strip().lower() in ("1", "true", "yes", "on") def _load_users(): """解析 STORY_WEB_USERS="名字1:口令1,名字2:口令2"。返回 {口令: 名字}。 未配置/格式错/口令过短/口令重复 → 直接拒绝启动(宁可起不来,不可弱口令裸奔)。 口令即身份:登录只输口令,服务端按口令认人,updated_by 由服务端填写。 """ if NO_AUTH: return {} # 免鉴权模式(仅本地):无需口令,不校验 STORY_WEB_USERS raw = (os.environ.get("STORY_WEB_USERS") or "").strip() if not raw: sys.exit('[story-web] 未配置 STORY_WEB_USERS,拒绝启动。' '格式: STORY_WEB_USERS="bia:口令A,ljl:口令B"(每人口令≥8位且互不相同;' '旧的 STORY_WEB_PASSWORD 已废弃)') users = {} names = set() for part in raw.split(","): part = part.strip() if not part: continue if ":" not in part: sys.exit('[story-web] STORY_WEB_USERS 条目格式错误(应为 名字:口令): %r' % part) name, pw = part.split(":", 1) name, pw = name.strip(), pw.strip() if not name or not pw: sys.exit('[story-web] STORY_WEB_USERS 条目名字/口令为空: %r' % part) if len(pw) < 8: sys.exit('[story-web] 用户 %s 的口令不足 8 位,拒绝启动' % name) if pw in users: sys.exit('[story-web] 用户 %s 与 %s 口令相同(口令即身份,必须唯一)' % (name, users[pw])) if name in names: sys.exit('[story-web] 用户名重复: %s' % name) users[pw] = name names.add(name) return users USERS = _load_users() db.init_db() db.purge_sessions(time.time()) app = FastAPI(title="Story Event Web Editor (M5)") def _now(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # ---------- 鉴权中间件 ---------- def _session_user(request): """cookie 里的随机 token → 会话用户名;过期/不存在/已被移出口令表 → None。""" tok = request.cookies.get(COOKIE) if not tok: return None user = db.get_session_user(tok, time.time()) if user is not None and user not in USERS.values(): return None # 从 STORY_WEB_USERS 删掉的人,旧 token 立即失效(按人吊销) return user @app.middleware("http") async def auth_guard(request: Request, call_next): path = request.url.path if NO_AUTH: # 本地免鉴权:全部放行,署名记为「本地」 request.state.user = "本地" return await call_next(request) # 放行登录、静态资源、根 if path.startswith("/api/") and path != "/api/login": user = _session_user(request) if user is None: return JSONResponse({"error": "未授权"}, status_code=401) request.state.user = user return await call_next(request) # ---------- 鉴权 ---------- _fail_count = 0 # 连续失败计数(全局节流;内部小工具,无需按 IP 细分) @app.post("/api/login") async def login(request: Request): global _fail_count body = await request.json() pw = str(body.get("password") or "") user = None for p, n in USERS.items(): if secrets.compare_digest(pw, p): # 常量时间比较 user = n if user is None: _fail_count += 1 await asyncio.sleep(min(_fail_count, 5)) # 爆破节流:连错越多等越久 return JSONResponse({"error": "口令错误"}, status_code=403) _fail_count = 0 token = secrets.token_urlsafe(32) db.create_session(token, user, time.time() + SESSION_DAYS * 86400) resp = JSONResponse({"ok": True, "user": user}) resp.set_cookie(COOKIE, token, httponly=True, samesite="lax", max_age=SESSION_DAYS * 86400) return resp @app.post("/api/logout") async def logout(request: Request): tok = request.cookies.get(COOKIE) if tok: db.delete_session(tok) resp = JSONResponse({"ok": True}) resp.delete_cookie(COOKIE) return resp # ---------- 词典 / 点位集(驱动前端下拉与试走) ---------- @app.get("/api/dictionary") async def dictionary(): with open(_DICT_PATH, encoding="utf-8") as f: return json.load(f) @app.get("/api/pointsets") async def pointsets(): out = {} if os.path.isdir(_POINTSETS_DIR): for fn in os.listdir(_POINTSETS_DIR): if fn.endswith(".points.json"): name = fn[: -len(".points.json")] try: with open(os.path.join(_POINTSETS_DIR, fn), encoding="utf-8") as f: ps = json.load(f) pts = ps.get("points", []) out[name] = { "mapId": ps.get("mapId", ""), "points": [p.get("name") for p in pts], # 名字数组:兼容现有表单下拉 # 含坐标的锚点:白模演出预览用(map-local pos[x,y,z] + rot) "anchors": [ { "name": p.get("name"), "pos": p.get("pos") or [0, 0, 0], "rot": p.get("rot", 0), "kind": p.get("kind", ""), "npc": p.get("npc", ""), } for p in pts ], } except Exception as e: out[name] = {"error": str(e)} return out # ---------- 事件 CRUD ---------- @app.get("/api/events") async def events(status: str = "all"): return db.list_events(status) @app.get("/api/events/{group}") async def event_detail(group: str): d = db.get_event(group) if not d: return JSONResponse({"error": "不存在"}, status_code=404) return d @app.post("/api/import") async def import_events(request: Request): body = await request.json() by = request.state.user # 改动者=会话身份,不信前端自报 items = body.get("events", []) if isinstance(items, dict): # 容错:单个 IR items = [items] saved, errors = [], [] for ir in items: if not isinstance(ir, dict) or "id" not in ir: errors.append("缺少 id 字段的条目已跳过") continue db.upsert_event(ir, by, _now()) saved.append(ir["id"]) return {"saved": saved, "errors": errors} @app.put("/api/events/{group}") async def update_event(group: str, request: Request): body = await request.json() ir = body.get("ir") if not ir or ir.get("id") != group: return JSONResponse({"error": "ir.id 与 group 不一致"}, status_code=400) db.upsert_event(ir, request.state.user, _now(), notes=body.get("notes")) return {"ok": True, "updated_at": _now()} @app.post("/api/events/{group}/status") async def change_status(group: str, request: Request): body = await request.json() ok = db.set_status(group, body.get("status"), request.state.user, _now()) if not ok: return JSONResponse({"error": "事件不存在"}, status_code=404) return {"ok": True} # ---------- 校验 ---------- @app.post("/api/validate") async def validate(request: Request): body = await request.json() ir = body.get("ir") if not ir: return JSONResponse({"error": "缺少 ir"}, status_code=400) dic = ir_core.load_dictionary(_DICT_PATH) try: errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR) except Exception as e: return {"errors": ["[校验异常] %s" % e], "warnings": []} return {"errors": errs, "warnings": warns} # ---------- 导出(编译所有 confirmed -> zip) ---------- @app.post("/api/export") async def export_zip(): dic = ir_core.load_dictionary(_DICT_PATH) confirmed = db.confirmed_events() if not confirmed: return JSONResponse({"error": "没有 confirmed 事件可导出"}, status_code=422) # 校验门:任一 confirmed 有 error 即整体拒绝 report = {} blocked = False for group, ir in confirmed: errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR) report[group] = {"errors": errs, "warnings": warns} if errs: blocked = True if blocked: return JSONResponse({"error": "存在校验失败的 confirmed 事件,已拒绝导出", "report": report}, status_code=422) buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z: for group, ir in confirmed: rows = ir_core.compile_ir(ir, dic) z.writestr(group + ".events.json", json.dumps(rows, ensure_ascii=False, indent=2)) texts = ir_core.extract_texts(ir) tsv = ["# 简体中文(key,勿改)\t韩文(待译;繁体无需填,SGameText 自动转换)"] tsv += ["%s\t" % t for t in texts] z.writestr(group + ".i18n.tsv", "\n".join(tsv)) buf.seek(0) return StreamingResponse( buf, media_type="application/zip", headers={"Content-Disposition": 'attachment; filename="story_export.zip"'}) # ---------- 静态前端 ---------- @app.get("/") async def index(): return FileResponse(os.path.join(_STATIC_DIR, "index.html")) app.mount("/", StaticFiles(directory=_STATIC_DIR), name="static")