# Source code for gamspy._backend.engine

#
# GAMS - General Algebraic Modeling System Python API
#
# Copyright (c) 2023 GAMS Development Corp. <support@gams.com>
# Copyright (c) 2023 GAMS Software GmbH <support@gams.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
from __future__ import annotations

import os
import copy
import io
import uuid
import json
import time
import logging
import tempfile

from typing import TYPE_CHECKING

from abc import ABC

import urllib3
import urllib.parse
import certifi
import zipfile

from gams import DebugLevel
from gams import GamsEngineConfiguration
from gams import GamsOptions
from gams.control.workspace import GamsException
from gams.core.gmo import gmoProc_nrofmodeltypes
from gams.core.cfg import cfgModelTypeName
from gams.core.opt import optSetStrStr

import gamspy._backend.backend as backend
from gamspy.exceptions import GamsEngineException
from gamspy.exceptions import ValidationError

if TYPE_CHECKING:
    from gamspy import Container
    from gamspy import Model


# Console logging used to report GAMS Engine job progress.
formatter = logging.Formatter("[%(name)s - %(levelname)s] %(message)s")

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.INFO)

logger = logging.getLogger("ENGINE")
logger.addHandler(stream_handler)
logger.setLevel(logging.INFO)


# Upper bound on how often a single request is attempted. Responses with
# HTTP status 429 are retried with exponential backoff until this limit is hit.
# NOTE: the name is misspelled ("ATTEMPS") but is referenced throughout the
# module, so it is kept as is.
MAX_REQUEST_ATTEMPS = 3
# File name under which the zipped model files are uploaded.
ZIP_NAME = "data.zip"
# File name under which an `inex_file` engine option is uploaded.
INEX_FILE_NAME = "inex.json"
# Numeric job status reported by GAMS Engine -> human readable label.
STATUS_MAP = {
    -10: "waiting",
    -3: "cancelled",
    -2: "cancelling",
    -1: "corrupted",
    0: "queued",
    1: "running",
    2: "outputting",
    10: "finished",
}


def get_relative_paths(paths: list[str], start: str) -> list[str]:
    """
    Express every path in ``paths`` relative to the directory ``start``.

    Parameters
    ----------
    paths : list[str]
        Paths to convert
    start : str
        Directory the returned paths are relative to

    Returns
    -------
    list[str]
        Relative paths, in the same order as ``paths``

    Raises
    ------
    ValidationError
        If a path resolves to a location that starts in the parent of
        ``start`` (its relative form begins with ``../``).
    """
    parent_prefix = f"..{os.sep}"
    converted: list[str] = []

    for original in paths:
        candidate = os.path.relpath(original, start)
        if not candidate.startswith(parent_prefix):
            converted.append(candidate)
            continue

        raise ValidationError(
            "Extra model file path must be relative to the working"
            f" directory. The given path: {original}, the working"
            f" directory: {start}, the"
            f" relative path: {candidate}"
        )

    return converted


class Endpoint(ABC):
    """Common base class for GAMS Engine endpoint wrappers such as Job."""


