Files
story-edit-web/web/app.py
邓雨鹏 676df30c67 @
feat(web): 演出配置 Timeline 加「场景底图」选择器(venue 特写)

- 后端 /api/sceneshots:列 SceneShots 全部俯视底图(venue 特写) name->{url,bounds}
- timeline.js:底图优先级 ir.stage.backdrop(venue) > 点位集默认 shot;
  顶栏加底图下拉 renderMapInfo + applyBackdrop(换底+改投影范围+重画+回调)
- app.js:拉 /api/sceneshots;performSelect 传入;saveBackdrop 写 ir.stage.backdrop 并 PUT
- venue 特写与点位集同 map-local → 换底图后锚点自动落对位(无头实拍擂台验证)
- ir.stage.backdrop 是编辑器元数据:validate 不读、compile 不碰
@
2026-06-15 12:01:14 +08:00

381 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""M5 协作 Web 编辑器后端FastAPI 单文件)。
少数人,每人一把专属口令(口令即身份);事件存 SQLite校验/编译走 ir_core与 CLI 同口径)。
起服务:
pip install -r requirements.txt
set STORY_WEB_USERS=bia:口令A,ljl:口令B (未配置则拒绝启动)
uvicorn app:app --host 0.0.0.0 --port 8787
浏览器打开 http://<host>:8787 。
"""
import asyncio
import datetime
import io
import json
import os
import re
import secrets
import sys
import time
import zipfile
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse, FileResponse, StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
_HERE = os.path.dirname(os.path.abspath(__file__))
_AUTHORING = os.path.dirname(_HERE) # tools/event_authoring
_PROJ = os.path.dirname(os.path.dirname(_AUTHORING)) # 项目根
sys.path.insert(0, _AUTHORING)
import ir_core # noqa: E402 tools/event_authoring/ir_core
import db # noqa: E402
_DICT_PATH = os.path.join(_AUTHORING, "ir_dictionary.json")
# 点位集目录:容器内用 STORY_POINTSETS_DIR 指向挂载卷;本地默认指向项目 Assets。
_POINTSETS_DIR = os.environ.get("STORY_POINTSETS_DIR") or \
os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "PointSets")
# 场景俯视底图目录Unity「剧情场景俯视抓拍」产出 {name}.png + {name}.shot.json含覆盖的 map-local 范围)。
_SCENESHOTS_DIR = os.environ.get("STORY_SCENESHOTS_DIR") or \
os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "SceneShots")
_STATIC_DIR = os.path.join(_HERE, "static")
COOKIE = "story_auth"
SESSION_DAYS = 30
# 免鉴权开关:仅本地开发用。设 STORY_WEB_NO_AUTH=1 时跳过登录(不要在线上设)。
# 线上不设此变量 → 行为不变,照常要口令。
NO_AUTH = (os.environ.get("STORY_WEB_NO_AUTH") or "").strip().lower() in ("1", "true", "yes", "on")
def _load_users():
"""解析 STORY_WEB_USERS="名字1:口令1,名字2:口令2"。返回 {口令: 名字}。
未配置/格式错/口令过短/口令重复 → 直接拒绝启动(宁可起不来,不可弱口令裸奔)。
口令即身份登录只输口令服务端按口令认人updated_by 由服务端填写。
"""
if NO_AUTH:
return {} # 免鉴权模式(仅本地):无需口令,不校验 STORY_WEB_USERS
raw = (os.environ.get("STORY_WEB_USERS") or "").strip()
if not raw:
sys.exit('[story-web] 未配置 STORY_WEB_USERS拒绝启动。'
'格式: STORY_WEB_USERS="bia:口令A,ljl:口令B"每人口令≥8位且互不相同'
'旧的 STORY_WEB_PASSWORD 已废弃)')
users = {}
names = set()
for part in raw.split(","):
part = part.strip()
if not part:
continue
if ":" not in part:
sys.exit('[story-web] STORY_WEB_USERS 条目格式错误(应为 名字:口令): %r' % part)
name, pw = part.split(":", 1)
name, pw = name.strip(), pw.strip()
if not name or not pw:
sys.exit('[story-web] STORY_WEB_USERS 条目名字/口令为空: %r' % part)
if len(pw) < 8:
sys.exit('[story-web] 用户 %s 的口令不足 8 位,拒绝启动' % name)
if pw in users:
sys.exit('[story-web] 用户 %s%s 口令相同(口令即身份,必须唯一)'
% (name, users[pw]))
if name in names:
sys.exit('[story-web] 用户名重复: %s' % name)
users[pw] = name
names.add(name)
return users
USERS = _load_users()
db.init_db()
db.purge_sessions(time.time())
app = FastAPI(title="Story Event Web Editor (M5)")
def _now():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# ---------- 鉴权中间件 ----------
def _session_user(request):
"""cookie 里的随机 token → 会话用户名;过期/不存在/已被移出口令表 → None。"""
tok = request.cookies.get(COOKIE)
if not tok:
return None
user = db.get_session_user(tok, time.time())
if user is not None and user not in USERS.values():
return None # 从 STORY_WEB_USERS 删掉的人,旧 token 立即失效(按人吊销)
return user
@app.middleware("http")
async def auth_guard(request: Request, call_next):
path = request.url.path
if NO_AUTH: # 本地免鉴权:全部放行,署名记为「本地」
request.state.user = "本地"
return await call_next(request)
# 放行登录、静态资源、根
if path.startswith("/api/") and path != "/api/login":
user = _session_user(request)
if user is None:
return JSONResponse({"error": "未授权"}, status_code=401)
request.state.user = user
return await call_next(request)
# ---------- 鉴权 ----------
_fail_count = 0 # 连续失败计数(全局节流;内部小工具,无需按 IP 细分)
@app.post("/api/login")
async def login(request: Request):
global _fail_count
body = await request.json()
pw = str(body.get("password") or "")
user = None
for p, n in USERS.items():
if secrets.compare_digest(pw, p): # 常量时间比较
user = n
if user is None:
_fail_count += 1
await asyncio.sleep(min(_fail_count, 5)) # 爆破节流:连错越多等越久
return JSONResponse({"error": "口令错误"}, status_code=403)
_fail_count = 0
token = secrets.token_urlsafe(32)
db.create_session(token, user, time.time() + SESSION_DAYS * 86400)
resp = JSONResponse({"ok": True, "user": user})
resp.set_cookie(COOKIE, token, httponly=True, samesite="lax",
max_age=SESSION_DAYS * 86400)
return resp
@app.post("/api/logout")
async def logout(request: Request):
tok = request.cookies.get(COOKIE)
if tok:
db.delete_session(tok)
resp = JSONResponse({"ok": True})
resp.delete_cookie(COOKIE)
return resp
# ---------- 词典 / 点位集(驱动前端下拉与试走) ----------
@app.get("/api/dictionary")
async def dictionary():
with open(_DICT_PATH, encoding="utf-8") as f:
return json.load(f)
def _load_shot(name):
"""{name}.shot.json含俯视底图覆盖的 map-local 范围);图与 sidecar 都在才算有效。"""
try:
meta = os.path.join(_SCENESHOTS_DIR, name + ".shot.json")
png = os.path.join(_SCENESHOTS_DIR, name + ".png")
if not (os.path.isfile(meta) and os.path.isfile(png)):
return None
with open(meta, encoding="utf-8") as f:
m = json.load(f)
b = m.get("bounds")
if not (isinstance(b, list) and len(b) == 4):
return None
# 缓存击穿:附 png mtime图更新后前端能拿到新图
return {"url": "/sceneshot/" + name + ".png?v=" + str(int(os.path.getmtime(png))),
"bounds": b, "w": m.get("w"), "h": m.get("h")}
except Exception:
return None
@app.get("/api/pointsets")
async def pointsets():
out = {}
if os.path.isdir(_POINTSETS_DIR):
for fn in os.listdir(_POINTSETS_DIR):
if fn.endswith(".points.json"):
name = fn[: -len(".points.json")]
try:
with open(os.path.join(_POINTSETS_DIR, fn), encoding="utf-8") as f:
ps = json.load(f)
pts = ps.get("points", [])
out[name] = {
"mapId": ps.get("mapId", ""),
"points": [p.get("name") for p in pts], # 名字数组:兼容现有表单下拉
# 含坐标的锚点白模演出预览用map-local pos[x,y,z] + rot
"anchors": [
{
"name": p.get("name"),
"pos": p.get("pos") or [0, 0, 0],
"rot": p.get("rot", 0),
"kind": p.get("kind", ""),
"npc": p.get("npc", ""),
}
for p in pts
],
}
shot = _load_shot(name)
if shot:
out[name]["shot"] = shot # 有正交俯视底图 → 场景/点位页叠真实场景
except Exception as e:
out[name] = {"error": str(e)}
return out
@app.get("/api/sceneshots")
async def sceneshots():
"""列出 SceneShots 目录里所有可用俯视底图venue 特写等name -> {url,bounds,w,h}。
供演出配置「场景底图」选择器:事件可挑一张当 Timeline 底图(写进 ir.stage.backdrop"""
out = {}
if os.path.isdir(_SCENESHOTS_DIR):
for fn in os.listdir(_SCENESHOTS_DIR):
if fn.endswith(".shot.json"):
name = fn[: -len(".shot.json")]
shot = _load_shot(name)
if shot:
out[name] = shot
return out
@app.get("/sceneshot/{name}.png")
async def sceneshot(name: str):
# 防目录穿越:只认纯文件名
if not name or "/" in name or "\\" in name or ".." in name:
return JSONResponse({"error": "bad name"}, status_code=400)
p = os.path.join(_SCENESHOTS_DIR, name + ".png")
if not os.path.isfile(p):
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(p, media_type="image/png")
# ---------- 事件 CRUD ----------
@app.get("/api/events")
async def events(status: str = "all"):
return db.list_events(status)
@app.get("/api/events/{group}")
async def event_detail(group: str):
d = db.get_event(group)
if not d:
return JSONResponse({"error": "不存在"}, status_code=404)
return d
@app.post("/api/import")
async def import_events(request: Request):
body = await request.json()
by = request.state.user # 改动者=会话身份,不信前端自报
items = body.get("events", [])
if isinstance(items, dict): # 容错:单个 IR
items = [items]
saved, errors = [], []
for ir in items:
if not isinstance(ir, dict) or "id" not in ir:
errors.append("缺少 id 字段的条目已跳过")
continue
db.upsert_event(ir, by, _now())
saved.append(ir["id"])
return {"saved": saved, "errors": errors}
@app.put("/api/events/{group}")
async def update_event(group: str, request: Request):
body = await request.json()
ir = body.get("ir")
if not ir or ir.get("id") != group:
return JSONResponse({"error": "ir.id 与 group 不一致"}, status_code=400)
db.upsert_event(ir, request.state.user, _now(), notes=body.get("notes"))
return {"ok": True, "updated_at": _now()}
@app.post("/api/events/{group}/status")
async def change_status(group: str, request: Request):
body = await request.json()
ok = db.set_status(group, body.get("status"), request.state.user, _now())
if not ok:
return JSONResponse({"error": "事件不存在"}, status_code=404)
return {"ok": True}
# ---------- 校验 ----------
@app.post("/api/validate")
async def validate(request: Request):
body = await request.json()
ir = body.get("ir")
if not ir:
return JSONResponse({"error": "缺少 ir"}, status_code=400)
dic = ir_core.load_dictionary(_DICT_PATH)
try:
errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR)
except Exception as e:
return {"errors": ["[校验异常] %s" % e], "warnings": []}
return {"errors": errs, "warnings": warns}
# ---------- 导出(编译所有 confirmed -> zip ----------
@app.post("/api/export")
async def export_zip():
dic = ir_core.load_dictionary(_DICT_PATH)
confirmed = db.confirmed_events()
if not confirmed:
return JSONResponse({"error": "没有 confirmed 事件可导出"}, status_code=422)
# 校验门:任一 confirmed 有 error 即整体拒绝。
# 同步做预编译探测——捕获 CompileError含 P2 scene 导出 gate D3含 scene 的事件暂不可导出),
# 把编译失败也并入 report避免 compile 抛异常变成 500。编译成功的结果缓存复用不重复编译。
report = {}
compiled = {}
blocked = False
for group, ir in confirmed:
errs, warns = ir_core.validate(ir, dic, points_dir=_POINTSETS_DIR)
if not errs:
try:
compiled[group] = ir_core.compile_ir(ir, dic)
except ir_core.CompileError as e:
errs = errs + ["[编译失败] %s" % e]
report[group] = {"errors": errs, "warnings": warns}
if errs:
blocked = True
if blocked:
return JSONResponse({"error": "存在校验/编译失败的 confirmed 事件,已拒绝导出",
"report": report}, status_code=422)
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
for group, ir in confirmed:
rows = compiled[group]
z.writestr(group + ".events.json",
json.dumps(rows, ensure_ascii=False, indent=2))
texts = ir_core.extract_texts(ir)
tsv = ["# 简体中文(key,勿改)\t韩文(待译繁体无需填SGameText 自动转换)"]
tsv += ["%s\t" % t for t in texts]
z.writestr(group + ".i18n.tsv", "\n".join(tsv))
buf.seek(0)
return StreamingResponse(
buf, media_type="application/zip",
headers={"Content-Disposition": 'attachment; filename="story_export.zip"'})
# ---------- 静态前端 ----------
def _stamp_static_refs(html):
"""给 index.html 里引用的本地 js/css 加 ?v=<文件mtime> 破浏览器缓存。
只给改过的文件 bump 版本号mtime 变才变)→ 改前端不必手动改版本、也不会再出现
新 html 配旧缓存 js 的崩溃。外链(//)与已带 ?查询串的引用跳过。"""
def repl(m):
attr, ref = m.group(1), m.group(2)
if ref.startswith("http") or ref.startswith("//"):
return m.group(0)
fp = os.path.join(_STATIC_DIR, ref)
if os.path.isfile(fp):
return '%s="%s?v=%d"' % (attr, ref, int(os.path.getmtime(fp)))
return m.group(0)
return re.sub(r'(src|href)="([^"?]+\.(?:js|css))"', repl, html)
@app.get("/")
async def index():
with open(os.path.join(_STATIC_DIR, "index.html"), encoding="utf-8") as f:
html = f.read()
return HTMLResponse(_stamp_static_refs(html))
app.mount("/", StaticFiles(directory=_STATIC_DIR), name="static")