from __future__ import annotations
import io
import logging
import os
import uuid
from enum import Enum
from typing import TYPE_CHECKING
from gams.core.gmd import gmdCloseLicenseSession
import gamspy as gp
import gamspy._algebra.expression as expression
import gamspy._algebra.operation as operation
import gamspy._symbols.implicits as implicits
import gamspy._validation as validation
import gamspy.utils as utils
from gamspy._backend.backend import backend_factory
from gamspy._convert import GamsConverter, LatexConverter
from gamspy._model_instance import ModelInstance
from gamspy._options import EXECUTION_OPTIONS, MODEL_ATTR_OPTION_MAP, Options
from gamspy.exceptions import GamspyException, ValidationError
if TYPE_CHECKING:
from typing import Iterable, Literal
import pandas as pd
from gamspy import Container, Equation, Parameter, Variable
from gamspy._algebra.expression import Expression
from gamspy._algebra.operation import Operation
from gamspy._backend.engine import EngineClient
from gamspy._backend.neos import NeosClient
from gamspy._options import ModelInstanceOptions
from gamspy._symbols.implicits import ImplicitParameter
from gamspy._symbols.symbol import Symbol
# Truthy whenever the MIRO environment variable is set. os.getenv returns the
# raw string, so any non-empty value (even "0" or "false") counts as enabled.
# Model.solve() checks this flag to decide whether to write the default MIRO GDX.
IS_MIRO_INIT = os.getenv("MIRO", False)

# Module-level logger for model messages. It is configured at import time with
# its own stream handler, both at INFO level, using the format
# "[<logger name> - <level>] <message>".
logger = logging.getLogger("MODEL")
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter("[%(name)s - %(levelname)s] %(message)s")
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
[docs]
class Problem(Enum):
    """Enumeration of all supported problem types.

    The value of every member is the problem type's name as a string, and
    ``str(member)`` returns that same string.
    """

    LP = "LP"
    NLP = "NLP"
    QCP = "QCP"
    DNLP = "DNLP"
    MIP = "MIP"
    RMIP = "RMIP"
    MINLP = "MINLP"
    RMINLP = "RMINLP"
    MIQCP = "MIQCP"
    RMIQCP = "RMIQCP"
    MCP = "MCP"
    CNS = "CNS"
    MPEC = "MPEC"
    RMPEC = "RMPEC"
    EMP = "EMP"
    MPSGE = "MPSGE"

    @classmethod
    def values(cls):
        """Return the values of all members as a list, in definition order."""
        return [member.value for member in cls]

    def __str__(self) -> str:
        """Return the member's value instead of the default ``Class.MEMBER``."""
        return self.value
[docs]
class Sense(Enum):
    """Enumeration of the optimization senses a model can have.

    The value of every member is its own name as a string, and
    ``str(member)`` returns that same string.
    """

    MIN = "MIN"
    MAX = "MAX"
    FEASIBILITY = "FEASIBILITY"

    @classmethod
    def values(cls):
        """Return the values of all members as a list, in definition order."""
        return [member.value for member in cls]

    def __str__(self) -> str:
        """Return the member's value instead of the default ``Class.MEMBER``."""
        return self.value
[docs]
class ModelStatus(Enum):
    """
    An enumeration for model status types.

    Members are looked up by their numeric value: after a solve, the value
    read for the ``modelStat`` model attribute is converted with
    ``ModelStatus(value)`` and stored in ``Model.status``.
    """

    OptimalGlobal = 1
    OptimalLocal = 2
    Unbounded = 3
    InfeasibleGlobal = 4
    InfeasibleLocal = 5
    InfeasibleIntermed = 6
    Feasible = 7
    Integer = 8
    NonIntegerIntermed = 9
    IntegerInfeasible = 10
    LicenseError = 11
    ErrorUnknown = 12
    ErrorNoSolution = 13
    NoSolutionReturned = 14
    SolvedUnique = 15
    Solved = 16
    SolvedSingular = 17
    UnboundedNoSolution = 18
    InfeasibleNoSolution = 19
