-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrid_pathfinder_algorithm.py
195 lines (158 loc) · 7.93 KB
/
grid_pathfinder_algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# -*- coding: utf-8 -*-
"""
/***************************************************************************
Pathfinder
A QGIS plugin
Finds near-optimal paths in raster images
Generated by Plugin Builder: http://g-sherman.github.io/Qgis-Plugin-Builder/
-------------------
begin : 2022-01-15
copyright : (C) 2022 by Noah Mollerstuen
email : [email protected]
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
__author__ = 'Noah Mollerstuen'
__date__ = '2022-01-15'
__copyright__ = '(C) 2022 by Noah Mollerstuen'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'
import math
import typing as t
import numpy as np
from qgis.core import (QgsProject,
QgsProcessing,
QgsFeature,
QgsGeometry,
QgsPoint,
QgsRectangle)
from .pathfinder_algorithm import PathfinderAlgorithm, PriorityQueue
DIRECTION_MAPPING = {
(0, 1): 1, # 0 is reserved for None
(1, 0): 2,
(0, -1): 3,
(-1, 0): 4
}
DIRECTION_MAPPING_INV = {v: k for k, v in DIRECTION_MAPPING.items()}
NEIGHBORS = ((0, 1), (1, 0), (0, -1), (-1, 0))
def point_to_pixel(point: QgsPoint, img_bounds: QgsRectangle, img_width: int, img_height: int) -> (int, int):
return (
math.floor((point.x() - img_bounds.xMinimum()) / img_bounds.width() * img_width),
img_height - 1 - math.floor((point.y() - img_bounds.yMinimum()) / img_bounds.height() * img_height)
)
def pixel_to_point(pix: (int, int), img_bounds: QgsRectangle, img_width: int, img_height: int) -> QgsPoint:
return QgsPoint(
(pix[0] + 0.5) / img_width * img_bounds.width() + img_bounds.xMinimum(),
(img_height - 1 - pix[1] + 0.5) / img_height * img_bounds.height() + img_bounds.yMinimum()
)
def a_star_heuristic(pos1: (int, int), pos2: (int, int)) -> int:
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
def get_neighbors(pos: (int, int), max_x, max_y):
return [
p for p in (((pos[0] + n[0], pos[1] + n[1]), n) for n in NEIGHBORS) if
0 <= p[0][0] < max_x and 0 <= p[0][1] < max_y
]
class GridPathfinderAlgorithm(PathfinderAlgorithm):
"""
This algorithm uses the Theta* algorithm to find near-optimal paths within raster images. The user may specify
custom expressions cost and traversability of the image. The output paths are constrained to the pixel grid.
"""
def processAlgorithm(self, parameters, context, feedback):
"""
Here is where the processing itself takes place.
"""
self.parse_inputs(parameters, context)
start_pos = point_to_pixel(self.start_point, self.bounding_rect, self.grid_width, self.grid_height)
if not self.is_traversable(start_pos):
raise ValueError(self.tr("Starting point must be traversable"))
end_pos = point_to_pixel(self.end_point, self.bounding_rect, self.grid_width, self.grid_height)
if not self.is_traversable(end_pos):
raise ValueError(self.tr("Ending point must be traversable"))
# A* Algorithm
frontier = PriorityQueue()
frontier.put(start_pos, 0)
came_from = np.empty((self.grid_height, self.grid_width), np.ushort)
came_from[start_pos[1]][start_pos[0]] = 0
cost_so_far = np.full((self.grid_height, self.grid_width), np.inf, np.float64)
cost_so_far[start_pos[1]][start_pos[0]] = 0
starting_heuristic = a_star_heuristic(start_pos, end_pos)
min_heuristic = starting_heuristic
warned_inadmissible_heuristic = False
print("Starting A*")
while True:
if feedback.isCanceled():
raise RuntimeError("Task Cancelled")
feedback.setProgress((1 - min_heuristic / starting_heuristic) * 100)
if frontier.empty():
raise ValueError(self.tr("No path found"))
current = frontier.get()
if len(frontier) > 100000:
print(frontier.elements)
raise RuntimeError(self.tr("Max queue length exceeded"))
if current == end_pos:
break
neighbors = [n for n in get_neighbors(current, self.grid_width, self.grid_height) if
self.is_traversable(n[0])]
for next_pos, direction in neighbors:
add_cost = self.get_cost(next_pos)
if not warned_inadmissible_heuristic and add_cost < 1:
warned_inadmissible_heuristic = True
feedback.pushInfo(
self.tr("[WARNING] Custom cost expression is less than 1, path may not be optimal!"))
new_cost = cost_so_far[current[1]][current[0]] + add_cost
if new_cost < cost_so_far[next_pos[1]][next_pos[0]]:
cost_so_far[next_pos[1]][next_pos[0]] = new_cost
heuristic = a_star_heuristic(next_pos, end_pos)
min_heuristic = min(min_heuristic, heuristic)
frontier.put(next_pos, new_cost + heuristic)
came_from[next_pos[1]][next_pos[0]] = DIRECTION_MAPPING[direction]
print("Reconstructing path")
current_point = end_pos
last_point = None
path = []
while True:
if feedback.isCanceled():
raise RuntimeError("Task Cancelled")
delta_code = came_from[current_point[1]][current_point[0]]
if delta_code == 0:
next_point = None
else:
delta = DIRECTION_MAPPING_INV[delta_code]
next_point = (current_point[0] - delta[0], current_point[1] - delta[1])
if last_point is None or next_point is None or \
next_point[0] - current_point[0] != current_point[0] - last_point[0] or \
next_point[1] - current_point[1] != current_point[1] - last_point[1]:
path.append(pixel_to_point(current_point, self.bounding_rect, self.grid_width, self.grid_height))
if current_point == start_pos:
break
last_point = current_point
current_point = next_point
# Add a feature in the sink
feature = QgsFeature()
feature.setGeometry(QgsGeometry.fromPolyline(path))
self.output_sink.addFeature(feature)
# Return the results of the algorithm. In this case our only result is
# the feature sink which contains the processed features, but some
# algorithms may return multiple feature sinks, calculated numeric
# statistics, etc. These should all be included in the returned
# dictionary, with keys matching the feature corresponding parameter
# or output names.
return {self.OUTPUT: self.output_id}
def name(self):
"""
Returns the algorithm name, used for identifying the algorithm. This
string should be fixed for the algorithm, and must not be localised.
The name should be unique within each provider. Names should contain
lowercase alphanumeric characters only and no spaces or other
formatting characters.
"""
return 'Find Path (Grid)'
def createInstance(self):
return GridPathfinderAlgorithm()