-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathsignal_processing.py
223 lines (179 loc) · 8.02 KB
/
signal_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import string
import threading, sys
import librosa
import numpy as np
import scipy
from numpy.fft import rfft, irfft
from scipy import signal
from scipy.io.wavfile import write as wav_write
sys.path.append('.')
from local.utils import segment_axis
def _samples_to_stft_frames(samples, size, shift):
"""
Calculates STFT frames from samples in time domain.
:param samples: Number of samples in time domain.
:param size: FFT size.
:param shift: Hop in samples.
:return: Number of STFT frames.
"""
return np.ceil((float(samples) - size + shift) / shift).astype(np.int)
def _stft_frames_to_samples(frames, size, shift):
"""
Calculates samples in time domain from STFT frames
:param frames: Number of STFT frames.
:param size: FFT size.
:param shift: Hop in samples.
:return: Number of samples in time domain.
"""
return frames * shift + size - shift
def _biorthogonal_window_loopy(analysis_window, shift):
"""
This version of the synthesis calculation is as close as possible to the
Matlab impelementation in terms of variable names.
The results are equal.
The implementation follows equation A.92 in
Krueger, A. Modellbasierte Merkmalsverbesserung zur robusten automatischen
Spracherkennung in Gegenwart von Nachhall und Hintergrundstoerungen
Paderborn, Universitaet Paderborn, Diss., 2011, 2011
"""
fft_size = len(analysis_window)
assert np.mod(fft_size, shift) == 0
number_of_shifts = len(analysis_window) // shift
sum_of_squares = np.zeros(shift)
for synthesis_index in range(0, shift):
for sample_index in range(0, number_of_shifts + 1):
analysis_index = synthesis_index + sample_index * shift
if analysis_index + 1 < fft_size:
sum_of_squares[synthesis_index] \
+= analysis_window[analysis_index] ** 2
sum_of_squares = np.kron(np.ones(number_of_shifts), sum_of_squares)
synthesis_window = analysis_window / sum_of_squares / fft_size
return synthesis_window
def audioread(path, offset=0.0, duration=None, sample_rate=16000):
    """
    Loads an audio file via ``librosa.load`` with our common defaults and
    returns only the audio data (the first element of the loader's result).

    The file is loaded with ``mono=False`` and resampling target
    ``sample_rate``.

    :param path: Absolute or relative file path to audio file.
    :type: String.
    :param offset: Begin of loaded audio.
    :type: Scalar in seconds.
    :param duration: Duration of loaded audio.
    :type: Scalar in seconds.
    :param sample_rate: Sample rate of audio
    :type: scalar in number of samples per second
    :return: The loaded audio data as returned by librosa.
    """
    # librosa.load yields (audio, sample_rate); the rate is not needed here.
    audio, _ = librosa.load(
        path,
        sr=sample_rate,
        mono=False,
        offset=offset,
        duration=duration,
    )
    return audio
def stft(time_signal, time_dim=None, size=1024, shift=256,
         window=signal.windows.blackman, fading=True, window_length=None):
    """
    Calculates the short time Fourier transformation of a multi channel multi
    speaker time signal. It is able to add additional zeros for fade-in and
    fade out and should yield an STFT signal which allows perfect
    reconstruction.

    :param time_signal: multi channel time signal.
    :param time_dim: Scalar dim of time. Negative values count from the last
        axis. Default: None means the biggest dimension
    :param size: Scalar FFT-size.
    :param shift: Scalar FFT-shift. Typically shift is a fraction of size.
    :param window: Window function handle; it is called with the window
        length and must return the window samples.
    :param fading: Pads the signal with zeros for better reconstruction.
    :param window_length: Sometimes one desires to use a shorter window than
        the fft size. In that case, the window is padded with zeros.
        The default is to use the fft-size as a window size.
    :return: Complex STFT signal. The time axis is replaced by a frame axis
        followed by a frequency axis with size/2+1 bins.
    """
    if time_dim is None:
        time_dim = np.argmax(time_signal.shape)
    # Normalise negative axes: the einsum subscript and the FFT axis below
    # are derived from ``time_dim + 1`` and would otherwise be wrong.
    time_dim = int(time_dim) % time_signal.ndim

    # Pad with zeros to have enough samples for the window function to fade.
    if fading:
        pad = [(0, 0)] * time_signal.ndim
        pad[time_dim] = (size - shift, size - shift)
        time_signal = np.pad(time_signal, pad, mode='constant')

    # Pad with trailing zeros, to have an integral number of frames.
    frames = _samples_to_stft_frames(time_signal.shape[time_dim], size, shift)
    samples = _stft_frames_to_samples(frames, size, shift)
    pad = [(0, 0)] * time_signal.ndim
    pad[time_dim] = (0, samples - time_signal.shape[time_dim])
    time_signal = np.pad(time_signal, pad, mode='constant')

    if window_length is None:
        window = window(size)
    else:
        window = window(window_length)
        window = np.pad(window, (0, size - window_length), mode='constant')

    # NOTE(review): assumes segment_axis splits ``time_dim`` into a frame
    # axis (at ``time_dim``) and a within-frame axis (at ``time_dim + 1``),
    # with ``size - shift`` being the overlap -- confirm in local.utils.
    time_signal_seg = segment_axis(time_signal, size,
                                   size - shift, axis=time_dim)

    # Build an einsum rule such as 'abc,c->abc' that multiplies the window
    # along the within-frame axis and leaves the shape unchanged.
    letters = string.ascii_lowercase
    mapping = letters[:time_signal_seg.ndim] + ',' + letters[time_dim + 1] \
        + '->' + letters[:time_signal_seg.ndim]

    return rfft(np.einsum(mapping, time_signal_seg, window),
                axis=time_dim + 1)
