security: 每人一把口令(口令即身份) + 随机会话token + 无配置拒绝启动 + 爆破节流

- STORY_WEB_PASSWORD(默认story) 废弃 → STORY_WEB_USERS=名字1:口令1,名字2:口令2;
  未配置/口令<8位/口令或用户名重复 → 启动即退出,杜绝弱默认口令裸奔
- cookie 不再存口令原文:登录发 secrets.token_urlsafe(32) 随机token,
  会话存 SQLite sessions 表(30天);登出删token;从 USERS 移除某人=吊销其全部会话
- updated_by 改由服务端按会话身份填写,前端自报 by 不再可信;登录框去掉昵称字段
- 登录失败全局递增节流(最多sleep 5s),口令比较用 secrets.compare_digest
- Dockerfile/compose 移除一切口令默认值;compose 未设 STORY_WEB_USERS 直接报错
- 顺手修 playtest.js 走位/动画/out_ref 行未转义的存储型XSS(esc补齐)
This commit is contained in:
2026-06-10 17:34:50 +08:00
parent 0f42fa13f1
commit 90402c4a17
9 changed files with 152 additions and 34 deletions

View File

@ -1,18 +1,21 @@
# -*- coding: utf-8 -*-
"""M5 协作 Web 编辑器后端FastAPI 单文件)。
少数人 + 共享口令;事件存 SQLite校验/编译走 ir_core与 CLI 同口径)。
少数人,每人一把专属口令(口令即身份);事件存 SQLite校验/编译走 ir_core与 CLI 同口径)。
起服务:
pip install -r requirements.txt
set STORY_WEB_PASSWORD=your-pass (默认 story)
set STORY_WEB_USERS=bia:口令A,ljl:口令B (未配置则拒绝启动)
uvicorn app:app --host 0.0.0.0 --port 8787
浏览器打开 http://<host>:8787 。
"""
import asyncio
import datetime
import io
import json
import os
import secrets
import sys
import time
import zipfile
from fastapi import FastAPI, Request, Response
@ -33,10 +36,49 @@ _POINTSETS_DIR = os.environ.get("STORY_POINTSETS_DIR") or \
os.path.join(_PROJ, "Assets", "StreamingAssets", "Story", "PointSets")
_STATIC_DIR = os.path.join(_HERE, "static")
PASSWORD = os.environ.get("STORY_WEB_PASSWORD", "story")
COOKIE = "story_auth"
SESSION_DAYS = 30
def _load_users():
"""解析 STORY_WEB_USERS="名字1:口令1,名字2:口令2"。返回 {口令: 名字}。
未配置/格式错/口令过短/口令重复 → 直接拒绝启动(宁可起不来,不可弱口令裸奔)。
口令即身份登录只输口令服务端按口令认人updated_by 由服务端填写。
"""
raw = (os.environ.get("STORY_WEB_USERS") or "").strip()
if not raw:
sys.exit('[story-web] 未配置 STORY_WEB_USERS拒绝启动。'
'格式: STORY_WEB_USERS="bia:口令A,ljl:口令B"每人口令≥8位且互不相同'
'旧的 STORY_WEB_PASSWORD 已废弃)')
users = {}
names = set()
for part in raw.split(","):
part = part.strip()
if not part:
continue
if ":" not in part:
sys.exit('[story-web] STORY_WEB_USERS 条目格式错误(应为 名字:口令): %r' % part)
name, pw = part.split(":", 1)
name, pw = name.strip(), pw.strip()
if not name or not pw:
sys.exit('[story-web] STORY_WEB_USERS 条目名字/口令为空: %r' % part)
if len(pw) < 8:
sys.exit('[story-web] 用户 %s 的口令不足 8 位,拒绝启动' % name)
if pw in users:
sys.exit('[story-web] 用户 %s%s 口令相同(口令即身份,必须唯一)'
% (name, users[pw]))
if name in names:
sys.exit('[story-web] 用户名重复: %s' % name)
users[pw] = name
names.add(name)
return users
USERS = _load_users()
db.init_db()
db.purge_sessions(time.time())
app = FastAPI(title="Story Event Web Editor (M5)")
@ -45,29 +87,60 @@ def _now():
# ---------- 鉴权中间件 ----------
def _session_user(request):
"""cookie 里的随机 token → 会话用户名;过期/不存在/已被移出口令表 → None。"""
tok = request.cookies.get(COOKIE)
if not tok:
return None
user = db.get_session_user(tok, time.time())
if user is not None and user not in USERS.values():
return None # 从 STORY_WEB_USERS 删掉的人,旧 token 立即失效(按人吊销)
return user
@app.middleware("http")
async def auth_guard(request: Request, call_next):
path = request.url.path
# 放行登录、静态资源、根
if path.startswith("/api/") and path != "/api/login":
if request.cookies.get(COOKIE) != PASSWORD:
user = _session_user(request)
if user is None:
return JSONResponse({"error": "未授权"}, status_code=401)
request.state.user = user
return await call_next(request)
# ---------- 鉴权 ----------
_fail_count = 0 # 连续失败计数(全局节流;内部小工具,无需按 IP 细分)
@app.post("/api/login")
async def login(request: Request):
global _fail_count
body = await request.json()
if body.get("password") != PASSWORD:
pw = str(body.get("password") or "")
user = None
for p, n in USERS.items():
if secrets.compare_digest(pw, p): # 常量时间比较
user = n
if user is None:
_fail_count += 1
await asyncio.sleep(min(_fail_count, 5)) # 爆破节流:连错越多等越久
return JSONResponse({"error": "口令错误"}, status_code=403)
resp = JSONResponse({"ok": True})
resp.set_cookie(COOKIE, PASSWORD, httponly=True, samesite="lax", max_age=30 * 86400)
_fail_count = 0
token = secrets.token_urlsafe(32)
db.create_session(token, user, time.time() + SESSION_DAYS * 86400)
resp = JSONResponse({"ok": True, "user": user})
resp.set_cookie(COOKIE, token, httponly=True, samesite="lax",
max_age=SESSION_DAYS * 86400)
return resp
@app.post("/api/logout")
async def logout():
async def logout(request: Request):
tok = request.cookies.get(COOKIE)
if tok:
db.delete_session(tok)
resp = JSONResponse({"ok": True})
resp.delete_cookie(COOKIE)
return resp
@ -116,7 +189,7 @@ async def event_detail(group: str):
@app.post("/api/import")
async def import_events(request: Request):
body = await request.json()
by = body.get("by", "匿名")
by = request.state.user # 改动者=会话身份,不信前端自报
items = body.get("events", [])
if isinstance(items, dict): # 容错:单个 IR
items = [items]
@ -136,14 +209,14 @@ async def update_event(group: str, request: Request):
ir = body.get("ir")
if not ir or ir.get("id") != group:
return JSONResponse({"error": "ir.id 与 group 不一致"}, status_code=400)
db.upsert_event(ir, body.get("by", "匿名"), _now(), notes=body.get("notes"))
db.upsert_event(ir, request.state.user, _now(), notes=body.get("notes"))
return {"ok": True, "updated_at": _now()}
@app.post("/api/events/{group}/status")
async def change_status(group: str, request: Request):
body = await request.json()
ok = db.set_status(group, body.get("status"), body.get("by", "匿名"), _now())
ok = db.set_status(group, body.get("status"), request.state.user, _now())
if not ok:
return JSONResponse({"error": "事件不存在"}, status_code=404)
return {"ok": True}