""""""
"""
cvxRiskOpt: Risk-Based Optimization tool using CVXPY and CVXPYgen
Copyright (C) 2024 Sleiman Safaoui
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
GitHub:
@The-SS
Email:
snsafaoui@gmail.com
sleiman.safaoui@utdallas.edu
CVXPY-based data-driven optimization problems that compute risk metrics.
We implement some results from:
"Data-driven distributionally robust optimization using the Wasserstein metric:
Performance guarantees and tractable reformulations"
by P. Mohajerin Esfahani and D. Kuhn (Mathematical Programming, 2018).
"""
import cvxpy as cp
import numpy as np
from numbers import Number
from cvxpy.problems.problem import Problem as CVXPYProb
from cvxpy.utilities.deterministic import unique_list
from typing import Union
class WassWCEMaxAffine(CVXPYProb):
"""
Wasserstein Worst Case Expectation Problem with Max of Affine Terms.
This class implements a generic solution for Cor 5.1 (i), eq (15a) with affine loss functions:
.. math::
\\ell = \\max_k \\{ \\langle a_k, \\xi \\rangle + b_k \\}
where
    - :math:`\\xi` (m x 1): the random variable for which we have samples
    - :math:`a_k` (m x 1): a constant vector or a scalar times a decision variable (a cvxpy affine expression)
    - :math:`b_k` (1 x 1): a scalar or a 1-dimensional cvxpy affine expression
- :math:`\\langle \\cdot, \\cdot \\rangle` is the inner product
.. note::
Currently, this is only implemented for the 1-, 2-, and inf-norm cases.
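
    The tractable reformulation assembled by this class (following eq. (15a) in the reference
    cited in the module docstring, with :math:`\\hat{\\xi}_i` denoting the i-th sample,
    :math:`\\varepsilon` the Wasserstein radius, :math:`N` the number of samples, and
    :math:`\\|\\cdot\\|_*` the dual norm) is

    .. math::
        \\begin{array}{lll}
        \\inf_{\\lambda, s_i, \\gamma_{ik}} & \\lambda \\varepsilon + \\frac{1}{N} \\sum_{i=1}^{N} s_i & \\\\
        \\text{s.t.} & b_k + \\langle a_k, \\hat{\\xi}_i \\rangle + \\langle \\gamma_{ik}, d - C \\hat{\\xi}_i \\rangle \\leq s_i & \\forall i, k \\\\
        & \\| C^\\top \\gamma_{ik} - a_k \\|_* \\leq \\lambda & \\forall i, k \\\\
        & \\gamma_{ik} \\geq 0 & \\forall i, k
        \\end{array}

    When no support (C, d) is given, the :math:`\\gamma_{ik}` terms are dropped.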
Arguments:
----------
num_samples: int:
The number of samples.
    a_k_list: list
        A list of vectors, each either a constant vector or a scalar times a decision variable (cvxpy affine expression).
    b_k_list: list, optional
        A list of scalars or 1-dimensional cvxpy affine expressions.
    support_C: np.ndarray, optional
        The support matrix C in :math:`\\Xi = \\{\\xi \\mid C \\xi \\leq d\\}`. If None is passed, the support is taken to be the whole real space.
    support_d: np.ndarray, optional
        The support vector d in :math:`\\Xi = \\{\\xi \\mid C \\xi \\leq d\\}`. If None is passed, the support is taken to be the whole real space.
used_norm: int | np.inf, optional
The norm to be used. Either 1, 2, or infinity (np.inf). Default is 2.
vp_suffix: str, optional
The suffix for the variable/parameter names.
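
    Example:
    --------
    A minimal usage sketch; the decision variable ``x``, the sample data, and the
    Wasserstein radius below are illustrative placeholders, not part of this module:

    .. code-block:: python

        import cvxpy as cp
        import numpy as np

        m, N = 2, 50
        x = cp.Variable(m)
        # loss = max{ <x, xi>, <0.5 x, xi> + 0.1 }
        prob = WassWCEMaxAffine(num_samples=N,
                                a_k_list=[x, 0.5 * x],
                                b_k_list=[0, 0.1])
        update_wass_wce_params(prob, eps=0.05, samples=np.random.randn(N, m))
        prob.solve()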
"""
instance_counter = 0
def __init__(self, num_samples: int,
a_k_list: list, b_k_list: list = None,
support_C=None, support_d=None, used_norm=2,
vp_suffix: str = None) -> None:
# choose the cp variable and parameter suffix
WassWCEMaxAffine.instance_counter += 1
if vp_suffix is None:
vp_suffix = "_wwcema" + str(WassWCEMaxAffine.instance_counter)
# number of samples
self._num_samples = num_samples
# # loss function terms
# if a_k_list is None or a_k_list == []:
# raise ValueError('Problem statement error: a_k_list must be non-empty')
self._a_k_list = a_k_list
self._K = len(self._a_k_list)
if isinstance(self._a_k_list[0], Number): # a_k is just a number
self._m = 0
else:
self._m = self._a_k_list[0].shape[0] if self._a_k_list[0].shape else 0
# TODO: consider adding some logic to check if all values of a_k are similar in type/dimension
# raise ValueError('a_k_list error: all value must be of the same dimension')
self._b_k_list = b_k_list if b_k_list is not None else []
# if self._b_k_list:
# if isinstance(self._b_k_list[0], Number): # b_k is just a number
# b_dim = 0
# else:
# b_dim = self._b_k_list[0].shape[0] if self._b_k_list[0].shape else 0
# TODO: consider adding some logic to check if all values of a_k are similar in type/dimension
# raise ValueError('b_k_list error: all value must be of the same dimension')
if self._b_k_list and len(self._b_k_list) != self._K:
            raise ValueError('Dimension mismatch: b_k_list must either be empty or have the same length as a_k_list.')
# # TODO: add some type checking to make sure that a_k and b_k terms are cst or cvxpy affine expressions
# # and of the right dimensions
# Random variable support: Xi = {xi | C xi <= d}
self._C, self._d = support_C, support_d
if support_C is not None and support_d is not None:
self._use_gamma = True
self._d_size = len(self._d)
elif support_C is None and support_d is None:
self._use_gamma = False
else:
raise ValueError('Random variable support error: Both C and d must be set.')
# norm selection
self._used_norm = used_norm
if self._used_norm == 2:
self._dual_norm = 2
elif self._used_norm == 1:
self._dual_norm = np.inf
elif self._used_norm == np.inf:
self._dual_norm = 1
else:
raise NotImplementedError('Chosen norm not supported.')
# Optimization variables due to problem reformulation
self._lam = cp.Variable(name='lam' + vp_suffix) # dual problem lambda variable
self._s = cp.Variable(self._num_samples, name='s' + vp_suffix) # epigraph auxiliary variables
if self._use_gamma: # gamma terms s.t. gamma @ (d - C @ sample)
self._gamma = [cp.Variable((self._K, self._d_size), name='gamma_' + str(i) + vp_suffix) for i in
range(self._num_samples)]
# Parameters
        self._eps = cp.Parameter(name='eps' + vp_suffix)  # Wasserstein ball radius
# Random variable samples
if self._m > 0:
self._samples = cp.Parameter([self._num_samples, self._m], name='samples' + vp_suffix)
elif self._m == 0:
self._samples = cp.Parameter([self._num_samples, ], name='samples' + vp_suffix)
else:
            raise ValueError('The sample dimension m must be >= 0.')
self._obj = self._def_opt_pb_obj()
self._constr = self._def_opt_pb_contr()
super().__init__(self._obj, self._constr)
def _def_opt_pb_obj(self):
"""
defines the objective function
:return:
"""
# Optimization Objective
return cp.Minimize(self._lam * self._eps + 1 / self._num_samples * cp.sum(self._s))
def _def_opt_pb_contr(self):
"""
defines the constraints
:return:
"""
constraints = []
for i in range(self._num_samples):
for k in range(self._K):
# b_k + a_k @ xi_i + gamma_ik @ (d - C xi_i) <= s_i
lhs1 = 0
if self._b_k_list:
lhs1 += self._b_k_list[k]
a_k = self._a_k_list[k]
samp = self._samples[i]
if self._m > 1:
lhs1 += a_k @ samp
else:
lhs1 += a_k * samp
if self._use_gamma:
if self._m > 1:
lhs1 += self._gamma[i][k, :] @ (self._d - self._C @ samp)
else:
lhs1 += self._gamma[i][k, :] @ (self._d - np.squeeze(self._C) * samp)
constraints += [lhs1 <= self._s[i]]
                # dual_norm{C.T @ gamma_ik - a_k} <= lambda
if self._use_gamma:
lhs2 = self._C.T @ self._gamma[i][k, :] - a_k
else:
lhs2 = -a_k
constraints += [cp.norm(lhs2, self._dual_norm) <= self._lam]
# gamma_ik >= 0
if self._use_gamma:
constraints += [self._gamma[i] >= 0]
return constraints
# TODO: consider implementing some operators.
def update_wass_wce_params(prob, eps, samples) -> None:
"""
Update the parameters associated with the WassWCEMaxAffine problem.
Arguments:
----------
prob: WassWCEMaxAffine:
WassWCEMaxAffine problem
    eps: float:
        Wasserstein ball radius
samples: np.ndarray:
Data samples
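
    Example:
    --------
    A minimal sketch; ``prob`` is assumed to be an already-constructed WassWCEMaxAffine
    problem and ``data`` an (N x m) numpy array of samples:

    .. code-block:: python

        update_wass_wce_params(prob, eps=0.05, samples=data)
        prob.solve()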
"""
for par in prob.param_dict.keys():
if 'eps' in par:
prob.param_dict[par].value = eps
if 'samples' in par:
prob.param_dict[par].value = samples
return
class WassDRExpectation(WassWCEMaxAffine):
"""
Provides a high-level implementation of the DR Expectation with a Wasserstein-based ambiguity set.
Implements
.. math::
\\sup_{\\mathbb{P} \\in \\mathcal{P}} \\quad \\mathbb{E}^\\mathbb{P} \\left[ \\langle a, \\xi \\rangle + b \\right]
where :math:`a, b` may contain decision variables and :math:`\\langle \\cdot, \\cdot \\rangle` is the inner product.
Arguments:
----------
num_samples: int:
The number of samples.
    a: int | float | cp.Variable | cp.Parameter | cp.Expression:
        The "a" term in :math:`\\langle a, \\xi \\rangle + b`.
    b: int | float | cp.Variable | cp.Parameter | cp.Expression, optional
        The "b" term in :math:`\\langle a, \\xi \\rangle + b`. If not assigned, 0 is used.
    support_C: np.ndarray, optional
        The support matrix C in :math:`\\Xi = \\{\\xi \\mid C \\xi \\leq d\\}`. If None is passed, the support is taken to be the whole real space.
    support_d: np.ndarray, optional
        The support vector d in :math:`\\Xi = \\{\\xi \\mid C \\xi \\leq d\\}`. If None is passed, the support is taken to be the whole real space.
used_norm: int | np.inf, optional
The norm to be used. Either 1, 2, or infinity (np.inf). Default is 2.
vp_suffix: str, optional
The suffix for the variable/parameter names.
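
    Example:
    --------
    A minimal usage sketch; the decision variable ``x``, the sample data, the deterministic
    problem, and the Wasserstein radius below are illustrative placeholders:

    .. code-block:: python

        import cvxpy as cp
        import numpy as np

        m, N = 2, 50
        x = cp.Variable(m)
        samples = np.random.randn(N, m)
        # DR expectation of <x, xi> over a Wasserstein ball around the samples
        expect = WassDRExpectation(num_samples=N, a=x)
        # combining with a regular cvxpy problem returns a plain cvxpy Problem
        det = cp.Problem(cp.Minimize(cp.sum_squares(x)), [cp.sum(x) == 1])
        full = expect + det
        update_wass_wce_params(full, eps=0.05, samples=samples)
        full.solve()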
"""
instance_counter = 0
def __init__(self,
num_samples: int,
a: int | float | cp.Variable | cp.Parameter | cp.Expression,
b: int | float | cp.Variable | cp.Parameter | cp.Expression = 0,
support_C=None, support_d=None, used_norm=2, vp_suffix=None):
WassDRExpectation.instance_counter += 1
if vp_suffix is None:
vp_suffix = "_wdre" + str(WassDRExpectation.instance_counter)
        # only drop b when it is the numeric default 0; keep nonzero scalars and cvxpy expressions
        if isinstance(b, Number) and b == 0:
            b = []
        else:
            b = [b]
        a = [a]
super().__init__(num_samples, a, b, support_C, support_d, used_norm, vp_suffix=vp_suffix)
def _problems_match(self, other) -> bool:
if self._used_norm != other._used_norm:
raise NotImplementedError("Norms are different")
elif self._num_samples != other._num_samples:
raise NotImplementedError("Number of samples is different")
        elif not np.array_equal(self._C, other._C) or not np.array_equal(self._d, other._d):
raise NotImplementedError("Supports are different")
return True
# TODO: might be a good idea to check that the params used by both problems (xi and eps) are the same
def __add__(self, other) -> Union["WassDRExpectation", "CVXPYProb", "WassDRCVaR"]:
if other == 0:
return self
elif isinstance(other, WassDRExpectation): # DR-E + DR-E --> just add a and b terms if problems match
if self._problems_match(other):
a = self._a_k_list[0] + other._a_k_list[0]
b1 = self._b_k_list[0] if self._b_k_list else 0
b2 = other._b_k_list[0] if other._b_k_list else 0
b = b1 + b2
return WassDRExpectation(self._num_samples, a, b, self._C, self._d, self._used_norm)
else:
raise NotImplementedError()
elif isinstance(other, WassWCEMaxAffine):
if self._problems_match(other):
a_expect = self._a_k_list[0]
b_expect = self._b_k_list[0] if self._b_k_list else 0
a_list_new = [a_expect + a_k_other for a_k_other in other._a_k_list]
                b_expect_is_zero = isinstance(b_expect, Number) and b_expect == 0
                if b_expect_is_zero and (not other._b_k_list):  # no b terms in either problem
                    b_list_new = None
                elif b_expect_is_zero:  # no b in DR-E; b terms in other only
                    b_list_new = other._b_k_list
                elif not other._b_k_list:  # b in DR-E only; no b terms in other
                    b_list_new = [b_expect for _ in other._a_k_list]
                else:  # b terms in both
                    b_list_new = [b_expect + b_other for b_other in other._b_k_list]
if isinstance(other, WassDRCVaR):
return WassDRCVaR(other._num_samples, other._m, a=None,
a_k_list=a_list_new, b_k_list=b_list_new, alpha=other._alpha,
support_C=other._C, support_d=other._d, used_norm=other._used_norm)
else:
return WassWCEMaxAffine(num_samples=other._num_samples, a_k_list=a_list_new, b_k_list=b_list_new,
support_C=other._C, support_d=other._d, used_norm=other._used_norm)
else:
raise NotImplementedError()
elif not isinstance(other, CVXPYProb):
raise NotImplementedError()
else:
return CVXPYProb(self.objective + other.objective, unique_list(self.constraints + other.constraints))
def __sub__(self, other) -> Union["WassDRExpectation", "CVXPYProb"]:
if isinstance(other, WassDRExpectation):
if self._problems_match(other):
a = self._a_k_list[0] - other._a_k_list[0]
b1 = self._b_k_list[0] if self._b_k_list else 0
b2 = other._b_k_list[0] if other._b_k_list else 0
b = b1 - b2
return WassDRExpectation(self._num_samples, a, b, self._C, self._d, self._used_norm)
else:
raise NotImplementedError()
elif not isinstance(other, CVXPYProb):
raise NotImplementedError()
else:
            return CVXPYProb(self.objective - other.objective, unique_list(self.constraints + other.constraints))
def __mul__(self, other) -> "WassDRExpectation":
if not isinstance(other, (int, float)):
raise NotImplementedError()
a = self._a_k_list[0] * other
b = self._b_k_list[0] * other if self._b_k_list else 0
return WassDRExpectation(self._num_samples, a, b, self._C, self._d, self._used_norm)
__rmul__ = __mul__
def __div__(self, other) -> "WassDRExpectation":
if not isinstance(other, (int, float)):
raise NotImplementedError()
if np.abs(other) < 1e-8: # basically division by zero
raise ZeroDivisionError()
a = self._a_k_list[0] * (1.0 / other)
b = self._b_k_list[0] * (1.0 / other) if self._b_k_list else 0
return WassDRExpectation(self._num_samples, a, b, self._C, self._d, self._used_norm)
__truediv__ = __div__
# TODO: consider implementing the following
# __neg__, __rsub__, __radd__
class WassDRCVaR(WassWCEMaxAffine):
"""
Provides a high-level implementation of the DR-CVaR with a Wasserstein-based ambiguity set.
Implements
.. math ::
\\sup_{\\mathbb{P} \\in \\mathcal{P}} \\quad \\text{CVaR}_{\\alpha}^\\mathbb{P}\\left[ \\langle a, \\xi \\rangle + b \\right]
where :math:`a, b` may contain decision variables and :math:`\\langle \\cdot, \\cdot \\rangle` is the inner product.
    :math:`\\alpha` is the CVaR level (the expectation over the worst/highest :math:`\\alpha \\cdot 100\\%` of cases).
Alternatively, this can also implement the CVaR reformulated form as a max of two affine terms:
.. math ::
\\sup_{\\mathbb{P} \\in \\mathcal{P}} \\quad \\mathbb{E}^{\\mathbb{P}} \\max_{k \\in \\{1, 2\\}} \\left[ \\langle a_k, \\xi \\rangle + b_k \\right].
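
    With the auxiliary variable :math:`\\tau` from the variational form
    :math:`\\text{CVaR}_{\\alpha}[\\ell] = \\inf_{\\tau} \\; \\tau + \\frac{1}{\\alpha} \\mathbb{E}[(\\ell - \\tau)_+]`,
    the two affine pieces used internally for :math:`\\ell = \\langle a, \\xi \\rangle + b` are

    .. math ::
        a_1 = 0, \\quad b_1 = \\tau, \\qquad a_2 = \\frac{a}{\\alpha}, \\quad b_2 = \\frac{b}{\\alpha} + \\left(1 - \\frac{1}{\\alpha}\\right) \\tau.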
.. Note ::
Either "a" and "b" must be passed or "a_k_list" and "b_k_list", but not both nor neither.
Arguments:
----------
num_samples: int:
The number of samples.
xi_length: int:
Size of a sample. If :math:`\\xi` is m x 1 --> xi_length = m
    a: int | float | cp.Variable | cp.Parameter | cp.Expression, optional:
        The "a" term in :math:`\\langle a, \\xi \\rangle + b`.
        If this is passed, the "b" term may optionally be passed as well.
        This should not be passed if "a_k_list" and "b_k_list" are passed.
    b: int | float | cp.Variable | cp.Parameter | cp.Expression, optional:
        The "b" term in :math:`\\langle a, \\xi \\rangle + b`. If not assigned, 0 is used.
        This should not be passed if "a_k_list" and "b_k_list" are passed.
    a_k_list: list, optional:
        A list of vectors, each either a constant vector or a scalar times a decision variable (cvxpy affine expression).
        If this is passed, "b_k_list" is expected as well.
        This should not be passed if "a" and "b" are passed.
    b_k_list: list, optional:
        A list of scalars or 1-dimensional cvxpy affine expressions.
        This should not be passed if "a" and "b" are passed.
    alpha: float, optional:
        CVaR level (the expectation over the worst/highest alpha * 100% of cases). Default is 0.1.
    support_C: np.ndarray, optional
        The support matrix C in :math:`\\Xi = \\{\\xi \\mid C \\xi \\leq d\\}`. If None is passed, the support is taken to be the whole real space.
    support_d: np.ndarray, optional
        The support vector d in :math:`\\Xi = \\{\\xi \\mid C \\xi \\leq d\\}`. If None is passed, the support is taken to be the whole real space.
used_norm: int | np.inf, optional
The norm to be used. Either 1, 2, or infinity (np.inf). Default is 2.
vp_suffix: str, optional
The suffix for the variable/parameter names.
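
    Example:
    --------
    A minimal usage sketch; the decision variable ``x``, the sample data, the deterministic
    constraints, and the Wasserstein radius below are illustrative placeholders:

    .. code-block:: python

        import cvxpy as cp
        import numpy as np

        m, N = 2, 50
        x = cp.Variable(m)
        samples = np.random.randn(N, m)
        # DR-CVaR of <x, xi> at the alpha = 0.1 level
        cvar = WassDRCVaR(num_samples=N, xi_length=m, a=x, alpha=0.1)
        # combine with a scaled DR expectation and a deterministic problem
        expect = 0.5 * WassDRExpectation(num_samples=N, a=x)
        det = cp.Problem(cp.Minimize(0), [cp.norm(x) <= 1, cp.sum(x) == 1])
        full = expect + cvar + det
        update_wass_wce_params(full, eps=0.05, samples=samples)
        full.solve()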
"""
instance_counter = 0
def __init__(self,
num_samples: int, xi_length: int,
a: int | float | cp.Variable | cp.Parameter | cp.Expression = None,
b: int | float | cp.Variable | cp.Parameter | cp.Expression = 0,
a_k_list=None,
b_k_list=None,
alpha=0.1,
support_C=None, support_d=None,
used_norm=2, vp_suffix=None):
WassDRCVaR.instance_counter += 1
if vp_suffix is None:
vp_suffix = "_wdrcvar" + str(WassDRCVaR.instance_counter)
self._alpha = alpha
if a is None and (a_k_list is None or b_k_list is None):
            raise ValueError(
                'Option 1: provide "a" (and optionally "b"). Option 2: provide "a_k_list" and "b_k_list". Neither option was satisfied.')
if a is None:
super().__init__(num_samples, a_k_list, b_k_list, support_C, support_d, used_norm, vp_suffix=vp_suffix)
else:
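            # CVaR variational form: CVaR_alpha(l) = inf_tau { tau + E[(l - tau)_+] / alpha }
            #                                      = inf_tau E[ max(tau, l/alpha + (1 - 1/alpha) * tau) ],
            # so with l = <a, xi> + b the pieces (a1, b1) and (a2, b2) below give the max-of-affine loss.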
tau = cp.Variable(1, name='tau' + vp_suffix)
a1 = np.zeros(xi_length)
a2 = a / alpha
b1 = tau
b2 = b / alpha + (1 - 1 / alpha) * tau
a_k_list = [a1, a2]
b_k_list = [b1, b2]
super().__init__(num_samples, a_k_list, b_k_list, support_C, support_d, used_norm, vp_suffix=vp_suffix)
def _problems_match(self, other) -> bool:
if self._used_norm != other._used_norm:
raise NotImplementedError("Norms are different")
elif self._num_samples != other._num_samples:
raise NotImplementedError("Number of samples is different")
        elif not np.array_equal(self._C, other._C) or not np.array_equal(self._d, other._d):
            raise NotImplementedError("Supports are different")
        elif isinstance(other, WassDRCVaR) and self._alpha != other._alpha:
            raise NotImplementedError("CVaR alpha levels are different")
return True
# TODO: might be a good idea to check that the params used by both problems (xi and eps) are the same
def __add__(self, other) -> Union["WassDRExpectation", "CVXPYProb", "WassDRCVaR"]:
if other == 0:
return self
elif isinstance(other, WassDRExpectation): # DR-CVaR + DR-E special case --> call __add__ in DR-E
return other + self
elif not isinstance(other, CVXPYProb):
raise NotImplementedError()
else:
return CVXPYProb(self.objective + other.objective, unique_list(self.constraints + other.constraints))
def __mul__(self, other) -> "WassDRCVaR":
if not isinstance(other, (int, float)):
raise NotImplementedError()
if other <= 0:
raise NotImplementedError('Multiplication is only supported with positive scalars')
a_list_new = [a_k * other for a_k in self._a_k_list]
b_list_new = [b_k * other for b_k in self._b_k_list]
return WassDRCVaR(self._num_samples, self._m, a=None,
a_k_list=a_list_new, b_k_list=b_list_new,
alpha=self._alpha, support_C=self._C,
support_d=self._d, used_norm=self._used_norm)
__rmul__ = __mul__
def __div__(self, other) -> "WassDRCVaR":
if not isinstance(other, (int, float)):
raise NotImplementedError()
if np.abs(other) < 1e-8: # basically division by zero
raise ZeroDivisionError()
return (1.0 / other) * self
__truediv__ = __div__
# TODO: consider implementing the rest of the operators.