-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsearch.py
More file actions
80 lines (57 loc) · 2.17 KB
/
search.py
File metadata and controls
80 lines (57 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import math
from abc import abstractmethod
from asyncio import Protocol
from collections import deque
from dataclasses import dataclass, field
from queue import PriorityQueue, Empty
from typing import Iterable, TypeVar, Callable, Generic
from itertoolsx import iter_except
Node = TypeVar("Node")
@dataclass
class PathNotFoundError(Exception):
start: Node
end: Node
def __str__(self):
return f"No path exists between {self.start} and {self.end}"
@dataclass
class Path(Generic[Node]):
nodes: list[Node]
length: float
@dataclass(order=True)
class ScoredNode:
f_score: float
node: Node = field(compare=False)
def reconstruct_path(came_from: dict[Node, Node], current: Node) -> list[Node]:
path = deque()
while current in came_from:
current = came_from[current]
path.appendleft(current)
return list(path)
def a_star(
start: Node,
goal: Node,
neighbours: Callable[[Node], Iterable[Node]],
heuristic: Callable[[Node], float],
distance: Callable[[Node, Node], float]
) -> Path[Node]:
open_set: PriorityQueue[ScoredNode] = PriorityQueue()
open_set.put_nowait(ScoredNode(heuristic(start), start))
came_from: dict[Node, Node] = {}
g_score = {start: 0}
for current_scored_node in iter_except(open_set.get_nowait, Empty):
current_node = current_scored_node.node
if current_node == goal:
nodes = reconstruct_path(came_from, current_node)
return Path(nodes, g_score.get(current_node))
for neighbour in neighbours(current_node):
tentative_g_score = g_score.get(current_node, math.inf) + distance(current_node, neighbour)
if tentative_g_score >= g_score.get(neighbour, math.inf):
continue
came_from[neighbour] = current_node
g_score[neighbour] = tentative_g_score
f_score = tentative_g_score + heuristic(neighbour)
scored_neighbour = ScoredNode(f_score, neighbour)
if neighbour in map(lambda scored: scored.node, open_set.queue):
continue
open_set.put_nowait(scored_neighbour)
raise PathNotFoundError(start, goal)