-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathfunasr_client_api.py
More file actions
158 lines (131 loc) · 4.73 KB
/
funasr_client_api.py
File metadata and controls
158 lines (131 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""
Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
Reserved. MIT License (https://opensource.org/licenses/MIT)
2022-2023 by zhaomingwork@qq.com
"""
# pip install websocket-client
import ssl
from websocket import ABNF
from websocket import create_connection
from queue import Queue
import threading
import traceback
import json
import time
import numpy as np
# class for recognizer in websocket
class Funasr_websocket_recognizer:
    """Blocking websocket client for a FunASR recognition server.

    The constructor opens the connection, starts a background thread that
    parses incoming JSON messages into ``msg_queue`` and sends the initial
    configuration message. Audio is streamed with :meth:`feed_chunk` and the
    session is finished with :meth:`close`.
    """

    def __init__(
        self,
        host="127.0.0.1",
        port="30035",
        is_ssl=True,
        chunk_size="0, 10, 5",
        chunk_interval=10,
        mode="offline",
        wav_name="default",
    ):
        """Connect to the server and send the initial configuration message.

        Args:
            host: server host name or IP address.
            port: server port.
            is_ssl: truthy for the wss protocol, falsy for plain ws.
            chunk_size: comma-separated integers (e.g. ``"0, 10, 5"``) or an
                iterable of integers; sent as a list under ``"chunk_size"``.
            chunk_interval: value sent under ``"chunk_interval"``.
            mode: value sent under ``"mode"``.
            wav_name: value sent under ``"wav_name"``.

        Connection and send failures are printed together with a traceback
        instead of being raised. ``self.websocket`` stays ``None`` when no
        connection could be established.
        """
        self.host = host
        self.port = port
        self.msg_queue = Queue()  # parsed results pushed by the receiver thread
        # Defined up front so feed_chunk()/close() can cope with a failed connect.
        self.websocket = None
        self.thread_msg = None
        try:
            if is_ssl:
                # SECURITY NOTE: certificate and host-name verification are
                # switched off here, so the server identity is NOT checked.
                # check_hostname must be cleared before verify_mode is relaxed.
                ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
                uri = "wss://{}:{}".format(host, port)
                ssl_opt = {"cert_reqs": ssl.CERT_NONE}
            else:
                uri = "ws://{}:{}".format(host, port)
                ssl_context = None
                ssl_opt = None

            print("connect to url", uri)
            self.websocket = create_connection(uri, ssl=ssl_context, sslopt=ssl_opt)

            # Daemon thread: a forgotten close() must not keep the process alive.
            self.thread_msg = threading.Thread(
                target=self.thread_rec_msg, daemon=True
            )
            self.thread_msg.start()

            if isinstance(chunk_size, str):
                chunk_size = chunk_size.split(",")
            chunk_size = [int(x) for x in chunk_size]

            message = json.dumps(
                {
                    "mode": mode,
                    "chunk_size": chunk_size,
                    "encoder_chunk_look_back": 4,
                    "decoder_chunk_look_back": 1,
                    "chunk_interval": chunk_interval,
                    "wav_name": wav_name,
                    "is_speaking": True,
                }
            )
            self.websocket.send(message)
            print("send json", message)
        except Exception as e:
            # Best effort: report the problem and leave the object constructed.
            print("Exception:", e)
            traceback.print_exc()

    def thread_rec_msg(self):
        """Receiver loop: parse every incoming message and queue the result.

        Empty messages are ignored and messages that are not valid JSON are
        reported and skipped. The loop ends when ``recv()`` raises, which is
        what happens once the connection has been closed.
        """
        try:
            while True:
                raw = self.websocket.recv()
                if not raw:
                    continue
                try:
                    parsed = json.loads(raw)
                except ValueError:
                    # One malformed payload must not stop the whole receiver.
                    print("skip malformed message")
                    continue
                self.msg_queue.put(parsed)
        except Exception:
            print("client closed")

    def feed_chunk(self, chunk, wait_time=0.01):
        """Send one binary audio chunk and return the newest queued result.

        Args:
            chunk: payload sent as a binary websocket frame.
            wait_time: seconds to wait for the first result before giving up.

        Returns:
            The most recent parsed message, or ``""`` when nothing arrived
            within ``wait_time`` or when sending failed.
        """
        try:
            self.websocket.send_binary(chunk)
            latest = self.msg_queue.get(timeout=wait_time)
            # Older results are dropped; only the newest one is reported.
            while not self.msg_queue.empty():
                latest = self.msg_queue.get(timeout=wait_time)
            return latest
        except Exception:
            # Deliberately best effort (queue.Empty on timeout, send errors),
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            return ""

    def close(self, timeout=1):
        """Signal end of speech, wait for final results and close the socket.

        Args:
            timeout: seconds to sleep while waiting for the final results.

        Returns:
            Only the last queued message, or ``""`` if the queue is empty or
            no connection was ever established.
        """
        if self.websocket is None:
            return ""
        last = ""
        try:
            self.websocket.send(json.dumps({"is_speaking": False}))
            # Sleep for `timeout` seconds to give the server time to answer.
            time.sleep(timeout)
            while not self.msg_queue.empty():
                last = self.msg_queue.get()
        finally:
            # Release the socket even if the final send failed.
            self.websocket.close()
        return last
if __name__ == "__main__":
    # Example: stream a wav file to a local server and print what comes back.
    print("example for Funasr_websocket_recognizer")
    import sys
    import wave

    # The default is the original author's local sample file; pass a different
    # wav file as the first command-line argument to override it.
    default_wav_path = "/Users/zhifu/Downloads/modelscope_models/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch/example/asr_example.wav"
    wav_path = sys.argv[1] if len(sys.argv) > 1 else default_wav_path

    with wave.open(wav_path, "rb") as wav_file:
        audio_bytes = wav_file.readframes(wav_file.getnframes())

    # Bytes sent per chunk (1920).
    # NOTE(review): presumably 60 ms * chunk_size[1] / chunk_interval of
    # 16 kHz, 16-bit mono audio - confirm against the server settings.
    stride = int(60 * 10 / 10 / 1000 * 16000 * 2)

    # create a recognizer
    rcg = Funasr_websocket_recognizer(
        host="127.0.0.1", port="10095", is_ssl=True, mode="2pass", chunk_size="0,10,5"
    )

    # send the audio chunk by chunk, printing any result that arrives
    for beg in range(0, len(audio_bytes), stride):
        text = rcg.feed_chunk(audio_bytes[beg : beg + stride], wait_time=0.02)
        if text:
            print("text", text)
        time.sleep(0.05)

    # get the last message
    text = rcg.close(timeout=3)
    print("text", text)