Source code for gamspy._container

#
# GAMS - General Algebraic Modeling System Python API
#
# Copyright (c) 2023 GAMS Development Corp. <support@gams.com>
# Copyright (c) 2023 GAMS Software GmbH <support@gams.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
from __future__ import annotations

import io
import os
import shutil
import uuid
from typing import Any
from typing import List
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

import gams.transfer as gt
import pandas as pd
from gams import DebugLevel
from gams import GamsCheckpoint
from gams import GamsJob
from gams import GamsOptions
from gams import GamsWorkspace
from gams.control.workspace import GamsException
from gams.control.workspace import GamsExceptionExecution
from gams.core import gdx

import gamspy as gp
import gamspy.utils as utils
from gamspy._model import ModelStatus
from gamspy._neos import NeosClient
from gamspy._options import _mapOptions
from gamspy.exceptions import customize_exception
from gamspy.exceptions import GamspyException

if TYPE_CHECKING:
    from gamspy import (
        Alias,
        Set,
        Parameter,
        Variable,
        Equation,
        EquationType,
        Model,
    )
    from gamspy._algebra.expression import Expression
    from gamspy._engine import EngineConfig
    from gamspy._options import Options


[docs]class Container(gt.Container): """ A container is an object that holds all symbols and operates on them. Parameters ---------- load_from : str, optional Path to the GDX file to be loaded from, by default None system_directory : str, optional Path to the directory that holds the GAMS installation, by default None working_directory : str, optional Path to the working directory to store temporary files such .lst, .gms, .gdx, .g00 files. delayed_execution : bool, optional Delayed execution mode, by default False options : Options Global options for the overall execution Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = gp.Set(m, "i") """ def __init__( self, load_from: Optional[str] = None, system_directory: Optional[str] = None, working_directory: Optional[str] = None, delayed_execution: bool = False, options: Optional[Options] = None, ): system_directory = ( system_directory if system_directory else utils._getGAMSPyBaseDirectory() ) self._delayed_execution = delayed_execution self._unsaved_statements: list = [] self._is_first_run = True # import symbols from arbitrary gams code self._import_symbols: List[str] = [] super().__init__(load_from, system_directory) self.workspace = GamsWorkspace( working_directory, self.system_directory, DebugLevel.KeepFiles ) self.working_directory = self.workspace.working_directory ( self._save_to, self._restart_from, self._gdx_in, self._gdx_out, ) = self._setup_paths() # allows interrupt self._job: Optional[GamsJob] = None self._options = options def _addGamsCode( self, gams_code: str, import_symbols: List[str] = [] ) -> None: """ Adds an arbitrary GAMS code to the generate .gms file Parameters ---------- gams_code : str import_symbols : List[str], optional """ if import_symbols is not None and ( not isinstance(import_symbols, list) or any(not isinstance(symbol, str) for symbol in import_symbols) ): raise GamspyException("import_symbols must be a list of strings") self._import_symbols = import_symbols self._unsaved_statements.append(gams_code) def _addStatement(self, statement) -> None: self._unsaved_statements.append(statement) def _cast_symbols(self, symbol_names: Optional[List[str]] = None) -> None: """ Casts all symbols in the GAMS Transfer container to GAMSpy symbols """ symbol_names = symbol_names if symbol_names else list(self.data.keys()) for symbol_name in symbol_names: symbol = self.data[symbol_name] new_domain = [ self.data[set.name] if not isinstance(set, str) else set for set in symbol.domain ] del self.data[symbol_name] if isinstance(symbol, gt.Alias): alias_with = self[symbol.alias_with.name] _ = gp.Alias( self, symbol.name, alias_with, ) elif isinstance(symbol, gt.UniverseAlias): _ = gp.UniverseAlias( self, symbol.name, ) elif isinstance(symbol, gt.Set): _ = gp.Set( self, symbol.name, new_domain, symbol.is_singleton, symbol._records, symbol.domain_forwarding, symbol.description, ) elif isinstance(symbol, gt.Parameter): _ = gp.Parameter( self, symbol.name, new_domain, symbol._records, symbol.domain_forwarding, symbol.description, ) elif isinstance(symbol, gt.Variable): _ = gp.Variable( self, symbol.name, symbol.type, new_domain, symbol._records, symbol.domain_forwarding, symbol.description, ) elif isinstance(symbol, gt.Equation): symbol_type = symbol.type if symbol.type in ["eq", "leq", "geq"]: symbol_type = "regular" _ = gp.Equation( container=self, name=symbol.name, type=symbol_type, domain=new_domain, records=symbol._records, domain_forwarding=symbol.domain_forwarding, description=symbol.description, ) def _get_symbol_names_from_gdx(self, load_from: str) -> List[str]: gdx_handle = utils._openGdxFile(self.system_directory, load_from) _, symbol_count, _ = gdx.gdxSystemInfo(gdx_handle) symbol_names = [] for i in range(1, symbol_count + 1): _, symbol_name, _, _ = gdx.gdxSymbolInfo(gdx_handle, i) symbol_names.append(symbol_name) utils._closeGdxHandle(gdx_handle) return symbol_names def _get_symbol_names_to_load( self, load_from: str, symbol_names: Optional[List[str]] = None, ) -> List[str]: if symbol_names is None or symbol_names == []: symbol_names = self._get_symbol_names_from_gdx(load_from) return symbol_names def _interrupt(self) -> None: """ Sends interrupt signal to the running job. Raises ------ GamspyException If the job is not initialized """ if self._job: self._job.interrupt() else: raise GamspyException("There is no job initialized.") def _setup_paths( self, ) -> Tuple[GamsCheckpoint, GamsCheckpoint, str, str]: """ Sets up the paths for .g00, and .gdx files. Parameters ---------- working_directory : str, optional Returns ------- Tuple[GamsCheckpoint, GamsCheckpoint, str, str] save_to, restart_from, gdx_in, gdx_out """ suffix = uuid.uuid4() save_to = GamsCheckpoint(self.workspace, f"_save_{suffix}.g00") restart_from = GamsCheckpoint(self.workspace, f"_restart_{suffix}.g00") gdx_in = self.working_directory + os.sep + f"_gdx_in_{suffix}.gdx" gdx_out = self.working_directory + os.sep + f"_gdx_out_{suffix}.gdx" return save_to, restart_from, gdx_in, gdx_out def _get_autogenerated_symbol_names(self) -> List[str]: names = [] for name in self.data.keys(): if name.startswith(gp.Model._generate_prefix): names.append(name) return names def _get_touched_symbol_names(self) -> Tuple[List[str], List[str]]: dirty_names = [] assigned_names = [] for name, symbol in self: if isinstance(symbol, gp.UniverseAlias): continue if symbol._is_dirty: dirty_names.append(name) if symbol._is_assigned: assigned_names.append(name) return dirty_names, assigned_names def _clean_dirty_symbols(self, dirty_names: List[str]): for name in dirty_names: self[name]._is_dirty = False def _update_assigned_state(self, assigned_names: List[str]): for name in assigned_names: self[name]._is_assigned = False def _run( self, options: Optional[GamsOptions] = None, output: Optional[io.TextIOWrapper] = None, backend: Literal["local", "engine", "neos"] = "local", engine_config: Optional[EngineConfig] = None, neos_client: Optional[NeosClient] = None, create_log_file: bool = False, ): if options is None: options = _mapOptions( self.workspace, options=None, global_options=self._options, is_seedable=self._is_first_run, output=output, create_log_file=create_log_file, ) dirty_names, assigned_names = self._get_touched_symbol_names() gams_string = self._generate_gams_string(backend, dirty_names) # Create gdx file to read records from self._clean_dirty_symbols(dirty_names) self._update_assigned_state(assigned_names) self.isValid(verbose=True, force=True) super().write(self._gdx_in, assigned_names) # If there is no restart checkpoint, set it to None checkpoint = self._restart_from if not self._is_first_run else None self._job = GamsJob( self.workspace, job_name=f"_job_{uuid.uuid4()}", source=gams_string, checkpoint=checkpoint, ) if backend == "local": self._run_local(options, output) elif backend == "engine": self._run_engine(options, output, engine_config) elif backend == "neos": self._run_neos(gams_string, options, neos_client) # type: ignore else: self._unsaved_statements = [] raise GamspyException( f"`{backend}` is not a valid backend. Possible backends:" " local, engine, and neos" ) if backend == "neos" and not neos_client.is_blocking: # type: ignore return self.loadRecordsFromGdx( self._gdx_out, dirty_names + self._import_symbols ) self._restart_from, self._save_to = self._save_to, self._restart_from self._is_first_run = False def _run_local( self, options: GamsOptions, output: Union[io.TextIOWrapper, None] ): try: self._job.run( # type: ignore gams_options=options, checkpoint=self._save_to, create_out_db=False, output=output, ) except GamsExceptionExecution as exception: exception = customize_exception( self.workspace, options, self._job, exception ) raise exception finally: self._unsaved_statements = [] if utils._in_notebook(): from IPython.display import display, HTML solve_stat = [ "", "Normal", "Iteration", "Resource", "Solver", "EvalError", "Capability", "License", "User", "SetupErr", "SolverErr", "InternalErr", "Skipped", "SystemErr", ] HEADER = [ "Solver Status", "Model Status", "Objective", "#equ", "#var", "Model Type", "Solver", "Solver Time", ] with open( os.path.join(self.working_directory, "trace.txt") ) as file: lines = file.readlines()[-2:] ( _, _, _, model_type, solver_name, solver_status, model_status, _, _, _, _, num_equations, num_variables, _, _, _, _, _, _, _, _, objective_value, _, ) = lines[0].split(" ") solver_time = lines[1].split(" ")[-2] dataframe = pd.DataFrame( [ [ solve_stat[int(solver_status)], ModelStatus(int(model_status)), objective_value, num_equations, num_variables, model_type, solver_name, solver_time, ] ], columns=HEADER, ) display(HTML(dataframe.to_html())) def _run_engine( self, options: GamsOptions, output: Union[io.TextIOWrapper, None], engine_config: Union[EngineConfig, None], ): options.forcework = 1 # In case GAMS version differs on Engine assert engine_config extra_model_files = engine_config._preprocess_extra_model_files( self.workspace, self._gdx_in ) try: self._job.run_engine( # type: ignore engine_configuration=engine_config._get_engine_config(), extra_model_files=extra_model_files, gams_options=options, checkpoint=self._save_to, output=output, create_out_db=False, engine_options=engine_config.engine_options, remove_results=engine_config.remove_results, ) except GamsException as e: raise GamspyException(str(e)) finally: self._unsaved_statements = [] options.forcework = 0 def _run_neos( self, gams_string: str, options: GamsOptions, client: NeosClient, ): client._prepare_xml( gams_string, self._gdx_in, self._restart_from._checkpoint_file_name, self._save_to.name, options=options, working_directory=self.working_directory, ) job_number, job_password = client.submit_job( is_blocking=client.is_blocking, working_directory=self.working_directory, ) if client.is_blocking: client.download_output( job_number, job_password, working_directory=self.working_directory, ) shutil.move( os.path.join(self.working_directory, "output.gdx"), self._gdx_out, ) @property def delayed_execution(self) -> bool: """ Delayed execution mode. Returns ------- bool """ return self._delayed_execution
[docs] def gamsJobName(self) -> Union[str, None]: """ Returns the name of the latest GAMS job that was executed Returns ------- str | None """ return self._job.name if self._job is not None else None
[docs] def gdxInputName(self) -> str: """ Name of the input gdx file Returns ------- str """ return os.path.basename(self._gdx_in)
[docs] def gdxOutputName(self) -> str: """ Name of the output gdx file Returns ------- str """ return os.path.basename(self._gdx_out)
[docs] def addAlias(self, name: str, alias_with: Union[Set, Alias]) -> Alias: """ Creates a new Alias and adds it to the container Parameters ---------- name : str alias_with : Set | Alias Returns ------- Alias Raises ------ TypeError In case the alias_with is different than a Set or an Alias ValueError If there is symbol with same name but different type in the Container Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = m.addSet("i") >>> a = m.addAlias("a", i) """ return gp.Alias(self, name, alias_with)
[docs] def addSet( self, name: str, domain: Optional[List[Union[Set, str]]] = None, is_singleton: bool = False, records: Optional[Any] = None, domain_forwarding: bool = False, description: str = "", uels_on_axes: bool = False, ) -> Set: """ Creates a Set and adds it to the container Parameters ---------- name : str domain : List[Set | str], optional is_singleton : bool, optional records : Any, optional domain_forwarding : bool, optional description : str, optional uels_on_axes : bool, optional Returns ------- Set Raises ------ err In case arguments are not valid ValueError When there is symbol with same name in the Container Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = m.addSet("i") """ return gp.Set( self, name, domain, is_singleton, records, domain_forwarding, description, uels_on_axes, )
[docs] def addParameter( self, name: str, domain: Optional[List[Union[str, Set]]] = None, records: Optional[Any] = None, domain_forwarding: bool = False, description: str = "", uels_on_axes: bool = False, ) -> Parameter: """ Creates a Parameter and adds it to the Container Parameters ---------- name : str domain : List[str | Set]], optional records : Any, optional domain_forwarding : bool, optional description : str, optional uels_on_axes : bool, optional Returns ------- Parameter Raises ------ err In case arguments are not valid ValueError If there is symbol with same name but different type in the Container Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> a = m.addParameter("a") """ return gp.Parameter( self, name, domain, records, domain_forwarding, description, uels_on_axes, )
[docs] def addVariable( self, name: str, type: str = "free", domain: Optional[List[Union[str, Set]]] = None, records: Optional[Any] = None, domain_forwarding: bool = False, description: str = "", uels_on_axes: bool = False, ) -> Variable: """ Creates a Variable and adds it to the Container Parameters ---------- name : str type : str, optional domain : List[str | Set]], optional records : Any, optional domain_forwarding : bool, optional description : str, optional uels_on_axes : bool, optional Returns ------- Variable Raises ------ err In case arguments are not valid ValueError If there is symbol with same name but different type in the Container Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> v = m.addVariable("v") """ return gp.Variable( self, name, type, domain, records, domain_forwarding, description, uels_on_axes, )
[docs] def addEquation( self, name: str, type: Union[str, EquationType] = "regular", domain: Optional[List[Union[Set, str]]] = None, definition: Optional[Expression] = None, records: Optional[Any] = None, domain_forwarding: bool = False, description: str = "", uels_on_axes: bool = False, definition_domain: Optional[List[Union[Set, str]]] = None, ) -> Equation: """ Creates an Equation and adds it to the Container Parameters ---------- name : str type : str domain : List[Set | str], optional definition : Definition, optional records : Any, optional domain_forwarding : bool, optional description : str, optional uels_on_axes : bool, optional definition_domain : List[Set | str], optional Returns ------- Equation Raises ------ err In case arguments are not valid ValueError If there is symbol with same name but different type in the Container Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = m.addEquation("i") """ return gp.Equation( self, name, type, domain, definition, records, domain_forwarding, description, uels_on_axes, definition_domain, )
[docs] def addModel( self, name: str, problem: str, equations: List[Equation], sense: Optional[Literal["MIN", "MAX"]] = None, objective: Optional[Union[Variable, Expression]] = None, matches: Optional[dict] = None, limited_variables: Optional[list] = None, ) -> Model: """ Creates a Model and adds it to the Container Parameters ---------- name : str equations : List[Equation] problem : str sense : Optional[Literal[MIN, MAX]], optional objective : Optional[Union[Variable, Expression]], optional matches : Optional[dict], optional limited_variables : Optional[list], optional Returns ------- Model Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> e = gp.Equation(m, "e") >>> model = m.addModel("my_model", "LP", [e]) """ return gp.Model( self, name, problem, equations, sense, objective, matches, limited_variables, )
[docs] def copy(self, working_directory: str) -> Container: """ Creates a copy of the Container. Should not be invoked after creating the model. Parameters ---------- working_directory : str, optional Working directory of the new Container, by default None Returns ------- Container Raises ------ GamspyException Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = gp.Set(m, "i") >>> new_cont = m.copy(working_directory="test") >>> new_cont.data.keys() == m.data.keys() True """ m = Container(working_directory=working_directory) if m.working_directory == self.working_directory: raise GamspyException( "Copy of a container cannot have the same working directory" " with the original container." ) self._run() for name, symbol in self: new_domain = [] for set in symbol.domain: if not isinstance(set, str): new_set = gp.Set( m, set.name, set.domain, set.is_singleton, set.records, set.domain_forwarding, set.description, ) new_domain.append(new_set) else: new_domain.append(set) if isinstance(symbol, gt.Alias): alias_with = gp.Set( m, symbol.alias_with.name, symbol.alias_with.domain, symbol.alias_with.is_singleton, symbol.alias_with.records, ) _ = gp.Alias( m, name, alias_with, ) elif isinstance(symbol, gt.UniverseAlias): _ = gp.UniverseAlias( m, name, ) elif isinstance(symbol, gt.Set): _ = gp.Set( m, name, new_domain, symbol.is_singleton, symbol._records, symbol.domain_forwarding, symbol.description, ) elif isinstance(symbol, gt.Parameter): _ = gp.Parameter( m, name, new_domain, symbol._records, symbol.domain_forwarding, symbol.description, ) elif isinstance(symbol, gt.Variable): _ = gp.Variable( m, name, symbol.type, new_domain, symbol._records, symbol.domain_forwarding, symbol.description, ) elif isinstance(symbol, gt.Equation): symbol_type = symbol.type if symbol.type in ["eq", "leq", "geq"]: symbol_type = "regular" _ = gp.Equation( container=m, name=name, type=symbol_type, domain=new_domain, records=symbol._records, domain_forwarding=symbol.domain_forwarding, description=symbol.description, ) try: shutil.copy( self._save_to._checkpoint_file_name, m._save_to._checkpoint_file_name, ) except FileNotFoundError: # save_to might not exist and it's fine pass shutil.copy( self._restart_from._checkpoint_file_name, m._restart_from._checkpoint_file_name, ) shutil.copy(self._gdx_in, m._gdx_in) shutil.copy(self._gdx_out, m._gdx_out) # if already defined equations exist, add them to .gms file for equation in self.getEquations(): if equation._definition is not None: m._addStatement(equation._definition) return m
[docs] def generateGamsString(self) -> str: """ Generates the GAMS code Returns ------- str """ return self._generate_gams_string()
def _get_unload_symbols_str( self, dirty_names: List[str], gdx_out: str ) -> str: # Write dirty names, import symbols and autogenerated names to gdx autogenerated_names = self._get_autogenerated_symbol_names() unload_names = dirty_names + autogenerated_names + self._import_symbols unload_str = ",".join(unload_names) return f"execute_unload '{gdx_out}' {unload_str}\n" def _preprocess_gdx_paths(self, backend: str) -> Tuple[str, str]: # setup gdx input name according to backend if backend == "engine": return ( os.path.basename(self._gdx_in), os.path.basename(self._gdx_out), ) elif backend == "neos": return "in.gdx", "output.gdx" return self._gdx_in, self._gdx_out def _generate_gams_string( self, backend: str = "local", dirty_names: List[str] = [], ) -> str: LOAD_SYMBOL_TYPES = (gp.Set, gp.Parameter, gp.Variable, gp.Equation) gdx_in, gdx_out = self._preprocess_gdx_paths(backend) # Generate the string string = f"$onMultiR\n$onUNDF\n$gdxIn {gdx_in}\n" for statement in self._unsaved_statements: if isinstance(statement, str): string += statement + "\n" elif isinstance(statement, gp.UniverseAlias): continue else: string += statement.getStatement() + "\n" if isinstance(statement, LOAD_SYMBOL_TYPES): string += f"$load {statement.name}\n" string += "$offUNDF\n$gdxIn\n" string += self._get_unload_symbols_str(dirty_names, gdx_out) return string
[docs] def loadRecordsFromGdx( self, load_from: str, symbol_names: Optional[List[str]] = None, ) -> None: """ Loads data of the given symbols from a gdx file. If no symbol names are given, data of all symbols are loaded. Parameters ---------- load_from : str Path to the gdx file symbols : List[str], optional Symbols whose data will be load from gdx, by default None Examples -------- >>> from gamspy import Container, Set >>> m = Container() >>> i = Set(m, "i", records=["i1", "i2"]) >>> m.write("test.gdx") >>> m2 = Container() >>> m2.loadRecordsFromGdx("test.gdx") >>> print(i.records.equals(m2["i"].records)) True """ symbol_names = self._get_symbol_names_to_load(load_from, symbol_names) temp_container = Container(system_directory=self.system_directory) temp_container.read(load_from, symbol_names) for name in symbol_names: if name in self.data.keys(): updated_records = temp_container[name]._records self[name].records = updated_records if updated_records is not None: self[name].domain_labels = self[name].domain_names else: self.read(load_from, [name])
[docs] def read( self, load_from: str, symbol_names: Optional[List[str]] = None ) -> None: """ Reads specified symbols from the gdx file. If symbol_names are not provided, it reads all symbols from the gdx file. Parameters ---------- load_from : str symbol_names : List[str], optional Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = gp.Set(m, "i", records=['i1', 'i2']) >>> m.write("test.gdx") >>> new_container = gp.Container() >>> new_container.read("test.gdx") >>> new_container.data.keys() == m.data.keys() True """ super().read(load_from, symbol_names) self._cast_symbols(symbol_names)
[docs] def write( self, write_to: str, symbols: Optional[List[str]] = None, ) -> None: """ Writes specified symbols to the gdx file. If symbol_names are not provided, it writes all symbols to the gdx file. Parameters ---------- write_to : str symbols : List[str], optional Examples -------- >>> import gamspy as gp >>> m = gp.Container() >>> i = gp.Set(m, "i", records=['i1', 'i2']) >>> m.write("test.gdx") """ dirty_names, assigned_names = self._get_touched_symbol_names() # If there are dirty symbols, make 'em clean by calculating their records if len(dirty_names) > 0: self._run() super().write(write_to, symbols) self._update_assigned_state(assigned_names)