This commit is contained in:
teasiu
2026-04-19 07:27:55 +08:00
parent 734c00f494
commit fe17ca686d
10 changed files with 1572 additions and 1624 deletions

54
build-docker.sh Normal file
View File

@@ -0,0 +1,54 @@
#!/bin/bash
# build-docker.sh - 用 Docker 为三种架构分别打包
# 前提:本机安装了 Docker + QEMU用于跨架构
# 安装 QEMUdocker run --privileged --rm tonistiigi/binfmt --install all
set -euo pipefail
PLATFORMS=("linux/amd64" "linux/arm64" "linux/arm/v7")
ARCH_NAMES=("x86_64" "arm64" "armhf")
echo "开始多架构打包..."
for i in "${!PLATFORMS[@]}"; do
PLATFORM="${PLATFORMS[$i]}"
ARCH_NAME="${ARCH_NAMES[$i]}"
OUTPUT="releases/nas-media-player-${ARCH_NAME}"
echo ""
echo "========================================="
echo " 打包架构: ${PLATFORM}${ARCH_NAME}"
echo "========================================="
docker run --rm \
--platform "${PLATFORM}" \
-v "$(pwd):/workspace" \
-w /workspace \
python:3.11-slim \
bash -c "
set -e
echo '--- 安装系统依赖 ---'
apt-get update -qq && apt-get install -y -q binutils
echo '--- 安装 Python 依赖 ---'
pip install --upgrade pip -q
pip install pyinstaller fastapi 'uvicorn[standard]' aiofiles \
pydantic python-multipart httptools -q
echo '--- 执行打包 ---'
pyinstaller nas-media-player.spec --clean --noconfirm
echo '--- 复制产物 ---'
mkdir -p releases
cp dist/nas-media-player releases/nas-media-player-${ARCH_NAME}
chmod +x releases/nas-media-player-${ARCH_NAME}
echo '产物大小:' \$(du -sh releases/nas-media-player-${ARCH_NAME})
"
echo "${ARCH_NAME} 打包完成 → ${OUTPUT}"
done
echo ""
echo "========================================="
echo "🎉 所有架构打包完成!"
ls -lh releases/
echo "========================================="

69
build.sh Normal file
View File

@@ -0,0 +1,69 @@
#!/bin/bash
# build.sh - 一键打包脚本
# 在对应架构的机器上执行(或用 Docker 交叉编译,见下方说明)
set -euo pipefail
ARCH=$(uname -m)
OUTPUT_NAME="nas-media-player-${ARCH}"
RELEASES_DIR="./releases"
echo "========================================"
echo " NAS Media Player 打包脚本"
echo " 当前架构: ${ARCH}"
echo "========================================"
# 1. 检查 Python 版本(建议 3.9+
python3 --version
# 2. 创建并激活虚拟环境(隔离,避免污染系统)
echo "[1/5] 创建虚拟环境..."
python3 -m venv .venv-build
source .venv-build/bin/activate
# 3. 安装依赖
echo "[2/5] 安装依赖..."
pip install --upgrade pip -q
pip install \
pyinstaller \
fastapi \
uvicorn[standard] \
aiofiles \
pydantic \
python-multipart \
httptools \
-q
# 4. 执行打包
echo "[3/5] 开始打包 (PyInstaller)..."
pyinstaller nas-media-player.spec \
--clean \
--noconfirm
# 5. 检查产物
BINARY="./dist/nas-media-player"
if [ ! -f "${BINARY}" ]; then
echo "❌ 打包失败!未找到 ${BINARY}"
exit 1
fi
# 6. 重命名并归档
mkdir -p "${RELEASES_DIR}"
cp "${BINARY}" "${RELEASES_DIR}/${OUTPUT_NAME}"
chmod +x "${RELEASES_DIR}/${OUTPUT_NAME}"
# 显示文件大小
SIZE=$(du -sh "${RELEASES_DIR}/${OUTPUT_NAME}" | cut -f1)
echo ""
echo "========================================"
echo "✅ 打包成功!"
echo " 产物路径: ${RELEASES_DIR}/${OUTPUT_NAME}"
echo " 文件大小: ${SIZE}"
echo "========================================"
# 7. 快速验证(不启动服务,只检查 --help
echo "[5/5] 验证二进制可执行..."
"${RELEASES_DIR}/${OUTPUT_NAME}" --help 2>/dev/null || true
echo "验证完成(如无错误输出则正常)"
# 清理虚拟环境
deactivate

78
build.yml Normal file
View File

