Skip to content

Agents and spatial

Graphs, shortest-path algorithms, and an Agent that traverses them.

Graphs

import simweave as sw

g = sw.grid_graph(rows=8, cols=12, diagonal=True)

grid_graph returns a simweave.spatial.Graph whose nodes are (row, col) tuples. Bring your own graph by either constructing Graph directly or using a NetworkX graph with the [graph] extra.

Path planners

  • a_star(graph, start, goal, heuristic)
  • dijkstra(graph, start, goal)
  • Heuristics: manhattan, euclidean, chebyshev
  • Errors: NoPathError if no route exists

Agents

agent = sw.Agent(
    graph=g,
    start_node=(0, 0),
    tasks=[(7, 11), (3, 4), (0, 11)],
    speed=2.0,
    heuristic=sw.manhattan,
    name="rover",
)
env = sw.SimEnvironment(dt=0.5, end=50.0)
env.register(agent)
env.run()

sw.plot_agent_path(agent, graph=g).write_html("path.html",
                                              include_plotlyjs="cdn")

Compass is a small enum for cardinal-direction logic if you need it when authoring custom agent behaviours.

API: simweave.agents

Agents, routing, and orientation primitives.

Compass

Compass(points: int = 8, angle: float = 0.0)

Discrete compass quantised to a fixed number of points.

Source code in src/simweave/agents/compass.py
def __init__(self, points: int = 8, angle: float = 0.0) -> None:
    if points not in self._ALLOWED:
        raise ValueError(f"Compass points must be one of {self._ALLOWED}")
    self.points = points
    self.atomic_angle = 360.0 / points
    self.angle = self._quantize(float(angle))

NoPathError

Bases: ValueError

Raised when A* cannot reach the goal from the start node.

Agent

Agent(graph: Any, start_node: Node, tasks: Sequence[Node] | None = None, speed: float = 1.0, heuristic: Callable[[Node, Node], float] | None = None, on_arrive: Callable[['Agent', Node, Any], None] | None = None, weight_attr: str = 'weight', name: str | None = None)

Bases: Entity

Graph-routing agent.

Parameters:

Name Type Description Default
graph Any

A simweave Graph, networkx graph, or dict-of-dict adjacency.

required
start_node Node

Node at which the agent spawns.

required
tasks Sequence[Node] | None

Ordered list of target nodes to visit.

None
speed float

Distance per unit time.

1.0
heuristic Callable[[Node, Node], float] | None

Optional heuristic for A. If None, A degenerates to Dijkstra.

None
on_arrive Callable[['Agent', Node, Any], None] | None

Optional callback on_arrive(agent, node, env) fired when the agent arrives at any node (including intermediate path nodes).

None
name str | None

Optional agent name.

None
Source code in src/simweave/agents/agent.py
def __init__(
    self,
    graph: Any,
    start_node: Node,
    tasks: Sequence[Node] | None = None,
    speed: float = 1.0,
    heuristic: Callable[[Node, Node], float] | None = None,
    on_arrive: Callable[["Agent", Node, Any], None] | None = None,
    weight_attr: str = "weight",
    name: str | None = None,
) -> None:
    super().__init__(name=name)
    self.graph = graph
    self.position: Node = start_node
    self.tasks: list[Node] = list(tasks or [])
    self.path: list[Node] = []
    self.speed = float(speed)
    self.heuristic = heuristic
    self.compass = Compass(points=8)
    self.weight_attr = weight_attr
    self.on_arrive = on_arrive

    self._edge_remaining: float = 0.0
    self._edge_weight: float = 0.0
    self.history: list[tuple[float, Node]] = []
    self.completed_tasks: int = 0

plan_to

plan_to(target: Node) -> None

Replan to target from the current position using A*.

Source code in src/simweave/agents/agent.py
def plan_to(self, target: Node) -> None:
    """Replan to ``target`` from the current position using A*."""
    try:
        path = a_star(
            self.graph,
            self.position,
            target,
            heuristic=self.heuristic,
            weight_attr=self.weight_attr,
        )
    except NoPathError:
        log.warning("%s: no path from %s to %s", self.name, self.position, target)
        self.path = []
        return
    # path[0] is the current position; drop it.
    self.path = path[1:] if path and path[0] == self.position else path

a_star

a_star(graph: Any, start: Node, goal: Node, heuristic: Heuristic | None = None, weight_attr: str = 'weight') -> list[Node]

Compute shortest path from start to goal.

Parameters:

Name Type Description Default
graph Any

Any object understood by :func:simweave.spatial.graph.adj_view.

required
heuristic Heuristic | None

Admissible heuristic h(node, goal). Defaults to zero (Dijkstra).

None
weight_attr str

Edge attribute to treat as weight. Defaults to "weight".

'weight'

Returns:

Type Description
list[Node]

Path from start to goal inclusive.

