-
Notifications
You must be signed in to change notification settings - Fork 434
Expand file tree
/
Copy pathtest_streams.py
More file actions
302 lines (241 loc) · 10.8 KB
/
test_streams.py
File metadata and controls
302 lines (241 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
import os
import pytest
import av
import av.datasets
from .common import fate_suite
class TestStreams:
@pytest.fixture(autouse=True)
def cleanup(self):
yield
for file in (
"data.ts",
"data_source.ts",
"data_copy.ts",
"data_with_codec.ts",
"data_invalid.ts",
"out.mkv",
"video_with_attachment.mkv",
"remuxed_attachment.mkv",
):
if os.path.exists(file):
os.remove(file)
def test_stream_tuples(self) -> None:
for fate_name in ("h264/interlaced_crop.mp4",):
container = av.open(fate_suite(fate_name))
video_streams = tuple([s for s in container.streams if s.type == "video"])
assert video_streams == container.streams.video
audio_streams = tuple([s for s in container.streams if s.type == "audio"])
assert audio_streams == container.streams.audio
def test_loudnorm(self) -> None:
container = av.open(
fate_suite("amv/MTV_high_res_320x240_sample_Penguin_Joke_MTV_from_WMV.amv")
)
audio = container.streams.audio[0]
stats = av.filter.loudnorm.stats("i=-24.0:lra=7.0:tp=-2.0", audio)
assert isinstance(stats, bytes) and len(stats) > 30
assert b"inf" not in stats
assert b'"input_i"' in stats
def test_selection(self) -> None:
container = av.open(
fate_suite("amv/MTV_high_res_320x240_sample_Penguin_Joke_MTV_from_WMV.amv")
)
video = container.streams.video[0]
video.thread_type = av.codec.context.ThreadType.AUTO
assert video.thread_type == av.codec.context.ThreadType.AUTO
video.thread_type = 0x03
assert video.thread_type == av.codec.context.ThreadType.AUTO
video.thread_type = "AUTO"
assert video.thread_type == av.codec.context.ThreadType.AUTO
audio = container.streams.audio[0]
assert [video] == container.streams.get(video=0)
assert [video] == container.streams.get(video=(0,))
assert video == container.streams.best("video")
assert audio == container.streams.best("audio")
container = av.open(fate_suite("sub/MovText_capability_tester.mp4"))
subtitle = container.streams.subtitles[0]
assert subtitle == container.streams.best("subtitle")
container = av.open(fate_suite("mxf/track_01_v02.mxf"))
data = container.streams.data[0]
assert data == container.streams.best("data")
def test_printing_video_stream(self) -> None:
input_ = av.open(
fate_suite("amv/MTV_high_res_320x240_sample_Penguin_Joke_MTV_from_WMV.amv")
)
container = av.open("out.mkv", "w")
video_stream = container.add_stream("h264", rate=30)
encoder = video_stream.codec.name
video_stream.width = input_.streams.video[0].width
video_stream.height = input_.streams.video[0].height
video_stream.pix_fmt = "yuv420p"
for frame in input_.decode(video=0):
container.mux(video_stream.encode(frame))
break
repr = f"{video_stream}"
assert repr.startswith(f"<av.VideoStream #0 {encoder}, yuv420p 160x120 at ")
assert repr.endswith(">")
container.close()
input_.close()
def test_printing_video_stream2(self) -> None:
input_ = av.open(fate_suite("h264/interlaced_crop.mp4"))
input_stream = input_.streams.video[0]
container = av.open("out.mkv", "w")
video_stream = container.add_stream_from_template(input_stream)
encoder = video_stream.codec.name
for frame in input_.decode(video=0):
container.mux(video_stream.encode(frame))
break
repr = f"{video_stream}"
assert repr.startswith(f"<av.VideoStream #0 {encoder}, yuv420p 640x360 at ")
assert repr.endswith(">")
container.close()
input_.close()
def test_data_stream(self) -> None:
# First test writing and reading a simple data stream
container1 = av.open("data.ts", "w")
data_stream = container1.add_data_stream()
test_data = [b"test data 1", b"test data 2", b"test data 3"]
for i, data_ in enumerate(test_data):
packet = av.Packet(data_)
packet.pts = i
packet.stream = data_stream
container1.mux(packet)
container1.close()
# Test reading back the data stream
container = av.open("data.ts")
# Test best stream selection
data = container.streams.best("data")
assert data == container.streams.data[0]
# Test get method
assert [data] == container.streams.get(data=0)
assert [data] == container.streams.get(data=(0,))
# Verify we can read back all the packets, ignoring empty ones
packets = [p for p in container.demux(data) if bytes(p)]
assert len(packets) == len(test_data)
for packet, original_data in zip(packets, test_data):
assert bytes(packet) == original_data
# Test string representation
repr = f"{data_stream}"
assert repr.startswith("<av.DataStream #0") and repr.endswith(">")
container.close()
def test_data_stream_from_template(self) -> None:
source_path = "data_source.ts"
payloads = [b"payload-a", b"payload-b", b"payload-c"]
with av.open(source_path, "w") as source:
source_stream = source.add_data_stream()
for i, payload in enumerate(payloads):
packet = av.Packet(payload)
packet.pts = i
packet.stream = source_stream
source.mux(packet)
copied_payloads: list[bytes] = []
with av.open(source_path) as input_container:
input_data_stream = input_container.streams.data[0]
with av.open("data_copy.ts", "w") as output_container:
output_data_stream = output_container.add_stream_from_template(
input_data_stream
)
for data_packet in input_container.demux(input_data_stream):
payload = bytes(data_packet)
if not payload:
continue
copied_payloads.append(payload)
clone = av.Packet(payload)
clone.pts = data_packet.pts
clone.dts = data_packet.dts
clone.time_base = data_packet.time_base
clone.stream = output_data_stream
output_container.mux(clone)
with av.open("data_copy.ts") as remuxed:
output_stream = remuxed.streams.data[0]
assert output_stream.codec_context is None
remuxed_payloads: list[bytes] = []
for data_packet in remuxed.demux(output_stream):
payload = bytes(data_packet)
if payload:
remuxed_payloads.append(payload)
assert remuxed_payloads == copied_payloads
def test_data_stream_with_codec(self) -> None:
"""Test adding a data stream with a specific codec name."""
# Test that invalid codec names raise appropriate errors
with pytest.raises(ValueError, match="Unknown data codec"):
container = av.open("data_invalid.ts", "w")
try:
container.add_data_stream("not_a_real_codec_name_12345")
finally:
container.close()
# Test that add_data_stream with codec parameter works correctly
# We use "bin_data" which is a data codec that's always available
output_path = "data_with_codec.ts"
with av.open(output_path, "w") as container:
# Try to create a data stream with a codec
# bin_data is a simple passthrough codec for binary data
data_stream = container.add_data_stream("bin_data")
klv_stream = container.add_data_stream("klv")
assert data_stream.type == "data"
assert klv_stream.type == "data"
# Note: codec_context may be None for descriptor-only data codecs
test_data = [b"test1", b"test2", b"test3"]
for i, data in enumerate(test_data):
packet = av.Packet(data)
packet.pts = i
packet.stream = data_stream
container.mux(packet)
with av.open(output_path) as newcontainer:
data_stream = newcontainer.streams.data[0]
klv_stream = newcontainer.streams.data[1]
assert data_stream.type == "data"
assert klv_stream.type == "data"
assert "bin_data" in str(data_stream)
assert "klv" in str(klv_stream)
assert data_stream.name == "bin_data"
assert klv_stream.name == "klv"
try:
os.remove(output_path)
except Exception:
pass
def test_attachment_stream(self) -> None:
input_path = av.datasets.curated(
"pexels/time-lapse-video-of-night-sky-857195.mp4"
)
input_ = av.open(input_path)
out1_path = "video_with_attachment.mkv"
with av.open(out1_path, "w") as out1:
out1.add_attachment(
name="attachment.txt", mimetype="text/plain", data=b"hello\n"
)
in_v = input_.streams.video[0]
out_v = out1.add_stream_from_template(in_v)
for packet in input_.demux(in_v):
if packet.dts is None:
continue
packet.stream = out_v
out1.mux(packet)
input_.close()
with av.open(out1_path) as c:
attachments = c.streams.attachments
assert len(attachments) == 1
att = attachments[0]
assert att.name == "attachment.txt"
assert att.mimetype == "text/plain"
assert att.data == b"hello\n"
out2_path = "remuxed_attachment.mkv"
with av.open(out1_path) as ic, av.open(out2_path, "w") as oc:
stream_map = {}
for s in ic.streams:
stream_map[s.index] = oc.add_stream_from_template(s)
for packet in ic.demux(ic.streams.video):
if packet.dts is None:
continue
updated_stream = stream_map.get(packet.stream.index)
if isinstance(updated_stream, av.video.stream.VideoStream):
packet.stream = updated_stream
else:
raise ValueError("Expected a VideoStream")
oc.mux(packet)
with av.open(out2_path) as c:
attachments = c.streams.attachments
assert len(attachments) == 1
att = attachments[0]
assert att.name == "attachment.txt"
assert att.mimetype == "text/plain"
assert att.data == b"hello\n"