# text_analysis.py
# Text classification with Spark: a Naive Bayes model built on MLlib
# (HashingTF + LabeledPoint RDDs) and a multilayer-perceptron pipeline
# built on spark.ml, with NLTK-based tokenization/stemming.
from string import punctuation
from os import path
from sys import exit
from findspark import init
# Import NLTK pieces; on failure, fetch the required NLTK data packages.
# NOTE(review): this except branch prints and CONTINUES, so if the imports
# truly failed, `stopwords`/`PorterStemmer` are undefined and the constant
# definitions below raise NameError — confirm the intended fallback
# (probably: download, then retry the imports, or exit).
try:
    from nltk.tokenize import word_tokenize
    from nltk.corpus import stopwords
    from nltk.stem.porter import PorterStemmer
except ImportError as e:
    import nltk
    nltk.download('punkt')
    # NOTE(review): 'porter' is not a standard NLTK download id — the Porter
    # stemmer ships with nltk itself; verify this download is needed at all
    nltk.download('porter')
    nltk.download("stopwords")
    print("Can not import NLTK Modules", e)
# Initialise findspark so pyspark is importable, then pull in the Spark
# MLlib (RDD-based) and spark.ml (DataFrame-based) APIs. Unlike the NLTK
# branch above, a failure here is fatal.
try:
    init()
    from pyspark.mllib.feature import HashingTF
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
    from pyspark.ml.classification import MultilayerPerceptronClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml import Pipeline, PipelineModel
    from pyspark.ml.feature import Tokenizer, HashingTF as HTF
except ImportError as e:
    print("Can not import Spark Modules", e)
    exit(1)
# Module-level global variables for the `tokenize` function below
PUNCTUATION = set(punctuation)  # ASCII punctuation characters to strip
STOPWORDS = set(stopwords.words('english'))  # English stopword set
STEMMER = PorterStemmer()  # shared stemmer instance reused across calls
# Function to break sentence into "tokens", lowercase them, remove punctuation and stopwords, and stem them
# Break a sentence into "tokens": lowercase them, strip punctuation and
# stopwords, stem what remains, and drop any tokens emptied by the cleanup.
def tokenize(sentence):
    lowered = [token.lower() for token in word_tokenize(sentence)]
    # Remove punctuation characters from every token
    depunctuated = [''.join(ch for ch in token if ch not in PUNCTUATION)
                    for token in lowered]
    kept = [token for token in depunctuated if token not in STOPWORDS]
    stems = [STEMMER.stem(token) for token in kept]
    return [stem for stem in stems if stem]
def train_naive_bayes_model(data, spark_context, model_folder):
    """
    Train (or load) a Naive Bayes model and print its train/test accuracy.

    :param data: raw rows exposing 'target' and 'sentence' fields
    :type data: RDD
    :param spark_context: the current spark context
    :type spark_context: SparkContext
    :param model_folder: folder the trained model is saved to / loaded from
    :type model_folder: str
    :return: None -- accuracies are printed as a side effect
    """
    data_hashed = prepare_data(data)
    # Split data 90/10 into training and test data sets
    # (the old comment claimed 70/30, contradicting the actual weights)
    train_hashed, test_hashed = data_hashed.randomSplit([0.9, 0.1])
    model = get_naive_bayes_model(spark_context, train_hashed, model_folder)
    # print() calls for Python 3 compatibility, consistent with the rest of the file
    print('training Accuracy :')
    get_accuracy(model, train_hashed)
    print('testing Accuracy :')
    get_accuracy(model, test_hashed)
def prepare_data(data):
    """
    Clean and hash raw rows into LabeledPoints ready for MLlib training.

    :param data: rows exposing 'target' and 'sentence' fields
    :type data: RDD
    :return: data cleaned and hashed
    :rtype: RDD
    """
    # Extract relevant fields in dataset -- category target and sentence content
    data_pared = data.map(lambda line: (line['target'], line['sentence']))
    # Clean each sentence with our tokenize function. Tuple-parameter lambdas
    # (`lambda (target, sentence): ...`) are a SyntaxError in Python 3, so
    # index into the pair instead.
    data_cleaned = data_pared.map(lambda pair: (pair[0], tokenize(pair[1])))
    # Hashing term frequency vectorizer with 50k features
    htf = HashingTF(50000)
    # LabeledPoints: category label + tokenized, hashed sentence feature vector
    data_hashed = data_cleaned.map(
        lambda pair: LabeledPoint(pair[0], htf.transform(pair[1])))
    return data_hashed
def get_naive_bayes_model(spark_context, train_hashed, model_folder):
    """
    Return a Naive Bayes model, loading a previously saved one when available.

    :param spark_context: the current spark context
    :type spark_context: SparkContext
    :param train_hashed: labeled, hashed training data
    :type train_hashed: RDD
    :param model_folder: folder the model is saved to / loaded from
    :type model_folder: str
    :return: a trained Naive Bayes model
    :rtype: NaiveBayesModel
    """
    # A saved model on disk means we can skip training entirely
    if path.exists(model_folder):
        return NaiveBayesModel.load(spark_context, model_folder)
    # No cached model: train on the training data, then persist the result
    # so later runs won't have to re-train
    trained = NaiveBayes.train(train_hashed)
    trained.save(spark_context, model_folder)
    return trained
