-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbicoherence.py
156 lines (120 loc) · 3.98 KB
/
bicoherence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from __future__ import division
import numpy as np
from scipy.linalg import hankel
import scipy.signal
import scipy.io
import matplotlib.pyplot as plt
def flat_eq(x, y):
"""
Emulate MATLAB's assignment of the form
x(:) = y
"""
z = x.reshape(1, -1)
z = y
return z.reshape(x.shape)
def nextpow2(num):
'''
Returns the next highest power of 2 from the given value.
Example
-------
>>>nextpow2(1000)
1024
>>nextpow2(1024)
2048
Taken from: https://github.com/alaiacano/frfft/blob/master/frfft.py
'''
npow = 2
while npow <= num:
npow = npow * 2
return npow
def bicoherence(y, time_step, nfft=None, wind=None, nsamp=None, overlap=None, disp=False):
"""
Direct (FD) method for estimating bicoherence
Parameters:
y - data vector or time-series
nfft - fft length [default = power of two > segsamp]
actual size used is power of two greater than 'nsamp'
wind - specifies the time-domain window to be applied to each
data segment; should be of length 'segsamp' (see below);
otherwise, the default Hanning window is used.
segsamp - samples per segment [default: such that we have 8 segments]
- if x is a matrix, segsamp is set to the number of rows
overlap - percentage overlap, allowed range [0,99]. [default = 50];
- if x is a matrix, overlap is set to 0.
Output:
bic - estimated bicoherence: an nfft x nfft array, with origin
at the center, and axes pointing down and to the right.
waxis - vector of frequencies associated with the rows and columns
of bic; sampling frequency is assumed to be 1.
"""
# Parameter checks
(ly, nrecs) = y.shape
if ly == 1:
y = y.reshape(1, -1)
ly = nrecs
nrecs = 1
if not nfft: nfft = 128
if not overlap: overlap = 50
if nrecs > 1: overlap = 0
if not nsamp: nsamp = 0
if nrecs > 1: nsamp = ly
if nrecs > 1 and nsamp <= 0:
nsamp = np.fix(ly / (8 - 7 * overlap/100))
if nfft < nsamp:
nfft = 2**nextpow2(nsamp)
overlap = np.fix(nsamp * overlap/100)
nadvance = nsamp - overlap
nrecs = np.fix ((ly*nrecs - overlap) / nadvance)
if not wind:
wind = np.hanning(nsamp)
try:
(rw, cw) = wind.shape
except ValueError:
(rw,) = wind.shape
cw = 1
if min(rw, cw) == 1 or max(rw, cw) == nsamp:
print "Segment size is " + str(nsamp)
print "Wind array is " + str(rw) + " by " + str(cw)
print "Using default Hanning window"
wind = np.hanning(nsamp)
wind = wind.reshape(1,-1)
# Accumulate triple products
bic = np.zeros([nfft, nfft])
Pyy = np.zeros([nfft,1])
mask = hankel(np.arange(nfft),np.array([nfft-1]+range(nfft-1)))
Yf12 = np.zeros([nfft,nfft])
ind = np.arange(nsamp)
y = y.ravel(order='F')
for k in xrange(nrecs):
ys = y[ind]
#ys = (ys.reshape(1,-1)-np.mean(ys))*wind
ys = scipy.signal.detrend(ys).reshape(1,-1)*wind
Yf = np.fft.fft(ys, nfft)/nsamp
CYf = np.conjugate(Yf)
Pyy += flat_eq(Pyy, (Yf*CYf))
Yf12 = flat_eq(Yf12, CYf.ravel(order='F')[mask])
bic += ((Yf * np.transpose(Yf)) * Yf12)
ind += int(nadvance)
bic = bic / nrecs
Pyy = Pyy / nrecs
mask = flat_eq(mask, Pyy.ravel(order='F')[mask])
bic = abs(bic)**2 / ((Pyy * np.transpose(Pyy)) * mask)
bic = np.fft.fftshift(bic)
if nfft%2 == 0:
waxis = np.transpose(np.arange(-nfft/2, nfft/2)) / (nfft*time_step)
else:
waxis = np.transpose(np.arange(-1*(nfft-1)/2, (nfft-1)/2+1)) / (nfft*time_step)
colmax, row = bic.max(0), bic.argmax(0)
maxval, col = colmax.max(0), colmax.argmax(0)
print 'Max: bic('+str(waxis[col])+','+str(waxis[col])+') = '+str(maxval)
if disp:
cont = plt.contourf(waxis, waxis, bic, 100, cmap=plt.cm.Spectral_r, vmin=0, vmax=1)
plt.colorbar()
plt.title('Bicoherence estimated via the direct (FFT) method')
plt.xlabel('f1 (Hz)')
plt.ylabel('f2 (Hz)')
plt.show()
return (bic, waxis)
def demo():
qpc = scipy.io.loadmat('resources/qpc.mat')['zmat']
bicoherence(qpc, 1., disp=True)