Files
story-edit-web/web/db.py
邓雨鹏 90402c4a17 security: 每人一把口令(口令即身份) + 随机会话token + 无配置拒绝启动 + 爆破节流
- STORY_WEB_PASSWORD(默认story) 废弃 → STORY_WEB_USERS=名字1:口令1,名字2:口令2;
  未配置/口令<8位/口令或用户名重复 → 启动即退出,杜绝弱默认口令裸奔
- cookie 不再存口令原文:登录发 secrets.token_urlsafe(32) 随机token,
  会话存 SQLite sessions 表(30天);登出删token;从 USERS 移除某人=吊销其全部会话
- updated_by 改由服务端按会话身份填写,前端自报 by 不再可信;登录框去掉昵称字段
- 登录失败全局递增节流(最多sleep 5s),口令比较用 secrets.compare_digest
- Dockerfile/compose 移除一切口令默认值;compose 未设 STORY_WEB_USERS 直接报错
- 顺手修 playtest.js 走位/动画/out_ref 行未转义的存储型XSS(esc补齐)
2026-06-10 17:34:50 +08:00

147 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""SQLite storage layer for the M5 web editor.

Table ``events``: group (PK) / title / theme / status / ir_json /
updated_at / updated_by / notes.
Last write wins (accepted by design); no locking is performed.
Table ``sessions``: token (PK) / user / expires_at -- login sessions; the
cookie stores only a random token, never the password.
"""
import json
import os
import sqlite3
# Database location: inside the container STORY_DB_PATH points at the mounted
# volume (persistent); locally it defaults to a file next to this module.
_DB_PATH = (
    os.environ.get("STORY_DB_PATH")
    or os.path.join(os.path.dirname(os.path.abspath(__file__)), "story_events.db")
)
# The status values accepted for an event row.
STATUSES = ("pending", "confirmed", "discarded")
def _conn(path=None):
    """Open a SQLite connection whose rows can be read by column name.

    Args:
        path: Database file to open; the module-level ``_DB_PATH`` is used
            when this is omitted or empty.

    Returns:
        An open ``sqlite3.Connection`` with ``row_factory`` set to
        ``sqlite3.Row``.
    """
    target = path if path else _DB_PATH
    connection = sqlite3.connect(target)
    connection.row_factory = sqlite3.Row
    return connection
def init_db(path=None):
    """Create the database directory and the ``events``/``sessions`` tables.

    Both tables are created with ``IF NOT EXISTS``, so calling this again on
    an already initialised database leaves existing tables in place.

    Args:
        path: Database file; the module-level ``_DB_PATH`` is used when this
            is omitted or empty.
    """
    db_dir = os.path.dirname(path or _DB_PATH)
    if db_dir:
        # exist_ok makes this a no-op when the directory is already there.
        os.makedirs(db_dir, exist_ok=True)
    conn = _conn(path)
    try:
        # The connection context manager commits or rolls back the
        # transaction; it does not close the connection, hence the finally.
        with conn:
            conn.execute(
                """CREATE TABLE IF NOT EXISTS events (
                    "group" TEXT PRIMARY KEY,
                    title TEXT,
                    theme TEXT,
                    status TEXT NOT NULL DEFAULT 'pending',
                    ir_json TEXT NOT NULL,
                    updated_at TEXT,
                    updated_by TEXT,
                    notes TEXT
                )"""
            )
            conn.execute(
                """CREATE TABLE IF NOT EXISTS sessions (
                    token TEXT PRIMARY KEY,
                    user TEXT NOT NULL,
                    expires_at REAL NOT NULL
                )"""
            )
    finally:
        conn.close()
# ---------- 会话 ----------
def create_session(token, user, expires_at, path=None):
    """Store a login session, replacing any existing row with the same token.

    Args:
        token: Session token (primary key of the ``sessions`` table).
        user: User name the session belongs to.
        expires_at: Expiry value stored in the REAL ``expires_at`` column.
        path: Database file; defaults to the module-level ``_DB_PATH``.
    """
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO sessions (token, user, expires_at) "
                "VALUES (?,?,?)",
                (token, user, expires_at),
            )
    finally:
        conn.close()
def get_session_user(token, now, path=None):
    """Return the user name for a valid token, or None.

    An expired session row (``expires_at < now``) is deleted as a side effect
    and None is returned. An unknown token also yields None.

    Args:
        token: Session token to look up.
        now: Current time, compared against the stored ``expires_at``
            (presumably the same clock/unit used by ``create_session``
            callers -- verify against caller).
        path: Database file; defaults to the module-level ``_DB_PATH``.

    Returns:
        The session's user name, or None when the token is unknown or expired.
    """
    conn = _conn(path)
    try:
        # ``with conn`` commits the DELETE below on exit; it does not close
        # the connection, so that is done in the finally clause.
        with conn:
            row = conn.execute(
                "SELECT user, expires_at FROM sessions WHERE token=?",
                (token,),
            ).fetchone()
            if not row:
                return None
            if row["expires_at"] < now:
                conn.execute("DELETE FROM sessions WHERE token=?", (token,))
                return None
            return row["user"]
    finally:
        conn.close()
def delete_session(token, path=None):
    """Delete the session row for ``token``; a missing token is a no-op.

    Args:
        token: Session token to remove.
        path: Database file; defaults to the module-level ``_DB_PATH``.
    """
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            conn.execute("DELETE FROM sessions WHERE token=?", (token,))
    finally:
        conn.close()
def purge_sessions(now, path=None):
    """Delete every session whose ``expires_at`` is earlier than ``now``.

    Args:
        now: Cut-off value compared against the stored ``expires_at``.
        path: Database file; defaults to the module-level ``_DB_PATH``.
    """
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            conn.execute("DELETE FROM sessions WHERE expires_at < ?", (now,))
    finally:
        conn.close()
def list_events(status=None, path=None):
    """List events without their ``ir_json`` payload (lightweight listing).

    Args:
        status: When truthy and not ``"all"``, only events with exactly this
            status are returned; otherwise every event is returned.
        path: Database file; defaults to the module-level ``_DB_PATH``.

    Returns:
        A list of dicts with the keys group, title, theme, status,
        updated_at, updated_by and notes, ordered by ``updated_at``
        descending.
    """
    sql = ('SELECT "group", title, theme, status, updated_at, updated_by, notes '
           "FROM events")
    args = []
    if status and status != "all":
        sql += " WHERE status = ?"
        args.append(status)
    sql += " ORDER BY updated_at DESC"
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            return [dict(row) for row in conn.execute(sql, args).fetchall()]
    finally:
        conn.close()
def get_event(group, path=None):
    """Fetch one event by its group key, with the IR decoded from JSON.

    Args:
        group: Primary key of the event.
        path: Database file; defaults to the module-level ``_DB_PATH``.

    Returns:
        A dict of the row's columns in which ``ir_json`` has been replaced by
        an ``ir`` key holding the decoded JSON value, or None when no such
        event exists.
    """
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            row = conn.execute(
                'SELECT * FROM events WHERE "group" = ?', (group,)
            ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    event = dict(row)
    event["ir"] = json.loads(event.pop("ir_json"))
    return event
def upsert_event(ir, by, now, notes=None, keep_status=True, path=None):
    """Insert a new event or refresh an existing one.

    For an existing row only title, theme, ir_json, updated_at, updated_by
    and (when given) notes are rewritten; the stored status is left as is.
    A new row is inserted with status ``"pending"``.

    Args:
        ir: Event IR mapping. ``ir["id"]`` is used as the group key;
            ``title`` and ``theme`` are optional and default to "".
        by: Value stored in ``updated_by``.
        now: Value stored in ``updated_at``.
        notes: New notes. None keeps the existing notes on update and stores
            "" on insert.
        keep_status: NOTE(review): accepted for interface compatibility but
            not consulted here -- the status of an existing row is always
            preserved. Confirm the intended meaning of ``False`` with callers.
        path: Database file; defaults to the module-level ``_DB_PATH``.

    Returns:
        The group key (``ir["id"]``).

    Raises:
        KeyError: If ``ir`` has no ``"id"`` entry.
    """
    group = ir["id"]
    title = ir.get("title", "")
    theme = ir.get("theme", "")
    ir_str = json.dumps(ir, ensure_ascii=False)
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            exists = conn.execute(
                'SELECT status FROM events WHERE "group" = ?', (group,)
            ).fetchone()
            if exists:
                # COALESCE keeps the stored notes when no new notes are given.
                conn.execute(
                    'UPDATE events SET title=?, theme=?, ir_json=?, updated_at=?, '
                    'updated_by=?, notes=COALESCE(?, notes) WHERE "group"=?',
                    (title, theme, ir_str, now, by, notes, group),
                )
            else:
                conn.execute(
                    'INSERT INTO events ("group", title, theme, status, ir_json, '
                    "updated_at, updated_by, notes) VALUES (?,?,?,?,?,?,?,?)",
                    (group, title, theme, "pending", ir_str, now, by, notes or ""),
                )
    finally:
        conn.close()
    return group
def set_status(group, status, by, now, path=None):
    """Change the status of one event and stamp who/when.

    Args:
        group: Primary key of the event to update.
        status: New status; must be one of ``STATUSES``.
        by: Value stored in ``updated_by``.
        now: Value stored in ``updated_at``.
        path: Database file; defaults to the module-level ``_DB_PATH``.

    Returns:
        True when a row was updated, False when no event has that group key.

    Raises:
        ValueError: If ``status`` is not in ``STATUSES``.
    """
    if status not in STATUSES:
        raise ValueError("非法状态: %r" % status)
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            cur = conn.execute(
                'UPDATE events SET status=?, updated_at=?, updated_by=? WHERE "group"=?',
                (status, now, by, group),
            )
            return cur.rowcount > 0
    finally:
        conn.close()
def confirmed_events(path=None):
    """Return every confirmed event as ``(group, ir)`` pairs, for export.

    Args:
        path: Database file; defaults to the module-level ``_DB_PATH``.

    Returns:
        A list of ``(group, ir)`` tuples ordered by group, where ``ir`` is the
        value decoded from the row's ``ir_json`` column.
    """
    conn = _conn(path)
    try:
        # ``with conn`` commits/rolls back only; close explicitly afterwards.
        with conn:
            rows = conn.execute(
                'SELECT "group", ir_json FROM events WHERE status=? ORDER BY "group"',
                ("confirmed",),
            ).fetchall()
    finally:
        conn.close()
    return [(row["group"], json.loads(row["ir_json"])) for row in rows]