hotwords.py
#!/usr/bin/env python
__author__ = 'ziyan.yin'

from copy import deepcopy
from typing import Dict, List

import numpy as np

import stringutils
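
# NOTE: `stringutils` is not a standard-library module; only is_empty and
# words_standard are called in this file. A minimal stand-in might look like
# the sketch below (an assumption about its behavior, not the original code):
#
#     import re
#
#     def is_empty(s):
#         return s is None or not s.strip()
#
#     def words_standard(s):
#         return re.sub(r'\s+', ' ', s.strip())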


class HotWordsRecognize:
    """
    Feed in articles, comments, issues, or whole files:

    >>> this = HotWordsRecognize()
    >>> this.read_file('z:/comments.txt')

    or

    >>> this.add_content('you are my best')

    then use the recognize method to analyze the accumulated content:

    >>> words = this.recognize(top=10)

    The result is a list like [('a', 1000), ('b', 200)], ordered by
    occurrence count, descending.
    """

    __slots__ = ['content', 'keywords', 'word_tree', 'similar']
    def __init__(self, similar=0.8):
        self.content: List[str] = []
        self.keywords: Dict[str, int] = dict()
        self.word_tree: Dict[str, Dict[str, int]] = dict()
        self.similar: float = similar
    def read_file(self, file, encoding='utf-8'):
        with open(file, 'r', encoding=encoding) as f:
            for line in f:
                self.add_content(line)
    def add_content(self, context: str):
        if not stringutils.is_empty(context):
            self.content.extend(stringutils.words_standard(context).split(' '))
    def _find_suffix_tree(self):
        def __suffix_words(head: str, words: str, length: int, tree: dict):
            # Record the first `length + 1` characters of `words` as a
            # candidate suffix under the single-character key `head`.
            if len(words) > length:
                if head not in tree:
                    tree[head] = dict()
                raw_word = words[:length + 1]
                if raw_word not in tree[head]:
                    tree[head][raw_word] = 0
                tree[head][raw_word] += 1

        for c in self.content:
            for i in range(len(c) - 1):
                # Index candidate words of 2 to 7 characters starting at i.
                for le in range(6):
                    __suffix_words(c[i], c[i + 1:], le, self.word_tree)
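
    # Worked example (illustrative): with content ['hello'], the tree gains
    # word_tree['h'] = {'e': 1, 'el': 1, 'ell': 1, 'ello': 1}, i.e. every
    # candidate word of 2-5 characters starting at the 'h'.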
    def _filter_sparse_times(self):
        # Prune candidates whose count does not clearly exceed the median;
        # iterate over a deep copy so deleting entries is safe.
        times = [count for suffixes in self.word_tree.values()
                 for count in suffixes.values()]
        median = np.median(times)
        for head, suffixes in deepcopy(self.word_tree).items():
            for word, count in suffixes.items():
                if count <= median + 1:
                    del self.word_tree[head][word]
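
    # Illustrative: if the recorded counts are [1, 1, 2, 5, 9], the median is
    # 2, so only suffixes seen more than 3 times (here 5 and 9) survive.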
    def _scan(self):
        # Collapse the suffix tree into keywords. Sorting the suffixes puts
        # each prefix directly before its extensions; an extension silently
        # absorbs its prefix when their counts are similar enough (ratio
        # above `similar`), otherwise the prefix is emitted as a keyword too.
        for head, suffixes in self.word_tree.items():
            word_list = sorted(suffixes.items(), key=lambda x: x[0])
            parent = ('', 0)
            for word in word_list:
                if parent[0]:
                    if word[0].startswith(parent[0]):
                        if float(word[1]) / float(parent[1]) <= self.similar:
                            self.keywords[head + parent[0]] = int(parent[1])
                    else:
                        self.keywords[head + parent[0]] = int(parent[1])
                parent = word
            if parent[0]:
                self.keywords[head + parent[0]] = int(parent[1])
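
    # Illustrative: given word_tree['h'] = {'ot': 100, 'otword': 90} and
    # similar=0.8, the ratio 90/100 = 0.9 exceeds 0.8, so 'hot' is absorbed
    # and only 'hotword' (count 90) is kept as a keyword.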
    def _check_back(self, top=20, size=30):
        # Take the `size` most frequent keywords and fold each into a longer
        # keyword that contains it, when their counts are similar enough.
        copy_keywords = sorted(self.keywords.items(), key=lambda x: x[1], reverse=True)[0:size]
        res = []
        for word, count in copy_keywords:
            for ex_word in self.keywords:
                if ex_word == word:
                    continue
                if word in ex_word and self.keywords[ex_word] / count >= self.similar:
                    word = ex_word
                    count = self.keywords[ex_word]
                    break
            if (word, count) not in res:
                res.append((word, count))
        # Retry with a wider window if folding left fewer than `top` results,
        # but stop once the window covers every keyword, so the recursion
        # cannot loop forever on small inputs.
        if len(res) < top and size < len(self.keywords):
            res = self._check_back(top, size + 20)
        return sorted(res, key=lambda x: x[1], reverse=True)[:top]
    def recognize(self, top):
        self.word_tree.clear()
        if len(self.content) > 0:
            self._find_suffix_tree()
            self._filter_sparse_times()
            self._scan()
            return self._check_back(top, top + 20)
        return []


def read_file(file):
    reg = HotWordsRecognize()
    reg.read_file(file)
    return reg.recognize(top=20)


def read_lines(lines):
    reg = HotWordsRecognize()
    for line in lines:
        reg.add_content(line)
    return reg.recognize(top=20)
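

# Example usage: a minimal sketch, assuming a local 'comments.txt' exists
# (the file name is hypothetical, echoing the docstring example).
if __name__ == '__main__':
    recognizer = HotWordsRecognize()
    recognizer.read_file('comments.txt')
    for word, count in recognizer.recognize(top=10):
        print(word, count)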