[docs]
class SolveStatus(Enum):
    """
    An enumeration for solve status types.

    Members are looked up by their numeric value: after a solve, the value
    read for the ``solveStat`` model attribute is converted with
    ``SolveStatus(value)`` and stored in ``Model.solve_status``. Members
    listed in ``INTERRUPT_STATUS`` only produce a logged warning, while
    members that are keys of ``ERROR_STATUS`` make the model raise a
    ``GamspyException``.
    """

    NormalCompletion = 1
    IterationInterrupt = 2
    ResourceInterrupt = 3
    TerminatedBySolver = 4
    EvaluationInterrupt = 5
    CapabilityError = 6
    LicenseError = 7
    UserInterrupt = 8
    SetupError = 9
    SolverError = 10
    InternalError = 11
    Skipped = 12
    SystemError = 13
# Solve statuses that mean the solve stopped before normal completion.
# Model._update_model_attributes logs a warning for these instead of raising.
INTERRUPT_STATUS = [
    SolveStatus.IterationInterrupt,
    SolveStatus.ResourceInterrupt,
    SolveStatus.EvaluationInterrupt,
    SolveStatus.UserInterrupt,
    SolveStatus.TerminatedBySolver,
]
# Solve statuses that are treated as fatal, mapped to the explanation that is
# appended to the GamspyException message raised for them.
ERROR_STATUS = {
    SolveStatus.CapabilityError: "The solver does not have the capability required by the model.",
    SolveStatus.LicenseError: "The solver cannot find the appropriate license key needed to use a specific subsolver.",
    SolveStatus.SetupError: "The solver encountered a fatal failure during problem set-up time.",
    SolveStatus.SolverError: "The solver encountered a fatal error.",
    SolveStatus.InternalError: "The solver encountered an internal fatal error.",
    SolveStatus.SystemError: "This indicates a completely unknown or unexpected error condition.",
}
# GAMS name -> GAMSPy name
# Keys are GAMS model attribute names (read as `<model>.<key>`); values are the
# names of the Model instance attributes that receive the value after a solve.
# For every key, one auto-generated scalar parameter is created per model.
ATTRIBUTE_MAP = {
    "domUsd": "num_domain_violations",
    "etAlg": "algorithm_time",
    "etSolve": "total_solve_time",
    "etSolver": "total_solver_time",
    "iterUsd": "num_iterations",
    "marginals": "marginals",
    "maxInfes": "max_infeasibility",
    "meanInfes": "mean_infeasibility",
    "modelStat": "status",
    "nodUsd": "num_nodes_used",
    "numDepnd": "num_dependencies",
    "numDVar": "num_discrete_variables",
    "numEqu": "num_equations",
    "numInfes": "num_infeasibilities",
    "numNLIns": "num_nonlinear_insts",
    "numNLNZ": "num_nonlinear_zeros",
    "numNOpt": "num_nonoptimalities",
    "numNZ": "num_nonzeros",
    "numRedef": "num_mcp_redefinitions",
    "numVar": "num_variables",
    "numVarProj": "num_bound_projections",
    "objEst": "objective_estimation",
    "objVal": "objective_value",
    "procUsed": "used_model_type",
    "resGen": "model_generation_time",
    "resUsd": "solve_model_time",
    "solveStat": "solve_status",
    "sumInfes": "sum_infeasibilities",
    "sysVer": "solver_version",
}
[docs]
class Model:
    """
    Represents a list of equations to be solved.

    Parameters
    ----------
    container : Container
        Container of the model.
    name : str, optional
        Name of the model. Name is autogenerated by default.
    problem : Problem or str, optional
        'LP', 'NLP', 'QCP', 'DNLP', 'MIP', 'RMIP', 'MINLP', 'RMINLP', 'MIQCP', 'RMIQCP', 'MCP', 'CNS', 'MPEC', 'RMPEC', 'EMP', or 'MPSGE',
        by default Problem.LP.
    equations : Iterable[Equation]
        Iterable of Equation objects.
    sense : Sense or str, optional
        "MIN", "MAX", or "FEASIBILITY".
    objective : Variable | Expression, optional
        Objective variable to minimize or maximize or objective itself.
    matches : dict[Equation, Variable]
        Equation - Variable matches for MCP models.
    limited_variables : Iterable, optional
        Allows limiting the domain of variables used in a model.
    external_module: str, optional
        The name of the external module in which the external equations are implemented

    Raises
    ------
    ValidationError
        If the objective variable is not a free variable, or if neither
        equations nor matches are given.
    TypeError
        If the objective is not a Variable, an Expression or an Operation.

    Examples
    --------
    >>> import gamspy as gp
    >>> m = gp.Container()
    >>> v = gp.Variable(m, "v")
    >>> e = gp.Equation(m, "e", definition= v == 5)
    >>> my_model = gp.Model(m, "my_model", "LP", [e])

    """

    # Prefix for auto-generated symbols
    _generate_prefix = "autogenerated_"
