from __future__ import annotations
import io
import logging
import os
import platform
import signal
import uuid
from enum import Enum
from typing import TYPE_CHECKING
from gams.core.gmd import gmdCloseLicenseSession
import gamspy as gp
import gamspy._algebra.expression as expression
import gamspy._algebra.operation as operation
import gamspy._symbols.implicits as implicits
import gamspy._validation as validation
import gamspy.utils as utils
from gamspy._backend.backend import backend_factory
from gamspy._model_instance import ModelInstance
from gamspy.exceptions import GamspyException, ValidationError
if TYPE_CHECKING:
from typing import Iterable, Literal
import pandas as pd
from gamspy import Container, Equation, Parameter, Variable
from gamspy._algebra.expression import Expression
from gamspy._algebra.operation import Operation
from gamspy._backend.engine import EngineClient
from gamspy._backend.neos import NeosClient
from gamspy._options import ModelInstanceOptions, Options
from gamspy._symbols.implicits import ImplicitParameter
# Flag read once, at import time, from the ``MIRO`` environment variable.
# NOTE(review): ``os.getenv`` returns the raw string when the variable is set,
# so any non-empty value (even "0" or "false") is truthy -- confirm intended.
IS_MIRO_INIT = os.getenv("MIRO", False)

# Module-level logger named "MODEL": INFO-and-above records go to a stream
# handler formatted as "[<logger name> - <level>] <message>".
logger = logging.getLogger("MODEL")
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter("[%(name)s - %(levelname)s] %(message)s")
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
[docs]
class Problem(Enum):
    """Enumeration of all supported problem types."""

    LP = "LP"
    NLP = "NLP"
    QCP = "QCP"
    DNLP = "DNLP"
    MIP = "MIP"
    RMIP = "RMIP"
    MINLP = "MINLP"
    RMINLP = "RMINLP"
    MIQCP = "MIQCP"
    RMIQCP = "RMIQCP"
    MCP = "MCP"
    CNS = "CNS"
    MPEC = "MPEC"
    RMPEC = "RMPEC"
    EMP = "EMP"
    MPSGE = "MPSGE"

    @classmethod
    def values(cls):
        """Return the values of all members as a list, in definition order."""
        return [member.value for member in cls]

    def __str__(self) -> str:
        """Return the member's raw value (e.g. ``"LP"``)."""
        return self.value
[docs]
class Sense(Enum):
    """Enumeration of the optimization senses a model can have."""

    MIN = "MIN"
    MAX = "MAX"
    FEASIBILITY = "FEASIBILITY"

    @classmethod
    def values(cls):
        """Return the values of all members as a list, in definition order."""
        return [member.value for member in cls]

    def __str__(self) -> str:
        """Return the member's raw value (e.g. ``"MIN"``)."""
        return self.value
[docs]
class ModelStatus(Enum):
    """
    An enumeration for model status types.

    Each member maps a descriptive name to an integer code. After a solve,
    ``Model`` converts the value read for the GAMS ``modelStat`` attribute
    into a member of this enum and stores it as ``Model.status``.
    """

    OptimalGlobal = 1
    OptimalLocal = 2
    Unbounded = 3
    InfeasibleGlobal = 4
    InfeasibleLocal = 5
    InfeasibleIntermed = 6
    Feasible = 7
    Integer = 8
    NonIntegerIntermed = 9
    IntegerInfeasible = 10
    LicenseError = 11
    ErrorUnknown = 12
    ErrorNoSolution = 13
    NoSolutionReturned = 14
    SolvedUnique = 15
    Solved = 16
    SolvedSingular = 17
    UnboundedNoSolution = 18
    InfeasibleNoSolution = 19
[docs]
class SolveStatus(Enum):
    """
    An enumeration for solve status types.

    Each member maps a descriptive name to an integer code. After a solve,
    ``Model`` converts the value read for the GAMS ``solveStat`` attribute
    into a member of this enum and stores it as ``Model.solve_status``.
    """

    NormalCompletion = 1
    IterationInterrupt = 2
    ResourceInterrupt = 3
    TerminatedBySolver = 4
    EvaluationInterrupt = 5
    CapabilityError = 6
    LicenseError = 7
    UserInterrupt = 8
    SetupError = 9
    SolverError = 10
    InternalError = 11
    Skipped = 12
    SystemError = 13
