-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
123 lines (98 loc) · 4.34 KB
/
utils.py
File metadata and controls
123 lines (98 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import numpy as np
def sample_HMM(parameters, T, seed=None):
"""Samples from an HMM (Gaussian outputs) with provided parameters.
Args:
parameters: A dictionary of HMM parameters.
Keys include: n_states (number of states),
init_prob (initial distribution of states),
trans_matrix (matrix of state transition probabilities, each row
sums to 1),
obs_dim (dimension of observations),
mean (mean of the emission distribution for every state),
cov (covariance matrix of the emission distribution).
T: Number of observations to sample.
"""
K = parameters["num_states"]
pi_0 = parameters["init_prob"]
A = parameters["trans_matrix"]
D = parameters["obs_dim"]
mean = parameters["mean"]
cov = parameters["cov"]
np.random.seed(seed)
# create empty numpy arrays to store samples
states = np.empty(T, np.int32)
obs = np.empty((T, D), np.float32)
for t in range(T):
if t == 0:
# sample the first state from initial distribution
states[t] = np.random.choice(K, p=pi_0)
else:
# get the next state based on transition matrix (the row
# corresponding to the previous state)
states[t] = np.random.choice(K, p=A[states[t - 1]])
# sample observation from the corresponding Gaussian distribution
obs[t] = np.random.multivariate_normal(
mean[states[t]], cov[states[t]])
return states, obs
def sample_HHMM(parameters, T, seed=None):
L = parameters["n_layers"]
K = parameters["n_states"]
pi_0 = parameters["init_prob"]
# the last term of each row is the probability of reaching end state
# (except the top layer)
A = parameters["trans_matrix"]
D = parameters["obs_dim"]
mean = parameters["mean"]
cov = parameters["cov"]
np.random.seed(seed)
# indicator for whether the current state at each layer ends
# (whether the child layer reaches end state)
end = np.full(L, False)
states = np.empty((T, L), np.int32)
obs = np.empty((T, D), np.float32)
for t in range(T):
if t == 0:
# at the first time point, draw from the initial distribution
# conditioned on the parent layer (except the top layer)
for l in range(L):
if l == 0:
states[t, l] = np.random.choice(K[l], p=pi_0[l][0])
else:
states[t, l] = np.random.choice(
K[l], p=pi_0[l][states[t, l - 1]])
# the last layer (with production states) always transitions
end[-1] = True
else:
for l in range(L):
# the top layer either transitions or not
if l == 0:
if not end[l]:
states[t, l] = states[t - 1, l]
else:
states[t, l] = np.random.choice(
K[l], p=A[l][0][states[t - 1, l]])
else:
# no transition if the child layer doesn't reach end state
if not end[l]:
states[t, l] = states[t - 1, l]
# transition conditioned on the parent layer if end state
# not reached at t - 1
elif not end[l - 1]:
states[t, l] = np.random.choice(
K[l],
p=A[l][states[t, l - 1]][states[t - 1, l]][:-1])
# resample from the initial distribution conditioned on
# the parent layer if end state reached at t - 1
else:
states[t, l] = np.random.choice(
K[l], p=pi_0[l][states[t, l - 1]])
for l in range(L - 1, 0, -1):
# reach end state with some probability conditioned on
# the parent layer only if the child layer reaches end state
if (end[l] and np.random.uniform()
<= A[l][states[t, l - 1]][states[t, l]][-1]):
end[l - 1] = True
# sample observation from the corresponding Gaussian distribution
obs[t] = np.random.multivariate_normal(
mean[states[t, -1]], cov[states[t, -1]])
return states, obs