Source code for gamspy._model

from __future__ import annotations

import io
import logging
import os
import uuid
from enum import Enum
from typing import TYPE_CHECKING

from gams.core.gmd import gmdCloseLicenseSession

import gamspy as gp
import gamspy._algebra.expression as expression
import gamspy._algebra.operation as operation
import gamspy._symbols.implicits as implicits
import gamspy._validation as validation
import gamspy.utils as utils
from gamspy._backend.backend import backend_factory
from gamspy._convert import GamsConverter, LatexConverter
from gamspy._model_instance import ModelInstance
from gamspy._options import EXECUTION_OPTIONS, MODEL_ATTR_OPTION_MAP, Options
from gamspy.exceptions import GamspyException, ValidationError

if TYPE_CHECKING:
    from typing import Iterable, Literal

    import pandas as pd

    from gamspy import Container, Equation, Parameter, Variable
    from gamspy._algebra.expression import Expression
    from gamspy._algebra.operation import Operation
    from gamspy._backend.engine import EngineClient
    from gamspy._backend.neos import NeosClient
    from gamspy._options import ModelInstanceOptions
    from gamspy._symbols.implicits import ImplicitParameter
    from gamspy._symbols.symbol import Symbol

IS_MIRO_INIT = os.getenv("MIRO", False)

logger = logging.getLogger("MODEL")
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter("[%(name)s - %(levelname)s] %(message)s")
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)