# Solve statuses treated as interruptions: ``Model`` logs a warning when a
# solve finishes with one of these.
INTERRUPT_STATUS = [
    SolveStatus.IterationInterrupt,
    SolveStatus.ResourceInterrupt,
    SolveStatus.EvaluationInterrupt,
    SolveStatus.UserInterrupt,
]

# Solve statuses treated as failures: ``Model`` raises a ``GamspyException``
# when a solve finishes with one of these.
ERROR_STATUS = [
    SolveStatus.CapabilityError,
    SolveStatus.LicenseError,
    SolveStatus.SetupError,
    SolveStatus.SolverError,
    SolveStatus.InternalError,
    SolveStatus.SystemError,
    SolveStatus.TerminatedBySolver,
]
# GAMS model attribute name -> name of the matching attribute on the GAMSPy
# ``Model`` object. ``Model`` iterates this mapping in insertion order, both
# when emitting the attribute assignments and when reading the values back,
# so entries placed after "solveStat" are not updated when the solve status
# leads to an exception.
attribute_map = {
    "domUsd": "num_domain_violations",
    "etAlg": "algorithm_time",
    "etSolve": "total_solve_time",
    "etSolver": "total_solver_time",
    "iterUsd": "num_iterations",
    "marginals": "marginals",
    "maxInfes": "max_infeasibility",
    "meanInfes": "mean_infeasibility",
    "modelStat": "status",
    "nodUsd": "num_nodes_used",
    "numDepnd": "num_dependencies",
    "numDVar": "num_discrete_variables",
    "numEqu": "num_equations",
    "numInfes": "num_infeasibilities",
    "numNLIns": "num_nonlinear_insts",
    "numNLNZ": "num_nonlinear_zeros",
    "numNOpt": "num_nonoptimalities",
    "numNZ": "num_nonzeros",
    "numRedef": "num_mcp_redefinitions",
    "numVar": "num_variables",
    "numVarProj": "num_bound_projections",
    "objEst": "objective_estimation",
    "objVal": "objective_value",
    "procUsed": "used_model_type",
    "resGen": "model_generation_time",
    "resUsd": "solve_model_time",
    "solveStat": "solve_status",
    "sumInfes": "sum_infeasibilities",
    "sysVer": "solver_version",
}
[docs]
class Model:
    """
    Represents a list of equations to be solved.

    Parameters
    ----------
    container : Container
        Container of the model.
    name : str, optional
        Name of the model. Name is autogenerated by default.
    problem : Problem or str, optional
        'LP', 'NLP', 'QCP', 'DNLP', 'MIP', 'RMIP', 'MINLP', 'RMINLP', 'MIQCP', 'RMIQCP', 'MCP', 'CNS', 'MPEC', 'RMPEC', 'EMP', or 'MPSGE',
        by default Problem.LP.
    equations : str | Iterable
        List of Equation objects or str. ``all`` as a string represents
        all the equations specified before the creation of this model.
        NOTE(review): the validation in this class only accepts a list or
        tuple of Equation objects -- confirm whether the ``all`` string
        form is still supported.
    sense : Sense, optional
        "MIN", "MAX", or "FEASIBILITY".
    objective : Variable | Expression, optional
        Objective variable to minimize or maximize or objective itself.
    matches : dict, optional
        Mapping of equations to variables. Each pair is written as
        ``equation.variable`` in the model declaration, and a matched
        equation is not listed on its own.
    limited_variables : Iterable, optional
        Allows limiting the domain of variables used in a model.

    Examples
    --------
    >>> import gamspy as gp
    >>> m = gp.Container()
    >>> v = gp.Variable(m, "v")
    >>> e = gp.Equation(m, "e", definition= v == 5)
    >>> my_model = gp.Model(m, "my_model", "LP", [e])
    """

    # Prefix for auto-generated symbols
    _generate_prefix = "autogenerated_"
