Source code for gamspy.formulations.nn.rnn

from __future__ import annotations

from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
    import numpy as np

    from gamspy import Parameter

import gamspy as gp
import gamspy.formulations.utils as utils
from gamspy.exceptions import ValidationError
from gamspy.formulations.result import FormulationResult
from gamspy.math import dim


[docs] class RNN: """ Formulation generator for Recurrent Neural Networks in GAMSPy. It can be used to embed trained Recurrent neural networks in your problem. Note: It currently does **NOT** support Bidirectional RNNs and Dropout layers. Parameters ---------- container : Container Container that will hold the new variables and equations. input_size : int The number of expected features in the input sequence. hidden_size : int The number of features in the hidden state. activation : Literal["tanh", "relu", "linear"] The activation function applied to the hidden state update. By default "tanh". Examples -------- >>> import gamspy as gp >>> import numpy as np >>> from gamspy.math import dim >>> m = gp.Container() >>> # 2 input features, 4 hidden units >>> rnn = gp.formulations.RNN(m, input_size=2, hidden_size=4) >>> w_ih = np.random.rand(4, 2) >>> w_hh = np.random.rand(4, 4) >>> b_ih = np.random.rand(4) >>> b_hh = np.random.rand(4) >>> rnn.load_weights(w_ih, w_hh, b_ih, b_hh) >>> batch, time_step, features = [1, 3, 2] >>> input_domain = dim([batch, time_step, features]) >>> x = gp.Parameter(m, name="x_in", domain=input_domain, records=np.random.rand(1, 3, 2)) >>> out_var = rnn(x).result >>> type(out_var) <class 'gamspy._symbols.variable.Variable'> >>> [d.name for d in out_var.domain] ['DenseDim1_1', 'DenseDim3_1', 'DenseDim4_1'] """ def __init__( self, container: gp.Container, input_size: int, hidden_size: int, activation: Literal["tanh", "relu", "linear"] = "tanh", ): if not isinstance(input_size, int) or input_size <= 0: raise ValidationError("input_size must be a positive integer") if not isinstance(hidden_size, int) or hidden_size <= 0: raise ValidationError("hidden_size must be a positive integer") if activation not in ["tanh", "relu", "linear"]: raise ValidationError( f"""activation must be either of ["tanh", "relu", "linear"] and not >{activation}<""" ) self.container = container self.input_size = input_size self.hidden_size = hidden_size self.activation = activation self._state = 0 self.weight_ih: Parameter | None = None self.weight_hh: Parameter | None = None self.bias_ih: Parameter | None = None self.bias_hh: Parameter | None = None
[docs] def load_weights( self, weight_ih: np.ndarray, weight_hh: np.ndarray, bias_ih: np.ndarray | None = None, bias_hh: np.ndarray | None = None, ) -> None: """ Mark RNN as parameter and load weights from NumPy arrays. Use this when you already have the weights of your hidden layers. Parameters ---------- weight_ih : np.ndarray The input-to-hidden layer weights. Shape: (hidden_size, input_size) weight_hh : np.ndarray The hidden-to-hidden layer weights. Shape: (hidden_size, hidden_size) bias_ih : np.ndarray | None The input-to-hidden layer bias. Shape: (hidden_size, ) bias_hh : np.ndarray | None The hidden-to-hidden layer bias. Shape: (hidden_size, ) """ import numpy as np if weight_ih.shape != (self.hidden_size, self.input_size): raise ValidationError( f"weight_ih shape mismatch: expected {(self.hidden_size, self.input_size)}" ) if weight_hh.shape != (self.hidden_size, self.hidden_size): raise ValidationError( f"weight_hh shape mismatch: expected {(self.hidden_size, self.hidden_size)}" ) # Create Parameters self.weight_ih = gp.Parameter( self.container, domain=dim(weight_ih.shape), records=weight_ih, ) assert self.weight_ih is not None H_set = self.weight_ih.domain[0] H_prev = gp.Alias( self.container, alias_with=H_set, ) self.weight_hh = gp.Parameter( self.container, domain=[H_set, H_prev], records=weight_hh, ) # Handle Biases (default to 0 if None) if bias_ih is None: bias_ih = np.zeros(self.hidden_size) elif bias_ih.shape != (self.hidden_size,): raise ValidationError( f"bias_ih shape mismatch: expected {(self.hidden_size,)}" ) if bias_hh is None: bias_hh = np.zeros(self.hidden_size) elif bias_hh.shape != (self.hidden_size,): raise ValidationError( f"bias_hh shape mismatch: expected {(self.hidden_size,)}" ) self.bias_ih = gp.Parameter( self.container, domain=dim([self.hidden_size]), records=bias_ih, ) self.bias_hh = gp.Parameter( self.container, domain=dim([self.hidden_size]), records=bias_hh, ) self.weight_ih_array = weight_ih self.weight_hh_array = weight_hh self.bias_ih_array = bias_ih self.bias_hh_array = bias_hh self._state = 1
def _propagate_bounds( self, input: gp.Variable, result: FormulationResult, h_out_shape: tuple, h0: gp.Parameter | None = None, ) -> tuple[np.ndarray | None, np.ndarray]: """ Sequentially calculates the tight lower and upper bounds for the RNN hidden state. """ import numpy as np x_bounds = gp.Parameter( self.container, domain=dim([2, *input.shape]), ) x_bounds[("0",) + tuple(input.domain)] = input.lo[...] x_bounds[("1",) + tuple(input.domain)] = input.up[...] result.parameters_created["input_bounds"] = x_bounds h0_bounds_array = None if h0 is not None: h0_bounds_array = h0.toDense() # If the bounds are all zeros (None in GAMSPy parameters) if x_bounds.records is None: x_lb = np.zeros(shape=input.shape) x_ub = np.zeros(shape=input.shape) else: x_lb, x_ub = x_bounds.toDense() batch_size, time_steps, _ = input.shape inf_exists = x_bounds.countNegInf() or x_bounds.countPosInf() if inf_exists: x_lb = utils._encode_infinity(x_lb) x_ub = utils._encode_infinity(x_ub) w_ih_pos = np.maximum(self.weight_ih_array, 0) w_ih_neg = np.minimum(self.weight_ih_array, 0) w_hh_pos = np.maximum(self.weight_hh_array, 0) w_hh_neg = np.minimum(self.weight_hh_array, 0) pre_act_lb_seq, pre_act_ub_seq = [], [] h_lb_seq, h_ub_seq = [], [] if h0_bounds_array is not None: prev_lb = h0_bounds_array prev_ub = h0_bounds_array else: prev_lb = np.zeros((batch_size, self.hidden_size)) prev_ub = np.zeros((batch_size, self.hidden_size)) for t in range(time_steps): x_lb_t = x_lb[:, t, :] x_ub_t = x_ub[:, t, :] in_lb = (x_lb_t @ w_ih_pos.T) + (x_ub_t @ w_ih_neg.T) + self.bias_ih_array in_ub = (x_ub_t @ w_ih_pos.T) + (x_lb_t @ w_ih_neg.T) + self.bias_ih_array if t == 0 and h0_bounds_array is None: hid_lb = self.bias_hh_array hid_ub = self.bias_hh_array else: hid_lb = ( (prev_lb @ w_hh_pos.T) + (prev_ub @ w_hh_neg.T) + self.bias_hh_array ) hid_ub = ( (prev_ub @ w_hh_pos.T) + (prev_lb @ w_hh_neg.T) + self.bias_hh_array ) pre_lb = in_lb + hid_lb pre_ub = in_ub + hid_ub # Decode before activation so Numpy does not evaluate wrong bounds # np.maximum(8, 2 + 1j) => 8 pre_lb_real = utils._decode_complex_array(pre_lb) pre_ub_real = utils._decode_complex_array(pre_ub) if self.activation == "relu": pre_act_lb_seq.append(pre_lb_real) pre_act_ub_seq.append(pre_ub_real) h_lb_real = np.maximum(0, pre_lb_real) h_ub_real = np.maximum(0, pre_ub_real) elif self.activation == "tanh": h_lb_real = np.tanh(pre_lb_real) h_ub_real = np.tanh(pre_ub_real) else: # linear h_lb_real = pre_lb_real h_ub_real = pre_ub_real h_lb_seq.append(h_lb_real) h_ub_seq.append(h_ub_real) # Re-encode for the next time step's matmul if inf_exists: prev_lb = utils._encode_infinity(h_lb_real) prev_ub = utils._encode_infinity(h_ub_real) else: prev_lb = h_lb_real prev_ub = h_ub_real # stacked arrays are Real if self.activation == "relu": pre_act_bounds = np.stack( [np.stack(pre_act_lb_seq, axis=1), np.stack(pre_act_ub_seq, axis=1)], axis=0, ) relu_bounds = gp.Parameter( self.container, domain=dim([2, *h_out_shape]), records=pre_act_bounds, ) result.parameters_created["relu_bounds"] = relu_bounds else: pre_act_bounds = None relu_bounds = None output_bounds = np.stack( [np.stack(h_lb_seq, axis=1), np.stack(h_ub_seq, axis=1)], axis=0 ) out_bounds = gp.Parameter( self.container, domain=dim([2, *h_out_shape]), records=output_bounds, ) result.parameters_created["out_bounds"] = out_bounds return relu_bounds, out_bounds
[docs] def __call__( self, input_seq: gp.Parameter | gp.Variable, h0: gp.Parameter | None = None, *, propagate_bounds: bool = True, ) -> FormulationResult: """ Forward pass your input sequence, generating the output hidden states and equations required for calculating the recurrent neural network steps over time. If `propagate_bounds` is True (default), the `input_seq` is of type variable, and `load_weights` was called, then the bounds of the input are propagated to the output. Returns `FormulationResult` which can be unpacked as a output variable and list of equations. FormulationResult: - equations_created: ["set_output", "set_pre_act", "y_gte_x", "y_lte_x_1", "y_lte_x_2"] - variables_created: ["output", "pre_act", "binary"] - parameters_created: ["w_ih", "w_hh", "b_ih", "b_hh", "input_bounds", "out_bounds", "relu_bounds"] Note: - The `output` variable will have the domain (batch, time_steps, hidden_size). - Following equations are available only when `activation="relu"`, ["set_pre_act", "y_gte_x", "y_lte_x_1", "y_lte_x_2"]. - Following variables are available only when `activation="relu"`, ["pre_act", "binary"]. - Following parameters are available only when `propagate_bounds=True`, ["input_bounds", "out_bounds", "relu_bounds"]. Further, `relu_bounds` is only available when `activation="relu"`. - For backward compatibility, this result object can be unpacked as a tuple: `output, equations = rnn_layer(input_seq)`. Parameters ---------- input_seq : gp.Parameter | gp.Variable Input sequence to the RNN layer. It must be a 3D symbol of the following shape (batch_size, time_steps, input_features). h0 : gp.Parameter | None Initial hidden state for the first time step. If None, the initial hidden state is assumed to be a vector of zeros. By default None. Shape: (batch, hidden_size) propagate_bounds : bool = True If True, propagate bounds of the input to the output. Otherwise, the output variable is unbounded. Returns ------- FormulationResult """ if self._state != 1: raise ValidationError("Call load_weights before generating formulation.") if not isinstance(propagate_bounds, bool): raise ValidationError("propagate_bounds should be a boolean.") assert self.weight_ih is not None assert self.weight_hh is not None assert self.bias_ih is not None assert self.bias_hh is not None if len(input_seq.domain) != 3: raise ValidationError( f"Expected 3D input (batch, time_step, feature), got {len(input_seq.domain)}" ) if len(input_seq.domain[-1]) != self.weight_ih.shape[-1]: raise ValidationError( f"Last dimension of Input sequence does not match. Expected {self.weight_ih.shape[-1]}, got {len(input_seq.domain[-1])}." ) if h0 is not None: expected = (len(input_seq.domain[0]), self.weight_hh.shape[-1]) actual = tuple(len(i) for i in h0.domain) if expected != actual: raise ValidationError( f"h0 shape mismatch: expected {expected}, got {actual}" ) linear_input = input_seq @ self.weight_ih.t() linear_input = linear_input + self.bias_ih[linear_input.domain[-1]] h_out = gp.Variable( self.container, domain=linear_input.domain, ) N_set, T_set, H_set = h_out.domain _, H_prev = self.weight_hh.domain if len(T_set) == 1: linear_hidden = self.bias_hh[H_set] if h0 is not None: h0_effect = gp.Sum( H_prev, h0[N_set, H_prev] * self.weight_hh[H_set, H_prev] ) linear_hidden = linear_hidden + h0_effect else: linear_hidden = ( gp.Sum( H_prev, h_out[N_set, T_set.lag(1), H_prev] * self.weight_hh[H_set, H_prev], ) + self.bias_hh[H_set] ) if h0 is not None: h0_effect = gp.Sum( H_prev, h0[N_set, H_prev] * self.weight_hh[H_set, H_prev] ) linear_hidden = linear_hidden + h0_effect.where[gp.Ord(T_set) == 1] total_pre_activation = linear_input + linear_hidden result = FormulationResult(result=h_out, equations_created={}) result.parameters_created.update( { "w_ih": self.weight_ih, "w_hh": self.weight_hh, "b_ih": self.bias_ih, "b_hh": self.bias_hh, } ) pre_act_bounds = None if propagate_bounds and isinstance(input_seq, gp.Variable): pre_act_bounds, output_bounds = self._propagate_bounds( input=input_seq, result=result, h_out_shape=h_out.shape, h0=h0, ) h_out.lo[...] = output_bounds[("0",) + tuple(h_out.domain)] h_out.up[...] = output_bounds[("1",) + tuple(h_out.domain)] if self.activation == "tanh": rnn_eq = gp.Equation( self.container, domain=h_out.domain, ) rnn_eq[...] = h_out == gp.math.tanh(total_pre_activation) result.equations_created["set_output"] = rnn_eq result.variables_created["output"] = h_out elif self.activation == "relu": from gamspy.math.activation import relu_with_binary_var pre_act_var = gp.Variable( self.container, domain=h_out.domain, ) if pre_act_bounds is not None: pre_act_var.lo[...] = pre_act_bounds[("0",) + tuple(h_out.domain)] pre_act_var.up[...] = pre_act_bounds[("1",) + tuple(h_out.domain)] pre_act_eq = gp.Equation( self.container, domain=h_out.domain, ) pre_act_eq[...] = pre_act_var == total_pre_activation relu_res = relu_with_binary_var(pre_act_var) link_eq = gp.Equation( self.container, domain=h_out.domain, ) link_eq[...] = h_out == relu_res.result result.equations_created.update( { "set_pre_act": pre_act_eq, "set_output": link_eq, **relu_res.equations_created, } ) result.variables_created.update( {"output": h_out, "pre_act": pre_act_var, **relu_res.variables_created} ) else: # linear rnn_eq = gp.Equation( self.container, domain=h_out.domain, ) rnn_eq[...] = h_out == total_pre_activation result.equations_created["set_output"] = rnn_eq result.variables_created["output"] = h_out return result
def __str__(self) -> str: return ( "RNN(\n" f" input_size={self.input_size}\n" f" hidden_size={self.hidden_size}\n" f" activation={self.activation}\n" f" weights_loaded={'True' if self._state == 1 else 'False'}\n)" )