def istft(stft_signal, size=1024, shift=256,
          window=signal.windows.blackman, fading=True, window_length=None):
    """
    Calculated the inverse short time Fourier transform to exactly reconstruct
    the time signal.

    :param stft_signal: Single channel complex STFT signal
        with dimensions frames times size/2+1.
    :param size: Scalar FFT-size.
    :param shift: Scalar FFT-shift. Typically shift is a fraction of size.
    :param window: Window function handle; it is called with the window
        length and must return the analysis window samples.
    :param fading: Removes the additional padding, if done during STFT.
    :param window_length: Sometimes one desires to use a shorter window than
        the fft size. In that case, the window is padded with zeros.
        The default is to use the fft-size as a window size.
    :return: Single channel time signal.
    """
    assert stft_signal.shape[1] == size // 2 + 1

    if window_length is None:
        window = window(size)
    else:
        window = window(window_length)
        window = np.pad(window, (0, size - window_length), mode='constant')

    window = _biorthogonal_window_loopy(window, shift)

    # _biorthogonal_window_loopy divides the synthesis window by the FFT
    # size; that factor is undone here (presumably because numpy's irfft
    # already applies the 1/n normalisation itself).
    window *= size

    number_of_frames = stft_signal.shape[0]
    time_signal = np.zeros(
        _stft_frames_to_samples(number_of_frames, size, shift))

    # Overlap-add of the windowed inverse transforms. ``n=size`` pins the
    # frame length so that odd FFT sizes are reconstructed correctly too.
    for frame_index in range(number_of_frames):
        start = frame_index * shift
        time_signal[start:start + size] += \
            window * np.real(irfft(stft_signal[frame_index], n=size))

    # Compensate fade-in and fade-out
    if fading:
        time_signal = time_signal[size - shift:len(time_signal) - (size - shift)]

    return time_signal
def audiowrite(data, path, samplerate=16000, normalize=False, threaded=True):
    """ Write the audio data ``data`` to the wav file ``path``

    Floating point data is scaled by the int16 maximum before it is written;
    integer data is written as is. In both cases values outside the int16
    range are clipped and the result is stored as 16 bit integers.

    The file can be written in a threaded mode. In this case, the writing
    process will be started at a separate thread. Consequently, the file will
    not be written when this function exits.

    :param data: A numpy array with the audio data
    :param path: The wav file the data should be written to
    :param samplerate: Samplerate of the audio data
    :param normalize: Normalize the audio first so that the values are within
        the range of [INTMIN, INTMAX]. E.g. no clipping occurs
    :param threaded: If true, the write process will be started as a separate
        thread
    :return: The number of clipped samples
    """
    data = data.copy()
    int16_max = np.iinfo(np.int16).max
    int16_min = np.iinfo(np.int16).min

    if normalize:
        if not data.dtype.kind == 'f':
            # ``np.float`` was an alias of the builtin and has been removed
            # from NumPy.
            data = data.astype(float)
        peak = np.max(np.abs(data))
        # An all-zero (silent) signal has nothing to normalise; dividing by
        # zero would turn every sample into NaN.
        if peak > 0:
            data /= peak

    if data.dtype.kind == 'f':
        data *= int16_max

    # Count samples exceeding either end of the int16 range.
    sample_to_clip = int(np.sum((data > int16_max) | (data < int16_min)))
    if sample_to_clip > 0:
        print('Warning, clipping {} samples'.format(sample_to_clip))
    data = np.clip(data, int16_min, int16_max)
    data = data.astype(np.int16)

    if threaded:
        threading.Thread(target=wav_write, args=(path, samplerate, data)).start()
    else:
        wav_write(path, samplerate, data)

    return sample_to_clip