# -*- coding: utf-8 -*-
from __future__ import division
"""
A basic KNN classifier
http://nlp.stanford.edu/IR-book/pdf/14vcat.pdf
The basic idea here is to represent each document as a vector of counts
of ngrams.
- The vectors are normalized to length 1
- New documents are classifier according the K training documents
they are closest to.
- In the current implementation, the distance measure is cosine.
This does not seem to be well-suited to tweet classification as the
vectors are all sparse with element values rarely greater than 1.
"""
import math, heapq
import preprocessing
from vectorizer_lib import TfidfVectorizer
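# Usage sketch: KnnClassifier is built from an iterable of (label, text) pairs,
# as consumed by train() below. The example messages are invented for
# illustration only.
#
#   training_data = [
#       (True,  'got a nasty paper cut at work today'),
#       (False, 'reading a paper about cut-based segmentation'),
#   ]
#   classifier = KnnClassifier(training_data)
#   label, score = classifier.classify('another paper cut, ouch')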
class KnnClassifier:

    K = 20

    # Precision = 0.716, Recall = 0.716, F1 = 0.716
    weight_bigrams = 4.4661
    weight_trigrams = 15.8206
    backoff = 0.9867
    backoff2 = 0.9895
    threshold = 0.4948

    @staticmethod
    def set_params(weight_bigrams, weight_trigrams, backoff, backoff2, threshold):
        KnnClassifier.weight_bigrams = weight_bigrams
        KnnClassifier.weight_trigrams = weight_trigrams
        KnnClassifier.backoff = backoff
        KnnClassifier.backoff2 = backoff2
        KnnClassifier.threshold = threshold

    @staticmethod
    def get_params():
        return (
            KnnClassifier.weight_bigrams,
            KnnClassifier.weight_trigrams,
            KnnClassifier.backoff,
            KnnClassifier.backoff2,
            KnnClassifier.threshold
        )

    @staticmethod
    def get_param_names():
        return (
            'weight_bigrams',
            'weight_trigrams',
            'backoff',
            'backoff2',
            'threshold'
        )

    @staticmethod
    def get_weights():
        weights = {
            1: 1.0,
            2: KnnClassifier.weight_bigrams,
            3: KnnClassifier.weight_trigrams
        }
        total = sum(weights.values())
        return dict((n, w / total) for n, w in weights.items())
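
    # With the class defaults above, get_weights() normalizes to roughly
    # {1: 0.047, 2: 0.210, 3: 0.743} (total = 1.0 + 4.4661 + 15.8206 = 21.2867),
    # so the trigram vote dominates the combined score in classify().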
    def __init__(self, training_data):
        """KnnClassifier initialization
        """
        self.documents = dict((n, []) for n in (1, 2, 3))
        self.vectorizers = {}
        self.train(training_data)

    def __repr__(self):
        # NOTE: this relies on self.vocab, self.pos_centroid and self.neg_centroid,
        # which are not set anywhere in this class.
        def show_pos_neg(n, vocab, pos_centroid, neg_centroid):
            def pn(word):
                p = pos_centroid.get(word, 0.0)
                n = neg_centroid.get(word, 0.0)
                return '%6.4f - %6.4f = %7.4f %s' % (p, n, p - n, word)
            def order(word):
                return neg_centroid.get(word, 0.0) - pos_centroid.get(word, 0.0)
            return 'pos neg n=%d\n%s' % (n, '\n'.join(pn(word) for word in sorted(vocab, key=order)))
        return '\n'.join(show_pos_neg(n, self.vocab[n], self.pos_centroid[n], self.neg_centroid[n])
                         for n in (3, 2, 1))
    def _add_example(self, cls, message):
        """Add a training example
        """
        words = preprocessing.extract_words(message)
        if not words:
            # Treat cases where 'paper cut' is removed by pre-processing as negatives.
            return

        # Update self.documents[n] with the ngrams and class of this example
        for n in (1, 2, 3):
            ngrams = preprocessing.get_ngrams(n, words)
            self.documents[n].append((cls, ngrams))
    def train(self, training_data):
        for cls, message in training_data:
            self._add_example(cls, message)
        for n in (1, 2, 3):
            self.vectorizers[n] = TfidfVectorizer([doc for _, doc in self.documents[n]])
    def classify(self, message, detailed=False):
        """
        'message' is a string to classify. Returns (classification, score)
        where classification is True or False.

        Method: for each ngram order n in (1, 2, 3), find the K training
        documents nearest the message under the vectorizer's cosine measure,
        take a rank-weighted vote of their classes (damped by backoff and
        backoff2), and normalize the vote to the range [-1, +1]. The three
        votes are then combined with the weights from get_weights() and
        compared against threshold.
        """
        def get_docs_with_terms(vectorizer, ngrams):
            """Return ids of training documents containing any term in ngrams
            """
            docs = set()
            for term in ngrams:
                if term in vectorizer.vocab:
                    docs |= set(vectorizer.tfidf[term].keys())
            return docs

        def get_nearest(K, documents, vectorizer, ngrams, doc_ids):
            """Return (cls, doc id, distance) for the K documents in doc_ids
               nearest ngrams
            """
            # Compute scores and add to a priority queue
            scores = []
            for i in doc_ids:
                heapq.heappush(scores, (vectorizer.get_distance(i, ngrams), i, documents[i][0]))
            # Return top K scores
            return [(cls, i, dist) for dist, i, cls in heapq.nlargest(K, scores)]
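        # For example, with 3 candidate documents the result might look like
        # (hypothetical values):
        #   [(True, 17, 0.83), (False, 4, 0.79), (True, 52, 0.71)]
        # i.e. (class, training doc id, cosine score), best match first.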
        words = preprocessing.extract_words(message)
        if not words:
            return False, 0.0
        ngrams = dict((n, preprocessing.get_ngrams(n, words)) for n in (1, 2, 3))

        diffs = {}
        for n in (1, 2, 3):
            doc_ids = get_docs_with_terms(self.vectorizers[n], ngrams[n])
            nearest = get_nearest(KnnClassifier.K, self.documents[n], self.vectorizers[n], ngrams[n], doc_ids)
            pos = sum((1 if cls else -1) * (KnnClassifier.backoff ** k) for k, (cls, _, _) in enumerate(nearest))
            max_pos = sum(KnnClassifier.backoff ** k for k in range(len(nearest)))
            # pos2/max_pos2 is in range [-1,+1]
            pos2 = sum((1 if cls else -1) * (KnnClassifier.backoff2 ** (2 * k)) for k, (cls, _, _) in enumerate(nearest))
            max_pos2 = sum(KnnClassifier.backoff2 ** (2 * k) for k in range(len(nearest)))
            pos *= pos2
            max_pos *= max_pos2
            diffs[n] = pos / max_pos if max_pos else 0.0

        weights = KnnClassifier.get_weights()
        diff = sum(diffs[n] * weights[n] for n in (1, 2, 3))
        return diff > KnnClassifier.threshold, diff
if __name__ == '__main__':
    # Just dump the KnnClassifier class to stdout
    for k in sorted(KnnClassifier.__dict__):
        print '%20s : %s' % (k, KnnClassifier.__dict__[k])