1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import logging
23
24
25 """
26 This file contains only the pathfinding algorithm. It is implemented in a callable class
27 called FindPath. You should never ever use this class directly, just through the Pather
28 interface.
29 """
30
31
33 """ Finds best path from source to destination via a*-algo
34 "best path" means path with shortest travel time, which
35 is not necessarily the shortest path (cause roads have different speeds)
36 """
37 log = logging.getLogger("world.pathfinding")
38
39 - def __call__(self, source, destination, path_nodes, blocked_coords=None,
40 diagonal=False, make_target_walkable=True):
41 """
42 @param source: Rect, Point or BasicBuilding
43 @param destination: Rect, Point or BasicBuilding
44 @param path_nodes: dict { (x, y) = speed_on_coords } or list [(x, y), ..]
45 @param blocked_coords: temporarily blocked coords (e.g. by a unit) as list or dict of tuples
46 @param diagonal: whether the unit is able to move diagonally
47 @param make_target_walkable: whether we force the tiles of the target to be walkable,
48 even if they actually aren't (used e.g. when walking to a building)
49 @return: list of coords as tuples that are part of the best path
50 (from first coord after source to first coord in destination)
51 or None if no path is found
52 """
53
54
55
56
57 blocked_coords = blocked_coords or []
58 assert isinstance(path_nodes, (dict, list, set))
59 assert isinstance(blocked_coords, (dict, list, set))
60
61
62 self.source = source
63 self.destination = destination
64 self.path_nodes = path_nodes
65 self.blocked_coords = blocked_coords
66 self.diagonal = diagonal
67 self.make_target_walkable = make_target_walkable
68
69
70
71
72
73 if not self.setup():
74 return None
75
76
77 path = self.execute()
78 self.log.debug('found path: %s', path)
79 return path
80
82 """Sets up variables for execution of algorithm
83 @return: bool, whether setup was successful"""
84
85 if hasattr(self.source, 'position'):
86 self.source = self.source.position
87 if hasattr(self.destination, 'position'):
88 self.destination = self.destination.position
89
90 if isinstance(self.path_nodes, list) or isinstance(self.path_nodes, set):
91 self.path_nodes = dict.fromkeys(self.path_nodes, 1.0)
92
93
94 target_is_blocked = True
95 for coord in self.destination.tuple_iter():
96 if coord not in self.blocked_coords:
97 target_is_blocked = False
98 if target_is_blocked:
99
100 return False
101
102
103 if not self.make_target_walkable:
104 target_is_walkable = False
105 for coord in self.destination:
106 if coord.to_tuple() in self.path_nodes:
107 target_is_walkable = True
108 break
109 if not target_is_walkable:
110
111 return False
112
113 return True
114
116 """Executes algorithm"""
117
118
119
120
121
122
123
124
125
126
127
128
129
130 to_check = {}
131
132 checked = {}
133
134 destination = self.destination
135 destination_to_tuple_distance_func = destination.get_distance_function((0, 0))
136
137 source_coords = self.source.get_coordinates()
138 for coords in source_coords:
139 to_check[coords] = (None, 0, destination_to_tuple_distance_func(destination, coords))
140
141
142
143 dest_coords_set = destination.get_coordinates()
144 dest_coords_set = set(dest_coords_set)
145 if not self.make_target_walkable:
146
147
148 dest_coords_set = {coords for coords in dest_coords_set if coords in self.path_nodes}
149 if not dest_coords_set:
150 return None
151
152 from heapq import heappush, heappop
153 heap = []
154 for coords, data in to_check.items():
155 heappush(heap, (data[2], coords))
156
157
158 path_nodes = self.path_nodes
159 blocked_coords = self.blocked_coords
160
161
162 while to_check:
163
164
165 (_, cur_node_coords) = heappop(heap)
166 cur_node_data = to_check[cur_node_coords]
167
168
169 x = cur_node_coords[0]
170 y = cur_node_coords[1]
171
172
173
174 if self.diagonal:
175
176 x_p1 = x + 1
177 x_m1 = x - 1
178 y_p1 = y + 1
179 y_m1 = y - 1
180 neighbors = (i for i in ((x_m1, y_m1), (x_m1, y),
181 (x_m1, y_p1), (x, y_m1),
182 (x, y_p1), (x_p1, y_m1),
183 (x_p1, y), (x_p1, y_p1))
184 if
185 i not in checked
186 and (i in path_nodes
187 or i in source_coords
188 or i in dest_coords_set)
189 and i not in blocked_coords)
190 else:
191
192 neighbors = (i for i in ((x - 1, y), (x + 1, y),
193 (x, y - 1), (x, y + 1))
194 if (i in path_nodes
195 or i in source_coords
196 or i in dest_coords_set)
197 and i not in checked
198 and i not in blocked_coords)
199
200
201
202
203 for neighbor_node in neighbors:
204 if neighbor_node not in to_check:
205
206
207
208
209 dist_to_here = cur_node_data[1] + path_nodes.get(cur_node_coords, 0)
210
211 total_dist_estimation = destination_to_tuple_distance_func(destination, neighbor_node) + dist_to_here
212 to_check[neighbor_node] = (cur_node_coords,
213 dist_to_here,
214 total_dist_estimation)
215
216 heappush(heap, (total_dist_estimation, neighbor_node))
217
218 else:
219
220
221 distance_to_neighbor = cur_node_data[1] + path_nodes.get(cur_node_coords, 0)
222
223 neighbor = to_check[neighbor_node]
224
225 if neighbor[1] > distance_to_neighbor:
226
227 neighbor = (cur_node_coords,
228 distance_to_neighbor,
229 distance_to_neighbor + (neighbor[2] - neighbor[1]))
230
231
232 checked[cur_node_coords] = cur_node_data
233 del to_check[cur_node_coords]
234
235
236 if cur_node_coords in dest_coords_set:
237
238
239 path = [cur_node_coords]
240 previous_node = cur_node_data[0]
241 while previous_node is not None:
242 path.insert(0, previous_node)
243 previous_node = checked[previous_node][0]
244
245 return path
246
247 else:
248 return None
249