def __init__(
    self,
    container: Container,
    name: str | None = None,
    problem: Problem | str = Problem.LP,
    equations: Iterable[Equation] = [],
    sense: Sense | str | None = None,
    objective: Variable | Expression | None = None,
    matches: dict[Equation, Variable] | None = None,
    limited_variables: Iterable[Variable] | None = None,
    external_module: str | None = None,
):
    """
    Validate the arguments, register the model in the container and
    initialise every solve attribute to None.

    Raises
    ------
    ValidationError
        If the objective variable is not a free variable, or if neither
        equations nor matches are given.
    TypeError
        If the objective is not a Variable, an Expression or an Operation.
    """
    # The auto id doubles as the model name when none is given and is part
    # of the names of the auto-generated attribute parameters.
    self._auto_id = "m" + str(uuid.uuid4()).replace("-", "_")

    if name is not None:
        name = validation.validate_name(name)
        self.name = validation.validate_model_name(name)
    else:
        self.name = self._auto_id

    self.container = container
    self._matches = matches
    self.problem, self.sense = validation.validate_model(
        equations, problem, sense
    )
    # Copy the iterable: the (shared) default argument is never mutated and
    # the objective equation can be appended to this list safely.
    self.equations = list(equations)
    self._objective_variable = self._set_objective_variable(objective)

    if (
        self._objective_variable is not None
        and self._objective_variable.type.lower() != "free"
    ):
        raise ValidationError(
            f"Objective variable `{self._objective_variable}` must be a free variable"
        )

    self._limited_variables = limited_variables

    if not self.equations and not self._matches:
        raise ValidationError("Model requires at least one equation.")

    self._external_module_file = None
    self._external_module = None
    if external_module is not None:
        # The property setter adds the model statement itself.
        self.external_module = external_module
    else:
        # To avoid adding it twice
        self.container._add_statement(self)

    # allow freezing
    self._is_frozen = False

    # Attributes (one per value of ATTRIBUTE_MAP); filled in after a solve.
    self.num_domain_violations = None
    self.algorithm_time = None
    self.total_solve_time = None
    self.total_solver_time = None
    self.num_iterations = None
    self.marginals = None
    self.max_infeasibility = None
    self.mean_infeasibility = None
    self.status: ModelStatus | None = None
    self.num_nodes_used = None
    self.num_dependencies = None
    self.num_discrete_variables = None
    # Previously missing: without it, reading `num_equations` before the
    # first solve raised AttributeError unlike every other attribute.
    self.num_equations = None
    self.num_infeasibilities = None
    self.num_nonlinear_insts = None
    self.num_nonlinear_zeros = None
    self.num_nonoptimalities = None
    self.num_nonzeros = None
    self.num_mcp_redefinitions = None
    self.num_variables = None
    self.num_bound_projections = None
    self.objective_estimation = None
    self.objective_value = None
    self.used_model_type = None
    self.model_generation_time = None
    self.solve_model_time = None
    self.sum_infeasibilities = None
    self.solve_status: SolveStatus | None = None
    self.solver_version = None

    self.container._synch_with_gams()