def __init__(
    self,
    container: Container,
    name: str | None = None,
    problem: Problem | str = Problem.LP,
    equations: list[Equation] = [],
    sense: Sense | str | None = None,
    objective: Variable | Expression | None = None,
    matches: dict | None = None,
    limited_variables: Iterable[Variable] | None = None,
):
    """
    Create the model, validate its inputs and add it to the container.

    Parameters
    ----------
    container : Container
        Container the model is added to.
    name : str, optional
        Model name; validated when given, otherwise an id derived from a
        random UUID is used.
    problem : Problem or str, optional
        Problem type, by default Problem.LP.
    equations : list of Equation, optional
        Equations of the model. The list is copied, so the (shared)
        default list is never mutated.
    sense : Sense or str, optional
        Optimization sense.
    objective : Variable or Expression, optional
        Objective variable or objective expression.
    matches : dict, optional
        Equation -> variable pairs.
    limited_variables : Iterable of Variable, optional
        Variables listed after the equations in the model declaration.

    Raises
    ------
    ValueError
        If ``problem`` or ``sense`` is a string with an unknown value.
    TypeError
        If ``equations`` or ``objective`` has an unsupported type.
    ValidationError
        If an objective is combined with the FEASIBILITY sense, or the
        FEASIBILITY sense with a CNS/MCP problem.
    """
    # Unique suffix shared by every symbol autogenerated for this model.
    self._auto_id = str(uuid.uuid4()).replace("-", "_")

    if name is not None:
        name = validation.validate_name(name)
        self.name = validation.validate_model_name(name)
    else:
        self.name = self._auto_id

    self.container = container
    self.problem, self.sense = self._validate_model(
        equations, problem, sense
    )
    # Copy: the objective handling below may append an autogenerated
    # equation, which must not leak into the caller's (or default) list.
    self.equations = list(equations)
    self._objective_variable = self._set_objective_variable(objective)
    self._matches = matches
    self._limited_variables = limited_variables
    self.container._add_statement(self)

    # allow freezing
    self._is_frozen = False

    # Attributes (one per value of ``attribute_map``); all unknown until
    # the model has been solved.
    self.num_domain_violations = None
    self.algorithm_time = None
    self.total_solve_time = None
    self.total_solver_time = None
    self.num_iterations = None
    self.marginals = None
    self.max_infeasibility = None
    self.mean_infeasibility = None
    self.status: ModelStatus | None = None
    self.num_nodes_used = None
    self.num_dependencies = None
    self.num_discrete_variables = None
    # Previously missing: reading it before a solve raised AttributeError.
    self.num_equations = None
    self.num_infeasibilities = None
    self.num_nonlinear_insts = None
    self.num_nonlinear_zeros = None
    self.num_nonoptimalities = None
    self.num_nonzeros = None
    self.num_mcp_redefinitions = None
    self.num_variables = None
    self.num_bound_projections = None
    self.objective_estimation = None
    self.objective_value = None
    self.used_model_type = None
    self.model_generation_time = None
    self.solve_model_time = None
    self.sum_infeasibilities = None
    self.solve_status: SolveStatus | None = None
    self.solver_version = None

    self._infeasibility_tolerance: float | None = None

    self.container._synch_with_gams()
def __repr__(self) -> str:
return f"<Model `{self.name}` ({hex(id(self))})>"
def __str__(self) -> str:
return (
f"Model {self.name}:\n Problem Type: {self.problem}\n Sense:"
f" {self.sense}\n Equations: {self.equations}"
)
def _generate_obj_var_and_equation(self):
    """
    Create the autogenerated scalar objective variable and equation.

    Both symbols are created in the model's container through
    ``_constructor_bypass`` with an empty domain; their names combine the
    autogenerated prefix with this model's unique id.

    Returns
    -------
    tuple
        ``(variable, equation)``
    """
    variable_name = f"{Model._generate_prefix}variable_{self._auto_id}"
    equation_name = f"{Model._generate_prefix}equation_{self._auto_id}"

    variable = gp.Variable._constructor_bypass(
        self.container, variable_name, domain=[]
    )
    equation = gp.Equation._constructor_bypass(
        self.container, equation_name, domain=[]
    )
    return variable, equation
