-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp_ver5.py
More file actions
220 lines (173 loc) · 6.89 KB
/
app_ver5.py
File metadata and controls
220 lines (173 loc) · 6.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from kafka import KafkaConsumer, KafkaProducer
from flask import Flask, request, jsonify
import numpy as np
import tensorflow as tf
import json
from sklearn.metrics.pairwise import cosine_similarity
import pickle
import threading
app = Flask(__name__)

# Load the trained synonym embedding model.
model = tf.keras.models.load_model('synonym_model.h5')

# Load the word_to_index and index_to_word dictionaries.
# NOTE(review): pickle.load is unsafe on untrusted files — these are assumed
# to be trusted local artifacts produced alongside the model; confirm.
with open('word_to_index.pkl', 'rb') as f:
    word_to_index = pickle.load(f)
with open('index_to_word.pkl', 'rb') as f:
    index_to_word = pickle.load(f)

# Pull the embedding matrix out of the model's 'embedding_2' layer;
# presumably row i is the embedding vector for vocabulary index i — verify
# against the training script that produced the model.
embedding_matrix = model.get_layer('embedding_2').get_weights()[0]
# Hàm tìm từ đồng nghĩa
def find_synonyms(word, top_n=5):
    """Return up to top_n vocabulary words most similar to `word`.

    Similarity is cosine similarity between rows of the module-level
    embedding matrix.  Unknown words yield an empty list.  The best match
    (rank 0, normally the query word itself) is excluded from the result.
    """
    try:
        query_idx = word_to_index[word]
    except KeyError:
        return []
    query_vec = embedding_matrix[query_idx].reshape(1, -1)
    scores = cosine_similarity(query_vec, embedding_matrix)[0]
    # Sort indices by descending similarity; skip position 0 (the query word).
    ranked = np.argsort(-scores)
    return [index_to_word[idx] for idx in ranked[1:top_n + 1]]
# Kafka consumer for the synonym-expansion flow.
def consume_kafka():
    """Consume 'synonyms_topic', expand each message's videoData keys into
    synonyms via process_message, and forward the result to Kafka.

    Runs forever; intended to be executed on a background thread.
    NOTE(review): a second function named consume_kafka is defined later in
    this file and shadows this one for any lookup made after that point.
    """
    consumer = KafkaConsumer(
        'synonyms_topic',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: x.decode('utf-8')
    )
    print("Listening to Kafka topic: synonyms_topic")
    for message in consumer:
        raw_value = message.value
        print(f"Raw Value: {raw_value}")
        # Parse the raw payload as JSON (None on failure).
        parsed_data = convert_to_json(raw_value)
        if not parsed_data:
            print("Invalid JSON format. Skipping message.")
            continue
        # Expand the message's videoData keys into words + synonyms.
        processed_data = process_message(parsed_data)
        print(f"Processed Data: {json.dumps(processed_data, indent=4)}")
        user_id = processed_data.get("userId", "")
        # NOTE(review): when process_message returns its error dict, this
        # still publishes an empty userId/payload — confirm that is intended.
        video_data = processed_data.get("videoData", [])
        produce_message(user_id, video_data)
def process_message(parsed_data):
    """Flatten videoData's keys into a deduplicated word list with synonyms.

    Returns {"userId": ..., "videoData": [...]} on success, or
    {"error": ...} when videoData is not a JSON object.  Order of the
    returned list is unspecified (it comes from a set).
    """
    user_id = parsed_data.get("userId", "").strip()
    video_data = parsed_data.get("videoData", {})
    if not isinstance(video_data, dict):
        print("Invalid videoData format. Skipping message.")
        return {"error": "Invalid videoData format"}
    # Drop entries whose value is null/None.
    cleaned = {k: v for k, v in video_data.items() if v is not None}
    # Collect every word appearing in the keys plus its synonyms;
    # a set keeps the result duplicate-free.
    vocabulary = set()
    for key in cleaned:
        for token in key.split():
            token = token.strip()
            if not token:  # skip empty tokens
                continue
            vocabulary.add(token)                     # the word itself
            vocabulary.update(find_synonyms(token))   # plus its synonyms
    return {
        "userId": user_id,
        "videoData": list(vocabulary),
    }
def convert_to_json(raw_value):
    """Parse a JSON string; return the decoded object, or None on bad input."""
    try:
        parsed = json.loads(raw_value)
    except json.JSONDecodeError as err:
        print(f"Error converting to JSON: {err}")
        return None
    return parsed
