Skip to content

Road networks

simweave.roads provides discrete-event road-network primitives that sit naturally alongside queuing systems and supply chains inside a shared SimEnvironment. Build anything from a single signalised crossroads to a multi-junction urban network.

The two charts below come from demos 24 and 25. The left chart shows approach-road occupancy at a 4-way signalised junction; the right shows a UK roundabout's circulating and arm-queue vehicle counts over a one-hour simulation.


Concepts

Road

A Road is a free-flow conveyor belt. When a vehicle calls road.enter(vehicle, env) a delivery event is scheduled length / effective_speed simulation-time units in the future. Multiple vehicles travel simultaneously — there is no blocking or overtaking in this model.

Travel time is governed by the road's speed_limit. Vehicles may carry an own-speed override that is capped at the limit (a slow lorry is slower; a fast sports car is no quicker than the limit).

DualCarriageway is a convenience wrapper around two opposing Road instances sharing the same corridor.

Vehicle

Vehicle is a thin Entity that carries an optional speed override and accumulates travel statistics (roads_traversed, total_travel_time).

VehicleArrivalProcess generates Vehicle instances at a given inter-arrival distribution and enters them onto a target road — analogous to ArrivalGenerator in the discrete queueing module.

TrafficSignal and SignalPhase

TrafficSignal cycles through a list of SignalPhase objects. Each phase specifies which approach roads are green and how long the phase lasts. The signal is a registered process: it ticks and advances its phase timer each simulation step.

Important registration order: always register a TrafficSignal before any Intersection it controls. The signal must update its phase state before the intersection dispatches queued vehicles.

env.register(signal)       # 1 — signal ticks first
env.register(intersection) # 2 — then intersection releases queued vehicles

RoadNetwork.register_all() handles this automatically.

Intersection

Intersection holds a FIFO approach queue per incoming road and a weighted list of exit roads. On each tick it processes one vehicle per green approach, routing it to an exit road by weighted random selection.

Without a TrafficSignal the intersection acts as a give-way / all-way stop: all approaches are permanently green and vehicles pass through as fast as they arrive.

junction = sw.Intersection(signal=signal, name="high_st_junction")
junction.add_approach(road_north)
junction.add_approach(road_south)
junction.add_exit(exit_east, weight=2.0)   # twice as likely as west
junction.add_exit(exit_west, weight=1.0)
road_north.outlet = junction
road_south.outlet = junction

Roundabout

Roundabout implements a priority-to-circulating-traffic model. Vehicles arriving at an arm entry wait if the roundabout is at capacity (max_circulating). Once admitted, they spend transit_time simulation-time units circulating and are then routed to an exit road.

Handedness records the driving-side convention:

Value Circulation Countries
Handedness.LEFT Clockwise (from above) UK, Australia, Japan …
Handedness.RIGHT Anti-clockwise EU, US, most of the world
rb = sw.Roundabout(
    max_circulating=6,
    transit_time=6.0,          # seconds inside the roundabout
    handedness=sw.Handedness.LEFT,
    name="town_centre_roundabout",
)
rb.add_entry(road_north)
rb.add_exit(exit_east,  weight=1.0)
rb.add_exit(exit_south, weight=1.0)
rb.add_exit(exit_west,  weight=1.0)
road_north.outlet = rb

RoadNetwork

RoadNetwork is a container that registers all components with a SimEnvironment in the correct tick order via a single register_all(env) call.


Quick start

Signalised junction

import numpy as np
import simweave as sw

# ── Road geometry ──────────────────────────────────────────────────────────
road_ns = sw.Road(200.0, 13.9, lanes=2, name="NS_approach")
road_ew = sw.Road(200.0, 13.9, lanes=2, name="EW_approach")
exit_ns = sw.Road(200.0, 13.9, name="NS_exit")
exit_ew = sw.Road(200.0, 13.9, name="EW_exit")

# ── Traffic signal (45 s NS / 30 s EW) ────────────────────────────────────
signal = sw.TrafficSignal([
    sw.SignalPhase(green_roads=[road_ns], duration=45.0, name="NS_green"),
    sw.SignalPhase(green_roads=[road_ew], duration=30.0, name="EW_green"),
])

# ── Intersection ───────────────────────────────────────────────────────────
junction = sw.Intersection(signal=signal, name="junction")
for road in (road_ns, road_ew):
    junction.add_approach(road)
    road.outlet = junction
junction.add_exit(exit_ns, weight=1.0)
junction.add_exit(exit_ew, weight=1.0)

