Source code for gamspy.formulations.nn.conv2d

from __future__ import annotations

from typing import TYPE_CHECKING, Literal

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

import gamspy as gp
import gamspy.formulations.nn.utils as utils
from gamspy.exceptions import ValidationError
from gamspy.math import dim

if TYPE_CHECKING:
    from gamspy import Parameter, Variable


[docs] class Conv2d: """ Formulation generator for 2D Convolution symbol in GAMS. It can be used to embed convolutional layers of trained neural networks in your problem. It can also be used to embed convolutional layers when you need weights as variables. Parameters ---------- container : Container Container that will contain the new variable and equations. in_channel : int Number of channels in the input out_channel : int Number of channels in the output kernel_size : int | tuple[int, int] Filter size stride : int | tuple[int, int] Stride in the convolution, by default 1 padding : int | tuple[int, int] | Literal["same", "valid"] Specifies the amount of padding to apply to the input, by default 0. If an integer is provided, that padding is applied to both the height and width. If a tuple of two integers is given, the first value determines the padding for the top and bottom, while the second value sets the padding for the left and right. It is also possible to provide string literals "same" and "valid". "same" pads the input so the output has the shape as the input. "valid" is the same as no padding. bias : bool Is bias added after the convolution, by default True name_prefix : str | None Prefix for names of the GAMS symbols generated, by default None which means random prefix. Using same name_prefix in different formulations causes name conflicts. Do not use same name_prefix again. Examples -------- >>> import gamspy as gp >>> import numpy as np >>> from gamspy.math import dim >>> w1 = np.random.rand(2, 1, 3, 3) >>> b1 = np.random.rand(2) >>> m = gp.Container() >>> # in_channels=1, out_channels=2, kernel_size=3x3 >>> conv1 = gp.formulations.Conv2d(m, 1, 2, 3) >>> conv1.load_weights(w1, b1) >>> # 10 images, 1 channel, 24 by 24 >>> inp = gp.Variable(m, domain=dim((10, 1, 24, 24))) >>> out, eqs = conv1(inp) >>> type(out) <class 'gamspy._symbols.variable.Variable'> >>> [len(x) for x in out.domain] [10, 2, 22, 22] """ def __init__( self, container: gp.Container, in_channels: int, out_channels: int, kernel_size: int | tuple[int, int], stride: int | tuple[int, int] = 1, padding: int | tuple[int, int] | Literal["same", "valid"] = 0, bias: bool = True, name_prefix: str | None = None, ): if not (isinstance(in_channels, int) and in_channels > 0): raise ValidationError("in_channels must be a positive integer") if not (isinstance(out_channels, int) and out_channels > 0): raise ValidationError("out_channels must be a positive integer") _kernel_size = utils._check_tuple_int(kernel_size, "kernel_size") _stride = utils._check_tuple_int(stride, "stride") if isinstance(padding, str): if padding not in {"same", "valid"}: raise ValidationError( "padding must be 'same' or 'valid' when it is a string" ) if padding == "same" and _stride != (1, 1): raise ValidationError( "'same' padding can only be used with stride=1" ) _padding: tuple[int, int, int, int] | str = ( (0, 0, 0, 0) if padding == "valid" else "same" ) else: _padding = utils._check_padding(padding) if not isinstance(bias, bool): raise ValidationError("bias must be a boolean") self.container = container self.in_channels = in_channels self.out_channels = out_channels self.kernel_size = _kernel_size self.stride = _stride self.padding = _padding self.use_bias = bias self._state = 0 self.weight: Parameter | Variable | None = None self.weight_array = None self.bias: Parameter | Variable | None = None self.bias_array = None if name_prefix is None: name_prefix = gp.utils._get_unique_name() self._name_prefix = name_prefix def _encode_infinity(self, x): """ Encode infinity values as complex numbers. - Replace -np.inf with 0 - 1j. - Replace np.inf with 0 + 1j. """ x = np.where(x == -np.inf, 0 - 1j, x) x = np.where(x == np.inf, 0 + 1j, x) return x def _decode_complex_array(self, z: np.ndarray) -> np.ndarray: real = z.real.copy() real[z.imag > 0] = np.inf real[z.imag < 0] = -np.inf return real def _propagate_bounds(self, input, output, weight, bias, stride, padding): # Extract input bounds input_bounds = gp.Parameter( self.container, domain=dim([2, *input.shape]), name=utils._generate_name("p", self._name_prefix, "input_bounds"), ) # lower bounds input_bounds[("0",) + tuple(input.domain)] = input.lo[...] # upper bounds input_bounds[("1",) + tuple(input.domain)] = input.up[...] # If the bounds are all zeros (None in GAMSPy parameters); # we skip matrix multiplication as it will result in zero values if input_bounds.records is None: out_bounds_array = np.zeros(output.shape) if self.use_bias: b = self.bias_array[:, np.newaxis, np.newaxis] out_bounds_array = out_bounds_array + b out_bounds = gp.Parameter( self.container, domain=dim(output.shape), records=out_bounds_array, name=utils._generate_name( "p", self._name_prefix, "output_bounds" ), ) output.lo[...] = out_bounds output.up[...] = out_bounds return input_lower, input_upper = input_bounds.toDense() # Check if the input bounds contain (-)infinity values inf_exists = input_bounds.countNegInf() or input_bounds.countPosInf() out_arr_dtype = np.complex128 if inf_exists else np.float64 if inf_exists: # Encode infinity values as complex numbers input_lower = self._encode_infinity(input_lower) input_upper = self._encode_infinity(input_upper) batch, out_channels, h_out, w_out = output.shape _, in_channels, h_k, w_k = weight.shape stride_y, stride_x = stride pad_top, pad_left, pad_bottom, pad_right = padding # if any side of the padding is non-zero, we need to pad the input if any(padding): # Pad the input bounds input_lower = np.pad( input_lower, ((0, 0), (0, 0), (pad_top, pad_bottom), (pad_left, pad_right)), mode="constant", constant_values=0, ) input_upper = np.pad( input_upper, ((0, 0), (0, 0), (pad_top, pad_bottom), (pad_left, pad_right)), mode="constant", constant_values=0, ) # ----- Sliding window view ----- # Create sliding windows for the input, where each window has the same shape as the kernel. # This is done separately for the lower and upper bounds of the input. # Then, downsample the sliding windows by selecting every `stride_y` step along the height (vertical) axis # and every `stride_x` step along the width (horizontal) axis. This reduces the number of windows # by skipping positions based on the user-specified strides. # # After slicing, the axes are transposed to reorder the dimensions. Specifically: # - The original window positions (height and width axes) are moved to the end. # - The window content (channel and spatial dimensions) is brought forward. # # This ensures that the windows are processed in a left-to-right, top-to-bottom order, # prioritizing horizontal traversal first, which is the desired behavior. windows_lower = sliding_window_view( input_lower, (batch, in_channels, h_k, w_k) ) windows_upper = sliding_window_view( input_upper, (batch, in_channels, h_k, w_k) ) windows_lower = windows_lower[ :, :, ::stride_y, ::stride_x, :, :, : ].transpose(0, 1, 4, 2, 3, 5, 6, 7) windows_upper = windows_upper[ :, :, ::stride_y, ::stride_x, :, :, : ].transpose(0, 1, 4, 2, 3, 5, 6, 7) # # Reshape windows for vectorized computation windows_lower = windows_lower.reshape( batch, h_out, w_out, in_channels, h_k, w_k ) windows_upper = windows_upper.reshape( batch, h_out, w_out, in_channels, h_k, w_k ) # Split weights into positive and negative parts pos_weight = np.maximum(weight, 0) neg_weight = np.minimum(weight, 0) # Initialize output bounds output_lower = np.zeros( (batch, out_channels, h_out, w_out), dtype=out_arr_dtype ) output_upper = np.zeros( (batch, out_channels, h_out, w_out), dtype=out_arr_dtype ) if bias is None: bias = np.zeros(out_channels) for c_out in range(out_channels): # Lower bound: sum(input_lower * pos_weight + input_upper * neg_weight) output_lower[:, c_out, :, :] = ( np.sum( windows_lower * pos_weight[c_out, np.newaxis, np.newaxis, :, :, :], axis=(3, 4, 5), ) + np.sum( windows_upper * neg_weight[c_out, np.newaxis, np.newaxis, :, :, :], axis=(3, 4, 5), ) + bias[c_out] ) # Upper bound: sum(input_lower * neg_weight + input_upper * pos_weight) output_upper[:, c_out, :, :] = ( np.sum( windows_lower * neg_weight[c_out, np.newaxis, np.newaxis, :, :, :], axis=(3, 4, 5), ) + np.sum( windows_upper * pos_weight[c_out, np.newaxis, np.newaxis, :, :, :], axis=(3, 4, 5), ) + bias[c_out] ) if inf_exists: # Decode complex numbers back to real numbers if infinity values were used output_lower = self._decode_complex_array(output_lower) output_upper = self._decode_complex_array(output_upper) out_bounds_array = np.stack([output_lower, output_upper], axis=0) out_bounds = gp.Parameter( self.container, domain=dim([2, *output.shape]), records=out_bounds_array, name=utils._generate_name("p", self._name_prefix, "output_bounds"), ) output.lo[...] = out_bounds[("0",) + tuple(output.domain)] output.up[...] = out_bounds[("1",) + tuple(output.domain)]
[docs] def make_variable(self) -> None: """ Mark Conv2d as variable. After this is called `load_weights` cannot be called. Use this when you need to learn the weights of your convolutional layer in your optimization model. This does not initialize the weights, it is highly recommended that you set initial values to `weight` and `bias` variables. """ if self._state == 1: raise ValidationError( "make_variable cannot be used after calling load_weights" ) expected_shape = ( self.out_channels, self.in_channels, self.kernel_size[0], self.kernel_size[1], ) if self.weight is None: self.weight = gp.Variable( self.container, domain=dim(expected_shape), name=utils._generate_name("v", self._name_prefix, "weight"), ) if self.use_bias and self.bias is None: self.bias = gp.Variable( self.container, domain=dim([self.out_channels]), name=utils._generate_name("v", self._name_prefix, "bias"), ) self._state = 2
[docs] def load_weights( self, weight: np.ndarray, bias: np.ndarray | None = None ) -> None: """ Mark Conv2d as parameter and load weights from NumPy arrays. After this is called `make_variable` cannot be called. Use this when you already have the weights of your convolutional layer. Parameters ---------- weight : np.ndarray Conv2d layer weights in shape (out_channels x in_channels x kernel_size[0] x kernel_size[1]) bias : np.ndarray | None Conv2d layer bias in shape (out_channels, ), only required when bias=True during initialization """ if self._state == 2: raise ValidationError( "load_weights cannot be used after calling make_variable" ) if self.use_bias is False and bias is not None: raise ValidationError( "bias must be None since bias was set to False during initialization" ) if self.use_bias is True and bias is None: raise ValidationError("bias must be provided") if len(weight.shape) != 4: raise ValidationError( f"expected 4D input for weight (got {len(weight.shape)}D input)" ) expected_shape = ( self.out_channels, self.in_channels, self.kernel_size[0], self.kernel_size[1], ) if weight.shape != expected_shape: raise ValidationError(f"weight expected to be {expected_shape}") if bias is not None: if len(bias.shape) != 1: raise ValidationError( f"expected 1D input for bias (got {len(bias.shape)}D input)" ) if bias.shape != (self.out_channels,): raise ValidationError( f"bias expected to be ({self.out_channels},)" ) if self.weight is None: self.weight = gp.Parameter( self.container, domain=dim(expected_shape), records=weight, name=utils._generate_name("p", self._name_prefix, "weight"), ) else: self.weight.setRecords(weight) self.weight_array = weight if self.use_bias: if self.bias is None: self.bias = gp.Parameter( self.container, domain=dim([self.out_channels]), records=bias, name=utils._generate_name("p", self._name_prefix, "bias"), ) else: self.bias.setRecords(bias) self.bias_array = bias self._state = 1
[docs] def __call__( self, input: gp.Parameter | gp.Variable, propagate_bounds: bool = True ) -> tuple[gp.Variable, list[gp.Equation]]: """ Forward pass your input, generate output and equations required for calculating the convolution. If `propagate_bounds` is True, the `input` is of type variable, and `load_weights` was called, then the bounds of the input are propagated to the output. Parameters ---------- input : gp.Parameter | gp.Variable input to the conv layer, must be in shape (batch x in_channels x height x width) propagate_bounds : bool = True If True, propagate bounds of the input to the output. Otherwise, the output variable is unbounded. """ if not isinstance(propagate_bounds, bool): raise ValidationError("Expected a boolean for propagate_bounds") if self.weight is None: raise ValidationError( "You must call load_weights or make_variable first before using the Conv2d" ) if len(input.domain) != 4: raise ValidationError( f"expected 4D input (got {len(input.domain)}D input)" ) N, C_in, H_in, W_in = input.domain if len(C_in) != self.in_channels: raise ValidationError("in_channels does not match") h_in = len(H_in) w_in = len(W_in) h_out, w_out = utils._calc_hw( self.padding, self.kernel_size, self.stride, h_in, w_in ) out = gp.Variable( self.container, domain=dim([len(N), self.out_channels, h_out, w_out]), name=utils._generate_name("v", self._name_prefix, "output"), ) N, C_out, H_out, W_out = out.domain set_out = gp.Equation( self.container, domain=out.domain, name=utils._generate_name("e", self._name_prefix, "set_output"), ) if isinstance(self.padding, str): padding = utils._calc_same_padding(self.kernel_size) else: padding = self.padding # expr must have domain N, C_out, H_out, W_out top_index = (self.stride[0] * (gp.Ord(H_out) - 1)) - padding[0] + 1 left_index = (self.stride[1] * (gp.Ord(W_out) - 1)) - padding[1] + 1 _, _, Hf, Wf = self.weight.domain C_in, Hf, Wf, H_in, W_in = utils._next_domains( [C_in, Hf, Wf, H_in, W_in], out.domain, ) subset = gp.Set( self.container, domain=[H_out, W_out, Hf, Wf, H_in, W_in], name=utils._generate_name("s", self._name_prefix, "conv_subset"), ) subset[ H_out, W_out, Hf, Wf, H_in, W_in, ].where[ (gp.Ord(H_in) == (top_index + gp.Ord(Hf) - 1)) & (gp.Ord(W_in) == (left_index + gp.Ord(Wf) - 1)) ] = True expr = gp.Sum( [C_in], gp.Sum( subset[H_out, W_out, Hf, Wf, H_in, W_in], input[N, C_in, H_in, W_in] * self.weight[C_out, C_in, Hf, Wf], ), ) if self.use_bias: assert self.bias is not None expr = expr + self.bias[C_out] set_out[N, C_out, H_out, W_out] = out[N, C_out, H_out, W_out] == expr # If propagate_bounds is True, weight is a parameter and input is a variable, # we will propagate the bounds of the input to the output if ( propagate_bounds and self._state == 1 and isinstance(input, gp.Variable) ): self._propagate_bounds( input, out, self.weight_array, self.bias_array, self.stride, padding, ) return out, [set_out]