def __repr__(self) -> str:
return f"Model(name={self.name}, problem={self.problem}, equations={self.equations}, sense={self.sense}, objective={self._objective_variable}, matches={self._matches}, limited_variables={self._limited_variables}"
def __str__(self) -> str:
return (
f"Model {self.name}:\n Problem Type: {self.problem}\n Sense:"
f" {self.sense}\n Equations: {self.equations}"
)
@property
def external_module(self) -> str | None:
"""
Name of the external module in which the external equations are implemented.
By default, this parameter is set to None. When provided, it triggers the
opening of the specified file using a File statement and incorporates the
file into the model by adding it as an external module.
This feature requires a solid understanding of programming, compilation,
and linking processes. For more information, please refer to the
https://www.gams.com/latest/docs/UG_ExternalEquations.html .
Returns
-------
str | None
"""
return self._external_module
@external_module.setter
def external_module(self, value: str | None):
if self._external_module_file is not None:
self.container._add_statement(
f"putclose {self._external_module_file};"
)
write_model_statement = value != self._external_module
self._external_module = None
self._external_module_file = None
if value is not None:
filename = "f" + str(uuid.uuid4()).replace("-", "_")
self._external_module_file = filename
self._external_module = value
self.container._add_statement(f"File {filename} / '{value}' /;")
if write_model_statement:
self.container._add_statement(self)
def _generate_obj_var_and_equation(self) -> tuple[Variable, Equation]:
    """
    Create the scalar objective variable and objective equation of the model.

    Both symbols are created in the model's container and are named after
    the model: `<name>_objective_variable` and `<name>_objective`.

    Returns
    -------
    tuple[Variable, Equation]
    """
    container = self.container
    objective_variable = gp.Variable._constructor_bypass(
        container, f"{self.name}_objective_variable", domain=[]
    )
    objective_equation = gp.Equation._constructor_bypass(
        container, f"{self.name}_objective", domain=[]
    )
    return objective_variable, objective_equation
def _set_objective_variable(
    self,
    assignment: None | Variable | Operation | Expression = None,
) -> Variable | None:
    """
    Determine the objective variable of the model.

    A Variable (or None) is returned unchanged. For an Expression or an
    Operation (Sum, Product etc.), and for models whose sense is
    FEASIBILITY, an objective variable and an objective equation are
    generated, the equation is added to the model and the generated
    variable is returned.

    Returns
    -------
    Variable | None

    Raises
    ------
    TypeError
        In case assignment is not a Variable, Expression or an Operation.
    ValidationError
        In case the sense is FEASIBILITY and an objective is given, or the
        problem type is CNS or MCP.
    """
    generated_types = (expression.Expression, operation.Operation)

    if assignment is not None and not isinstance(
        assignment, (gp.Variable, *generated_types)
    ):
        raise TypeError(
            "Objective must be a Variable or an Expression but"
            f" {type(assignment)} given"
        )

    is_feasibility = self.sense == gp.Sense.FEASIBILITY

    if is_feasibility:
        if assignment is not None:
            raise ValidationError(
                "Cannot set an objective when the sense is FEASIBILITY!"
            )

        if self.problem in [gp.Problem.CNS, gp.Problem.MCP]:
            raise ValidationError(
                "Problem type cannot be CNS or MCP when the sense is"
                " FEASIBILITY"
            )
    elif not isinstance(assignment, generated_types):
        # A plain Variable or None needs no generated symbols.
        return assignment

    variable, equation = self._generate_obj_var_and_equation()

    if not is_feasibility:
        # Sum((i,j),c[i,j]*x[i,j])->Sum((i,j),c[i,j]*x[i,j]) =e= var
        assignment = assignment == variable

    # equation .. <definition>
    # For FEASIBILITY the definition is simply `var =e= 0`.
    statement = expression.Expression(
        implicits.ImplicitEquation(
            equation,
            name=equation.name,
            type=equation.type,
            domain=[],
        ),
        "..",
        (variable == 0) if is_feasibility else assignment,
    )
    self.container._add_statement(statement)
    equation._definition = statement
    equation.modified = False
    variable.modified = False

    # Register the generated equation only once.
    known_names = {symbol.name for symbol in self.equations}
    if equation.name not in known_names:
        self.equations.append(equation)

    return variable
