NECount.py
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, to_json, struct, lit, desc
from pyspark.sql.types import StringType, ArrayType
import spacy

def getNamedEntities(text):
    """Extract named entities from a text using spaCy."""
    if text is None:  # Kafka message values can be null after the string cast
        return []
    doc = nlp(text)
    return [ent.text.lower() for ent in doc.ents]  # keep only the lowercased entity text
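
# Example (assumes the "en_core_web_sm" model is installed): for the headline
# "Apple opens a new store in Paris", getNamedEntities typically returns
# ["apple", "paris"].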


if __name__ == "__main__":
    bootstrapServers = "kafka:9092"
    inTopic = "news-articles"
    outTopic = "named-entities"

    spark = SparkSession\
        .builder\
        .appName("NamedEntitiesFromHeadlines")\
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Create a streaming DataFrame representing the stream of input lines from Kafka
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option("subscribe", inTopic)\
        .load()\
        .selectExpr("CAST(value AS STRING)")
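    # The Kafka source also exposes key, topic, partition, offset, and
    # timestamp columns; only the message value is kept here, cast from
    # bytes to a string.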

    # Load the spaCy model on the driver; the UDF closure that references
    # `nlp` is serialized to the executors along with it.
    nlp = spacy.load("en_core_web_sm")

    # UDF to extract entities as an array of strings
    ner_udf = udf(getNamedEntities, ArrayType(StringType()))
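    # Note: a row-at-a-time UDF runs spaCy once per message; batching with
    # nlp.pipe() inside a pandas UDF is a common higher-throughput
    # alternative (not used here).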

    # Extract entities from each message value, then flatten the arrays so
    # that each row holds a single entity
    entities_df = lines.withColumn("entities", ner_udf("value"))
    entities_exploded = entities_df.select(explode("entities").alias("entity"))

    # Count named entities and keep the ten most frequent
    wordCounts = entities_exploded.groupBy("entity").count()
    toStreamVals = wordCounts.orderBy(desc("count")).limit(10)
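    # Sorting a streaming aggregation is only supported in "complete"
    # output mode, which is why both sinks below use it.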

    # Write to the console for debugging. The output lands in the Spark
    # container's log, so attach to the container to see it.
    console_query = toStreamVals.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
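    # In complete mode the console sink reprints the entire top-10 result
    # table on every trigger, not just the changed rows.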

    # Prepare for Kafka output: key as null, value as a JSON string
    output_df = toStreamVals.withColumn("key", lit(None).cast("string")) \
        .withColumn("value", to_json(struct("entity", "count")))
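    # Each record's value looks like, e.g., {"entity":"apple","count":42}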

    # Write the top entities to Kafka once per minute.
    kafka_query = output_df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrapServers) \
        .option("topic", outTopic) \
        .option("checkpointLocation", "/tmp/kafka-ner-checkpoint") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .start()

    # Keep the driver alive until both queries terminate
    kafka_query.awaitTermination()
    console_query.awaitTermination()
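
# To submit (a sketch; the Kafka connector coordinates must match your
# Spark and Scala versions, e.g. Spark 3.3 / Scala 2.12):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 NECount.py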