-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRNNTestMulti.py
134 lines (110 loc) · 5.37 KB
/
RNNTestMulti.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
"""
Réseau de neurones récurrent, modèle de langue par mot, pour paroles de chansons
Version avec couche vectorisation de mots et RNN
"""
import torch
torch.manual_seed(0) # Pour résultats reproductibles
from torch import nn
import pandas as pd
from collections import Counter
# Déterminer si un GPU est disponible
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print('Entrainement sur ',device)
class DatasetParoles(torch.utils.data.Dataset):
""" Créer un Dataset avec les paroles de la colonne Lyric du fichier
https://www.kaggle.com/neisse/scrapped-lyrics-from-6-genres?select=lyrics-data.csv
taille_sequence : taille d'une séquence de mots pour le modèle de langue
Le texte est découpé en séquences de la taille taille_sequence
"""
def __init__(self,taille_sequence=4):
self.taille_sequence = taille_sequence
self.mots = self.charger_mots()
self.mots_uniques = self.chercher_mots_uniques()
self.index_a_mot = {index: mot for index, mot in enumerate(self.mots_uniques)}
self.mot_a_index = {mot: index for index, mot in enumerate(self.mots_uniques)}
self.mots_indexes = [self.mot_a_index[w] for w in self.mots]
def charger_mots(self):
dataframe_entrainement = pd.read_csv('lyrics-data.csv')
texte_concatene = dataframe_entrainement.iloc[0:5]['Lyric'].str.cat(sep=' ')
return texte_concatene.split(' ')
def chercher_mots_uniques(self):
frequence_mot = Counter(self.mots)
return sorted(frequence_mot, key=frequence_mot.get, reverse=True)
def __len__(self):
return len(self.mots_indexes) - self.taille_sequence
def __getitem__(self, index):
return (
torch.tensor(self.mots_indexes[index:index+self.taille_sequence]),
torch.tensor(self.mots_indexes[index+1:index+self.taille_sequence+1]),
)
class Modele(nn.Module):
"""Modèle de RNR avec une couche vectorisation, suivie d'une couche RNN et d'une couche linéaire"""
def __init__(self, ds_paroles):
super(Modele, self).__init__()
self.taille_H_RNN = 128
self.taille_vectorisation_mots = 64
self.nombre_couches_RNR = 1
taille_vocabulaire = len(ds_paroles.mots_uniques)
self.vectorisation_mots = nn.Embedding(num_embeddings=taille_vocabulaire,
embedding_dim=self.taille_vectorisation_mots)
self.rnn = nn.RNN(input_size=self.taille_vectorisation_mots,hidden_size=self.taille_H_RNN,
num_layers=self.nombre_couches_RNR,batch_first=True)
self.fc = nn.Linear(self.taille_H_RNN, taille_vocabulaire)
def forward(self, lot_X, etat_0):
vectorisation = self.vectorisation_mots(lot_X)
lot_Ht, etat = self.rnn(vectorisation, etat_0)
lot_Yt = self.fc(lot_Ht)
return lot_Yt, etat
def initializer_etat(self, taille_sequence):
return (torch.zeros(self.nombre_couches_RNR, taille_sequence, self.taille_H_RNN))
ds_paroles = DatasetParoles(taille_sequence=6)
modele = Modele(ds_paroles)
# Placer le modèle en mode GPU si possible
modele = modele.to(device)
import torch
import numpy as np
from torch import nn, optim
from torch.utils.data import DataLoader
import time
def entrainer_RNR(ds_paroles, modele, taille_lot=32, epochs=10, taille_sequence=6):
debut = time.time()
modele.train()
dl_paroles = DataLoader(ds_paroles,batch_size=taille_lot)
fonction_cout = nn.CrossEntropyLoss()
optimizeur = optim.Adam(modele.parameters(), lr=0.001)
for epoch in range(epochs):
for lot, (lot_X, lot_Y) in enumerate(dl_paroles):
lot_X = lot_X.to(device)
lot_Y = lot_Y.to(device)
etat = modele.initializer_etat(lot_X.shape[0])
etat = etat.to(device)
optimizeur.zero_grad()
lot_Y_predictions, etat = modele(lot_X, etat)
cout = fonction_cout(lot_Y_predictions.transpose(1, 2), lot_Y)
cout.backward()
optimizeur.step()
if lot%100 == 0:
print(f'-------- > epoch {epoch} lot {lot} : coût = {cout.item()}')
temps_ecoule = time.time() - debut
print('Temps écoulé : {:.0f}m {:.0f}s'.format(temps_ecoule // 60, temps_ecoule % 60))
def predire(ds, modele, debut_texte, nb_mots=20):
""" Prédire une suite de nb_mots à partir de debut_texte selon le modele"""
mots = debut_texte.split(' ')
modele.eval()
etat = modele.initializer_etat(1)
etat = etat.to(device)
for i in range(0, nb_mots):
lot_X = torch.tensor([[ds.mot_a_index[m] for m in mots[i:]]])
lot_X = lot_X.to(device)
lot_Y_predictions, etat = modele(lot_X, etat)
dernier_mot_Yt = lot_Y_predictions[0][-1]
#probs_dernier_mot = torch.nn.functional.softmax(dernier_mot_Yt, dim=0).detach().numpy()
#index_mot_choisi = np.random.choice(len(dernier_mot_Yt), p=probs_dernier_mot)
probs_dernier_mot = torch.nn.functional.softmax(dernier_mot_Yt, dim=0).data
#index_mot_choisi = torch.max(probs_dernier_mot, dim=0)[1].item()
index_mot_choisi = torch.multinomial(probs_dernier_mot, 1)[0].item()
mots.append(ds.index_a_mot[index_mot_choisi])
return mots
entrainer_RNR(ds_paroles, modele, taille_lot=32, epochs=2, taille_sequence=6)
print(predire(ds_paroles, modele, debut_texte='There was'))