forked from cafeelmore/Pretty-Pytorch-Text-Classification
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
130 lines (115 loc) · 4.53 KB
/
utils.py
File metadata and controls
130 lines (115 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# coding: UTF-8
import os
import pandas as pd
import torch
import numpy as np
import random
import pickle as pkl
from tqdm import tqdm
import time
from datetime import timedelta
from twitter_preprocessor import TwitterPreprocessor
from text_cleaner import TextCleaner
CLS, SEP = '[CLS]','[SEP]'
def set_random_state(seed):
    """Seed every RNG used by this project so runs are reproducible.

    Args:
        seed (int): seed applied to Python's `random`, NumPy, and
            PyTorch (CPU and all CUDA devices).
    """
    # BUGFIX: the stdlib `random` module was never seeded, yet
    # build_dataset() shuffles the training set with random.shuffle —
    # without this line the shuffle order differed between runs.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # no-op on CPU-only installs
    torch.backends.cudnn.deterministic = True  # Ensure the results can be replicated
def build_dataset(config):
    """Build the train, dev and test sets from the CSV paths in *config*.

    Each returned set is a list of tuples
    ``(token_ids, label, seq_len, mask)`` where ``token_ids`` and ``mask``
    both have length ``config.pad_size`` and ``seq_len`` is the token count
    of the cleaned text before padding/special tokens.

    Returns:
        (train, dev, test): three lists; the train split is shuffled.
    """
    def load_dataset(path):
        contents = []
        label_dict = config.label_dict
        content_key = config.content_key
        label_key = config.label_key
        data = pd.read_csv(path)  # read data
        df = pd.DataFrame(data)  # build DataFrame
        df[content_key] = df[content_key].apply(str)
        pad_size = config.pad_size  # get padding size
        tokenizer = config.tokenizer  # get the tokenizer
        # NOTE: the original code instantiated TextCleaner() here but never
        # used it — the unused object has been removed.
        for idx in tqdm(range(len(df))):
            content = df[content_key].iloc[idx]
            label = df[label_key].iloc[idx]
            label = label_dict[label]
            p = TwitterPreprocessor(content)
            p.fully_preprocess()  # clean the textual content of current instances
            clean_content = p.text
            if len(clean_content) == 0:  # skip the instances with no words after preprocessing
                continue
            token = tokenizer.tokenize(clean_content)
            seq_len = len(token)  # length of sequence before padding
            token_ids = tokenizer.encode(
                token,
                add_special_tokens=True,  # add [CLS] and [SEP] special tokens
                max_length=pad_size,
                padding='max_length',
                truncation=True,
            )
            # BUGFIX: token_ids is encoded WITH special tokens, so the
            # attention mask must cover seq_len + 2 positions, not seq_len.
            # The old mask ([1]*seq_len + ...) zeroed out the last two real
            # tokens of every non-truncated sequence.
            n_special = 2  # [CLS] + [SEP]; assumes a BERT-style tokenizer — TODO confirm
            n_real = min(seq_len + n_special, pad_size)
            mask = [1] * n_real + [0] * (pad_size - n_real)  # generate mask sequence
            contents.append((token_ids, label, seq_len, mask))
        return contents
    train = load_dataset(config.train_path)
    random.shuffle(train)
    dev = load_dataset(config.dev_path)
    test = load_dataset(config.test_path)
    return train, dev, test
class DatasetIterator(object):
    '''Dataset Iterator to generate mini-batches for model training.

    Each element of *batches* is a tuple ``(token_ids, label, seq_len, mask)``
    as produced by ``build_dataset``; batches are yielded as
    ``((x, seq_len, mask), y)`` LongTensors on *device*.

    Params:
        batches: input dataset (list of instance tuples)
        batch_size: size of mini-batches
        device: computing device the tensors are moved to
    '''
    def __init__(self, batches, batch_size, device):
        self.batch_size = batch_size
        self.batches = batches
        self.n_batches = len(batches) // batch_size
        # BUGFIX: the leftover check must use batch_size, not n_batches.
        # The old `len(batches) % self.n_batches` silently DROPPED the final
        # partial batch whenever n_batches divided len(batches) (e.g. 6
        # items with batch_size 4) and raised ZeroDivisionError when the
        # dataset was smaller than one batch (n_batches == 0).
        self.residue = len(batches) % batch_size != 0  # True if a partial final batch exists
        self.index = 0  # index of the next batch to serve
        self.device = device

    def _to_tensor(self, data):
        '''Convert a list of instance tuples to device tensors.'''
        x = torch.LongTensor([_[0] for _ in data]).to(self.device)
        y = torch.LongTensor([_[1] for _ in data]).to(self.device)
        seq_len = torch.LongTensor([_[2] for _ in data]).to(self.device)
        mask = torch.LongTensor([_[3] for _ in data]).to(self.device)
        return (x, seq_len, mask), y

    def __next__(self):
        '''Get the next batch; the final partial batch is served last.'''
        if self.residue and self.index == self.n_batches:
            # serve the leftover instances that don't fill a whole batch
            batches = self.batches[self.index * self.batch_size: len(self.batches)]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches
        elif self.index >= self.n_batches:
            self.index = 0  # reset so the iterator can be re-used next epoch
            raise StopIteration
        else:
            batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

    def __iter__(self):
        return self

    def __len__(self):
        # the partial final batch counts as one extra batch
        if self.residue:
            return self.n_batches + 1
        else:
            return self.n_batches
def build_iterator(dataset, config):
    """Convenience wrapper: build a DatasetIterator from *config* settings."""
    return DatasetIterator(dataset, config.batch_size, config.device)
def get_time_dif(start_time):
    """Return the wall-clock time elapsed since *start_time* (a
    ``time.time()`` timestamp) as a timedelta rounded to whole seconds."""
    elapsed = time.time() - start_time
    return timedelta(seconds=int(round(elapsed)))