def _set_objective_variable(
    self,
    assignment: None | (Variable | Operation | Expression) = None,
) -> Variable | None:
    """
    Returns objective variable. If the assignment is an Expression
    or an Operation (Sum, Product etc.), it automatically generates
    an objective variable and a equation. The same happens, with a
    dummy ``variable == 0`` definition, when the sense is FEASIBILITY.

    Parameters
    ----------
    assignment : Variable | Operation | Expression, optional
        Objective given by the user.

    Returns
    -------
    Variable | None
        The given variable, the autogenerated variable, or None when no
        objective was given.

    Raises
    ------
    TypeError
        In case assignment is not a Variable, Expression or an Operation.
    ValidationError
        In case the sense is FEASIBILITY and either an objective is given
        or the problem type is CNS or MCP.
    """
    if assignment is not None and not isinstance(
        assignment,
        (gp.Variable, expression.Expression, operation.Operation),
    ):
        raise TypeError(
            "Objective must be a Variable or an Expression but"
            f" {type(assignment)} given"
        )

    if self.sense == gp.Sense.FEASIBILITY:
        if assignment is not None:
            raise ValidationError(
                "Cannot set an objective when the sense is FEASIBILITY!"
            )

        if self.problem in [gp.Problem.CNS, gp.Problem.MCP]:
            raise ValidationError(
                "Problem type cannot be CNS or MCP when the sense is"
                " FEASIBILITY"
            )

        # equation .. var =e= 0
        variable, equation = self._generate_obj_var_and_equation()
        self._define_objective_equation(equation, variable, variable == 0)
        return variable

    if isinstance(
        assignment, (expression.Expression, operation.Operation)
    ):
        # equation .. Sum((i,j),c[i,j]*x[i,j]) =e= var
        variable, equation = self._generate_obj_var_and_equation()
        self._define_objective_equation(
            equation, variable, assignment == variable
        )
        return variable

    # A plain Variable (or None) is used as-is.
    return assignment

def _define_objective_equation(self, equation, variable, definition) -> None:
    """Define the autogenerated objective equation and add it to the model."""
    statement = expression.Expression(
        implicits.ImplicitEquation(
            equation,
            name=equation.name,
            type=equation.type,
            domain=[],
        ),
        "..",
        definition,
    )
    self.container._add_statement(statement)
    equation._definition = statement

    # Reset the modification flags on the autogenerated symbols.
    equation.modified = False
    equation._is_dirty = False
    variable.modified = False

    self.equations.append(equation)
def _validate_model(self, equations, problem, sense=None) -> tuple:
    """
    Normalize ``problem`` and ``sense`` and type-check ``equations``.

    Parameters
    ----------
    equations : list or tuple of Equation
    problem : Problem or str
        Strings are matched case-insensitively against ``Problem`` values.
    sense : Sense or str, optional
        Strings are matched case-insensitively against ``Sense`` values.

    Returns
    -------
    tuple
        ``(problem, sense)`` with string inputs converted to enum members.

    Raises
    ------
    ValueError
        If ``problem`` or ``sense`` is a string with an unknown value.
    TypeError
        If ``equations`` is not a list/tuple (for problem types other than
        CNS and MCP) or contains something that is not an Equation.
    """
    if isinstance(problem, str):
        if problem.upper() not in gp.Problem.values():
            raise ValueError(
                f"Allowed problem types: {gp.Problem.values()} but found"
                f" {problem}."
            )

        problem = gp.Problem(problem.upper())

    if isinstance(sense, str):
        if sense.upper() not in gp.Sense.values():
            raise ValueError(
                f"Allowed sense values: {gp.Sense.values()} but found"
                f" {sense}."
            )

        sense = gp.Sense(sense.upper())

    # Same semantics as the former unparenthesized ``A and B or C``: the
    # container-type check is skipped for CNS/MCP, the element check is not.
    wrong_container = problem not in [
        Problem.CNS,
        Problem.MCP,
    ] and not isinstance(equations, (list, tuple))

    if wrong_container or any(
        not isinstance(equation, gp.Equation) for equation in equations
    ):
        raise TypeError("equations must be list of Equation objects")

    return problem, sense
def _append_solve_string(self) -> None:
solve_string = f"solve {self.name} using {self.problem}"
if self.sense:
if self.sense == gp.Sense.FEASIBILITY:
# Set sense as min or max for feasibility
self.sense = gp.Sense.MIN
solve_string += f" {self.sense}"
if self._objective_variable is not None:
solve_string += f" {self._objective_variable.gamsRepr()}"
self.container._add_statement(solve_string + ";\n")
def _create_model_attributes(self) -> None:
    """
    Emit statements that copy every GAMS model attribute into a parameter.

    For each key of ``attribute_map`` an autogenerated parameter is created
    in the container and assigned ``<model name>.<attribute>``. The
    assignments are wrapped in ``$offListing`` / ``$onListing``.
    """
    self.container._add_statement("$offListing")

    for gams_name in attribute_map:
        parameter_name = (
            f"{self._generate_prefix}{gams_name}_{self._auto_id}"
        )
        gp.Parameter._constructor_bypass(self.container, parameter_name)
        assignment = f"{parameter_name} = {self.name}.{gams_name};"
        self.container._add_statement(assignment)

    self.container._add_statement("$onListing")
