"""
Gaussian Mixture Model (GMM) fitted with Expectation-Maximization in PyTorch.

Reference blog post: https://brilliant.org/wiki/gaussian-mixture-model/
"""
import math

import torch
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


class GMM:
    def __init__(self, k, max_epochs=100, tolerance=1e-8):
        """
        :param k: The number of clusters the algorithm will form.
        :param max_epochs: The maximum number of iterations the algorithm
            will run for if it does not converge earlier.
        :param tolerance: If the change in the responsibilities from one
            iteration to the next is smaller than this value, the algorithm
            is considered to have converged.
        """
        self.k = k
        self.parameters = []
        self.max_epochs = max_epochs
        self.tolerance = tolerance
        self.responsibility = None
        self.responsibilities = []
        self.sample_assignments = None

    def normalization(self, X):
        """
        :param X: Input tensor.
        :return: Input normalized row-wise by its L2 norm.
        """
        l2 = torch.norm(X, p=2, dim=-1)
        l2[l2 == 0] = 1  # avoid division by zero for all-zero rows
        return X / l2.unsqueeze(1)

    def covariance_matrix(self, X):
        """
        :param X: Input tensor.
        :return: Covariance matrix of the input tensor.
        """
        centered_X = X - torch.mean(X, dim=0)
        cov = torch.mm(centered_X.T, centered_X) / (centered_X.shape[0] - 1)
        return cov

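    # Cross-check (sketch, assumes PyTorch >= 1.10): torch.cov treats rows as
    # variables, so torch.cov(X.T) should match covariance_matrix(X) up to
    # floating-point error.
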
    def random_gaussian_initialization(self, X):
        """
        Create k Gaussian distributions, one per cluster, each initialized
        with a randomly sampled data point as its mean and the covariance of
        the full data set as its covariance matrix. Since we are using the
        iris dataset, we know the number of classes is 3, so k = 3.
        :param X: Input tensor.
        :return: k randomly initialized means and covariances, each acting as
            a separate cluster.
        """
        n_samples = X.shape[0]
        self.prior = (1 / self.k) * torch.ones(self.k)
        for cls in range(self.k):
            parameter = {}
            # Pick one random sample as this cluster's initial mean.
            parameter['mean'] = X[torch.randperm(n_samples)[:1]]
            parameter['cov'] = self.covariance_matrix(X)
            self.parameters.append(parameter)

    def multivariate_gaussian_distribution(self, X, parameters):
        """
        Evaluate the multivariate Gaussian density (see the Multi-Dimensional
        Model section of the blog post linked above) for every sample, given
        one cluster's parameters.
        :param X: Input tensor.
        :param parameters: Mean and covariance of one Gaussian cluster.
        :return: Likelihood of each sample under this cluster's Gaussian.
            Since this is a multivariate problem, we use a covariance matrix
            rather than a scalar variance.
        """
        n_features = X.shape[1]
        mean = parameters['mean']
        cov = parameters['cov']
        determinant = torch.det(cov)
        likelihoods = torch.zeros(X.shape[0])
        dim = torch.scalar_tensor(n_features, dtype=torch.float)
        # N(x | mean, cov) = exp(-0.5 (x - mean)^T cov^-1 (x - mean))
        #                    / sqrt((2 * pi)^d * det(cov))
        coefficient = 1.0 / torch.sqrt(torch.pow(2.0 * math.pi, dim) * determinant)
        for i, sample in enumerate(X):
            deviation = sample - mean  # shape (1, n_features)
            exponent = torch.exp(
                -0.5 * torch.mm(torch.mm(deviation, torch.pinverse(cov)), deviation.T))
            likelihoods[i] = coefficient * exponent.squeeze()

        return likelihoods

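    # A vectorized alternative (sketch, not used by the class): recent
    # PyTorch versions ship torch.distributions.MultivariateNormal, which
    # evaluates the same density for all samples in one call, assuming the
    # covariance matrix is positive definite.
    def multivariate_gaussian_distribution_vectorized(self, X, parameters):
        dist = torch.distributions.MultivariateNormal(
            parameters['mean'].squeeze(0), covariance_matrix=parameters['cov'])
        return dist.log_prob(X).exp()
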
    def get_likelihood(self, X):
        """
        random_gaussian_initialization() gave us k sets of mean and
        covariance. Here we evaluate the likelihood of each sample under each
        of these k Gaussians.
        :param X: Input tensor.
        :return: Tensor of shape (n_samples, k) storing the likelihood of
            each sample under each cluster's Gaussian.
        """
        n_samples = X.shape[0]
        likelihoods_cls = torch.zeros((n_samples, self.k))
        for cls in range(self.k):
            likelihoods_cls[:, cls] = self.multivariate_gaussian_distribution(X, self.parameters[cls])

        return likelihoods_cls

    def expectation(self, X):
        """
        E-step of the Expectation-Maximization algorithm, which iteratively
        refines the randomly initialized means and covariances. This step
        computes the probability (responsibility) of each sample belonging
        to each cluster.
        :param X: Input tensor.
        :return: None; stores responsibilities and hard cluster assignments.
        """
        weighted_likelihood = self.get_likelihood(X) * self.prior
        sum_likelihood = torch.sum(weighted_likelihood, dim=1).unsqueeze(1)
        # Determine responsibility as P(X|y) * P(y) / P(X).
        # responsibility stores each sample's probability score for each class.
        self.responsibility = weighted_likelihood / sum_likelihood
        # Assign each sample to the cluster with the largest probability.
        self.sample_assignments = self.responsibility.argmax(dim=1)
        # Save the max responsibilities for the convergence check.
        self.responsibilities.append(torch.max(self.responsibility, dim=1))

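    # A numerically safer variant of the E-step (sketch, not wired into
    # predict()): computing responsibilities in log space with logsumexp
    # avoids underflow when the densities become very small. The 1e-30 floor
    # is an assumption to keep log() finite.
    def expectation_log_space(self, X):
        log_weighted = torch.log(self.get_likelihood(X) + 1e-30) + torch.log(self.prior)
        log_total = torch.logsumexp(log_weighted, dim=1, keepdim=True)
        return torch.exp(log_weighted - log_total)
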
    def maximization(self, X):
        """
        M-step: iterate through the clusters and update each mean and
        covariance as responsibility-weighted averages over the samples,
        then update the priors from the total responsibility mass.
        :param X: Input tensor.
        :return: None; updates means, covariances and priors in place.
        """
        for i in range(self.k):
            resp = self.responsibility[:, i].unsqueeze(1)
            mean = torch.sum(resp * X, dim=0) / torch.sum(resp)
            covariance = torch.mm((X - mean).T, (X - mean) * resp) / resp.sum()
            self.parameters[i]['mean'], self.parameters[i]['cov'] = mean.unsqueeze(0), covariance

        n_samples = X.shape[0]
        self.prior = self.responsibility.sum(dim=0) / n_samples

    def convergence(self, X):
        """Converged if || responsibility - last_responsibility || < tolerance."""
        if len(self.responsibilities) < 2:
            return False
        difference = torch.norm(self.responsibilities[-1].values - self.responsibilities[-2].values)
        return difference <= self.tolerance

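    # An alternative stopping rule (sketch, not used by predict()): track the
    # total data log-likelihood and stop when its improvement between
    # iterations falls below the tolerance; this is the textbook EM
    # convergence criterion. The 1e-30 floor is an assumption to keep log()
    # finite.
    def log_likelihood(self, X):
        weighted_likelihood = self.get_likelihood(X) * self.prior
        return torch.log(weighted_likelihood.sum(dim=1) + 1e-30).sum()
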
    def predict(self, X):
        """Run EM until convergence (or max_epochs) and return hard cluster assignments."""
        self.random_gaussian_initialization(X)

        for _ in range(self.max_epochs):
            self.expectation(X)
            self.maximization(X)

            if self.convergence(X):
                break

        self.expectation(X)
        return self.sample_assignments


if __name__ == '__main__':
    iris = load_iris()
    torch.manual_seed(0)
    X = torch.tensor(iris.data, dtype=torch.float)
    y = torch.tensor(iris.target)
    n_classes = len(torch.unique(y))
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    gmm = GMM(k=n_classes, max_epochs=2000)
    y_pred = gmm.predict(x_train)
    print(f'Accuracy Score: {accuracy_score(y_train, y_pred)}')
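    # Cluster indices from a GMM are arbitrary, so comparing them directly to
    # the true labels can understate performance. A common fix (sketch,
    # assuming each cluster is dominated by a single class): map every
    # cluster to the majority true label among its members before scoring.
    mapped_pred = torch.zeros_like(y_pred)
    for cluster in range(n_classes):
        mask = y_pred == cluster
        if mask.any():
            mapped_pred[mask] = torch.mode(y_train[mask]).values
    print(f'Accuracy Score (majority-mapped): {accuracy_score(y_train, mapped_pred)}')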