Updating and Projecting Coefficients: The RAS Approach (IOBALANCE)#

`iobalance.py`

```"""
## GAMSSOURCE: https://www.gams.com/latest/gamslib_ml/libhtml/gamslib_iobalance.html
## MODELTYPE: LP, NLP, QCP
## KEYWORDS: linear programming, nonlinear programming, quadratic constraints, statistics, RAS approach

Updating and Projecting Coefficients: The RAS Approach (IOBALANCE)

The RAS procedure (named after Richard A. Stone) is an iterative procedure to
update matrices. This numerical example is taken from chapter 7.4.2 of Miller
and Blair. Several additional optimization formulations will be applied to
this toy problem.

Miller R E, and Blair P D, Input-Output Analysis: Foundations and Extensions,
Cambridge University Press, New York, 2009.
"""

from __future__ import annotations

import os

import gamspy.math as gams_math
import numpy as np
from gamspy import (
Alias,
Card,
Container,
Equation,
Model,
Parameter,
Problem,
Sense,
Set,
Smax,
Sum,
Variable,
)

def main():
m = Container(
system_directory=os.getenv("SYSTEM_DIRECTORY", None),
)

# Sets

i = Set(m, name="i", records=list(range(1, 4)))
j = Alias(m, name="j", alias_with=i)

# Parameters

a0 = Parameter(
m,
name="a0",
domain=[i, j],
records=np.array(
[
[0.120, 0.100, 0.049],
[0.210, 0.247, 0.265],
[0.026, 0.249, 0.145],
]
),
description="known base matrix",
)

z1 = Parameter(
m,
name="z1",
domain=[i, j],
records=np.array(
[
[98, 72, 75],
[65, 8, 63],
[88, 27, 44],
]
),
description="unknown industry flows",
)

x = Parameter(
m,
name="x",
domain=j,
records=np.array([421, 284, 283]),
description="observed total output",
)
u = Parameter(m, name="u", domain=i, description="observed row totals")
v = Parameter(m, name="v", domain=j, description="observed column totals")
a1 = Parameter(m, name="ai", domain=[i, j], description="unknown matrix A")

u[i] = Sum(j, z1[i, j])
v[j] = Sum(i, z1[i, j])

a1[i, j] = z1[i, j] / x[j]

print("Values of u \n", u.records)
print("Values of v \n", v.records)
print("Values of a1 \n", a1.records)

# --- 1: RAS updating

r = Parameter(m, name="r", domain=i, description="row adjustment")
s = Parameter(m, name="s", domain=j, description="column adjustment")

r[i] = 1
s[j] = 1

oldr = Parameter(m, name="oldr", domain=i)
olds = Parameter(m, name="olds", domain=j)
maxdelta = Parameter(m, name="maxdelta")
maxdelta[...] = 1

while True:
oldr[i] = r[i]
olds[j] = s[j]
r[i] = r[i] * u[i] / Sum(j, r[i] * a0[i, j] * x[j] * s[j])
s[j] = s[j] * v[j] / Sum(i, r[i] * a0[i, j] * x[j] * s[j])
maxdelta[...] = gams_math.Max(
Smax(i, gams_math.abs(oldr[i] - r[i])),
Smax(j, gams_math.abs(olds[j] - s[j])),
)
print("In loop maxdelta: ", round(maxdelta.records.values[0][0], 3))
if maxdelta.records.values[0][0] < 0.005:
break

# Parameter
report = Parameter(
m, name="report", domain=["*", i, j], description="summary report"
)
# option report:3:1:2

report["A0", i, j] = a0[i, j]
report["A1", i, j] = a1[i, j]
report["RAS", i, j] = r[i] * a0[i, j] * s[j]

# --- 2: Entropy formulation   a*ln(a/a0)
#        The RAS procedure gives the solution to the Entropy formulation
# Variable
obj = Variable(m, name="obj", type="free", description="objective value")
a = Variable(
m,
name="a",
type="positive",
domain=[i, j],
description="estimated A matrix",
)

# Equation
rowbal = Equation(m, name="rowbal", domain=i, description="row totals")
colbal = Equation(m, name="colbal", domain=j, description="column totals")
defobjent = Equation(m, name="defobjent", description="entropy definition")

rowbal[i] = Sum(j, a[i, j] * x[j]) == u[i]

colbal[j] = Sum(i, a[i, j] * x[j]) == v[j]

defobjent[...] = obj == Sum(
[i, j], x[j] * a[i, j] * gams_math.log(a[i, j] / a0[i, j])
)

mEntropy = Model(
m,
name="mEntropy",
equations=[rowbal, colbal, defobjent],
problem=Problem.NLP,
sense=Sense.MIN,
objective=obj,
)

# we need to exclude small values to avoid domain violations
a.lo[i, j] = 1e-5

mEntropy.solve()
report["Entropy", i, j] = a.l[i, j]

# --- 3: Entropy with flow variable
#        we can balance the flow matrix instead of the A matrix
zv = Variable(
m, name="zv", type="free", domain=[i, j], description="industry flows"
)

rowbalz = Equation(m, name="rowbalz", domain=i, description="row totals")
colbalz = Equation(
m, name="colbalz", domain=j, description="column totals tive"
)
defobjentz = Equation(
m, name="defobjentz", description="entropy objective using flows"
)

rowbalz[i] = Sum(j, zv[i, j]) == u[i]

colbalz[j] = Sum(i, zv[i, j]) == v[j]

zbar = Parameter(
m, name="zbar", domain=[i, j], description="reference flow"
)

zbar[i, j] = a0[i, j] * x[j]
zv.lo[i, j] = 1

defobjentz[...] = obj == Sum(
[i, j], zv[i, j] * gams_math.log(zv[i, j] / zbar[i, j])
)

mEntropyz = Model(
m,
name="mEntropyz",
equations=[rowbalz, colbalz, defobjentz],
problem=Problem.NLP,
sense=Sense.MIN,
objective=obj,
)

mEntropyz.solve()
report["EntropyZ", i, j] = zv.l[i, j] / x[j]

# --- 4. absolute deviation formulations result in LPs
#        MAPE Mean absolute percentage error
#        Linf Infinity norm
# Positive Variable
ap = Variable(
m,
name="ap",
type="positive",
domain=[i, j],
description="positive deviation iation",
)
an = Variable(
m,
name="an",
type="positive",
domain=[i, j],
description="negative deviation",
)
amax = Variable(
m, name="amax", type="positive", description="maximum absilute dev"
)

# Equation
defabs = Equation(
m, name="defabs", domain=[i, j], description="absolute definition"
)
defmaxp = Equation(
m, name="defmaxp", domain=[i, j], description="max positive"
)
defmaxn = Equation(
m, name="defmaxn", domain=[i, j], description="max neagtive"
)
m, name="defmade", description="mean absolute percentage error"
)
deflinf = Equation(m, name="deflinf", description="infinity norm")

defabs[i, j] = a[i, j] - a0[i, j] == ap[i, j] - an[i, j]

defmaxp[i, j] = a[i, j] - a0[i, j] <= amax

defmaxn[i, j] = a[i, j] - a0[i, j] >= -amax

defmad[...] = obj == 1 / gams_math.power(Card(i), 2) * Sum(
[i, j], ap[i, j] + an[i, j]
)

defmade[...] = obj == 100 / gams_math.power(Card(i), 2) * Sum(
[i, j], (ap[i, j] + an[i, j]) / a0[i, j]
)

deflinf[...] = obj == amax

# Model
m,
problem="LP",
sense=Sense.MIN,
objective=obj,
)
m,
problem="LP",
sense=Sense.MIN,
objective=obj,
)
mLinf = Model(
m,
name="mLinf",
equations=[rowbal, colbal, defmaxp, defmaxn, deflinf],
problem="LP",
sense=Sense.MIN,
objective=obj,
)

report["MAD", i, j] = a.l[i, j]
report["MADE", i, j] = a.l[i, j]
mLinf.solve()
report["Linf", i, j] = a.l[i, j]

# --- 5. Squared Deviations can be solved with powerful QP codes
#        SD     squared deviations
#        RSD    relative squared deviations
# Equation
defsd = Equation(m, name="defsd")
defrsd = Equation(m, name="defrsd")

defsd[...] = obj == Sum([i, j], gams_math.power(a[i, j] + a0[i, j], 2))

defrsd[...] = obj == Sum(
[i, j], gams_math.power(a[i, j] + a0[i, j], 2) / a0[i, j]
)

# Model
mSD = Model(
m,
name="mSD",
equations=[rowbal, colbal, defsd],
problem=Problem.QCP,
sense=Sense.MIN,
objective=obj,
)
mRSD = Model(
m,
name="mRSD",
equations=[rowbal, colbal, defrsd],
problem=Problem.QCP,
sense=Sense.MIN,
objective=obj,
)

mSD.solve()
report["SD", i, j] = a.l[i, j]
mRSD.solve()
report["RSD", i, j] = a.l[i, j]

import math

assert math.isclose(mRSD.objective_value, 7.6203, rel_tol=0.001)

print()
print("\t SOLUTION REPORT: \n", report.pivot())

if __name__ == "__main__":
main()
```