-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathm3u8_dl.py
More file actions
261 lines (221 loc) · 8.46 KB
/
m3u8_dl.py
File metadata and controls
261 lines (221 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# -*- coding: utf-8 -*-
"""
m3u8_dl.py
~~~~~~~~~~
通用 M3U8 下载/解密/合并工具(AES-128/CBC)。
用法:
python m3u8_dl.py -u <m3u8_url> -o <输出.mp4> [-k <key_hex_or_url>] [-t 超时秒数] [-H <headers>]
示例:
# 无加密
python m3u8_dl.py -u https://example.com/index.m3u8 -o movie.mp4
# 已知 key
python m3u8_dl.py -u https://example.com/index.m3u8 -o movie.mp4 -k 0123456789abcdef0123456789abcdef
# key 由服务器下发
python m3u8_dl.py -u https://example.com/index.m3u8 -o movie.mp4 -k https://example.com/key.bin
# 使用命令行直接指定 HTTP 头
python m3u8_dl.py -u https://example.com/index.m3u8 -o movie.mp4 -H "Authorization:token123 User-Agent:Mozilla/5.0"
# 使用头文件
python m3u8_dl.py -u https://example.com/index.m3u8 -o movie.mp4 -H headers.txt
"""
from __future__ import annotations
import argparse
import sys
import time
from pathlib import Path
from typing import List, Optional, Tuple
from cli_logger import log_init, logger
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
# ------------------ 核心下载器 ------------------
class M3U8Downloader:
def __init__(
self,
m3u8_url: str,
output: str,
key: Optional[str] = None,
timeout: int = 30,
headers: Optional[dict] = None,
) -> None:
self.m3u8_url = m3u8_url
self.output = Path(output)
self.key: Optional[bytes] = None
self.iv: Optional[bytes] = None
self.timeout = timeout
self.ts_urls: List[str] = []
self.headers = headers or {}
# 自动创建输出目录
self.output.parent.mkdir(parents=True, exist_ok=True)
# ---------- 1. 解析 ----------
def parse(self) -> None:
logger.info("正在下载与解析 m3u8...")
try:
resp = requests.get(
self.m3u8_url, headers=self.headers, timeout=self.timeout
)
resp.raise_for_status()
except KeyboardInterrupt:
logger.error("用户中断")
sys.exit(130)
except Exception as e:
logger.error(f"m3u8 下载失败: {e}")
sys.exit(1)
lines = resp.text.splitlines()
base_url = self.m3u8_url.rsplit("/", 1)[0] + "/"
for line in lines:
line = line.strip()
if line.startswith("#EXT-X-KEY"):
self._parse_key(line)
elif line and not line.startswith("#"):
# 处理相对路径
self.ts_urls.append(
line if line.startswith("http") else base_url + line
)
logger.info(f"共解析到 {len(self.ts_urls)} 个分片")
def _parse_key(self, line: str) -> None:
"""
解析 #EXT-X-KEY:METHOD=AES-128,URI="xxx",IV=0x...
支持十六进制或远程 key 文件
"""
uri_part = iv_part = ""
for seg in line.split(","):
seg = seg.strip()
if seg.startswith("URI="):
uri_part = seg[4:].strip('"')
elif seg.startswith("IV="):
iv_part = seg[3:].strip('"')
# 下载 key
if uri_part.startswith("http"):
try:
key_bytes = requests.get(
uri_part, headers=self.headers, timeout=self.timeout
).content
except KeyboardInterrupt:
logger.error("用户中断")
sys.exit(130)
except Exception as e:
logger.error(f"key 下载失败: {e}")
sys.exit(1)
else:
# 直接当成 hex
try:
key_bytes = bytes.fromhex(uri_part)
except ValueError:
logger.error("key 格式错误,应为 hex 或 http(s) 链接")
sys.exit(1)
self.key = key_bytes
self.iv = bytes.fromhex(iv_part[2:]) if iv_part.startswith("0x") else key_bytes
logger.info(f"获取加密 key={self.key.hex()} iv={self.iv.hex()}")
# ---------- 2. 下载 ----------
def download(self) -> None:
logger.info("开始下载分片...")
temp_dir = self.output.with_suffix(".parts")
temp_dir.mkdir(exist_ok=True)
for idx, ts_url in enumerate(self.ts_urls, 1):
retry = 3
while retry:
try:
resp = requests.get(
ts_url, headers=self.headers, timeout=self.timeout
)
resp.raise_for_status()
break
except KeyboardInterrupt:
logger.error("用户中断")
sys.exit(130)
except Exception as e:
retry -= 1
logger.warning(
f"[{idx}/{len(self.ts_urls)}] 下载失败: {e},剩余重试 {retry}"
)
time.sleep(1)
else:
logger.error("多次重试仍失败,程序终止")
sys.exit(1)
data = resp.content
if self.key:
data = self._decrypt(data)
temp_file = temp_dir / f"{idx:06}.ts"
temp_file.write_bytes(data)
logger.success(f"[{idx}/{len(self.ts_urls)}] 完成")
def _decrypt(self, data: bytes) -> bytes:
cipher = AES.new(self.key, AES.MODE_CBC, iv=self.iv)
return unpad(cipher.decrypt(data), AES.block_size)
# ---------- 3. 合并 ----------
def merge(self) -> None:
logger.info("正在合并分片...")
temp_dir = self.output.with_suffix(".parts")
with open(self.output, "wb") as fout:
for ts_file in sorted(temp_dir.glob("*.ts")):
fout.write(ts_file.read_bytes())
logger.success(f"合并完成 -> {self.output.absolute()}")
# ---------- 4. 清理 ----------
def clean(self) -> None:
temp_dir = self.output.with_suffix(".parts")
for f in temp_dir.glob("*.ts"):
f.unlink()
temp_dir.rmdir()
# ---------- 5. 一键执行 ----------
def run(self) -> None:
self.parse()
self.download()
self.merge()
self.clean()
logger.success("全部任务完成!")
# ------------------ CLI ------------------
def parse_headers(headers_arg: Optional[str]) -> dict:
"""
解析 headers 参数,支持:
1. 文件路径:读取文件中的每一行作为一个头部
2. 命令行参数:直接解析 "Header1:value1 Header2:value2" 格式
返回一个 headers 字典
"""
headers = {}
if not headers_arg:
return headers
# 尝试作为文件路径处理
file_path = Path(headers_arg)
if file_path.exists() and file_path.is_file():
try:
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and ":" in line:
key, value = line.split(":", 1)
headers[key.strip()] = value.strip()
logger.info(f"从文件 {file_path} 加载请求头")
return headers
except Exception as e:
logger.warning(f"读取请求头文件失败: {e}")
# 作为命令行参数解析
header_pairs = headers_arg.split()
for pair in header_pairs:
if ":" in pair:
key, value = pair.split(":", 1)
headers[key.strip()] = value.strip()
if headers:
logger.info(f"解析到 {len(headers)} 个请求头")
return headers
def main() -> None:
parser = argparse.ArgumentParser(description="M3U8 下载/解密/合并工具")
parser.add_argument("-u", "--url", required=True, help="m3u8 地址")
parser.add_argument("-o", "--output", required=True, help="输出文件(mp4/ts 均可)")
parser.add_argument(
"-k", "--key", help="16 字节 hex 或 key 文件 url(留空自动读取 m3u8)"
)
parser.add_argument(
"-t", "--timeout", type=int, default=30, help="超时秒数,默认 30"
)
parser.add_argument(
"-H",
"--headers",
help="HTTP 请求头文件路径或直接输入的头信息(格式: Header1:value1 Header2:value2 或文件路径)",
)
args = parser.parse_args()
# 解析 headers
headers = parse_headers(args.headers)
dl = M3U8Downloader(args.url, args.output, args.key, args.timeout, headers)
dl.run()
if __name__ == "__main__":
log_init()
main()