-
Notifications
You must be signed in to change notification settings - Fork 433
Expand file tree
/
Copy pathbitstream.py
More file actions
115 lines (92 loc) · 3.22 KB
/
bitstream.py
File metadata and controls
115 lines (92 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import cython
import cython.cimports.libav as lib
from cython.cimports.av.error import err_check
from cython.cimports.av.packet import Packet
from cython.cimports.av.stream import Stream
from cython.cimports.libc.errno import EAGAIN
@cython.cclass
class BitStreamFilterContext:
"""
Initializes a bitstream filter: a way to directly modify packet data.
Wraps :ffmpeg:`AVBSFContext`
:param Stream in_stream: A stream that defines the input codec for the bitfilter.
:param Stream out_stream: A stream whose codec is overwritten using the output parameters from the bitfilter.
"""
def __cinit__(
self,
filter_description,
in_stream: Stream | None = None,
out_stream: Stream | None = None,
):
res: cython.int
filter_str: cython.p_char = filter_description
with cython.nogil:
res = lib.av_bsf_list_parse_str(filter_str, cython.address(self.ptr))
err_check(res)
if in_stream is not None:
with cython.nogil:
res = lib.avcodec_parameters_copy(
self.ptr.par_in, in_stream.ptr.codecpar
)
err_check(res)
with cython.nogil:
res = lib.av_bsf_init(self.ptr)
err_check(res)
if out_stream is not None:
with cython.nogil:
res = lib.avcodec_parameters_copy(
out_stream.ptr.codecpar, self.ptr.par_out
)
err_check(res)
lib.avcodec_parameters_to_context(
out_stream.codec_context.ptr, out_stream.ptr.codecpar
)
def __dealloc__(self):
if self.ptr:
lib.av_bsf_free(cython.address(self.ptr))
@cython.ccall
def filter(self, packet: Packet | None = None):
"""
Processes a packet based on the filter_description set during initialization.
Multiple packets may be created.
:type: list[Packet]
"""
res: cython.int
new_packet: Packet
with cython.nogil:
res = lib.av_bsf_send_packet(
self.ptr, packet.ptr if packet is not None else cython.NULL
)
err_check(res)
output: list = []
while True:
new_packet = Packet()
with cython.nogil:
res = lib.av_bsf_receive_packet(self.ptr, new_packet.ptr)
if res == -EAGAIN or res == lib.AVERROR_EOF:
return output
err_check(res)
if res:
return output
output.append(new_packet)
@cython.ccall
def flush(self):
"""
Reset the internal state of the filter.
Should be called e.g. when seeking.
Can be used to make the filter usable again after draining it with EOF marker packet.
"""
lib.av_bsf_flush(self.ptr)
@cython.cfunc
def get_filter_names() -> set:
names: set = set()
ptr: cython.pointer[cython.const[lib.AVBitStreamFilter]]
opaque: cython.p_void = cython.NULL
while True:
ptr = lib.av_bsf_iterate(cython.address(opaque))
if ptr:
names.add(ptr.name)
else:
break
return names
bitstream_filters_available = get_filter_names()