def get_accuracy(model, test_hashed):
    """
    Compute, print and return the model's accuracy on a labeled data set.

    :param model: trained classifier used to predict each point
    :type model: NaiveBayesModel
    :param test_hashed: labeled, hashed data to score
    :type test_hashed: RDD
    :return: the accuracy of the given model
    :rtype: float
    """
    # Compare predicted labels to actual labels
    prediction_and_labels = test_hashed.map(
        lambda point: (model.predict(point.features), point.label))
    # Filter to only correct predictions. Tuple-parameter lambdas are a
    # SyntaxError in Python 3, so index into the (predicted, actual) pair.
    correct = prediction_and_labels.filter(lambda pl: pl[0] == pl[1])
    # Calculate and print accuracy rate
    accuracy = correct.count() / float(test_hashed.count())
    # print() call for Python 3 compatibility, consistent with the rest of the file
    print("Classifier correctly predicted category " + str(accuracy * 100) + " percent of the time")
    return accuracy
def predict_row(sentence, spark_context, model_folder):
    """
    Classify a single sentence with a previously saved Naive Bayes model.

    :param sentence: a sentence to be analysed
    :type sentence: str
    :param spark_context: the current spark context
    :type spark_context: SparkContext
    :param model_folder: folder the trained model is loaded from
    :type model_folder: str
    :return: 0.0 if the sentence is negative, 1 if the sentence is neutral and 2 if the sentence is positive
    :rtype: float
    """
    # Must use the same feature count as training time (see prepare_data),
    # otherwise the hashed indices won't line up with the model
    htf = HashingTF(50000)
    sentence_features = htf.transform(tokenize(sentence))
    model = NaiveBayesModel.load(spark_context, model_folder)
    prediction = model.predict(sentence_features)
    # print() call for Python 3 compatibility, consistent with the rest of the file
    print('prediction :', prediction)
    return prediction
def get_data_transformers():
    """
    Create the data transformers for the neural-network pipeline.

    :return: tokenizer, hasher, classifier
    :rtype: Tokenizer, HashingTF, MultilayerPerceptronClassifier
    """
    # Tokenizer : splits each name into words
    tokenizer = Tokenizer(inputCol="name", outputCol="words")
    # HashingTF : builds term frequency feature vectors from text data
    hasher = HTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=8)
    # Neural network layer sizes: input layer of size 8 (must match
    # numFeatures above), three hidden layers of sizes 5, 4 and 5, and an
    # output layer of size 2 (classes). The previous comment claimed
    # 4 inputs / 3 outputs, which contradicted the code.
    maxIter = 20
    layers = [8, 5, 4, 5, 2]
    blockSize = 128
    seed = 1234
    # Creating the trainer and set its parameters
    classifier = MultilayerPerceptronClassifier(maxIter=maxIter,
                                                layers=layers,
                                                blockSize=blockSize,
                                                seed=seed)
    return tokenizer, hasher, classifier
def classify_using_nn(data, spark_context, model_folder):
    """
    Train (or load) the neural-network pipeline, print its accuracy on the
    train and test splits, then stop the spark context.

    :param data: labeled DataFrame to split and classify
    :param spark_context: the current spark context (stopped on exit)
    :param model_folder: folder the fitted pipeline is saved to / loaded from
    """
    # TODO: review how do we transform data to RDD
    # Splitting data 60/40 with a fixed seed
    train, test = data.randomSplit([0.6, 0.4], 1234)
    if path.exists(model_folder):
        # A fitted pipeline was saved by an earlier run -- just reload it
        model = PipelineModel.load(model_folder)
    else:
        # Build the transformer/estimator stages, fit on the training split,
        # and persist the fitted pipeline for later runs
        tokenizer, hasher, classifier = get_data_transformers()
        model = Pipeline(stages=[tokenizer, hasher, classifier]).fit(train)
        model.save(model_folder)
    # Compute accuracy on the train set
    print("Train set accuracy = {:.3g}.".format(evaluate(model, train)))
    # Compute accuracy on the test set
    print("Test set accuracy = {:.3g}.".format(evaluate(model, test)))
    spark_context.stop()
def evaluate(model, data):
    """
    Evaluate the model's accuracy on a data set.

    :param model: the trained model
    :type model: PipelineModel
    :param data: labeled rows to score
    :type data: DataFrame
    :return: the accuracy of the model
    :rtype: float
    """
    # Run the pipeline over the data and display the predictions
    predictions = model.transform(data)
    predictions.show()
    # Score predictions against the true labels using plain accuracy
    scorer = MulticlassClassificationEvaluator(labelCol="label",
                                               predictionCol="prediction",
                                               metricName="accuracy")
    return scorer.evaluate(predictions)