"""
## GAMSSOURCE: https://www.gams.com/latest/gamslib_ml/libhtml/gamslib_cta.html
## LICENSETYPE: Demo
## MODELTYPE: MIP
## DATAFILES: cta.xlsx
## KEYWORDS: mixed integer linear programming, statistical disclosure limitations
Controlled Tabular Adjustments (CTA)
Statistical agencies publish data which contains items that need to be
altered to protect confidentiality. Controlled Tabular Adjustments (CTA)
is a recent method to limit disclosure and can be elegantly expressed
as a Mixed Integer Programming problem. The programming framework then
allows easy expression of other data relationships like multi-dimensional
adding up conditions. The following model uses a 3-dimensional table from
from Cox, Kelly and Patil (2005) to illustrate this method.
The data is stored in an Excel Spreadsheet.
Lawrence H Cox, James P Kelly and Rahul J Patil, Computational Aspects
of Controlled Tabular Adjustments: Algorithms and Analysis, in The Next
Wave in Computing, Optimization, and Decision Technologies, Eds Bruce L Golden,
S Raghavan and Edward A Wasil, Springer, 2005, pp 45-59.
"""
from __future__ import annotations
import math
import os
import sys
from gams.connect import ConnectDatabase
from gamspy import (
Container,
Domain,
Equation,
Model,
Number,
Options,
Parameter,
Sense,
Set,
Sum,
Variable,
)
from gamspy.math import Round
def main():
m = Container(
system_directory=os.getenv("SYSTEM_DIRECTORY", None),
)
# Sets
i = Set(m, name="i", description="rows")
j = Set(m, name="j", description="columns")
k = Set(m, name="k", description="planes")
v = Set(m, name="v", description="non zero cells", domain=[i, j, k])
s = Set(m, name="s", description="sensitive cells", domain=[i, j, k])
# Parameters
dat = Parameter(
m,
name="dat",
description="unprotected data table",
domain=[k, i, j],
domain_forwarding=True,
)
pro = Parameter(
m,
name="pro",
description="information sensitive cells",
domain=[k, i, j],
)
# extract data from Excel
file_dir = os.path.dirname(os.path.abspath(__file__))
cdb = ConnectDatabase(m.system_directory)
cdb.exec_task(
{
"ExcelReader": {
"file": os.path.join(file_dir, "cta.xlsx"),
"symbols": [
{
"name": "dat",
"range": "Sheet1!A1",
"rowDimension": 2,
"columnDimension": 1,
},
{
"name": "pro",
"range": "Sheet2!A1",
"rowDimension": 2,
"columnDimension": 1,
},
],
}
}
)
dat.setRecords(cdb.container["dat"].records)
pro.setRecords(cdb.container["pro"].records)
# do some basic data checks
check = Parameter(m, name="check")
check[...] = Sum(
[i, k], Round(Sum(j, dat[k, i, j]) - 2 * dat[k, i, "total"])
)
assert math.isclose(check.toList()[0], 0), "row totals are incorrect"
check[...] = Sum(
[j, k], Round(Sum(i, dat[k, i, j]) - 2 * dat[k, "total", j])
)
assert math.isclose(check.toList()[0], 0), "column totals are incorrect"
check[...] = Sum(
[i, j], Round(Sum(k, dat[k, i, j]) - 2 * dat["total", i, j])
)
assert math.isclose(check.toList()[0], 0), "plane totals are incorrect"
# Parameter BigM
BigM = Parameter(
m,
name="BigM",
description="the famous big M - make it as small as possible",
)
# Variables
t = Variable(
m, name="t", description="adjusted cell value", domain=[i, j, k]
)
adjn = Variable(m, name="adjn", domain=[i, j, k], type="Positive")
adjp = Variable(m, name="adjp", domain=[i, j, k], type="Positive")
b = Variable(m, name="b", domain=[i, j, k], type="Binary")
# Equations
defadj = Equation(
m,
name="defadj",
description="define new cell values",
domain=[i, j, k],
)
addrow = Equation(
m, name="addrow", description="add up for rows", domain=[i, k]
)
addcol = Equation(
m, name="addcol", description="add up for columns", domain=[j, k]
)
addpla = Equation(
m, name="addpla", description="add up for plane", domain=[i, j]
)
pmin = Equation(
m,
name="pmin",
description="small value for sensitive cells",
domain=[i, j, k],
)
pmax = Equation(
m,
name="pmax",
description="big value for sensitive cells",
domain=[i, j, k],
)
pminx = Equation(m, name="pminx", domain=[i, j, k])
pmaxx = Equation(m, name="pmaxx", domain=[i, j, k])
v[i, j, k] = dat[k, i, j]
s[i, j, k] = pro[k, i, j]
BigM.setRecords(3)
defadj[v[i, j, k]] = t[v] == dat[k, i, j] + adjp[v] - adjn[v]
addrow[i, k] = Sum(v[i, j, k], t[v]) == 2 * t[i, "total", k]
addcol[j, k] = Sum(v[i, j, k], t[v]) == 2 * t["total", j, k]
addpla[i, j] = Sum(v[i, j, k], t[v]) == 2 * t[i, j, "total"]
pmin[s[i, j, k]] = adjn[s] >= pro[k, i, j] * (1 - b[s])
pmax[s[i, j, k]] = adjp[s] >= pro[k, i, j] * b[s]
pminx[s[i, j, k]] = adjn[s] <= BigM * pro[k, i, j] * (1 - b[s])
pmaxx[s[i, j, k]] = adjp[s] <= BigM * pro[k, i, j] * b[s]
cox3 = Model(
m,
name="cox3",
equations=m.getEquations(),
problem="MIP",
sense=Sense.MIN,
objective=Sum([i, j, k], adjn[i, j, k] + adjp[i, j, k]),
)
cmd_params = Options(absolute_optimality_gap=0.99, time_limit=10)
cox3.solve(
options=cmd_params,
output=sys.stdout,
)
rep = Parameter(
m, name="rep", description="summary report", domain=[k, i, j]
)
adjsum = Parameter(
m,
name="adjsum",
description="adjustment summary",
domain=[k, i, j, "*"],
)
adjrep = Parameter(
m, name="adjrep", description="adjustment report", domain=[k, i, j]
)
rep[k, i, j] = t.l[i, j, k]
adjsum[k, i, j, "neg"] = adjn.l[i, j, k]
adjsum[k, i, j, "pos"] = adjp.l[i, j, k]
adjsum[k, i, j, "min"] = pro[k, i, j]
adjrep[k, i, j] = -adjn.l[i, j, k] + adjp.l[i, j, k]
cdb = ConnectDatabase(m.system_directory, m)
cdb.exec_task(
{
"ExcelWriter": {
"file": os.path.join(file_dir, "results.xlsx"),
"clearSheet": True,
"symbols": [
{
"name": "adjrep",
},
{
"name": "rep",
},
{
"name": "adjsum",
},
],
}
}
)
binrep = Parameter(
m,
name="binrep",
domain=["*", "*", "*", "*"],
)
obj = cox3.objective_value
best = round(obj)
num_nodes_used = cox3.num_nodes_used
solve_time = cox3.total_solve_time
for it in range(5):
if (obj - best) / best > 0.01:
break
b_list = b.toList("level")
sol_str = f"solution{it+1}"
binrep[s, sol_str] = Round(b.l[s])
binrep["", "", "Obj", sol_str] = obj
binrep["", "", "mSec", sol_str] = solve_time * 1000
binrep["", "", "nodes", sol_str] = num_nodes_used
binrep["Comp", "Cells", "Adjusted", sol_str] = Sum(
Domain(i, j, k).where[~s[i, j, k]],
Number(1).where[Round(adjn.l[i, j, k] + adjp.l[i, j, k])],
)
cutone = Equation(m, name=f"cutone_{it}")
cuttwo = Equation(m, name=f"cuttwo_{it}")
cutone[...] = (
sum(
[
1 - b[rec[:-1]] if rec[3] > 0.5 else b[rec[:-1]]
for rec in b_list
]
)
>= 1
)
cuttwo[...] = (
sum(
[
1 - b[rec[:-1]] if rec[3] < 0.5 else b[rec[:-1]]
for rec in b_list
]
)
>= 1
)
cox3c = Model(
m,
name=f"cox3c_{it}",
equations=m.getEquations(),
problem="MIP",
sense=Sense.MIN,
objective=Sum([i, j, k], adjn[i, j, k] + adjp[i, j, k]),
)
cox3c.solve(options=cmd_params, output=sys.stdout)
obj = cox3c.objective_value
num_nodes_used = cox3c.num_nodes_used
solve_time = cox3c.total_solve_time
cdb = ConnectDatabase(m.system_directory, m)
cdb.exec_task(
{
"ExcelWriter": {
"file": os.path.join(file_dir, "results.xlsx"),
"clearSheet": True,
"symbols": [
{
"name": "binrep",
}
],
}
}
)
if __name__ == "__main__":
main()