Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

By vectorizing computations and using Numba JIT, the performance of SRISK calculations has been significantly improved. #7

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 173 additions & 50 deletions src/frds/algorithms/_garch.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import warnings
import itertools
from typing import List
from typing import List, Union
from dataclasses import dataclass
import numpy as np
from scipy.optimize import minimize, OptimizeResult
Expand Down Expand Up @@ -147,10 +147,10 @@ def loglikelihood_model(
"""
if zero_mean:
resids = self.returns
self.sigma2 = self.compute_variance(params, resids, backcast, var_bounds)
self.sigma2 = compute_variance(params, resids, backcast, var_bounds)
else:
resids = self.returns - params[0] # params[0] is mu
self.sigma2 = self.compute_variance(
self.sigma2 = compute_variance(
params[1:], resids, backcast, var_bounds
)
return -self.loglikelihood(resids, self.sigma2)
Expand All @@ -168,7 +168,10 @@ def loglikelihood(resids: np.ndarray, sigma2: np.ndarray) -> float:
float: log-likelihood
"""
l = -0.5 * (np.log(2 * np.pi) + np.log(sigma2) + resids**2.0 / sigma2)
return np.sum(l)
if len(sigma2.shape)>1:
return np.sum(l, axis=1)
else:
return np.sum(l)

@staticmethod
def compute_variance(
Expand Down Expand Up @@ -209,28 +212,58 @@ def starting_values(self, resids: np.ndarray) -> List[float]:
Returns:
List[float]: [omega, alpha, beta]
"""
# Candidate target persistence - we don't have a prior knowledge here
persistence_grid = [0.5, 0.7, 0.9, 0.98]
# Candidate alpha values
alpha_grid = [0.01, 0.05, 0.1, 0.2]
# Sample variance
var = self.returns.var()
# Backcast initial value for the volaility process
initial_params = []
max_likelihood = -np.inf
for alpha, p in itertools.product(*[alpha_grid, persistence_grid]):
# Note that (alpha+beta) is the "persistence"
beta = p - alpha
# Note that the unconditional variance is omega/(1-persistence).
# As we are guessing the initial value, we use the sample variance
# as the unconditional variance to guess the omega value.
omega = var * (1 - p)
params = [omega, alpha, beta]
sigma2 = self.compute_variance(
params, resids, self.backcast_value, self.var_bounds
)
if -self.loglikelihood(resids, sigma2) > max_likelihood:
initial_params = params
def hierarchical_grid_search(resids, initial_range, base_grid_size=10, depth=2):
current_range = initial_range
best_params = None

for i in range(depth):
if i != 0:
# 更新搜索范围为最佳参数周围的较小区域
search_width = (current_range[:, 1] - current_range[:, 0]) / base_grid_size
current_range = np.array([
[best_params[1] - search_width[0]*2, best_params[1] + search_width[0]*2],
[best_params[2] - search_width[1]*2, best_params[2] + search_width[1]*2]
])
# 创建当前深度的网格
alpha_grid = np.linspace(current_range[0, 0], current_range[0, 1], base_grid_size)
p_grid = np.linspace(current_range[1, 0], current_range[1, 1], base_grid_size)
alpha_values, p_values = np.meshgrid(alpha_grid, p_grid)
parameters = np.stack([alpha_values.ravel(), p_values.ravel()], axis=1)

# 过滤无效组合
valid_indices = np.logical_and(parameters[:, 1] - parameters[:, 0] > 0, # beta > 0
parameters[:, 0] > 0) # alpha > 0
valid_parameters = parameters[valid_indices]

# 得到三个参数组成的二维矩阵
alpha = valid_parameters[:, 0]
p = valid_parameters[:, 1]
beta = p - alpha
omega = self.returns.var() * (1 - p)

omega = omega.reshape(-1, 1)
alpha = alpha.reshape(-1, 1)
beta = beta.reshape(-1, 1)

# Combine them into a new matrix
valid_parameters = np.hstack((omega, alpha, beta))
sigma2 = compute_variance(
valid_parameters, resids, self.backcast_value, self.var_bounds
)
ll_values = self.loglikelihood(resids, sigma2)
max_ll_index = np.argmax(ll_values)
best_params = valid_parameters[max_ll_index]

return best_params

# 初始化参数范围
initial_range = np.array([[0.01, 0.2], [0.5, 0.99]]) # [a_range, b_range]
base_grid_size = 5 # 初始网格大小
depth = 1 # 搜索深度,即细化的次数

# 调用函数
best_parameters = hierarchical_grid_search(resids, initial_range, base_grid_size, depth)
initial_params = best_parameters
return initial_params

@staticmethod
Expand Down Expand Up @@ -359,6 +392,9 @@ def fit(self) -> Parameters:
Returns:
List[float]: [mu, omega, alpha, gamma, beta, loglikelihood]
"""
# No repeated estimation?
if self.estimation_success:
return
starting_vals = self.preparation()

bounds = [
Expand Down Expand Up @@ -471,32 +507,119 @@ def starting_values(self, resids: np.ndarray) -> List[float]:
Returns:
List[float]: [omega, alpha, gamma, beta]
"""
# Candidate target persistence - we don't have a prior knowledge here
persistence_grid = [0.5, 0.7, 0.9, 0.98]
# Candidate alpha values
alpha_grid = [0.01, 0.05, 0.1, 0.2]
gamma_grid = alpha_grid
# Sample variance
var = self.returns.var()
# Backcast initial value for the volaility process
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
initial_params = []
max_likelihood = -np.inf
# fmt: off
for alpha, gamma, p in itertools.product(*[alpha_grid, gamma_grid, persistence_grid]):
# Note that (alpha+beta) is the "persistence"
beta = p - alpha - gamma/2
# Note that the unconditional variance is omega/(1-persistence).
# As we are guessing the initial value, we use the sample variance
# as the unconditional variance to guess the omega value.
omega = var * (1 - p)
params = [omega, alpha, gamma, beta]
sigma2 = self.compute_variance(params, resids, backcast, var_bounds)
if -self.loglikelihood(resids, sigma2) > max_likelihood:
initial_params = params
def hierarchical_grid_search(resids, initial_range, base_grid_size=10, depth=2):
current_range = initial_range
best_params = None

for i in range(depth):
if i != 0:
# 更新搜索范围为最佳参数周围的较小区域
search_width = (current_range[:, 1] - current_range[:, 0]) / base_grid_size
current_range = np.array([
[best_params[1] - search_width[0]*2, best_params[1] + search_width[0]*2],
[best_params[2] - search_width[1]*2, best_params[2] + search_width[1]*2],
[best_params[3] - search_width[2]*2, best_params[3] + search_width[2]*2]
])
# 创建当前深度的网格
alpha_grid = np.linspace(current_range[0, 0], current_range[0, 1], base_grid_size)
gamma_grid = np.linspace(current_range[1, 0], current_range[1, 1], base_grid_size)
p_grid = np.linspace(current_range[2, 0], current_range[2, 1], base_grid_size)
alpha_values, gamma_values, p_values = np.meshgrid(alpha_grid, gamma_grid, p_grid)
parameters = np.stack([alpha_values.ravel(), gamma_values.ravel(), p_values.ravel()], axis=1)
# 过滤无效组合
valid_indices = np.logical_and(parameters[:, 2] - parameters[:, 0] - parameters[:, 1]/2 > 0, # beta > 0
parameters[:, 1] > 0, # gamma>0
parameters[:, 0] > 0) # alpha > 0
valid_parameters = parameters[valid_indices]

# 得到三个参数组成的二维矩阵
alpha = valid_parameters[:, 0]
gamma = valid_parameters[:, 1]
p = valid_parameters[:, 2]
beta = p - alpha - gamma/2
omega = self.returns.var() * (1 - p)

omega = omega.reshape(-1, 1)
alpha = alpha.reshape(-1, 1)
gamma = gamma.reshape(-1, 1)
beta = beta.reshape(-1, 1)

# Combine them into a new matrix
valid_parameters = np.hstack((omega, alpha, gamma, beta))
sigma2 = compute_variance(
valid_parameters, resids, self.backcast_value, self.var_bounds
)
ll_values = self.loglikelihood(resids, sigma2)
max_ll_index = np.argmax(ll_values)
best_params = valid_parameters[max_ll_index]

return best_params

# 初始化参数范围
initial_range = np.array([[0.01, 0.2], [0.01, 0.2], [0.5, 0.99]]) # [a_range, b_range]
base_grid_size = 5 # 初始网格大小
depth = 1 # 搜索深度,即细化的次数

# 调用函数
best_parameters = hierarchical_grid_search(resids, initial_range, base_grid_size, depth)
initial_params = best_parameters
return initial_params

from numba import jit
@jit(nopython=True, cache=True)
def bounds_check(sigma2: np.float64|np.ndarray, var_bounds:np.ndarray) -> np.float64|np.ndarray:
lower, upper = var_bounds[0], var_bounds[1]
if isinstance(sigma2, np.float64):
if sigma2 < lower:
sigma2 = lower
if sigma2 > upper:
if not np.isinf(sigma2):
sigma2 = upper + np.log(sigma2 / upper)
else:
sigma2 = upper + 1000.0
else:
sigma2 = np.maximum(lower, sigma2)
sigma2 = np.where(sigma2 > upper,
np.where(np.isinf(sigma2), upper + 1000.0, upper + np.log(sigma2 / upper)),
sigma2)
return sigma2

@jit(nopython=True, cache=True)
def compute_variance(params: np.ndarray, resids: np.ndarray, backcast: np.float64, var_bounds: np.ndarray) -> np.ndarray:
if params.shape[-1] > 3:
gjrgarch = True
else:
gjrgarch = False
if params.ndim > 1:
omega = params[:, 0]
alpha = params[:, 1]
if gjrgarch:
gamma = params[:, 2]
beta = params[:, 3]
else:
gamma = np.zeros(omega.shape)
beta = params[:, 2]
sigma2 = np.zeros((omega.shape[0], resids.shape[0]))
sigma2[:, 0] = omega + (alpha + gamma/2 + beta) * backcast
resids_squared = resids ** 2
for t in range(1, resids.shape[0]):
sigma2[:, t] = omega + alpha * resids_squared[t - 1] + beta * sigma2[:, t - 1]
sigma2[:, t] += gamma * resids_squared[t - 1] if resids[t - 1] < 0 else np.zeros_like(omega, dtype=np.float64)
sigma2[:, t] = bounds_check(sigma2[:, t], var_bounds[t])
else:
if gjrgarch:
omega, alpha, gamma, beta = params
else:
omega, alpha, beta = params
gamma = 0
sigma2 = np.zeros(resids.shape)
sigma2[0] = omega + (alpha + gamma/2 + beta) * backcast
for t in range(1, resids.shape[0]):
sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
sigma2[t] += gamma * resids[t - 1] ** 2 if resids[t - 1] < 0 else 0
sigma2[t] = bounds_check(sigma2[t], var_bounds[t])
return sigma2


if __name__ == "__main__":
import pandas as pd
Expand Down
Loading