DQN.py
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dense, Dropout
import random
import numpy as np
import pandas as pd
from operator import add

INPUT_NUM = 22      # size of the state vector fed to the network
playerLength = 50   # vertical extent of the player sprite
playerWidth = 50    # horizontal extent of the player sprite

# The danger* helpers flag whether any bullet ("lsr") threatens the player from a
# given side. They are module-level functions; the unused `self` argument is kept
# only to match how DQNAgent.get_state calls them.
def dangerRight(self, player, bullet_list):
    # A bullet overlaps the player's vertical span and sits at or to the right of the player.
    for lsr in bullet_list:
        if (player.posy + playerLength >= lsr.posy) and (player.posy <= lsr.posy) and (lsr.posx >= player.posx):
            return True
    return False


def dangerLeft(self, player, bullet_list):
    # A bullet overlaps the player's vertical span and sits at or to the left of the player.
    for lsr in bullet_list:
        if (player.posy + playerLength >= lsr.posy) and (player.posy <= lsr.posy) and (lsr.posx <= player.posx):
            return True
    return False


def dangerUp(self, player, bullet_list):
    # A bullet overlaps the player's horizontal span with lsr.posy >= player.posy.
    for lsr in bullet_list:
        if (player.posx + playerWidth >= lsr.posx) and (player.posx <= lsr.posx) and (lsr.posy >= player.posy):
            return True
    return False


def dangerDown(self, player, bullet_list):
    # A bullet overlaps the player's horizontal span with lsr.posy <= player.posy.
    for lsr in bullet_list:
        if (player.posx + playerWidth >= lsr.posx) and (player.posx <= lsr.posx) and (lsr.posy <= player.posy):
            return True
    return False
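
# Worked example (hypothetical values): with playerLength = 50, a player at
# (posx=200, posy=100) and a bullet at (posx=300, posy=120), dangerRight returns
# True: the bullet's posy falls inside the player's vertical span
# (100 <= 120 <= 150) and its posx lies to the player's right (300 >= 200).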

class DQNAgent(object):

    def __init__(self):
        self.reward = 0
        self.gamma = 0.9
        self.dataframe = pd.DataFrame()
        self.short_memory = np.array([])
        self.agent_target = 1
        self.agent_predict = 0
        self.learning_rate = 0.0005
        self.model = self.network()
        # self.model = self.network("weights.hdf5")  # uncomment to resume from saved weights
        self.epsilon = 0
        self.actual = []
        self.memory = []
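
    # State vector layout (22 entries, matching INPUT_NUM): 4 bullet-danger
    # flags, 6 flags for the player's position relative to the opponent,
    # 6 flags for the sign of the player's velocity, and 6 flags for the sign
    # of the opponent's velocity.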
    def get_state(self, bullet_list, player, opponent):
        dangerRightBool = dangerRight(self, player, bullet_list)
        dangerLeftBool = dangerLeft(self, player, bullet_list)
        dangerUpBool = dangerUp(self, player, bullet_list)
        dangerDownBool = dangerDown(self, player, bullet_list)
        state = [
            dangerRightBool,
            dangerLeftBool,
            dangerUpBool,
            dangerDownBool,
            player.posx == opponent.posx,   # same vertical
            player.posx > opponent.posx,    # right of opponent
            player.posx < opponent.posx,    # left of opponent
            player.posy == opponent.posy,   # same horizon
            player.posy > opponent.posy,    # above opponent
            player.posy < opponent.posy,    # below opponent
            player.velx < 0,                # move left
            player.velx > 0,                # move right
            player.velx == 0,               # no horizontal movement
            player.vely < 0,                # move down
            player.vely > 0,                # move up
            player.vely == 0,               # no vertical movement
            opponent.velx < 0,              # move left
            opponent.velx > 0,              # move right
            opponent.velx == 0,             # no horizontal movement
            opponent.vely < 0,              # move down
            opponent.vely > 0,              # move up
            opponent.vely == 0,             # no vertical movement
        ]
        for i in range(len(state)):
            if state[i]:
                state[i] = 1
            else:
                state[i] = 0
        return np.asarray(state)

    def set_reward(self, player, death):
        self.reward = 0
        if death:
            self.reward += -1000
            return self.reward
        else:
            self.reward += 50
            if player.kill:
                self.reward += 1000
            if (player.posx > 100) and (player.posx < 700):
                self.reward += 50
            if (player.posy > 100) and (player.posy < 700):
                self.reward += 50
            self.reward -= 30  # nothing happening = punishment
            return self.reward

    def network(self, weights=None):
        model = Sequential()
        model.add(Dense(120, activation='relu', input_dim=INPUT_NUM))  # originally 11
        model.add(Dropout(0.15))
        model.add(Dense(120, activation='relu'))
        model.add(Dropout(0.15))
        model.add(Dense(120, activation='relu'))
        model.add(Dropout(0.15))
        model.add(Dense(5, activation='softmax'))  # originally 3
        opt = Adam(self.learning_rate)
        model.compile(loss='mse', optimizer=opt)
        if weights:
            model.load_weights(weights)
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def replay_new(self, memory):
        if len(memory) > 1000:
            minibatch = random.sample(memory, 1000)
        else:
            minibatch = memory
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = reward + self.gamma * np.amax(self.model.predict(np.array([next_state]))[0])
            target_f = self.model.predict(np.array([state]))
            target_f[0][np.argmax(action)] = target
            self.model.fit(np.array([state]), target_f, epochs=1, verbose=0)

    def train_short_memory(self, state, action, reward, next_state, done):
        target = reward
        if not done:
            target = reward + self.gamma * np.amax(self.model.predict(next_state.reshape((1, INPUT_NUM)))[0])
        target_f = self.model.predict(state.reshape((1, INPUT_NUM)))
        target_f[0][np.argmax(action)] = target
        self.model.fit(state.reshape((1, INPUT_NUM)), target_f, epochs=1, verbose=0)
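

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original training script). It shows how
# the agent's methods are expected to fit together for one step. The
# SimpleNamespace objects below are hypothetical stand-ins for the real game's
# player, opponent and bullet sprites, which live in the game loop elsewhere in
# this project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    agent = DQNAgent()

    # Hypothetical game objects exposing the attributes get_state/set_reward read.
    player = SimpleNamespace(posx=400, posy=400, velx=0, vely=0, kill=False)
    opponent = SimpleNamespace(posx=200, posy=600, velx=1, vely=0)
    bullet_list = [SimpleNamespace(posx=420, posy=410)]

    state_old = agent.get_state(bullet_list, player, opponent)

    # Epsilon-greedy choice over the network's 5 outputs, encoded one-hot.
    if random.random() < agent.epsilon:
        action = np.eye(5)[random.randint(0, 4)]
    else:
        prediction = agent.model.predict(state_old.reshape((1, INPUT_NUM)))
        action = np.eye(5)[np.argmax(prediction[0])]

    # ... the real game loop would apply `action` and advance the game here ...

    state_new = agent.get_state(bullet_list, player, opponent)
    reward = agent.set_reward(player, death=False)

    agent.train_short_memory(state_old, action, reward, state_new, done=False)
    agent.remember(state_old, action, reward, state_new, done=False)
    agent.replay_new(agent.memory)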