# ── Arrival processes ──────────────────────────────────────────────────────
rng = np.random.default_rng(0)
arr_ns = sw.VehicleArrivalProcess(
    interarrival=lambda r: r.exponential(5.0),   # 1 vehicle / 5 s
    road=road_ns, rng=rng,
)
arr_ew = sw.VehicleArrivalProcess(
    interarrival=lambda r: r.exponential(4.0),
    road=road_ew, rng=rng,
)

# ── Recorders ──────────────────────────────────────────────────────────────
occ_rec = sw.RoadOccupancyRecorder([road_ns, road_ew])
q_rec   = sw.IntersectionQueueRecorder(junction)

# ── Assemble and run ───────────────────────────────────────────────────────
net = sw.RoadNetwork()
net.add_signal(signal)
net.add_intersection(junction)
for r in (road_ns, road_ew, exit_ns, exit_ew):
    net.add_road(r)
net.add_arrival_process(arr_ns)
net.add_arrival_process(arr_ew)
net.add_recorder(occ_rec)
net.add_recorder(q_rec)

env = sw.SimEnvironment(dt=1.0, end=3600.0)
net.register_all(env)
env.run()

print(f"Cleared  : {junction.total_vehicles:,}")
print(f"Delayed  : {junction.total_delayed:,}")

fig = sw.plot_intersection_queues(q_rec, title="Junction queue lengths")
fig.show()

Roundabout

rb = sw.Roundabout(
    max_circulating=6,
    transit_time=6.0,
    handedness=sw.Handedness.LEFT,
    name="roundabout",
)
rb.add_entry(road_north)
rb.add_entry(road_south)
rb.add_entry(road_east)
rb.add_entry(road_west)
rb.add_exit(exit_north, weight=1.0)
rb.add_exit(exit_south, weight=1.0)
rb.add_exit(exit_east,  weight=1.0)
rb.add_exit(exit_west,  weight=1.0)
for road in (road_north, road_south, road_east, road_west):
    road.outlet = rb

API reference

Vehicle

Vehicle(speed: float | None = None, name: str | None = None)

Bases: Entity

A single road vehicle travelling through a road network.

Parameters:

Name Type Description Default
speed float | None

Optional own-speed override (m/s, or whatever unit your simulation uses). If None, the road's speed_limit governs travel time. A vehicle faster than the speed limit is still capped at the limit.

None
name str | None

Optional display name. Auto-generated if omitted.

None

Attributes:

Name Type Description
speed float | None
roads_traversed int

Number of road segments completed so far.

total_travel_time float

Cumulative in-road travel time (does not include intersection wait).

VehicleArrivalProcess

VehicleArrivalProcess(interarrival: Callable[[Generator], float], road: 'Road', rng: Generator | None = None, speed: float | None = None, name: str | None = None)

Bases: Entity

Generates :class:Vehicle instances and enters them onto a road.

Parameters:

Name Type Description Default
interarrival Callable[[Generator], float]

Callable (rng) -> float returning the next inter-arrival gap. Use simweave.exponential(rate=λ) for a Poisson process.

required
road 'Road'

The :class:~simweave.roads.road.Road that receives new vehicles.

required
rng Generator | None

Numpy random generator. Defaults to np.random.default_rng().

None
speed float | None

Fixed speed assigned to every generated vehicle. None uses the road's speed limit.

None
name str | None

Optional display name.

None

Attributes:

Name Type Description
generated int

Cumulative vehicles successfully placed on the road.

rejected int

Vehicles that could not enter (should not occur for free-flow roads).

Road

Road(length: float, speed_limit: float, lanes: int = 1, outlet: Any = None, name: str | None = None)

Bases: Entity

Free-flow road segment.

Vehicles enter at one end via :meth:enter and are asynchronously delivered to the :attr:outlet after length / effective_speed simulation-time units. The outlet must expose an arrive(vehicle, from_road, env) method (implemented by :class:~simweave.roads.intersection.Intersection and :class:~simweave.roads.roundabout.Roundabout).

Parameters:

Name Type Description Default
length float

Road length in metres (or whatever distance unit your scenario uses).

required
speed_limit float

Free-flow speed in m/s. Travel time = length / speed_limit.

required
lanes int

Number of lanes (informational; does not limit throughput in the free-flow model but is exposed for recorders and visualisation).

1
outlet Any

Where vehicles go after traversing this road. Pass None for a terminal road (vehicles exit the system silently).

None
name str | None

Optional display name.

None

Attributes:

Name Type Description
in_transit int

Number of vehicles currently travelling on this road.

total_entered int

