Skip to content

Monte Carlo

Replication helpers and the MCResult container.

Single-process

import numpy as np

import simweave as sw

def scenario(seed):
    """Build and run one replicate; ``seed`` drives the sampled damping."""
    rng = np.random.default_rng(seed)
    # Mass and stiffness are fixed; only the damping coefficient is random.
    msd = sw.MassSpringDamper(mass=1.0, damping=rng.uniform(0.1, 0.6), stiffness=4.0)
    return sw.simulate(msd, t_span=(0.0, 8.0), dt=0.01,
                       x0=np.array([1.0, 0.0]))

mc = sw.run_monte_carlo(scenario, n_runs=200)
print(mc.n_runs, mc.seeds[:3])

Batched / parallel

mc = sw.run_monte_carlo(scenario, n_runs=2_000, executor="processes", n_workers=8)

With executor="processes", run_monte_carlo shards the work over a process pool. Your scenario function must be pickleable. For fully vectorised ensembles that advance every replicate in one numpy op, see run_batched_mc below.

Plotting an ensemble

The fan chart helper accepts an MCResult, a raw 2-D ndarray (n_runs, n_time), or a (times, samples) tuple:

samples = np.stack([r.state[:, 0] for r in mc.samples])
times   = mc.samples[0].time
sw.plot_mc_fan((times, samples), title="Displacement fan").show()

API

Monte Carlo harness.

MCResult dataclass

MCResult(n_runs: int, seeds: list[int], samples: ndarray | list[Any], scenario_name: str = 'default', extras: dict[str, Any] = dict())

Monte Carlo summary.

run_monte_carlo

run_monte_carlo(scenario_builder: Callable[[int], Any], n_runs: int, seeds: Iterable[int] | None = None, executor: str = 'serial', n_workers: int | None = None, scenario_name: str = 'default') -> MCResult

Run Monte Carlo replicates.

Parameters:

Name Type Description Default
scenario_builder Callable[[int], Any]

Callable f(seed) -> Any. Must be picklable when executor="processes".

required
n_runs int

Number of replicates.

required
seeds Iterable[int] | None

Optional iterable of seeds. Defaults to range(n_runs).

None
executor str

One of "serial", "processes", "threads".

'serial'
n_workers int | None

Max workers for pool executors. Defaults to os-determined.

None
scenario_name str

Label carried on the result for bookkeeping.

'default'
Source code in src/simweave/mc/runner.py
def run_monte_carlo(
    scenario_builder: Callable[[int], Any],
    n_runs: int,
    seeds: Iterable[int] | None = None,
    executor: str = "serial",
    n_workers: int | None = None,
    scenario_name: str = "default",
) -> MCResult:
    """Execute ``n_runs`` Monte Carlo replicates and collect them.

    Parameters
    ----------
    scenario_builder:
        Callable ``f(seed) -> Any``. Must be picklable when ``executor="processes"``.
    n_runs:
        Number of replicates.
    seeds:
        Optional iterable of seeds. Defaults to ``range(n_runs)``.
    executor:
        One of ``"serial"``, ``"processes"``, ``"threads"``.
    n_workers:
        Max workers for pool executors. Defaults to os-determined.
    scenario_name:
        Label carried on the result for bookkeeping.

    Raises
    ------
    ValueError
        On an unknown ``executor`` or a seed list whose length differs
        from ``n_runs``.
    """
    if executor not in {"serial", "processes", "threads"}:
        raise ValueError("executor must be one of 'serial' | 'processes' | 'threads'.")

    seed_list = list(range(n_runs)) if seeds is None else list(seeds)
    if len(seed_list) != n_runs:
        raise ValueError("len(seeds) must equal n_runs.")

    # Each task carries its builder so pool workers stay self-contained.
    tasks = [(scenario_builder, s) for s in seed_list]

    # A single replicate never pays the pool start-up cost.
    if executor == "serial" or n_runs == 1:
        outcomes = [_run_single(task) for task in tasks]
    else:
        pool_cls = ProcessPoolExecutor if executor == "processes" else ThreadPoolExecutor
        with pool_cls(max_workers=n_workers) as pool:
            outcomes = list(pool.map(_run_single, tasks))

    # Prefer a dense ndarray; fall back to a plain list when the results
    # are ragged or numpy would only give us an object array anyway.
    try:
        packed: np.ndarray | list[Any] = np.asarray(outcomes)
        if packed.dtype == object:
            packed = list(outcomes)
    except Exception:
        packed = list(outcomes)

    return MCResult(
        n_runs=n_runs,
        seeds=seed_list,
        samples=packed,
        scenario_name=scenario_name,
    )

run_batched_mc

run_batched_mc(batched_step: Callable[[Generator, int], ndarray], n_runs: int, seed: int | None = 0, scenario_name: str = 'batched') -> MCResult

Run a vectorised Monte Carlo where batched_step returns an (n_runs, ...) ndarray, all replicates progressed in one numpy op.

This is a thin wrapper -- the point is to give a single entry point with an MCResult back so callers don't have to distinguish between the two styles downstream.

Source code in src/simweave/mc/runner.py
def run_batched_mc(
    batched_step: Callable[[np.random.Generator, int], np.ndarray],
    n_runs: int,
    seed: int | None = 0,
    scenario_name: str = "batched",
) -> MCResult:
    """Run a vectorised Monte Carlo where ``batched_step`` returns an
    ``(n_runs, ...)`` ndarray, all replicates progressed in one numpy op.

    This is a thin wrapper -- the point is to give a single entry point with
    an ``MCResult`` back so callers don't have to distinguish between the two
    styles downstream.

    Parameters
    ----------
    batched_step:
        Callable ``f(rng, n_runs) -> ndarray`` with leading axis ``n_runs``.
    n_runs:
        Number of replicates (expected size of the leading axis).
    seed:
        Seed for the generator handed to ``batched_step``. ``None`` uses OS
        entropy; the result then still records ``seeds=[0]``.
    scenario_name:
        Label carried on the result for bookkeeping.

    Raises
    ------
    ValueError
        If ``batched_step`` does not return an array whose first dimension
        equals ``n_runs``.
    """
    rng = np.random.default_rng(seed)
    samples = np.asarray(batched_step(rng, n_runs))
    # A 0-d return used to escape as an IndexError from ``shape[0]``;
    # surface it as the ValueError callers are told to expect instead.
    if samples.ndim == 0:
        raise ValueError(
            f"batched_step returned a scalar, expected first dim {n_runs}."
        )
    if samples.shape[0] != n_runs:
        raise ValueError(
            f"batched_step returned first dim {samples.shape[0]}, expected {n_runs}."
        )
    return MCResult(
        n_runs=n_runs,
        seeds=[seed if seed is not None else 0],
        samples=samples,
        scenario_name=scenario_name,
    )