-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpriority_queue.py
161 lines (134 loc) · 5.61 KB
/
priority_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Priority Queue - a binary heap whose nodes carry a separate 'priority'
# field. In a max queue every parent has a priority >= its children; in a
# min queue every parent has a priority <= its children. For a node at
# index n, its children are at 2n+1 (left) and 2n+2 (right), and its parent
# is at (n-1)//2.
# A node's value can be any data, because ordering uses only the priority,
# never the value.
class Node:
    """Queue entry that pairs an arbitrary payload with a priority.

    :param value: the payload carried by the node; stored as given.
    :param priority: the key the queues compare when ordering nodes.
    """

    def __init__(self, value, priority):
        # Both fields are stored untouched as plain public attributes.
        self.value, self.priority = value, priority
# Big O for priority queues is Time: O(log(n)) for insertion and removal.
# Search is O(n). Space: O(n)
# Max Priority Queue - parents always greater than or equal to child nodes
class MaxPQ:
    """
    Max priority queue backed by a binary heap stored in a list.

    ``values`` holds the nodes in heap order: index 0 is the node with the
    highest priority, and every parent has a priority greater than or equal
    to that of its children. The children of index ``n`` are at ``2n + 1``
    and ``2n + 2``; the parent of index ``n`` is at ``(n - 1) // 2``.
    """

    def __init__(self):
        self.values = []

    def __repr__(self):
        return f"{self.get_all_nodes()}"

    def __len__(self):
        """Return the number of nodes currently in the queue."""
        return len(self.values)

    def get_all_nodes(self):
        """Return every node, in heap order, as ``(value, priority)`` tuples."""
        return [(x.value, x.priority) for x in self.values]

    def enqueue(self, value, priority):
        """
        Insert a new node and restore the heap order.

        :param value: payload to store.
        :param priority: key used for ordering; higher comes out first.
        :return: the internal list of nodes (the live list, not a copy).
        """
        self.values.append(Node(value, priority))
        self._sift_up(len(self.values) - 1)
        return self.values

    def enqueue_all(self, lst):
        """
        Enqueue several nodes.

        :param lst: iterable of indexable items where item[0] is the value
            and item[1] is the priority.
        """
        for node in lst:
            self.enqueue(node[0], node[1])

    def dequeue(self, start=0):
        """
        Remove and return the node at index ``start`` (the root by default).

        :param start: index into ``values`` of the node to remove.
        :return: the removed node, or ``None`` when the queue is empty or
            ``start`` is outside ``0 .. len(values) - 1``.
        """
        size = len(self.values)
        # Negative indexes are rejected too: they would alias the tail node
        # that is about to be popped and corrupt the heap.
        if start < 0 or start >= size:
            return None
        last_node = self.values.pop()
        if start == size - 1:
            # The requested node was the tail; nothing needs re-ordering.
            return last_node
        node_removed = self.values[start]
        self.values[start] = last_node
        # The relocated tail node may be larger than its new parent (only
        # possible when start is not the root) or smaller than its new
        # children, so try moving it up first and down otherwise.
        if self._sift_up(start) == start:
            self._sift_down(start)
        return node_removed

    def _sift_up(self, index):
        """Move the node at ``index`` up while it outranks its parent.

        :return: the index where the node ends up.
        """
        node = self.values[index]
        while index > 0:
            parent = (index - 1) // 2
            if node.priority > self.values[parent].priority:
                self.values[index] = self.values[parent]
                index = parent
            else:
                break
        self.values[index] = node
        return index

    def _sift_down(self, index):
        """Move the node at ``index`` down while a child outranks it."""
        node = self.values[index]
        size = len(self.values)
        while True:
            left = 2 * index + 1
            if left >= size:
                break
            right = left + 1
            # Pick the higher-priority child; the left child wins ties.
            child = left
            if right < size and \
                    self.values[right].priority > self.values[left].priority:
                child = right
            if node.priority >= self.values[child].priority:
                break
            self.values[index] = self.values[child]
            index = child
        self.values[index] = node
# Min Priority Queue - parents always less than child nodes
class MinPQ:
    """
    Min priority queue backed by a binary heap stored in a list.

    ``values`` holds the nodes in heap order: index 0 is the node with the
    lowest priority, and every parent has a priority less than or equal to
    that of its children. The children of index ``n`` are at ``2n + 1`` and
    ``2n + 2``; the parent of index ``n`` is at ``(n - 1) // 2``.
    """

    def __init__(self):
        self.values = []

    def __repr__(self):
        return f"{self.get_all_nodes()}"

    def __len__(self):
        """Return the number of nodes currently in the queue."""
        return len(self.values)

    def get_all_nodes(self):
        """Return every node, in heap order, as ``(value, priority)`` tuples."""
        return [(x.value, x.priority) for x in self.values]

    def enqueue(self, value, priority):
        """
        Insert a new node and restore the heap order.

        :param value: payload to store.
        :param priority: key used for ordering; lower comes out first.
        :return: the internal list of nodes (the live list, not a copy).
        """
        self.values.append(Node(value, priority))
        self._sift_up(len(self.values) - 1)
        return self.values

    def enqueue_all(self, lst):
        """
        Enqueue several nodes.

        :param lst: iterable of indexable items where item[0] is the value
            and item[1] is the priority.
        """
        for node in lst:
            self.enqueue(node[0], node[1])

    def dequeue(self, start=0):
        """
        Remove and return the node at index ``start`` (the root by default).

        :param start: index into ``values`` of the node to remove.
        :return: the removed node, or ``None`` when the queue is empty or
            ``start`` is outside ``0 .. len(values) - 1``.
        """
        size = len(self.values)
        # Negative indexes are rejected too: they would alias the tail node
        # that is about to be popped and corrupt the heap.
        if start < 0 or start >= size:
            return None
        last_node = self.values.pop()
        if start == size - 1:
            # The requested node was the tail; nothing needs re-ordering.
            return last_node
        node_removed = self.values[start]
        self.values[start] = last_node
        # The relocated tail node may be smaller than its new parent (only
        # possible when start is not the root) or larger than its new
        # children, so try moving it up first and down otherwise.
        if self._sift_up(start) == start:
            self._sift_down(start)
        return node_removed

    def _sift_up(self, index):
        """Move the node at ``index`` up while it is smaller than its parent.

        :return: the index where the node ends up.
        """
        node = self.values[index]
        while index > 0:
            parent = (index - 1) // 2
            if node.priority < self.values[parent].priority:
                self.values[index] = self.values[parent]
                index = parent
            else:
                break
        self.values[index] = node
        return index

    def _sift_down(self, index):
        """Move the node at ``index`` down while a child is smaller than it."""
        node = self.values[index]
        size = len(self.values)
        while True:
            left = 2 * index + 1
            if left >= size:
                break
            right = left + 1
            # Pick the lower-priority child; the left child wins ties.
            child = left
            if right < size and \
                    self.values[right].priority < self.values[left].priority:
                child = right
            if node.priority <= self.values[child].priority:
                break
            self.values[index] = self.values[child]
            index = child
        self.values[index] = node