def _update_model_attributes(self) -> None:
    """
    Copy the solved model's attribute values from the GDX file to ``self``.

    Every autogenerated attribute scalar is read from the container's
    output GDX file and stored under the Python attribute name given by
    ``attribute_map``. ``status`` and ``solve_status`` are converted to
    ``ModelStatus`` / ``SolveStatus``.

    Raises
    ------
    GamspyException
        If the solve status is one of ``ERROR_STATUS``. Attributes that
        come after ``solveStat`` in ``attribute_map`` are then left
        untouched.
    """
    container = self.container._temp_container

    gdx_handle = utils._open_gdx_file(
        self.container.system_directory, self.container._gdx_out
    )

    # try/finally: the handle must be released even when an error status
    # (or an unknown status code) raises in the middle of the loop.
    try:
        for gams_attr, python_attr in attribute_map.items():
            symbol_name = (
                f"{self._generate_prefix}{gams_attr}_{self._auto_id}"
            )
            data = utils._get_scalar_data(
                container._gams2np, gdx_handle, symbol_name
            )

            if python_attr == "status":
                setattr(self, python_attr, ModelStatus(data))
            elif python_attr == "solve_status":
                status = SolveStatus(data)
                setattr(self, python_attr, status)

                if status in INTERRUPT_STATUS:
                    # ``Logger.warn`` is a deprecated alias of ``warning``.
                    logger.warning(
                        f"The solve was interrupted! Solve status: {status.name}. "
                        "For further information, see https://www.gams.com/latest/docs/UG_GAMSOutput.html#UG_GAMSOutput_SolverStatus."
                    )
                elif status in ERROR_STATUS:
                    raise GamspyException(
                        f"The model `{self.name}` was not solved successfully!"
                        f" Solve status: {status.name}. "
                        "For further information, see https://www.gams.com/latest/docs/UG_GAMSOutput.html#UG_GAMSOutput_SolverStatus",
                        status.value,
                    )
            else:
                setattr(self, python_attr, data)
    finally:
        utils._close_gdx_handle(gdx_handle)

    self.container._temp_container.data = {}
def _make_variable_and_equations_dirty(self):
if (
self._objective_variable is not None
and not self._objective_variable.name.startswith(
Model._generate_prefix
)
):
self._objective_variable._is_dirty = True
for equation in self.equations:
if not equation.name.startswith(Model._generate_prefix):
equation._is_dirty = True
if equation._definition is not None:
variables = equation._definition._find_variables()
for name in variables:
if not name.startswith(Model._generate_prefix):
self.container[name]._is_dirty = True
if self._matches:
for equation, variable in self._matches.items():
equation._is_dirty = True
variable._is_dirty = True
[docs]
def compute_infeasibilities(self) -> dict[str, pd.DataFrame]:
"""
Computes infeasabilities for all equations of the model
Returns
-------
dict[str, pd.DataFrame]
Dictionary of infeasibilities where equation names are keys and
infeasibilities are values
"""
infeas_dict = {}
for equation in self.equations:
if equation.records is None:
continue
infeas_rows = utils._calculate_infeasibilities(equation)
infeas_dict[equation.name] = infeas_rows
return infeas_dict
@property
def infeasibility_tolerance(self) -> float | None:
"""
This option sets the tolerance for marking an equation infeasible in
the equation listing. By default, 1.0e-13.
Returns
-------
float | None
"""
return self._infeasibility_tolerance
@infeasibility_tolerance.setter
def infeasibility_tolerance(self, value: float):
self.container._add_statement(f"{self.name}.tolInfRep = {value};")
self._infeasibility_tolerance = value
[docs]
def interrupt(self) -> None:
"""
Sends interrupt signal to the running job.
Raises
------
ValidationError
If the job is not initialized
"""
if platform.system() == "Windows":
self.container._process.send_signal(signal.SIGTERM)
else:
self.container._process.send_signal(signal.SIGINT)
self.container._stop_socket()
[docs]
def freeze(
    self,
    modifiables: list[Parameter | ImplicitParameter],
    freeze_options: dict | None = None,
) -> None:
    """
    Freezes all symbols except modifiable symbols.

    The container is synchronized first; then a ``ModelInstance`` is
    created, stored as ``self.instance`` and the model is marked frozen.

    Parameters
    ----------
    modifiables : List[Parameter | ImplicitParameter]
        Symbols that stay modifiable.
    freeze_options : dict, optional
        Passed on to ``ModelInstance``.
    """
    self.container._synch_with_gams()

    frozen_instance = ModelInstance(
        self.container, self, modifiables, freeze_options
    )
    self.instance = frozen_instance
    self._is_frozen = True
