+
媒体文件上传
+
+
+
+
+
+
🔒 目录已加密保护
+
请输入访问密码
+
+
+
+
+
-
- 支持格式:.mp4,.avi,.mkv,.webm,.mov,.flv,.wmv,.mpeg,.mpg,.m4v,.jpg,.jpeg,.png,.gif,.bmp,.webp,.tiff,.tif,.mp3,.wav,.ogg,.flac,.aac,.m4a,.wma,.ape,.alac
-
-
+init();
+
-
diff --git a/nas-media-player.py b/nas-media-player.py
index ab089bb..691f6fa 100644
--- a/nas-media-player.py
+++ b/nas-media-player.py
@@ -11,15 +11,16 @@ import urllib.parse
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict
-import unicodedata
import socket
+import tempfile
+import shutil
VIDEO_DIR = os.getenv("NAS_MEDIA_VIDEO_DIR", "/mnt")
PORT = int(os.getenv("NAS_MEDIA_PORT", 8800))
APP_DIR = os.getenv("NAS_MEDIA_APP_DIR", "/opt/nas-media-player")
LOG_FILE = os.getenv("NAS_MEDIA_LOG_FILE", os.path.join(APP_DIR, "nas-media-player.log"))
-
+# 确保日志目录存在
log_dir = os.path.dirname(LOG_FILE)
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
@@ -103,8 +104,13 @@ SUPPORTED_AUDIO_FORMATS = {
SUPPORTED_FORMATS = {**SUPPORTED_VIDEO_FORMATS, **SUPPORTED_IMAGE_FORMATS, **SUPPORTED_AUDIO_FORMATS}
SUPPORTED_EXTENSIONS = list(SUPPORTED_FORMATS.keys())
-# 挂载静态文件
-app.mount("/static", StaticFiles(directory=Path(APP_DIR) / "static"), name="static")
+# 挂载静态文件(延迟到目录确实存在时)
+static_dir = Path(APP_DIR) / "static"
+if static_dir.exists():
+ app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
+else:
+ logger.warning(f"静态文件目录不存在,跳过挂载: {static_dir}")
+
def get_safe_cookie_key(dir_path: str) -> str:
"""将目录路径转换为MD5哈希值,避免Cookie键名包含非法字符"""
@@ -112,184 +118,237 @@ def get_safe_cookie_key(dir_path: str) -> str:
md5_hash = hashlib.md5(encoded_path).hexdigest()
return f"auth_{md5_hash}"
-# 密码管理功能
+
+# ── 密码管理功能 ────────────────────────────────────────────────────────────────
+
def init_password_file():
- """初始化密码文件(修复目录创建+合法JSON写入)"""
+ """初始化密码文件"""
app_dir = Path(APP_DIR)
app_dir.mkdir(parents=True, exist_ok=True)
if not PASSWORD_FILE.exists():
- with open(PASSWORD_FILE, 'w', encoding='utf-8') as f:
- json.dump({}, f)
+ PASSWORD_FILE.write_text("{}", encoding="utf-8")
else:
try:
- with open(PASSWORD_FILE, 'r', encoding='utf-8') as f:
- json.load(f)
- except json.JSONDecodeError:
- with open(PASSWORD_FILE, 'w', encoding='utf-8') as f:
- json.dump({}, f)
+ json.loads(PASSWORD_FILE.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ PASSWORD_FILE.write_text("{}", encoding="utf-8")
+
def hash_password(password: str) -> str:
"""密码哈希"""
return hashlib.sha256(password.encode()).hexdigest()
-def save_directory_password(dir_path: str, password: str):
- """保存目录密码"""
+
+def _read_password_data() -> dict:
+ """读取密码文件数据(内部辅助函数)"""
init_password_file()
- with open(PASSWORD_FILE, 'r+') as f:
- data = json.load(f)
- data[dir_path] = {
- "password_hash": hash_password(password),
- "created_at": datetime.now().isoformat()
- }
- f.seek(0)
- json.dump(data, f, indent=2)
- f.truncate()
+ try:
+ return json.loads(PASSWORD_FILE.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ return {}
+
+
+def save_directory_password(dir_path: str, password: str):
+ """保存目录密码(原子写入,防止并发破坏)"""
+ data = _read_password_data()
+ data[dir_path] = {
+ "password_hash": hash_password(password),
+ "created_at": datetime.now().isoformat()
+ }
+ # 原子写:先写临时文件再替换
+ tmp_path = PASSWORD_FILE.with_suffix(".tmp")
+ try:
+ tmp_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
+ shutil.move(str(tmp_path), str(PASSWORD_FILE))
+ except Exception as e:
+ if tmp_path.exists():
+ tmp_path.unlink(missing_ok=True)
+ raise e
+
def get_directory_password(dir_path: str) -> Optional[str]:
"""获取目录密码哈希"""
- init_password_file()
- if not PASSWORD_FILE.exists():
- return None
- with open(PASSWORD_FILE, 'r') as f:
- data = json.load(f)
- return data.get(dir_path, {}).get("password_hash")
+ return _read_password_data().get(dir_path, {}).get("password_hash")
+
def check_directory_password(dir_path: str, password: str) -> bool:
"""验证目录密码"""
stored_hash = get_directory_password(dir_path)
if not stored_hash:
- return True
+ return True # 未设置密码,视为通过
return stored_hash == hash_password(password)
+
def get_protected_directories() -> List[str]:
"""获取所有受保护的目录"""
- init_password_file()
- with open(PASSWORD_FILE, 'r') as f:
- data = json.load(f)
- return list(data.keys())
+ return list(_read_password_data().keys())
+
def is_protected_directory(dir_path: str) -> bool:
- """检查目录是否受保护(修复路径匹配逻辑)"""
+ """检查目录(或其祖先)是否受保护"""
if not dir_path:
return False
protected_dirs = get_protected_directories()
- dir_path_normalized = dir_path.replace(os.sep, '/').rstrip('/')
- protected_dirs_normalized = [pdir.replace(os.sep, '/').rstrip('/') for pdir in protected_dirs]
-
- for pdir in protected_dirs_normalized:
- if dir_path_normalized == pdir or dir_path_normalized.startswith(f"{pdir}/"):
+ norm = dir_path.replace(os.sep, '/').rstrip('/')
+ for pdir in protected_dirs:
+ pnorm = pdir.replace(os.sep, '/').rstrip('/')
+ if norm == pnorm or norm.startswith(f"{pnorm}/"):
return True
return False
+
def get_top_protected_directory(dir_path: str) -> Optional[str]:
- """获取目录所属的顶级受保护目录(兼容Python 3.8-)"""
+ """获取目录所属的顶级受保护祖先目录"""
if not dir_path or not is_protected_directory(dir_path):
return None
-
- # 统一路径分隔符为/,便于匹配
- dir_path_normalized = dir_path.replace(os.sep, '/').rstrip('/')
+
+ norm = dir_path.replace(os.sep, '/').rstrip('/')
protected_dirs = get_protected_directories()
- protected_dirs_normalized = [pdir.replace(os.sep, '/').rstrip('/') for pdir in protected_dirs]
-
top_dir = None
- max_depth = -1
-
- for pdir, pdir_original in zip(protected_dirs_normalized, protected_dirs):
- if dir_path_normalized == pdir or dir_path_normalized.startswith(f"{pdir}/"):
- depth = pdir.count('/')
- if top_dir is None or depth < max_depth:
- max_depth = depth
- top_dir = pdir_original
-
+ min_depth = float('inf')
+
+ for pdir in protected_dirs:
+ pnorm = pdir.replace(os.sep, '/').rstrip('/')
+ if norm == pnorm or norm.startswith(f"{pnorm}/"):
+ depth = pnorm.count('/')
+ if depth < min_depth:
+ min_depth = depth
+ top_dir = pdir
+
return top_dir
-async def verify_dir_access(request: Request, dir_path: str) -> bool:
- """验证目录访问权限(简化逻辑,避免误拦截)"""
- if not dir_path or not is_protected_directory(dir_path):
- return True
-
- top_protected_dir = get_top_protected_directory(dir_path)
- if not top_protected_dir:
- return True
-
- # 使用安全的Cookie键名
+
+def _verify_cookie(request: Request, top_protected_dir: str) -> bool:
+ """检查 Cookie 是否匹配受保护目录的密码哈希"""
cookie_key = get_safe_cookie_key(top_protected_dir)
cookie_value = request.cookies.get(cookie_key)
stored_hash = get_directory_password(top_protected_dir)
-
- # 兼容Cookie不存在的情况
- if cookie_value and stored_hash and cookie_value == stored_hash:
- logger.info(f"目录访问验证通过: {dir_path} (Cookie认证)")
+ return bool(cookie_value and stored_hash and cookie_value == stored_hash)
+
+
+async def check_dir_access(dir_path: str, request: Request) -> bool:
+ """检查目录访问权限(统一入口)"""
+ if not dir_path:
return True
-
- logger.warning(f"目录访问验证失败: {dir_path} (缺少有效Cookie)")
- return False
+ top_protected_dir = get_top_protected_directory(dir_path)
+ if not top_protected_dir:
+ return True
+ result = _verify_cookie(request, top_protected_dir)
+ if result:
+ logger.info(f"目录访问验证通过: {dir_path}")
+ else:
+ logger.warning(f"目录访问验证失败: {dir_path}")
+ return result
-# 根路径返回前端页面
-@app.get("/", response_class=HTMLResponse)
-async def read_root():
- return FileResponse(str(Path(APP_DIR) / "static" / "index.html"))
-# 安全检查路径(兼容Python 3.8及以下版本)
+# ── 路径安全辅助 ────────────────────────────────────────────────────────────────
+
def safe_join(base: Path, *paths) -> Path:
+ """安全拼接路径,防止路径穿越攻击"""
try:
- decoded_paths = [urllib.parse.unquote(path) for path in paths]
- joined_path = base.joinpath(*decoded_paths).resolve()
- joined_path.relative_to(base)
- return joined_path
+ decoded_paths = [urllib.parse.unquote(p) for p in paths]
+ joined = base.joinpath(*decoded_paths).resolve()
+ joined.relative_to(base) # 若越界则抛 ValueError
+ return joined
except ValueError:
- logger.error(f"路径越权:{joined_path} 不在 {base} 范围内")
raise HTTPException(status_code=403, detail="无效路径(越权访问)")
except Exception as e:
- logger.error(f"Path security check failed: {e}")
+ logger.error(f"路径安全检查失败: {e}")
raise HTTPException(status_code=403, detail="Invalid path")
-# 获取目录结构
+
+# ── 自然排序辅助 ────────────────────────────────────────────────────────────────
+
+import re
+
+def _natural_sort_key(s: str):
+ """将字符串拆分为文本/数字段,用于自然排序(1, 2, 10 而不是 1, 10, 2)"""
+ parts = re.split(r'(\d+)', s.lower())
+ return [int(p) if p.isdigit() else p for p in parts]
+
+
+# ── API 路由 ────────────────────────────────────────────────────────────────────
+
+@app.get("/", response_class=HTMLResponse)
+async def read_root():
+ index_path = Path(APP_DIR) / "static" / "index.html"
+ if not index_path.exists():
+ raise HTTPException(status_code=404, detail="前端页面未找到")
+ return FileResponse(str(index_path))
+
+
@app.get("/api/directories")
async def get_directories():
- dirs = []
- protected_dirs = get_protected_directories()
-
- def traverse_recursive_dirs(path: Path, rel_path: str = "") -> List[Dict]:
+ def traverse(path: Path, rel_path: str = "") -> List[Dict]:
items = []
try:
- for dir in path.iterdir():
- if dir.is_dir() and not dir.name.startswith('.'):
- sub_rel = f"{rel_path}/{dir.name}" if rel_path else dir.name
- is_protected = is_protected_directory(sub_rel)
+ for d in sorted(path.iterdir(), key=lambda x: _natural_sort_key(x.name)):
+ if d.is_dir() and not d.name.startswith('.'):
+ sub_rel = f"{rel_path}/{d.name}" if rel_path else d.name
items.append({
- "name": dir.name,
+ "name": d.name,
"path": sub_rel,
"type": "directory",
- "protected": is_protected,
- "children": traverse_recursive_dirs(dir, sub_rel)
+ "protected": is_protected_directory(sub_rel),
+ "children": traverse(d, sub_rel)
})
+ except PermissionError:
+ logger.warning(f"目录无读取权限: {path}")
except Exception as e:
- logger.error(f"Directory traversal error: {e}")
+ logger.error(f"目录遍历错误: {e}")
return items
-
- if VIDEO_ROOT.exists():
- dirs = traverse_recursive_dirs(VIDEO_ROOT)
+
+ dirs = traverse(VIDEO_ROOT) if VIDEO_ROOT.exists() else []
return {"directories": dirs}
+@app.get("/api/all-directories")
+async def get_all_directories():
+ all_dirs = []
+
+ def traverse(path: Path, rel_path: str = ""):
+ all_dirs.append({
+ "name": rel_path if rel_path else "主目录",
+ "path": rel_path,
+ "protected": is_protected_directory(rel_path)
+ })
+ try:
+ for d in sorted(path.iterdir(), key=lambda x: _natural_sort_key(x.name)):
+ if d.is_dir() and not d.name.startswith('.'):
+ sub_rel = f"{rel_path}/{d.name}" if rel_path else d.name
+ traverse(d, sub_rel)
+ except PermissionError:
+ logger.warning(f"目录无读取权限: {path}")
+ except Exception as e:
+ logger.error(f"目录遍历错误: {e}")
+
+ if VIDEO_ROOT.exists():
+ traverse(VIDEO_ROOT)
+ return {"directories": all_dirs}
+
+
+@app.get("/api/protected-directories")
+async def get_protected_dirs():
+ return {"protected_dirs": get_protected_directories()}
+
+
@app.post("/api/verify-dir-password")
async def verify_dir_password(dir_path: str = Form(...), password: str = Form(...)):
try:
top_protected_dir = get_top_protected_directory(dir_path)
if not top_protected_dir:
return {"success": True, "message": "目录不受保护"}
-
+
if check_directory_password(top_protected_dir, password):
cookie_key = get_safe_cookie_key(top_protected_dir)
response = JSONResponse({"success": True, "message": "密码正确"})
response.set_cookie(
key=cookie_key,
value=hash_password(password),
- max_age=3600,
+ max_age=3600 * 8, # 8小时(原来1小时,延长减少重复验证)
httponly=True,
- secure=False,
+ secure=False, # LAN 部署通常无 HTTPS
samesite="lax"
)
logger.info(f"目录密码验证成功: {top_protected_dir}")
@@ -298,48 +357,43 @@ async def verify_dir_password(dir_path: str = Form(...), password: str = Form(..
logger.warning(f"目录密码验证失败: {top_protected_dir}")
return {"success": False, "message": "密码错误"}
except Exception as e:
- logger.error(f"Password verification error: {e}")
+ logger.error(f"密码验证异常: {e}")
return {"success": False, "message": f"验证失败: {str(e)}"}
-async def check_dir_access(dir_path: str, request: Request) -> bool:
- """检查目录访问权限"""
- if not dir_path:
- return True
-
- top_protected_dir = get_top_protected_directory(dir_path)
- if not top_protected_dir:
- return True
-
- cookie_key = get_safe_cookie_key(top_protected_dir)
- cookie_value = request.cookies.get(cookie_key)
- stored_hash = get_directory_password(top_protected_dir)
-
- if cookie_value and cookie_value == stored_hash:
- return True
-
- return False
+
+@app.post("/api/clear-dir-auth")
+async def clear_dir_auth(dir_path: str = Form(...)):
+ try:
+ top_protected_dir = get_top_protected_directory(dir_path)
+ if not top_protected_dir:
+ return {"success": True, "message": "目录不受保护"}
+
+ cookie_key = get_safe_cookie_key(top_protected_dir)
+ response = JSONResponse({"success": True, "message": "已清除访问权限"})
+ response.delete_cookie(cookie_key)
+ return response
+ except Exception as e:
+ logger.error(f"清除认证失败: {e}")
+ return {"success": False, "message": f"清除失败: {str(e)}"}
+
@app.get("/api/media")
async def get_media(subdir: Optional[str] = None, request: Request = None):
try:
if subdir and not await check_dir_access(subdir, request):
return {
- "media": [],
- "current_dir": subdir or "",
+ "media": [],
+ "current_dir": subdir or "",
"protected": True,
"top_protected_dir": get_top_protected_directory(subdir)
}
-
- if subdir and subdir.strip():
- target_dir = safe_join(VIDEO_ROOT, subdir.strip())
- else:
- target_dir = VIDEO_ROOT
-
+
+ target_dir = safe_join(VIDEO_ROOT, subdir.strip()) if subdir and subdir.strip() else VIDEO_ROOT
+
if not target_dir.exists() or not target_dir.is_dir():
return {"media": [], "current_dir": subdir or ""}
-
- media = []
+ media = []
for file in target_dir.iterdir():
if file.is_file():
ext = file.suffix.lower()
@@ -350,33 +404,35 @@ async def get_media(subdir: Optional[str] = None, request: Request = None):
file_type = "audio"
else:
file_type = "image"
-
+ stat = file.stat()
media.append({
"name": file.name,
"type": file_type,
"extension": ext,
- "size": file.stat().st_size,
- "modified": file.stat().st_mtime,
+ "size": stat.st_size,
+ "modified": stat.st_mtime,
"path": str(file)
})
-
- # 按文件名自然排序
- media.sort(key=lambda x: (len(x["name"]), x["name"]))
+
+ # 自然排序(1, 2, 10 顺序,而非 1, 10, 2)
+ media.sort(key=lambda x: _natural_sort_key(x["name"]))
logger.info(f"Found {len(media)} media files in {target_dir}")
-
+
return {
"media": media,
"current_dir": subdir or "",
"protected": is_protected_directory(subdir or ""),
"top_protected_dir": get_top_protected_directory(subdir or "")
}
+ except HTTPException:
+ raise
except Exception as e:
- logger.error(f"Error getting media list: {e}")
+ logger.error(f"获取媒体列表失败: {e}")
return {"media": [], "current_dir": subdir or "", "error": str(e)}
-# 编码文件名用于HTTP头
+
def encode_filename_for_header(filename: str) -> str:
- """编码文件名以支持中文等特殊字符"""
+ """编码文件名以支持中文等非 ASCII 字符"""
try:
filename.encode('ascii')
return filename
@@ -389,204 +445,139 @@ async def serve_media(path: str, request: Request):
try:
decoded_path = urllib.parse.unquote(path)
full_media_path = safe_join(VIDEO_ROOT, decoded_path)
- media_dir = path_relative_to(full_media_path.parent, VIDEO_ROOT) if path_is_relative_to(full_media_path.parent, VIDEO_ROOT) else str(full_media_path.parent)
-
- if is_protected_directory(media_dir) and not await verify_dir_access(request, media_dir):
+ media_dir = (
+ path_relative_to(full_media_path.parent, VIDEO_ROOT)
+ if path_is_relative_to(full_media_path.parent, VIDEO_ROOT)
+ else str(full_media_path.parent)
+ )
+
+ if is_protected_directory(media_dir) and not await check_dir_access(media_dir, request):
raise HTTPException(status_code=403, detail="需要密码访问")
-
+
if not full_media_path.exists() or not full_media_path.is_file():
- logger.warning(f"Media file not found: {full_media_path}")
- return JSONResponse(
- status_code=404,
- content={"error": "Media file not found"}
- )
-
+ return JSONResponse(status_code=404, content={"error": "Media file not found"})
+
ext = full_media_path.suffix.lower()
if ext not in SUPPORTED_EXTENSIONS:
- return JSONResponse(
- status_code=400,
- content={"error": f"Unsupported format: {ext}"}
- )
-
+ return JSONResponse(status_code=400, content={"error": f"Unsupported format: {ext}"})
+
mime_type = SUPPORTED_FORMATS.get(ext, "application/octet-stream")
-
- # 处理图片
+ filename = full_media_path.name
+ encoded_filename = encode_filename_for_header(filename)
+ content_disp = f'inline; filename="{encoded_filename}"; filename*=UTF-8\'\'{encoded_filename}'
+
+ # 图片:直接返回
if ext in SUPPORTED_IMAGE_FORMATS:
logger.info(f"Serving image: {full_media_path}")
-
- # 处理中文文件名的HTTP头
- filename = full_media_path.name
- encoded_filename = encode_filename_for_header(filename)
-
- headers = {
- "Cache-Control": "max-age=3600",
- "Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}"
- }
-
return FileResponse(
path=str(full_media_path),
media_type=mime_type,
- filename=encoded_filename,
- headers=headers
+ headers={"Cache-Control": "max-age=3600", "Content-Disposition": content_disp}
)
-
- # 处理音频
- elif ext in SUPPORTED_AUDIO_FORMATS:
+
+ # 音频:直接返回(浏览器原生断点续传)
+ if ext in SUPPORTED_AUDIO_FORMATS:
logger.info(f"Serving audio: {full_media_path}")
-
- # 处理中文文件名的HTTP头
- filename = full_media_path.name
- encoded_filename = encode_filename_for_header(filename)
-
- headers = {
- "Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}"
- }
-
return FileResponse(
path=str(full_media_path),
media_type=mime_type,
- filename=encoded_filename,
- headers=headers
+ headers={"Content-Disposition": content_disp, "Accept-Ranges": "bytes"}
)
-
- # 视频处理断点续传
+
+ # 视频:手动处理 Range 断点续传
file_size = full_media_path.stat().st_size
range_header = request.headers.get("Range")
-
+
+ start, end = 0, min(1024 * 1024 * 2, file_size - 1) # 默认前 2 MB
if range_header:
try:
range_str = range_header.split("=")[-1]
start_str, end_str = range_str.split("-")
start = int(start_str) if start_str else 0
end = int(end_str) if end_str else file_size - 1
- end = min(end, file_size - 1)
start = max(0, start)
- except:
- start = 0
- end = min(1024*1024*2, file_size - 1)
- else:
- start = 0
- end = min(1024*1024*2, file_size - 1)
-
- # 异步分块读取
+ end = min(end, file_size - 1)
+ except (ValueError, IndexError):
+ start, end = 0, min(1024 * 1024 * 2, file_size - 1)
+
async def iterfile():
async with aiofiles.open(str(full_media_path), 'rb') as f:
await f.seek(start)
remaining = end - start + 1
while remaining > 0:
- chunk_size = min(1024*1024, remaining)
- chunk = await f.read(chunk_size)
+ chunk = await f.read(min(1024 * 1024, remaining))
if not chunk:
break
yield chunk
- remaining -= chunk_size
-
- # 处理视频文件名
- filename = full_media_path.name
- encoded_filename = encode_filename_for_header(filename)
-
+ remaining -= len(chunk)
+
headers = {
"Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes",
"Content-Length": str(end - start + 1),
"Content-Type": mime_type,
- "Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}"
+ "Content-Disposition": content_disp,
}
-
logger.info(f"Serving video: {full_media_path} (bytes {start}-{end}/{file_size})")
- return StreamingResponse(
- iterfile(),
- status_code=206,
- headers=headers,
- media_type=mime_type
- )
-
- except HTTPException as e:
+ return StreamingResponse(iterfile(), status_code=206, headers=headers, media_type=mime_type)
+
+ except HTTPException:
raise
except Exception as e:
- logger.error(f"Error serving media: {e}")
- return JSONResponse(
- status_code=500,
- content={"error": f"Server error: {str(e)}"}
- )
-
-# 获取所有目录路径
-@app.get("/api/all-directories")
-async def get_all_directories():
- all_dirs = []
-
- def traverse_all_dirs(path: Path, rel_path: str = ""):
- try:
- all_dirs.append({
- "name": rel_path if rel_path else "主目录",
- "path": rel_path,
- "protected": is_protected_directory(rel_path)
- })
- for dir in path.iterdir():
- if dir.is_dir() and not dir.name.startswith('.'):
- sub_rel = f"{rel_path}/{dir.name}" if rel_path else dir.name
- traverse_all_dirs(dir, sub_rel)
- except Exception as e:
- logger.error(f"Error traversing all directories: {e}")
-
- if VIDEO_ROOT.exists():
- traverse_all_dirs(VIDEO_ROOT)
- return {"directories": all_dirs}
+ logger.error(f"媒体文件服务失败: {e}")
+ return JSONResponse(status_code=500, content={"error": f"Server error: {str(e)}"})
@app.post("/api/create-directory")
async def create_directory(
- target_path: str = Form(""),
+ target_path: str = Form(""),
new_dir: str = Form(...),
dir_password: Optional[str] = Form(None)
):
try:
- if not new_dir or new_dir.strip() == "":
+ if not new_dir or not new_dir.strip():
raise HTTPException(status_code=400, detail="目录名不能为空")
-
- # 安全路径拼接
- if target_path and target_path.strip():
- parent_dir = safe_join(VIDEO_ROOT, target_path.strip())
- else:
- parent_dir = VIDEO_ROOT
-
- # 新增:检查父目录是否存在且可写
+
+ new_dir = new_dir.strip()
+
+ invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\0']
+ if any(c in new_dir for c in invalid_chars):
+ raise HTTPException(status_code=400, detail="目录名包含非法字符(/\\:*?\"<>|)")
+
+ parent_dir = safe_join(VIDEO_ROOT, target_path.strip()) if target_path and target_path.strip() else VIDEO_ROOT
+
if not parent_dir.exists():
raise HTTPException(status_code=404, detail=f"父目录不存在: {parent_dir}")
if not os.access(parent_dir, os.W_OK):
- raise HTTPException(status_code=403, detail=f"父目录无写入权限: {parent_dir}")
-
- new_dir_path = parent_dir / new_dir.strip()
- new_dir_rel_path = path_relative_to(new_dir_path, VIDEO_ROOT) if path_is_relative_to(new_dir_path, VIDEO_ROOT) else str(new_dir_path)
-
- # 检查目录名合法性
- invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
- if any(char in new_dir for char in invalid_chars):
- raise HTTPException(status_code=400, detail="目录名包含非法字符(/\:*?\"<>|)")
-
- # 新增:检查目录是否已存在
+ raise HTTPException(status_code=403, detail="父目录无写入权限")
+
+ new_dir_path = parent_dir / new_dir
if new_dir_path.exists():
- raise HTTPException(status_code=409, detail=f"目录已存在: {new_dir_path.name}")
-
- # 创建目录(增强异常捕获)
+ raise HTTPException(status_code=409, detail=f"目录已存在: {new_dir}")
+
try:
- new_dir_path.mkdir(parents=True, exist_ok=False)
+ new_dir_path.mkdir(parents=False, exist_ok=False)
except PermissionError:
- raise HTTPException(status_code=403, detail=f"创建目录失败:权限不足({new_dir_path})")
+ raise HTTPException(status_code=403, detail="创建目录失败:权限不足")
except Exception as e:
raise HTTPException(status_code=500, detail=f"创建目录失败:{str(e)}")
-
- # 设置密码保护
+
+ new_dir_rel = (
+ path_relative_to(new_dir_path, VIDEO_ROOT)
+ if path_is_relative_to(new_dir_path, VIDEO_ROOT)
+ else str(new_dir_path)
+ )
+
if dir_password and dir_password.strip():
- save_directory_password(new_dir_rel_path, dir_password.strip())
+ save_directory_password(new_dir_rel, dir_password.strip())
logger.info(f"带密码保护的目录创建成功: {new_dir_path}")
else:
logger.info(f"目录创建成功: {new_dir_path}")
-
+
return {
- "success": True,
- "message": f"目录创建成功: {new_dir_path.name}" + ("(已设置密码保护)" if dir_password else ""),
- "path": new_dir_rel_path,
+ "success": True,
+ "message": f"目录创建成功: {new_dir}" + ("(已设置密码保护)" if dir_password else ""),
+ "path": new_dir_rel,
"protected": bool(dir_password)
}
except HTTPException:
@@ -595,137 +586,185 @@ async def create_directory(
logger.error(f"创建目录异常: {e}", exc_info=True)
return {"success": False, "message": f"创建失败: {str(e)}"}
+
@app.post("/api/upload-media")
async def upload_media(
request: Request,
target_dir: str = Form(""),
file: UploadFile = File(...)
):
+ file_path = None
try:
- logger.info(f"开始处理上传请求 - 目标目录: {target_dir}, 文件名: {file.filename}")
-
- if is_protected_directory(target_dir) and not await verify_dir_access(request, target_dir):
- logger.warning(f"加密目录上传权限拒绝: {target_dir}")
+ logger.info(f"上传请求 - 目标目录: {target_dir}, 文件: {file.filename}")
+
+ if is_protected_directory(target_dir) and not await check_dir_access(target_dir, request):
return {"success": False, "message": "无权访问该目录,请先验证密码"}
-
+
if not file or not file.filename:
- logger.warning("上传失败:未选择文件")
return {"success": False, "message": "未选择文件"}
-
+
filename = file.filename
file_ext = Path(filename).suffix.lower()
if file_ext not in SUPPORTED_EXTENSIONS:
- logger.warning(f"上传失败:不支持的文件格式 {file_ext}")
return {
- "success": False,
+ "success": False,
"message": f"不支持的文件格式: {file_ext},支持的格式: {', '.join(SUPPORTED_EXTENSIONS)}"
}
-
- if target_dir.strip():
- upload_dir = safe_join(VIDEO_ROOT, target_dir.strip())
- else:
- upload_dir = VIDEO_ROOT
-
+
+ upload_dir = safe_join(VIDEO_ROOT, target_dir.strip()) if target_dir.strip() else VIDEO_ROOT
os.makedirs(upload_dir, exist_ok=True)
- logger.info(f"上传目录已确认: {upload_dir}")
-
+
+ # 文件名去重
file_path = upload_dir / filename
counter = 1
while file_path.exists():
stem = Path(filename).stem
- new_filename = f"{stem}_{counter}{file_ext}"
- file_path = upload_dir / new_filename
+ file_path = upload_dir / f"{stem}_{counter}{file_ext}"
counter += 1
-
+
+ # 写入文件(先写临时文件,成功后原子移动)
+ tmp_fd, tmp_name = tempfile.mkstemp(dir=upload_dir)
try:
- async with aiofiles.open(str(file_path), 'wb') as f:
- content_length = 0
- while chunk := await file.read(1024 * 1024):
+ content_length = 0
+ async with aiofiles.open(tmp_name, 'wb') as f:
+ while True:
+ chunk = await file.read(1024 * 1024)
+ if not chunk:
+ break
await f.write(chunk)
content_length += len(chunk)
-
- if not file_path.exists():
- raise Exception("文件保存失败:文件不存在")
- if file_path.stat().st_size != content_length:
- logger.warning(f"文件大小不一致 - 预期: {content_length}, 实际: {file_path.stat().st_size}")
-
- # 确定文件类型
- if file_ext in SUPPORTED_VIDEO_FORMATS:
- file_type = "视频"
- elif file_ext in SUPPORTED_AUDIO_FORMATS:
- file_type = "音频"
- else:
- file_type = "图片"
-
- logger.info(f"文件上传成功: {file_path} ({file_type}, {file_path.stat().st_size} bytes)")
-
- return {
- "success": True,
- "message": f"{file_type}文件 {file_path.name} 上传成功",
- "filename": file_path.name,
- "path": target_dir,
- "size": file_path.stat().st_size
- }
-
+
+ os.close(tmp_fd)
+ shutil.move(tmp_name, str(file_path))
except Exception as e:
- # 清理不完整文件
- if file_path.exists() and file_path.stat().st_size == 0:
- file_path.unlink()
- logger.warning(f"清理空文件: {file_path}")
+ os.close(tmp_fd)
+ if os.path.exists(tmp_name):
+ os.unlink(tmp_name)
raise Exception(f"保存文件失败: {str(e)}")
-
- except Exception as e:
- logger.error(f"上传失败: {str(e)}")
+
+ actual_size = file_path.stat().st_size
+ if actual_size != content_length:
+ logger.warning(f"文件大小不一致 - 写入: {content_length}, 磁盘: {actual_size}")
+
+ file_type = "视频" if file_ext in SUPPORTED_VIDEO_FORMATS else ("音频" if file_ext in SUPPORTED_AUDIO_FORMATS else "图片")
+ logger.info(f"上传成功: {file_path} ({file_type}, {actual_size} bytes)")
+
return {
- "success": False,
- "message": f"上传失败: {str(e)}"
+ "success": True,
+ "message": f"{file_type}文件 {file_path.name} 上传成功",
+ "filename": file_path.name,
+ "path": target_dir,
+ "size": actual_size
}
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ # 清理可能已创建的残留文件
+ if file_path and isinstance(file_path, Path) and file_path.exists() and file_path.stat().st_size == 0:
+ file_path.unlink(missing_ok=True)
+ logger.error(f"上传失败: {e}")
+ return {"success": False, "message": f"上传失败: {str(e)}"}
finally:
- # 确保文件句柄关闭
try:
await file.close()
- except Exception as e:
- logger.error(f"关闭文件句柄失败: {e}")
+ except Exception:
+ pass
-@app.post("/api/clear-dir-auth")
-async def clear_dir_auth(dir_path: str = Form(...)):
+
+@app.delete("/api/delete-file")
+async def delete_file(request: Request, file_path: str):
+ """删除媒体文件"""
try:
- top_protected_dir = get_top_protected_directory(dir_path)
- if not top_protected_dir:
- return {"success": True, "message": "目录不受保护"}
-
- cookie_key = get_safe_cookie_key(top_protected_dir)
- response = JSONResponse({"success": True, "message": "已清除访问权限"})
- response.delete_cookie(cookie_key)
-
- return response
+ full_path = safe_join(VIDEO_ROOT, urllib.parse.unquote(file_path))
+ media_dir = (
+ path_relative_to(full_path.parent, VIDEO_ROOT)
+ if path_is_relative_to(full_path.parent, VIDEO_ROOT)
+ else str(full_path.parent)
+ )
+
+ if is_protected_directory(media_dir) and not await check_dir_access(media_dir, request):
+ raise HTTPException(status_code=403, detail="需要密码访问")
+
+ if not full_path.exists():
+ raise HTTPException(status_code=404, detail="文件不存在")
+ if not full_path.is_file():
+ raise HTTPException(status_code=400, detail="路径不是文件")
+
+ full_path.unlink()
+ logger.info(f"文件删除成功: {full_path}")
+ return {"success": True, "message": f"文件 {full_path.name} 删除成功"}
+ except HTTPException:
+ raise
except Exception as e:
- logger.error(f"Clear auth error: {e}")
- return {"success": False, "message": f"清除失败: {str(e)}"}
+ logger.error(f"删除文件失败: {e}")
+ return {"success": False, "message": f"删除失败: {str(e)}"}
-@app.get("/api/protected-directories")
-async def get_protected_dirs():
- return {"protected_dirs": get_protected_directories()}
+@app.delete("/api/delete-directory")
+async def delete_directory(request: Request, dir_path: str):
+ """删除目录(仅允许删除空目录)"""
+ try:
+ full_path = safe_join(VIDEO_ROOT, urllib.parse.unquote(dir_path))
+ rel_path = (
+ path_relative_to(full_path, VIDEO_ROOT)
+ if path_is_relative_to(full_path, VIDEO_ROOT)
+ else str(full_path)
+ )
+
+ if not full_path.exists():
+ raise HTTPException(status_code=404, detail="目录不存在")
+ if not full_path.is_dir():
+ raise HTTPException(status_code=400, detail="路径不是目录")
+ if full_path == VIDEO_ROOT:
+ raise HTTPException(status_code=403, detail="不允许删除根目录")
+
+ # 检查目录是否为空
+ if any(full_path.iterdir()):
+ raise HTTPException(status_code=409, detail="目录不为空,请先删除其中的文件")
+
+ full_path.rmdir()
+
+ # 同步清理密码记录
+ data = _read_password_data()
+ if rel_path in data:
+ del data[rel_path]
+ PASSWORD_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
+
+ logger.info(f"目录删除成功: {full_path}")
+ return {"success": True, "message": f"目录 {full_path.name} 删除成功"}
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"删除目录失败: {e}")
+ return {"success": False, "message": f"删除失败: {str(e)}"}
+
+
+# ── 启动入口 ────────────────────────────────────────────────────────────────────
def create_listen_sockets(port: int) -> list:
sockets = []
- # ===== IPv4 =====
+ # IPv4
sock4 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ try:
+ sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ except (AttributeError, OSError):
+ pass # SO_REUSEPORT 并非所有平台都支持
sock4.bind(("0.0.0.0", port))
sock4.listen(2048)
sockets.append(sock4)
logger.info(f"IPv4 监听: 0.0.0.0:{port}")
- # ===== IPv6 =====
+ # IPv6(可选)
try:
sock6 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- # 关键:必须是 1
+ try:
+ sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ except (AttributeError, OSError):
+ pass
sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
sock6.bind(("::", port))
sock6.listen(2048)
@@ -736,6 +775,7 @@ def create_listen_sockets(port: int) -> list:
return sockets
+
def main():
init_password_file()
import uvicorn
@@ -747,10 +787,9 @@ def main():
access_log=False,
timeout_keep_alive=30
)
-
server = uvicorn.Server(config)
server.run(sockets=sockets)
+
if __name__ == "__main__":
main()
-
diff --git a/nas-media-player.spec b/nas-media-player.spec
new file mode 100644
index 0000000..68991f9
--- /dev/null
+++ b/nas-media-player.spec
@@ -0,0 +1,140 @@
+# nas-media-player.spec
+# 使用方法:pyinstaller nas-media-player.spec
+
+import sys
+from pathlib import Path
+
+block_cipher = None
+
+a = Analysis(
+ ['nas-media-player.py'],
+ pathex=[],
+ binaries=[],
+ datas=[],
+ hiddenimports=[
+ # uvicorn 核心
+ 'uvicorn',
+ 'uvicorn.main',
+ 'uvicorn.config',
+ 'uvicorn.server',
+ 'uvicorn.loops',
+ 'uvicorn.loops.auto',
+ 'uvicorn.loops.asyncio',
+ 'uvicorn.protocols',
+ 'uvicorn.protocols.http',
+ 'uvicorn.protocols.http.auto',
+ 'uvicorn.protocols.http.h11_impl',
+ 'uvicorn.protocols.http.httptools_impl',
+ 'uvicorn.protocols.websockets',
+ 'uvicorn.protocols.websockets.auto',
+ 'uvicorn.protocols.websockets.websockets_impl',
+ 'uvicorn.protocols.websockets.wsproto_impl',
+ 'uvicorn.lifespan',
+ 'uvicorn.lifespan.off',
+ 'uvicorn.lifespan.on',
+ 'uvicorn.logging',
+ 'uvicorn.middleware',
+ 'uvicorn.middleware.asgi2',
+ 'uvicorn.middleware.message_logger',
+ 'uvicorn.middleware.proxy_headers',
+
+ # fastapi / starlette
+ 'fastapi',
+ 'fastapi.routing',
+ 'fastapi.middleware',
+ 'fastapi.middleware.cors',
+ 'fastapi.staticfiles',
+ 'fastapi.responses',
+ 'starlette',
+ 'starlette.routing',
+ 'starlette.middleware',
+ 'starlette.middleware.cors',
+ 'starlette.staticfiles',
+ 'starlette.responses',
+ 'starlette.background',
+ 'starlette.concurrency',
+ 'starlette.datastructures',
+ 'starlette.exceptions',
+ 'starlette.formparsers',
+ 'starlette.requests',
+ 'starlette.types',
+ 'starlette.websockets',
+
+ # HTTP 解析库(uvicorn 可选依赖,打包时都带上)
+ 'h11',
+ 'httptools',
+ 'anyio',
+ 'anyio._backends._asyncio',
+ 'anyio._backends._trio',
+ 'sniffio',
+
+ # aiofiles
+ 'aiofiles',
+ 'aiofiles.os',
+ 'aiofiles.threadpool',
+
+ # pydantic(fastapi 依赖)
+ 'pydantic',
+ 'pydantic.v1',
+ 'pydantic_core',
+
+ # 标准库补充
+ 'multipart',
+ 'python_multipart',
+ 'email.mime.multipart',
+ 'email.mime.text',
+
+ # 编码/哈希
+ 'hashlib',
+ 'hmac',
+
+ # 其他
+ 'click',
+ 'typing_extensions',
+ ],
+ hookspath=[],
+ hooksconfig={},
+ runtime_hooks=[],
+ excludes=[
+ # 排除不需要的大型库,减小体积
+ 'tkinter',
+ 'matplotlib',
+ 'numpy',
+ 'pandas',
+ 'PIL',
+ 'scipy',
+ 'IPython',
+ 'jupyter',
+ 'notebook',
+ 'test',
+ 'unittest',
+ ],
+ win_no_prefer_redirects=False,
+ win_private_assemblies=False,
+ cipher=block_cipher,
+ noarchive=False,
+)
+
+pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
+
+exe = EXE(
+ pyz,
+ a.scripts,
+ a.binaries,
+ a.zipfiles,
+ a.datas,
+ [],
+ name='nas-media-player', # 输出的二进制名
+ debug=False,
+ bootloader_ignore_signals=False,
+ strip=True, # strip 调试符号,减小体积
+ upx=True, # 若系统有 upx 则进一步压缩
+ upx_exclude=[],
+ runtime_tmpdir=None,
+ console=True,
+ disable_windowed_traceback=False,
+ argv_emulation=False,
+ target_arch=None,
+ codesign_identity=None,
+ entitlements_file=None,
+)
diff --git a/releases/README.md b/releases/README.md
deleted file mode 100644
index 57ca204..0000000
--- a/releases/README.md
+++ /dev/null
@@ -1,25 +0,0 @@
-## 打包二进制制作方法
-
-```
-apt update && apt install -y python3 python3-pip python3-dev gcc g++ make libffi-dev libssl-dev patchelf
-pip_select.sh
-pip3 install fastapi uvicorn aiofiles python-multipart pyinstaller
-
-```
-
-```
-pyinstaller --onefile --name=nas-media-player-armhf --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
-pyinstaller --onefile --name=nas-media-player-arm64 --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
-pyinstaller --onefile --name=nas-media-player-x86_64 --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
-
-~/.local/bin/pyinstaller --onefile --name=nas-media-player-x86_64 --distpath=dist --workpath=tmp --clean --exclude-module=tkinter --exclude-module=unittest --exclude-module=sqlite3 nas-media-player.py
-
-```
-
-```
-chmod +x dist/nas-media-player-x86_64
-
-./dist/nas-media-player-x86_64
-```
-
-
diff --git a/releases/nas-media-player-arm64 b/releases/nas-media-player-arm64
deleted file mode 100755
index 11cce91..0000000
Binary files a/releases/nas-media-player-arm64 and /dev/null differ
diff --git a/releases/nas-media-player-armhf b/releases/nas-media-player-armhf
deleted file mode 100755
index eabf4fd..0000000
Binary files a/releases/nas-media-player-armhf and /dev/null differ
diff --git a/releases/nas-media-player-x86_64 b/releases/nas-media-player-x86_64
deleted file mode 100755
index 318d380..0000000
Binary files a/releases/nas-media-player-x86_64 and /dev/null differ