def _generate_solve_string(self) -> str:
solve_string = f"solve {self.name} using {self.problem}"
if self.sense:
if self.sense == gp.Sense.FEASIBILITY:
# Set sense as min or max for feasibility
self.sense = gp.Sense.MIN # type: ignore
solve_string += f" {self.sense}"
if self._objective_variable is not None:
solve_string += f" {self._objective_variable.gamsRepr()}"
return solve_string + ";"
def _add_model_attr_options(self, options: Options) -> None:
    """
    Translate the given options into statements added to the container.

    Options listed in MODEL_ATTR_OPTION_MAP become model attribute
    assignments (`<model>.<attr> = <value>;`), with booleans written as
    integers and strings quoted. Options listed in EXECUTION_OPTIONS become
    `<option> '<value>';` statements. Options that are None or in neither
    map are skipped.
    """
    given_options = options.model_dump(exclude_none=True)

    for key, value in given_options.items():
        if key in MODEL_ATTR_OPTION_MAP:
            if isinstance(value, bool):
                rendered = int(value)
            elif isinstance(value, str):
                rendered = f"'{value}'"
            else:
                rendered = value

            self.container._add_statement(
                f"{self.name}.{MODEL_ATTR_OPTION_MAP[key]} = {rendered};\n"
            )
            continue

        if key in EXECUTION_OPTIONS:
            self.container._add_statement(
                f"{EXECUTION_OPTIONS[key]} '{value}';\n"
            )
def _append_solve_string(self) -> None:
solve_string = self._generate_solve_string()
self.container._add_statement(solve_string + "\n")
def _create_model_attributes(self) -> None:
    """
    Declare one auto-generated parameter per GAMS model attribute and
    assign the attribute's value to it.

    The assignments are wrapped in `$offListing` / `$onListing`.
    """
    container = self.container
    container._add_statement("$offListing")

    for gams_attr in ATTRIBUTE_MAP:
        parameter_name = (
            f"{self._generate_prefix}{gams_attr}_{self._auto_id}"
        )
        # Only the creation of the symbol in the container matters here.
        gp.Parameter._constructor_bypass(container, parameter_name)
        container._add_statement(
            f"{parameter_name} = {self.name}.{gams_attr};"
        )

    container._add_statement("$onListing")
def _update_model_attributes(self) -> None:
    """
    Read the auto-generated attribute parameters from the output GDX file
    and store their values on the model.

    `status` and `solve_status` are converted to their enums. An
    interrupted solve is reported with a warning.

    Raises
    ------
    GamspyException
        If the solve status is one of the statuses in ERROR_STATUS.
    """
    container = self.container._temp_container
    gdx_handle = utils._open_gdx_file(
        self.container.system_directory, self.container._gdx_out
    )

    # Close the GDX handle even when an error status raises below;
    # previously the handle leaked on that path.
    try:
        for gams_attr, python_attr in ATTRIBUTE_MAP.items():
            symbol_name = (
                f"{self._generate_prefix}{gams_attr}_{self._auto_id}"
            )
            data = utils._get_scalar_data(
                container._gams2np, gdx_handle, symbol_name
            )

            if python_attr == "status":
                setattr(self, python_attr, ModelStatus(data))
            elif python_attr == "solve_status":
                status = SolveStatus(data)
                setattr(self, python_attr, status)

                if status in INTERRUPT_STATUS:
                    # Logger.warn is a deprecated alias of Logger.warning.
                    logger.warning(
                        f"The solve was interrupted! Solve status: {status.name}. "
                        "For further information, see https://www.gams.com/latest/docs/UG_GAMSOutput.html#UG_GAMSOutput_SolverStatus."
                    )
                elif status in ERROR_STATUS:
                    raise GamspyException(
                        f"Solve status: {status.name}. {ERROR_STATUS[status]}",
                        status.value,
                    )
            else:
                setattr(self, python_attr, data)
    finally:
        utils._close_gdx_handle(gdx_handle)

    # Drop the data held by the temporary container after a successful read.
    self.container._temp_container.data = {}
