lstm_tf.py
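"""Word-level next-word prediction and text generation with a 3-layer
LSTM in TensorFlow 1.x (tf.contrib.rnn).

Reads a small corpus (catandthetext.txt), builds a word-to-id vocabulary,
trains the network to predict the next word from the previous n_input
words, then generates text interactively from an n_input-word seed.
"""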
from __future__ import print_function

import collections
import random
import time

import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn

start_time = time.time()


def elapsed(sec):
    if sec < 60:
        return str(sec) + " sec"
    elif sec < (60 * 60):
        return str(sec / 60) + " min"
    else:
        return str(sec / (60 * 60)) + " hr"


log_path = 'Tensorflow/rnn_words'
writer = tf.summary.FileWriter(log_path)
training_fl = 'catandthetext.txt'
def read_data(fname):
    with open(fname) as f:
        content = f.readlines()
    # Strip spaces from each line
    content = [x.strip() for x in content]
    # Add words from each line to the content array
    content = [word for i in range(len(content)) for word in content[i].split()]
    content = np.array(content)  # Convert to a numpy array
    return content


training_data = read_data(training_fl)
print("Loaded training data....\n")
def build_dataset(words):
    # Assign ids in order of decreasing frequency (0 = most common word)
    count = collections.Counter(words).most_common()
    dictionary = dict()
    for word, _ in count:
        dictionary[word] = len(dictionary)
    reverse_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    return dictionary, reverse_dictionary


dictionary, reverse_dictionary = build_dataset(training_data)
vocab_size = len(dictionary)
# Parameters
learning_rate = 0.001
training_iters = 50000
display_step = 1000
n_input = 10

# Number of units in each RNN cell
n_hidden = 512

# tf Graph input
x = tf.placeholder("float", [None, n_input, 1])
y = tf.placeholder("float", [None, vocab_size])

# RNN output node weights and biases
weights = {'out': tf.Variable(tf.random_normal([n_hidden, vocab_size]))}
biases = {'out': tf.Variable(tf.random_normal([vocab_size]))}
def RNN(x, weights, biases):
    # Reshape to [batch_size, n_input]
    x = tf.reshape(x, [-1, n_input])
    # Generate an n_input-element sequence of inputs
    # (e.g. [had] [a] [general] --> [20] [6] [33])
    x = tf.split(x, n_input, 1)
    # 3-layer LSTM, each layer with n_hidden units
    rnn_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(n_hidden),
                                 rnn.BasicLSTMCell(n_hidden),
                                 rnn.BasicLSTMCell(n_hidden)])
    # One-layer alternative:
    # rnn_cell = rnn.BasicLSTMCell(n_hidden)
    # Generate prediction
    outputs, states = rnn.static_rnn(rnn_cell, x, dtype=tf.float32)
    # There are n_input outputs, but we only want the last one
    return tf.matmul(outputs[-1], weights['out']) + biases['out']
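
# Shape walkthrough for one training step (batch size 1), derived from
# the ops above:
#   x (placeholder)   : [1, n_input, 1]  -- n_input word indices
#   after tf.reshape  : [1, n_input]
#   after tf.split    : n_input tensors, each [1, 1]
#   outputs[-1]       : [1, n_hidden]    -- last LSTM output
#   returned logits   : [1, vocab_size]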
pred = RNN(x, weights, biases)

# Loss and optimizer
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=pred, labels=y))
optimizer = tf.train.RMSPropOptimizer(learning_rate=learning_rate).minimize(cost)

# Model evaluation
correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Initialize the variables
init = tf.global_variables_initializer()
with tf.Session() as session:
    session.run(init)
    step = 0
    offset = random.randint(0, n_input + 1)
    end_offset = n_input + 1
    acc_total = 0
    loss_total = 0
    writer.add_graph(session.graph)
    while step < training_iters:
        # Wrap around when the window runs off the end of the corpus
        if offset > (len(training_data) - end_offset):
            offset = random.randint(0, n_input + 1)
        symbols_in_keys = [[dictionary[str(training_data[i])]]
                           for i in range(offset, offset + n_input)]
        symbols_in_keys = np.reshape(np.array(symbols_in_keys), [-1, n_input, 1])
        # One-hot target: the word that follows the n_input-word window
        symbols_out_onehot = np.zeros([vocab_size], dtype=float)
        symbols_out_onehot[dictionary[str(training_data[offset + n_input])]] = 1.0
        symbols_out_onehot = np.reshape(symbols_out_onehot, [1, -1])
        _, acc, loss, onehot_pred = session.run(
            [optimizer, accuracy, cost, pred],
            feed_dict={x: symbols_in_keys, y: symbols_out_onehot})
        loss_total += loss
        acc_total += acc
        if (step + 1) % display_step == 0:
            print("Iter = " + str(step + 1) +
                  ", Average Loss = " + "{:.6f}".format(loss_total / display_step) +
                  ", Average Accuracy = " + "{:.2f}%".format(100 * acc_total / display_step))
            acc_total = 0
            loss_total = 0
            symbols_in = [training_data[i] for i in range(offset, offset + n_input)]
            symbols_out = training_data[offset + n_input]
            symbols_out_pred = reverse_dictionary[int(tf.argmax(onehot_pred, 1).eval())]
            print("%s - [%s] vs [%s]" % (symbols_in, symbols_out, symbols_out_pred))
        step += 1
        offset += (n_input + 1)
    print("Optimization finished!")
    print("Elapsed time:", elapsed(time.time() - start_time))
    print("Run on the command line:")
    print("\ttensorboard --logdir=%s" % log_path)
    print("Then open a browser to: http://localhost:6006/")
    # Interactive generation: seed with n_input words, then feed each
    # predicted word back in as the window slides forward
    while True:
        prompt = "%s words: " % n_input
        sentence = input(prompt)
        sentence = sentence.strip()
        words = sentence.split(' ')
        if len(words) != n_input:
            continue
        try:
            symbols_in_keys = [dictionary[str(words[i])] for i in range(len(words))]
            for i in range(32):
                keys = np.reshape(np.array(symbols_in_keys), [-1, n_input, 1])
                onehot_pred = session.run(pred, feed_dict={x: keys})
                onehot_pred_index = int(tf.argmax(onehot_pred, 1).eval())
                sentence = "%s %s" % (sentence, reverse_dictionary[onehot_pred_index])
                symbols_in_keys = symbols_in_keys[1:]
                symbols_in_keys.append(onehot_pred_index)
            print(sentence)
        except KeyError:
            print("Word not in dictionary")