[docs]
def unfreeze(self) -> None:
    """Unfreezes the model and closes the instance's license session."""
    self._is_frozen = False

    sync_db = self.instance.instance.sync_db
    gmdCloseLicenseSession(sync_db._gmd)
[docs]
def solve(
    self,
    solver: str | None = None,
    options: Options | None = None,
    solver_options: dict | None = None,
    model_instance_options: ModelInstanceOptions | dict | None = None,
    output: io.TextIOWrapper | None = None,
    backend: Literal["local", "engine", "neos"] = "local",
    client: EngineClient | NeosClient | None = None,
) -> pd.DataFrame | None:
    """
    Solves the model with given options.

    A frozen model is solved through its model instance and returns None.
    Otherwise the solve statement and the attribute assignments are added
    to the container, the model's symbols are marked dirty and the job is
    run on the selected backend.

    Parameters
    ----------
    solver : str, optional
        Solver name
    options : Options, optional
        GAMS options; the container's options are used when omitted.
    solver_options : dict, optional
        Solver options
    model_instance_options : optional
        Model instance options (only used when the model is frozen)
    output : TextIOWrapper, optional
        Output redirection target
    backend : str, optional
        Backend to run on
    client : EngineClient, NeosClient, optional
        EngineClient to communicate with GAMS Engine or NEOS Client to communicate with NEOS Server

    Returns
    -------
    DataFrame, optional
        Summary of the solve

    Raises
    ------
    ValidationError
        In case no client is provided for the `engine` or `neos` backend
        (NOTE(review): raised outside this method -- confirm).
    ValueError
        In case problem is not in possible problem types
    ValueError
        In case sense is different than "MIN" or "MAX"
    """
    validation.validate_solver_args(solver, self.problem, options, output)
    validation.validate_model(self)

    active_options = (
        self.container._options if options is None else options
    )
    active_options._set_solver_options(
        self.container.working_directory,
        solver=solver,
        problem=self.problem,
        solver_options=solver_options,
    )

    # Frozen models bypass statement generation and the backends entirely.
    if self._is_frozen:
        self.instance.solve(model_instance_options, output)
        return None

    self._append_solve_string()
    self._create_model_attributes()
    self._make_variable_and_equations_dirty()

    summary = backend_factory(
        self.container,
        active_options,
        output,
        backend,
        client,
        self,
    ).run()

    if IS_MIRO_INIT:
        self.container._write_default_gdx_miro()

    return summary
[docs]
def getDeclaration(self) -> str:
"""
Declaration of the Model in GAMS
Returns
-------
str
Examples
--------
>>> import gamspy as gp
>>> m = gp.Container()
>>> v = gp.Variable(m, "v")
>>> e = gp.Equation(m, "e", definition= v == 5)
>>> my_model = gp.Model(m, "my_model", "LP", [e])
>>> my_model.getDeclaration()
'Model my_model / e /;'
"""
equations = []
for equation in self.equations:
if self._matches:
if equation not in self._matches:
equations.append(equation.gamsRepr())
else:
equations.append(equation.gamsRepr())
equations_str = ",".join(equations)
if self._matches:
matches_str = ",".join(
[
f"{equation.gamsRepr()}.{variable.gamsRepr()}"
for equation, variable in self._matches.items()
]
)
equations_str = (
",".join([equations_str, matches_str])
if equations
else matches_str
)
if self._limited_variables:
limited_variables_str = ",".join(
[variable.gamsRepr() for variable in self._limited_variables]
)
equations_str += "," + limited_variables_str
model_str = f"Model {self.name}"
if equations_str != "":
model_str += f" / {equations_str} /"
model_str += ";"
return model_str