-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilt.py
181 lines (146 loc) · 5.32 KB
/
filt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import abc
import numpy as np
from scipy.stats import norm
from .util import window
from .padding import get_padder
class BaseSpatialFilter(abc.ABC):
    """Base class for sliding-window filtering of a sequence.

    Concrete filters implement :meth:`_filt`, which turns one window of
    ``win_size`` samples into a single output value.  The class derives
    from ``abc.ABC`` so that the ``@abc.abstractmethod`` marker on
    :meth:`_filt` is actually enforced.

    Parameters
    ----------
    win_size : int
        Window length; must be odd.
    padding : str
        Padding method name passed to ``get_padder``; one of ``"zero"``,
        ``"same"`` or ``"identical"``.
    n_iter : int
        Number of filtering passes performed by :meth:`transform`.
    """

    def __init__(self, win_size=3, padding="same", n_iter=1):
        assert win_size % 2 == 1, "window size must be odd value."
        assert padding in ("zero", "same", "identical"), \
            "padding method has to be `zero`, `same` or `identical`."
        self.win_size = win_size
        # The padder is configured with half a window as its padding size.
        self.padder = get_padder(padding, {"padding_size": win_size // 2})
        # Index of the centre sample inside a window.
        self.med_idx = win_size // 2
        self.n_iter = n_iter

    def fit(self, seq):
        """Store ``seq`` and its padded version; return ``self``."""
        self.seq_ = seq  # keep original signal internal
        self.seq_padded_ = self.padder.transform(seq)
        return self

    def transform(self, seq):
        """Run ``n_iter`` filtering passes over the fitted sequence.

        The ``seq`` argument is not read: the data stored by :meth:`fit`
        is what gets filtered.  With ``n_iter == 0`` the fitted sequence
        is returned as is.

        Returns
        -------
        The stacked per-window outputs of the last pass.
        """
        fitted_padded = self.seq_padded_
        x = self.seq_
        try:
            for i in range(self.n_iter):
                if i > 0:
                    # Later passes work on the padded output of the previous
                    # pass.  It is published on ``self`` because ``_filt``
                    # implementations may read ``self.seq_padded_``.
                    self.seq_padded_ = self.padder.transform(x)
                x = np.hstack([
                    self._filt(xs)
                    for xs in window(self.seq_padded_, n=self.win_size)
                ])
        finally:
            # Restore the fitted state so that calling transform() again
            # starts from the fitted signal instead of from filtered data.
            self.seq_padded_ = fitted_padded
        return x

    def fit_transform(self, seq):
        """Call :meth:`fit` on ``seq`` and return :meth:`transform`'s result."""
        self.fit(seq)
        return self.transform(seq)

    @abc.abstractmethod
    def _filt(self, sub_seq):
        """Execute filtering for one sub sequence and return its value."""
class IdenticalFilter:
    """No-op filter: every method hands its input back untouched."""

    def fit(self, seq):
        """Ignore ``seq`` (nothing is stored) and return ``self``."""
        return self

    def transform(self, seq):
        """Return ``seq`` unchanged."""
        return seq

    def fit_transform(self, seq):
        """Return ``seq`` unchanged."""
        return self.fit(seq).transform(seq)
class BoxFilter(BaseSpatialFilter):
    """Box (uniform-weight) filter.

    Every sample of a window gets the same weight ``1 / win_size``.
    """

    def __init__(self, win_size, padding="same", n_iter=1):
        super().__init__(win_size, padding, n_iter)
        # Uniform kernel with one entry per window sample.
        self.weight = np.full(win_size, 1.0 / win_size)

    def _filt(self, sub_seq):
        """Return the weighted sum of ``sub_seq`` under the uniform kernel."""
        kernel_row = self.weight.reshape(1, -1)
        samples_col = sub_seq.reshape(-1, 1)
        # (1 x n) times (n x 1) gives a 1 x 1 matrix; unwrap its only entry.
        return np.matmul(kernel_row, samples_col)[0, 0]
class GaussianFilter(BaseSpatialFilter):
    """Gaussian filter.

    The kernel is a normal pdf sampled at the window positions, centred
    on the middle position and normalised to sum to one.
    """

    def __init__(self, win_size, padding="same", n_iter=1, sigma_d=None):
        super().__init__(win_size, padding, n_iter)
        # Fall back to a width derived from the window size when none is given.
        self.sigma_d = self._suggest_sigma_d() if sigma_d is None else sigma_d
        positions = np.arange(win_size)
        taps = norm.pdf(positions, loc=self.med_idx, scale=self.sigma_d)
        self.weight = taps / taps.sum()

    def _filt(self, sub_seq):
        """Return the weighted sum of ``sub_seq`` under the Gaussian kernel."""
        kernel_row = self.weight.reshape(1, -1)
        samples_col = sub_seq.reshape(-1, 1)
        # (1 x n) times (n x 1) gives a 1 x 1 matrix; unwrap its only entry.
        return np.matmul(kernel_row, samples_col)[0, 0]

    def _suggest_sigma_d(self):
        """Return the default kernel width, ``win_size / (2 * 4)``."""
        ratio = 4
        return self.win_size / (2 * ratio)
class BilateralFilter(GaussianFilter):
    """Bilateral filter.

    Each window sample is weighted by the spatial Gaussian kernel built
    by ``GaussianFilter`` multiplied by a range kernel: a normal pdf of
    the sample values centred on the window's middle sample.

    Parameters
    ----------
    win_size, padding, n_iter, sigma_d
        Forwarded to ``GaussianFilter``.
    sigma_i : float or None
        Scale of the range kernel.  When ``None`` it is estimated from
        the fitted sequence on every :meth:`fit`.

    Attributes
    ----------
    sigma_i_ : float
        Range-kernel scale chosen at fit time.  The ``sigma_i`` parameter
        itself is never overwritten, so an estimate made for one signal
        is not silently reused for the next one.
    """

    def __init__(self, win_size, padding="same", n_iter=1,
                 sigma_d=None, sigma_i=None):
        super().__init__(win_size, padding, n_iter, sigma_d)
        self.sigma_i = sigma_i

    def fit(self, seq):
        """Store ``seq`` and settle the range-kernel scale; return ``self``."""
        super().fit(seq)
        if self.sigma_i is None:
            self.sigma_i_ = self._suggest_sigma_i()
        else:
            self.sigma_i_ = self.sigma_i
        return self

    def _filt(self, sub_seq):
        """Return the bilateral-weighted average of ``sub_seq``."""
        # An explicitly configured scale always wins over the fitted estimate.
        sigma_i = self.sigma_i if self.sigma_i is not None else self.sigma_i_
        center = sub_seq[self.med_idx]
        if not sigma_i > 0:
            # A zero scale (e.g. the estimate for a constant signal) would
            # make norm.pdf return NaN.  In that limit only samples equal to
            # the centre carry weight, so the average is the centre itself.
            return center
        w = norm.pdf(sub_seq, loc=center, scale=sigma_i)
        weight = self.weight * w
        weight /= weight.sum()
        prod = weight.reshape(1, -1) @ sub_seq.reshape(-1, 1)
        return prod[0, 0]

    def _suggest_sigma_i(self):
        """Suggest the range-kernel scale.

        Returns 1% of the value range (max - min) of the fitted sequence.
        """
        x = self.seq_
        return (x.max() - x.min()) / 100.0
class NonLocalMeanFilter(BaseSpatialFilter):
    """Non-local mean filter.

    The output for a window is a weighted average of the fitted sequence.
    Each sample's weight comes from the distance between the current
    window and the window at that sample's position.

    Parameters
    ----------
    win_size, padding, n_iter
        Forwarded to ``BaseSpatialFilter``.
    beta : float
        Multiplier inside the exponential weight's denominator.
    th : float or None
        Distance threshold; windows at distance ``>= th`` get zero weight.
        When ``None`` a threshold is derived per window from ``delta``.
    delta : float
        Factor used by the automatic threshold.
    sigma : float or None
        Scale term inside the exponential weight's denominator.  When
        ``None`` it is estimated from the fitted sequence.

    Attributes
    ----------
    w_ : ndarray
        Normalised weights of the most recent window that had at least
        one non-zero weight.
    """

    def __init__(self, win_size, padding="same", n_iter=1,
                 beta=1.0, th=None, delta=0.5, sigma=None):
        super().__init__(win_size, padding, n_iter)
        self.beta = beta
        self.th = th
        self.sigma = sigma
        self.delta = delta

    def _filt(self, sub_seq):
        """Return the non-local-mean value for the window ``sub_seq``."""
        # Resolve both parameters into locals: user-supplied values are
        # honoured and never overwritten, and automatic values always
        # reflect the currently fitted sequence / current window.
        sigma = self.sigma if self.sigma is not None else self._suggest_sigma()
        th = self.th if self.th is not None else self._suggest_th(sub_seq)
        scale = 2 * self.beta * sigma * self.win_size

        # Float weights regardless of the signal dtype; an integer signal
        # would otherwise truncate every weight to 0 or 1.
        weight = np.zeros_like(self.seq_, dtype=float)
        for i, xs in enumerate(window(self.seq_padded_, n=self.win_size)):
            d = np.linalg.norm((xs - sub_seq), ord=2)
            if d >= th:
                continue
            if scale == 0:
                # Zero bandwidth: only exact matches count (avoids 0/0).
                weight[i] = 1.0 if d == 0 else 0.0
            else:
                weight[i] = np.exp(-d / scale)

        total = weight.sum()
        if total == 0:
            # No window was close enough; keep the centre sample as is.
            return sub_seq[self.med_idx]
        weight /= total
        # NOTE(review): the average is taken over the fitted ``seq_`` even
        # when ``seq_padded_`` holds a later pass's data - confirm intended.
        prod = weight.reshape(1, -1) @ self.seq_.reshape(-1, 1)
        self.w_ = weight
        return prod[0, 0]

    def _suggest_sigma(self):
        """Suggest the sigma parameter.

        Returns 1% of the value range (max - min) of the fitted sequence.
        """
        x = self.seq_
        return (x.max() - x.min()) / 100.0

    def _suggest_th(self, sub_seq):
        """Return ``delta * (max - min of sub_seq) * win_size``."""
        spread = sub_seq.max() - sub_seq.min()
        return self.delta * spread * self.win_size
class ChainFilter:
    """Apply several filters one after another.

    Parameters
    ----------
    filters : iterable
        Objects exposing ``fit_transform(seq)``; applied in order.
    keep_intermediate : bool
        When true, the output of every stage of the most recent
        :meth:`fit_transform` call is kept in ``filt_res_``.
    """

    def __init__(self, filters, keep_intermediate=False):
        self.filters = filters
        self.keep_intermediate = keep_intermediate
        if keep_intermediate:
            self.filt_res_ = []

    def fit_transform(self, seq):
        """Feed ``seq`` through every filter and return the final output."""
        if self.keep_intermediate:
            # Start from a fresh list on every call: results of earlier
            # calls must not pile up, and the attribute must exist even when
            # ``keep_intermediate`` was switched on after construction.
            self.filt_res_ = []
        for filt in self.filters:
            seq = filt.fit_transform(seq)
            if self.keep_intermediate:
                self.filt_res_.append(seq)
        return seq