Cumulative vehicles that have entered this road.

total_exited int

Cumulative vehicles that have completed this road.

outlet property writable

outlet: Any

Where vehicles go after traversing this road.

travel_time

travel_time(vehicle: 'Vehicle | None' = None) -> float

Return travel time in simulation-time units.

Vehicles are capped at the speed limit; slower vehicles use their own speed.

enter

enter(vehicle: 'Vehicle', env: 'SimEnvironment') -> bool

Place a vehicle onto this road.

Schedules a delivery event after travel_time(vehicle) simulation units. Returns True always (free-flow roads have no capacity limit).

DualCarriageway

DualCarriageway(length: float, speed_limit: float, lanes_each: int = 1, name: str = 'dual_carriageway')

Two opposing :class:Road instances sharing the same corridor.

Models a dual-carriageway (divided highway) where traffic flows in both directions simultaneously. Each direction is an independent :class:Road with its own outlet and arrival process — simply wire them up separately.

Parameters:

Name Type Description Default
length float

Carriageway length (m) — shared by both directions.

required
speed_limit float

Free-flow speed (m/s) — shared by both directions.

required
lanes_each int

Lanes per direction. Default 1.

1
name str

Base name. "_forward" / "_backward" suffixes are appended.

'dual_carriageway'

roads property

roads: tuple[Road, Road]

Both constituent roads as (forward, backward).

SignalPhase dataclass

SignalPhase(green_roads: list['Road'] = list(), duration: float = 30.0, name: str = 'phase')

One phase in a traffic-signal cycle.

Parameters:

Name Type Description Default
green_roads list['Road']

Roads whose approach queues are released during this phase.

list()
duration float

Phase duration in simulation-time units.

30.0
name str

Optional label (e.g. "NS_green").

'phase'

TrafficSignal

TrafficSignal(phases: list[SignalPhase], name: str | None = None)

Bases: Entity

Fixed-time traffic signal controller.

Cycles sequentially through :class:SignalPhase objects. When the current phase timer expires the controller advances to the next phase (wrapping around to the first after the last).

Parameters:

Name Type Description Default
phases list[SignalPhase]

Ordered list of signal phases. Must contain at least one entry.

required
name str | None

Optional display name.

None

Attributes:

Name Type Description
current_phase SignalPhase

The currently active phase.

phase_timer float

Simulation time remaining in the current phase.

phase_index int

Zero-based index of the current phase.

road_is_green

road_is_green(road: 'Road') -> bool

Return True if road is in the currently active green set.

Intersection

Intersection(signal: 'TrafficSignal | None' = None, rng: Generator | None = None, approach_capacity: int = 50, name: str | None = None)

Bases: Entity

Signalised or give-way road intersection.

Parameters:

Name Type Description Default
signal 'TrafficSignal | None'

Optional :class:~simweave.roads.signal.TrafficSignal. None means all approaches are always green (give-way / all-way stop).

None
rng Generator | None

Numpy random generator for exit-road selection.

None
approach_capacity int

Maximum vehicles per approach queue. Further arrivals are dropped.

50
name str | None

Optional display name.

None

Attributes:

Name Type Description
total_vehicles int

Cumulative vehicles dispatched through this intersection.

total_delayed int

Cumulative vehicles that had to queue (arrived at a red approach).

total_dropped int

Cumulative vehicles dropped because an approach queue was full.

Notes

Registration order: if a :class:TrafficSignal is attached, register it before this intersection so the signal's state is updated before approach queues are released each tick.

Throughput: at most one vehicle per approach road per tick is dispatched. Reduce dt for higher saturation flows.

queue_lengths property

queue_lengths: dict[str, int]

Current queue length per approach road name.

total_queued property

total_queued: int

Total vehicles currently waiting across all approaches.

add_approach

add_approach(road: 'Road') -> None

Register an approach road.

Called automatically on the first arrival from an unknown road, but you can also call this explicitly to pre-create the queue.

add_exit

add_exit(road: 'Road', weight: float = 1.0) -> None

Register an exit road with a routing weight.

Weights are normalised to probabilities; absolute values do not matter, only relative proportions.

arrive

arrive(vehicle: 'Vehicle', from_road: 'Road', env: 'SimEnvironment') -> None

Receive a vehicle arriving from from_road.

Handedness

Bases: Enum

Driving-side convention for a roundabout.

LEFT Traffic circulates clockwise (viewed from above). Entering vehicles give way to traffic from the right. Used in the UK, Australia, Japan, and other left-hand-traffic countries.

