from __future__ import annotations
import re
import signal
import threading
import time
import warnings
import numpy as np
import pandas as pd
import gamspy as gp
from gamspy.exceptions import GamspyException, ValidationError
from gamspy.formulations.sddp.noise import NoiseConfig
from gamspy.formulations.sddp.policy import PolicyResult
from gamspy.formulations.sddp.result import SDDPResult, _sci
from gamspy.formulations.sddp.risk import CVaR
from gamspy.formulations.sddp.simulation import SimulationResult
from gamspy.formulations.sddp.state import StateVar
[docs]
class SDDP:
"""
Stochastic Dual Dynamic Programming for multistage stochastic GAMSPy
models.
SDDP trains a cost-to-go approximation for a multistage stochastic
program by alternating forward simulation passes with backward Benders
cut generation. Register the state variable(s) with ``add_state``, the
stochastic noise with ``set_noise``, inject the algorithm into the model
with ``build``, then ``train`` the policy.
Parameters
----------
container : Container
The ``gp.Container`` holding every user-defined symbol.
stage_set : Set
The full stage set; its length is the number of stages in the problem.
time_set : Set | None
Full time-domain set when the model has a finer-grained time inside
each stage. By default None, which reuses ``stage_set``.
n_trials : int
Number of trial levels per state variable (must be >= 1). By default 5.
seed : int
Seed for the forward-pass scenario sampler. By default 42.
verbose : bool
Print one convergence row per iteration during training. By default
True.
Examples
--------
>>> import numpy as np
>>> import gamspy as gp
>>> from gamspy.formulations import SDDP
>>> m = gp.Container()
>>> t = gp.Set(m, "t", records=["jan", "feb", "mar", "apr"])
>>> sddp = SDDP(m, stage_set=t, n_trials=2, seed=42, verbose=False)
>>> stage = sddp.active_stage
>>> precip = gp.Parameter(m, "precip")
>>> level = gp.Variable(m, "L", type="positive", domain=t)
>>> spill = gp.Variable(m, "F", type="positive", domain=t)
>>> shortfall = gp.Variable(m, "Z", type="positive", domain=t)
>>> release = gp.Variable(m, "R", type="positive", domain=t)
>>> cost = gp.Variable(m, "COST")
>>> release.up[t] = 200.0
>>> level.up[t] = 250.0
>>> balance = gp.Equation(m, "balance", domain=t)
>>> obj = gp.Equation(m, "obj")
>>> balance[t].where[stage[t]] = (
... level[t] - level[t.lag(1, "circular")]
... + release[t] + spill[t] - shortfall[t] == precip
... )
>>> obj[...] = cost == gp.Sum(stage[t], 10.0 * spill[t] + 5.0 * shortfall[t])
>>> sddp.add_state(level, initial_state=100.0)
>>> sddp.set_noise(precip, scenario_data=np.array([[50.0], [50.0], [-50.0], [-50.0]]))
>>> sddp.build(stage_cost=cost)
>>> sddp
SDDP(stages=4, states=['L'], noise=precip, built=True)
"""
_EPS = 1e-9
def __init__(
self,
container: gp.Container,
stage_set: gp.Set,
time_set: gp.Set | None = None,
n_trials: int = 5,
seed: int = 42,
verbose: bool = True,
) -> None:
if not isinstance(container, gp.Container):
raise ValidationError("container must be a gp.Container instance")
if not isinstance(stage_set, gp.Set):
raise ValidationError("stage_set must be a gp.Set instance")
if time_set is not None and not isinstance(time_set, gp.Set):
raise ValidationError("time_set must be a gp.Set instance or None")
if n_trials < 1:
raise ValidationError(f"n_trials must be >= 1, got {n_trials}")
self._m: gp.Container = container
self._stage_parent: gp.Set = stage_set
self._time_set: gp.Set = time_set if time_set is not None else stage_set
self._n_trials: int = n_trials
self._seed: int = seed
self._verbose: bool = verbose
# sddp-owned active-stage singleton; the user reads this back via
# `sddp.active_stage` and references it in their .where[...] gates.
self._active_stage: gp.Set = gp.Set(
container,
"sddp_active",
domain=stage_set,
description="sddp-owned active-stage singleton",
)
self._states: list[StateVar] = []
self._noise: NoiseConfig | None = None
# Counter so each simulate() call can use unique GAMSPy symbol names.
self._sim_call_count: int = 0
# populated by build()
self._built: bool = False
self._j_set: gp.Set | None = None
self._jj_set: gp.Set | None = None
self._alpha: gp.Variable | None = None
self._acost: gp.Variable | None = None
self._gp_model: gp.Model | None = None
self._loaded_from_save: bool = False
@property
def active_stage(self) -> gp.Set:
"""sddp-owned active-stage singleton.
Reference this in your equations' ``.where[stage[...]]`` clauses so
each per-stage solve activates only the equations for the current
stage.
"""
return self._active_stage
@property
def container(self) -> gp.Container:
"""
The host ``gp.Container`` holding every symbol owned by this sddp instance.
"""
return self._m
def _resolve_report(self, report: list[gp.Variable] | None) -> list[gp.Variable]:
"""
Normalize a ``report=`` argument for ``policy()`` / ``simulate()``.
"""
if report is None:
return [sv.variable for sv in self._states]
resolved: list[gp.Variable] = []
for item in report:
if not isinstance(item, gp.Variable):
raise ValidationError(
f"report items must be gp.Variable, got "
f"{type(item).__name__}. To reference a variable by name, "
f"pass the variable object, e.g. sddp.container['R']."
)
resolved.append(item)
return resolved
def _resolve_state(self, state: float | dict[str, float]) -> dict[str, float]:
names = [sv.variable.name for sv in self._states]
if isinstance(state, dict):
expected = set(names)
got = set(state)
if got != expected:
raise ValidationError(
f"state keys {sorted(got)} must match the state variables {names}."
)
return {n: float(state[n]) for n in names}
if len(self._states) != 1:
raise ValidationError(
f"scalar state but {len(self._states)} states {names}; pass a dict keyed by state name."
)
return {names[0]: float(state)}
def _restore_user_bounds(self) -> None:
for v, vdom, lo_p, up_p in self._user_bound_snaps:
if not vdom:
v.lo[...] = lo_p[...]
v.up[...] = up_p[...]
elif len(vdom) == 1:
v.lo[vdom[0]] = lo_p[vdom[0]]
v.up[vdom[0]] = up_p[vdom[0]]
else:
idx = tuple(vdom)
v.lo[idx] = lo_p[idx]
v.up[idx] = up_p[idx]
# registration
[docs]
def add_state(
self,
variable: gp.Variable,
lower_bound: float | None = None,
upper_bound: float | None = None,
initial_state: float | None = None,
) -> None:
"""Register a state variable.
Parameters
----------
variable : Variable
GAMSPy Variable for the state (reservoir level, inventory, etc.),
indexed over the time set.
lower_bound : float | None
Lower end of the feasible range used to seed the initial uniform
trial grid and to clamp the adaptive trial update. By default
None, which infers the bound (see Notes).
upper_bound : float | None
Upper end of the feasible range, resolved like ``lower_bound``.
By default None.
initial_state : float | None
Value the state takes before stage 1. By default None, which
falls back to ``lower_bound``.
Notes
-----
Each bound is resolved in the following order:
1. Use the value passed here.
2. If ``None``, read from ``variable.records["lower"]`` /
``variable.records["upper"]``.
3. If the variable has no recorded bounds, fall back to the variable
type's default (``positive`` gives ``(0, +inf)``, etc.).
If a bound is passed explicitly while the variable also carries an
explicit recorded bound that disagrees, a ``UserWarning`` is raised
and the passed value still wins.
"""
if self._built:
raise ValidationError("Cannot call add_state() after build()")
var_lo, var_up, src = self._infer_var_bounds(variable)
# Resolve lower / upper with user-wins-with-warning semantics.
if lower_bound is None:
lo = var_lo
else:
lo = float(lower_bound)
if src == "records" and abs(lo - var_lo) > 1e-12:
warnings.warn(
f"add_state: user-supplied lower_bound={lo} for "
f"`{variable.name}` differs from the variable's recorded "
f"lower bound={var_lo}. Using {lo}. (Did you mean to set "
f"`{variable.name}.lo` differently, or omit lower_bound?)",
UserWarning,
stacklevel=2,
)
if upper_bound is None:
up = var_up
else:
up = float(upper_bound)
if src == "records" and abs(up - var_up) > 1e-12:
warnings.warn(
f"add_state: user-supplied upper_bound={up} for "
f"`{variable.name}` differs from the variable's recorded "
f"upper bound={var_up}. Using {up}. (Did you mean to set "
f"`{variable.name}.up` differently, or omit upper_bound?)",
UserWarning,
stacklevel=2,
)
sv = StateVar( # type: ignore[call-arg]
variable=variable,
lower_bound=lo,
upper_bound=up,
initial_state=float(initial_state) if initial_state is not None else None,
)
sv.validate()
self._states.append(sv)
[docs]
def set_noise(
self,
parameter: gp.Parameter,
scenario_data: np.ndarray,
probabilities: np.ndarray | list[float] | None = None,
) -> None:
"""Register the stochastic noise model.
Parameters
----------
parameter : Parameter
GAMSPy Parameter that is overwritten before each LP solve with
the sampled inflow value for the current scenario.
scenario_data : np.ndarray
2-D numpy array of shape ``(n_stages, n_scenarios)``.
probabilities : np.ndarray | list[float] | None
Optional 1-D array of scenario probabilities, shape
``(n_scenarios,)``. Must be non-negative and sum to 1.0. By
default None, which makes the scenarios equally likely
(probability ``1/n_scenarios`` each).
"""
if self._built:
raise ValidationError("Cannot call set_noise() after build()")
if self._noise is not None:
raise ValidationError("set_noise() already called")
prob_array: np.ndarray | None = None
if probabilities is not None:
prob_array = np.asarray(probabilities, dtype=float)
nc = NoiseConfig(
parameter=parameter,
scenario_data=np.asarray(scenario_data, dtype=float),
probabilities=prob_array,
)
n_stages = (
len(self._stage_parent.records)
if self._stage_parent.records is not None
else 0
)
if n_stages:
nc.validate(n_stages)
self._noise = nc
# build
[docs]
def build(
self,
stage_cost: gp.Variable,
equations: list | None = None,
) -> None:
"""Inject the SDDP algorithm into the user model.
Parameters
----------
stage_cost : Variable
GAMSPy Variable equal to the per-stage operational cost
(WITHOUT the future-cost alpha term).
equations : list | None
User physics equations to include in the LP. By default None,
which makes the sddp module pull every equation currently
declared in the container.
"""
if self._built:
raise ValidationError("build() already called")
if not self._states:
raise ValidationError("Call add_state() before build()")
if self._noise is None:
raise ValidationError("Call set_noise() before build()")
if equations is None:
equations = list(self._m.getEquations())
if not equations:
raise ValidationError(
"build(equations=None) found no equations in the container. "
"Either declare your equations before calling build(), or "
"pass them explicitly via build(equations=[...])."
)
# Pre-flight: at least one equation must reference stage_cost. Without
# this the LP minimises a free variable that nothing constrains, so
# the solve returns -inf (or a bound-constrained extreme) silently.
if not self._equation_references_var(equations, stage_cost):
raise ValidationError(
"None of the supplied equations reference the stage-cost"
)
self._user_variables = list(self._m.getVariables())
m = self._m
ww = self._active_stage # sddp-owned active-stage singleton
w = self._stage_parent # full stage set
t_set = self._time_set
w_labels = w.toList()
t_labels = t_set.toList()
n_stages = len(w_labels)
n_times = len(t_labels)
if n_times % n_stages != 0:
raise ValidationError(
f"len(time_set)={n_times} is not divisible by len(stage_set)={n_stages}"
)
hpw = n_times // n_stages # time steps per stage (e.g. 168 hours/week)
last_hour: dict[str, str] = {}
prev_last_hour: dict[str, str] = {}
for pos, wl in enumerate(w_labels):
last_hour[wl] = t_labels[(pos + 1) * hpw - 1]
prev_last_hour[wl] = t_labels[((pos - 1) % n_stages + 1) * hpw - 1]
last_set = gp.Set(
m,
"sddp_last",
domain=[w, t_set],
records=[(wl, last_hour[wl]) for wl in w_labels],
description="last time step of each stage",
)
prevlast_set = gp.Set(
m,
"sddp_prevlast",
domain=[w, t_set],
records=[(wl, prev_last_hour[wl]) for wl in w_labels],
description="last time step of the previous stage (circular)",
)
# Alias for t_set used inside equation/loop bodies whose control index
# is the same underlying set as t_set (i.e. when hpw == 1 and the
# stage set is the same set as the time set). GAMS requires distinct
# aliases to avoid the "already in control" error.
#
# CONVENTION for future edits: any new equation or .where[...] gate
# referencing the time-domain index must use `tt` (or self._tt),
# NEVER `t_set`/`self._time_set`. Bypassing this works for hpw>1
# problems (hydro) but breaks hpw=1 problems (ClearLake) with a
# runtime "Set is already in control" ValidationError.
tt = gp.Alias(m, "sddp_tt", alias_with=t_set)
# Step 1: j and jj
j = gp.Set(
m,
"sddp_j",
description="SDDP iteration index",
) # records set in train()
jj = gp.Set(
m,
"sddp_jj",
domain=j,
description="active cut iterations (subset of j, grows each iter)",
)
self._j_set = j
self._jj_set = jj
# Step 2: trial grid and per-state cut parameters
n_trials = self._n_trials
i_labels = [f"i{k}" for k in range(1, n_trials + 1)]
i_set = gp.Set(
m,
"sddp_i",
records=i_labels,
description="trial levels",
)
self._i_set = i_set
fracs = [
(pos_i / (n_trials - 1) if n_trials > 1 else 0.0)
for pos_i in range(n_trials)
]
for sv in self._states:
sv.trial_set = i_set
lb, ub = sv.lower_bound, sv.upper_bound
rows: list[tuple] = []
for wl in w_labels:
ph = prev_last_hour[wl]
for pos_i, il in enumerate(i_labels):
val = lb + fracs[pos_i] * (ub - lb)
if abs(val) < self._EPS:
val = self._EPS
rows.append((il, ph, val))
sv.trial_param = gp.Parameter(
m,
f"sddp_ires_{sv.name}",
domain=[i_set, t_set],
records=pd.DataFrame(rows, columns=["i", "t", "value"]),
description=f"trial reservoir levels for {sv.name}",
)
sv.cut_slope = gp.Parameter(
m,
f"sddp_cm_{sv.name}",
domain=[j, i_set, w],
description=f"Benders cut slope (cont_m) for {sv.name}",
)
cut_intercept = gp.Parameter(
m,
"sddp_d",
domain=[j, i_set, w],
description="Benders cut intercept (delta), shared across states",
)
self._cut_intercept = cut_intercept
sv = self._states[0]
assert sv.trial_set is not None
assert sv.cut_slope is not None
i_labels = i_set.toList()
# Step 3: Scenario set
assert self._noise is not None
nc = self._noise
s_labels = [f"s{k}" for k in range(1, nc.n_scenarios + 1)]
nc.scenario_set = gp.Set(
m, "sddp_s", records=s_labels, description="inflow scenarios"
)
s_set = nc.scenario_set
# Probability per scenario: uniform by default, user-supplied otherwise.
if nc.probabilities is not None:
prob_values = nc.probabilities
else:
prob_values = np.full(nc.n_scenarios, 1.0 / nc.n_scenarios)
prob_param = gp.Parameter(
m,
"sddp_prob",
domain=s_set,
records=pd.DataFrame(
[(sl, float(pv)) for sl, pv in zip(s_labels, prob_values, strict=True)],
columns=["s", "value"],
),
description="scenario probabilities (uniform by default)",
)
self._prob_param = prob_param
# Step 4: alpha (Approcimation of future cost from the current stage) and acost (Total approximate cost)
alpha = gp.Variable(
m,
"sddp_alpha",
type="positive",
domain=w,
description="future cost approximation, one value per stage",
)
acost = gp.Variable(
m,
"sddp_acost",
description="total approximate cost: stage_cost + alpha[next stage]",
)
self._alpha = alpha
self._acost = acost
# Step 5: obj_approx
# acost == stage_cost + alpha[w+1]
# For the last stage, Ord(w) < Card(w) is False -> Sum contributes 0.
obj_approx = gp.Equation(
m, "sddp_obj_approx", description="approximate objective with future cost"
)
obj_approx[...] = acost == stage_cost + gp.Sum(
w.where[ww[w] & (gp.Ord(w) < gp.Card(w))],
alpha[w.lead(1)],
)
# Step 6: Benders cuts
# alpha[w+1] - sum_s cont_m_s[jj,i,w+1] * res_s[last_of_w] >= delta[jj,i,w]
# The slope term SUMS over states (a single supporting hyperplane per
# cut, with one slope per state); the intercept is the shared `sddp_d`.
cuts = gp.Equation(
m,
"sddp_cuts",
domain=[j, i_set, w],
description="Benders cuts on future cost",
)
slope_total = None
for sv_k in self._states:
assert sv_k.cut_slope is not None
term = gp.Sum(
tt.where[last_set[w, tt]],
sv_k.cut_slope[jj, i_set, w.lead(1)] * sv_k.variable[tt],
)
slope_total = term if slope_total is None else slope_total + term
assert slope_total is not None
cuts[jj, i_set, w].where[ww[w] & (gp.Ord(w) < gp.Card(w))] = (
alpha[w.lead(1)] - slope_total >= cut_intercept[jj, i_set, w]
)
# Step 7: LP model
solve_opts = gp.Options(
equation_listing_limit=0,
variable_listing_limit=0,
report_solution=2,
solve_link_type="memory",
merge_strategy="clear",
)
gp_model = gp.Model(
m,
"sddp_model",
problem="lp",
equations=list(equations) + [obj_approx, cuts],
sense=gp.Sense.MIN,
objective=acost,
)
# Step 8: Backward GUSS dict
# For each (i, s): fix every state at its trial level, inject inflow[s],
# then collect acost.l and, PER STATE, x_s.m[prev_last] -> that state's
# cut slope.
is_scen = gp.Set(
m,
"sddp_is_scen",
domain=[i_set, s_set],
records=[(il, sl) for il in i_labels for sl in s_labels],
description="backward GUSS scenario set (trial x inflow)",
)
so = gp.Parameter(
m,
"sddp_so",
domain="*",
records=[("SkipBaseCase", 1), ("LogOption", 1), ("UpdateType", 2)],
)
self._so = so
is_noise_b = gp.Parameter(
m,
"sddp_is_noise_b",
domain=[i_set, s_set],
description="scatter: inflow per backward scenario",
)
is_acost = gp.Parameter(
m,
"sddp_is_acost",
domain=[i_set, s_set],
description="extract: acost.l per backward scenario",
)
for sv_k in self._states:
sv_k.is_res_fx = gp.Parameter(
m,
f"sddp_is_res_fx_{sv_k.name}",
domain=[i_set, s_set, t_set],
description=f"scatter: fix {sv_k.name} at trial level",
)
sv_k.is_res_m = gp.Parameter(
m,
f"sddp_is_res_m_{sv_k.name}",
domain=[i_set, s_set, t_set],
description=f"extract: {sv_k.name}.m[prev_last] = cut slope",
)
sv_k.guss_cm = gp.Parameter(
m,
f"sddp_gcm_{sv_k.name}",
domain=i_set,
description=f"cut slope accumulator for {sv_k.name}",
)
dict_b = gp.GUSSScenarioDict(m, "sddp_dict_b", is_scen)
dict_b.add_options(so)
dict_b.add_param(nc.parameter, is_noise_b) # inject scalar inflow
for sv_k in self._states:
assert sv_k.is_res_fx is not None
dict_b.add_fixed(sv_k.variable, sv_k.is_res_fx) # fix at trial level
dict_b.add_level(acost, is_acost) # collect acost.l
for sv_k in self._states:
assert sv_k.is_res_m is not None
dict_b.add_marginal(sv_k.variable, sv_k.is_res_m) # collect x_s.m
# Scenario inflow as a GAMSPy parameter for use in symbolic loop assignments
sw_inflow_param = gp.Parameter(
m,
"sddp_sw_inflow",
domain=[w, s_set],
records=pd.DataFrame(
[
(w_labels[wi], sl, float(nc.scenario_data[wi, si]))
for wi in range(n_stages)
for si, sl in enumerate(s_labels)
],
columns=["w", "s", "value"],
),
description="stochastic inflow data (n_stages x n_scenarios)",
)
# Cut intercept accumulator (shared: a single intercept per cut).
guss_d = gp.Parameter(
m,
"sddp_gd",
domain=i_set,
description="cut intercept accumulator (per trial point)",
)
# CVaR change-of-measure weight accumulators
sp = gp.Alias(m, "sddp_sp", alias_with=s_set)
cvar_A = gp.Parameter(
m,
"sddp_cvar_A",
domain=[i_set, s_set],
description="CVaR: prob mass of scenarios worse than s, per trial",
)
cvar_w = gp.Parameter(
m,
"sddp_cvar_w",
domain=[i_set, s_set],
description="CVaR: backward blended change-of-measure weight",
)
# Step 9: Forward GUSS dict
# n_trials scenarios per stage: one sampled inflow path per trial level.
sampled_path = gp.Set(
m,
"sddp_sampled_path",
domain=[j, w, i_set, s_set],
description="pre-sampled inflow scenario for each (iter, stage, trial)",
) # Path determined in train()
f_scen = gp.Set(
m,
"sddp_f_scen",
domain=[i_set, s_set],
description="active forward scenarios for current stage",
)
f_inflow = gp.Parameter(
m,
"sddp_f_inflow",
domain=[i_set, s_set],
description="scatter: inflow per forward scenario",
)
f_acost = gp.Parameter(
m,
"sddp_f_acost",
domain=[i_set, s_set],
description="extract: acost.l per forward scenario",
)
f_cost = gp.Parameter(
m,
"sddp_f_cost",
domain=[i_set, s_set],
description="extract: stage_cost.l (without alpha)",
)
for sv_k in self._states:
sv_k.f_res_fixed = gp.Parameter(
m,
f"sddp_f_res_fixed_{sv_k.name}",
domain=[i_set, s_set, t_set],
description=f"scatter: fix {sv_k.name} at forward state",
)
sv_k.f_res_level = gp.Parameter(
m,
f"sddp_f_res_level_{sv_k.name}",
domain=[i_set, s_set, t_set],
description=f"extract: {sv_k.name}.l after forward solve",
)
dict_f = gp.GUSSScenarioDict(m, "sddp_dict_f", f_scen)
dict_f.add_options(so)
dict_f.add_param(nc.parameter, f_inflow) # inject inflow
for sv_k in self._states:
assert sv_k.f_res_fixed is not None
dict_f.add_fixed(sv_k.variable, sv_k.f_res_fixed) # fix at forward state
dict_f.add_level(acost, f_acost) # collect acost.l (for diagnostics)
dict_f.add_level(stage_cost, f_cost) # collect stage cost (for zt)
for sv_k in self._states:
assert sv_k.f_res_level is not None
dict_f.add_level(sv_k.variable, sv_k.f_res_level) # res.l (next stage)
# Step 9b: Stage-1 wait-and-see GUSS dict
# n_scenarios LPs in a single GUSS batch, one per stage-1 realization
# of the noise.
is_w1_noise = gp.Parameter(
m,
"sddp_is_w1_noise",
domain=s_set,
description="scatter: precip per stage-1 scenario",
)
is_w1_acost = gp.Parameter(
m,
"sddp_is_w1_acost",
domain=s_set,
description="extract: acost.l per stage-1 scenario (LB)",
)
is_w1_cost = gp.Parameter(
m,
"sddp_is_w1_cost",
domain=s_set,
description="extract: stage_cost.l per stage-1 scenario",
)
for sv_k in self._states:
sv_k.is_w1_init = gp.Parameter(
m,
f"sddp_is_w1_init_{sv_k.name}",
domain=[s_set, t_set],
description=f"scatter: fix {sv_k.name} at initial state for stage 1",
)
sv_k.is_w1_lstate = gp.Parameter(
m,
f"sddp_is_w1_lstate_{sv_k.name}",
domain=[s_set, t_set],
description=f"extract: {sv_k.name}.l per stage-1 scenario",
)
dict_w1 = gp.GUSSScenarioDict(m, "sddp_dict_w1", s_set)
dict_w1.add_options(so)
dict_w1.add_param(nc.parameter, is_w1_noise)
for sv_k in self._states:
assert sv_k.is_w1_init is not None
dict_w1.add_fixed(sv_k.variable, sv_k.is_w1_init)
dict_w1.add_level(acost, is_w1_acost)
dict_w1.add_level(stage_cost, is_w1_cost)
for sv_k in self._states:
assert sv_k.is_w1_lstate is not None
dict_w1.add_level(sv_k.variable, sv_k.is_w1_lstate)
# Step 10: Loop sets and convergence bookkeeping
wp = gp.Alias(m, "sddp_wp", alias_with=w)
wloop = gp.Alias(m, "sddp_wloop", alias_with=w)
wfwd = gp.Alias(m, "sddp_wfwd", alias_with=w)
# revt[wp, wloop]: wp=w1..w51 paired with wloop=w52..w2 (reversed)
revt = gp.Set(
m,
"sddp_revt",
domain=[wp, wloop],
records=[(w_labels[k], w_labels[-(k + 1)]) for k in range(n_stages - 1)],
description="backward pass index map",
)
wfwd_set = gp.Set(
m,
"sddp_wfwd_set",
domain=[wfwd],
records=w_labels[1:],
description="forward pass stages (w2..last)",
)
# Convergence and state-tracking parameters
conv = gp.Parameter(
m,
"sddp_conv",
domain=[j, "*"],
description="convergence bounds per iteration",
)
zt = gp.Parameter(
m,
"sddp_zt",
domain=[j, i_set],
description="accumulated stage cost per forward path",
)
for sv_k in self._states:
sv_k.forward_state = gp.Parameter(
m,
f"sddp_fstate_{sv_k.name}",
domain=i_set,
description=f"{sv_k.name} state at end of current forward stage",
)
sv_k.forward_res_state = gp.Parameter(
m,
f"sddp_frstate_{sv_k.name}",
domain=[j, w, i_set],
description=f"{sv_k.name} state history for adaptive trials",
)
# Adaptive-update support symbols
adapt = gp.Set(
m,
"sddp_adapt",
domain=[t_set, w],
records=[(prev_last_hour[wl], wl) for wl in w_labels[1:]],
description="adaptive trial update: (prev_last_hour, downstream stage)",
)
adapt_t = gp.Set(
m,
"sddp_adapt_t",
domain=t_set,
records=[prev_last_hour[wl] for wl in w_labels[1:]],
description="t indices the adaptive trial update writes to",
)
prev_j_indicator = gp.Parameter(
m,
"sddp_prev_j",
domain=j,
description="indicator (0/1) selecting the previous iteration for adaptive update",
)
# Store everything train() will need
self._w = w
self._w_labels = w_labels
self._t_labels = t_labels
self._last_hour = last_hour
self._prev_last_hour = prev_last_hour
self._hpw = hpw
self._last_set = last_set
self._prevlast_set = prevlast_set
self._j_labels: list[str] = [] # populated by train() once n_iter is set
self._s_labels = s_labels
self._i_labels = i_labels
self._stage_cost_var = stage_cost
self._user_equations = list(equations)
self._obj_approx_eq = obj_approx
self._cuts_eq = cuts
self._solve_opts = solve_opts
self._gp_model = gp_model
self._is_scen = is_scen
self._is_noise_b = is_noise_b
self._is_acost = is_acost
self._dict_b = dict_b
self._sw_inflow_param = sw_inflow_param
self._guss_d = guss_d
self._sp = sp
self._cvar_A = cvar_A
self._cvar_w = cvar_w
self._sampled_path = sampled_path
self._f_scen = f_scen
self._f_inflow = f_inflow
self._f_acost = f_acost
self._f_cost = f_cost
self._dict_f = dict_f
self._dict_w1 = dict_w1
self._is_w1_noise = is_w1_noise
self._is_w1_acost = is_w1_acost
self._is_w1_cost = is_w1_cost
self._wp = wp
self._wloop = wloop
self._wfwd = wfwd
self._revt = revt
self._wfwd_set = wfwd_set
self._conv = conv
self._zt = zt
self._adapt = adapt
self._adapt_t = adapt_t
self._prev_j_ind = prev_j_indicator
self._tt = tt
self._state_hour = prev_last_hour[w_labels[0]]
self._state_orig_lo, self._state_orig_up = self._read_var_bounds(
sv.variable, self._state_hour
)
# Snapshot each state's full user-set bound profile
type_defaults = {
"positive": (0.0, float("inf")),
"negative": (float("-inf"), 0.0),
"binary": (0.0, 1.0),
"integer": (0.0, float("inf")),
}
for sv_k in self._states:
d_lo, d_up = type_defaults.get(
getattr(sv_k.variable, "type", "free"), (float("-inf"), float("inf"))
)
recs = sv_k.variable.records
lo_rows: list[tuple] = []
up_rows: list[tuple] = []
for tl in t_labels:
lo, up = d_lo, d_up
if recs is not None and len(recs) > 0 and "lower" in recs.columns:
domain_col = recs.columns[0]
match = recs[recs[domain_col].astype(str) == tl]
if len(match) > 0:
lo = float(match["lower"].iloc[0])
up = float(match["upper"].iloc[0])
lo_rows.append((tl, lo))
up_rows.append((tl, up))
sv_k.orig_lo_param = gp.Parameter(
m,
f"sddp_orig_lo_{sv_k.name}",
domain=t_set,
records=pd.DataFrame(lo_rows, columns=["t", "value"]),
description=f"snapshot of user lower bounds for {sv_k.name}",
)
sv_k.orig_up_param = gp.Parameter(
m,
f"sddp_orig_up_{sv_k.name}",
domain=t_set,
records=pd.DataFrame(up_rows, columns=["t", "value"]),
description=f"snapshot of user upper bounds for {sv_k.name}",
)
value_cols = {"level", "marginal", "lower", "upper", "scale"}
state_names = {s.name for s in self._states}
self._user_bound_snaps: list[tuple] = []
for v in self._user_variables:
if v.name in state_names:
continue
vrecs = v.records
if vrecs is None or len(vrecs) == 0:
continue
if (vrecs["lower"] == float("-inf")).all() and (
vrecs["upper"] == float("inf")
).all():
continue # fully free variable
vdom = list(v.domain)
dom_cols = [c for c in vrecs.columns if c not in value_cols]
lo_p = gp.Parameter(
m,
f"sddp_blo_{v.name}",
domain=vdom,
records=vrecs[[*dom_cols, "lower"]].rename(columns={"lower": "value"}),
description=f"snapshot of user lower bounds for {v.name}",
)
up_p = gp.Parameter(
m,
f"sddp_bup_{v.name}",
domain=vdom,
records=vrecs[[*dom_cols, "upper"]].rename(columns={"upper": "value"}),
description=f"snapshot of user upper bounds for {v.name}",
)
self._user_bound_snaps.append((v, vdom, lo_p, up_p))
self._built = True
@staticmethod
def _infer_var_bounds(var: gp.Variable) -> tuple[float, float, str]:
type_defaults = {
"positive": (0.0, float("inf")),
"negative": (float("-inf"), 0.0),
"binary": (0.0, 1.0),
"integer": (0.0, float("inf")),
}
default_lo, default_up = type_defaults.get(
getattr(var, "type", "free"), (float("-inf"), float("inf"))
)
recs = var.records
if recs is None or len(recs) == 0:
return default_lo, default_up, "type_default"
if "lower" not in recs.columns or "upper" not in recs.columns:
return default_lo, default_up, "type_default"
return float(recs["lower"].min()), float(recs["upper"].max()), "records"
@staticmethod
def _equation_references_var(equations: list, var: gp.Variable) -> bool:
pattern = re.compile(rf"\b{re.escape(var.name)}\b")
for eq in equations:
text = ""
for accessor in ("getDefinition", "gamsRepr", "latexRepr"):
fn = getattr(eq, accessor, None)
if callable(fn):
try:
text = str(fn())
break
except Exception:
continue
if not text:
text = str(eq)
if pattern.search(text):
return True
return False
@staticmethod
def _read_var_bounds(var: gp.Variable, idx: str) -> tuple[float, float]:
type_defaults = {
"positive": (0.0, float("inf")),
"negative": (float("-inf"), 0.0),
"binary": (0.0, 1.0),
"integer": (0.0, float("inf")),
}
default_lo, default_up = type_defaults.get(
getattr(var, "type", "free"), (float("-inf"), float("inf"))
)
recs = var.records
if recs is None or len(recs) == 0:
return default_lo, default_up
if "lower" not in recs.columns or "upper" not in recs.columns:
return default_lo, default_up
domain_col = recs.columns[0]
match = recs[recs[domain_col].astype(str) == idx]
if len(match) == 0:
return default_lo, default_up
return float(match["lower"].iloc[0]), float(match["upper"].iloc[0])
@staticmethod
def _cvar_backward_weights(
risk: CVaR,
cvar_A: gp.Parameter,
cvar_w: gp.Parameter,
cost: gp.Parameter,
prob_param: gp.Parameter,
sp: gp.Alias,
i_set: gp.Set,
s_set: gp.Set,
) -> None:
cvar_A[i_set, s_set] = gp.Sum(
sp.where[
(cost[i_set, sp] > cost[i_set, s_set])
| (
(cost[i_set, sp] >= cost[i_set, s_set])
& (gp.Ord(sp) < gp.Ord(s_set))
)
],
prob_param[sp],
)
cvar_w[i_set, s_set] = (1.0 - risk.weight) * prob_param[s_set] + ( # ty: ignore[invalid-assignment]
risk.weight
* gp.math.Max(
0.0,
gp.math.Min(prob_param[s_set], risk.tail - cvar_A[i_set, s_set]),
)
/ risk.tail
)
# training (running) the model
[docs]
def train(
self,
n_iter: int = 20,
rel_tol: float | None = None,
patience: int = 5,
risk: CVaR | None = None,
gap_paths: int = 0,
) -> SDDPResult:
"""Run the SDDP iteration loop and return convergence results.
Parameters
----------
n_iter : int
Maximum number of SDDP iterations; training may stop earlier when
``rel_tol`` is set. By default 20.
rel_tol : float | None
Relative lower-bound improvement below which an iteration counts
as a plateau step. By default None.
patience : int
Number of consecutive sub-``rel_tol`` iterations required before
stopping early (must be >= 1). Ignored when ``rel_tol`` is
``None``. By default 5.
risk : CVaR | None
Risk measure to optimize. By default None, the risk-neutral
expectation.
gap_paths : int
If ``>= 1``, after training run an out-of-sample Monte-Carlo
simulation of the trained policy with this many independent paths
and report a statistically meaningful optimality gap (95% CI of the
policy cost vs. the lower bound) on the result. By default 0.
Notes
-----
Pressing CTRL+C during training stops gracefully: the current iteration
is finalized (or, if its in-flight solve was aborted, discarded) and the
policy trained so far is returned with ``stop_reason == "interrupted"``.
Press CTRL+C a second time to abort hard (raises ``KeyboardInterrupt``).
"""
if not self._built:
raise ValidationError("Call build() before train()")
if self._loaded_from_save:
raise ValidationError(
"train() cannot be called on a loaded sddp instance. "
"Loaded instances are read-only; use policy() / simulate() "
"for inference, or retrain from scratch."
)
if n_iter < 1:
raise ValidationError(f"n_iter must be >= 1, got {n_iter}")
if rel_tol is not None and rel_tol <= 0:
raise ValidationError(f"rel_tol must be > 0 when set, got {rel_tol}")
if patience < 1:
raise ValidationError(f"patience must be >= 1, got {patience}")
if risk is not None and not isinstance(risk, CVaR):
raise ValidationError("risk must be a CVaR(...) instance or None")
if gap_paths < 0:
raise ValidationError(f"gap_paths must be >= 0, got {gap_paths}")
# build() invariants: narrow Optional fields for the type checker.
assert self._noise is not None
assert self._j_set is not None
assert self._jj_set is not None
assert self._alpha is not None
assert self._acost is not None
assert self._gp_model is not None
m = self._m
ww = self._active_stage
w = self._w
t = self._time_set
t_set = t
sv = self._states[0]
assert sv.trial_set is not None
assert sv.trial_param is not None
assert sv.cut_slope is not None
assert self._cut_intercept is not None
nc = self._noise
assert nc.scenario_set is not None
i_set = sv.trial_set
s_set = nc.scenario_set
j = self._j_set
jj = self._jj_set
w_labels = self._w_labels
i_labels = self._i_labels
s_labels = self._s_labels
last_set = self._last_set
prevlast_set = self._prevlast_set
gp_model = self._gp_model
solve_opts = self._solve_opts
dict_b = self._dict_b
dict_f = self._dict_f
prob_param = self._prob_param
is_noise_b = self._is_noise_b
is_acost = self._is_acost
guss_d = self._guss_d
cut_intercept = self._cut_intercept
sp = self._sp
cvar_A = self._cvar_A
cvar_w = self._cvar_w
f_scen = self._f_scen
f_inflow = self._f_inflow
f_acost = self._f_acost
f_cost = self._f_cost
dict_w1 = self._dict_w1
is_w1_noise = self._is_w1_noise
is_w1_acost = self._is_w1_acost
is_w1_cost = self._is_w1_cost
wp = self._wp
wloop = self._wloop
wfwd = self._wfwd
revt = self._revt
wfwd_set = self._wfwd_set
sampled_path = self._sampled_path
sw_inflow = self._sw_inflow_param
conv = self._conv
zt = self._zt
adapt = self._adapt
adapt_t = self._adapt_t
prev_j_ind = self._prev_j_ind
tt = self._tt
state_hour = self._state_hour
# Size the iteration machinery for this run
j.setRecords([f"j{k}" for k in range(1, n_iter + 1)])
j_labels = j.toList()
self._j_labels = j_labels
rng = np.random.default_rng(seed=self._seed)
sample_p = nc.probabilities # None -> uniform; preserves seed sequences
sampled_path.setRecords(
[
(jl, wl, il, rng.choice(s_labels, p=sample_p))
for jl in j_labels
for wl in w_labels
for il in i_labels
]
)
# CTRL+C handling
self._stop_requested = False
on_main_thread = threading.current_thread() is threading.main_thread()
prev_sigint = signal.getsignal(signal.SIGINT) # GAMSPy's handler
def _sddp_sigint(signum, frame):
if not self._stop_requested:
self._stop_requested = True
print(
"\n[sddp] interrupted by user - finishing the current "
"iteration; press CTRL+C again to abort."
)
return
# Second CTRL+C: restore the original handler and hard-abort.
if prev_sigint is not None:
signal.signal(signal.SIGINT, prev_sigint)
raise KeyboardInterrupt
def _restore_sigint():
if on_main_thread and prev_sigint is not None:
signal.signal(signal.SIGINT, prev_sigint)
if on_main_thread:
signal.signal(signal.SIGINT, _sddp_sigint)
result = SDDPResult()
result.risk = risk
t_total = time.perf_counter()
prev_lo: float | None = None
flat_streak = 0 # consecutive iterations with sub-rel_tol LB gain
completed = 0 # fully finished iterations (-> result.iterations_run)
lo = up = up_95 = sigma = float("nan") # last completed iteration's stats
for pos, j_label in enumerate(j_labels):
# A CTRL+C during the previous iteration's tail -> stop cleanly here.
if self._stop_requested:
result.stop_reason = "interrupted"
break
prev_j = j_labels[pos - 1] if pos > 0 else None
t_iter = time.perf_counter()
interrupted = False
# Single-sync pattern: pre-bump prevents nested Loop.__exit__ syncs.
# One explicit _synch_with_gams() at the end covers the full iteration.
m._in_loop += 1
try:
########## Adaptive trial-level update ##########
# Replace the static uniform grid with the previous iteration's
# forward-pass reservoir states. The whole update is a single
# symbolic GAMS assignment; the `prev_j_ind` parameter selects
# which iteration's frs to pull from, and the `adapt` set
# threads (prev_last_hour[wl], wl) so the inner Sum has exactly
# one non-zero term per (i, t). Clamp + eps floor are done
# in-line via gp.math.Max/Min so we still get only one GAMS
# statement (vs. ~n_trials x (n_stages - 1) per-iter before).
if prev_j is not None:
prev_j_ind[j] = 0
prev_j_ind[prev_j] = 1
for sv_k in self._states:
assert sv_k.trial_param is not None
assert sv_k.forward_res_state is not None
sv_k.trial_param[i_set, tt].where[adapt_t[tt]] = gp.math.Max(
self._EPS,
gp.math.Min(
sv_k.upper_bound,
gp.math.Max(
sv_k.lower_bound,
gp.Sum(
gp.Domain(j, w).where[
prev_j_ind[j] & adapt[tt, w]
],
sv_k.forward_res_state[j, w, i_set],
),
),
),
)
# Reset per-iteration accumulators
zt[j_label, i_set] = 0
for sv_k in self._states:
assert sv_k.forward_state is not None
assert sv_k.forward_res_state is not None
sv_k.forward_state[i_set] = 0
sv_k.forward_res_state[j_label, w, i_set] = 0
# Backward pass
with gp.Loop(gp.Domain(wp, wloop).where[revt[wp, wloop]]):
# Activate the current backward stage
ww[w] = False
ww[wloop] = True
# Restore user bounds + scatter each state's trial level.
# GUSS leaves a state variable fixed at the prior batch's
# last scenario, which would pin the current stage's free
# decision variable at a stale value.
for sv_k in self._states:
assert sv_k.is_res_fx is not None
assert sv_k.trial_param is not None
assert sv_k.orig_lo_param is not None
assert sv_k.orig_up_param is not None
sv_k.variable.lo[t_set] = sv_k.orig_lo_param[t_set]
sv_k.variable.up[t_set] = sv_k.orig_up_param[t_set]
# Scatter: fix x_s[prev_last_of_wloop] at trial level[i]
sv_k.is_res_fx[i_set, s_set, t_set] = 0
sv_k.is_res_fx[i_set, s_set, tt].where[
prevlast_set[wloop, tt]
] = sv_k.trial_param[i_set, tt]
# Scatter: inject inflow scenario for this stage (same for all i)
is_noise_b[i_set, s_set] = sw_inflow[wloop, s_set]
# GUSS solve: (n_trials x n_scenarios) LPs in one batch
gp_model.solve(options=solve_opts, scenario=dict_b)
# Change-of-measure weight per scenario: nominal probability,
# or the CVaR Rockafellar-Uryasev weight when risk-averse.
if isinstance(risk, CVaR):
self._cvar_backward_weights(
risk, cvar_A, cvar_w, is_acost, prob_param, sp, i_set, s_set
)
weight = cvar_w[i_set, s_set]
else:
weight = prob_param[s_set]
# Cut slope, per state: weighted x_s.m[prev_last] over scenarios.
for sv_k in self._states:
assert sv_k.guss_cm is not None
assert sv_k.is_res_m is not None
sv_k.guss_cm[i_set] = gp.Sum(
gp.Domain(s_set, tt).where[prevlast_set[wloop, tt]],
weight * sv_k.is_res_m[i_set, s_set, tt],
)
# Cut intercept (shared): V(trial) - sum_s slope_s * trial_s.
# Uses LP strong duality: delta = E[acost.l] - sum_s slope_s*x0_s.
slope_dot_trial = None
for sv_k in self._states:
assert sv_k.guss_cm is not None
assert sv_k.trial_param is not None
term = sv_k.guss_cm[i_set] * gp.Sum(
tt.where[prevlast_set[wloop, tt]],
sv_k.trial_param[i_set, tt],
)
slope_dot_trial = (
term if slope_dot_trial is None else slope_dot_trial + term
)
assert slope_dot_trial is not None
guss_d[i_set] = ( # ty: ignore[invalid-assignment]
gp.Sum(s_set, weight * is_acost[i_set, s_set]) - slope_dot_trial
)
# Store cut: per-state slope, shared intercept; activate jj.
for sv_k in self._states:
assert sv_k.cut_slope is not None
assert sv_k.guss_cm is not None
sv_k.cut_slope[j_label, i_set, wloop] = sv_k.guss_cm[i_set]
cut_intercept[j_label, i_set, wloop.lag(1)] = guss_d[i_set]
jj[j_label] = True
# Forward stage 1: wait-and-see GUSS batch
ww[w] = False
ww[w_labels[0]] = True
# Restore user bounds + fix each state at its initial level.
# GUSS leaves variables at the prior batch's last scenario, same
# as the backward sweep.
for sv_k in self._states:
assert sv_k.is_w1_init is not None
assert sv_k.is_w1_lstate is not None
assert sv_k.orig_lo_param is not None
assert sv_k.orig_up_param is not None
sv_k.variable.lo[t_set] = sv_k.orig_lo_param[t_set]
sv_k.variable.up[t_set] = sv_k.orig_up_param[t_set]
sv_k.is_w1_init[s_set, t_set] = 0
sv_k.is_w1_init[s_set, state_hour] = max(
sv_k.initial_state
if sv_k.initial_state is not None
else sv_k.lower_bound,
self._EPS,
)
sv_k.is_w1_lstate[s_set, t_set] = 0
# Scatter: precip per scenario (shared)
is_w1_noise[s_set] = sw_inflow[w_labels[0], s_set]
is_w1_acost[s_set] = 0
is_w1_cost[s_set] = 0
gp_model.solve(options=solve_opts, scenario=dict_w1)
conv[j_label, "lo"] = gp.Sum(
s_set, prob_param[s_set] * is_w1_acost[s_set]
)
# Per-trial stage-1 cost: pick each trial's pre-sampled scenario
zt[j_label, i_set] = gp.Sum(
s_set.where[sampled_path[j_label, w_labels[0], i_set, s_set]],
is_w1_cost[s_set],
)
# Per-trial forward state per state: x_s[last_of_w1] from the
# trial's sampled scenario.
for sv_k in self._states:
assert sv_k.forward_state is not None
assert sv_k.forward_res_state is not None
assert sv_k.is_w1_lstate is not None
sv_k.forward_state[i_set] = gp.Sum(
gp.Domain(s_set, tt).where[
sampled_path[j_label, w_labels[0], i_set, s_set]
& last_set[w_labels[0], tt]
],
sv_k.is_w1_lstate[s_set, tt],
)
sv_k.forward_res_state[j_label, w_labels[1], i_set] = (
sv_k.forward_state[i_set]
)
# Forward pass:
with gp.Loop(wfwd.where[wfwd_set[wfwd]]):
ww[w] = False
ww[wfwd] = True
# Select the pre-sampled inflow scenario for each trial point
f_scen[i_set, s_set] = False
f_scen[i_set, s_set].where[
sampled_path[j_label, wfwd, i_set, s_set]
] = True
# Scatter: inject inflow from sampled scenario
f_inflow[i_set, s_set] = 0
f_inflow[i_set, s_set].where[f_scen[i_set, s_set]] = sw_inflow[
wfwd, s_set
]
# Per state: restore bounds, fix x_s[prev_last] at the forward
# state, reset its level extract.
for sv_k in self._states:
assert sv_k.f_res_fixed is not None
assert sv_k.f_res_level is not None
assert sv_k.forward_state is not None
assert sv_k.orig_lo_param is not None
assert sv_k.orig_up_param is not None
sv_k.variable.lo[t_set] = sv_k.orig_lo_param[t_set]
sv_k.variable.up[t_set] = sv_k.orig_up_param[t_set]
sv_k.f_res_fixed[i_set, s_set, t_set] = 0
sv_k.f_res_fixed[i_set, s_set, tt].where[
f_scen[i_set, s_set] & prevlast_set[wfwd, tt]
] = sv_k.forward_state[i_set]
sv_k.f_res_level[i_set, s_set, t_set] = 0
# Reset shared extract containers
f_acost[i_set, s_set] = 0
f_cost[i_set, s_set] = 0
# GUSS solve
gp_model.solve(options=solve_opts, scenario=dict_f)
# Accumulate stage cost into zt
zt[j_label, i_set] = zt[j_label, i_set] + gp.Sum( # ty: ignore[invalid-assignment]
s_set.where[f_scen[i_set, s_set]],
f_cost[i_set, s_set],
)
# Advance each state for the next stage
for sv_k in self._states:
assert sv_k.f_res_level is not None
assert sv_k.forward_state is not None
assert sv_k.forward_res_state is not None
sv_k.forward_state[i_set] = gp.Sum(
gp.Domain(s_set, tt).where[
f_scen[i_set, s_set] & last_set[wfwd, tt]
],
sv_k.f_res_level[i_set, s_set, tt],
)
sv_k.forward_res_state[j_label, wfwd.lead(1), i_set] = (
sv_k.forward_state[i_set]
)
# Upper bound
# Average total-path cost across all n_trials forward trajectories
conv[j_label, "up"] = gp.Sum(i_set, zt[j_label, i_set]) / gp.Card(i_set)
except GamspyException:
if self._stop_requested:
result.stop_reason = "interrupted"
interrupted = True
else:
_restore_sigint()
raise
finally:
m._in_loop -= 1
# Single explicit sync, now that the pre-bump is released so it runs
# at the baseline _in_loop and actually flushes. Wrapped so a CTRL+C
# that aborts the sync's own GAMS job is finalized too.
if not interrupted:
try:
m._synch_with_gams()
except GamspyException:
if self._stop_requested:
result.stop_reason = "interrupted"
interrupted = True
else:
_restore_sigint()
raise
if interrupted:
break
# Safety: jj must round-trip back to Python after every iteration.
# If it doesn't, downstream policy()/simulate() solves see jj empty,
# the Benders cuts deactivate, alpha collapses to its prior bound,
# and decisions are silently wrong. Catches refactor breakage of
# the load_symbols list at the moment it happens, not three steps
# downstream when a user sees a nonsense policy result.
assert jj.records is not None and len(jj.records) >= pos + 1, (
f"jj sync failed after iteration {j_label}: expected at least "
f"{pos + 1} record(s), got "
f"{0 if jj.records is None else len(jj.records)}. "
f"Check that `jj` is in the load_symbols list above."
)
# Read back convergence bounds
# Filter for this iteration's rows
cv = _pv_at_j(conv, j_label)
lo = cv.get("lo", 0.0)
elapsed = time.perf_counter() - t_iter
# Forward-pass cost estimate from this iteration's trial paths
# zt[j_label, il] = total cost on path il.
zt_row = _pv_at_j(zt, j_label)
path_costs = np.array([zt_row.get(il, 0.0) for il in i_labels])
n_paths = len(path_costs)
up = float(np.mean(path_costs))
sigma = float(np.std(path_costs, ddof=1)) if n_paths > 1 else 0.0
up_95 = up + 1.96 * sigma / np.sqrt(n_paths)
if prev_lo is not None and lo < prev_lo - 1e-3:
_restore_sigint()
raise GamspyException(
f"Lower bound decreased at {j_label}: {prev_lo:,.3f} -> {lo:,.3f}"
)
# LB-plateau stopping
if rel_tol is not None and prev_lo is not None:
rel_gain = (lo - prev_lo) / max(abs(prev_lo), 1e-12)
flat_streak = flat_streak + 1 if rel_gain < rel_tol else 0
prev_lo = lo
row = {
"iteration": j_label,
"lo": lo,
"up": up,
"sigma": sigma,
"up_95": up_95,
"elapsed": elapsed,
}
result.convergence_table.append(row)
if self._verbose:
print(
f" {j_label:>4s}: "
f"bound = {_sci(lo):>14s} "
f"sim cost = {_sci(up):>14s} ± {_sci(sigma):>12s} "
f"[{elapsed:.1f}s]"
)
completed += 1
# A CTRL+C landed during this iteration but the solve still finished
if self._stop_requested:
result.stop_reason = "interrupted"
break
# Stop once the LB has plateaued for `patience` consecutive iters
if rel_tol is not None and flat_streak >= patience:
result.stop_reason = "converged"
break
_restore_sigint()
result.iterations_run = completed
total = time.perf_counter() - t_total
result.lower_bound = lo
result.upper_bound = up
result.upper_bound_95 = up_95
result.sigma = sigma
result.total_time = total
# Optional end-of-training optimality-gap estimate.
if gap_paths >= 1 and completed > 0 and result.stop_reason != "interrupted":
self._sim_call_count += 1
gap_sim = self._run_simulation(
gap_paths,
self._resolve_report(None),
self._seed + 997,
self._sim_call_count,
quiet=True,
)
costs = gap_sim.total_cost.to_numpy(dtype=float)
result.policy_cost_mean = float(costs.mean())
result.policy_cost_stderr = (
float(costs.std(ddof=1) / np.sqrt(len(costs)))
if len(costs) > 1
else 0.0
)
result.policy_cost_paths = int(gap_paths)
if self._verbose:
print(f"\nTotal: {total:.1f}s ({total / max(completed, 1):.1f}s/iter avg)")
print(f"\n{result}")
return result
# persistence (save / load)
[docs]
def save(self, path: str) -> None:
"""Serialize this sddp instance to a single ``.sddp`` file.
The saved artifact contains the host ``Container`` (via
``gp.serialize``) plus a small JSON sidecar mapping symbol names
to sddp roles. The output is loadable in a different Python
process / notebook with ``SDDP.load(path)`` and supports
``policy()`` / ``simulate()`` immediately; it does not support
further training (see ``SDDP.load`` for the rationale).
Parameters
----------
path : str
Output path. Must end with ``.sddp``.
Raises
------
ValidationError
If the instance was not built, or ``path`` does not end
with ``.sddp``.
"""
if not path.endswith(".sddp"):
raise ValidationError(f"path must end with .sddp but found {path}")
if not self._built:
raise ValidationError(
"Cannot save() a sddp instance that was not built and trained."
)
from gamspy.formulations.sddp.persistence import (
_collect_sddp_metadata,
_pack,
)
metadata = _collect_sddp_metadata(self)
_pack(self._m, metadata, path)
[docs]
@classmethod
def load(cls, path: str) -> SDDP:
"""Load an sddp instance.
The returned instance is **read-only**: ``policy()`` and
``simulate()`` work as expected, but ``add_state()`` /
``set_noise()`` / ``build()`` / ``train()`` will raise. To add
more iterations, retrain from scratch.
Parameters
----------
path : str
Path to a ``.sddp`` file produced by ``SDDP.save()``.
Returns
-------
SDDP
An sddp instance reattached to the deserialized Container.
Raises
------
ValidationError
If ``path`` does not end with ``.sddp``, the file is missing,
malformed, references symbols absent from the Container, or
carries a major version different from the current sddp module.
"""
if not path.endswith(".sddp"):
raise ValidationError(f"path must end with .sddp but found {path}")
from gamspy.formulations.sddp.persistence import (
_reattach_sddp,
_unpack,
_validate_metadata,
)
container, metadata = _unpack(path)
_validate_metadata(metadata)
return _reattach_sddp(container, metadata)
# simulate
[docs]
def simulate(
self,
n_paths: int = 100,
report: list[gp.Variable] | None = None,
seed: int | None = None,
) -> SimulationResult:
"""Run the trained policy on fresh Monte Carlo paths.
Parameters
----------
n_paths : int
Number of independent simulation paths. By default 100.
report : list[Variable] | None
GAMSPy Variables to capture per (path, stage). By default None,
which captures every state variable.
seed : int | None
Sampler seed. By default None, which sets it to ``train_seed + 1``.
Returns
-------
SimulationResult
Pivot-shaped DataFrames (paths x stages) for total cost, stage
costs, realised noise, and each reported variable.
"""
if not self._built:
raise ValidationError("Call build() before simulate()")
if n_paths < 1:
raise ValidationError(f"n_paths must be >= 1, got {n_paths}")
resolved = self._resolve_report(report)
for v in resolved:
if list(v.domain) != [self._time_set]:
raise NotImplementedError(
f"simulate() v1 only captures variables with domain==[time_set]; "
f"got {v.name} with domain={v.domain}"
)
if seed is None:
seed = self._seed + 1
self._sim_call_count += 1
return self._run_simulation(n_paths, resolved, seed, self._sim_call_count)
def _run_simulation(
self,
n_paths: int,
report: list[gp.Variable],
seed: int,
call_id: int,
quiet: bool = False,
) -> SimulationResult:
t0 = time.perf_counter()
# build() invariants: narrow Optional fields for the type checker.
assert self._noise is not None
assert self._jj_set is not None
assert self._gp_model is not None
assert self._acost is not None
m = self._m
sv = self._states[0]
assert sv.trial_set is not None
assert sv.cut_slope is not None
assert sv.orig_lo_param is not None
assert sv.orig_up_param is not None
nc = self._noise
assert nc.scenario_set is not None
s_set = nc.scenario_set
w = self._w
t_set = self._time_set
tt = self._tt
ww = self._active_stage
w_labels = self._w_labels
s_labels = self._s_labels
last_set = self._last_set
prevlast_set = self._prevlast_set
state_hour = self._state_hour
sw_inflow = self._sw_inflow_param
# Every state variable's post-solve level drives forward propagation, so
# it must be extracted even if the caller's `report` omits it.
state_vars = [sv_k.variable for sv_k in self._states]
_report_names = {v.name for v in report}
extract_vars = list(report) + [
v for v in state_vars if v.name not in _report_names
]
sfx = f"_{call_id}"
# Build per-call simulation symbols
p_labels = [f"p{k}" for k in range(1, n_paths + 1)]
sim_p = gp.Set(
m,
f"sddp_sim_p{sfx}",
records=p_labels,
description="simulation paths",
)
rng = np.random.default_rng(seed)
sample_p = nc.probabilities # None -> uniform; preserves seed sequences
sample_recs = [
(p, wl, rng.choice(s_labels, p=sample_p))
for p in p_labels
for wl in w_labels
]
sim_sample = gp.Set(
m,
f"sddp_sim_sample{sfx}",
domain=[sim_p, w, s_set],
records=sample_recs,
description="pre-sampled noise scenario per (path, stage)",
)
# Active (path, scenario) tuples for the current stage, mutated in loop.
sim_active = gp.Set(
m,
f"sddp_sim_active{sfx}",
domain=[sim_p, s_set],
description="active simulation scenarios for current stage",
)
# Scatter parameters
sim_noise = gp.Parameter(
m,
f"sddp_sim_noise{sfx}",
domain=[sim_p, s_set],
description="scatter: noise per simulation scenario",
)
sim_state: dict[str, gp.Parameter] = {}
for sv_k in self._states:
sim_state[sv_k.variable.name] = gp.Parameter(
m,
f"sddp_sim_state_{sv_k.name}{sfx}",
domain=[sim_p, s_set, t_set],
description=f"scatter: fix {sv_k.name} per simulation scenario",
)
# Extract parameters
sim_acost = gp.Parameter(
m,
f"sddp_sim_acost{sfx}",
domain=[sim_p, s_set],
description="extract: acost.l per simulation scenario",
)
sim_cost = gp.Parameter(
m,
f"sddp_sim_cost{sfx}",
domain=[sim_p, s_set],
description="extract: stage_cost.l per simulation scenario",
)
sim_var_extract: dict[str, gp.Parameter] = {}
for v in extract_vars:
sim_var_extract[v.name] = gp.Parameter(
m,
f"sddp_sim_var_{v.name}{sfx}",
domain=[sim_p, s_set, t_set],
description=f"extract: {v.name}.l per simulation scenario",
)
# Per-stage history (path x stage)
sim_cost_hist = gp.Parameter(
m,
f"sddp_sim_cost_hist{sfx}",
domain=[sim_p, w],
description="stage cost per (path, stage)",
)
sim_noise_hist = gp.Parameter(
m,
f"sddp_sim_noise_hist{sfx}",
domain=[sim_p, w],
description="realised noise per (path, stage)",
)
sim_var_hist: dict[str, gp.Parameter] = {}
for v in report:
sim_var_hist[v.name] = gp.Parameter(
m,
f"sddp_sim_{v.name}_hist{sfx}",
domain=[sim_p, w],
description=f"history: {v.name}.l at last_of_stage per (path, stage)",
)
# Forward state tracker per state (path -> end-of-previous-stage level)
sim_fwd_state: dict[str, gp.Parameter] = {}
for sv_k in self._states:
sim_fwd_state[sv_k.variable.name] = gp.Parameter(
m,
f"sddp_sim_fwd_state_{sv_k.name}{sfx}",
domain=sim_p,
description=f"end-of-previous-stage {sv_k.name} per path",
)
# GUSS dict for the simulation
sim_dict = gp.GUSSScenarioDict(m, f"sddp_sim_dict{sfx}", sim_active)
sim_dict.add_options(self._so)
sim_dict.add_param(nc.parameter, sim_noise)
for sv_k in self._states:
sim_dict.add_fixed(sv_k.variable, sim_state[sv_k.variable.name])
sim_dict.add_level(self._acost, sim_acost)
sim_dict.add_level(self._stage_cost_var, sim_cost)
for v in extract_vars:
sim_dict.add_level(v, sim_var_extract[v.name])
# Forward simulation: one GUSS batch per stage
m._in_loop += 1
try:
for stage_idx, wl in enumerate(w_labels):
# Activate this stage
ww[w] = False
ww[wl] = True
# Build active (path, scenario) set for this stage
sim_active[sim_p, s_set] = sim_sample[sim_p, wl, s_set]
# Scatter noise: only the sampled scenario per path is non-zero
sim_noise[sim_p, s_set] = 0
sim_noise[sim_p, s_set].where[sim_active[sim_p, s_set]] = sw_inflow[
wl, s_set
]
# Per state: restore bounds and fix the incoming state.
for sv_k in self._states:
assert sv_k.orig_lo_param is not None
assert sv_k.orig_up_param is not None
sv_k.variable.lo[t_set] = sv_k.orig_lo_param[t_set]
sv_k.variable.up[t_set] = sv_k.orig_up_param[t_set]
state_param = sim_state[sv_k.variable.name]
state_param[sim_p, s_set, t_set] = 0
if stage_idx == 0:
initial = (
sv_k.initial_state
if sv_k.initial_state is not None
else sv_k.lower_bound
)
state_param[sim_p, s_set, state_hour].where[
sim_active[sim_p, s_set]
] = max(initial, self._EPS)
else:
state_param[sim_p, s_set, tt].where[
sim_active[sim_p, s_set] & prevlast_set[wl, tt]
] = sim_fwd_state[sv_k.variable.name][sim_p]
# Reset extracts
sim_acost[sim_p, s_set] = 0
sim_cost[sim_p, s_set] = 0
for v in extract_vars:
sim_var_extract[v.name][sim_p, s_set, t_set] = 0
# GUSS solve: n_paths LPs in one batch
self._gp_model.solve(options=self._solve_opts, scenario=sim_dict)
# Record per-stage history
sim_cost_hist[sim_p, wl] = gp.Sum(
s_set.where[sim_active[sim_p, s_set]],
sim_cost[sim_p, s_set],
)
sim_noise_hist[sim_p, wl] = gp.Sum(
s_set.where[sim_active[sim_p, s_set]],
sim_noise[sim_p, s_set],
)
for v in report:
sim_var_hist[v.name][sim_p, wl] = gp.Sum(
gp.Domain(s_set, tt).where[
sim_active[sim_p, s_set] & last_set[wl, tt]
],
sim_var_extract[v.name][sim_p, s_set, tt],
)
# Advance each state's forward value for the next stage.
if stage_idx < len(w_labels) - 1:
for sv_k in self._states:
sim_fwd_state[sv_k.variable.name][sim_p] = gp.Sum(
gp.Domain(s_set, tt).where[
sim_active[sim_p, s_set] & last_set[wl, tt]
],
sim_var_extract[sv_k.variable.name][sim_p, s_set, tt],
)
except Exception:
m._in_loop -= 1
raise
m._in_loop -= 1
# Single sync flushes the loop. The per-stage history params assigned
# above (sim_cost_hist, sim_noise_hist, sim_*_hist) flag themselves for
# load-back, so the container loads them automatically.
m._synch_with_gams()
# Aggregate into DataFrames
cost_df = self._pivot_history(sim_cost_hist, p_labels, w_labels)
noise_df = self._pivot_history(sim_noise_hist, p_labels, w_labels)
var_dfs = {
v.name: self._pivot_history(sim_var_hist[v.name], p_labels, w_labels)
for v in report
}
total_cost = cost_df.sum(axis=1)
total_cost.name = "total_cost"
elapsed = time.perf_counter() - t0
sim_result = SimulationResult(
n_paths=n_paths,
total_cost=total_cost,
stage_costs=cost_df,
noise=noise_df,
variables=var_dfs,
elapsed=elapsed,
)
if self._verbose and not quiet:
print(f"\nsimulate: {sim_result} [{elapsed:.1f}s]")
return sim_result
# policy point query
[docs]
def policy(
self,
stage: str,
state: float | dict[str, float],
noise: float,
report: list[gp.Variable] | None = None,
) -> PolicyResult:
"""Query the trained policy at a single situation.
Answers: *"I'm in `stage`, my state arrived at `state`, this stage's
noise realised as `noise`. What is the optimal decision and what
does it cost me from here on?"*
Parameters
----------
stage : str
Stage label to stand in (must be one of the defined stages).
state : float | dict[str, float]
The state value(s) entering this stage. With a single registered
state variable, pass a scalar (e.g. ``180``). With several states,
pass a ``dict`` keyed by state-variable name, e.g.
``{"L_up": 120, "L_dn": 200}`` where its keys must match the registered
states exactly. A scalar with multiple states raises ``ValidationError``.
noise : float
The realised noise value for this stage.
report : list[Variable] | None
Variables whose optimal level to return. By default None, which
returns every state variable.
Returns
-------
PolicyResult
``stage``, ``incoming_state`` (scalar for one state, ``dict`` for
several), ``noise``, ``approx_cost_to_go`` and ``decisions``
(``{var_name: level}``).
"""
if not self._built:
raise ValidationError("Call build() and train() before policy()")
# build() invariants: narrow Optional fields for the type checker.
assert self._noise is not None
assert self._jj_set is not None
assert self._gp_model is not None
assert self._acost is not None
if stage not in self._w_labels:
raise ValidationError(
f"stage {stage!r} is not a valid stage; "
f"expected one of {self._w_labels}"
)
if self._jj_set.records is None or len(self._jj_set.records) == 0:
warnings.warn(
"policy() called before train(); no cuts exist yet, so the "
"future-cost approximation is absent and the returned decision "
"is NOT the trained policy. Call train() first.",
UserWarning,
stacklevel=2,
)
nc = self._noise
state_values = self._resolve_state(state)
resolved = self._resolve_report(report)
for v in resolved:
if not v.domain or v.domain[0] != self._time_set:
raise NotImplementedError(
f"policy() requires variables whose first domain element "
f"is the time set; got {v.name} with "
f"domain={v.domain}"
)
ww = self._active_stage
w = self._w
t_set = self._time_set
ph = self._prev_last_hour[stage]
lt = self._last_hour[stage]
# Activate only this stage
ww[w] = False
ww[stage] = True
# Restore user bounds, then fix each state at its incoming value.
self._restore_user_bounds()
for sv_k in self._states:
assert sv_k.orig_lo_param is not None
assert sv_k.orig_up_param is not None
sv_k.variable.lo[t_set] = sv_k.orig_lo_param[t_set]
sv_k.variable.up[t_set] = sv_k.orig_up_param[t_set]
sv_k.variable.fx[ph] = max(state_values[sv_k.variable.name], self._EPS)
# Inject the realised noise
nc.parameter[...] = float(noise)
# Single plain solve
self._gp_model.solve(options=self._solve_opts)
cost_to_go = self._scalar_level(self._acost)
decisions = {v.name: self._extract_decision(v, lt) for v in resolved}
# Logical cleanup: unfix every state
for sv_k in self._states:
assert sv_k.orig_lo_param is not None
assert sv_k.orig_up_param is not None
sv_k.variable.lo[t_set] = sv_k.orig_lo_param[t_set]
sv_k.variable.up[t_set] = sv_k.orig_up_param[t_set]
incoming: float | dict[str, float] = (
state_values[self._states[0].variable.name]
if len(self._states) == 1
else state_values
)
result = PolicyResult(
stage=stage,
incoming_state=incoming,
noise=float(noise),
approx_cost_to_go=cost_to_go,
decisions=decisions,
)
return result
@staticmethod
def _scalar_level(var: gp.Variable) -> float:
rec = var.records
if rec is None or len(rec) == 0:
return 0.0
return float(rec["level"].iloc[0])
@staticmethod
def _extract_decision(var: gp.Variable, t_label: str):
"""Pull a variable's level(s) at the given time label.
Returns:
- ``float`` for 1-D (time-only) variables.
- ``dict[str, float]`` for 2-D variables (time + one other dim),
keyed by the non-time dimension's label.
- ``dict[tuple[str, ...], float]`` for 3+-D variables (time + n
other dims), keyed by a tuple of the non-time dim labels in
declaration order.
"""
rec = var.records
dim = var.dimension
if rec is None or len(rec) == 0:
return 0.0 if dim == 1 else {}
tcol = rec.columns[0]
match = rec[rec[tcol].astype(str) == t_label]
if len(match) == 0:
return 0.0 if dim == 1 else {}
if dim == 1:
return float(match["level"].iloc[0])
# Multi-dim: filter to records at t_label, key by remaining dim values.
value_cols = {"level", "marginal", "lower", "upper", "scale"}
other_dim_cols = [c for c in rec.columns if c != tcol and c not in value_cols]
if len(other_dim_cols) == 1:
col = other_dim_cols[0]
return {str(row[col]): float(row["level"]) for _, row in match.iterrows()}
return {
tuple(str(row[c]) for c in other_dim_cols): float(row["level"])
for _, row in match.iterrows()
}
@staticmethod
def _pivot_history(
param: gp.Parameter,
row_labels: list[str],
col_labels: list[str],
) -> pd.DataFrame:
rec = param.records
if rec is None or len(rec) == 0:
return pd.DataFrame(0.0, index=row_labels, columns=col_labels)
cols = [c for c in rec.columns if c != "value"]
pivot = rec.pivot(index=cols[0], columns=cols[1], values="value")
pivot = pivot.reindex(index=row_labels, columns=col_labels)
pivot = pivot.fillna(0.0)
pivot.index.name = "path"
pivot.columns.name = "stage"
return pivot
# helpers
@property
def n_stages(self) -> int:
full_w = getattr(self, "_w", self._stage_parent)
recs = full_w.records
return len(recs) if recs is not None else 0
def __repr__(self) -> str:
states = [sv.name for sv in self._states]
noise = self._noise.parameter.name if self._noise else "none"
return (
f"SDDP(stages={self.n_stages}, "
f"states={states}, noise={noise}, built={self._built})"
)
def _pv_at_j(param: gp.Parameter, j_label: str) -> dict:
rec = param.records
if rec is None or len(rec) == 0:
return {}
cols = [c for c in rec.columns if c != "value"]
if not cols:
return {}
mask = rec[cols[0]].astype(str).values == j_label
if not mask.any():
return {}
sub = rec.loc[mask]
other = cols[1:]
vals = sub["value"].astype(float).values
if len(other) == 0:
# Leading column was the only domain, so return a single-entry dict.
return {(): float(vals[0])}
if len(other) == 1:
keys = sub[other[0]].astype(str).values
return dict(zip(keys, vals, strict=False))
key_cols = [sub[c].astype(str).values for c in other]
return dict(zip(zip(*key_cols, strict=False), vals, strict=False))