[docs] class Problem(Enum): """An enumeration for problem all problem types""" LP = "LP" NLP = "NLP" QCP = "QCP" DNLP = "DNLP" MIP = "MIP" RMIP = "RMIP" MINLP = "MINLP" RMINLP = "RMINLP" MIQCP = "MIQCP" RMIQCP = "RMIQCP" MCP = "MCP" CNS = "CNS" MPEC = "MPEC" RMPEC = "RMPEC" EMP = "EMP" MPSGE = "MPSGE" @classmethod def values(cls): """Convenience function to return all values of enum""" return list(cls._value2member_map_.keys()) def __str__(self) -> str: return self.value
[docs] class Sense(Enum): """An enumeration for sense types""" MIN = "MIN" MAX = "MAX" FEASIBILITY = "FEASIBILITY" @classmethod def values(cls): """Convenience function to return all values of enum""" return list(cls._value2member_map_.keys()) def __str__(self) -> str: return self.value
[docs] class ModelStatus(Enum): """An enumeration for model status types""" OptimalGlobal = 1 OptimalLocal = 2 Unbounded = 3 InfeasibleGlobal = 4 InfeasibleLocal = 5 InfeasibleIntermed = 6 Feasible = 7 Integer = 8 NonIntegerIntermed = 9 IntegerInfeasible = 10 LicenseError = 11 ErrorUnknown = 12 ErrorNoSolution = 13 NoSolutionReturned = 14 SolvedUnique = 15 Solved = 16 SolvedSingular = 17 UnboundedNoSolution = 18 InfeasibleNoSolution = 19
[docs] class SolveStatus(Enum): """An enumeration for solve status types""" NormalCompletion = 1 IterationInterrupt = 2 ResourceInterrupt = 3 TerminatedBySolver = 4 EvaluationInterrupt = 5 CapabilityError = 6 LicenseError = 7 UserInterrupt = 8 SetupError = 9 SolverError = 10 InternalError = 11 Skipped = 12 SystemError = 13
INTERRUPT_STATUS = [ SolveStatus.IterationInterrupt, SolveStatus.ResourceInterrupt, SolveStatus.EvaluationInterrupt, SolveStatus.UserInterrupt, SolveStatus.TerminatedBySolver, ] ERROR_STATUS = { SolveStatus.CapabilityError: "The solver does not have the capability required by the model.", SolveStatus.LicenseError: "The solver cannot find the appropriate license key needed to use a specific subsolver.", SolveStatus.SetupError: "The solver encountered a fatal failure during problem set-up time.", SolveStatus.SolverError: "The solver encountered a fatal error.", SolveStatus.InternalError: "The solver encountered an internal fatal error.", SolveStatus.SystemError: "This indicates a completely unknown or unexpected error condition.", } # GAMS name -> GAMSPy name ATTRIBUTE_MAP = { "domUsd": "num_domain_violations", "etAlg": "algorithm_time", "etSolve": "total_solve_time", "etSolver": "total_solver_time", "iterUsd": "num_iterations", "marginals": "marginals", "maxInfes": "max_infeasibility", "meanInfes": "mean_infeasibility", "modelStat": "status", "nodUsd": "num_nodes_used", "numDepnd": "num_dependencies", "numDVar": "num_discrete_variables", "numEqu": "num_equations", "numInfes": "num_infeasibilities", "numNLIns": "num_nonlinear_insts", "numNLNZ": "num_nonlinear_zeros", "numNOpt": "num_nonoptimalities", "numNZ": "num_nonzeros", "numRedef": "num_mcp_redefinitions", "numVar": "num_variables", "numVarProj": "num_bound_projections", "objEst": "objective_estimation", "objVal": "objective_value", "procUsed": "used_model_type", "resGen": "model_generation_time", "resUsd": "solve_model_time", "solveStat": "solve_status", "sumInfes": "sum_infeasibilities", "sysVer": "solver_version", }
[docs] class Model: """ Represents a list of equations to be solved. Parameters ---------- container : Container Container of the model. name : str, optional Name of the model. Name is autogenerated by default. equations : Iterable[Equation] Iterable of Equation objects. problem : Problem or str, optional 'LP', 'NLP', 'QCP', 'DNLP', 'MIP', 'RMIP', 'MINLP', 'RMINLP', 'MIQCP', 'RMIQCP', 'MCP', 'CNS', 'MPEC', 'RMPEC', 'EMP', or 'MPSGE', by default Problem.LP. sense : Sense, optional "MIN", "MAX", or "FEASIBILITY". objective : Variable | Expression, optional Objective variable to minimize or maximize or objective itself. matches : dict[Equation, Variable] Equation - Variable matches for MCP models. limited_variables : Iterable, optional Allows limiting the domain of variables used in a model. external_module: str, optional The name of the external module in which the external equations are implemented Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> v = gp.Variable(m, "v") >>> e = gp.Equation(m, "e", definition= v == 5) >>> my_model = gp.Model(m, "my_model", "LP", [e]) """ # Prefix for auto-generated symbols _generate_prefix = "autogenerated_" def __init__( self, container: Container, name: str | None = None, problem: Problem | str = Problem.LP, equations: Iterable[Equation] = [], sense: Sense | str | None = None, objective: Variable | Expression | None = None, matches: dict[Equation, Variable] | None = None, limited_variables: Iterable[Variable] | None = None, external_module: str | None = None, ): self._auto_id = "m" + str(uuid.uuid4()).replace("-", "_") if name is not None: name = validation.validate_name(name) self.name = validation.validate_model_name(name) else: self.name = self._auto_id self.container = container self._matches = matches self.problem, self.sense = validation.validate_model( equations, problem, sense ) self.equations = list(equations) self._objective_variable = self._set_objective_variable(objective) if ( self._objective_variable is not None and self._objective_variable.type.lower() != "free" ): raise ValidationError( f"Objective variable `{self._objective_variable}` must be a free variable" ) self._limited_variables = limited_variables if not self.equations and not self._matches: raise ValidationError("Model requires at least one equation.") self._external_module_file = None self._external_module = None if external_module is not None: self.external_module = external_module else: # To avoid adding it twice self.container._add_statement(self) # allow freezing self._is_frozen = False # Attributes self.num_domain_violations = None self.algorithm_time = None self.total_solve_time = None self.total_solver_time = None self.num_iterations = None self.marginals = None self.max_infeasibility = None self.mean_infeasibility = None self.status: ModelStatus | None = None self.num_nodes_used = None self.num_dependencies = None self.num_discrete_variables = None self.num_infeasibilities = None self.num_nonlinear_insts = None self.num_nonlinear_zeros = None self.num_nonoptimalities = None self.num_nonzeros = None self.num_mcp_redefinitions = None self.num_variables = None self.num_bound_projections = None self.objective_estimation = None self.objective_value = None self.used_model_type = None self.model_generation_time = None self.solve_model_time = None self.sum_infeasibilities = None self.solve_status: SolveStatus | None = None self.solver_version = None self._infeasibility_tolerance: float | None = None self.container._synch_with_gams() def __repr__(self) -> str: return f"Model(name={self.name}, problem={self.problem}, equations={self.equations}, sense={self.sense}, objective={self._objective_variable}, matches={self._matches}, limited_variables={self._limited_variables}" def __str__(self) -> str: return ( f"Model {self.name}:\n Problem Type: {self.problem}\n Sense:" f" {self.sense}\n Equations: {self.equations}" ) @property def external_module(self) -> str | None: """ Name of the external module in which the external equations are implemented. By default, this parameter is set to None. When provided, it triggers the opening of the specified file using a File statement and incorporates the file into the model by adding it as an external module. This feature requires a solid understanding of programming, compilation, and linking processes. For more information, please refer to the https://www.gams.com/latest/docs/UG_ExternalEquations.html . Returns ------- str | None """ return self._external_module @external_module.setter def external_module(self, value: str | None): if self._external_module_file is not None: self.container._add_statement( f"putclose {self._external_module_file};" ) write_model_statement = value != self._external_module self._external_module = None self._external_module_file = None if value is not None: filename = "f" + str(uuid.uuid4()).replace("-", "_") self._external_module_file = filename self._external_module = value self.container._add_statement(f"File {filename} / '{value}' /;") if write_model_statement: self.container._add_statement(self) def _generate_obj_var_and_equation(self) -> tuple[Variable, Equation]: variable = gp.Variable._constructor_bypass( self.container, f"{self.name}_objective_variable", domain=[], ) equation = gp.Equation._constructor_bypass( self.container, f"{self.name}_objective", domain=[], ) return variable, equation def _set_objective_variable( self, assignment: None | Variable | Operation | Expression = None, ) -> Variable | None: """ Returns objective variable. If the assignment is an Expression or an Operation (Sum, Product etc.), it automatically generates an objective variable and a equation. Returns ------- Variable | None Raises ------ TypeError In case assignment is not a Variable, Expression or an Operation. """ if assignment is not None and not isinstance( assignment, (gp.Variable, expression.Expression, operation.Operation), ): raise TypeError( "Objective must be a Variable or an Expression but" f" {type(assignment)} given" ) if self.sense == gp.Sense.FEASIBILITY: if assignment is not None: raise ValidationError( "Cannot set an objective when the sense is FEASIBILITY!" ) if self.problem in [gp.Problem.CNS, gp.Problem.MCP]: raise ValidationError( "Problem type cannot be CNS or MCP when the sense is" " FEASIBILITY" ) variable, equation = self._generate_obj_var_and_equation() statement = expression.Expression( implicits.ImplicitEquation( equation, name=equation.name, type=equation.type, domain=[], ), "..", variable == 0, ) self.container._add_statement(statement) equation._definition = statement equation.modified = False variable.modified = False if equation.name not in [symbol.name for symbol in self.equations]: self.equations.append(equation) return variable if isinstance( assignment, (expression.Expression, operation.Operation) ): variable, equation = self._generate_obj_var_and_equation() # Sum((i,j),c[i,j]*x[i,j])->Sum((i,j),c[i,j]*x[i,j]) =e= var assignment = assignment == variable # equation .. Sum((i,j),c[i,j]*x[i,j]) =e= var statement = expression.Expression( implicits.ImplicitEquation( equation, name=equation.name, type=equation.type, domain=[], ), "..", assignment, ) self.container._add_statement(statement) equation._definition = statement equation.modified = False variable.modified = False if equation.name not in [symbol.name for symbol in self.equations]: self.equations.append(equation) return variable return assignment def _generate_solve_string(self) -> str: solve_string = f"solve {self.name} using {self.problem}" if self.sense: if self.sense == gp.Sense.FEASIBILITY: # Set sense as min or max for feasibility self.sense = gp.Sense.MIN # type: ignore solve_string += f" {self.sense}" if self._objective_variable is not None: solve_string += f" {self._objective_variable.gamsRepr()}" return solve_string + ";" def _add_model_attr_options(self, options: Options) -> None: for key, value in options.model_dump(exclude_none=True).items(): if key in MODEL_ATTR_OPTION_MAP: if isinstance(value, bool): value = int(value) elif isinstance(value, str): value = f"'{value}'" self.container._add_statement( f"{self.name}.{MODEL_ATTR_OPTION_MAP[key]} = {value};\n" ) elif key in EXECUTION_OPTIONS: self.container._add_statement( f"{EXECUTION_OPTIONS[key]} '{value}';\n" ) def _append_solve_string(self) -> None: solve_string = self._generate_solve_string() self.container._add_statement(solve_string + "\n") def _create_model_attributes(self) -> None: self.container._add_statement("$offListing") for attr_name in ATTRIBUTE_MAP: symbol_name = f"{self._generate_prefix}{attr_name}_{self._auto_id}" _ = gp.Parameter._constructor_bypass(self.container, symbol_name) self.container._add_statement( f"{symbol_name} = {self.name}.{attr_name};" ) self.container._add_statement("$onListing") def _update_model_attributes(self) -> None: container = self.container._temp_container gdx_handle = utils._open_gdx_file( self.container.system_directory, self.container._gdx_out ) for gams_attr, python_attr in ATTRIBUTE_MAP.items(): symbol_name = f"{self._generate_prefix}{gams_attr}_{self._auto_id}" data = utils._get_scalar_data( container._gams2np, gdx_handle, symbol_name ) if python_attr == "status": setattr(self, python_attr, ModelStatus(data)) elif python_attr == "solve_status": status = SolveStatus(data) setattr(self, python_attr, status) if status in INTERRUPT_STATUS: logger.warn( f"The solve was interrupted! Solve status: {status.name}. " "For further information, see https://www.gams.com/latest/docs/UG_GAMSOutput.html#UG_GAMSOutput_SolverStatus." ) elif status in ERROR_STATUS: raise GamspyException( f"Solve status: {status.name}. {ERROR_STATUS[status]}", status.value, ) else: setattr(self, python_attr, data) utils._close_gdx_handle(gdx_handle) self.container._temp_container.data = {}
[docs] def computeInfeasibilities(self) -> dict[str, pd.DataFrame]: """ Computes infeasabilities for all equations of the model Returns ------- dict[str, pd.DataFrame] Dictionary of infeasibilities where equation names are keys and infeasibilities are values Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = gp.Set(m, name="i", records=["i1", "i2"]) >>> j = gp.Set(m, name="j", records=["j1", "j2", "j3"]) >>> a = gp.Parameter(m, name="a", domain=i, records=[("i1", 350), ("i2", 600)]) >>> b = gp.Parameter(m, name="b", domain=j, records=[("j1", 400), ("j2", 450), ("j3", 420)]) >>> x = gp.Variable(m, name="x", domain=[i,j], type="Positive") >>> s = gp.Equation(m, name="s", domain=i) >>> d = gp.Equation(m, name="d", domain=j) >>> s[i] = gp.Sum(j, x[i, j]) <= a[i] >>> d[j] = gp.Sum(i, x[i, j]) >= b[j] >>> my_model = gp.Model(m, name="my_model", equations=m.getEquations(), problem="LP", sense="min", objective=gp.Sum((i, j), x[i, j])) >>> summary = my_model.solve() >>> infeasibilities = my_model.computeInfeasibilities() >>> infeasibilities["s"].infeasibility.item() 320.0 """ infeas_dict = {} for equation in self.equations: if equation.records is None: continue infeas_rows = utils._calculate_infeasibilities(equation) infeas_dict[equation.name] = infeas_rows return infeas_dict
[docs] def getEquationListing( self, n: int | None = None, infeasibility_threshold: float | None = None, ) -> list[str]: """ Returns the generated equations. Parameters ---------- n : int | None, optional Number of equations to be returned. infeasibility_threshold: float, optional Filters out equations with infeasibilities that are above this value. Returns ------- list[str] """ listings = [] for equation in self.equations: listings += equation.getEquationListing( infeasibility_threshold=infeasibility_threshold ) return listings[:n]
[docs] def getVariableListing(self, n: int | None = None) -> list[str]: """ Returns the variable listing. Parameters ---------- n : int | None, optional Number of equations to be returned. Returns ------- list[str] """ if not hasattr(self, "_variables"): raise ValidationError( "The model must be solved with `variable_listing_limit` option for this functionality to work." ) listings = [] for variable in self._variables: listings += variable.getVariableListing() return listings[:n]
@property def infeasibility_tolerance(self) -> float | None: """ This option sets the tolerance for marking an equation infeasible in the equation listing. By default, 1.0e-13. Returns ------- float | None """ return self._infeasibility_tolerance @infeasibility_tolerance.setter def infeasibility_tolerance(self, value: float): self.container._add_statement(f"{self.name}.tolInfRep = {value};") self._infeasibility_tolerance = value
[docs] def interrupt(self) -> None: """ Sends interrupt signal to the running job. Raises ------ ValidationError If the job is not initialized """ self.container._stop_socket()
[docs] def freeze( self, modifiables: list[Parameter | ImplicitParameter], options: Options | None = None, ) -> None: """ Freezes all symbols except modifiable symbols. Parameters ---------- modifiables : List[Parameter | ImplicitParameter] freeze_options : dict, optional Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> a = gp.Parameter(m, name="a", records=10) >>> x = gp.Variable(m, name="x") >>> e = gp.Equation(m, name="e", definition= x <= a) >>> my_model = gp.Model(m, name="my_model", equations=m.getEquations(), problem="LP", sense="max", objective=x) >>> solved = my_model.solve() >>> float(x.toValue()) 10.0 >>> my_model.freeze(modifiables=[a]) >>> a.setRecords(35) >>> solved = my_model.solve() >>> float(x.toValue()) 35.0 """ self._is_frozen = True self.instance = ModelInstance( self.container, self, modifiables, options )
[docs] def unfreeze(self) -> None: """Unfreezes the model""" self._is_frozen = False gmdCloseLicenseSession(self.instance.instance.sync_db._gmd)
[docs] def solve( self, solver: str | None = None, options: Options | None = None, solver_options: dict | None = None, model_instance_options: ModelInstanceOptions | dict | None = None, output: io.TextIOWrapper | None = None, backend: Literal["local", "engine", "neos"] = "local", client: EngineClient | NeosClient | None = None, load_symbols: list[Symbol] | None = None, ) -> pd.DataFrame | None: """ Solves the model with given options. Parameters ---------- solver : str, optional Solver name options : Options, optional GAMS options solver_options : dict, optional Solver options model_instance_options : optional Model instance options output : TextIOWrapper, optional Output redirection target backend : str, optional Backend to run on client : EngineClient, NeosClient, optional EngineClient to communicate with GAMS Engine or NEOS Client to communicate with NEOS Server Returns ------- DataFrame, optional Summary of the solve Raises ------ ValidationError In case engine_config is not provided for `engine` backend or neos_client is not provided for `neos` backend. ValueError In case problem is not in possible problem types ValueError In case sense is different than "MIN" or "MAX" Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> v = gp.Variable(m, "v") >>> e = gp.Equation(m, "e", definition= v == 5) >>> my_model = gp.Model(m, "my_model", "LP", [e], "max", v) >>> solved = my_model.solve() """ validation.validate_solver_args( self.container.system_directory, solver, self.problem, options, output, load_symbols, ) validation.validate_equations(self) if options is None: options = ( self.container._options if self.container._options else gp.Options() ) options._set_solver_options( working_directory=self.container.working_directory, solver=solver, problem=self.problem, solver_options=solver_options, ) self._add_model_attr_options(options) if self._is_frozen: self.instance.solve(solver, model_instance_options, output) return None self._append_solve_string() self._create_model_attributes() runner = backend_factory( self.container, options, output, backend, client, self, load_symbols, ) summary = runner.run() if IS_MIRO_INIT: self.container._write_default_gdx_miro() return summary
[docs] def getDeclaration(self) -> str: """ Declaration of the Model in GAMS Returns ------- str Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> v = gp.Variable(m, "v") >>> e = gp.Equation(m, "e", definition= v == 5) >>> my_model = gp.Model(m, "my_model", "LP", [e]) >>> my_model.getDeclaration() 'Model my_model / e /;' """ equations = [] for equation in self.equations: if self._matches: if equation not in self._matches: equations.append(equation.name) else: equations.append(equation.name) equations_str = ",".join(equations) if self._matches: matches_str = ",".join( [ f"{equation.name}.{variable.name}" for equation, variable in self._matches.items() ] ) equations_str = ( ",".join([equations_str, matches_str]) if equations else matches_str ) if self._external_module_file: equations_str = ",".join( [equations_str, self._external_module_file] ) if self._limited_variables: limited_variables_str = ",".join( [variable.gamsRepr() for variable in self._limited_variables] ) equations_str += "," + limited_variables_str model_str = f"Model {self.name}" if equations_str != "": model_str += f" / {equations_str} /" model_str += ";" return model_str
[docs] def toGams(self, path: str, options: Options | None = None) -> None: """ Generates GAMS model under path/<model_name>.gms Parameters ---------- path : str Path to the directory which will contain the GAMS model. """ if options is not None and not isinstance(options, gp.Options): raise ValidationError( f"`options` must be of type gp.Options of found {type(options)}" ) converter = GamsConverter(self, path) converter.convert(options=options)
[docs] def toLatex(self, path: str, generate_pdf: bool = False) -> None: """ Generates a latex file that contains the model definition under path/<model_name>.gms Parameters ---------- path : str Path to the directory which will contain the .tex file. """ converter = LatexConverter(self, path) converter.convert() if generate_pdf: converter.to_pdf()