[docs]
def computeInfeasibilities(self) -> dict[str, pd.DataFrame]:
"""
Computes infeasabilities for all equations of the model
Returns
-------
dict[str, pd.DataFrame]
Dictionary of infeasibilities where equation names are keys and
infeasibilities are values
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> i = gp.Set(m, name="i", records=["i1", "i2"])
>>> j = gp.Set(m, name="j", records=["j1", "j2", "j3"])
>>> a = gp.Parameter(m, name="a", domain=i, records=[("i1", 350), ("i2", 600)])
>>> b = gp.Parameter(m, name="b", domain=j, records=[("j1", 400), ("j2", 450), ("j3", 420)])
>>> x = gp.Variable(m, name="x", domain=[i,j], type="Positive")
>>> s = gp.Equation(m, name="s", domain=i)
>>> d = gp.Equation(m, name="d", domain=j)
>>> s[i] = gp.Sum(j, x[i, j]) <= a[i]
>>> d[j] = gp.Sum(i, x[i, j]) >= b[j]
>>> my_model = gp.Model(m, name="my_model", equations=m.getEquations(), problem="LP", sense="min", objective=gp.Sum((i, j), x[i, j]))
>>> summary = my_model.solve()
>>> infeasibilities = my_model.computeInfeasibilities()
>>> infeasibilities["s"].infeasibility.item()
320.0
"""
infeas_dict = {}
for equation in self.equations:
if equation.records is None:
continue
infeas_rows = utils._calculate_infeasibilities(equation)
infeas_dict[equation.name] = infeas_rows
return infeas_dict
[docs]
def getEquationListing(
self,
n: int | None = None,
infeasibility_threshold: float | None = None,
) -> list[str]:
"""
Returns the generated equations.
Parameters
----------
n : int | None, optional
Number of equations to be returned.
infeasibility_threshold: float, optional
Filters out equations with infeasibilities that are above this value.
Returns
-------
list[str]
"""
listings = []
for equation in self.equations:
listings += equation.getEquationListing(
infeasibility_threshold=infeasibility_threshold
)
return listings[:n]
[docs]
def getVariableListing(self, n: int | None = None) -> list[str]:
"""
Returns the variable listing.
Parameters
----------
n : int | None, optional
Number of variables to be returned.
Returns
-------
list[str]
"""
if not hasattr(self, "_variables"):
raise ValidationError(
"The model must be solved with `variable_listing_limit` option for this functionality to work."
)
listings = []
for variable in self._variables:
listings += variable.getVariableListing()
return listings[:n]
[docs]
def interrupt(self) -> None:
"""
Sends interrupt signal to the running job.
Raises
------
ValidationError
If the job is not initialized
"""
self.container._stop_socket()
[docs]
def freeze(
    self,
    modifiables: list[Parameter | ImplicitParameter],
    options: Options | None = None,
) -> None:
    """
    Freezes all symbols except modifiable symbols.

    If the model instance cannot be created, the frozen state the model
    had before the call is restored and the error is re-raised.

    Parameters
    ----------
    modifiables : List[Parameter | ImplicitParameter]
        Symbols that stay modifiable while the model is frozen.
    options : Options, optional
        Options passed on to the model instance.

    Examples
    --------
    >>> import gamspy as gp
    >>> m = gp.Container()
    >>> a = gp.Parameter(m, name="a", records=10)
    >>> x = gp.Variable(m, name="x")
    >>> e = gp.Equation(m, name="e", definition= x <= a)
    >>> my_model = gp.Model(m, name="my_model", equations=m.getEquations(), problem="LP", sense="max", objective=x)
    >>> solved = my_model.solve()
    >>> float(x.toValue())
    10.0
    >>> my_model.freeze(modifiables=[a])
    >>> a.setRecords(35)
    >>> solved = my_model.solve()
    >>> float(x.toValue())
    35.0

    """
    was_frozen = self._is_frozen
    # The flag is still set before the instance is built, as before.
    self._is_frozen = True
    try:
        self.instance = ModelInstance(
            self.container, self, modifiables, options
        )
    except Exception:
        # Do not leave the model marked as frozen without a new instance;
        # solve() would otherwise fail on `self.instance`.
        self._is_frozen = was_frozen
        raise
