-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmarchenko_pastur.py
130 lines (116 loc) · 4.94 KB
/
marchenko_pastur.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import numpy as np
import pandas as pd
from sklearn.neighbors import KernelDensity
from scipy.optimize import minimize
class Signal:
# Private Method
# -----------------------------------------------------------------------------------------------------------------------------------------------------------
def __init__(self, data):
"""
:param correlation_matrix: Correlation matrix of the set of returns.
"""
self.data = data
pass
def mpPDF(self, var, q, pts):
"""
Creates a Marchenko-Pastur Probability Density Function
Args:
var (float): Variance
q (float): T/N where T is the number of rows and N the number of columns
pts (int): Number of points used to construct the PDF
Returns:
pd.Series: Marchenko-Pastur PDF
"""
# Marchenko-Pastur pdf
# q=T/N
# Adjusting code to work with 1 dimension arrays
if isinstance(var, np.ndarray):
if var.shape == (1,):
var = var[0]
eMin, eMax = var * (1 - (1. / q) ** .5) ** 2, var * (1 + (1. / q) ** .5) ** 2
eVal = np.linspace(eMin, eMax, pts)
pdf = q / (2 * np.pi * var * eVal) * ((eMax - eVal) * (eVal - eMin)) ** .5
pdf = pd.Series(pdf, index=eVal)
return pdf
def getPCA(self, matrix):
"""
Gets the Eigenvalues and Eigenvector values from a Hermitian Matrix
Args:
matrix pd.DataFrame: Correlation matrix
Returns:
(tuple): tuple containing:
np.ndarray: Eigenvalues of correlation matrix
np.ndarray: Eigenvectors of correlation matrix
"""
# Get eVal,eVec from a Hermitian matrix
eVal, eVec = np.linalg.eigh(matrix)
indices = eVal.argsort()[::-1] # arguments for sorting eVal desc
eVal, eVec = eVal[indices], eVec[:, indices]
eVal = np.diagflat(eVal)
return eVal, eVec
def fitKDE(self, obs, bWidth=.25, kernel='gaussian', x=None):
"""
Fit kernel to a series of obs, and derive the prob of obs x is the array of values
on which the fit KDE will be evaluated. It is the empirical PDF
Args:
obs (np.ndarray): observations to fit. Commonly is the diagonal of Eigenvalues
bWidth (float): The bandwidth of the kernel. Default is .25
kernel (str): The kernel to use. Valid kernels are [‘gaussian’|’tophat’|
’epanechnikov’|’exponential’|’linear’|’cosine’] Default is ‘gaussian’.
x (np.ndarray): x is the array of values on which the fit KDE will be evaluated
Returns:
pd.Series: Empirical PDF
"""
if len(obs.shape) == 1:
obs = obs.reshape(-1, 1)
kde = KernelDensity(kernel=kernel, bandwidth=bWidth).fit(obs)
if x is None:
x = np.unique(obs).reshape(-1, 1)
if len(x.shape) == 1:
x = x.reshape(-1, 1)
logProb = kde.score_samples(x) # log(density)
pdf = pd.Series(np.exp(logProb), index=x.flatten())
return pdf
def errPDFs(self, var, eVal, q, bWidth, pts=1000):
"""
Fit error of Empirical PDF (uses Marchenko-Pastur PDF)
Args:
var (float): Variance
eVal (np.ndarray): Eigenvalues to fit.
q (float): T/N where T is the number of rows and N the number of columns
bWidth (float): The bandwidth of the kernel.
pts (int): Number of points used to construct the PDF
Returns:
float: sum squared error
"""
# Fit error
pdf0 = self.mpPDF(var, q, pts) # theoretical pdf
pdf1 = self.fitKDE(eVal, bWidth, x=pdf0.index.values) # empirical pdf
sse = np.sum((pdf1 - pdf0) ** 2)
return sse
def findMaxEval(self, data, bWidth=0.01):
"""
Find max random eVal by fitting Marchenko’s dist (i.e) everything else larger than
this, is a signal eigenvalue
Args:
data (pd.DataFrame): Time series data
q (float): T/N where T is the number of rows and N the number of columns
bWidth (float): The bandwidth of the kernel.
Returns:
(tuple): tuple containing:
float: Maximum random eigenvalue
float: Variance attributed to noise (1-result) is one way to measure
signal-to-noise
"""
corr0 = data.corr()
eVal0, eVec0 = self.getPCA(corr0)
q = data.shape[0] / data.shape[1]
out = minimize(lambda *x: self.errPDFs(*x), 0.5, args=(np.diag(eVal0), q, bWidth),
bounds=((1E-5, 1 - 1E-5),))
if out['success']:
var = out['x'][0]
else:
var = 1
eMax = var * (1 + (1. / q) ** .5) ** 2
nFacts0 = eVal0.shape[0] - np.diag(eVal0)[::-1].searchsorted(eMax)
return eMax, var, nFacts0