Distributionally Robust Mean-CVaR Portfolio Optimization using Wasserstein-Based Ambiguity Sets¶
Consider the distributionally robust portfolio optimization problem described in Section 7.1 of [Esfahani2018]:
where \(\langle \cdot, \cdot \rangle\) is the inner product, \(\hat{\mathbb{P}}\) is the empirical distribution based on data, \(\xi = [\xi_1, \dots \xi_m]^T\) are the market values of \(m\) assets and \(x\) is the portfolio with \(\mathbb{x} = \{x \in \mathbb{R}_+^m, \sum_i x_i = 1\}\).
Using the convex optimization-based definition of \(\text{CVaR}\), this portfolio optimization problem can be rewritten as a worst-case expectation involving the max of affine terms. This worst-case optimization problem can then be formulated into the following convex optimization problem:
where \(N\) is the number of samples, \(\|\cdot\|_*\) is the dual norm, and \(k \in \{1,2\}\) with \(a_1 = −1, \ a_2 = −1 − \rho/\alpha, \ b_1 = \rho, b_2 = \rho(1 − 1/\alpha)\). The support set is given by \(\Xi = \{\xi | C \xi \leq d\}\).
Using cvxRiskOpt, the original problem can be encoded using high-level function without having to reformulate the problem.
Example¶
The distributionally robust mean-cvar optimization problem, as described in [Esfahani2018] with \(\Xi = \mathbb{R}^m\) and using the 1-norm so that the dual norm is the \(\infty\) norm, can be encoded using cvxRiskOpt as follows:
import numpy as np
import cvxpy as cp
import time
from numpy.random import normal as gauss
from cvxRiskOpt.wass_risk_opt_pb import WassDRExpectation, WassDRCVaR
solver = cp.CLARABEL
def generate_esfahani_portfolio_prob_dataset(num_samples, num_assets, num_simulations):
# parameters
phi_mu, phi_sigma = 0, 2 / 100 # systematic risk factor: Gaussian(mu, var)
zeta_mu, zeta_sigma = lambda i: i * 3 / 100, lambda i: i * 2.5 / 100 # unsystematic risk factor: Gaussian(mu, var)
# dataset
xi_dataset = np.zeros([num_samples, num_assets, num_simulations])
for sim in range(num_simulations):
for i in range(num_assets):
# this implementation matches the paper results
phi = gauss(phi_mu, phi_sigma, num_samples)
zeta = gauss(zeta_mu(i + 1), zeta_sigma(i + 1), num_samples)
xi_dataset[:, i, sim] = phi + zeta
return xi_dataset
eps = 0.01 # Wasserstein radius
num_samples = 100
# problem settings
num_assets = 10
alpha, rho = 0.2, 10
num_sim = 1
# generate the dataset
xi_dataset = generate_esfahani_portfolio_prob_dataset(num_samples, num_assets, num_sim)
# create the optimization problem
# set up the problem we're testing
x = cp.Variable(num_assets, name='x')
# expectation part
a_e = -1 * x
expect_part = WassDRExpectation(num_samples, a_e, used_norm=1)
# CVaR part
a_c = -1 * x
cvar_part = WassDRCVaR(num_samples, num_assets, a_c, alpha=alpha, used_norm=1)
# portfolio constraints
portfolio_constr = cp.Problem(cp.Minimize(0), [cp.sum(x) == 1, x >= 0])
# put together the DR portfolio problem
test_prob = expect_part + rho * cvar_part + portfolio_constr
# run the simulations
x_test, t_test = [], []
for sim in range(num_sim):
xi = xi_dataset[:, :, sim]
# solve the problem we are testing
for par in test_prob.param_dict.keys():
if 'eps' in par:
test_prob.param_dict[par].value = eps
if 'samples' in par:
test_prob.param_dict[par].value = xi
test_prob.solve(
solver=solver) # cvxpy's first run is usually slower than all other solves. Solve once before timing it
t0 = time.time()
test_prob.solve(solver=solver)
t1 = time.time()
test_result = x.value
x_test.append(test_result)
print("Portfolio distribution: ", x_test)
print("Solve Time: %.3f ms" % (1000 * (t1 - t0)))
Portfolio distribution: [array([5.49843425e-10, 7.01795140e-10, 1.47961976e-09, 6.67777003e-02,
1.32029716e-01, 1.74495485e-01, 1.74497468e-01, 1.73936079e-01,
1.42893790e-01, 1.35369758e-01])]
Solve Time: 20.528 ms
Note that compared to the code above which uses cvxRiskOpt, the CVXPY-only implementation of the problem, after its manual reformulation, is given by:
# Optimization variables
self.x = cp.Variable(self.n, name='x') # decision variable
self.tau = cp.Variable(name='tau') # variable for CVaR computation
self.lam = cp.Variable(name='lam') # lambda variable from dual problem
self.s = cp.Variable(self.N, name='s') # epigraph auxiliary variables
# l(xi) = max_k a_k * xi + b_k loss function
self.a = [-self.expectation_rho * self.x, (-self.expectation_rho - self.rho / self.alpha) * self.x]
self.b = [self.rho * self.tau, self.rho * (1 - 1 / self.alpha) * self.tau]
self.K = len(self.a) # number of affine functions
# one more optimization variable
if self.use_gamma:
self.gamma = [cp.Variable((self.K, len(self.d)), name='gamma_' + str(i)) for i in range(self.N)]
# Parameters
self.eps = cp.Parameter(name='eps')
self.xi = cp.Parameter((self.N, self.m), name='xi')
# Optimization objective
objective = self.lam * self.eps + 1 / self.N * cp.sum(self.s)
self.objective = cp.Minimize(objective)
# constraints
constraints = []
# DR (Expectation + CVaR) constraints
for i in range(self.N):
for k in range(self.K):
if self.use_gamma:
constraints += [self.b[k] + self.a[k] @ self.xi[i, :] + self.gamma[i][k, :] @ (self.d - self.C @ self.xi[i, :]) <= self.s[i]]
constraints += [cp.norm(self.C.T @ self.gamma[i][k, :] - self.a[k], np.inf) <= self.lam]
else:
constraints += [self.b[k] + self.a[k] @ self.xi[i, :] <= self.s[i]]
constraints += [cp.norm(- self.a[k], np.inf) <= self.lam]
if self.use_gamma:
constraints += [self.gamma[i] >= 0]
# additional constraints on x
constraints += [cp.sum(self.x) == 1]
constraints += [self.x >= 0]
self.constraints = constraints
self.problem = cp.Problem(self.objective, self.constraints)
i.e. instead of the following using cvxRiskOpt:
x = cp.Variable(num_assets, name='x')
# expectation part
a_e = -1 * x
expect_part = WassDRExpectation(num_samples, a_e, used_norm=1)
# CVaR part
a_c = -1 * x
cvar_part = WassDRCVaR(num_samples, num_assets, a_c, alpha=alpha, used_norm=1)
# portfolio constraints
portfolio_constr = cp.Problem(cp.Minimize(0), [cp.sum(x) == 1, x >= 0])
# put together the DR portfolio problem
test_prob = expect_part + rho * cvar_part + portfolio_constr
Generating C code¶
We can also generate C code for the CVXPY Problem instance above using CVXPYgen. This can be done as shown below.
from cvxpygen import cpg
cpg.generate_code(test_prob, code_dir='dr_portfolio_opt', solver=solver)
from dr_portfolio_opt.cpg_solver import cpg_solve
test_prob.register_solve('cpg', cpg_solve)
update_params = [] # get the list of parameters that should be updated
for par in test_prob.param_dict.keys():
if 'eps' in par or 'samples' in par:
update_params.append(par)
test_prob.solve(method='cpg', updated_params=update_params)
t0 = time.time()
test_prob.solve(method='cpg', updated_params=update_params)
t1 = time.time()
print("Portfolio distribution with codegen: ", x.value)
print("Solve Time: %.3f ms" % (1000 * (t1 - t0)))
Portfolio distribution with codegen: [5.49843425e-10 7.01795140e-10 1.47961976e-09 6.67777003e-02
1.32029716e-01 1.74495485e-01 1.74497468e-01 1.73936079e-01
1.42893790e-01 1.35369758e-01]
Solve Time: 16.167 ms
The portfolio distribution matches with and without using CVXPYgen. By calling the C code, we get a speed-up.