RIGHT Traffic circulates anti-clockwise. Entering vehicles give way to traffic from the left. Used in continental Europe, North America, and most of the rest of the world.

Roundabout

Roundabout(max_circulating: int = 8, transit_time: float = 5.0, handedness: Handedness = LEFT, rng: Generator | None = None, approach_capacity: int = 30, name: str | None = None)

Bases: Entity

Priority-to-circulating roundabout.

Parameters:

Name Type Description Default
max_circulating int

Maximum number of vehicles allowed in the roundabout simultaneously. Models the finite capacity of the circulatory carriageway.

8
transit_time float

Time a vehicle spends in the roundabout from entry to exit (same units as the simulation dt).

5.0
handedness Handedness

Driving-side convention — recorded for labelling / visualisation.

LEFT
rng Generator | None

Numpy random generator for exit-road selection.

None
approach_capacity int

Maximum vehicles per arm queue before arrivals are dropped.

30
name str | None

Optional display name.

None

Attributes:

Name Type Description
handedness Handedness
circulating int

Number of vehicles currently in the roundabout.

total_entered int

Cumulative vehicles admitted to the roundabout.

total_exited int

Cumulative vehicles that have left onto an exit road.

total_delayed int

Cumulative vehicles that had to queue at an arm entry.

total_dropped int

Cumulative vehicles dropped because an arm queue was full.

queue_lengths property

queue_lengths: dict[str, int]

Current queue length per entry arm road name.

total_queued property

total_queued: int

Total vehicles currently waiting at all arms.

add_entry

add_entry(road: 'Road') -> None

Register an entry arm road.

Called automatically on first arrival from an unknown road, but can also be called explicitly to pre-create the arm queue.

add_exit

add_exit(road: 'Road', weight: float = 1.0) -> None

Register an exit road with a routing weight.

arrive

arrive(vehicle: 'Vehicle', from_road: 'Road', env: 'SimEnvironment') -> None

Receive a vehicle at the entry of arm from_road.

RoadNetwork

RoadNetwork(name: str = 'road_network')

Container for all road-network components in a scenario.

Parameters:

Name Type Description Default
name str

Optional label for this network.

'road_network'
Example

::

net = sw.RoadNetwork(name="town_centre")
net.add_signal(signal)
net.add_intersection(crossroads)
net.add_road(road_north)
net.add_arrival_process(arrivals_north)
net.register_all(env)

add_signal

add_signal(signal: 'TrafficSignal') -> 'TrafficSignal'

Add a :class:~simweave.roads.signal.TrafficSignal.

add_intersection

add_intersection(intersection: 'Intersection') -> 'Intersection'

Add an :class:~simweave.roads.intersection.Intersection.

add_roundabout

add_roundabout(roundabout: 'Roundabout') -> 'Roundabout'

Add a :class:~simweave.roads.roundabout.Roundabout.

add_road

add_road(road: 'Road') -> 'Road'

Add a single :class:~simweave.roads.road.Road.

add_dual_carriageway

add_dual_carriageway(dc: 'DualCarriageway') -> 'DualCarriageway'

Register both constituent roads of a :class:~simweave.roads.road.DualCarriageway.

add_arrival_process

add_arrival_process(proc: 'VehicleArrivalProcess') -> 'VehicleArrivalProcess'

Add a :class:~simweave.roads.vehicle.VehicleArrivalProcess.

add_recorder

add_recorder(recorder: object) -> object

Add a recorder (e.g. :class:~simweave.roads.recorder.RoadOccupancyRecorder).

register_all

register_all(env: 'SimEnvironment') -> None

Register every component with env in the correct tick order.

Order: signals → intersections → roundabouts → roads → arrival processes → recorders.

RoadOccupancyRecorder

RoadOccupancyRecorder(roads: 'list[Road]', name: str | None = None)

Bases: _Recorder

Snapshot in-transit vehicle counts for a set of roads each tick.

Attributes:

Name Type Description
times list[float]

Simulation times of each sample.

occupancy list[list[int]]

(n_samples, n_roads) in-transit counts per road.

road_names list[str]

Names of the roads being recorded (same column order as occupancy).

IntersectionQueueRecorder

IntersectionQueueRecorder(intersection: 'Intersection', name: str | None = None)

Bases: _Recorder

Snapshot approach-queue lengths at one intersection each tick.

Attributes:

Name Type Description
times list[float]

Simulation times.

total_queued list[int]

Total vehicles waiting across all approaches.

per_approach list[dict[str, int]]

Per-approach breakdown {road_name: queue_length} at each sample.