class Job(Endpoint):
    def __init__(
        self,
        http: urllib3.PoolManager,
        extra_model_files: list,
        engine_config: GamsEngineConfiguration,
        request_headers: dict,
        engine_options: dict | None = None,
    ) -> None:
        """
        Store everything the job endpoint needs to talk to GAMS Engine.

        Parameters
        ----------
        http : urllib3.PoolManager
            Connection pool used to issue the requests
        extra_model_files : list
            Additional files that are zipped together with the model
        engine_config : GamsEngineConfiguration
            Provides the host and the namespace
        request_headers : dict
            Headers sent with every request
        engine_options : dict | None
            Extra query parameters for job submission
        """
        # Connection related state, shared by all requests.
        self._http, self._engine_config = http, engine_config
        self._request_headers = request_headers

        # Job submission settings.
        self.extra_model_files = extra_model_files
        self.engine_options = engine_options

    def get(self, token: str) -> tuple[int, str, int | None]:
        """
        Get request to /jobs/{token} which returns the details of a job.
        Refer to https://engine.gams.com/api/ for more details.

        Parameters
        ----------
        token : str
            Job token

        Returns
        -------
        tuple[int, str, int | None]
            Job status, job status message, and gams exit code. The message
            is "unknown" for a status code that is not part of STATUS_MAP so
            that the caller can decide how to report it.

        Raises
        ------
        GamsEngineException
            If get request has failed.
        """
        url = self._engine_config.host + f"/jobs/{token}"

        for attempt_number in range(MAX_REQUEST_ATTEMPS):
            r = self._http.request(
                "GET",
                url,
                headers=self._request_headers,
            )
            response_data = r.data.decode("utf-8", errors="replace")

            if r.status == 200:
                info = json.loads(response_data)
                job_status = int(info["status"])
                return (
                    job_status,
                    # .get() instead of indexing: an unexpected status code
                    # must not surface as a bare KeyError.
                    STATUS_MAP.get(job_status, "unknown"),
                    info["process_status"],
                )

            if r.status == 429:
                time.sleep(2**attempt_number)  # retry with exponential backoff
                continue

            raise GamsEngineException(
                "Getting job status from GAMS Engine failed with status code: "
                + str(r.status)
                + ". Message: "
                + response_data,
                r.status,
            )

        # Every attempt was answered with 429.
        raise GamsEngineException(
            "Getting job status from GAMS Engine failed after: "
            + str(MAX_REQUEST_ATTEMPS)
            + " attempts. Message: "
            + response_data,
            r.status,
        )

    def post(
        self,
        working_directory: str,
        gms_file: str,
        pf_file: str | None = None,
    ) -> str:
        """
        Post request to /jobs which submits a new job to be solved.
        Refer to https://engine.gams.com/api/ for more details.

        Parameters
        ----------
        working_directory : str
            Working directory
        gms_file : str
            Name of the gms file
        pf_file: str | None
            Name of the pf file

        Returns
        -------
        str
            Token

        Raises
        ------
        GamsEngineException
            If post request has failed.
        """
        archive = self._create_zip_file(working_directory, gms_file, pf_file)

        # The request refers to the files relative to the working directory.
        relative_gms = os.path.relpath(gms_file, working_directory)
        relative_pf = None
        if pf_file is not None:
            relative_pf = os.path.relpath(pf_file, working_directory)

        query_params, file_params = self._get_params(
            archive, relative_gms, relative_pf
        )
        encoded_query = urllib.parse.urlencode(query_params, doseq=True)

        for attempt_number in range(MAX_REQUEST_ATTEMPS):
            r = self._http.request(
                "POST",
                self._engine_config.host + "/jobs/?" + encoded_query,
                fields=file_params,
                headers=self._request_headers,
            )
            body = r.data.decode("utf-8", errors="replace")

            if r.status == 429:
                time.sleep(2**attempt_number)  # retry with exponential backoff
                continue

            if r.status != 201:
                raise GamsEngineException(
                    "Creating job on GAMS Engine failed with status code: "
                    f"{r.status}. Message: {body}",
                    r.status,
                )

            return json.loads(body)["token"]

        # Every attempt was answered with 429.
        raise GamsEngineException(
            "Creating job on GAMS Engine failed after: "
            f"{MAX_REQUEST_ATTEMPS} attempts. Message: {body}",
            r.status,
        )

    def get_results(self, token: str, working_directory: str):
        """
        Get request to /jobs/{token}/result which downloads the job results.
        Downloaded results are unpacked to working directory.
        Refer to https://engine.gams.com/api/ for more details.

        Parameters
        ----------
        token : str
            Job token
        working_directory : str
            Working directory

        Raises
        ------
        GamsEngineException
            If get request has failed.
        """
        os.makedirs(working_directory, exist_ok=True)

        r = self._http.request(
            "GET",
            self._engine_config.host + f"/jobs/{token}/result",
            headers=self._request_headers,
            preload_content=False,
        )

        try:
            if r.status != 200:
                response_data = r.data.decode("utf-8", errors="replace")
                # The body is text at this point. Pull the "message" field
                # out of it when it is JSON, otherwise report the raw body.
                try:
                    message = json.loads(response_data)["message"]
                except (ValueError, KeyError, TypeError):
                    message = response_data

                raise GamsEngineException(
                    "Fatal error while getting the results back from engine. GAMS"
                    f" Engine return code: {r.status}. Error message:"
                    f" {message}",
                    r.status,
                )

            fd, path = tempfile.mkstemp()
            try:
                # Reuse the descriptor returned by mkstemp instead of opening
                # the file a second time; the with block closes it.
                with os.fdopen(fd, "wb") as out:
                    while True:
                        data = r.read(6000)
                        if not data:
                            break
                        out.write(data)

                with zipfile.ZipFile(path, "r") as zip_ref:
                    zip_ref.extractall(working_directory)
            finally:
                os.remove(path)
        finally:
            # The response was requested with preload_content=False, so the
            # connection has to be handed back explicitly on every path.
            r.release_conn()

    def delete_results(self, token: str):
        """
        Delete request to /jobs/{token}/result which deletes the job results.
        Refer to https://engine.gams.com/api/ for more details.

        Parameters
        ----------
        token : str
            Job token

        Raises
        ------
        GamsEngineException
            If job data does not exist in GAMS Engine.
        GamsEngineException
            If delete request has failed.
        """
        for attempt_number in range(MAX_REQUEST_ATTEMPS):
            r = self._http.request(
                "DELETE",
                f"{self._engine_config.host}/jobs/{token}/result",
                headers=self._request_headers,
            )
            body = r.data.decode("utf-8", errors="replace")
            status = r.status

            if status == 200:
                return

            if status == 429:
                time.sleep(2**attempt_number)  # retry with exponential backoff
                continue

            if status == 403:
                raise GamsEngineException(
                    "Job data does not exist in GAMS Engine!", status
                )

            raise GamsEngineException(
                f"Removing job result failed with status code: {status}"
                f". Message: {body}",
                status,
            )

        # Every attempt was answered with 429.
        raise GamsEngineException(
            f"Removing job result failed after: {MAX_REQUEST_ATTEMPS}"
            f" attempts. Message: {body}",
            r.status,
        )

    def get_logs(self, token: str) -> tuple[str, bool]:
        """
        Request to /jobs/{token}/unread-logs which returns stdout of a job.
        Refer to https://engine.gams.com/api/ for more details.

        Parameters
        ----------
        token : str
            Job token

        Returns
        -------
        tuple[str, bool]
            Current output buffer (the "message" field of the response) and
            the "queue_finished" flag of the response

        Raises
        ------
        GamsEngineException
            If the request has failed or was rate limited on every attempt.
        """
        url = self._engine_config.host + f"/jobs/{token}/unread-logs"

        for attempt_number in range(MAX_REQUEST_ATTEMPS):
            # NOTE(review): the verb is DELETE although this method reads
            # logs; presumably the endpoint returns and flushes the unread
            # output — confirm against the GAMS Engine API reference.
            r = self._http.request(
                "DELETE",
                url,
                headers=self._request_headers,
            )
            response_data = r.data.decode("utf-8", errors="replace")

            if r.status == 429:
                time.sleep(2**attempt_number)  # retry with exponential backoff
                continue

            if r.status != 200:
                # Error bodies are not guaranteed to be JSON; fall back to
                # the raw text instead of failing while building the error.
                try:
                    message = json.loads(response_data)["message"]
                except (ValueError, KeyError, TypeError):
                    message = response_data

                raise GamsEngineException(
                    "Getting logs failed with status code: "
                    + str(r.status)
                    + ". "
                    + str(message)
                    + ".",
                    r.status,
                )

            # Only a successful response is parsed as JSON.
            payload = json.loads(response_data)
            return payload["message"], payload["queue_finished"]

        # Every attempt was answered with 429.
        raise GamsEngineException(
            "Getting logs failed after: "
            + str(MAX_REQUEST_ATTEMPS)
            + " attempts. Message: "
            + response_data,
            r.status,
        )

    def _create_zip_file(
        self,
        working_directory: str,
        gms_file: str,
        pf_file: str | None,
    ) -> io.BytesIO:
        """
        Pack the gms file, the optional pf file and the extra model files
        into an in-memory zip archive.

        Archive members are named by their path relative to
        ``working_directory``. The returned buffer is rewound to position 0.
        """
        candidates = [gms_file] if pf_file is None else [gms_file, pf_file]
        candidates.extend(self.extra_model_files)
        member_names = get_relative_paths(candidates, working_directory)

        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
            for member_name in member_names:
                source_path = os.path.join(working_directory, member_name)
                archive.write(source_path, arcname=member_name)

        buffer.seek(0)
        return buffer

    def _get_params(self, model_data_zip, gms_file, pf_file: str | None):
        """
        Build the query parameters and the multipart fields of a job
        submission.

        Parameters
        ----------
        model_data_zip : io.BytesIO
            Zip archive with the model files. It is read and closed here.
        gms_file : str
            Name of the gms file, relative to the working directory
        pf_file : str | None
            Name of the pf file, relative to the working directory

        Returns
        -------
        tuple[dict, dict]
            Query parameters and file parameters

        Raises
        ------
        ValidationError
            If `engine_options` contains the keys `data` or `model_data`.
        """
        # Work on a private copy so the caller's engine options are never
        # modified. Stream values (e.g. an open inex file) are kept by
        # reference because open file handles cannot be deep-copied.
        query_params = {
            key: value if isinstance(value, io.IOBase) else copy.deepcopy(value)
            for key, value in (self.engine_options or {}).items()
        }
        file_params = {}

        query_params["namespace"] = self._engine_config.namespace

        if "data" in query_params or "model_data" in query_params:
            raise ValidationError(
                "`engine_options` must not include keys `data` or "
                "`model_data`. Please use `extra_model_files` to "
                "provide additional files to send to GAMS Engine.",
            )

        if "inex_file" in query_params:
            inex_file = query_params["inex_file"]
            if isinstance(inex_file, io.IOBase):
                # Leave the caller's stream where we found it so the same
                # options can be used for another submission.
                position = inex_file.tell() if inex_file.seekable() else None
                inex_content = inex_file.read()
                if position is not None:
                    inex_file.seek(position)
            else:
                with open(inex_file, "rb") as f:
                    inex_content = f.read()

            file_params["inex_file"] = (
                INEX_FILE_NAME,
                inex_content,
                "application/json",
            )
            # inex_file is sent as a file, not as a query parameter.
            del query_params["inex_file"]

        zip_payload = (ZIP_NAME, model_data_zip.read(), "application/zip")
        model_data_zip.close()

        if "model" in query_params:
            # The user named the model; the archive goes into `data`.
            file_params["data"] = zip_payload
        else:
            query_params["run"] = gms_file
            query_params["model"] = os.path.splitext(gms_file)[0]
            file_params["model_data"] = zip_payload

        if "arguments" in query_params and not isinstance(
            query_params["arguments"], list
        ):
            query_params["arguments"] = [query_params["arguments"]]

        if pf_file is not None:
            query_params.setdefault("arguments", []).append(f"pf={pf_file}")

        return query_params, file_params