@app.route('/process_synonyms', methods=['POST'])
def process_synonyms():
    """POST endpoint: map each word in videoData's keys to its synonym list.

    Request body: {"userId": str, "videoData": {key: value, ...}}.
    Response:     {"userId": str, "videoData": {word: [synonyms...]}}.
    Returns 400 for a missing/non-object body, a non-string userId, or a
    non-object videoData.
    """
    # silent=True yields None for a missing or malformed JSON body so bad
    # requests hit the 400 below instead of raising inside Flask.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data:
        return jsonify({'error': 'Input must be a non-empty JSON object'}), 400
    # Fix: a non-string userId (e.g. a number) previously crashed with
    # AttributeError on .strip(), producing an HTTP 500.
    user_id = data.get("userId", "")
    if not isinstance(user_id, str):
        return jsonify({'error': '"userId" must be a string'}), 400
    user_id = user_id.strip()
    video_data = data.get("videoData", {})
    if not isinstance(video_data, dict):
        return jsonify({'error': '"videoData" must be a JSON object'}), 400
    # Look up synonyms for every distinct word appearing in the keys.
    synonyms_result = {}
    for key in video_data:
        for word in key.split():
            word = word.strip()
            if word and word not in synonyms_result:  # skip empties and dupes
                synonyms_result[word] = find_synonyms(word)
    result = {
        "userId": user_id,
        "videoData": synonyms_result,
    }
    return jsonify(result)
# Run the synonym Kafka consumer on a daemon thread so it does not block the
# Flask server and dies with the process.  NOTE(review): this binds the
# consume_kafka defined ABOVE; a later function of the same name shadows it
# for lookups made after that redefinition.
kafka_thread = threading.Thread(target=consume_kafka, daemon=True)
kafka_thread.start()
def produce_message(user_id, genres):
    """Publish {"userId": ..., "genres": ...} to 'recommendation_topic'.

    A short-lived producer is created per call; it is flushed and then
    closed so its network resources are released even if the send fails.
    """
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    message = {
        "userId": user_id,
        "genres": genres
    }
    try:
        producer.send('recommendation_topic', value=message)
        producer.flush()
        print(f"Message sent: {message}")
    finally:
        # Fix: the producer was never closed, leaking sockets/buffers on
        # every published message.
        producer.close()
#####################################################################################
# Keyword-extraction section: KeyBERT pulls keyphrases from video descriptions.
# NOTE(review): importing mid-file works but is unconventional; moving this to
# the top-of-file import block would be cleaner.
from keybert import KeyBERT
kw_model = KeyBERT()
@app.route('/extract_keywords', methods=['POST'])
def extract_keywords():
    """POST endpoint: extract 1–2 word keyphrases from "description".

    Request body: {"description": str}.
    Response:     {"keywords": [str, ...]}, or 400 when the description is
    missing or empty.
    """
    # Fix: request.json is None for a missing/invalid JSON body, so
    # data.get(...) raised AttributeError (HTTP 500) instead of the
    # intended 400.  get_json(silent=True) returns None without raising.
    data = request.get_json(silent=True) or {}
    description = data.get("description", "")
    if not description:
        return jsonify({"error": "Description is required"}), 400
    keywords = kw_model.extract_keywords(description, keyphrase_ngram_range=(1, 2), stop_words='english')
    # extract_keywords returns (phrase, score) pairs; keep only the phrases.
    keyword_list = [keyword[0] for keyword in keywords]
    return jsonify({"keywords": keyword_list})
# Kafka Consumer Configuration — incoming video descriptions for the
# keyword-extraction flow.  Created at import time; values are decoded as JSON.
consumer = KafkaConsumer(
    'video-description-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='video-description-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# Kafka Producer Configuration — module-level producer used to publish the
# extracted genres/keywords.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
def consume_kafka():
    """Consume video descriptions, extract keywords with KeyBERT, and publish
    them as genres to 'video-genres-topic'.

    NOTE(review): this REDEFINES consume_kafka, shadowing the earlier synonym
    consumer of the same name; renaming one of them would avoid confusion.
    NOTE(review): a message whose JSON deserialization fails will raise out
    of this loop and kill the consumer thread — confirm upstream guarantees
    well-formed payloads.
    """
    for message in consumer:
        data = message.value
        video_id = data.get("videoID", "")
        description = data.get("description", "")
        if description:
            print(f"Processing description: {description}")
            # Extract 1–2 word keyphrases from the description.
            keywords = kw_model.extract_keywords(description, keyphrase_ngram_range=(1, 2), stop_words='english')
            keyword_list = [keyword[0] for keyword in keywords]
            print(f"Extracted keywords: {keyword_list}")
            producer.send(
                'video-genres-topic',
                value={"videoID": video_id, "genres": keyword_list}
            )
            print(f"Sent keywords to Kafka: {keyword_list}")
if __name__ == '__main__':
    # NOTE(review): 'consume_kafka' here resolves to the SECOND definition
    # (keyword extraction); the synonym consumer was already started on a
    # daemon thread at import time under the same name.  This thread is not
    # a daemon, so it will keep the process alive after Flask stops.
    kafka_thread = threading.Thread(target=consume_kafka)
    kafka_thread.start()
    # kafka_threadSynonyms = threading.Thread(target=consume_kafkaSynonyms)
    # kafka_threadSynonyms.start()
    app.run(host='0.0.0.0', port=5050)