Source code in src/simweave/agents/routing.py
def a_star(
    graph: Any,
    start: Node,
    goal: Node,
    heuristic: Heuristic | None = None,
    weight_attr: str = "weight",
) -> list[Node]:
    """Compute shortest path from ``start`` to ``goal``.

    Parameters
    ----------
    graph:
        Any object understood by :func:`simweave.spatial.graph.adj_view`.
    heuristic:
        Admissible heuristic ``h(node, goal)``. Defaults to zero (Dijkstra).
    weight_attr:
        Edge attribute to treat as weight. Defaults to ``"weight"``.

    Returns
    -------
    list[Node]
        Path from start to goal inclusive.
    """
    if heuristic is None:

        def heuristic(a: Node, b: Node) -> float:
            return 0.0

    open_heap: list[tuple[float, int, Node]] = [(0.0, 0, start)]
    came_from: dict[Node, Node] = {}
    g_score: dict[Node, float] = {start: 0.0}
    counter = 1
    closed: set[Node] = set()

    while open_heap:
        _, _, current = heapq.heappop(open_heap)
        if current in closed:
            continue
        if current == goal:
            return _reconstruct(came_from, current)
        closed.add(current)

        for neighbor, edge_data in adj_view(graph, current).items():
            if neighbor in closed:
                continue
            w = edge_weight(edge_data, weight_attr)
            tentative = g_score[current] + w
            if tentative < g_score.get(neighbor, inf):
                g_score[neighbor] = tentative
                came_from[neighbor] = current
                f = tentative + heuristic(neighbor, goal)
                heapq.heappush(open_heap, (f, counter, neighbor))
                counter += 1

    raise NoPathError(f"No path from {start!r} to {goal!r}.")

dijkstra

dijkstra(graph: Any, start: Node, goal: Node, weight_attr: str = 'weight') -> list[Node]

Convenience wrapper -- A* with zero heuristic.

Source code in src/simweave/agents/routing.py
def dijkstra(
    graph: Any, start: Node, goal: Node, weight_attr: str = "weight"
) -> list[Node]:
    """Convenience wrapper -- A* with zero heuristic."""
    return a_star(graph, start, goal, heuristic=None, weight_attr=weight_attr)

API: simweave.spatial

Graphs and spatial primitives.

Graph

Graph(directed: bool = False)

A tiny adjacency-dict graph. Optional subset of the networkx API.

Source code in src/simweave/spatial/graph.py
def __init__(self, directed: bool = False) -> None:
    self._adj: dict[Node, dict[Node, dict[str, Any]]] = {}
    self.directed = directed

adj_view

adj_view(graph: Any, node: Node) -> dict[Node, dict[str, Any]]

Return {neighbor: edge_data_dict} for either a simweave Graph or a networkx graph.

Source code in src/simweave/spatial/graph.py
def adj_view(graph: Any, node: Node) -> dict[Node, dict[str, Any]]:
    """Return ``{neighbor: edge_data_dict}`` for *either* a simweave Graph or a networkx graph."""
    # simweave Graph or any dict-of-dict
    if isinstance(graph, Graph):
        return graph[node]
    if isinstance(graph, dict):
        nbrs = graph[node]
        return {
            v: (d if isinstance(d, dict) else {"weight": float(d)})
            for v, d in nbrs.items()
        }
    # networkx-style: has .neighbors and __getitem__ returning AtlasView
    if hasattr(graph, "neighbors") and hasattr(graph, "__getitem__"):
        nbrs = graph[node]
        # Ensure each edge dict has a weight key
        result: dict[Node, dict[str, Any]] = {}
        for v in nbrs:
            data = nbrs[v]
            if isinstance(data, dict):
                result[v] = data
            else:
                result[v] = {"weight": float(data)}
        return result
    raise TypeError(f"Unsupported graph type: {type(graph).__name__}")

grid_graph

grid_graph(nrows: int, ncols: int, *, diagonal: bool = False, weight: float = 1.0, diagonal_weight: float | None = None) -> Graph

Build a 2D lattice graph with nodes (r, c).

Parameters:

Name Type Description Default
diagonal bool

If True, add 45-degree edges (cost defaults to sqrt(2) * weight).

False
Source code in src/simweave/spatial/graph.py
def grid_graph(
    nrows: int,
    ncols: int,
    *,
    diagonal: bool = False,
    weight: float = 1.0,
    diagonal_weight: float | None = None,
) -> Graph:
    """Build a 2D lattice graph with nodes ``(r, c)``.

    Parameters
    ----------
    diagonal:
        If True, add 45-degree edges (cost defaults to ``sqrt(2) * weight``).
    """
    if diagonal and diagonal_weight is None:
        diagonal_weight = weight * (2**0.5)

    g = Graph(directed=False)
    for r in range(nrows):
        for c in range(ncols):
            g.add_node((r, c))

    for r in range(nrows):
        for c in range(ncols):
            if r + 1 < nrows:
                g.add_edge((r, c), (r + 1, c), weight=weight)
            if c + 1 < ncols:
                g.add_edge((r, c), (r, c + 1), weight=weight)
            if diagonal:
                if r + 1 < nrows and c + 1 < ncols:
                    g.add_edge((r, c), (r + 1, c + 1), weight=diagonal_weight)
                if r + 1 < nrows and c - 1 >= 0:
                    g.add_edge((r, c), (r + 1, c - 1), weight=diagonal_weight)
    return g