class EngineClient:
    """
    Holds the connection settings for a GAMS Engine server and exposes its
    endpoints.

    Parameters
    ----------
    host : str
        Address of the GAMS Engine API
    username : str | None
        Username passed to the engine configuration
    password : str | None
        Password passed to the engine configuration
    jwt : str | None
        JSON web token passed to the engine configuration
    namespace : str
        Namespace in which jobs are created, by default "global"
    extra_model_files : list[str] | None
        Additional files sent together with the model. ``None`` (the
        default) means no extra files.
    engine_options : dict | None
        Extra query parameters for job submission
    remove_results : bool
        Stored on the client; when set, GAMSEngine.postprocess returns no
        summary
    is_blocking : bool
        When False, GAMSEngine submits the job without waiting for results

    Raises
    ------
    ValidationError
        If the engine configuration cannot be created from the given values.
    """

    def __init__(
        self,
        host: str,
        username: str | None = None,
        password: str | None = None,
        jwt: str | None = None,
        namespace: str = "global",
        extra_model_files: list[str] | None = None,
        engine_options: dict | None = None,
        remove_results: bool = False,
        is_blocking: bool = True,
    ):
        # A fresh list per client; a literal [] default would be shared by
        # every client created without this argument.
        if extra_model_files is None:
            extra_model_files = []

        self.host = host
        self.username = username
        self.password = password
        self.jwt = jwt
        self.namespace = namespace
        self.extra_model_files = extra_model_files
        self.engine_options = engine_options
        self.remove_results = remove_results
        self.is_blocking = is_blocking
        # Tokens of the jobs submitted through this client.
        self.tokens: list[tuple] = []

        self._http = urllib3.PoolManager(
            cert_reqs="CERT_REQUIRED", ca_certs=certifi.where()
        )

        self._engine_config = self._get_engine_config()

        self._request_headers = {
            "Authorization": self._engine_config._get_auth_header(),
            "User-Agent": "GAMSPy EngineClient",
            "Accept": "application/json",
        }

        # Endpoints
        self.job = Job(
            self._http,
            extra_model_files,
            self._engine_config,
            self._request_headers,
            engine_options,
        )

    def _get_engine_config(self):
        """Create the engine configuration from the stored credentials."""
        try:
            return GamsEngineConfiguration(
                self.host,
                self.username,
                self.password,
                self.jwt,
                self.namespace,
            )
        except GamsException as e:
            # Report configuration problems as a GAMSPy validation error.
            raise ValidationError(e) from e
