from __future__ import annotations
import os
import threading
from collections.abc import Sequence
from typing import TYPE_CHECKING, Literal
import gamspy as gp
import gamspy._gdx as gdxio
from gamspy._algebra.condition import Condition
from gamspy._algebra.domain import Domain
from gamspy._symbols.implicits import ImplicitSet
from gamspy.exceptions import ValidationError
if TYPE_CHECKING:
from gamspy import Alias, Container, Parameter, Set
from gamspy._algebra.expression import Expression
from gamspy._algebra.operation import Card, Operation
from gamspy._symbols.implicits import ImplicitParameter
from gamspy.math import MathOp
# Dictionary to track the container used in the most recent If/ElseIf block
_last_containers: dict[tuple[int, int], Container] = {}
[docs]
class Loop:
"""
A context manager to execute a group of statements iteratively for each member of a set or domain.
The Loop class maps to the GAMS `loop` statement. It is particularly useful for
cases where parallel assignments are not sufficient, such as iterative calculations,
nested loops, or modifying models and solving them repeatedly.
Parameters
----------
indices : Set | Alias | ImplicitSet | Condition | Domain | Sequence[Set | Alias]
The controlling domain of the loop. This can be a single Set, a sequence of Sets,
or a domain restricted by a logical condition (using `.where`).
Examples
--------
**1. Simple iteration over a single Set:**
>>> import gamspy as gp
>>> m = gp.Container()
>>> t = gp.Set(m, records=["1985", "1986", "1987"])
>>> pop = gp.Parameter(m, domain=t, records=[("1985", 3456)])
>>> growth = gp.Parameter(m, domain=t, records=[("1985", 25.3), ("1986", 27.3)])
>>> with gp.Loop(t):
... pop[t + 1] = pop[t] + growth[t]
**2. Iteration with a logical condition (dollar condition):**
You can restrict the loop domain using the `.where` attribute on Sets or Domains.
>>> i = gp.Set(m, records=["i1", "i2", "i3"])
>>> j = gp.Set(m, records=["j1", "j2", "j3"])
>>> q = gp.Parameter(m, domain=[i, j], records=[("i1", "j1", 1), ("i1", "j2", 3)])
>>> x = gp.Parameter(m, records=1)
>>> with gp.Loop(gp.Domain(i, j).where[q[i, j] > 0]):
... x[...] = x[...] + q[i, j]
**3. Nested Loops:**
Loops can be nested using standard Python indentation.
>>> a = gp.Parameter(m, domain=[i, j])
>>> b = gp.Parameter(m)
>>> a.generateRecords()
>>> with gp.Loop(i):
... with gp.Loop(j):
... b[...] = a[i, j]
"""
def __init__(
self,
indices: Set
| Alias
| ImplicitSet
| Condition
| Domain
| Sequence[Set | Alias]
| MathOp,
):
self.indices = indices
self._loop_number = -1
self.container = self._find_container()
def _find_container(self) -> Container:
if isinstance(
self.indices,
(gp.Set, gp.Alias, Condition, Domain, ImplicitSet, gp.math.MathOp),
):
return self.indices.container # type: ignore
elif isinstance(self.indices, Sequence):
for elem in self.indices:
if hasattr(elem, "container"):
return elem.container
raise ValidationError(
f"`{type(self.indices)}` is not an allowed type for a loop index. "
)
def _index_repr(self) -> str:
if isinstance(
self.indices,
(gp.Set, gp.Alias, Condition, Domain, ImplicitSet, gp.math.MathOp),
):
return self.indices.gamsRepr()
elif isinstance(self.indices, Sequence):
representations = [index.gamsRepr() for index in self.indices]
return f"({','.join(representations)})"
raise ValidationError(
f"`{type(self.indices)}` is not an allowed type for a loop index. "
)
@property
def Break(self) -> None:
"""
Breaks the execution of the current loop prematurely.
This property maps to the GAMS `break` statement. Note that you can only
break out of the innermost loop currently executing. Attempting to break an
outer loop from within an inner loop will raise a ValidationError.
Raises
------
ValidationError
If attempting to break an outer loop without breaking the inner loop first.
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Set(m, records=["i1", "i2", "i3"])
>>> j = gp.Set(m, records=["j1", "j2", "j3"])
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.Loop(i) as loop:
... with gp.Loop(j) as loop2:
... cnt[...] += 1
... loop2.Break # Successfully breaks the inner loop
... loop.Break # Successfully breaks the outer loop
"""
if self._loop_number < self.container._in_loop:
raise ValidationError(
"You cannot break this loop. You should break the inner loop first."
)
self.container._add_statement("break;")
@property
def Continue(self) -> None:
"""
Skips the remaining statements in the current iteration and proceeds to the next one.
This property maps to the GAMS `continue` statement. It gives additional
control over the execution of loop structures by allowing you to bypass
the rest of the loop block for the current domain element.
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Set(m, records=["i1", "i2", "i3", "i4"])
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.Loop(i) as loop:
... with gp.If(gp.Ord(i) == 2):
... loop.Continue # Skips incrementing for "i2"
... cnt[...] += 1
"""
self.container._add_statement("continue;")
def __enter__(self) -> Loop:
self.container._in_loop += 1
self._loop_number = self.container._in_loop
self.container._add_statement(f"loop({self._index_repr()},")
return self
def __exit__(self, exc_type, exc, tb):
self.container._in_loop -= 1
self.container._add_statement(");")
self.container._last_control_flow = "loop"
if self.container._in_loop == 0: # Run only in the most outer loop
self.container._synch_with_gams()
symbol_names = gdxio._get_symbol_names_from_gdx(
self.container.system_directory, self.container._gdx_out
)
for name in symbol_names:
self.container._data[name]._should_load_from_gams = True
[docs]
class For:
"""
A context manager to execute a group of statements iteratively over a numerical range.
The For class maps to the GAMS `for` statement. It allows you to iterate over a
range of numerical values, incrementing or decrementing a scalar parameter at each step.
It is useful for iterative algorithmic calculations that require a numerical counter,
rather than iterating over elements of a set.
Parameters
----------
index : Parameter
A scalar Parameter used as the numerical loop counter.
start : int | float | Parameter | Expression | Card | Operation | MathOp
The starting value of the loop counter.
end : int | float | Parameter | Expression | Card | Operation | MathOp
The final value of the loop counter.
step : int | float | Parameter | Expression | Card | Operation | MathOp, optional
The increment or decrement step size. Defaults to 1.
direction : Litera['to', 'downto']
The direction of the step. 'to' steps upwards, 'downto' steps downwards. Defaults to 'to'.
Examples
--------
**1. Simple iteration over a numerical range:**
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Parameter(m)
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.For(i, 1, 10):
... cnt[...] += i
**2. Iterating backwards**
When a negative step is provided, the loop iterate downwards.
>>> x = gp.Parameter(m, records=10)
>>> with gp.For(i, 10, 1, 2, direction="downto"):
... x[...] = x[...] - 2
**3. Using Parameters as loop bounds:**
You can use other parameters or expressions to define the boundaries of the loop.
>>> start_val = gp.Parameter(m, records=5)
>>> end_val = gp.Parameter(m, records=15)
>>> with gp.For(i, start_val, end_val):
... cnt[...] += 1
"""
def __init__(
self,
index: Parameter,
start: int
| float
| Parameter
| ImplicitParameter
| Expression
| Card
| Operation
| MathOp,
end: int
| float
| Parameter
| ImplicitParameter
| Expression
| Card
| Operation
| MathOp,
step: int
| float
| Parameter
| ImplicitParameter
| Expression
| Card
| Operation
| MathOp = 1,
direction: Literal["to", "downto"] = "to",
):
if not isinstance(index, gp.Parameter):
raise TypeError(
f"`index` must be a scalar Parameter but given {type(index)}"
)
if index.dimension != 0:
raise ValidationError(
f"`index` parameter must be a scalar but given index dimension is {index.dimension}"
)
self.index = index
self.start = start
self.end = end
self.step = step
self.direction = direction
self._loop_number = -1
self.container = index.container
@property
def Break(self) -> None:
"""
Breaks the execution of the current loop prematurely.
This property maps to the GAMS `break` statement. Note that you can only
break out of the innermost loop currently executing. Attempting to break an
outer loop from within an inner loop will raise a ValidationError.
Raises
------
ValidationError
If attempting to break an outer loop without breaking the inner loop first.
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Parameter(m)
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.For(i, 1, 10) as my_for:
... cnt[...] += 1
... with gp.If(i == 5):
... my_for.Break # Exits the loop when `i` reaches 5
"""
if self._loop_number < self.container._in_loop:
raise ValidationError(
"You cannot break this for loop. You should break the inner loop first."
)
self.container._add_statement("break;")
@property
def Continue(self) -> None:
"""
Skips the remaining statements in the current iteration and proceeds to the next one.
This property maps to the GAMS `continue` statement. It gives additional
control over the execution of loop structures by allowing you to bypass
the rest of the loop block for the current counter value.
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Parameter(m)
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.For(i, 1, 10) as my_for:
... with gp.If(i == 5):
... my_for.Continue # Skips incrementing `cnt` when `i` is 5
... cnt[...] += 1
"""
self.container._add_statement("continue;")
def __enter__(self) -> For:
self.container._in_loop += 1
self._loop_number = self.container._in_loop
index_str = self.index.gamsRepr()
start_str = (
str(self.start)
if isinstance(self.start, (int, float))
else self.start.gamsRepr()
)
end_str = (
str(self.end) if isinstance(self.end, (int, float)) else self.end.gamsRepr()
)
step_str = (
str(self.step)
if isinstance(self.step, (int, float))
else self.step.gamsRepr()
)
self.container._add_statement(
f"for({index_str} = {start_str} {self.direction} {end_str} by {step_str}, "
)
return self
def __exit__(self, exc_type, exc, tb):
self.container._in_loop -= 1
self.container._add_statement(");")
self.container._last_control_flow = "for"
if self.container._in_loop == 0: # Run only in the most outer loop
self.container._synch_with_gams()
symbol_names = gdxio._get_symbol_names_from_gdx(
self.container.system_directory, self.container._gdx_out
)
for name in symbol_names:
self.container._data[name]._should_load_from_gams = True
class While:
"""
A context manager to execute a group of statements repeatedly as long as a
condition evaluates to True.
The While class maps to the GAMS `while` statement. It is useful for
processes that must repeat an unknown number of times until a specific
logical condition is met.
Parameters
----------
condition : Expression | Condition | Operation | MathOp | Parameter
The logical condition that must remain true to continue executing the nested statements.
Examples
--------
**1. Iteratively dividing a number:**
>>> import gamspy as gp
>>> m = gp.Container()
>>> x = gp.Parameter(m, records=100)
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.While(x > 1):
... x[...] = x / 2
... cnt[...] += 1
"""
def __init__(
self, condition: Expression | Condition | Operation | MathOp | Parameter
):
self.condition = condition
if not isinstance(condition.container, gp.Container):
raise ValidationError(
f"Could not find the container in the given condition `{condition}`. Hence, gp.While operation is not possible."
)
self.container = condition.container
self._loop_number = -1
@property
def Break(self) -> None:
"""
Breaks the execution of the current while loop prematurely.
This property maps to the GAMS `break` statement. Note that you can only
break out of the innermost loop currently executing.
Raises
------
ValidationError
If attempting to break an outer loop without breaking the inner loop first.
"""
if self._loop_number < self.container._in_loop:
raise ValidationError(
"You cannot break this while loop. You should break the inner loop first."
)
self.container._add_statement("break;")
@property
def Continue(self) -> None:
"""
Skips the remaining statements in the current iteration and proceeds to the next one.
"""
self.container._add_statement("continue;")
def __enter__(self) -> While:
self.container._in_loop += 1
self._loop_number = self.container._in_loop
representation = self.condition.gamsRepr()
representation = gp.utils._replace_equality_signs(representation)
self.container._add_statement(f"while({representation},")
return self
def __exit__(self, exc_type, exc, tb):
self.container._in_loop -= 1
self.container._add_statement(");")
self.container._last_control_flow = "while"
if self.container._in_loop == 0: # Run only in the most outer loop
self.container._synch_with_gams()
symbol_names = gdxio._get_symbol_names_from_gdx(
self.container.system_directory, self.container._gdx_out
)
for name in symbol_names:
self.container._data[name]._should_load_from_gams = True
[docs]
class If:
"""
A context manager to conditionally execute a group of statements.
The If class maps to the GAMS `if` statement. It allows you to branch
conditionally around a group of execution statements within a loop.
Parameters
----------
condition : Expression
The logical condition that must be satisfied to execute the nested statements.
Examples
--------
**1. Skipping iterations conditionally:**
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Set(m, records=[f"i{idx}" for idx in range(1, 11)])
>>> cnt = gp.Parameter(m, records=0)
>>> with gp.Loop(i) as loop:
... with gp.If(gp.math.mod(gp.Ord(i), 2) == 0):
... loop.Continue
... cnt[...] += 1
**2. Breaking a loop based on a condition:**
>>> with gp.Loop(i) as loop:
... with gp.If(i.sameAs("i6")):
... loop.Break
... cnt[...] += 1
"""
def __init__(
self, condition: Expression | Condition | Operation | MathOp | Parameter
):
self.condition = condition
if not isinstance(condition.container, gp.Container):
raise ValidationError(
f"Could not find the container in the given condition `{condition}`. Hence, gp.If operation is not possible."
)
self.container = condition.container
# Track the container for potential succeeding ElseIf/Else statements
pid = os.getpid()
tid = threading.get_native_id()
_last_containers[(pid, tid)] = self.container
def __enter__(self):
if not self.container._in_loop:
raise ValidationError(
"`gp.If` context manager can only be used in `gp.Loop` context managers. Use regular Python if statements instead."
)
representation = self.condition.gamsRepr()
representation = gp.utils._replace_equality_signs(representation)
self.container._add_statement(f"if ({representation},")
def __exit__(self, exc_type, exc, tb):
self.container._add_statement(");")
self.container._last_control_flow = "if"
class ElseIf:
"""
A context manager to conditionally execute a group of statements if the preceding
`If` or `ElseIf` condition was False and the current condition is True.
Parameters
----------
condition : Expression
The logical condition that must be satisfied to execute the nested statements.
"""
def __init__(
self, condition: Expression | Condition | Operation | MathOp | Parameter
):
self.condition = condition
if not isinstance(condition.container, gp.Container):
raise ValidationError(
f"Could not find the container in the given condition `{condition}`. Hence, gp.ElseIf operation is not possible."
)
self.container = condition.container
# Track the container for potential succeeding ElseIf/Else statements
pid = os.getpid()
tid = threading.get_native_id()
_last_containers[(pid, tid)] = self.container
def __enter__(self):
if not self.container._in_loop:
raise ValidationError(
"`gp.ElseIf` context manager can only be used in `gp.Loop` context managers."
)
if getattr(self.container, "_last_control_flow", None) not in ("if", "elseif"):
raise ValidationError(
"`gp.ElseIf` must follow a `gp.If` or `gp.ElseIf` block."
)
last_statement = self.container._unsaved_statements[-1]
if (
not self.container._unsaved_statements
or not isinstance(last_statement, str)
or self.container._unsaved_statements[-1] != ");"
):
raise ValidationError(
"`gp.ElseIf` must immediately follow a `gp.If` or `gp.ElseIf` block without any intervening statements."
)
# Remove the closing parenthesis of the previous block to continue the chain
self.container._unsaved_statements.pop()
representation = self.condition.gamsRepr()
representation = gp.utils._replace_equality_signs(representation)
self.container._add_statement(f"elseif {representation},")
return self
def __exit__(self, exc_type, exc, tb):
self.container._add_statement(");")
self.container._last_control_flow = "elseif"
class Else:
"""
A context manager to execute a group of statements if all preceding `If` and `ElseIf`
conditions were False.
"""
def __init__(self):
pid = os.getpid()
tid = threading.get_native_id()
container = _last_containers.get((pid, tid))
if not isinstance(container, gp.Container):
raise ValidationError(
"Could not find the container. Hence, gp.Else operation is not possible. "
"Ensure gp.Else follows a gp.If or gp.ElseIf statement."
)
self.container = container
def __enter__(self):
if not getattr(self.container, "_in_loop", 0):
raise ValidationError(
"`gp.Else` context manager can only be used in `gp.Loop` context managers."
)
if getattr(self.container, "_last_control_flow", None) not in ("if", "elseif"):
raise ValidationError(
"`gp.Else` must follow a `gp.If` or `gp.ElseIf` block."
)
last_statement = self.container._unsaved_statements[-1]
if (
not self.container._unsaved_statements
or not isinstance(last_statement, str)
or self.container._unsaved_statements[-1] != ");"
):
raise ValidationError(
"`gp.Else` must immediately follow a `gp.If` or `gp.ElseIf` block without any intervening statements."
)
# Remove the closing parenthesis of the previous block to continue the chain
self.container._unsaved_statements.pop()
self.container._add_statement("else")
return self
def __exit__(self, exc_type, exc, tb):
self.container._add_statement(");")
self.container._last_control_flow = "else"