Source code for gamspy._backend.engine

from __future__ import annotations

import copy
import io
import json
import logging
import os
import shutil
import tempfile
import time
import urllib.parse
import zipfile
from typing import TYPE_CHECKING

import certifi
import urllib3
from gams import GamsEngineConfiguration
from gams.control.workspace import GamsException

import gamspy._backend.backend as backend
import gamspy.utils as utils
from gamspy._options import Options
from gamspy.exceptions import (
    EngineClientException,
    EngineException,
    GamspyException,
    ValidationError,
)

if TYPE_CHECKING:
    from gamspy import Container, Model
    from gamspy._symbols.symbol import Symbol


logger = logging.getLogger("ENGINE")
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter("[%(name)s - %(levelname)s] %(message)s")
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)


MAX_REQUEST_ATTEMPS = 3
ZIP_NAME = "data.zip"
INEX_FILE_NAME = "inex.json"
STATUS_MAP = {
    -10: "waiting",
    -3: "cancelled",
    -2: "cancelling",
    -1: "corrupted",
    0: "queued",
    1: "running",
    2: "outputting",
    10: "finished",
}


def get_relative_paths(paths: list[str], start: str) -> list[str]:
    relative_paths = []
    for path in paths:
        relative_path = os.path.relpath(path, start)
        if relative_path.startswith(f"..{os.sep}"):
            raise ValidationError(
                "Extra model file path must be relative to the working"
                f" directory. The given path: {path}, the working"
                f" directory: {start}, the"
                f" relative path: {relative_path}"
            )

        relative_paths.append(relative_path)

    return relative_paths


SCOPES = [
    "READONLY",
    "NAMESPACES",
    "JOBS",
    "USERS",
    "HYPERCUBE",
    "CLEANUP",
    "LICENSES",
    "USAGE",
    "AUTH",
    "CONFIGURATION",
]


class Endpoint:
    def get_request_headers(self):
        return {
            "Authorization": self.client._engine_config._get_auth_header(),
            "User-Agent": "GAMSPy EngineClient",
            "Accept": "application/json",
        }