class GAMSEngine(backend.Backend):
    """
    Backend that submits the generated GAMS model as a job through an
    EngineClient and, for blocking clients, loads the results back into the
    container.
    """

    def __init__(
        self,
        container: Container,
        client: EngineClient | None,
        options: GamsOptions,
        output: io.TextIOWrapper | None = None,
        model: Model | None = None,
    ) -> None:
        """
        Parameters
        ----------
        container : Container
            Container that owns the model
        client : EngineClient | None
            Client used to reach GAMS Engine; must not be None
        options : GamsOptions
            GAMS options of the run
        output : io.TextIOWrapper | None
            Stream the job log is written to
        model : Model | None
            Model whose attributes are updated after the run

        Raises
        ------
        ValidationError
            If no client is given.
        """
        if client is None:
            raise ValidationError(
                "`engine_client` must be provided to solve on GAMS Engine"
            )

        super().__init__(
            container,
            os.path.basename(container._gdx_in),
            os.path.basename(container._gdx_out),
        )

        self.client = client
        self.options = options
        self.output = output
        self.model = model

        # A unique name per backend instance keeps the generated files apart.
        self.job_name = f"_job_{uuid.uuid4()}"
        self.gms_file = self.job_name + ".gms"
        self.pf_file = self.job_name + ".pf"

    def is_async(self):
        """Return True when the client does not wait for the job to finish."""
        return not self.client.is_blocking

    def preprocess(self, keep_flags: bool = False):
        """
        Configure the options and write the pf and gms files of the job.

        Returns
        -------
        list[str]
            The dirty names reported by the base class preprocess step
        """
        gams_string, dirty_names = super().preprocess(keep_flags)

        # Set selected solvers
        for i in range(1, gmoProc_nrofmodeltypes):
            optSetStrStr(
                self.options._opt,
                cfgModelTypeName(self.options._cfg, i),
                self.options._selected_solvers[i],
            )

        # Set save file path
        self.options._save = self.container._save_to.name

        # Set restart file path
        self.options._restart = self.container._restart_from.name

        # Set input file path
        self.options._input = self.job_name + ".gms"

        # Set output file path
        if not self.options.output:
            self.options.output = self.job_name + ".lst"

        # Export pf file
        # NOTE(review): the pf file is exported under its bare name while
        # run() looks for it inside the working directory — confirm that
        # GamsOptions.export resolves relative paths against that directory.
        self.options.export(self.pf_file)

        # Export gms file
        gms_path = os.path.join(
            self.container.working_directory, self.gms_file
        )
        with open(gms_path, "w", encoding="utf-8") as file:
            file.write(gams_string)

        return dirty_names

    def solve(self, is_implicit: bool = False, keep_flags: bool = False):
        """
        Generate the job files, run the job and, for blocking clients,
        return the summary produced by postprocess.
        """
        dirty_names = self.preprocess(keep_flags)

        self.run()

        if self.is_async():
            return None

        # Synchronize GAMSPy with checkpoint and return a summary
        return self.postprocess(dirty_names, is_implicit)

    def run(self):
        """
        Submit the job and, for blocking clients, wait for it and download
        its results.

        Raises
        ------
        GamsEngineException
            If a request fails or the job ends up cancelled or corrupted.
        """
        working_directory = self.container.working_directory

        try:
            original_extra_files = copy.deepcopy(
                self.client.job.extra_model_files
            )
            self.client.job.extra_model_files = self._append_gamspy_files()
            try:
                token = self.client.job.post(
                    working_directory,
                    os.path.join(working_directory, self.gms_file),
                    os.path.join(working_directory, self.pf_file),
                )
            finally:
                # Put the client's own file list back even when the
                # submission fails, so a later run starts from a clean state.
                self.client.job.extra_model_files = original_extra_files

            self.client.tokens.append(token)

            if not self.is_async():
                self._wait_for_results(token)
        except GamsEngineException:
            if self.container._debugging_level == "keep_on_error":
                self.container.workspace._debug = DebugLevel.KeepFiles

            raise
        finally:
            self.container._unsaved_statements = []
            self.container._delete_autogenerated_symbols()

    def _wait_for_results(self, token: str) -> None:
        """Poll the job until it is done, stream its log, fetch the results."""
        job_status, message, _ = self.client.job.get(token)
        self._raise_on_failed_status(job_status, message)

        while job_status in (-10, -2, 0):
            logger.info(f"Job status is {message}...")
            # Short pause so that waiting does not flood the server (and
            # the log) with status requests.
            time.sleep(1)
            # job.get returns a tuple; unpack it so the loop condition
            # keeps comparing the numeric status.
            job_status, message, _ = self.client.job.get(token)
            # A waiting job can still end up cancelled or corrupted.
            self._raise_on_failed_status(job_status, message)

        while job_status in (1, 2):
            log_chunk, is_finished = self.client.job.get_logs(token)
            if self.output is not None:
                self.output.write(log_chunk)

            if is_finished:
                job_status = 10

        self.client.job.get_results(token, self.container.working_directory)

        # The model is optional in the constructor.
        if self.model is not None:
            self.model._update_model_attributes()

    @staticmethod
    def _raise_on_failed_status(job_status: int, message: str) -> None:
        """Raise for an unknown, corrupted or cancelled job status."""
        if job_status not in STATUS_MAP:
            raise GamsEngineException(
                "Unknown job status code! Currently supported job"
                f" status codes: {STATUS_MAP.keys()}",
                status_code=job_status,
            )

        if job_status in (-1, -3):
            raise GamsEngineException(
                "Could not get job results because the job is"
                f" {message}.",
                status_code=job_status,
            )

    def postprocess(self, dirty_names: list[str], is_implicit: bool = False):
        """
        Load the records of the changed symbols from the output gdx file,
        swap the checkpoints and return the summary when one is requested.
        """
        symbols = dirty_names + self.container._import_symbols
        if symbols:
            self.container._load_records_from_gdx(
                self.container._gdx_out, symbols
            )

        self.container._swap_checkpoints()

        if (
            self.client.remove_results
            or self.options.traceopt != 3
            or is_implicit
        ):
            return None

        return self.prepare_summary(
            self.container.working_directory, self.options.trace
        )

    def _append_gamspy_files(self) -> list[str]:
        """
        Return the client's extra model files followed by the input gdx file
        and the restart checkpoint file. A new list is built; the client's
        list is left untouched.
        """
        return self.client.job.extra_model_files + [
            self.container._gdx_in,
            self.container._restart_from._checkpoint_file_name,
        ]