Package horizons :: Package util :: Package pathfinding :: Module pather
[hide private]
[frames] | no frames]

Source Code for Module horizons.util.pathfinding.pather

  1  # ################################################### 
  2  # Copyright (C) 2008-2017 The Unknown Horizons Team 
  3  # team@unknown-horizons.org 
  4  # This file is part of Unknown Horizons. 
  5  # 
  6  # Unknown Horizons is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the 
 18  # Free Software Foundation, Inc., 
 19  # 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 20  # ################################################### 
 21   
 22  import logging 
 23  import weakref 
 24  from typing import List, Tuple 
 25   
 26  from horizons.util.pathfinding import PathBlockedError 
 27  from horizons.util.pathfinding.pathfinding import FindPath 
 28  from horizons.util.shapes import Point 
 29   
 30   
 31  """ 
 32  In this file, you will find an interface to the pathfinding algorithm. 
 33  We just call this interface Pather. It is used by unit to hide implementation details 
 34  of the pathfinding algorithm. 
 35  """ 
36 37 38 -class AbstractPather:
39 """Abstract Interface for pathfinding for use by Unit. 40 Use only subclasses!""" 41 log = logging.getLogger("world.pathfinding") 42
43 - def __init__(self, unit, move_diagonal, session, make_target_walkable=True):
44 """ 45 @param unit: instance of unit, to which the pather belongs 46 @param move_diagonal: whether the unit may walk diagonally 47 @param make_target_walkable: whether we should assume, that we can walk on 48 the tiles that make up the target 49 """ 50 self.session = session 51 self.move_diagonal = move_diagonal 52 self.make_target_walkable = make_target_walkable 53 54 self._unit = weakref.ref(unit) 55 56 self.destination_in_building = False 57 self.source_in_building = False 58 59 self.path = None # type: List[Tuple[int, int]] 60 self.cur = None # type: int
61 62 @property
63 - def unit(self):
64 return self._unit()
65
66 - def _get_path_nodes(self):
67 """Returns nodes, where unit can walk on. 68 Return value type must be supported by FindPath""" 69 raise NotImplementedError
70
71 - def _get_blocked_coords(self):
72 """Returns blocked coordinates 73 Return value type must be supported by FindPath""" 74 return []
75
76 - def _check_for_obstacles(self, point):
77 """Check if the path is unexpectedly blocked by e.g. a unit 78 @param point: tuple: (x, y) 79 @return: bool, True if path is blocked""" 80 return (point in self._get_blocked_coords())
81
82 - def _get_position(self):
83 """Returns current position considering movement status and being in a building""" 84 source = self.unit.position 85 if self.unit.is_moving() and self.path: 86 # we are moving, use next step as source 87 source = Point(*self.path[self.cur]) 88 else: 89 # check if we are in a building 90 building = self.session.world.get_building(self.unit.position) 91 if building is not None: 92 source = building 93 return source
94
95 - def calc_path(self, destination, destination_in_building=False, check_only=False, 96 source=None):
97 """Calculates a path to destination 98 @param destination: a destination supported by pathfinding 99 @param destination_in_building: bool, whether destination is in a building. 100 this makes the unit "enter the building" 101 @param check_only: if True the path isn't saved 102 @param source: use this as source of movement instead of self.unit.position 103 @return: True iff movement is possible or the path if check_only==True""" 104 # calculate our source 105 if source is None: 106 source = self._get_position() 107 108 # call algorithm 109 # to use a different pathfinding code, just change the following line 110 path = FindPath()(source, destination, self._get_path_nodes(), 111 self._get_blocked_coords(), self.move_diagonal, 112 self.make_target_walkable) 113 114 if path is None: 115 return False 116 117 if not check_only: 118 # prepare movement 119 self.move_on_path(path, source, destination_in_building) 120 else: 121 return path 122 123 return True
124
125 - def move_on_path(self, path, source=None, destination_in_building=False):
126 """Start moving on a precalculated path. 127 @param path: return value of FindPath()() 128 """ 129 if source is None: 130 source = self._get_position() 131 self.path = path 132 if self.unit.is_moving(): 133 self.cur = 0 134 self.unit.show() # make sure unit is displayed 135 else: 136 self.cur = -1 137 self.source_in_building = hasattr(source, 'is_building') and source.is_building 138 self.destination_in_building = destination_in_building
139
140 - def get_next_step(self):
141 """Returns the next step in the current movement 142 @return: Point""" 143 if self.cur is None: 144 return None 145 146 self.cur += 1 147 if not self.path or self.cur == len(self.path): 148 # movement finished 149 self.cur = None 150 self.path = None 151 return None 152 153 if self._check_for_obstacles(self.path[self.cur]): 154 # path is suddenly blocked, find another path 155 self.cur -= 1 # reset, since move is not possible 156 # try to calculate another path 157 if not self.calc_path(Point(*self.path[-1]), self.destination_in_building): 158 self.log.info("tile suddenly %s %s blocked for %s %s", 159 self.path[self.cur][0], self.path[self.cur][1], self.unit, self.unit.worldid) 160 # no other path can be found. since the problem cannot be fixed here, 161 # we raise an exception 162 raise PathBlockedError 163 164 # check if we have to change visibility because of entering or leaving a building 165 if self.destination_in_building and self.cur == len(self.path) - 1: 166 self.destination_in_building = False 167 self.unit.hide() 168 elif self.source_in_building and self.cur == 2: 169 self.source_in_building = False 170 self.unit.show() 171 172 return Point(*self.path[self.cur])
173
174 - def get_move_source(self):
175 """Returns the source Point of the current movement. 176 @return: Point or None if no path has been calculated""" 177 return None if not self.path else Point(*self.path[0])
178
179 - def get_move_target(self):
180 """Returns the point where the path leads 181 @return: Point or None if no path has been calculated""" 182 return None if not self.path else Point(*self.path[-1])
183
184 - def end_move(self):
185 """Pretends that the path is finished in order to make the unit stop""" 186 del self.path[self.cur + 1:]
187
188 - def save(self, db, unitid):
189 # just save each step of the path 190 # current position is calculated on loading through unit position 191 if self.path: 192 for step in range(len(self.path)): 193 db("INSERT INTO unit_path(`unit`, `index`, `x`, `y`) VALUES(?, ?, ?, ?)", 194 unitid, step, self.path[step][0], self.path[step][1])
195
196 - def load(self, db, worldid):
197 """ 198 @return: Bool, whether a path was loaded 199 """ 200 path_steps = db.get_unit_path(worldid) 201 if path_steps is None: 202 return False 203 else: 204 self.path = path_steps 205 cur_position = self.unit.position.to_tuple() 206 if cur_position in self.path: 207 self.cur = self.path.index(cur_position) 208 else: 209 self.cur = -1 210 return True
211
212 213 -class ShipPather(AbstractPather):
214 """Pather for ships (units that move on water tiles)"""
215 - def __init__(self, unit, *args, **kwargs):
216 super().__init__(unit, move_diagonal=True, make_target_walkable=False, 217 *args, **kwargs)
218
219 - def _get_path_nodes(self):
220 return self.session.world.water
221
222 - def _get_blocked_coords(self):
223 return self.session.world.ship_map
224
225 226 -class FisherShipPather(ShipPather):
227 """Can also drive through shallow water"""
228 - def _get_path_nodes(self):
229 return self.session.world.water_and_coastline
230
231 - def _get_blocked_coords(self):
232 # don't let fisher be blocked by other ships (#1023) 233 return []
234
235 236 -class BuildingCollectorPather(AbstractPather):
237 """Pather for collectors, that move freely (without depending on roads) 238 within the radius of their home building such as farm animals."""
239 - def __init__(self, unit, *args, **kwargs):
240 super().__init__(unit, move_diagonal=True, *args, **kwargs)
241
242 - def _get_path_nodes(self):
243 from horizons.component.collectingcomponent import CollectingComponent 244 return self.unit.home_building.get_component(CollectingComponent).path_nodes.nodes
245
246 247 -class RoadPather(AbstractPather):
248 """Pather for collectors, that depend on roads (e.g. the one used for the warehouse)"""
249 - def __init__(self, unit, *args, **kwargs):
250 super().__init__(unit, move_diagonal=False, *args, **kwargs) 251 self.island = self.session.world.get_island(unit.position)
252
253 - def _get_path_nodes(self):
254 return self.island.path_nodes.road_nodes
255
256 257 -class SoldierPather(AbstractPather):
258 """Pather for units, that move absolutely freely (such as soldiers) 259 Their path list is maintained by IslandPathNodes"""
260 - def __init__(self, unit, *args, **kwargs):
261 super().__init__(unit, move_diagonal=True, 262 make_target_walkable=False, *args, **kwargs)
263
264 - def _get_path_nodes(self):
265 # island might change (e.g. when transported via ship), so reload every time 266 island = self.session.world.get_island(self.unit.position) 267 return island.path_nodes.nodes
268
269 - def _get_blocked_coords(self):
270 return self.session.world.ground_unit_map
271
272 - def _check_for_obstacles(self, point):
273 # retrieve island, island of soldier may change at any time 274 island = self.session.world.get_island(self.unit.position) 275 path_blocked = not island.path_nodes.is_walkable(self.path[self.cur]) 276 if path_blocked: 277 # update list in island, so that new path calculations consider this obstacle 278 island.path_nodes.reset_tile_walkability(point) 279 self.log.debug("tile %s %s blocked for %s %s on island", point[0], point[1], 280 self.unit, self.unit.worldid) 281 return path_blocked 282 else: 283 # also check in super class 284 return super()._check_for_obstacles(point)
285
286 287 -class StaticPather:
288 """Misc pathing routines not depending on units. 289 Does not use AbstractPather Interface""" 290 @classmethod
291 - def get_path_on_roads(cls, island, source, destination):
292 """Returns a path that runs only on roads. 293 @param island: island to search path on 294 @param source, destination: Point or anything supported by FindPath 295 @return: list of tuples or None in case no path is found""" 296 return FindPath()(source, destination, island.path_nodes.road_nodes)
297