[docs]
def unfreeze(self) -> None:
"""Unfreezes the model"""
self._is_frozen = False
gmdCloseLicenseSession(self.instance.instance.sync_db._gmd)
[docs]
def solve(
    self,
    solver: str | None = None,
    options: Options | None = None,
    solver_options: dict | None = None,
    model_instance_options: ModelInstanceOptions | dict | None = None,
    output: io.TextIOWrapper | None = None,
    backend: Literal["local", "engine", "neos"] = "local",
    client: EngineClient | NeosClient | None = None,
    load_symbols: list[Symbol] | None = None,
) -> pd.DataFrame | None:
    """
    Solves the model with given options.

    A frozen model is solved through its model instance and returns None.
    Otherwise the solve statement is added to the container and the job
    is run on the selected backend.

    Parameters
    ----------
    solver : str, optional
        Solver name
    options : Options, optional
        GAMS options. When omitted, the options of the container are used
        if it has any, otherwise default options.
    solver_options : dict, optional
        Solver options
    model_instance_options : optional
        Model instance options; used only when the model is frozen.
    output : TextIOWrapper, optional
        Output redirection target
    backend : str, optional
        Backend to run on
    client : EngineClient, NeosClient, optional
        EngineClient to communicate with GAMS Engine or NEOS Client to communicate with NEOS Server
    load_symbols : list[Symbol], optional
        Passed on to the argument validation and the backend.

    Returns
    -------
    DataFrame, optional
        Summary of the solve

    Raises
    ------
    ValidationError
        Presumably raised by the argument/equation validation or by the
        backend for invalid input (e.g. a missing client for a remote
        backend) -- confirm against the validation and backend modules.

    Examples
    --------
    >>> import gamspy as gp
    >>> m = gp.Container()
    >>> v = gp.Variable(m, "v")
    >>> e = gp.Equation(m, "e", definition= v == 5)
    >>> my_model = gp.Model(m, "my_model", "LP", [e], "max", v)
    >>> solved = my_model.solve()

    """
    container = self.container

    validation.validate_solver_args(
        container.system_directory,
        solver,
        self.problem,
        options,
        output,
        load_symbols,
    )
    validation.validate_equations(self)

    if options is None:
        # Prefer the container-wide options over fresh defaults.
        options = container._options or gp.Options()

    options._set_solver_options(
        working_directory=container.working_directory,
        solver=solver,
        problem=self.problem,
        solver_options=solver_options,
    )
    self._add_model_attr_options(options)

    if self._is_frozen:
        # Frozen models are solved through the model instance only.
        self.instance.solve(solver, model_instance_options, output)
        return None

    self._append_solve_string()
    self._create_model_attributes()

    summary = backend_factory(
        container,
        options,
        output,
        backend,
        client,
        self,
        load_symbols,
    ).run()

    if IS_MIRO_INIT:
        container._write_default_gdx_miro()

    return summary
[docs]
def getDeclaration(self) -> str:
"""
Declaration of the Model in GAMS
Returns
-------
str
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> v = gp.Variable(m, "v")
>>> e = gp.Equation(m, "e", definition= v == 5)
>>> my_model = gp.Model(m, "my_model", "LP", [e])
>>> my_model.getDeclaration()
'Model my_model / e /;'
"""
equations = []
for equation in self.equations:
if self._matches:
if equation not in self._matches:
equations.append(equation.name)
else:
equations.append(equation.name)
equations_str = ",".join(equations)
if self._matches:
matches_str = ",".join(
[
f"{equation.name}.{variable.name}"
for equation, variable in self._matches.items()
]
)
equations_str = (
",".join([equations_str, matches_str])
if equations
else matches_str
)
if self._external_module_file:
equations_str = ",".join(
[equations_str, self._external_module_file]
)
if self._limited_variables:
limited_variables_str = ",".join(
[variable.gamsRepr() for variable in self._limited_variables]
)
equations_str += "," + limited_variables_str
model_str = f"Model {self.name}"
if equations_str != "":
model_str += f" / {equations_str} /"
model_str += ";"
return model_str
[docs]
def toGams(self, path: str, options: Options | None = None) -> None:
    """
    Generates GAMS model under path/<model_name>.gms

    Parameters
    ----------
    path : str
        Path to the directory which will contain the GAMS model.
    options : Options, optional
        Options passed on to the converter.

    Raises
    ------
    ValidationError
        If `options` is given but is not a gp.Options instance.
    """
    if options is not None and not isinstance(options, gp.Options):
        # The message previously read "... gp.Options of found ...".
        raise ValidationError(
            f"`options` must be of type gp.Options but found {type(options)}"
        )

    converter = GamsConverter(self, path)
    converter.convert(options=options)
[docs]
def toLatex(self, path: str, generate_pdf: bool = False) -> None:
    """
    Generates a latex file that contains the model definition.

    Parameters
    ----------
    path : str
        Path to the directory which will contain the .tex file.
    generate_pdf : bool, optional
        Whether to also convert the generated latex file to a PDF,
        by default False.
    """
    latex_converter = LatexConverter(self, path)
    latex_converter.convert()

    if not generate_pdf:
        return

    latex_converter.to_pdf()