Source code for gamspy.formulations.ml.regression_tree

from __future__ import annotations

import importlib
import uuid
from typing import TYPE_CHECKING

import numpy as np

import gamspy as gp
import gamspy._algebra.expression as expression
import gamspy.formulations.utils as utils
from gamspy.exceptions import ValidationError
from gamspy.formulations.ml.decision_tree_struct import DecisionTreeStruct

if TYPE_CHECKING:
    from sklearn.tree import DecisionTreeRegressor


[docs] class RegressionTree: """ Formulation generator for Regression Trees in GAMSPy. Parameters ---------- container : Container Container that will contain the new variable and equations. regressor: DecisionTreeRegressor | DecisionTreeStruct - A fitted `sklearn.tree.DecisionTreeRegressor` instance. - If `sklearn.tree.DecisionTreeRegressor` is not utilized, the fitted tree information can be supplied via the `DecisionTreeStruct` dataclass, which represents the same components as those in `sklearn.tree`. See :meth:`DecisionTreeStruct <gamspy.formulations.DecisionTreeStruct>` for details on required attributes. name_prefix : str | None Prefix for generated GAMSPy symbols, by default None which means random prefix. Using the same name_prefix in different formulations causes name conflicts. Do not use the same name_prefix again. Examples -------- >>> import gamspy as gp >>> import numpy as np >>> from gamspy.math import dim >>> np.random.seed(42) >>> m = gp.Container() >>> in_data = np.random.randint(0, 10, size=(5, 2)) >>> out_data = np.random.randint(1, 3, size=(5, 1)) >>> tree_attribute = { ... "children_left": np.array([1, 2, -1, -1, -1]), ... "children_right": np.array([4, 3, -1, -1, -1]), ... "feature": np.array([0, 1, -2, -2, -2]), ... "threshold": np.array([5.5, 4.5, -2.0, -2.0, -2.0]), ... "value": np.array([[15.6], [11.25], [10.0], [15.0], [33.0]]), ... "capacity": 5, ... "n_features": 2, ... } >>> tree = gp.formulations.DecisionTreeStruct(**tree_attribute) >>> dt_model = gp.formulations.RegressionTree(m, tree) >>> x = gp.Variable(m, "x", domain=dim((5, 2)), type="positive") >>> x.up[:, :] = 10 >>> y, eqns = dt_model(x) >>> set_of_samples = y.domain[0] >>> set_of_samples.name 'DenseDim5_1' """ def __init__( self, container: gp.Container, regressor: DecisionTreeRegressor | DecisionTreeStruct, name_prefix: str | None = None, ): if not isinstance(container, gp.Container): raise ValidationError(f"{container} is not a gp.Container.") if isinstance(regressor, DecisionTreeStruct): self._initialize_from_decision_tree_struct(regressor=regressor) else: try: sklearn_tree = importlib.import_module("sklearn.tree") if isinstance(regressor, sklearn_tree.DecisionTreeRegressor): self._initialize_from_sklearn(regressor=regressor) else: raise ValidationError( f"{regressor} must be an instance of either >sklearn.tree.DecisionTreeRegressor< or >DecisionTreeStruct<" ) except ModuleNotFoundError: raise ValidationError( ">sklearn.tree< module not found." ) from None self._check_tree() self.container = container if name_prefix is None: name_prefix = str(uuid.uuid4()).split("-")[0] self._name_prefix = name_prefix def _initialize_from_sklearn(self, regressor: DecisionTreeRegressor): """ Initializes the tree attributes from a fitted scikit-learn DecisionTreeRegressor. """ _trained = hasattr(regressor, "tree_") and regressor.tree_ is not None if not _trained: raise ValidationError( f"{regressor} is not fitted or tree_ attribute is missing." ) self.children_left = regressor.tree_.children_left self.children_right = regressor.tree_.children_right self.feature = regressor.tree_.feature self.threshold = regressor.tree_.threshold self.value = regressor.tree_.value[:, :, 0] self.capacity = regressor.tree_.capacity self.n_features = regressor.tree_.n_features def _initialize_from_decision_tree_struct( self, regressor: DecisionTreeStruct ): """ Initializes the tree attributes from DecisionTreeStruct. """ self.children_left = regressor.children_left self.children_right = regressor.children_right self.feature = regressor.feature self.threshold = regressor.threshold self.value = regressor.value self.capacity = regressor.capacity self.n_features = regressor.n_features def _check_tree(self): """ Validates that the core tree attributes are populated and have valid basic properties. Raises: ValidationError: If any essential attribute is empty or invalid. """ if self.children_left.size == 0: raise ValidationError("Attribute 'children_left' is empty.") if self.children_right.size == 0: raise ValidationError("Attribute 'children_right' is empty.") if self.feature.size == 0: raise ValidationError("Attribute 'feature' is empty.") if self.threshold.size == 0: raise ValidationError("Attribute 'threshold' is empty.") if self.value.size == 0: raise ValidationError("Attribute 'value' is empty.") if self.n_features <= 0: raise ValidationError("'n_features' must be a positive integer.") if self.capacity <= 0: raise ValidationError("'capacity' must be a positive integer.") def _node_bounds(self): """Traverse the tree using DFS and extract bound information for each node.""" node_lb = np.empty((self.n_features, self.capacity)) node_lb.fill(-np.inf) node_ub = np.empty((self.n_features, self.capacity)) node_ub.fill(np.inf) stack = [0] while stack: node = stack.pop() left = self.children_left[node] if left < 0: continue right = self.children_right[node] node_lb[:, left] = node_lb[:, node] node_ub[:, left] = node_ub[:, node] node_lb[:, right] = node_lb[:, node] node_ub[:, right] = node_ub[:, node] node_ub[self.feature[node], left] = self.threshold[node] node_lb[self.feature[node], right] = self.threshold[node] stack.extend((right, left)) return node_lb, node_ub def _yield_call( self, input: gp.Parameter | gp.Variable, M: float | None = None, ): leafs = self.children_left < 0 leafs = leafs.nonzero()[0] nleafs = len(leafs) output_dim = self.value.shape[-1] node_lb, node_ub = self._node_bounds() if not isinstance(input, (gp.Variable, gp.Parameter)): raise ValidationError( "Input must be of either type gp.Parameter | gp.Variable" ) if len(input.domain) != 2: raise ValidationError( f"input expected to be in shape (n_samples, {self.n_features})" ) if len(input.domain[-1]) != self.n_features: raise ValidationError("number of features do not match") if M is not None and not isinstance(M, (float, int)): raise ValidationError("M can either be of type float or int") set_of_samples: gp.Set = input.domain[0] set_of_features: gp.Set = input.domain[-1] [ set_of_output_dim, set_of_lb_ub, set_of_leafs, ] = gp.math._generate_dims( self.container, dims=[output_dim, 2, nleafs] ) [ set_of_samples, set_of_features, set_of_output_dim, set_of_lb_ub, set_of_leafs, ] = utils._next_domains( [ set_of_samples, set_of_features, set_of_output_dim, set_of_lb_ub, set_of_leafs, ], [], ) lb = ge = "0" ub = le = "1" cons_type = set_of_lb_ub feat_par = gp.Parameter._constructor_bypass( self.container, name=utils._generate_name("p", self._name_prefix, "feat_par"), domain=[set_of_features, set_of_leafs, set_of_lb_ub], ) out = gp.Variable._constructor_bypass( self.container, name=utils._generate_name("v", self._name_prefix, "output"), domain=[set_of_samples, set_of_output_dim], ) yield out feat_vars = gp.Variable._constructor_bypass( self.container, name=utils._generate_name("v", self._name_prefix, "feature"), domain=[set_of_samples, set_of_features], ) indicator_vars = gp.Variable._constructor_bypass( self.container, name=utils._generate_name("v", self._name_prefix, "indicator"), type="binary", domain=[set_of_samples, set_of_leafs], description="indicator variable for each leaf for each sample", ) assign_one_output = gp.Equation._constructor_bypass( self.container, name=utils._generate_name( "e", self._name_prefix, "assign_one_output" ), domain=[set_of_samples], description="Activate only one leaf per sample", ) out_link = gp.Parameter._constructor_bypass( self.container, name=utils._generate_name( "p", self._name_prefix, "predicted_value" ), domain=[set_of_leafs, set_of_output_dim], ) link_indicator_output = gp.Equation._constructor_bypass( self.container, name=utils._generate_name( "e", self._name_prefix, "link_indicator_output" ), domain=[set_of_samples, set_of_output_dim], description="Link the indicator variable to the predicted value of the decision tree", ) max_out = gp.Parameter._constructor_bypass( self.container, name=utils._generate_name("p", self._name_prefix, "max_out"), domain=[set_of_output_dim], ) min_out = gp.Parameter._constructor_bypass( self.container, name=utils._generate_name("p", self._name_prefix, "min_out"), domain=[set_of_output_dim], ) ub_output = gp.Equation._constructor_bypass( self.container, name=utils._generate_name("e", self._name_prefix, "ub_output"), domain=[set_of_samples, set_of_output_dim], description="Output cannot be more than the maximum of predicted value", ) lb_output = gp.Equation._constructor_bypass( self.container, name=utils._generate_name("e", self._name_prefix, "lb_output"), domain=[set_of_samples, set_of_output_dim], description="Output cannot be less than the minimum of predicted value", ) uni_domain = [set_of_samples, set_of_features, set_of_leafs] s = gp.Set._constructor_bypass( self.container, name=utils._generate_name( "s", self._name_prefix, "subset_of_paths" ), description="Dynamic subset of possible paths", domain=uni_domain, ) feat_thresh = gp.Parameter._constructor_bypass( self.container, name=utils._generate_name( "p", self._name_prefix, "feature_threshold" ), description="feature splitting value", domain=uni_domain + [cons_type], ) _bound_big_m = gp.Parameter._constructor_bypass( self.container, name=utils._generate_name("p", self._name_prefix, "bound_big_m"), description="bound value for big-M", domain=uni_domain + [cons_type], ) ge_cons = gp.Equation._constructor_bypass( self.container, name=utils._generate_name( "e", self._name_prefix, "link_indicator_feature_ge" ), domain=uni_domain, description="Link the indicator variable with the feature which is Lower bounded using a big-M constraint", ) le_cons = gp.Equation._constructor_bypass( self.container, name=utils._generate_name( "e", self._name_prefix, "link_indicator_feature_le" ), domain=uni_domain, description="Link the indicator variable with the feature which is Upper bounded using a big-M constraint", ) if isinstance(input, gp.Variable): feat_vars = input else: self.container._add_statement( expression.Expression(feat_vars.fx[...], "=", input[...]) ) definition = expression.Expression( assign_one_output[set_of_samples], "..", ( gp.Sum( set_of_leafs, indicator_vars[set_of_samples, set_of_leafs] ) == 1 ), ) self.container._add_statement(definition) assign_one_output._definition = definition idx, jdx = np.indices((nleafs, output_dim), dtype=int) mapped_values = self.value[leafs[:, None], jdx] definition = expression.Expression( link_indicator_output[set_of_samples, set_of_output_dim], "..", ( gp.Sum( set_of_leafs, out_link[set_of_leafs, set_of_output_dim] * indicator_vars[set_of_samples, set_of_leafs], ) == out ), ) self.container._add_statement(definition) link_indicator_output._definition = definition definition = expression.Expression( ub_output[...], "..", out <= max_out ) self.container._add_statement(definition) ub_output._definition = definition definition = expression.Expression( lb_output[...], "..", out >= min_out ) lb_output._definition = definition self.container._add_statement(definition) set_records_dict = { out_link: [ (int(i), int(j), v) for i, j, v in np.stack( (idx, jdx, mapped_values), axis=-1 ).reshape(-1, 3) ], feat_par: np.stack([node_lb, node_ub], axis=-1)[:, leafs, :], max_out: np.max(self.value[leafs, :], axis=0), min_out: np.min(self.value[leafs, :], axis=0), } yield [set_of_output_dim, set_records_dict] ### This generates the set of possible paths given the input data mask = (feat_vars.up[...] >= feat_par[..., lb]) & ( feat_vars.lo[...] <= feat_par[..., ub] ) mask_lo = ( (feat_vars.lo[...] < feat_par[..., lb]) & mask & (feat_par[..., lb] > -np.inf) ) mask_up = ( (feat_vars.up[...] > feat_par[..., ub]) & mask & (feat_par[..., ub] < np.inf) ) s[...].where[mask_lo | mask_up] = True self.container._add_statement( expression.Expression( feat_thresh[..., ge].where[mask_lo], "=", feat_par[..., lb], ) ) self.container._add_statement( expression.Expression( feat_thresh[..., le].where[mask_up], "=", feat_par[..., ub], ) ) self.container._add_statement( expression.Expression( indicator_vars.fx[set_of_samples, set_of_leafs].where[ gp.Sum(set_of_features, ~mask) ], "=", 0, ) ) self.container._synch_with_gams() self.container._add_statement( expression.Expression( _bound_big_m[s[uni_domain], ge].where[mask_lo], "=", ( M if M else ( feat_par[set_of_features, set_of_leafs, lb] - feat_vars.lo[set_of_samples, set_of_features] ) ), ) ) self.container._add_statement( expression.Expression( _bound_big_m[s[uni_domain], le].where[mask_up], "=", ( M if M else ( feat_vars.up[set_of_samples, set_of_features] - feat_par[set_of_features, set_of_leafs, ub] ) ), ) ) self.container._add_statement( expression.Expression( _bound_big_m[...].where[gp.math.abs(_bound_big_m) == np.inf], "=", 1e10, ) ) definition = expression.Expression( ge_cons[uni_domain].where[ (feat_thresh[..., ge] != 0) & s[uni_domain] ], "..", feat_vars >= feat_thresh[..., ge] - _bound_big_m[..., ge] * (1 - indicator_vars), ) self.container._add_statement(definition) ge_cons._definition = definition le_cons[uni_domain].where[ (feat_thresh[..., le] != 0) & s[uni_domain] ] = feat_vars <= feat_thresh[..., le] + _bound_big_m[..., le] * ( 1 - indicator_vars ) eqns = [ assign_one_output, link_indicator_output, ub_output, lb_output, ge_cons, le_cons, ] yield eqns
[docs] def __call__( self, input: gp.Parameter | gp.Variable, M: float | None = None, ) -> tuple[gp.Variable, list[gp.Equation]]: """ Generate output variable and equations required for embedding the regression tree. Parameters ---------- input : gp.Parameter | gp.Variable input for the regression tree, must be in shape (sample_size, number_of_features) M : float value for the big_M. By default, infer the value using the available bounds for variables. If the variable is unbounded, then default to 1e10. """ generator = self._yield_call(input, M) output = next(generator) _, set_values = next(generator) input.container.setRecords(set_values) eqs = next(generator) return output, eqs