从零部署 Emby 媒体服与用户管理
本文整理自一次完整的 Emby 搭建过程,覆盖 NAT VPS 与独立服务器两种场景。涉及到的 Token、API Key、域名、Telegram ID 请全部替换成你自己的值,不要直接复制示例里的占位符。
0. 最终目标
我们要搭建的是一套完整的个人媒体系统:
Google Drive / OpenList / OneDrive ↓ rclone 挂载/mnt/gdrive 或 /mnt/openlist ↓Emby 读取媒体库 ↓Caddy / Cloudflare 提供访问线路 ↓客户端播放、网页管理、用户自助注册、统计面板同时还会额外搭建一个 Telegram 上传机器人:
你把资源发给 Telegram 机器人 ↓服务器自动接收文件 ↓rclone 上传到网盘指定目录 ↓你整理命名后 Emby 扫库可选管理系统有两个方向:
| 方案 | 用途 | 适合人群 |
|---|---|---|
| EmbyPulse | Web 面板、统计、用户中心、播放排行、缺集等 | 想要图形化管理面板 |
| EmbyTGBot | Telegram 管理用户、注册码、续期、查询在线人数 | 想通过 TG 管理用户 |
WARNING不要把机器人 Token、Emby API Key、Google OAuth Secret、rclone.conf 公开到博客或 GitHub。
1. 服务器场景与端口规划
本文覆盖两种服务器。
1.1 NAT VPS 场景
假设只有少量端口,比如:
服务器 IP:176.123.6.17可用端口:48111-48119推荐规划:
48111 Emby 反代线路48112 EmbyPulse 管理后台48113 EmbyPulse 用户中心48114 Emby 直连播放线路48119 SSH,若服务商已经分配给 SSH 就不要占用NAT VPS 的重点是:服务商面板必须把外部端口映射到 VPS 内部端口。如果外部访问超时,第一优先排查端口映射。
1.2 独立服务器场景
假设服务器有独立公网 IP:
服务器 IP:147.224.40.215SSH 端口:22700已有占用端口示例:
80/tcp caddy443/tcp caddy22700/tcp sshd8001/tcp sing-box50501/tcp sing-box50503/udp sing-box50504/udp sing-box40823/udp sing-box5353/udp avahi-daemon / openclaw-gateway新增端口建议:
8096/tcp Emby 直连播放线路18080/tcp 可选 Web API,仅监听 127.0.0.118081/tcp 可选 Web 前端,仅监听 127.0.0.118082/tcp 本地 Telegram Bot API,仅监听 127.0.0.1独立服务器推荐线路:
https://emby.example.com Cloudflare 橙云线路,用于网页管理https://play.example.com DNS only 灰云线路,用于直连播放http://147.224.40.215:8096 IP 直连线路,用于调试或客户端播放IMPORTANT视频播放不建议长期走 Cloudflare 橙云线路。Cloudflare 线路适合网页访问、管理、登录;播放大文件建议走灰云直连域名或 IP:8096。
2. 系统初始化
以下命令以 Debian 12 为例。
ssh root@你的服务器IP -p 你的SSH端口更新系统并安装基础组件:
apt update && apt upgrade -yapt install -y ca-certificates curl gnupg lsb-release nano vim unzip fuse3 jq socat openssl githostnamectl set-timezone Asia/Shanghai检查端口占用:
ss -lntup3. 安装 Docker 和 Docker Compose
for pkg in docker.io docker-doc docker-compose podman-docker containerd runc; do apt remove -y "$pkg" || truedone
install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.ascchmod a+r /etc/apt/keyrings/docker.asc
cat > /etc/apt/sources.list.d/docker.sources <<EOFTypes: debURIs: https://download.docker.com/linux/debianSuites: $(. /etc/os-release && echo "$VERSION_CODENAME")Components: stableArchitectures: $(dpkg --print-architecture)Signed-By: /etc/apt/keyrings/docker.ascEOF
apt updateapt install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
systemctl enable --now docker
docker versiondocker compose versiondocker run --rm hello-world创建 Docker 网络:
docker network create media-net 2>/dev/null || true4. 配置 rclone 与 Google Drive
4.1 安装 rclone
curl https://rclone.org/install.sh | bashrclone version允许 FUSE 挂载被 Docker 容器读取:
grep -qxF 'user_allow_other' /etc/fuse.conf || echo 'user_allow_other' >> /etc/fuse.conf创建目录:
mkdir -p /mnt/gdrivemkdir -p /var/cache/rclone-gdrivemkdir -p /root/.config/rclonechmod 700 /root/.config/rclone4.2 给 rclone 配自己的 Google Drive Client ID
不建议长期使用 rclone 默认共享 client_id,容易遇到 Google Drive API 限流:
googleapi: Error 403: Quota exceededRATE_LIMIT_EXCEEDED去 Google Cloud Console 创建项目:
1. 创建项目,例如 rclone-emby-upload2. APIs & Services -> Library3. 启用 Google Drive API4. Google Auth Platform / OAuth consent screen5. Data Access 添加 scope: https://www.googleapis.com/auth/drive6. Audience 添加自己的 Google 账号为测试用户,或发布到 In production7. Clients -> Create OAuth client8. Application type 选择 Desktop app9. 复制 Client ID 和 Client Secret然后在服务器配置:
rclone config按提示填写:
n) New remotename> gdriveStorage> driveclient_id> 你的 Client IDclient_secret> 你的 Client Secretscope> driveroot_folder_id> 直接回车service_account_file> 直接回车Edit advanced config? nUse web browser to automatically authenticate rclone with remote? n如果服务器没有浏览器,rclone 会要求你在本地电脑执行:
rclone authorize "drive" "一大串配置"如果本地网络访问 Google 有问题,PowerShell 先设置代理:
$env:HTTP_PROXY="http://127.0.0.1:7890"$env:HTTPS_PROXY="http://127.0.0.1:7890"授权成功后,把 JSON token 粘贴回服务器。
测试:
rclone about gdrive:rclone lsd gdrive:创建媒体目录:
rclone mkdir "gdrive:媒体/电影"rclone mkdir "gdrive:媒体/电视剧"rclone mkdir "gdrive:媒体/动漫"rclone mkdir "gdrive:TelegramUploads/待整理"4.3 挂载 Google Drive
创建 systemd 服务:
cat > /etc/systemd/system/rclone-gdrive.service <<'EOF'[Unit]Description=Rclone mount Google Drive for EmbyWants=network-online.targetAfter=network-online.target
[Service]Type=simpleExecStartPre=/bin/mkdir -p /mnt/gdrive /var/cache/rclone-gdriveExecStart=/usr/bin/rclone mount gdrive: /mnt/gdrive \ --config=/root/.config/rclone/rclone.conf \ --allow-other \ --read-only \ --uid=1000 \ --gid=1000 \ --umask=002 \ --dir-cache-time=72h \ --poll-interval=15s \ --vfs-cache-mode=full \ --vfs-cache-max-size=50G \ --vfs-cache-max-age=6h \ --buffer-size=64M \ --cache-dir=/var/cache/rclone-gdriveExecStop=/usr/bin/fusermount3 -uz /mnt/gdriveRestart=on-failureRestartSec=10
[Install]WantedBy=multi-user.targetEOF启动:
systemctl daemon-reloadsystemctl enable --now rclone-gdrivesystemctl status rclone-gdrive --no-pagerls -la /mnt/gdrive5. 可选:通过 OpenList WebDAV 挂载网盘
如果你已经搭好了 OpenList,可以通过 WebDAV 挂到本地。
OpenList WebDAV 地址格式:
https://openlist.example.com/dav/配置 rclone:
rclone config示例:
n) New remotename> openlistStorage> webdavurl> https://openlist.example.com/dav/vendor> otheruser> OpenList 用户名password> OpenList 密码bearer_token> 直接回车测试:
rclone lsd openlist:创建挂载:
mkdir -p /mnt/openlistmkdir -p /var/cache/rclone-openlist创建 systemd:
cat > /etc/systemd/system/rclone-openlist.service <<'EOF'[Unit]Description=Rclone mount OpenList WebDAV for EmbyWants=network-online.targetAfter=network-online.target
[Service]Type=simpleExecStartPre=/bin/mkdir -p /mnt/openlist /var/cache/rclone-openlistExecStart=/usr/bin/rclone mount openlist: /mnt/openlist \ --config=/root/.config/rclone/rclone.conf \ --allow-other \ --read-only \ --uid=1000 \ --gid=1000 \ --umask=002 \ --dir-cache-time=72h \ --poll-interval=30s \ --vfs-cache-mode=full \ --vfs-cache-max-size=20G \ --vfs-cache-max-age=6h \ --buffer-size=32M \ --cache-dir=/var/cache/rclone-openlistExecStop=/usr/bin/fusermount3 -uz /mnt/openlistRestart=on-failureRestartSec=10
[Install]WantedBy=multi-user.targetEOF启动:
systemctl daemon-reloadsystemctl enable --now rclone-openlistls -la /mnt/openlist6. 部署 Emby
6.1 判断服务器架构
uname -mdocker info | grep -i architecture如果是 x86_64 / amd64,用:
emby/embyserver:latest如果是 aarch64 / arm64,用:
emby/embyserver_arm64v8:latestWARNINGARM64 服务器如果误用 amd64 镜像,会出现
exec /init: exec format error。
6.2 创建 Compose
mkdir -p /opt/media-stack/embymkdir -p /opt/media/emby/configmkdir -p /opt/media/emby/transcodechown -R 1000:1000 /opt/media/embyamd64 示例:
cat > /opt/media-stack/emby/docker-compose.yml <<'EOF'services: emby: image: emby/embyserver:latest container_name: embyserver restart: unless-stopped ports: - "8096:8096" environment: UID: "1000" GID: "1000" GIDLIST: "1000" TZ: Asia/Shanghai volumes: - /opt/media/emby/config:/config - /opt/media/emby/transcode:/transcode - type: bind source: /mnt/gdrive target: /mnt/gdrive read_only: true bind: propagation: rslave networks: - media-net
networks: media-net: external: trueEOFARM64 改成:
image: emby/embyserver_arm64v8:latestplatform: linux/arm64/v8启动:
cd /opt/media-stack/embydocker compose pulldocker compose up -ddocker logs --tail=100 embyservercurl -I http://127.0.0.1:8096让 Docker 尽量等 rclone 挂载后启动:
mkdir -p /etc/systemd/system/docker.service.d
cat > /etc/systemd/system/docker.service.d/override.conf <<'EOF'[Unit]Wants=rclone-gdrive.serviceAfter=rclone-gdrive.serviceEOF
systemctl daemon-reload7. Caddy、Cloudflare 与播放线路
7.1 独立服务器推荐配置
Cloudflare DNS:
A emby 服务器IP Proxied / 橙云A play 服务器IP DNS only / 灰云Caddy 配置:
mkdir -p /etc/caddy/conf.dcp /etc/caddy/Caddyfile /etc/caddy/Caddyfile.bak.$(date +%F-%H%M%S)
grep -q 'import /etc/caddy/conf.d/\*.caddy' /etc/caddy/Caddyfile || \ printf '\nimport /etc/caddy/conf.d/*.caddy\n' >> /etc/caddy/Caddyfile
cat > /etc/caddy/conf.d/emby.caddy <<'EOF'emby.example.com { encode zstd gzip reverse_proxy 127.0.0.1:8096}
play.example.com { encode zstd gzip reverse_proxy 127.0.0.1:8096}EOF
caddy fmt --overwrite /etc/caddy/Caddyfile /etc/caddy/conf.d/emby.caddycaddy validate --config /etc/caddy/Caddyfilesystemctl reload caddy最终入口:
Cloudflare 代理线路:https://emby.example.com
直连 HTTPS 线路:https://play.example.com
IP 直连线路:http://服务器IP:80967.2 NAT VPS 推荐配置
如果只有高端口:
48111 -> Emby 反代线路48114 -> Emby 直连播放线路Caddyfile 示例:
{ auto_https off}
:48111 { encode gzip reverse_proxy 127.0.0.1:8096}直连播放线路可以用服务商面板直接配置:
外部 48114 -> 内部 8096如果服务商只能同端口映射,则用 socat:
apt install -y socat
cat > /etc/systemd/system/emby-direct-48114.service <<'EOF'[Unit]Description=Direct TCP forward 48114 to Emby 8096After=network-online.target docker.serviceWants=network-online.target
[Service]ExecStart=/usr/bin/socat TCP-LISTEN:48114,fork,reuseaddr TCP:127.0.0.1:8096Restart=alwaysRestartSec=3
[Install]WantedBy=multi-user.targetEOF
systemctl daemon-reloadsystemctl enable --now emby-direct-48114访问:
http://服务器IP:48111 Emby 反代线路http://服务器IP:48114 Emby 直连播放线路8. 初始化 Emby 与媒体库
浏览器打开:
http://服务器IP:8096或:
https://play.example.com首次向导:
语言:中文创建管理员账号允许远程访问:开启添加媒体库:
电影库:内容类型:电影路径:/mnt/gdrive/媒体/电影
电视剧库:内容类型:电视节目路径:/mnt/gdrive/媒体/电视剧
动漫库:内容类型:电视节目路径:/mnt/gdrive/媒体/动漫9. 媒体库整理规范
Emby 主要根据媒体库类型、文件夹名、文件名和季集编号识别,不会真正理解视频内容。
9.1 电影
媒体/ 电影/ 流浪地球 (2019)/ 流浪地球 (2019).mkv
Avatar (2009)/ Avatar (2009).mkv9.2 电视剧
媒体/ 电视剧/ 庆余年 (2019)/ Season 01/ 庆余年 S01E01.mkv 庆余年 S01E02.mkv Season 02/ 庆余年 S02E01.mkv9.3 动漫
媒体/ 动漫/ 葬送的芙莉莲 (2023)/ Season 01/ 葬送的芙莉莲 S01E01.mkv 葬送的芙莉莲 S01E02.mkv Specials/ 葬送的芙莉莲 S00E01.mkv9.4 不推荐
电视剧/ 第01集.mkv 第02集.mkv 01.mp4或把所有电影、剧集、动漫混在一个媒体库里。
TIPTelegram 上传机器人建议只上传到
gdrive:TelegramUploads/待整理,不要直接传到正式媒体库。确认片名、年份、季集后,再移动到正式目录。
10. Telegram 上传网盘机器人
这个机器人负责:
接收 Telegram 文件自动排队上传到当前设置的网盘目录支持 /setdir 切换上传目录支持 /cancel 中止当前任务支持 /queue 查看等待队列支持每 10 秒刷新上传进度10.1 准备信息
需要:
BOT_TOKENTELEGRAM_API_IDTELEGRAM_API_HASH你的 Telegram 数字 IDBOT_TOKEN 从 @BotFather 创建。
TELEGRAM_API_ID 和 TELEGRAM_API_HASH 从 https://my.telegram.org 创建 App 获取。
10.2 创建目录与配置
mkdir -p /opt/tg-bot-uploader/botapi-datamkdir -p /opt/tg-bot-uploader/downloadscd /opt/tg-bot-uploader创建 .env:
cat > /opt/tg-bot-uploader/.env <<'EOF'BOT_TOKEN=这里填你的BOT_TOKENTELEGRAM_API_ID=这里填你的API_IDTELEGRAM_API_HASH=这里填你的API_HASH
ALLOWED_USER_IDS=
DEST_REMOTE=gdrive:TelegramUploads/待整理ALLOWED_DEST_PREFIXES=gdrive:
BOT_API_URL=http://127.0.0.1:18082FILE_API_URL=http://127.0.0.1:18082/file
LOCAL_BOT_API_PATH_PREFIX=/var/lib/telegram-bot-apiHOST_BOT_API_PATH_PREFIX=/opt/tg-bot-uploader/botapi-data
DOWNLOAD_DIR=/opt/tg-bot-uploader/downloadsSTATE_FILE=/opt/tg-bot-uploader/state.jsonPROGRESS_INTERVAL=10SKIP_OLD_UPDATES_ON_START=trueEOF
chmod 600 /opt/tg-bot-uploader/.envvim /opt/tg-bot-uploader/.env10.3 部署本地 Telegram Bot API Server
cat > /opt/tg-bot-uploader/docker-compose.yml <<'EOF'services: telegram-bot-api: image: aiogram/telegram-bot-api:latest container_name: telegram-bot-api restart: unless-stopped ports: - "127.0.0.1:18082:8081" env_file: - /opt/tg-bot-uploader/.env environment: TELEGRAM_LOCAL: "1" volumes: - /opt/tg-bot-uploader/botapi-data:/var/lib/telegram-bot-apiEOF启动:
cd /opt/tg-bot-uploaderdocker compose pulldocker compose up -ddocker logs --tail=100 telegram-bot-api测试:
source /opt/tg-bot-uploader/.envcurl "http://127.0.0.1:18082/bot${BOT_TOKEN}/getMe"返回 "ok":true 即成功。
给机器人发 /start,然后获取你的 Telegram ID:
curl -s "http://127.0.0.1:18082/bot${BOT_TOKEN}/getUpdates" | jq把 from.id 写进 .env:
ALLOWED_USER_IDS=12345678910.4 上传机器人脚本
cat > /opt/tg-bot-uploader/tg_uploader.py <<'PY'#!/usr/bin/env python3import osimport reimport timeimport jsonimport shutilimport selectimport threadingimport subprocessfrom pathlib import Pathfrom collections import dequefrom urllib.parse import quotefrom urllib.request import urlopen, Request
BOT_TOKEN = os.environ["BOT_TOKEN"]BOT_API_URL = os.environ.get("BOT_API_URL", "http://127.0.0.1:18082").rstrip("/")FILE_API_URL = os.environ.get("FILE_API_URL", "http://127.0.0.1:18082/file").rstrip("/")DEFAULT_DEST_REMOTE = os.environ.get("DEST_REMOTE", "gdrive:TelegramUploads/待整理").rstrip("/")STATE_FILE = Path(os.environ.get("STATE_FILE", "/opt/tg-bot-uploader/state.json"))DOWNLOAD_DIR = Path(os.environ.get("DOWNLOAD_DIR", "/opt/tg-bot-uploader/downloads"))PROGRESS_INTERVAL = int(os.environ.get("PROGRESS_INTERVAL", "10"))SKIP_OLD = os.environ.get("SKIP_OLD_UPDATES_ON_START", "true").lower() in ["1", "true", "yes", "y"]
ALLOWED_DEST_PREFIXES = [x.strip() for x in os.environ.get("ALLOWED_DEST_PREFIXES", "gdrive:").split(",") if x.strip()]ALLOWED_USER_IDS = {int(x.strip()) for x in os.environ.get("ALLOWED_USER_IDS", "").split(",") if x.strip()}
LOCAL_PREFIX = os.environ.get("LOCAL_BOT_API_PATH_PREFIX", "")HOST_PREFIX = os.environ.get("HOST_BOT_API_PATH_PREFIX", "")
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
state_lock = threading.Lock()queue_cond = threading.Condition()job_queue = deque()current_job = Nonejob_seq = 0
class Cancelled(Exception): pass
def load_state_unlocked(): if not STATE_FILE.exists(): return {"users": {}, "last_update_id": None} try: data = json.loads(STATE_FILE.read_text("utf-8")) except Exception: return {"users": {}, "last_update_id": None} data.setdefault("users", {}) data.setdefault("last_update_id", None) return data
def save_state_unlocked(data): tmp = STATE_FILE.with_suffix(".tmp") tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), "utf-8") tmp.replace(STATE_FILE)
def get_user_dest(user_id): with state_lock: state = load_state_unlocked() return state["users"].get(str(user_id), {}).get("dest", DEFAULT_DEST_REMOTE)
def set_user_dest(user_id, dest): dest = dest.strip().rstrip("/") if not dest: raise ValueError("目录不能为空") if not any(dest.startswith(prefix) for prefix in ALLOWED_DEST_PREFIXES): raise ValueError(f"不允许的目录。允许前缀:{', '.join(ALLOWED_DEST_PREFIXES)}") with state_lock: state = load_state_unlocked() state["users"].setdefault(str(user_id), {}) state["users"][str(user_id)]["dest"] = dest save_state_unlocked(state)
def get_last_update_id(): with state_lock: return load_state_unlocked().get("last_update_id")
def set_last_update_id(update_id): with state_lock: state = load_state_unlocked() state["last_update_id"] = update_id save_state_unlocked(state)
def api(method, data=None, timeout=300): url = f"{BOT_API_URL}/bot{BOT_TOKEN}/{method}" if data is None: req = Request(url) else: req = Request(url, data=json.dumps(data).encode("utf-8"), headers={"Content-Type": "application/json"}) with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8"))
def send(chat_id, text): try: res = api("sendMessage", {"chat_id": chat_id, "text": text}) if res.get("ok"): return res["result"]["message_id"] except Exception as e: print("sendMessage error:", repr(e), flush=True) return None
def edit(chat_id, message_id, text): if not message_id: return try: api("editMessageText", {"chat_id": chat_id, "message_id": message_id, "text": text}) except Exception as e: print("editMessageText error:", repr(e), flush=True)
def human_size(num): try: num = float(num) except Exception: return "未知" units = ["B", "KB", "MB", "GB", "TB"] for unit in units: if abs(num) < 1024: return f"{num:.2f} {unit}" num /= 1024 return f"{num:.2f} PB"
def safe_name(name): return name.replace("/", "_").replace("\\", "_").strip() or f"telegram_file_{int(time.time())}"
def rclone_join(remote_dir, file_name): remote_dir = remote_dir.strip().rstrip("/") return remote_dir + file_name if remote_dir.endswith(":") else remote_dir + "/" + file_name
def ensure_remote_dir(remote_dir): subprocess.run(["rclone", "mkdir", remote_dir], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def get_media(message): for key in ["document", "video", "audio"]: if key in message: obj = message[key] file_id = obj["file_id"] file_name = obj.get("file_name") or f"{key}_{message['message_id']}" + (".mp4" if key == "video" else "") size = obj.get("file_size", 0) return file_id, safe_name(file_name), size if "photo" in message: obj = message["photo"][-1] return obj["file_id"], f"photo_{message['message_id']}.jpg", obj.get("file_size", 0) return None, None, None
def get_file_path(file_id): info = api("getFile", {"file_id": file_id}) if not info.get("ok"): raise RuntimeError(info) return info["result"]["file_path"]
def resolve_local_path(file_path): paths = [] if file_path.startswith("/"): paths.append(file_path) if LOCAL_PREFIX and HOST_PREFIX and file_path.startswith(LOCAL_PREFIX): paths.append(file_path.replace(LOCAL_PREFIX, HOST_PREFIX, 1)) for p in paths: if os.path.exists(p): return Path(p) return None
def download_via_http(file_path, file_name, job): url = f"{FILE_API_URL}/bot{BOT_TOKEN}/{quote(file_path)}" local_path = DOWNLOAD_DIR / file_name with urlopen(url, timeout=3600) as r, open(local_path, "wb") as f: total = r.headers.get("Content-Length") total = int(total) if total and total.isdigit() else 0 done = 0 last_update = 0 while True: if job["cancel_event"].is_set(): raise Cancelled("已中止当前任务") chunk = r.read(8 * 1024 * 1024) if not chunk: break f.write(chunk) done += len(chunk) now = time.time() if now - last_update >= PROGRESS_INTERVAL: if total: percent = done / total * 100 text = f"正在从 Telegram 下载到服务器:\n{file_name}\n\n进度:{percent:.2f}%\n已下载:{human_size(done)} / {human_size(total)}\n\n可发送 /cancel 中止当前任务" else: text = f"正在从 Telegram 下载到服务器:\n{file_name}\n\n已下载:{human_size(done)}\n\n可发送 /cancel 中止当前任务" edit(job["chat_id"], job["progress_message_id"], text) last_update = now return local_path
def strip_ansi(s): return re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", s).replace("\r", "").replace("\n", "").strip()
def get_queue_size(): with queue_cond: return len(job_queue)
def format_progress_message(job, line): line = strip_ansi(line) or "等待 rclone 输出进度..." if len(line) > 900: line = line[-900:] return ( f"正在上传:\n{job['file_name']}\n\n" f"目标目录:\n{job['dest_dir']}\n\n" f"rclone 进度:\n{line}\n\n" f"队列中等待:{get_queue_size()} 个\n" f"可发送 /cancel 中止当前任务" )
def terminate_process(proc): if not proc: return try: proc.terminate() proc.wait(timeout=8) except Exception: try: proc.kill() except Exception: pass
def upload_with_rclone_progress(src, job): src = Path(src) dest_path = rclone_join(job["dest_dir"], job["file_name"]) ensure_remote_dir(job["dest_dir"])
cmd = [ "rclone", "copyto", str(src), dest_path, "-P", "--stats=1s", "--stats-one-line", "--stats-unit=bytes", "--stats-log-level=NOTICE", "--transfers=1", "--checkers=1", "--tpslimit=3", "--tpslimit-burst=3", "--drive-pacer-min-sleep=200ms", "--drive-pacer-burst=10", "--retries=20", "--retries-sleep=30s", "--low-level-retries=20", "--log-level=NOTICE" ]
proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=0) job["process"] = proc job["stage"] = "uploading"
last_edit = 0 line_buf = "" last_line = "等待 rclone 输出进度..."
try: while True: if job["cancel_event"].is_set(): terminate_process(proc) raise Cancelled("已中止当前任务")
if proc.poll() is not None: break
ready, _, _ = select.select([proc.stdout], [], [], 1.0) if not ready: continue
ch = proc.stdout.read(1) if not ch: continue
if ch in ["\r", "\n"]: clean = strip_ansi(line_buf) line_buf = "" if clean: last_line = clean now = time.time() if now - last_edit >= PROGRESS_INTERVAL: edit(job["chat_id"], job["progress_message_id"], format_progress_message(job, last_line)) last_edit = now else: line_buf += ch if len(line_buf) > 3000: line_buf = line_buf[-3000:]
rc = proc.wait() if job["cancel_event"].is_set(): raise Cancelled("已中止当前任务") if rc != 0: raise RuntimeError(f"rclone 上传失败,退出码:{rc}\n最后输出:{last_line}")
edit( job["chat_id"], job["progress_message_id"], f"上传完成:\n{job['file_name']}\n\n目标目录:\n{job['dest_dir']}\n\n网盘路径:\n{dest_path}\n\n队列中等待:{get_queue_size()} 个" ) return dest_path finally: job["process"] = None
def queue_snapshot(limit=10): with queue_cond: items = list(job_queue)[:limit] total = len(job_queue) if not items: return "当前没有等待中的任务。" lines = [f"等待队列:共 {total} 个任务"] for i, job in enumerate(items, 1): lines.append(f"{i}. #{job['id']} {job['file_name']} -> {job['dest_dir']}") if total > limit: lines.append(f"... 还有 {total - limit} 个未显示") return "\n".join(lines)
def status_text(): with queue_cond: cur = current_job total = len(job_queue) if cur: cur_text = f"当前任务:\n#{cur['id']} {cur['file_name']}\n阶段:{cur.get('stage', '未知')}\n目标目录:{cur['dest_dir']}\n" else: cur_text = "当前没有正在上传的任务。\n" return f"{cur_text}\n等待队列:{total} 个\n\n命令:\n/queue 查看队列\n/cancel 中止当前任务\n/clear 清空等待队列"
def help_text(user_id): current = get_user_dest(user_id) return ( "上传机器人使用说明\n\n" "直接发送文件、视频或音频给我,我会自动上传到当前网盘目录。\n" "多个文件会自动排队,按顺序依次上传。\n\n" "目录命令:\n" "/dir 查看当前上传目录\n" "/setdir gdrive:TelegramUploads/待整理 设置上传目录\n" "/setdir gdrive:媒体/电视剧/剧名 (年份)/Season 01 切换到指定目录\n\n" "队列命令:\n" "/status 查看当前任务和队列数量\n" "/queue 查看等待队列\n" "/cancel 中止当前正在上传的任务\n" "/clear 清空等待队列,不影响当前正在上传的任务\n\n" f"当前上传目录:\n{current}\n\n" "建议发送时选择“作为文件发送 / Send as File”,不要选择压缩视频。" )
def normalize_command(text): if not text: return "", "" parts = text.strip().split(maxsplit=1) cmd = parts[0].split("@", 1)[0] arg = parts[1].strip() if len(parts) > 1 else "" return cmd, arg
def add_job(message, chat_id, user_id): global job_seq file_id, file_name, size = get_media(message) if not file_id: send(chat_id, help_text(user_id)) return
dest_dir = get_user_dest(user_id)
with queue_cond: job_seq += 1 job_id = job_seq position = len(job_queue) + (1 if current_job else 0) + 1
progress_message_id = send( chat_id, f"已加入上传队列:\n#{job_id} {file_name}\n\n" f"大小:{human_size(size) if size else '未知'}\n" f"目标目录:\n{dest_dir}\n\n" f"当前排队位置:{position}\n" f"发送 /queue 查看队列,/cancel 中止当前上传。" )
job = { "id": job_id, "chat_id": chat_id, "user_id": user_id, "file_id": file_id, "file_name": file_name, "size": size, "dest_dir": dest_dir, "progress_message_id": progress_message_id, "stage": "queued", "cancel_event": threading.Event(), "process": None, }
with queue_cond: job_queue.append(job) queue_cond.notify()
def handle_message(message): chat_id = message.get("chat", {}).get("id") user_id = message.get("from", {}).get("id") if not chat_id or not user_id: return
if ALLOWED_USER_IDS and user_id not in ALLOWED_USER_IDS: send(chat_id, "你没有权限使用这个上传机器人。") return
text = (message.get("text") or "").strip() cmd, arg = normalize_command(text)
if cmd in ["/start", "/help"]: send(chat_id, help_text(user_id)) return if cmd == "/dir": send(chat_id, f"当前上传目录:\n{get_user_dest(user_id)}") return if cmd == "/setdir": if not arg: send(chat_id, "用法:\n/setdir gdrive:TelegramUploads/待整理\n/setdir gdrive:媒体/电视剧/剧名 (年份)/Season 01") return try: set_user_dest(user_id, arg) ensure_remote_dir(arg) send(chat_id, f"已切换上传目录:\n{arg}") except Exception as e: send(chat_id, f"设置失败:{e}") return if cmd == "/queue": send(chat_id, queue_snapshot()) return if cmd == "/status": send(chat_id, status_text()) return if cmd == "/clear": with queue_cond: count = len(job_queue) job_queue.clear() send(chat_id, f"已清空等待队列:{count} 个任务。当前正在上传的任务不受影响。") return if cmd == "/cancel": with queue_cond: cur = current_job if not cur: send(chat_id, "当前没有正在上传的任务。") return cur["cancel_event"].set() if cur.get("process"): terminate_process(cur["process"]) send(chat_id, f"已请求中止当前任务:\n#{cur['id']} {cur['file_name']}") return
add_job(message, chat_id, user_id)
def worker_loop(): global current_job while True: with queue_cond: while not job_queue: queue_cond.wait() job = job_queue.popleft() current_job = job
src = None should_delete = False
try: job["stage"] = "preparing" edit(job["chat_id"], job["progress_message_id"], f"开始处理:\n#{job['id']} {job['file_name']}\n\n目标目录:\n{job['dest_dir']}\n\n正在获取 Telegram 文件信息...")
if job["cancel_event"].is_set(): raise Cancelled("已中止当前任务")
file_path = get_file_path(job["file_id"]) local_path = resolve_local_path(file_path)
if local_path: src = local_path should_delete = False job["stage"] = "ready" else: job["stage"] = "downloading" src = download_via_http(file_path, job["file_name"], job) should_delete = True
if job["cancel_event"].is_set(): raise Cancelled("已中止当前任务")
upload_with_rclone_progress(src, job)
except Cancelled as e: edit(job["chat_id"], job["progress_message_id"], f"任务已中止:\n#{job['id']} {job['file_name']}\n\n{e}\n\n队列中等待:{get_queue_size()} 个") except Exception as e: edit(job["chat_id"], job["progress_message_id"], f"任务失败:\n#{job['id']} {job['file_name']}\n\n错误:{e}\n\n队列中等待:{get_queue_size()} 个") print("worker error:", repr(e), flush=True) finally: if should_delete and src and Path(src).exists(): try: Path(src).unlink() except Exception: pass with queue_cond: current_job = None queue_cond.notify_all()
def initialize_offset(): last = get_last_update_id() if last is not None: return int(last) + 1 if not SKIP_OLD: return 0 try: res = api("getUpdates", {"timeout": 0, "limit": 100}, timeout=30) updates = res.get("result", []) if updates: max_id = max(u["update_id"] for u in updates) set_last_update_id(max_id) return max_id + 1 except Exception as e: print("initialize_offset error:", repr(e), flush=True) return 0
def polling_loop(): offset = initialize_offset() print(f"polling started, offset={offset}", flush=True) while True: try: res = api("getUpdates", {"offset": offset, "timeout": 50, "allowed_updates": ["message"]}, timeout=70) for upd in res.get("result", []): update_id = upd["update_id"] offset = update_id + 1 set_last_update_id(update_id) msg = upd.get("message") or {} if msg: handle_message(msg) except Exception as e: print("polling error:", repr(e), flush=True) time.sleep(5)
def main(): print("tg-uploader started", flush=True) worker = threading.Thread(target=worker_loop, daemon=True) worker.start() polling_loop()
if __name__ == "__main__": main()PY检查语法:
chmod +x /opt/tg-bot-uploader/tg_uploader.pypython3 -m py_compile /opt/tg-bot-uploader/tg_uploader.py创建 systemd:
cat > /etc/systemd/system/tg-uploader.service <<'EOF'[Unit]Description=Telegram file uploader to rclone remoteAfter=network-online.target docker.serviceWants=network-online.target
[Service]Type=simpleEnvironmentFile=/opt/tg-bot-uploader/.envWorkingDirectory=/opt/tg-bot-uploaderExecStart=/usr/bin/python3 /opt/tg-bot-uploader/tg_uploader.pyRestart=alwaysRestartSec=5
[Install]WantedBy=multi-user.targetEOF
systemctl daemon-reloadsystemctl enable --now tg-uploaderjournalctl -u tg-uploader -f10.5 机器人命令
/help查看帮助
/dir查看当前上传目录
/setdir gdrive:TelegramUploads/待整理设置上传目录
/setdir gdrive:媒体/电视剧/剧名 (年份)/Season 01切换到指定目录
/status查看当前任务
/queue查看等待队列
/cancel中止当前正在上传的任务
/clear清空等待队列11. EmbyPulse 管理方案
EmbyPulse 适合需要 Web 管理面板、用户中心、统计、播放排行等功能的场景。
11.1 Emby 里安装 Playback Reporting
Emby 后台:
管理服务器插件目录 / Catalog搜索 Playback Reporting安装重启 Emby11.2 创建 Emby API Key
管理服务器服务器API新增 API Key名称:EmbyPulse11.3 部署 EmbyPulse
mkdir -p /opt/embypulse/configmkdir -p /opt/embypulse/datamkdir -p /opt/media-stack/embypulsecd /opt/media-stack/embypulse.env:
cat > .env <<'EOF'LOCAL_ADMIN_USERNAME=adminLOCAL_ADMIN_PASSWORD=这里改成强密码
EMBY_HOST=http://host.docker.internal:8096EMBY_API_KEY=这里填Emby_API_KeyEOF
chmod 600 .envdocker-compose.yml:
services: emby-pulse: image: zeyu8023/embypulse-pro:latest container_name: emby-pulse restart: unless-stopped ports: - "127.0.0.1:10307:10307" - "127.0.0.1:10308:10308" extra_hosts: - "host.docker.internal:host-gateway" volumes: - /opt/embypulse/config:/workspace/config - /opt/embypulse/data:/workspace/data environment: TZ: Asia/Shanghai PORT: "10307" REQUEST_PORT: "10308" LOCAL_AUTH_ENABLED: "true" LOCAL_ADMIN_USERNAME: "${LOCAL_ADMIN_USERNAME}" LOCAL_ADMIN_PASSWORD: "${LOCAL_ADMIN_PASSWORD}" EMBY_HOST: "${EMBY_HOST}" EMBY_API_KEY: "${EMBY_API_KEY}"启动:
docker compose pulldocker compose up -ddocker logs --tail=100 emby-pulseCaddy 反代:
admin.example.com { reverse_proxy 127.0.0.1:10307}
user.example.com { reverse_proxy 127.0.0.1:10308}12. EmbyTGBot 管理方案
EmbyTGBot 适合通过 Telegram 管理 Emby 用户。
12.1 准备
需要:
Emby API KeyEmby 模板用户 testone管理员 Bot Token客户端 Bot Token管理员 Telegram IDEmby 创建模板用户:
用户名:testone用途:新用户复制它的权限和配置12.2 部署
mkdir -p /opt/emby_tg_admincd /opt/emby_tg_admin
git clone https://github.com/sd87671067/EmbyTGBot.git EmbyTGBotcd /opt/emby_tg_admin/EmbyTGBot
cp .env.example .envchmod 600 .env生成密钥:
openssl rand -hex 32编辑 .env:
APP_NAME=Emby TG 管理中心APP_ENV=productionAPP_PORT=18080APP_BASE_URL=http://127.0.0.1:18080APP_TIMEZONE=Asia/ShanghaiAPP_MASTER_KEY=这里填openssl生成的字符串
APP_WEB_ADMIN_USERNAME=adminAPP_WEB_ADMIN_PASSWORD=一个强密码
EMBY_BASE_URL=http://host.docker.internal:8096EMBY_API_KEY=Emby_API_Key
EMBY_SERVER_PUBLIC_URL=http://play1.example.com:8096,https://play2.example.com
EMBY_TEMPLATE_USER=testoneEMBY_IMPORT_IGNORE_USERNAMES=admin,testoneEMBY_SYNC_LOCAL_DEFAULT_PASSWORD=1234
ADMIN_BOT_TOKEN=管理员BotTokenADMIN_CHAT_IDS=你的Telegram数字ID
CLIENT_BOT_TOKEN=客户端BotToken
ADMIN_CONTACT_TG_USERNAME=@你的TG用户名ADMIN_CONTACT_TG_USER_ID=你的Telegram数字ID
DEFAULT_USER_EXPIRE_DAYS=90REGISTER_CODE_LENGTH=16CODE_BATCH_LIMIT=500WEB_EXPIRING_SOON_DAYS=3EXPIRY_CHECK_SECONDS=3600ONLINE_CHECK_SECONDS=60修改 docker-compose.yml,给容器访问宿主机 Emby:
extra_hosts: - "host.docker.internal:host-gateway"启动:
docker compose up -d --builddocker logs -f --tail=120 emby_tg_admin12.3 常见问题
Bot Token 无效
日志:
TokenValidationError: Token is invalid!检查:
source .envcurl "https://api.telegram.org/bot${ADMIN_BOT_TOKEN}/getMe"curl "https://api.telegram.org/bot${CLIENT_BOT_TOKEN}/getMe"ARM64 镜像问题
Emby 本体需要 ARM64 镜像:
image: emby/embyserver_arm64v8:latestplatform: linux/arm64/v8查询有效用户只显示一个
可以确认 SQLite 数据:
python3 - <<'PY'import sqlite3from pathlib import Path
for db in Path("data").rglob("*.db"): print(f"\n===== {db} =====") conn = sqlite3.connect(db) cur = conn.cursor() for t in [r[0] for r in cur.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")]: cols = [r[1] for r in cur.execute(f'PRAGMA table_info("{t}")')] if "username" in cols: print(f"\n--- {t} ---") cur.execute(f'SELECT * FROM "{t}"') for row in cur.fetchall(): print(row) conn.close()PY如果数据库里有多个用户,但机器人只显示一个,需要修正 send_users_page 函数或等待项目更新。
服务地址一行显示两个 URL
可以把客户端显示改成:
服务地址:直连线路:http://play1.example.com:8096CF线路:https://play2.example.com核心逻辑是把逗号分隔的 EMBY_SERVER_PUBLIC_URL 格式化成多行。
13. Foam 卸载
如果之前部署过 Foam,可以这样卸载:
cd /opt/media-stack/foam 2>/dev/null || true
if [ -f docker-compose.yml ]; then docker compose down --remove-orphansfi
docker rm -f foam foam-api foam-mysql foam-redis foam-selenium 2>/dev/null || true
mkdir -p /root/backupif [ -d /opt/media-stack/foam ]; then tar -czf /root/backup/foam-backup-$(date +%F-%H%M%S).tar.gz /opt/media-stack/foamfi
rm -rf /opt/media-stack/foamdocker image prune -f清理 Caddy 中 Foam 域名反代:
grep -R "foam" -n /etc/caddy 2>/dev/null || truevim /etc/caddy/conf.d/media-stack.caddycaddy validate --config /etc/caddy/Caddyfilesystemctl reload caddy14. 启停与维护
14.1 停止全部服务
docker stop embyserver 2>/dev/null || truedocker stop emby_tg_admin 2>/dev/null || truedocker stop telegram-bot-api 2>/dev/null || true
systemctl stop tg-uploader 2>/dev/null || true14.2 重启全部服务
systemctl restart rclone-gdrive
cd /opt/media-stack/embydocker compose up -d --force-recreate
cd /opt/emby_tg_admin/EmbyTGBotdocker compose up -d --force-recreate
cd /opt/tg-bot-uploaderdocker compose up -d --force-recreate
systemctl restart tg-uploadersystemctl reload caddy14.3 查看日志
docker logs -f --tail=100 embyserverdocker logs -f --tail=100 emby_tg_admindocker logs -f --tail=100 telegram-bot-apijournalctl -u tg-uploader -fjournalctl -u rclone-gdrive -n 100 --no-pager14.4 更新 Emby
cd /opt/media-stack/embydocker compose pulldocker compose up -d14.5 更新 EmbyTGBot
cd /opt/emby_tg_admin/EmbyTGBotgit pull --ff-onlydocker compose up -d --build14.6 更新上传机器人
cd /opt/tg-bot-uploaderdocker compose pulldocker compose up -dsystemctl restart tg-uploader15. 备份
mkdir -p /root/backup
tar -czf /root/backup/media-server-backup-$(date +%F).tar.gz \ /opt/media-stack \ /opt/media/emby/config \ /opt/tg-bot-uploader \ /opt/emby_tg_admin/EmbyTGBot/.env \ /opt/emby_tg_admin/EmbyTGBot/data \ /root/.config/rclone \ /etc/systemd/system/rclone-gdrive.service \ /etc/systemd/system/tg-uploader.service \ /etc/caddy重点备份:
/opt/media/emby/config/root/.config/rclone/rclone.conf/opt/tg-bot-uploader/.env/opt/tg-bot-uploader/state.json/opt/emby_tg_admin/EmbyTGBot/.env/opt/emby_tg_admin/EmbyTGBot/data/etc/caddy16. 最终推荐使用流程
日常新增资源:
1. Telegram 上传机器人: /setdir gdrive:TelegramUploads/待整理
2. 把资源作为文件发送给机器人
3. 上传完成后,用 rclone moveto 整理到正式目录: gdrive:媒体/电影 gdrive:媒体/电视剧 gdrive:媒体/动漫
4. Emby 扫描媒体库
5. 如果识别错误,在 Emby 中手动 Identify用户管理:
方式 A:EmbyPulse Web 面板方式 B:EmbyTGBot Telegram 管理播放线路:
首选:直连线路 / 灰云域名备用:IP:8096管理:Cloudflare 橙云域名17. 总结
这套系统的核心原则是:
Emby 只负责播放和刮削rclone 负责把网盘挂成本地目录Telegram 上传机器人负责收资源和上传网盘EmbyPulse 或 EmbyTGBot 负责用户管理Caddy 和 Cloudflare 负责访问线路如果想稳定,最重要的是三点:
1. 媒体文件命名规范2. 播放线路不要长期走 Cloudflare 橙云3. rclone 使用自己的 Google Drive client_id把这三点做好,整套 Emby + 网盘媒体库的体验会稳定很多。