[docs] class Auth(Endpoint): def __init__(self, client: EngineClient) -> None: self.client = client self._http = client._http self.engine_options = client.engine_options
[docs] def post( self, expires_in: int = 14400, scope: list[str] | None = None, ) -> str: # type: ignore """ Creates a JSON Web Token(JWT) for authentication Parameters ---------- expires_in : int, optional Expiration time, by default 14400 scope : list[str] | None, optional Scope of the token, by default None Returns ------- str token Raises ------ EngineClientException In case bad request EngineClientException In case unauthorized request EngineClientException In case there is an internal error GamspyException In case the status code is unrecognized """ info = {"expires_in": str(expires_in)} if isinstance(scope, list): for elem in scope: if elem not in SCOPES: raise ValidationError(f"{elem} is not a valid scope") scope_info = " ".join(scope) info.update({"scope": scope_info}) for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "POST", self.client._engine_config.host + "/auth/?" + urllib.parse.urlencode(info, doseq=True), headers=self.get_request_headers(), ) response_data = r.data.decode("utf-8", errors="replace") if r.status in [200, 400, 401, 500]: info = json.loads(response_data) if r.status == 200: return info["token"] elif r.status == 400: raise EngineClientException(f"Bad request: {info['message']}") elif r.status == 401: raise EngineClientException("Unauthorized!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue elif r.status == 500: raise EngineClientException( f"Internal error: {info['message']}" ) raise GamspyException(f"Unrecognized status code {r.status}")
[docs] def login( self, expires_in: int = 14400, scope: list[str] | None = None ) -> str: """ Creates a JSON Web Token(JWT) for authentication (username and password in request body) Parameters ---------- expires_in : int, optional Expiration time for the token, by default 14400 scope: list[str], optional Scope of the token, by default None Returns ------- str """ info = { "username": self.client._engine_config.username, "password": self.client._engine_config.password, "expires_in": expires_in, } if isinstance(scope, list): for elem in scope: if elem not in SCOPES: raise ValidationError(f"{elem} is not a valid scope") scope_info = " ".join(scope) info.update({"scope": scope_info}) for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "POST", self.client._engine_config.host + "/auth/login", fields=info, ) response_data = r.data.decode("utf-8", errors="replace") if r.status in [200, 400, 401, 500]: info = json.loads(response_data) if r.status == 200: return info["token"] elif r.status == 400: raise EngineClientException(f"Bad request: {info['message']}") elif r.status == 401: raise EngineClientException(f"Unauthorized: {info['message']}") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue elif r.status == 500: raise EngineClientException( f"Internal error: {info['message']}" ) raise GamspyException(f"Unrecognized status code {r.status}")
[docs] def logout(self) -> str: """ Invalidates all of your JSON Web Tokens(JWTs) Returns ------- str """ for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "POST", self.client._engine_config.host + "/auth/logout", headers=self.get_request_headers(), ) if r.status == 200: response_data = r.data.decode("utf-8", errors="replace") info = json.loads(response_data) return info["message"] elif r.status == 400: raise EngineClientException("Bad request!") elif r.status == 401: raise EngineClientException("Unauthorized!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue elif r.status == 500: raise EngineClientException("Internal error!") raise GamspyException(f"Unrecognized status code {r.status}")
[docs] class Job(Endpoint): def __init__(self, client: EngineClient) -> None: self.client = client self._http = client._http self.engine_options = client.engine_options
[docs] def get(self, token: str) -> tuple[int, str, int | None]: """ Get request to /jobs/{token} which returns the details of a job. Refer to https://engine.gams.com/api/ for more details. Parameters ---------- token : str Job token Returns ------- tuple[int, str, int] Job status, job status message, and gams exit code Raises ------ EngineClientException If get request has failed. """ for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "GET", self.client._engine_config.host + f"/jobs/{token}", headers=self.get_request_headers(), ) response_data = r.data.decode("utf-8", errors="replace") if r.status == 200: info = json.loads(response_data) job_status = int(info["status"]) return ( job_status, STATUS_MAP[job_status], info["process_status"], ) elif r.status == 403: raise EngineClientException("Unauthorized!") elif r.status == 404: raise EngineClientException(f"Job {token} not found!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue elif r.status == 500: raise EngineClientException("Internal error!") raise EngineClientException( "Creating job on GAMS Engine failed after: " + str(MAX_REQUEST_ATTEMPS) + " attempts. Message: " + response_data )
[docs] def post( self, working_directory: str, gms_file: str, pf_file: str | None = None, ) -> str: """ Post request to /jobs which submits a new job to be solved. Refer to https://engine.gams.com/api/ for more details. Parameters ---------- working_directory : str Working directory gms_file : str Name of the gms file pf_file: str | None Name of the pf file Returns ------- str Token Raises ------ EngineClientException If post request has failed. """ model_data_zip = self._create_zip_file( working_directory, gms_file, pf_file ) gms_file = os.path.relpath(gms_file, working_directory) pf_file = ( os.path.relpath(pf_file, working_directory) if pf_file is not None else None ) query_params, file_params = self._get_params( model_data_zip, gms_file, pf_file ) for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "POST", self.client._engine_config.host + "/jobs/?" + urllib.parse.urlencode(query_params, doseq=True), fields=file_params, headers=self.get_request_headers(), ) response_data = r.data.decode("utf-8", errors="replace") if r.status == 201: return json.loads(response_data)["token"] elif r.status == 400: raise EngineClientException("Input is not valid!") elif r.status == 401: raise EngineClientException("Invalid authentication!") elif r.status == 402: raise EngineClientException("Quota exceeded!") elif r.status == 403: raise EngineClientException("Unauthorized access!") elif r.status == 404: raise EngineClientException("Namespace could not be found!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue elif r.status == 500: raise EngineClientException("Internal error!") raise EngineClientException( "Creating job on GAMS Engine failed after: " + str(MAX_REQUEST_ATTEMPS) + " attempts. Message: " + response_data )
[docs] def get_results(self, token: str, working_directory: str): """ Get request to /jobs/{token}/result which downloads the job results. Downloaded results are unpacked to working directory. Refer to https://engine.gams.com/api/ for more details. Parameters ---------- token : str Job token working_directory : str Working directory Raises ------ EngineClientException If get request has failed. """ if not os.path.exists(working_directory): os.makedirs(working_directory, exist_ok=True) for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "GET", self.client._engine_config.host + f"/jobs/{token}/result", headers=self.get_request_headers(), preload_content=False, ) if r.status == 200: fd, path = tempfile.mkstemp() try: with open(path, "wb") as out: while True: data = r.read(6000) if not data: break out.write(data) out.flush() r.release_conn() with zipfile.ZipFile(path, "r") as zip_ref: zip_ref.extractall(working_directory) finally: os.close(fd) os.remove(path) logger.info( f"Results have been extracted to your working directory: {working_directory}." ) elif r.status == 400: raise EngineClientException("Bad request!") elif r.status == 401: raise EngineClientException("Unauthorized!") elif r.status == 403: raise EngineClientException("Job data does not exist!") elif r.status == 404: raise EngineClientException("No job found!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue else: response_data = r.data.decode("utf-8", errors="replace") raise EngineClientException( "Fatal error while getting the results back from engine. GAMS" f" Engine return code: {r.status}. Error message:" f" {response_data}" )
[docs] def delete_results(self, token: str): """ Delete request to /jobs/{token} which deletes the job results. Refer to https://engine.gams.com/api/ for more details. Parameters ---------- token : str Job token Raises ------ EngineClientException If job data does not exist in GAMS Engine. EngineClientException If delete request has failed. """ for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "DELETE", self.client._engine_config.host + "/jobs/" + token + "/result", headers=self.get_request_headers(), ) response_data = r.data.decode("utf-8", errors="replace") if r.status == 200: logger.info(f"Results for {token} have been deleted.") return elif r.status == 400: raise EngineClientException("Bad request!") elif r.status == 401: raise EngineClientException("Unauthorized!") elif r.status == 403: raise EngineClientException("Job data does not exist!") elif r.status == 404: raise EngineClientException("No job found!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue raise EngineClientException( "Removing job result failed with status code: " + str(r.status) + ". Message: " + response_data )
[docs] def get_logs(self, token: str) -> tuple[str, bool]: """ Get request to /jobs/{token}/unread-logs which returns stdout of a job. Refer to https://engine.gams.com/api/ for more details. Parameters ---------- token : str Job token Returns ------- tuple[str, bool] Current output buffer and queue finished status Raises ------ EngineClientException If get request has failed. """ for attempt_number in range(MAX_REQUEST_ATTEMPS): r = self._http.request( "DELETE", self.client._engine_config.host + f"/jobs/{token}/unread-logs", headers=self.get_request_headers(), ) response_data = r.data.decode("utf-8", errors="replace") if r.status == 200: info = json.loads(response_data) return info["message"], info["queue_finished"] elif r.status == 308: info = json.loads(response_data) return info["message"], True elif r.status == 403: raise EngineClientException( "Cannot get logs of pending job / Unauthorized access!" ) elif r.status == 404: raise EngineClientException("No job found!") elif r.status == 429: time.sleep(2**attempt_number) # retry with exponential backoff continue raise EngineClientException( "Getting logs failed with status code: " + str(r.status) + ". " + response_data + "." )
def _create_zip_file( self, working_directory: str, gms_file: str, pf_file: str | None, ) -> io.BytesIO: model_data_zip = io.BytesIO() model_files = [gms_file] if pf_file is not None: model_files.append(pf_file) model_files += self.client.extra_model_files model_files = get_relative_paths(model_files, working_directory) with zipfile.ZipFile( model_data_zip, "w", zipfile.ZIP_DEFLATED ) as model_data: for model_file in model_files: model_data.write( os.path.join(working_directory, model_file), arcname=model_file, ) model_data_zip.seek(0) return model_data_zip def _get_params(self, model_data_zip, gms_file, pf_file: str | None): file_params = {} query_params = ( copy.deepcopy(self.engine_options) if self.engine_options else {} ) query_params["namespace"] = self.client._engine_config.namespace if "data" in query_params or "model_data" in query_params: raise ValidationError( "`engine_options` must not include keys `data` or " "`model_data`. Please use `extra_model_files` to " "provide additional files to send to GAMS Engine.", ) if "inex_file" in query_params: if isinstance(query_params["inex_file"], io.IOBase): file_params["inex_file"] = ( INEX_FILE_NAME, query_params["inex_file"].read(), "application/json", ) else: with open(query_params["inex_file"], "rb") as f: file_params["inex_file"] = ( INEX_FILE_NAME, f.read(), "application/json", ) del query_params["inex_file"] if "model" in query_params: file_params["data"] = ( ZIP_NAME, model_data_zip.read(), "application/zip", ) else: query_params["run"] = gms_file query_params["model"] = os.path.splitext(gms_file)[0] file_params["model_data"] = ( ZIP_NAME, model_data_zip.read(), "application/zip", ) model_data_zip.close() if "arguments" in query_params: if not isinstance(query_params["arguments"], list): query_params["arguments"] = [query_params["arguments"]] if pf_file is not None: query_params["arguments"].append(f"pf={pf_file}") else: if pf_file is not None: query_params["arguments"] = [f"pf={pf_file}"] return query_params, file_params
[docs] class EngineClient: """ GAMS Engine Client. Parameters ---------- host : str Host address. username : str | None, optional Username password : str | None, optional Password jwt : str | None, optional JWT Token namespace : str, optional Namespace, by default "global" extra_model_files : list[str], optional extra_model_files, by default [] engine_options : dict | None, optional GAMS Engine options, by default None remove_results : bool, optional Whether to remove the results from GAMS Engine Server, by default False is_blocking : bool, optional Synchronous (blocking) or asynchronous, by default True Examples -------- >>> import gamspy as gp >>> client = gp.EngineClient(host="https://engine.gams.com/api/", username="<username>", password="<password>") """ def __init__( self, host: str, username: str | None = None, password: str | None = None, jwt: str | None = None, namespace: str = "global", extra_model_files: list[str] = [], engine_options: dict | None = None, remove_results: bool = False, is_blocking: bool = True, ): self.host = host self.username = username self.password = password self.jwt = jwt self.namespace = namespace self.extra_model_files = extra_model_files self.engine_options = engine_options self.remove_results = remove_results self.is_blocking = is_blocking self.tokens: list[str] = [] self._http = urllib3.PoolManager( cert_reqs="CERT_REQUIRED", ca_certs=certifi.where() ) self._engine_config = self._get_engine_config() # Endpoints self.job = Job(self) self.auth = Auth(self) def _get_engine_config(self): try: return GamsEngineConfiguration( self.host, self.username, self.password, self.jwt, self.namespace, ) except GamsException as e: raise ValidationError(e) from e
class GAMSEngine(backend.Backend): def __init__( self, container: Container, client: EngineClient | None, options: Options, solver: str, solver_options: dict | None, output: io.TextIOWrapper | None, model: Model, load_symbols: list[Symbol] | None, ) -> None: if client is None or not isinstance(client, EngineClient): raise ValidationError( "`engine_client` must be provided to solve on GAMS Engine" ) if solver.lower() in ["mpsge"]: raise ValidationError( f"`{solver}` is not a valid solver for GAMS Engine." ) super().__init__( container, model, options, solver, solver_options, output, load_symbols, ) self.client = client self.job_name = self.get_job_name() self.gms_file = self.job_name + ".gms" self.pf_file = self.job_name + ".pf" self.restart_file = self.job_name + ".g00" self.trace_file = self.job_name + ".txt" def is_async(self): return not self.client.is_blocking def run( self, relaxed_domain_mapping: bool = False, gams_to_gamspy: bool = False, ): # Run a dummy job to get the restart file to be sent to GAMS Engine self._create_restart_file() # Generate gams string and write modified symbols to gdx gams_string = self.preprocess(os.path.basename(self.container._gdx_in)) self.execute_gams(gams_string) if self.is_async(): return None # Synchronize GAMSPy with checkpoint and return a summary summary = self.postprocess() # Run another dummy job to synchronize GAMS and GAMSPy state self._sync() return summary def execute_gams(self, gams_string: str): extra_options = { "gdx": os.path.basename(self.container._gdx_out), "gdxSymbols": "newOrChanged", "trace": os.path.basename(self.trace_file), "restart": os.path.basename(self.restart_file), "input": os.path.basename(self.gms_file), } self.options._set_extra_options(extra_options) self.options._export(self.pf_file, self.output) with open(self.gms_file, "w", encoding="utf-8") as file: file.write(gams_string) try: original_extra_files = copy.deepcopy(self.client.extra_model_files) self.client.extra_model_files = self._append_gamspy_files() token = self.client.job.post( self.container.working_directory, self.gms_file, self.pf_file, ) self.client.extra_model_files = original_extra_files self.client.tokens.append(token) if not self.is_async(): job_status, message, _ = self.client.job.get(token) if job_status not in STATUS_MAP: raise EngineException( "Unknown job status code! Currently supported job" f" status codes: {STATUS_MAP.keys()}", return_code=job_status, status_code=job_status, ) if job_status in [-1, -3]: raise EngineException( "Could not get job results because the job is" f" {message}.", return_code=job_status, status_code=job_status, ) while job_status in [-10, -2, 0]: logger.info(f"Job status is {message}...") job_status, _, _ = self.client.job.get(token) while job_status in [1, 2]: message, is_finished = self.client.job.get_logs(token) if self.output is not None: self.output.write(message) if is_finished: job_status = 10 self.client.job.get_results( token, self.container.working_directory ) engine_output_path = os.path.join( self.container.working_directory, "log_stdout.txt" ) if self.options.log_file: # logoption = 2 | 4 shutil.move(engine_output_path, self.options.log_file) else: try: os.remove(engine_output_path) # logoption = 0 | 3 except FileNotFoundError: ... self.model._update_model_attributes() if self.client.remove_results: self.client.job.delete_results(token) finally: self.container._unsaved_statements = [] self.container._delete_autogenerated_symbols() def postprocess(self): super().load_records() if self.client.remove_results or self.model is None: return None if not self.is_async(): self.parse_listings() return self.prepare_summary(self.trace_file) return None def _prepare_dummy_options(self) -> dict: scrdir = self.container._process_directory extra_options = { "gdx": self.container._gdx_out, "gdxSymbols": "newOrChanged", "trace": self.trace_file, "input": self.gms_file, "sysdir": self.container.system_directory, "scrdir": scrdir, "scriptnext": os.path.join(scrdir, "gamsnext.sh"), "logoption": 0, "previouswork": 1, "license": utils._get_license_path( self.container.system_directory ), } if self.container._network_license: extra_options["netlicense"] = os.path.join(scrdir, "gamslice.dat") return extra_options def _create_restart_file(self): with open(self.gms_file, "w", encoding="utf-8") as gams_file: gams_file.write("") options = Options() extra_options = self._prepare_dummy_options() options._set_extra_options(extra_options) options._extra_options["save"] = self.restart_file options._export(self.pf_file) self.container._send_job(self.job_name, self.pf_file) def _sync(self): symbols = utils._get_symbol_names_from_gdx( self.container.system_directory, self.container._gdx_out ) dirty_str = ",".join(symbols) with open(self.gms_file, "w", encoding="utf-8") as gams_file: gams_file.write( f'execute_load "{self.container._gdx_out}", {dirty_str};' ) options = Options() extra_options = self._prepare_dummy_options() options._set_extra_options(extra_options) options._export(self.pf_file) self.container._send_job(self.job_name, self.pf_file) def _append_gamspy_files(self) -> list[str]: extra_model_files = self.client.extra_model_files + [ self.container._gdx_in, self.restart_file, ] if self.solver_options: extra_model_files.append( os.path.join( self.container.working_directory, f"{self.solver.lower()}.opt", ) ) if self.options.loadpoint is not None: extra_model_files.append(self.options.loadpoint) if self.model.external_module is not None: extra_model_files.append(self.model.external_module) return extra_model_files