Package horizons :: Package util :: Package pathfinding :: Module pathfinding
[hide private]
[frames] | no frames]

Source Code for Module horizons.util.pathfinding.pathfinding

  1  # ################################################### 
  2  # Copyright (C) 2008-2017 The Unknown Horizons Team 
  3  # team@unknown-horizons.org 
  4  # This file is part of Unknown Horizons. 
  5  # 
  6  # Unknown Horizons is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the 
 18  # Free Software Foundation, Inc., 
 19  # 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 20  # ################################################### 
 21   
 22  import logging 
 23   
 24   
 25  """ 
 26  This file contains only the pathfinding algorithm. It is implemented in a callable class 
 27  called FindPath. You should never ever use this class directly, just through the Pather 
 28  interface. 
 29  """ 
 30   
 31   
32 -class FindPath:
33 """ Finds best path from source to destination via a*-algo 34 "best path" means path with shortest travel time, which 35 is not necessarily the shortest path (cause roads have different speeds) 36 """ 37 log = logging.getLogger("world.pathfinding") 38
39 - def __call__(self, source, destination, path_nodes, blocked_coords=None, 40 diagonal=False, make_target_walkable=True):
41 """ 42 @param source: Rect, Point or BasicBuilding 43 @param destination: Rect, Point or BasicBuilding 44 @param path_nodes: dict { (x, y) = speed_on_coords } or list [(x, y), ..] 45 @param blocked_coords: temporarily blocked coords (e.g. by a unit) as list or dict of tuples 46 @param diagonal: whether the unit is able to move diagonally 47 @param make_target_walkable: whether we force the tiles of the target to be walkable, 48 even if they actually aren't (used e.g. when walking to a building) 49 @return: list of coords as tuples that are part of the best path 50 (from first coord after source to first coord in destination) 51 or None if no path is found 52 """ 53 # assure correct call 54 # commented out checks since BasicBuilding can't be imported here 55 #assert isinstance(source, (Rect, Point, BasicBuilding)) 56 #assert isinstance(destination, (Rect, Point, BasicBuilding)) 57 blocked_coords = blocked_coords or [] 58 assert isinstance(path_nodes, (dict, list, set)) 59 assert isinstance(blocked_coords, (dict, list, set)) 60 61 # save args 62 self.source = source 63 self.destination = destination 64 self.path_nodes = path_nodes 65 self.blocked_coords = blocked_coords 66 self.diagonal = diagonal 67 self.make_target_walkable = make_target_walkable 68 69 #self.log.debug('searching path from %s to %s. blocked: %s', 70 # source, destination, blocked_coords) 71 72 # prepare args 73 if not self.setup(): 74 return None 75 76 # execute algorithm on the args 77 path = self.execute() 78 self.log.debug('found path: %s', path) 79 return path
80
81 - def setup(self):
82 """Sets up variables for execution of algorithm 83 @return: bool, whether setup was successful""" 84 # support for building 85 if hasattr(self.source, 'position'): 86 self.source = self.source.position 87 if hasattr(self.destination, 'position'): 88 self.destination = self.destination.position 89 90 if isinstance(self.path_nodes, list) or isinstance(self.path_nodes, set): 91 self.path_nodes = dict.fromkeys(self.path_nodes, 1.0) 92 93 # check if target is blocked 94 target_is_blocked = True 95 for coord in self.destination.tuple_iter(): 96 if coord not in self.blocked_coords: 97 target_is_blocked = False 98 if target_is_blocked: 99 #self.log.debug("FindPath: target is blocked") 100 return False 101 102 # check if target is walkable 103 if not self.make_target_walkable: 104 target_is_walkable = False 105 for coord in self.destination: 106 if coord.to_tuple() in self.path_nodes: 107 target_is_walkable = True 108 break 109 if not target_is_walkable: 110 #self.log.debug("FindPath: target is not walkable") 111 return False 112 113 return True
114
115 - def execute(self):
116 """Executes algorithm""" 117 # nodes are the keys of the following dicts (x, y) 118 # the val of the keys are: (previous node, distance to here, 119 # distance to here + estimated distance to target) 120 # we use this data structure out of speed consideration, 121 # using a class would admittedly be more readable 122 123 # values of distance is usually measured in speed 124 # since you can't calculate the speed to the destination, 125 # these distances are measured in space 126 # this might become a problem, but this can just be fixed when 127 # the values of speed or slowness and such are defined 128 129 # nodes that weren't processed but will be processed: 130 to_check = {} 131 # nodes that have been processed: 132 checked = {} 133 134 destination = self.destination 135 destination_to_tuple_distance_func = destination.get_distance_function((0, 0)) 136 137 source_coords = self.source.get_coordinates() 138 for coords in source_coords: 139 to_check[coords] = (None, 0, destination_to_tuple_distance_func(destination, coords)) 140 141 # if any node in dest_coords_set has been processed 142 # (i.e. is in checked), a good path has been found 143 dest_coords_set = destination.get_coordinates() 144 dest_coords_set = set(dest_coords_set) 145 if not self.make_target_walkable: 146 # restrict destination coords to walkable tiles, by default they are counted as walkable 147 # the manual set intersection is used because set.intersection(dict) doesn't take advantage of the fast lookup 148 dest_coords_set = {coords for coords in dest_coords_set if coords in self.path_nodes} 149 if not dest_coords_set: 150 return None 151 152 from heapq import heappush, heappop 153 heap = [] 154 for coords, data in to_check.items(): 155 heappush(heap, (data[2], coords)) 156 157 # pull dereferencing out of loop 158 path_nodes = self.path_nodes 159 blocked_coords = self.blocked_coords 160 161 # loop until we have no more nodes to check 162 while to_check: 163 164 # find next node to check, which is the one with best rating 165 (_, cur_node_coords) = heappop(heap) 166 cur_node_data = to_check[cur_node_coords] 167 168 # shortcuts: 169 x = cur_node_coords[0] 170 y = cur_node_coords[1] 171 172 # find possible neighbors 173 # optimization TODO: use data structures more suitable for contains-check 174 if self.diagonal: 175 # all relevant adjacent neighbors 176 x_p1 = x + 1 177 x_m1 = x - 1 178 y_p1 = y + 1 179 y_m1 = y - 1 180 neighbors = (i for i in ((x_m1, y_m1), (x_m1, y), 181 (x_m1, y_p1), (x, y_m1), 182 (x, y_p1), (x_p1, y_m1), 183 (x_p1, y), (x_p1, y_p1)) 184 if # conditions are sorted by likelihood in ship worst case 185 i not in checked 186 and (i in path_nodes 187 or i in source_coords 188 or i in dest_coords_set) 189 and i not in blocked_coords) 190 else: 191 # all relevant vertical and horizontal neighbors 192 neighbors = (i for i in ((x - 1, y), (x + 1, y), 193 (x, y - 1), (x, y + 1)) 194 if (i in path_nodes 195 or i in source_coords 196 or i in dest_coords_set) 197 and i not in checked 198 and i not in blocked_coords) 199 200 # Profiling info: In the worst case, this for-loop takes 80% of the time. 201 # Parts of this are actually spent in evaluating the generator expressions from the if above 202 203 for neighbor_node in neighbors: 204 if neighbor_node not in to_check: 205 # add neighbor to list of reachable nodes to check 206 207 # save previous node, calc distance to neighbor_node 208 # and estimate from neighbor_node to destination 209 dist_to_here = cur_node_data[1] + path_nodes.get(cur_node_coords, 0) 210 211 total_dist_estimation = destination_to_tuple_distance_func(destination, neighbor_node) + dist_to_here 212 to_check[neighbor_node] = (cur_node_coords, 213 dist_to_here, 214 total_dist_estimation) 215 216 heappush(heap, (total_dist_estimation, neighbor_node)) 217 218 else: 219 # neighbor has been processed, 220 # check if current node provides a better path to this neighbor 221 distance_to_neighbor = cur_node_data[1] + path_nodes.get(cur_node_coords, 0) 222 223 neighbor = to_check[neighbor_node] 224 225 if neighbor[1] > distance_to_neighbor: 226 # found better path to neighbor, update values 227 neighbor = (cur_node_coords, 228 distance_to_neighbor, 229 distance_to_neighbor + (neighbor[2] - neighbor[1])) 230 231 # done processing cur_node 232 checked[cur_node_coords] = cur_node_data 233 del to_check[cur_node_coords] 234 235 # check if cur_node is at the destination 236 if cur_node_coords in dest_coords_set: 237 # we're done. 238 # insert steps of path to a list and return it 239 path = [cur_node_coords] 240 previous_node = cur_node_data[0] 241 while previous_node is not None: 242 path.insert(0, previous_node) 243 previous_node = checked[previous_node][0] 244 245 return path 246 247 else: 248 return None
249