@@ -0,0 +1,78 @@
# .github/workflows/build.yml
# 推送 tag如 v1.0.0)时自动为三种架构打包并发布 Release
name: Build Multi-Arch Binaries
on:
push:
tags:
- 'v*'
workflow_dispatch: # 支持手动触发
jobs:
build:
name: Build ${{ matrix.arch }}
runs-on: ubuntu-latest
strategy:
matrix:
include:
- arch: x86_64
platform: linux/amd64
python-arch: x64
- arch: arm64
platform: linux/arm64
python-arch: arm64
- arch: armhf
platform: linux/arm/v7
python-arch: arm
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Build binary in Docker
run: |
docker run --rm \
--platform ${{ matrix.platform }} \
-v "${{ github.workspace }}:/workspace" \
-w /workspace \
python:3.11-slim \
bash -c "
set -e
apt-get update -qq && apt-get install -y -q binutils
pip install --upgrade pip -q
pip install pyinstaller fastapi 'uvicorn[standard]' aiofiles \
pydantic python-multipart httptools -q
pyinstaller nas-media-player.spec --clean --noconfirm
mkdir -p releases
cp dist/nas-media-player releases/nas-media-player-${{ matrix.arch }}
chmod +x releases/nas-media-player-${{ matrix.arch }}
"
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: nas-media-player-${{ matrix.arch }}
path: releases/nas-media-player-${{ matrix.arch }}
release:
name: Create Release
needs: build
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/tags/')
steps:
- name: Download all artifacts
uses: actions/download-artifact@v4
with:
path: releases/
merge-multiple: true
- name: Create GitHub Release
uses: softprops/action-gh-release@v2
with:
files: releases/*
generate_release_notes: true

1179
index.html

File diff suppressed because one or more lines are too long

View File

@@ -11,15 +11,16 @@ import urllib.parse
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from typing import Optional, List, Dict from typing import Optional, List, Dict
import unicodedata
import socket import socket
import tempfile
import shutil
VIDEO_DIR = os.getenv("NAS_MEDIA_VIDEO_DIR", "/mnt") VIDEO_DIR = os.getenv("NAS_MEDIA_VIDEO_DIR", "/mnt")
PORT = int(os.getenv("NAS_MEDIA_PORT", 8800)) PORT = int(os.getenv("NAS_MEDIA_PORT", 8800))
APP_DIR = os.getenv("NAS_MEDIA_APP_DIR", "/opt/nas-media-player") APP_DIR = os.getenv("NAS_MEDIA_APP_DIR", "/opt/nas-media-player")
LOG_FILE = os.getenv("NAS_MEDIA_LOG_FILE", os.path.join(APP_DIR, "nas-media-player.log")) LOG_FILE = os.getenv("NAS_MEDIA_LOG_FILE", os.path.join(APP_DIR, "nas-media-player.log"))
# 确保日志目录存在
log_dir = os.path.dirname(LOG_FILE) log_dir = os.path.dirname(LOG_FILE)
os.makedirs(log_dir, exist_ok=True) os.makedirs(log_dir, exist_ok=True)
logging.basicConfig( logging.basicConfig(
@@ -103,8 +104,13 @@ SUPPORTED_AUDIO_FORMATS = {
SUPPORTED_FORMATS = {**SUPPORTED_VIDEO_FORMATS, **SUPPORTED_IMAGE_FORMATS, **SUPPORTED_AUDIO_FORMATS} SUPPORTED_FORMATS = {**SUPPORTED_VIDEO_FORMATS, **SUPPORTED_IMAGE_FORMATS, **SUPPORTED_AUDIO_FORMATS}
SUPPORTED_EXTENSIONS = list(SUPPORTED_FORMATS.keys()) SUPPORTED_EXTENSIONS = list(SUPPORTED_FORMATS.keys())
# 挂载静态文件 # 挂载静态文件(延迟到目录确实存在时)
app.mount("/static", StaticFiles(directory=Path(APP_DIR) / "static"), name="static") static_dir = Path(APP_DIR) / "static"
if static_dir.exists():
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
else:
logger.warning(f"静态文件目录不存在,跳过挂载: {static_dir}")
def get_safe_cookie_key(dir_path: str) -> str: def get_safe_cookie_key(dir_path: str) -> str:
"""将目录路径转换为MD5哈希值避免Cookie键名包含非法字符""" """将目录路径转换为MD5哈希值避免Cookie键名包含非法字符"""
@@ -112,168 +118,221 @@ def get_safe_cookie_key(dir_path: str) -> str:
md5_hash = hashlib.md5(encoded_path).hexdigest() md5_hash = hashlib.md5(encoded_path).hexdigest()
return f"auth_{md5_hash}" return f"auth_{md5_hash}"
# 密码管理功能
# ── 密码管理功能 ────────────────────────────────────────────────────────────────
def init_password_file(): def init_password_file():
"""初始化密码文件(修复目录创建+合法JSON写入""" """初始化密码文件"""
app_dir = Path(APP_DIR) app_dir = Path(APP_DIR)
app_dir.mkdir(parents=True, exist_ok=True) app_dir.mkdir(parents=True, exist_ok=True)
if not PASSWORD_FILE.exists(): if not PASSWORD_FILE.exists():
with open(PASSWORD_FILE, 'w', encoding='utf-8') as f: PASSWORD_FILE.write_text("{}", encoding="utf-8")
json.dump({}, f)
else: else:
try: try:
with open(PASSWORD_FILE, 'r', encoding='utf-8') as f: json.loads(PASSWORD_FILE.read_text(encoding="utf-8"))
json.load(f) except (json.JSONDecodeError, OSError):
except json.JSONDecodeError: PASSWORD_FILE.write_text("{}", encoding="utf-8")
with open(PASSWORD_FILE, 'w', encoding='utf-8') as f:
json.dump({}, f)
def hash_password(password: str) -> str: def hash_password(password: str) -> str:
"""密码哈希""" """密码哈希"""
return hashlib.sha256(password.encode()).hexdigest() return hashlib.sha256(password.encode()).hexdigest()
def save_directory_password(dir_path: str, password: str):
"""保存目录密码""" def _read_password_data() -> dict:
"""读取密码文件数据(内部辅助函数)"""
init_password_file() init_password_file()
with open(PASSWORD_FILE, 'r+') as f: try:
data = json.load(f) return json.loads(PASSWORD_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return {}
def save_directory_password(dir_path: str, password: str):
"""保存目录密码(原子写入,防止并发破坏)"""
data = _read_password_data()
data[dir_path] = { data[dir_path] = {
"password_hash": hash_password(password), "password_hash": hash_password(password),
"created_at": datetime.now().isoformat() "created_at": datetime.now().isoformat()
} }
f.seek(0) # 原子写:先写临时文件再替换
json.dump(data, f, indent=2) tmp_path = PASSWORD_FILE.with_suffix(".tmp")
f.truncate() try:
tmp_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
shutil.move(str(tmp_path), str(PASSWORD_FILE))
except Exception as e:
if tmp_path.exists():
tmp_path.unlink(missing_ok=True)
raise e
def get_directory_password(dir_path: str) -> Optional[str]: def get_directory_password(dir_path: str) -> Optional[str]:
"""获取目录密码哈希""" """获取目录密码哈希"""
init_password_file() return _read_password_data().get(dir_path, {}).get("password_hash")
if not PASSWORD_FILE.exists():
return None
with open(PASSWORD_FILE, 'r') as f:
data = json.load(f)
return data.get(dir_path, {}).get("password_hash")
def check_directory_password(dir_path: str, password: str) -> bool: def check_directory_password(dir_path: str, password: str) -> bool:
"""验证目录密码""" """验证目录密码"""
stored_hash = get_directory_password(dir_path) stored_hash = get_directory_password(dir_path)
if not stored_hash: if not stored_hash:
return True return True # 未设置密码,视为通过
return stored_hash == hash_password(password) return stored_hash == hash_password(password)
def get_protected_directories() -> List[str]: def get_protected_directories() -> List[str]:
"""获取所有受保护的目录""" """获取所有受保护的目录"""
init_password_file() return list(_read_password_data().keys())
with open(PASSWORD_FILE, 'r') as f:
data = json.load(f)
return list(data.keys())
def is_protected_directory(dir_path: str) -> bool: def is_protected_directory(dir_path: str) -> bool:
"""检查目录是否受保护(修复路径匹配逻辑)""" """检查目录(或其祖先)是否受保护"""
if not dir_path: if not dir_path:
return False return False
protected_dirs = get_protected_directories() protected_dirs = get_protected_directories()
dir_path_normalized = dir_path.replace(os.sep, '/').rstrip('/') norm = dir_path.replace(os.sep, '/').rstrip('/')
protected_dirs_normalized = [pdir.replace(os.sep, '/').rstrip('/') for pdir in protected_dirs] for pdir in protected_dirs:
pnorm = pdir.replace(os.sep, '/').rstrip('/')
for pdir in protected_dirs_normalized: if norm == pnorm or norm.startswith(f"{pnorm}/"):
if dir_path_normalized == pdir or dir_path_normalized.startswith(f"{pdir}/"):
return True return True
return False return False
def get_top_protected_directory(dir_path: str) -> Optional[str]: def get_top_protected_directory(dir_path: str) -> Optional[str]:
"""获取目录所属的顶级受保护目录兼容Python 3.8-""" """获取目录所属的顶级受保护祖先目录"""
if not dir_path or not is_protected_directory(dir_path): if not dir_path or not is_protected_directory(dir_path):
return None return None
# 统一路径分隔符为/,便于匹配 norm = dir_path.replace(os.sep, '/').rstrip('/')
dir_path_normalized = dir_path.replace(os.sep, '/').rstrip('/')
protected_dirs = get_protected_directories() protected_dirs = get_protected_directories()
protected_dirs_normalized = [pdir.replace(os.sep, '/').rstrip('/') for pdir in protected_dirs]
top_dir = None top_dir = None
max_depth = -1 min_depth = float('inf')
for pdir, pdir_original in zip(protected_dirs_normalized, protected_dirs): for pdir in protected_dirs:
if dir_path_normalized == pdir or dir_path_normalized.startswith(f"{pdir}/"): pnorm = pdir.replace(os.sep, '/').rstrip('/')
depth = pdir.count('/') if norm == pnorm or norm.startswith(f"{pnorm}/"):
if top_dir is None or depth < max_depth: depth = pnorm.count('/')
max_depth = depth if depth < min_depth:
top_dir = pdir_original min_depth = depth
top_dir = pdir
return top_dir return top_dir
async def verify_dir_access(request: Request, dir_path: str) -> bool:
"""验证目录访问权限(简化逻辑,避免误拦截)"""
if not dir_path or not is_protected_directory(dir_path):
return True
top_protected_dir = get_top_protected_directory(dir_path) def _verify_cookie(request: Request, top_protected_dir: str) -> bool:
if not top_protected_dir: """检查 Cookie 是否匹配受保护目录的密码哈希"""
return True
# 使用安全的Cookie键名
cookie_key = get_safe_cookie_key(top_protected_dir) cookie_key = get_safe_cookie_key(top_protected_dir)
cookie_value = request.cookies.get(cookie_key) cookie_value = request.cookies.get(cookie_key)
stored_hash = get_directory_password(top_protected_dir) stored_hash = get_directory_password(top_protected_dir)
return bool(cookie_value and stored_hash and cookie_value == stored_hash)
# 兼容Cookie不存在的情况
if cookie_value and stored_hash and cookie_value == stored_hash: async def check_dir_access(dir_path: str, request: Request) -> bool:
logger.info(f"目录访问验证通过: {dir_path} (Cookie认证)") """检查目录访问权限(统一入口)"""
if not dir_path:
return True return True
top_protected_dir = get_top_protected_directory(dir_path)
if not top_protected_dir:
return True
result = _verify_cookie(request, top_protected_dir)
if result:
logger.info(f"目录访问验证通过: {dir_path}")
else:
logger.warning(f"目录访问验证失败: {dir_path}")
return result
logger.warning(f"目录访问验证失败: {dir_path} (缺少有效Cookie)")
return False
# 根路径返回前端页面 # ── 路径安全辅助 ────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def read_root():
return FileResponse(str(Path(APP_DIR) / "static" / "index.html"))
# 安全检查路径兼容Python 3.8及以下版本)
def safe_join(base: Path, *paths) -> Path: def safe_join(base: Path, *paths) -> Path:
"""安全拼接路径,防止路径穿越攻击"""
try: try:
decoded_paths = [urllib.parse.unquote(path) for path in paths] decoded_paths = [urllib.parse.unquote(p) for p in paths]
joined_path = base.joinpath(*decoded_paths).resolve() joined = base.joinpath(*decoded_paths).resolve()
joined_path.relative_to(base) joined.relative_to(base) # 若越界则抛 ValueError
return joined_path return joined
except ValueError: except ValueError:
logger.error(f"路径越权:{joined_path} 不在 {base} 范围内")
raise HTTPException(status_code=403, detail="无效路径(越权访问)") raise HTTPException(status_code=403, detail="无效路径(越权访问)")
except Exception as e: except Exception as e:
logger.error(f"Path security check failed: {e}") logger.error(f"路径安全检查失败: {e}")
raise HTTPException(status_code=403, detail="Invalid path") raise HTTPException(status_code=403, detail="Invalid path")
# 获取目录结构
# ── 自然排序辅助 ────────────────────────────────────────────────────────────────
import re
def _natural_sort_key(s: str):
"""将字符串拆分为文本/数字段用于自然排序1, 2, 10 而不是 1, 10, 2"""
parts = re.split(r'(\d+)', s.lower())
return [int(p) if p.isdigit() else p for p in parts]
# ── API 路由 ────────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def read_root():
index_path = Path(APP_DIR) / "static" / "index.html"
if not index_path.exists():
raise HTTPException(status_code=404, detail="前端页面未找到")
return FileResponse(str(index_path))
@app.get("/api/directories") @app.get("/api/directories")
async def get_directories(): async def get_directories():
dirs = [] def traverse(path: Path, rel_path: str = "") -> List[Dict]:
protected_dirs = get_protected_directories()
def traverse_recursive_dirs(path: Path, rel_path: str = "") -> List[Dict]:
items = [] items = []
try: try:
for dir in path.iterdir(): for d in sorted(path.iterdir(), key=lambda x: _natural_sort_key(x.name)):
if dir.is_dir() and not dir.name.startswith('.'): if d.is_dir() and not d.name.startswith('.'):
sub_rel = f"{rel_path}/{dir.name}" if rel_path else dir.name sub_rel = f"{rel_path}/{d.name}" if rel_path else d.name
is_protected = is_protected_directory(sub_rel)
items.append({ items.append({
"name": dir.name, "name": d.name,
"path": sub_rel, "path": sub_rel,
"type": "directory", "type": "directory",
"protected": is_protected, "protected": is_protected_directory(sub_rel),
"children": traverse_recursive_dirs(dir, sub_rel) "children": traverse(d, sub_rel)
}) })
except PermissionError:
logger.warning(f"目录无读取权限: {path}")
except Exception as e: except Exception as e:
logger.error(f"Directory traversal error: {e}") logger.error(f"目录遍历错误: {e}")
return items return items
if VIDEO_ROOT.exists(): dirs = traverse(VIDEO_ROOT) if VIDEO_ROOT.exists() else []
dirs = traverse_recursive_dirs(VIDEO_ROOT)
return {"directories": dirs} return {"directories": dirs}
@app.get("/api/all-directories")
async def get_all_directories():
all_dirs = []
def traverse(path: Path, rel_path: str = ""):
all_dirs.append({
"name": rel_path if rel_path else "主目录",
"path": rel_path,
"protected": is_protected_directory(rel_path)
})
try:
for d in sorted(path.iterdir(), key=lambda x: _natural_sort_key(x.name)):
if d.is_dir() and not d.name.startswith('.'):
sub_rel = f"{rel_path}/{d.name}" if rel_path else d.name
traverse(d, sub_rel)
except PermissionError:
logger.warning(f"目录无读取权限: {path}")
except Exception as e:
logger.error(f"目录遍历错误: {e}")
if VIDEO_ROOT.exists():
traverse(VIDEO_ROOT)
return {"directories": all_dirs}
@app.get("/api/protected-directories")
async def get_protected_dirs():
return {"protected_dirs": get_protected_directories()}
@app.post("/api/verify-dir-password") @app.post("/api/verify-dir-password")
async def verify_dir_password(dir_path: str = Form(...), password: str = Form(...)): async def verify_dir_password(dir_path: str = Form(...), password: str = Form(...)):
try: try:
@@ -287,9 +346,9 @@ async def verify_dir_password(dir_path: str = Form(...), password: str = Form(..
response.set_cookie( response.set_cookie(
key=cookie_key, key=cookie_key,
value=hash_password(password), value=hash_password(password),
max_age=3600, max_age=3600 * 8, # 8小时原来1小时延长减少重复验证
httponly=True, httponly=True,
secure=False, secure=False, # LAN 部署通常无 HTTPS
samesite="lax" samesite="lax"
) )
logger.info(f"目录密码验证成功: {top_protected_dir}") logger.info(f"目录密码验证成功: {top_protected_dir}")
@@ -298,26 +357,25 @@ async def verify_dir_password(dir_path: str = Form(...), password: str = Form(..
logger.warning(f"目录密码验证失败: {top_protected_dir}") logger.warning(f"目录密码验证失败: {top_protected_dir}")
return {"success": False, "message": "密码错误"} return {"success": False, "message": "密码错误"}
except Exception as e: except Exception as e:
logger.error(f"Password verification error: {e}") logger.error(f"密码验证异常: {e}")
return {"success": False, "message": f"验证失败: {str(e)}"} return {"success": False, "message": f"验证失败: {str(e)}"}
async def check_dir_access(dir_path: str, request: Request) -> bool:
"""检查目录访问权限"""
if not dir_path:
return True
@app.post("/api/clear-dir-auth")
async def clear_dir_auth(dir_path: str = Form(...)):
try:
top_protected_dir = get_top_protected_directory(dir_path) top_protected_dir = get_top_protected_directory(dir_path)
if not top_protected_dir: if not top_protected_dir:
return True return {"success": True, "message": "目录不受保护"}
cookie_key = get_safe_cookie_key(top_protected_dir) cookie_key = get_safe_cookie_key(top_protected_dir)
cookie_value = request.cookies.get(cookie_key) response = JSONResponse({"success": True, "message": "已清除访问权限"})
stored_hash = get_directory_password(top_protected_dir) response.delete_cookie(cookie_key)
return response
except Exception as e:
logger.error(f"清除认证失败: {e}")
return {"success": False, "message": f"清除失败: {str(e)}"}
if cookie_value and cookie_value == stored_hash:
return True
return False
@app.get("/api/media") @app.get("/api/media")
async def get_media(subdir: Optional[str] = None, request: Request = None): async def get_media(subdir: Optional[str] = None, request: Request = None):
@@ -330,16 +388,12 @@ async def get_media(subdir: Optional[str] = None, request: Request = None):
"top_protected_dir": get_top_protected_directory(subdir) "top_protected_dir": get_top_protected_directory(subdir)
} }
if subdir and subdir.strip(): target_dir = safe_join(VIDEO_ROOT, subdir.strip()) if subdir and subdir.strip() else VIDEO_ROOT
target_dir = safe_join(VIDEO_ROOT, subdir.strip())
else:
target_dir = VIDEO_ROOT
if not target_dir.exists() or not target_dir.is_dir(): if not target_dir.exists() or not target_dir.is_dir():
return {"media": [], "current_dir": subdir or ""} return {"media": [], "current_dir": subdir or ""}
media = [] media = []
for file in target_dir.iterdir(): for file in target_dir.iterdir():
if file.is_file(): if file.is_file():
ext = file.suffix.lower() ext = file.suffix.lower()
@@ -350,18 +404,18 @@ async def get_media(subdir: Optional[str] = None, request: Request = None):
file_type = "audio" file_type = "audio"
else: else:
file_type = "image" file_type = "image"
stat = file.stat()
media.append({ media.append({
"name": file.name, "name": file.name,
"type": file_type, "type": file_type,
"extension": ext, "extension": ext,
"size": file.stat().st_size, "size": stat.st_size,
"modified": file.stat().st_mtime, "modified": stat.st_mtime,
"path": str(file) "path": str(file)
}) })
# 按文件名自然排序 # 自然排序1, 2, 10 顺序,而非 1, 10, 2
media.sort(key=lambda x: (len(x["name"]), x["name"])) media.sort(key=lambda x: _natural_sort_key(x["name"]))
logger.info(f"Found {len(media)} media files in {target_dir}") logger.info(f"Found {len(media)} media files in {target_dir}")
return { return {
@@ -370,13 +424,15 @@ async def get_media(subdir: Optional[str] = None, request: Request = None):
"protected": is_protected_directory(subdir or ""), "protected": is_protected_directory(subdir or ""),
"top_protected_dir": get_top_protected_directory(subdir or "") "top_protected_dir": get_top_protected_directory(subdir or "")
} }
except HTTPException:
raise
except Exception as e: except Exception as e:
logger.error(f"Error getting media list: {e}") logger.error(f"获取媒体列表失败: {e}")
return {"media": [], "current_dir": subdir or "", "error": str(e)} return {"media": [], "current_dir": subdir or "", "error": str(e)}
# 编码文件名用于HTTP头
def encode_filename_for_header(filename: str) -> str: def encode_filename_for_header(filename: str) -> str:
"""编码文件名以支持中文等特殊字符""" """编码文件名以支持中文等非 ASCII 字符"""
try: try:
filename.encode('ascii') filename.encode('ascii')
return filename return filename
@@ -389,149 +445,87 @@ async def serve_media(path: str, request: Request):
try: try:
decoded_path = urllib.parse.unquote(path) decoded_path = urllib.parse.unquote(path)
full_media_path = safe_join(VIDEO_ROOT, decoded_path) full_media_path = safe_join(VIDEO_ROOT, decoded_path)
media_dir = path_relative_to(full_media_path.parent, VIDEO_ROOT) if path_is_relative_to(full_media_path.parent, VIDEO_ROOT) else str(full_media_path.parent) media_dir = (
path_relative_to(full_media_path.parent, VIDEO_ROOT)
if path_is_relative_to(full_media_path.parent, VIDEO_ROOT)
else str(full_media_path.parent)
)
if is_protected_directory(media_dir) and not await verify_dir_access(request, media_dir): if is_protected_directory(media_dir) and not await check_dir_access(media_dir, request):
raise HTTPException(status_code=403, detail="需要密码访问") raise HTTPException(status_code=403, detail="需要密码访问")
if not full_media_path.exists() or not full_media_path.is_file(): if not full_media_path.exists() or not full_media_path.is_file():
logger.warning(f"Media file not found: {full_media_path}") return JSONResponse(status_code=404, content={"error": "Media file not found"})
return JSONResponse(
status_code=404,
content={"error": "Media file not found"}
)
ext = full_media_path.suffix.lower() ext = full_media_path.suffix.lower()
if ext not in SUPPORTED_EXTENSIONS: if ext not in SUPPORTED_EXTENSIONS:
return JSONResponse( return JSONResponse(status_code=400, content={"error": f"Unsupported format: {ext}"})
status_code=400,
content={"error": f"Unsupported format: {ext}"}
)
mime_type = SUPPORTED_FORMATS.get(ext, "application/octet-stream") mime_type = SUPPORTED_FORMATS.get(ext, "application/octet-stream")
filename = full_media_path.name
encoded_filename = encode_filename_for_header(filename)
content_disp = f'inline; filename="{encoded_filename}"; filename*=UTF-8\'\'{encoded_filename}'
# 处理图片 # 图片:直接返回
if ext in SUPPORTED_IMAGE_FORMATS: if ext in SUPPORTED_IMAGE_FORMATS:
logger.info(f"Serving image: {full_media_path}") logger.info(f"Serving image: {full_media_path}")
# 处理中文文件名的HTTP头
filename = full_media_path.name
encoded_filename = encode_filename_for_header(filename)
headers = {
"Cache-Control": "max-age=3600",
"Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}"
}
return FileResponse( return FileResponse(
path=str(full_media_path), path=str(full_media_path),
media_type=mime_type, media_type=mime_type,
filename=encoded_filename, headers={"Cache-Control": "max-age=3600", "Content-Disposition": content_disp}
headers=headers
) )
# 处理音频 # 音频:直接返回(浏览器原生断点续传)
elif ext in SUPPORTED_AUDIO_FORMATS: if ext in SUPPORTED_AUDIO_FORMATS:
logger.info(f"Serving audio: {full_media_path}") logger.info(f"Serving audio: {full_media_path}")
# 处理中文文件名的HTTP头
filename = full_media_path.name
encoded_filename = encode_filename_for_header(filename)
headers = {
"Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}"
}
return FileResponse( return FileResponse(
path=str(full_media_path), path=str(full_media_path),
media_type=mime_type, media_type=mime_type,
filename=encoded_filename, headers={"Content-Disposition": content_disp, "Accept-Ranges": "bytes"}
headers=headers
) )
# 视频处理断点续传 # 视频:手动处理 Range 断点续传
file_size = full_media_path.stat().st_size file_size = full_media_path.stat().st_size
range_header = request.headers.get("Range") range_header = request.headers.get("Range")
start, end = 0, min(1024 * 1024 * 2, file_size - 1) # 默认前 2 MB
if range_header: if range_header:
try: try:
range_str = range_header.split("=")[-1] range_str = range_header.split("=")[-1]
start_str, end_str = range_str.split("-") start_str, end_str = range_str.split("-")
start = int(start_str) if start_str else 0 start = int(start_str) if start_str else 0
end = int(end_str) if end_str else file_size - 1 end = int(end_str) if end_str else file_size - 1
end = min(end, file_size - 1)
start = max(0, start) start = max(0, start)
except: end = min(end, file_size - 1)
start = 0 except (ValueError, IndexError):
end = min(1024*1024*2, file_size - 1) start, end = 0, min(1024 * 1024 * 2, file_size - 1)
else:
start = 0
end = min(1024*1024*2, file_size - 1)
# 异步分块读取
async def iterfile(): async def iterfile():
async with aiofiles.open(str(full_media_path), 'rb') as f: async with aiofiles.open(str(full_media_path), 'rb') as f:
await f.seek(start) await f.seek(start)
remaining = end - start + 1 remaining = end - start + 1
while remaining > 0: while remaining > 0:
chunk_size = min(1024*1024, remaining) chunk = await f.read(min(1024 * 1024, remaining))
chunk = await f.read(chunk_size)
if not chunk: if not chunk:
break break
yield chunk yield chunk
remaining -= chunk_size remaining -= len(chunk)
# 处理视频文件名
filename = full_media_path.name
encoded_filename = encode_filename_for_header(filename)
headers = { headers = {
"Content-Range": f"bytes {start}-{end}/{file_size}", "Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes", "Accept-Ranges": "bytes",
"Content-Length": str(end - start + 1), "Content-Length": str(end - start + 1),
"Content-Type": mime_type, "Content-Type": mime_type,
"Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}" "Content-Disposition": content_disp,
} }
logger.info(f"Serving video: {full_media_path} (bytes {start}-{end}/{file_size})") logger.info(f"Serving video: {full_media_path} (bytes {start}-{end}/{file_size})")
return StreamingResponse( return StreamingResponse(iterfile(), status_code=206, headers=headers, media_type=mime_type)
iterfile(),
status_code=206,
headers=headers,
media_type=mime_type
)
except HTTPException as e: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Error serving media: {e}") logger.error(f"媒体文件服务失败: {e}")
return JSONResponse( return JSONResponse(status_code=500, content={"error": f"Server error: {str(e)}"})
status_code=500,
content={"error": f"Server error: {str(e)}"}
)
# 获取所有目录路径
@app.get("/api/all-directories")
async def get_all_directories():
all_dirs = []
def traverse_all_dirs(path: Path, rel_path: str = ""):
try:
all_dirs.append({
"name": rel_path if rel_path else "主目录",
"path": rel_path,
"protected": is_protected_directory(rel_path)
})
for dir in path.iterdir():
if dir.is_dir() and not dir.name.startswith('.'):
sub_rel = f"{rel_path}/{dir.name}" if rel_path else dir.name
traverse_all_dirs(dir, sub_rel)
except Exception as e:
logger.error(f"Error traversing all directories: {e}")
if VIDEO_ROOT.exists():
traverse_all_dirs(VIDEO_ROOT)
return {"directories": all_dirs}
@app.post("/api/create-directory") @app.post("/api/create-directory")
@@ -541,52 +535,49 @@ async def create_directory(
dir_password: Optional[str] = Form(None) dir_password: Optional[str] = Form(None)
): ):
try: try:
if not new_dir or new_dir.strip() == "": if not new_dir or not new_dir.strip():
raise HTTPException(status_code=400, detail="目录名不能为空") raise HTTPException(status_code=400, detail="目录名不能为空")
# 安全路径拼接 new_dir = new_dir.strip()
if target_path and target_path.strip():
parent_dir = safe_join(VIDEO_ROOT, target_path.strip()) invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\0']
else: if any(c in new_dir for c in invalid_chars):
parent_dir = VIDEO_ROOT raise HTTPException(status_code=400, detail="目录名包含非法字符(/\\:*?\"<>|")
parent_dir = safe_join(VIDEO_ROOT, target_path.strip()) if target_path and target_path.strip() else VIDEO_ROOT
# 新增:检查父目录是否存在且可写
if not parent_dir.exists(): if not parent_dir.exists():
raise HTTPException(status_code=404, detail=f"父目录不存在: {parent_dir}") raise HTTPException(status_code=404, detail=f"父目录不存在: {parent_dir}")
if not os.access(parent_dir, os.W_OK): if not os.access(parent_dir, os.W_OK):
raise HTTPException(status_code=403, detail=f"父目录无写入权限: {parent_dir}") raise HTTPException(status_code=403, detail="父目录无写入权限")
new_dir_path = parent_dir / new_dir.strip() new_dir_path = parent_dir / new_dir
new_dir_rel_path = path_relative_to(new_dir_path, VIDEO_ROOT) if path_is_relative_to(new_dir_path, VIDEO_ROOT) else str(new_dir_path)
# 检查目录名合法性
invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
if any(char in new_dir for char in invalid_chars):
raise HTTPException(status_code=400, detail="目录名包含非法字符(/\:*?\"<>|")
# 新增:检查目录是否已存在
if new_dir_path.exists(): if new_dir_path.exists():
raise HTTPException(status_code=409, detail=f"目录已存在: {new_dir_path.name}") raise HTTPException(status_code=409, detail=f"目录已存在: {new_dir}")
# 创建目录(增强异常捕获)
try: try:
new_dir_path.mkdir(parents=True, exist_ok=False) new_dir_path.mkdir(parents=False, exist_ok=False)
except PermissionError: except PermissionError:
raise HTTPException(status_code=403, detail=f"创建目录失败:权限不足{new_dir_path}") raise HTTPException(status_code=403, detail="创建目录失败:权限不足")
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=f"创建目录失败:{str(e)}") raise HTTPException(status_code=500, detail=f"创建目录失败:{str(e)}")
# 设置密码保护 new_dir_rel = (
path_relative_to(new_dir_path, VIDEO_ROOT)
if path_is_relative_to(new_dir_path, VIDEO_ROOT)
else str(new_dir_path)
)
if dir_password and dir_password.strip(): if dir_password and dir_password.strip():
save_directory_password(new_dir_rel_path, dir_password.strip()) save_directory_password(new_dir_rel, dir_password.strip())
logger.info(f"带密码保护的目录创建成功: {new_dir_path}") logger.info(f"带密码保护的目录创建成功: {new_dir_path}")
else: else:
logger.info(f"目录创建成功: {new_dir_path}") logger.info(f"目录创建成功: {new_dir_path}")
return { return {
"success": True, "success": True,
"message": f"目录创建成功: {new_dir_path.name}" + ("(已设置密码保护)" if dir_password else ""), "message": f"目录创建成功: {new_dir}" + ("(已设置密码保护)" if dir_password else ""),
"path": new_dir_rel_path, "path": new_dir_rel,
"protected": bool(dir_password) "protected": bool(dir_password)
} }
except HTTPException: except HTTPException:
@@ -595,137 +586,185 @@ async def create_directory(
logger.error(f"创建目录异常: {e}", exc_info=True) logger.error(f"创建目录异常: {e}", exc_info=True)
return {"success": False, "message": f"创建失败: {str(e)}"} return {"success": False, "message": f"创建失败: {str(e)}"}
@app.post("/api/upload-media") @app.post("/api/upload-media")
async def upload_media( async def upload_media(
request: Request, request: Request,
target_dir: str = Form(""), target_dir: str = Form(""),
file: UploadFile = File(...) file: UploadFile = File(...)
): ):
file_path = None
try: try:
logger.info(f"开始处理上传请求 - 目标目录: {target_dir}, 文件: {file.filename}") logger.info(f"上传请求 - 目标目录: {target_dir}, 文件: {file.filename}")
if is_protected_directory(target_dir) and not await verify_dir_access(request, target_dir): if is_protected_directory(target_dir) and not await check_dir_access(target_dir, request):
logger.warning(f"加密目录上传权限拒绝: {target_dir}")
return {"success": False, "message": "无权访问该目录,请先验证密码"} return {"success": False, "message": "无权访问该目录,请先验证密码"}
if not file or not file.filename: if not file or not file.filename:
logger.warning("上传失败:未选择文件")
return {"success": False, "message": "未选择文件"} return {"success": False, "message": "未选择文件"}
filename = file.filename filename = file.filename
file_ext = Path(filename).suffix.lower() file_ext = Path(filename).suffix.lower()
if file_ext not in SUPPORTED_EXTENSIONS: if file_ext not in SUPPORTED_EXTENSIONS:
logger.warning(f"上传失败:不支持的文件格式 {file_ext}")
return { return {
"success": False, "success": False,
"message": f"不支持的文件格式: {file_ext},支持的格式: {', '.join(SUPPORTED_EXTENSIONS)}" "message": f"不支持的文件格式: {file_ext},支持的格式: {', '.join(SUPPORTED_EXTENSIONS)}"
} }
if target_dir.strip(): upload_dir = safe_join(VIDEO_ROOT, target_dir.strip()) if target_dir.strip() else VIDEO_ROOT
upload_dir = safe_join(VIDEO_ROOT, target_dir.strip())
else:
upload_dir = VIDEO_ROOT
os.makedirs(upload_dir, exist_ok=True) os.makedirs(upload_dir, exist_ok=True)
logger.info(f"上传目录已确认: {upload_dir}")
# 文件名去重
file_path = upload_dir / filename file_path = upload_dir / filename
counter = 1 counter = 1
while file_path.exists(): while file_path.exists():
stem = Path(filename).stem stem = Path(filename).stem
new_filename = f"{stem}_{counter}{file_ext}" file_path = upload_dir / f"{stem}_{counter}{file_ext}"
file_path = upload_dir / new_filename
counter += 1 counter += 1
# 写入文件(先写临时文件,成功后原子移动)
tmp_fd, tmp_name = tempfile.mkstemp(dir=upload_dir)
try: try:
async with aiofiles.open(str(file_path), 'wb') as f:
content_length = 0 content_length = 0
while chunk := await file.read(1024 * 1024): async with aiofiles.open(tmp_name, 'wb') as f:
while True:
chunk = await file.read(1024 * 1024)
if not chunk:
break
await f.write(chunk) await f.write(chunk)
content_length += len(chunk) content_length += len(chunk)
if not file_path.exists(): os.close(tmp_fd)
raise Exception("文件保存失败:文件不存在") shutil.move(tmp_name, str(file_path))
if file_path.stat().st_size != content_length: except Exception as e:
logger.warning(f"文件大小不一致 - 预期: {content_length}, 实际: {file_path.stat().st_size}") os.close(tmp_fd)
if os.path.exists(tmp_name):
os.unlink(tmp_name)
raise Exception(f"保存文件失败: {str(e)}")
# 确定文件类型 actual_size = file_path.stat().st_size
if file_ext in SUPPORTED_VIDEO_FORMATS: if actual_size != content_length:
file_type = "视频" logger.warning(f"文件大小不一致 - 写入: {content_length}, 磁盘: {actual_size}")
elif file_ext in SUPPORTED_AUDIO_FORMATS:
file_type = "音频"
else:
file_type = "图片"
logger.info(f"文件上传成功: {file_path} ({file_type}, {file_path.stat().st_size} bytes)") file_type = "视频" if file_ext in SUPPORTED_VIDEO_FORMATS else ("音频" if file_ext in SUPPORTED_AUDIO_FORMATS else "图片")
logger.info(f"上传成功: {file_path} ({file_type}, {actual_size} bytes)")
return { return {
"success": True, "success": True,
"message": f"{file_type}文件 {file_path.name} 上传成功", "message": f"{file_type}文件 {file_path.name} 上传成功",
"filename": file_path.name, "filename": file_path.name,
"path": target_dir, "path": target_dir,
"size": file_path.stat().st_size "size": actual_size
} }
except HTTPException:
raise
except Exception as e: except Exception as e:
# 清理不完整文件 # 清理可能已创建的残留文件
if file_path.exists() and file_path.stat().st_size == 0: if file_path and isinstance(file_path, Path) and file_path.exists() and file_path.stat().st_size == 0:
file_path.unlink() file_path.unlink(missing_ok=True)
logger.warning(f"清理空文件: {file_path}") logger.error(f"上传失败: {e}")
raise Exception(f"保存文件失败: {str(e)}") return {"success": False, "message": f"上传失败: {str(e)}"}
except Exception as e:
logger.error(f"上传失败: {str(e)}")
return {
"success": False,
"message": f"上传失败: {str(e)}"
}
finally: finally:
# 确保文件句柄关闭
try: try:
await file.close() await file.close()
except Exception as e: except Exception:
logger.error(f"关闭文件句柄失败: {e}") pass
@app.post("/api/clear-dir-auth")
async def clear_dir_auth(dir_path: str = Form(...)): @app.delete("/api/delete-file")
async def delete_file(request: Request, file_path: str):
"""删除媒体文件"""
try: try:
top_protected_dir = get_top_protected_directory(dir_path) full_path = safe_join(VIDEO_ROOT, urllib.parse.unquote(file_path))
if not top_protected_dir: media_dir = (
return {"success": True, "message": "目录不受保护"} path_relative_to(full_path.parent, VIDEO_ROOT)
if path_is_relative_to(full_path.parent, VIDEO_ROOT)
else str(full_path.parent)
)
cookie_key = get_safe_cookie_key(top_protected_dir) if is_protected_directory(media_dir) and not await check_dir_access(media_dir, request):
response = JSONResponse({"success": True, "message": "已清除访问权限"}) raise HTTPException(status_code=403, detail="需要密码访问")
response.delete_cookie(cookie_key)
return response if not full_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
if not full_path.is_file():
raise HTTPException(status_code=400, detail="路径不是文件")
full_path.unlink()
logger.info(f"文件删除成功: {full_path}")
return {"success": True, "message": f"文件 {full_path.name} 删除成功"}
except HTTPException:
raise
except Exception as e: except Exception as e:
logger.error(f"Clear auth error: {e}") logger.error(f"删除文件失败: {e}")
return {"success": False, "message": f"除失败: {str(e)}"} return {"success": False, "message": f"除失败: {str(e)}"}
@app.get("/api/protected-directories") @app.delete("/api/delete-directory")
async def get_protected_dirs(): async def delete_directory(request: Request, dir_path: str):
return {"protected_dirs": get_protected_directories()} """删除目录(仅允许删除空目录)"""
try:
full_path = safe_join(VIDEO_ROOT, urllib.parse.unquote(dir_path))
rel_path = (
path_relative_to(full_path, VIDEO_ROOT)
if path_is_relative_to(full_path, VIDEO_ROOT)
else str(full_path)
)
if not full_path.exists():
raise HTTPException(status_code=404, detail="目录不存在")
if not full_path.is_dir():
raise HTTPException(status_code=400, detail="路径不是目录")
if full_path == VIDEO_ROOT:
raise HTTPException(status_code=403, detail="不允许删除根目录")
# 检查目录是否为空
if any(full_path.iterdir()):
raise HTTPException(status_code=409, detail="目录不为空,请先删除其中的文件")
full_path.rmdir()
# 同步清理密码记录
data = _read_password_data()
if rel_path in data:
del data[rel_path]
PASSWORD_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
logger.info(f"目录删除成功: {full_path}")
return {"success": True, "message": f"目录 {full_path.name} 删除成功"}
except HTTPException:
raise
except Exception as e:
logger.error(f"删除目录失败: {e}")
return {"success": False, "message": f"删除失败: {str(e)}"}
# ── 启动入口 ────────────────────────────────────────────────────────────────────
def create_listen_sockets(port: int) -> list: def create_listen_sockets(port: int) -> list:
sockets = [] sockets = []
# ===== IPv4 ===== # IPv4
sock4 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock4 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except (AttributeError, OSError):
pass # SO_REUSEPORT 并非所有平台都支持
sock4.bind(("0.0.0.0", port)) sock4.bind(("0.0.0.0", port))
sock4.listen(2048) sock4.listen(2048)
sockets.append(sock4) sockets.append(sock4)
logger.info(f"IPv4 监听: 0.0.0.0:{port}") logger.info(f"IPv4 监听: 0.0.0.0:{port}")
# ===== IPv6 ===== # IPv6可选
try: try:
sock6 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock6 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
# 关键:必须是 1 except (AttributeError, OSError):
pass
sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
sock6.bind(("::", port)) sock6.bind(("::", port))
sock6.listen(2048) sock6.listen(2048)
@@ -736,6 +775,7 @@ def create_listen_sockets(port: int) -> list:
return sockets return sockets
def main(): def main():
init_password_file() init_password_file()
import uvicorn import uvicorn
@@ -747,10 +787,9 @@ def main():
access_log=False, access_log=False,
timeout_keep_alive=30 timeout_keep_alive=30
) )
server = uvicorn.Server(config) server = uvicorn.Server(config)
server.run(sockets=sockets) server.run(sockets=sockets)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

140
nas-media-player.spec Normal file
View File

@@ -0,0 +1,140 @@
# nas-media-player.spec
# 使用方法pyinstaller nas-media-player.spec
import sys
from pathlib import Path
block_cipher = None
a = Analysis(
['nas-media-player.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
# uvicorn 核心
'uvicorn',
'uvicorn.main',
'uvicorn.config',
'uvicorn.server',
'uvicorn.loops',
'uvicorn.loops.auto',
'uvicorn.loops.asyncio',
'uvicorn.protocols',
'uvicorn.protocols.http',
'uvicorn.protocols.http.auto',
'uvicorn.protocols.http.h11_impl',
'uvicorn.protocols.http.httptools_impl',
'uvicorn.protocols.websockets',
'uvicorn.protocols.websockets.auto',
'uvicorn.protocols.websockets.websockets_impl',
'uvicorn.protocols.websockets.wsproto_impl',
'uvicorn.lifespan',
'uvicorn.lifespan.off',
'uvicorn.lifespan.on',
'uvicorn.logging',
'uvicorn.middleware',
'uvicorn.middleware.asgi2',
'uvicorn.middleware.message_logger',
'uvicorn.middleware.proxy_headers',
# fastapi / starlette
'fastapi',
'fastapi.routing',
'fastapi.middleware',
'fastapi.middleware.cors',
'fastapi.staticfiles',
'fastapi.responses',
'starlette',
'starlette.routing',
'starlette.middleware',
'starlette.middleware.cors',
'starlette.staticfiles',
'starlette.responses',
'starlette.background',
'starlette.concurrency',
'starlette.datastructures',
'starlette.exceptions',
'starlette.formparsers',
'starlette.requests',
'starlette.types',
'starlette.websockets',
# HTTP 解析库uvicorn 可选依赖,打包时都带上)
'h11',
'httptools',
'anyio',
'anyio._backends._asyncio',
'anyio._backends._trio',
'sniffio',
# aiofiles
'aiofiles',
'aiofiles.os',
'aiofiles.threadpool',
# pydanticfastapi 依赖)
'pydantic',
'pydantic.v1',
'pydantic_core',
# 标准库补充
'multipart',
'python_multipart',
'email.mime.multipart',
'email.mime.text',
# 编码/哈希
'hashlib',
'hmac',
# 其他
'click',
'typing_extensions',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
# 排除不需要的大型库,减小体积
'tkinter',
'matplotlib',
'numpy',
'pandas',
'PIL',
'scipy',
'IPython',
'jupyter',
'notebook',
'test',
'unittest',
],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='nas-media-player', # 输出的二进制名
debug=False,
bootloader_ignore_signals=False,
strip=True, # strip 调试符号,减小体积
upx=True, # 若系统有 upx 则进一步压缩
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

View File

@@ -1,25 +0,0 @@
## 打包二进制制作方法
```
apt update && apt install -y python3 python3-pip python3-dev gcc g++ make libffi-dev libssl-dev patchelf
pip_select.sh
pip3 install fastapi uvicorn aiofiles python-multipart pyinstaller
```
```
pyinstaller --onefile --name=nas-media-player-armhf --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
pyinstaller --onefile --name=nas-media-player-arm64 --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
pyinstaller --onefile --name=nas-media-player-x86_64 --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
~/.local/bin/pyinstaller --onefile --name=nas-media-player-x86_64 --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
```
```
chmod +x dist/nas-media-player-x86_64
./dist/nas-media-player-x86_64
```

Binary file not shown.

Binary file not shown.

Binary file not shown.