qLearning.py
import objects
import itertools
import state
import random
class QLearn:
    def __init__(self, startingState, reward, livingReward, learningRate, epsilon):
        # Variables that change the AI behavior
        self.rewardDiscount = reward
        self.livingReward = livingReward
        self.learningRate = learningRate
        self.epsilon = epsilon
        self.currentState = startingState
        self.states = list()  # contains all the valid states of the model
        self.policyTable = dict()  # stores policies keyed by state; each entry maps to an action
        self.qTable = dict()  # stores Q-values keyed by (state, action) tuples, each mapping to a float
        self.iterations = 0  # DON'T MODIFY THIS. The agent increments this on its own.
        self.numberOfWins = 0
        self.initializeStates()  # initialize all the states that exist in the MDP
    def initializeStates(self):
        # iterates over the board and creates every valid state that exists in the MDP.
        # this can be very computationally expensive.
        # get all possible player, key, and wormhole positions. Positions are stored as a tuple of (x, y) coordinates.
        # Note that these are ALL positions each of these items can occupy.
        playerPos = list()
        # get player positions
        for x in range(0, len(objects.board.tiles)):
            for y in range(0, len(objects.board.tiles[0])):
                if objects.board.tiles[x][y].isWall():
                    continue
                # if this tile is a valid player position, add it to the list.
                playerPos.append((x, y))
        # get all valid combinations of key positions.
        keyList = self.getKeyPositionCombinations()
        # populate the list of all possible states
        stateList = list(itertools.product(playerPos, keyList))
        for newState in stateList:
            stateObj = state.State(newState[0], newState[1])
            self.states.append(stateObj)  # will change to state.State(newState[0], newState[1], newState[2]) once wormholes are in.
            actionVector = self.getActionVector(stateObj, objects.board)
            # assign a random action to the initial policy
            self.policyTable[stateObj] = random.choice(actionVector)
            for action in actionVector:
                self.qTable[(stateObj, action)] = 0  # initialize this state-action pair's Q-value to 0
        self.qTable[(None, None)] = 0  # this will be used for the exit state
    def getActionVector(self, currentState, board):
        # helper function to get a vector of valid actions from the state passed.
        # NOTE: playerPos is a tuple of (x, y) type. So playerPos[0] gives the x value and playerPos[1] gives the y value
        x = currentState.playerPos[0]
        y = currentState.playerPos[1]
        validMoves = ["Left", "Right", "Up", "Down", "Stay"]
        if board.tiles[x][y].isExit() and all(val is None for val in currentState.keyPositions):
            validMoves = ["Exit"]  # player is on the exit and has all the keys; exiting is the only sensible move.
            return validMoves
        # rule out actions depending on the current and next x, y positions. Based on the implementation in key.py
        if x == 0 or board.tiles[x - 1][y].isWall():
            validMoves.remove("Left")
        if x == board.width - 1 or board.tiles[x + 1][y].isWall():
            validMoves.remove("Right")
        if y == 0 or board.tiles[x][y - 1].isWall():
            validMoves.remove("Up")
        if y == board.height - 1 or board.tiles[x][y + 1].isWall():
            validMoves.remove("Down")
        return validMoves
    def getKeyPositionCombinations(self):
        # get key positions for each key. We have min and max x, y positions for each key that must be considered
        counter = 0
        keyPos = [list() for _ in range(0, len(objects.board.keys))]
        for key in objects.board.keys:
            for x in range(key.xmin, key.xmax + 1):
                for y in range(key.ymin, key.ymax + 1):
                    if objects.board.tiles[x][y].isWall():
                        # wall tiles are not valid key positions
                        continue
                    keyPos[counter].append((x, y))  # add a position for this specific key.
            keyPos[counter].append(None)  # None means this key has been acquired
            counter += 1
        # get the cartesian product of all key positions. This gives us all possible key position combinations
        keyCombos = list(set(itertools.product(*keyPos)))
        # Find entries that contain duplicate positions. Keys cannot be on top of one another.
        removeList = list()
        for i in range(0, len(keyCombos)):
            for j in range(0, len(objects.board.keys)):
                for k in range(0, len(objects.board.keys)):
                    if j == k:
                        continue
                    if keyCombos[i][j] == keyCombos[i][k] and keyCombos[i][j] is not None and keyCombos[i][k] is not None:
                        removeList.append(keyCombos[i])
                        break
        return [list(positions) for positions in keyCombos if positions not in removeList]
    def initializeNextStateTable(self):
        # not yet implemented
        return
    def updateState(self, actionTaken, rewardReceived):
        # will be called by player.aiQMove()
        # Q_k+1(s, a) = (1 - learningRate) * Q_k(s, a) + learningRate * (R(s, a, s') + rewardDiscount * max_a'(Q_k(s', a')))
        newState = state.State((objects.player.x, objects.player.y), [(key.x, key.y) if not key.acquired else None for key in objects.board.keys])
        rewardReceived += self.livingReward  # apply the living reward
        # use the policy table to get the highest-valued action a' for the new state. The policy table should always hold the best-known action for each state.
        sample = rewardReceived + (self.rewardDiscount * self.qTable[(newState, self.policyTable[newState])])
        newValue = (
            (1 - self.learningRate) * self.qTable[(self.currentState, actionTaken)] +
            (self.learningRate * sample)
        )
        self.qTable[(self.currentState, actionTaken)] = newValue
        # get the best action from the Q-values and update the policy
        actionVector = self.getActionVector(self.currentState, objects.board)
        temporaryQValues = dict()
        for action in actionVector:
            temporaryQValues[action] = self.qTable[(self.currentState, action)]
        maxActionValuePair = max(temporaryQValues, key=temporaryQValues.get)
        self.policyTable[self.currentState] = maxActionValuePair  # assign the best action to take from the current state.
        self.currentState = newState
    def getCurrentStateActionFromPolicy(self):
        # epsilon decays exponentially with the number of iterations, so early runs are very random and later runs explore less and less
        if random.random() <= self.epsilon ** self.iterations or self.currentState not in self.policyTable:
            # handle exploration by taking a random action when we roll under epsilon
            # also handles the case where we haven't yet figured out a policy for the current state
            return random.choice(self.getActionVector(self.currentState, objects.board))
        # if we don't roll under epsilon for exploration, return the best action from our policy so far
        return self.policyTable[self.currentState]
    def getWinRate(self):
        # guard against division by zero before any iterations have completed
        if self.iterations == 0:
            return 0.0
        return 100.0 * self.numberOfWins / self.iterations
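# ---------------------------------------------------------------------------
# Example driver loop (a rough sketch, not part of the original module).
# It shows how the class above is meant to be exercised: ask the agent for an
# action, let the game apply it, then feed the resulting reward back into
# updateState(). The names buildStartingState(), applyActionAndGetReward(),
# and episodeFinished() are hypothetical stand-ins for whatever the game code
# (player.aiQMove() and friends) actually provides, so the sketch is left
# commented out.
#
#   agent = QLearn(buildStartingState(), reward=0.9, livingReward=-0.04,
#                  learningRate=0.5, epsilon=0.9)
#   while not episodeFinished():
#       action = agent.getCurrentStateActionFromPolicy()
#       rewardReceived = applyActionAndGetReward(action)
#       agent.updateState(action, rewardReceived)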