diff --git a/examples/basic_tutorials/cifar10_cnn_dist_new.py b/examples/basic_tutorials/cifar10_cnn_dist_new.py new file mode 100644 index 0000000..950c368 --- /dev/null +++ b/examples/basic_tutorials/cifar10_cnn_dist_new.py @@ -0,0 +1,158 @@ +#! /usr/bin/python +# -*- coding: utf-8 -*- + +import os +os.environ['TL_BACKEND'] = 'paddle' +# os.environ['TL_BACKEND'] = 'jittor' +# os.environ['TL_BACKEND'] = 'tensorflow' +# os.environ['TL_BACKEND'] = 'mindspore' +# os.environ['TL_BACKEND'] = 'torch' + +from tensorlayerx.dataflow import Dataset, DataLoader, DistributedBatchSampler +from tensorlayerx.vision.transforms import ( + Compose, Resize, RandomFlipHorizontal, RandomContrast, RandomBrightness, StandardizePerImage, RandomCrop +) +from tensorlayerx.nn import Module +import tensorlayerx as tlx +from tensorlayerx.nn import (Conv2d, Linear, Flatten, MaxPool2d, BatchNorm2d) +# enable debug logging +tlx.logging.set_verbosity(tlx.logging.DEBUG) + +# paddle.disable_static() +tlx.ops.set_device('gpu') +print(tlx.ops.get_device()) +tlx.ops.distributed_init() +print(tlx.is_distributed()) +# ################## Download and prepare the CIFAR10 dataset ################## +# This is just some way of getting the CIFAR10 dataset from an online location +# and loading it into numpy arrays with shape [32,32,3] +X_train, y_train, X_test, y_test = tlx.files.load_cifar10_dataset(shape=(-1, 32, 32, 3), plotable=False) + +# training settings +batch_size = 128 +n_epoch = 10 +learning_rate = 0.0001 +print_freq = 5 +n_step_epoch = int(len(y_train) / batch_size) +n_step = n_epoch * n_step_epoch +shuffle_buffer_size = 128 + +# ################## CIFAR10 dataset ################## +# We define a Dataset class for Loading CIFAR10 images and labels. +class make_dataset(Dataset): + + def __init__(self, data, label, transforms): + self.data = data + self.label = label + self.transforms = transforms + + def __getitem__(self, idx): + x = self.data[idx].astype('uint8') + y = self.label[idx].astype('int64') + x = self.transforms(x) + + return x, y + + def __len__(self): + + return len(self.label) + +# We define the CIFAR10 iamges preprocessing pipeline. +train_transforms = Compose( # Combining multiple operations sequentially + [ + RandomCrop(size=[24, 24]), #random crop from images to shape [24, 24] + RandomFlipHorizontal(), # random invert each image horizontally by probability + RandomBrightness(brightness_factor=(0.5, 1.5)), # Within the range of values (0.5, 1.5), adjust brightness randomly + RandomContrast(contrast_factor=(0.5, 1.5)), # Within the range of values (0.5, 1.5), adjust contrast randomly + StandardizePerImage() #Normalize the values of each image to [-1, 1] + ] +) + +test_transforms = Compose([Resize(size=(24, 24)), StandardizePerImage()]) + +# We use DataLoader to batch and shuffle data, and make data into iterators. 
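+# Note: the DistributedBatchSampler below gives each process its own shard of every batch,
+# so this example is meant to be started with a multi-process launcher, e.g. (assuming two GPUs):
+#   python -m paddle.distributed.launch --gpus "0,1" cifar10_cnn_dist_new.py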
+train_dataset = make_dataset(data=X_train, label=y_train, transforms=train_transforms) +test_dataset = make_dataset(data=X_test, label=y_test, transforms=test_transforms) + +train_sampler = DistributedBatchSampler(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True) +valid_sampler = DistributedBatchSampler(test_dataset, batch_size=batch_size, drop_last=True) + +train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, num_workers=2) +valid_loader = DataLoader(test_dataset, batch_sampler=valid_sampler, num_workers=2) + + +# ################## CNN network ################## +class CNN(Module): + + def __init__(self): + super(CNN, self).__init__() + # Parameter initialization method + W_init = tlx.nn.initializers.truncated_normal(stddev=5e-2) + W_init2 = tlx.nn.initializers.truncated_normal(stddev=0.04) + b_init2 = tlx.nn.initializers.constant(value=0.1) + + # 2D Convolutional Neural Network, Set padding method "SAME", convolutional kernel size [5,5], stride [1,1], in channels, out channels + self.conv1 = Conv2d(64, (5, 5), (1, 1), padding='SAME', W_init=W_init, b_init=None, name='conv1', in_channels=3) + # Add 2D BatchNormalize, using ReLU for output. + self.bn = BatchNorm2d(num_features=64, act=tlx.ReLU) + # Add 2D Max pooling layer. + self.maxpool1 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool1') + + self.conv2 = Conv2d( + 64, (5, 5), (1, 1), padding='SAME', act=tlx.ReLU, W_init=W_init, name='conv2', in_channels=64 + ) + self.maxpool2 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool2') + # Flatten 2D data to 1D data + self.flatten = Flatten(name='flatten') + # Linear layer with 384 units, using ReLU for output. + self.linear1 = Linear(384, act=tlx.ReLU, W_init=W_init2, b_init=b_init2, name='linear1relu', in_features=2304) + self.linear2 = Linear(192, act=tlx.ReLU, W_init=W_init2, b_init=b_init2, name='linear2relu', in_features=384) + self.linear3 = Linear(10, act=None, W_init=W_init2, name='output', in_features=192) + + # We define the forward computation process. + def forward(self, x): + z = self.conv1(x) + z = self.bn(z) + z = self.maxpool1(z) + z = self.conv2(z) + z = self.maxpool2(z) + z = self.flatten(z) + z = self.linear1(z) + z = self.linear2(z) + z = self.linear3(z) + return z + + + +# get the network +net = CNN() + +# Define the loss function, use the softmax cross entropy loss. +loss_fn = tlx.losses.softmax_cross_entropy_with_logits +# Define the optimizer, use the Adam optimizer. +optimizer = tlx.optimizers.Adam(learning_rate) +metrics = tlx.metrics.Accuracy() + +# Wrap the network with distributed_model +dp_layer = tlx.ops.distributed_model(net) + +print("Model has been converted to distributed mode") + +# Build a trainable model with the high-level API +net_with_train = tlx.model.Model(network=dp_layer, loss_fn=loss_fn, optimizer=optimizer, metrics=metrics) + +# Run training +import time +t0 = time.time() + +net_with_train.train(n_epoch=n_epoch, train_dataset=train_loader, print_freq=print_freq, print_train_batch=False) + +t1 = time.time() +training_time = t1 - t0 +import datetime +def format_time(time): + elapsed_rounded = int(round((time))) + # Format as hh:mm:ss + return str(datetime.timedelta(seconds=elapsed_rounded)) +training_time = format_time(training_time) +print(training_time) diff --git a/examples/basic_tutorials/cifar10_cnn_dist_paddle_api.py b/examples/basic_tutorials/cifar10_cnn_dist_paddle_api.py new file mode 100644 index 0000000..dfda1b1 --- /dev/null +++ b/examples/basic_tutorials/cifar10_cnn_dist_paddle_api.py @@ -0,0 +1,182 @@ +#!
/usr/bin/python +# -*- coding: utf-8 -*- +import os +os.environ['TL_BACKEND'] = 'paddle' + +import numpy as np +import paddle +from paddle.distributed import fleet +from paddle.io import DataLoader, DistributedBatchSampler +from tensorlayerx.vision.transforms import ( + Compose, Resize, RandomFlipHorizontal, RandomContrast, RandomBrightness, StandardizePerImage, RandomCrop +) +from tensorlayerx.dataflow import Dataset +from tensorlayerx.nn import Module +import tensorlayerx as tlx +from tensorlayerx.nn import (Conv2d, Linear, Flatten, MaxPool2d, BatchNorm2d) +# enable debug logging +tlx.logging.set_verbosity(tlx.logging.DEBUG) + +# paddle.disable_static() +paddle.set_device('gpu') +print(paddle.get_device()) +fleet.init(is_collective=True) +# print(tlx.is_distributed()) +# prepare cifar10 data +X_train, y_train, X_test, y_test = tlx.files.load_cifar10_dataset(shape=(-1, 32, 32, 3), plotable=False) + +# training settings +batch_size = 128 +n_epoch = 10 +learning_rate = 0.0001 +print_freq = 5 +n_step_epoch = int(len(y_train) / batch_size) +n_step = n_epoch * n_step_epoch +shuffle_buffer_size = 128 + +class make_dataset(Dataset): + + def __init__(self, data, label, transforms): + self.data = data + self.label = label + self.transforms = transforms + + def __getitem__(self, idx): + x = self.data[idx].astype('uint8') + y = self.label[idx].astype('int64') + x = self.transforms(x) + + return x, y + + def __len__(self): + + return len(self.label) + + + +# Define the data preprocessing pipelines +train_transforms = Compose( + [ + RandomCrop(size=[24, 24]), + RandomFlipHorizontal(), + RandomBrightness(brightness_factor=(0.5, 1.5)), + RandomContrast(contrast_factor=(0.5, 1.5)), + StandardizePerImage() + ] +) + +test_transforms = Compose([Resize(size=(24, 24)), StandardizePerImage()]) + +# Build the datasets used for distributed training +train_dataset = make_dataset(data=X_train, label=y_train, transforms=train_transforms) +test_dataset = make_dataset(data=X_test, label=y_test, transforms=test_transforms) + +# Create the distributed batch samplers and data loaders +train_sampler = DistributedBatchSampler(train_dataset, batch_size, shuffle=True, drop_last=True) +train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, num_workers=2) + +valid_sampler = DistributedBatchSampler(test_dataset, batch_size, drop_last=True) +valid_loader = DataLoader(test_dataset, batch_sampler=valid_sampler, num_workers=2) + + +class CNN(Module): + + def __init__(self): + super(CNN, self).__init__() + # weights init + W_init = tlx.nn.initializers.truncated_normal(stddev=5e-2) + W_init2 = tlx.nn.initializers.truncated_normal(stddev=0.04) + b_init2 = tlx.nn.initializers.constant(value=0.1) + + self.conv1 = Conv2d(64, (5, 5), (1, 1), padding='SAME', W_init=W_init, b_init=None, name='conv1', in_channels=3) + self.bn = BatchNorm2d(num_features=64, act=tlx.ReLU) + self.maxpool1 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool1') + + self.conv2 = Conv2d( + 64, (5, 5), (1, 1), padding='SAME', act=tlx.ReLU, W_init=W_init, name='conv2', in_channels=64 + ) + self.maxpool2 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool2') + + self.flatten = Flatten(name='flatten') + self.linear1 = Linear(384, act=tlx.ReLU, W_init=W_init2, b_init=b_init2, name='linear1relu', in_features=2304) + self.linear2 = Linear(192, act=tlx.ReLU, W_init=W_init2, b_init=b_init2, name='linear2relu', in_features=384) + self.linear3 = Linear(10, act=None, W_init=W_init2, name='output', in_features=192) + + def forward(self, x): + z = self.conv1(x) + z = self.bn(z) + z = self.maxpool1(z) + z = self.conv2(z) + z = self.maxpool2(z) + z =
self.flatten(z) + z = self.linear1(z) + z = self.linear2(z) + z = self.linear3(z) + return z + + +# get the network +net = CNN() + +# Get the distributed model to support distributed training +dp_layer = fleet.distributed_model(net) + +# Define the loss function, optimizer and metrics +loss_fn = tlx.losses.softmax_cross_entropy_with_logits +optimizer = paddle.optimizer.Adam(learning_rate, parameters=dp_layer.parameters()) +optimizer = fleet.distributed_optimizer(optimizer) +metrics = tlx.metrics.Accuracy() + +print("Model has been converted to distributed mode") + +val_acc_history = [] +val_loss_history = [] + +# Run training +import time +t0 = time.time() + +for epoch in range(n_epoch): + dp_layer.train() + for batch_id, data in enumerate(train_loader()): + x_data = data[0] + y_data = paddle.to_tensor(data[1]) + y_data = paddle.unsqueeze(y_data, 1) + + logits = dp_layer(x_data) + loss = loss_fn(logits, y_data) + + if batch_id % 1000 == 0: + print("epoch: {}, batch_id: {}, loss is: {}".format(epoch, batch_id, loss.numpy())) + loss.backward() + optimizer.step() + optimizer.clear_grad() + + # dp_layer.eval() + # accuracies = [] + # losses = [] + # for batch_id, data in enumerate(valid_loader()): + # x_data = data[0] + # y_data = paddle.to_tensor(data[1]) + # y_data = paddle.unsqueeze(y_data, 1) + + # logits = dp_layer(x_data) + # loss = loss_fn(logits, y_data) + # acc = paddle.metric.accuracy(logits, y_data) + # accuracies.append(acc.numpy()) + # losses.append(loss.numpy()) + + # avg_acc, avg_loss = np.mean(accuracies), np.mean(losses) + # print("[validation] accuracy/loss: {}/{}".format(avg_acc, avg_loss)) + # val_acc_history.append(avg_acc) + # val_loss_history.append(avg_loss) + +t1 = time.time() +training_time = t1 - t0 +import datetime +def format_time(time): + elapsed_rounded = int(round((time))) + # Format as hh:mm:ss + return str(datetime.timedelta(seconds=elapsed_rounded)) +training_time = format_time(training_time) +print(training_time) diff --git a/examples/basic_tutorials/cifar10_cnn_dist_pytorch_api.py b/examples/basic_tutorials/cifar10_cnn_dist_pytorch_api.py new file mode 100644 index 0000000..7e1a5f7 --- /dev/null +++ b/examples/basic_tutorials/cifar10_cnn_dist_pytorch_api.py @@ -0,0 +1,217 @@ +#!
/usr/bin/python +# -*- coding: utf-8 -*- + +import os +# os.environ['TL_BACKEND'] = 'paddle' +# os.environ['TL_BACKEND'] = 'tensorflow' +# os.environ['TL_BACKEND'] = 'mindspore' +os.environ['TL_BACKEND'] = 'torch' + +import time +from tensorlayerx.vision.transforms import ( + Compose, Resize, RandomFlipHorizontal, RandomContrast, RandomBrightness, StandardizePerImage, RandomCrop +) +from tensorlayerx.nn import Module +import tensorlayerx as tlx +from tensorlayerx.nn import (Conv2d, Linear, Flatten, MaxPool2d, BatchNorm2d) +import torch +import torch.nn.functional as F +from torch.utils.data import Dataset, DataLoader +from torch.utils.data.distributed import DistributedSampler +from torch.nn.parallel import DistributedDataParallel as DDP +from torch.distributed import init_process_group, destroy_process_group + +# enable debug logging +tlx.logging.set_verbosity(tlx.logging.DEBUG) + + +local_rank = int(os.environ["LOCAL_RANK"]) +torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) +init_process_group(backend="nccl") +print(f"Process {local_rank} using GPU: {torch.cuda.get_device_name(local_rank)}") + +# ################## Download and prepare the CIFAR10 dataset ################## +# This is just some way of getting the CIFAR10 dataset from an online location +# and loading it into numpy arrays with shape [32,32,3] +X_train, y_train, X_test, y_test = tlx.files.load_cifar10_dataset(shape=(-1, 32, 32, 3), plotable=False) + +# training settings +n_epoch = 5 +learning_rate = 0.0001 +print_freq = 5 +n_step_epoch = int(len(y_train) / 128) +n_step = n_epoch * n_step_epoch +shuffle_buffer_size = 128 +batch_size = 128 + +# ################## CIFAR10 dataset ################## +# We define a Dataset class for Loading CIFAR10 images and labels. +class make_dataset(Dataset): + + def __init__(self, data, label, transforms): + self.data = data + self.label = label + self.transforms = transforms + + def __getitem__(self, idx): + x = self.data[idx].astype('uint8') + y = self.label[idx].astype('int64') + x = self.transforms(x) + + return x, y + + def __len__(self): + + return len(self.label) + +# We define the CIFAR10 iamges preprocessing pipeline. +train_transforms = Compose( # Combining multiple operations sequentially + [ + RandomCrop(size=[24, 24]), #random crop from images to shape [24, 24] + RandomFlipHorizontal(), # random invert each image horizontally by probability + RandomBrightness(brightness_factor=(0.5, 1.5)), # Within the range of values (0.5, 1.5), adjust brightness randomly + RandomContrast(contrast_factor=(0.5, 1.5)), # Within the range of values (0.5, 1.5), adjust contrast randomly + StandardizePerImage() #Normalize the values of each image to [-1, 1] + ] +) + +test_transforms = Compose([Resize(size=(24, 24)), StandardizePerImage()]) + +# We use DataLoader to batch and shuffle data, and make data into iterators. 
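+# Note: this script reads LOCAL_RANK from the environment and initializes the "nccl" process group,
+# so it is meant to be started with torchrun, e.g. (assuming a single node with two GPUs):
+#   torchrun --standalone --nproc_per_node=2 cifar10_cnn_dist_pytorch_api.py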
+train_dataset = make_dataset(data=X_train, label=y_train, transforms=train_transforms) +test_dataset = make_dataset(data=X_test, label=y_test, transforms=test_transforms) +train_dataset = DataLoader( + train_dataset, + batch_size=batch_size, + pin_memory=True, + shuffle=False, + sampler=DistributedSampler(train_dataset) + ) + +class Trainer: + def __init__( + self, + model: torch.nn.Module, + train_data: DataLoader, + optimizer: torch.optim.Optimizer, + # save_every: int, + # snapshot_path: str, + ) -> None: + self.gpu_id = int(os.environ["LOCAL_RANK"]) + self.model = model.to(self.gpu_id) + self.train_data = train_data + self.optimizer = optimizer + # self.save_every = save_every + self.epochs_run = 0 + # self.snapshot_path = snapshot_path + # if os.path.exists(snapshot_path): + # print("Loading snapshot") + # self._load_snapshot(snapshot_path) + + self.model = DDP(self.model, device_ids=[self.gpu_id]) + + def _load_snapshot(self, snapshot_path): + loc = f"cuda:{self.gpu_id}" + snapshot = torch.load(snapshot_path, map_location=loc) + self.model.load_state_dict(snapshot["MODEL_STATE"]) + self.epochs_run = snapshot["EPOCHS_RUN"] + print(f"Resuming training from snapshot at Epoch {self.epochs_run}") + + def _run_batch(self, source, targets): + self.optimizer.zero_grad() + output = self.model(source) + loss = F.cross_entropy(output, targets) + loss.backward() + self.optimizer.step() + + def _run_epoch(self, epoch): + b_sz = len(next(iter(self.train_data))[0]) + print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") + self.train_data.sampler.set_epoch(epoch) + for source, targets in self.train_data: + source = source.to(self.gpu_id) + targets = targets.to(self.gpu_id) + self._run_batch(source, targets) + + def _save_snapshot(self, epoch): + snapshot = { + "MODEL_STATE": self.model.module.state_dict(), + "EPOCHS_RUN": epoch, + } + torch.save(snapshot, self.snapshot_path) + print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}") + + def train(self, max_epochs: int): + for epoch in range(self.epochs_run, max_epochs): + self._run_epoch(epoch) + # if self.gpu_id == 0 and epoch % self.save_every == 0: + # self._save_snapshot(epoch) + + +# ################## CNN network ################## +class CNN(Module): + + def __init__(self): + super(CNN, self).__init__() + # Parameter initialization method + W_init = tlx.nn.initializers.truncated_normal(stddev=5e-2) + W_init2 = tlx.nn.initializers.truncated_normal(stddev=0.04) + b_init2 = tlx.nn.initializers.constant(value=0.1) + + # 2D Convolutional Neural Network, Set padding method "SAME", convolutional kernel size [5,5], stride [1,1], in channels, out channels + self.conv1 = Conv2d(64, (5, 5), (1, 1), padding='SAME', W_init=W_init, b_init=None, name='conv1', in_channels=3) + # Add 2D BatchNormalize, using ReLU for output. + self.bn = BatchNorm2d(num_features=64, act=tlx.nn.ReLU) + # Add 2D Max pooling layer. + self.maxpool1 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool1') + + self.conv2 = Conv2d( + 64, (5, 5), (1, 1), padding='SAME', act=tlx.nn.ReLU, W_init=W_init, name='conv2', in_channels=64 + ) + self.maxpool2 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool2') + # Flatten 2D data to 1D data + self.flatten = Flatten(name='flatten') + # Linear layer with 384 units, using ReLU for output. 
+ self.linear1 = Linear(384, act=tlx.nn.ReLU, W_init=W_init2, b_init=b_init2, name='linear1relu', in_features=2304) + self.linear2 = Linear(192, act=tlx.nn.ReLU, W_init=W_init2, b_init=b_init2, name='linear2relu', in_features=384) + self.linear3 = Linear(10, act=None, W_init=W_init2, name='output', in_features=192) + + # We define the forward computation process. + def forward(self, x): + z = self.conv1(x) + z = self.bn(z) + z = self.maxpool1(z) + z = self.conv2(z) + z = self.maxpool2(z) + z = self.flatten(z) + z = self.linear1(z) + z = self.linear2(z) + z = self.linear3(z) + return z + + +# get the network +net = CNN() + +# Get training parameters +train_weights = net.trainable_weights +# Define the optimizer, use the Adam optimizer. +optimizer = torch.optim.Adam(net.parameters(), lr=1e-3) +trainer = Trainer(net, train_dataset, optimizer) + +# Custom training loops +t0 = time.time() + +trainer.train(n_epoch) + +t1 = time.time() +training_time = t1 - t0 +import datetime +def format_time(time): + elapsed_rounded = int(round((time))) + # Format as hh:mm:ss + return str(datetime.timedelta(seconds=elapsed_rounded)) +training_time = format_time(training_time) +print(training_time) + +destroy_process_group() diff --git a/examples/basic_tutorials/cifar10_cnn_torch_dist.py b/examples/basic_tutorials/cifar10_cnn_torch_dist.py index 72ce083..76ca0c3 100644 --- a/examples/basic_tutorials/cifar10_cnn_torch_dist.py +++ b/examples/basic_tutorials/cifar10_cnn_torch_dist.py @@ -3,35 +3,39 @@ import os # os.environ['TL_BACKEND'] = 'paddle' +# os.environ['TL_BACKEND'] = 'jittor' # os.environ['TL_BACKEND'] = 'tensorflow' # os.environ['TL_BACKEND'] = 'mindspore' os.environ['TL_BACKEND'] = 'torch' -import time -from tensorlayerx.dataflow import Dataset, DataLoader +from tensorlayerx.dataflow import Dataset, DataLoader, DistributedBatchSampler from tensorlayerx.vision.transforms import ( Compose, Resize, RandomFlipHorizontal, RandomContrast, RandomBrightness, StandardizePerImage, RandomCrop ) -from tensorlayerx.model import TrainOneStep from tensorlayerx.nn import Module import tensorlayerx as tlx from tensorlayerx.nn import (Conv2d, Linear, Flatten, MaxPool2d, BatchNorm2d) -import argparse - -parser = argparse.ArgumentParser() -parser.add_argument("--local_rank", type=int, default=-1, - help="For distributed training: local_rank") -args = parser.parse_args() # enable debug logging tlx.logging.set_verbosity(tlx.logging.DEBUG) -tlx.ops.set_device(device = 'MLU', id = args.local_rank) -tlx.ops.distributed_init(backend="cncl") +tlx.ops.set_device('gpu') +print(tlx.ops.get_device()) +tlx.ops.distributed_init(backend="nccl") +print(tlx.is_distributed()) # ################## Download and prepare the CIFAR10 dataset ################## # This is just some way of getting the CIFAR10 dataset from an online location # and loading it into numpy arrays with shape [32,32,3] X_train, y_train, X_test, y_test = tlx.files.load_cifar10_dataset(shape=(-1, 32, 32, 3), plotable=False) +# training settings +batch_size = 128 +n_epoch = 5 +learning_rate = 0.0001 +print_freq = 5 +n_step_epoch = int(len(y_train) / batch_size) +n_step = n_epoch * n_step_epoch +shuffle_buffer_size = 128 + # ################## CIFAR10 dataset ################## # We define a Dataset class for loading CIFAR10 images and labels.
class make_dataset(Dataset): @@ -69,8 +73,12 @@ def __len__(self): train_dataset = make_dataset(data=X_train, label=y_train, transforms=train_transforms) test_dataset = make_dataset(data=X_test, label=y_test, transforms=test_transforms) -train_dataset = DataLoader(train_dataset, batch_size=128, shuffle=True) -test_dataset = DataLoader(test_dataset, batch_size=128) +train_sampler = DistributedBatchSampler(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True) +valid_sampler = DistributedBatchSampler(test_dataset, batch_size=batch_size, drop_last=True) + +train_loader = DataLoader(train_dataset, pin_memory=True, batch_sampler=train_sampler, num_workers=2) +valid_loader = DataLoader(test_dataset, pin_memory=True, batch_sampler=valid_sampler, num_workers=2) + # ################## CNN network ################## class CNN(Module): @@ -112,67 +120,38 @@ def forward(self, x): z = self.linear2(z) z = self.linear3(z) return z + # get the network net = CNN() -# training settings -n_epoch = 500 -learning_rate = 0.0001 -print_freq = 5 -n_step_epoch = int(len(y_train) / 128) -n_step = n_epoch * n_step_epoch -shuffle_buffer_size = 128 -# Get training parameters -train_weights = net.trainable_weights +# Define the loss function, use the softmax cross entropy loss. +loss_fn = tlx.losses.softmax_cross_entropy_with_logits # Define the optimizer, use the Adam optimizer. optimizer = tlx.optimizers.Adam(learning_rate) -# Define evaluation metrics. metrics = tlx.metrics.Accuracy() -# Define the loss calculation process -class WithLoss(Module): - - def __init__(self, net, loss_fn): - super(WithLoss, self).__init__() - self._net = net - self._loss_fn = loss_fn - - def forward(self, data, label): - out = self._net(data) - loss = self._loss_fn(out, label) - return loss - - -net_with_loss = WithLoss(net.mlu(), loss_fn=tlx.losses.softmax_cross_entropy_with_logits).mlu() -model = tlx.ops.distributed_model(net_with_loss, device_ids=[args.local_rank], - output_device=args.local_rank, - find_unused_parameters=True) -# Initialize one-step training -#net_with_train = TrainOneStep(net_with_loss, optimizer, train_weights) -net_with_train = TrainOneStep(model, optimizer, train_weights) - -# Custom training loops -for epoch in range(n_epoch): - start_time = time.time() - # Set the network to training state - net.set_train() - train_loss, train_acc, n_iter = 0, 0, 0 - # Get training data and labels - for X_batch, y_batch in train_dataset: - # Calculate the loss value, and automatically complete the gradient update - _loss_ce = net_with_train(X_batch.mlu(), y_batch.mlu()) - train_loss += _loss_ce - - n_iter += 1 - _logits = net(X_batch.mlu()) - # Calculate accuracy - metrics.update(_logits, y_batch.mlu()) - train_acc += metrics.result() - metrics.reset() - if (n_iter % 100 == 0): - print("Epoch {} of {} took {}".format(epoch + 1, n_epoch, time.time() - start_time)) - print("rank {} train loss: {}".format(args.local_rank,train_loss / n_iter)) - print("rank {} train acc: {}".format(args.local_rank,train_acc / n_iter)) +# Wrap the network with distributed_model +dp_layer = tlx.ops.distributed_model(net) + +print("Model has been converted to distributed mode") +# Build a trainable model with the high-level API +net_with_train = tlx.model.Model(network=dp_layer, loss_fn=loss_fn, optimizer=optimizer, metrics=metrics) + +# Run training +import time +t0 = time.time() + +net_with_train.train(n_epoch=n_epoch, train_dataset=train_loader, print_freq=print_freq, print_train_batch=False) + +t1 = time.time() +training_time = t1 - t0 +import datetime +def format_time(time): + elapsed_rounded =
int(round((time))) + # Format as hh:mm:ss + return str(datetime.timedelta(seconds=elapsed_rounded)) +training_time = format_time(training_time) +print(training_time) diff --git a/tensorlayerx/backend/__init__.py b/tensorlayerx/backend/__init__.py index a4057a1..b3e7916 100644 --- a/tensorlayerx/backend/__init__.py +++ b/tensorlayerx/backend/__init__.py @@ -4,6 +4,7 @@ # ops from .ops import BACKEND from .ops import BACKEND_VERSION +from .ops import is_distributed from .ops import padding_format from .ops import preprocess_1d_format from .ops import preprocess_2d_format diff --git a/tensorlayerx/backend/ops/__init__.py b/tensorlayerx/backend/ops/__init__.py index 125cb73..9ec756f 100644 --- a/tensorlayerx/backend/ops/__init__.py +++ b/tensorlayerx/backend/ops/__init__.py @@ -209,6 +209,7 @@ from .load_backend import set_device from .load_backend import distributed_init from .load_backend import distributed_model +# from .load_backend import distributed_optimizer from .load_backend import get_device from .load_backend import scatter_update from .load_backend import to_device @@ -235,7 +236,7 @@ # backend from .load_backend import BACKEND from .load_backend import BACKEND_VERSION - +from .load_backend import is_distributed from .load_backend import Reshape from .load_backend import ReduceSum from .load_backend import ReduceMax diff --git a/tensorlayerx/backend/ops/mindspore_backend.py b/tensorlayerx/backend/ops/mindspore_backend.py index 0dd307b..f5fb045 100644 --- a/tensorlayerx/backend/ops/mindspore_backend.py +++ b/tensorlayerx/backend/ops/mindspore_backend.py @@ -13,6 +13,7 @@ _calculate_fan_in_and_fan_out, _calculate_correct_fan ) from mindspore.common.tensor import Tensor +from mindspore.communication import init, get_group_size from mindspore.ops import operations as P from mindspore.ops import functional as F from mindspore.ops import composite as C @@ -59,6 +60,8 @@ complex64 = None complex128 = None +IS_DISTRIBUTED = False + # isinstance input output # TensorLike = Tensor_ @@ -1839,19 +1842,33 @@ def __init__(self, equation): def __call__(self, *args): return self.einsum(tuple(args)) -def set_device(device = 'GPU', id = 0): +def set_device(device = 'GPU', id = None): + device = device.upper() if device not in ['GPU', 'CPU', 'Ascend']: raise ValueError ("In mindspore, only support 'CPU', 'GPU' and 'Ascend'.") ms.context.set_context(device_target=device) - ms.context.set_context(device_id = id) - -def distributed_init(backend="cncl"): - raise NotImplementedError("Distributed for this backend is not supported") + if id is not None: + ms.context.set_context(device_id = id) + +def is_distributed(): + return IS_DISTRIBUTED + +def distributed_init(backend="nccl"): + init(backend) # GPU uses nccl, Ascend uses hccl + device_num = get_group_size() + context.set_auto_parallel_context( + parameter_broadcast=True, + parallel_mode=context.ParallelMode.DATA_PARALLEL, + gradients_mean=True, + device_num=device_num + ) + global IS_DISTRIBUTED + IS_DISTRIBUTED = True def distributed_model(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False): - raise NotImplementedError("Distributed for this backend is not supported") + return module def scatter_update(tensor, indices, updates): diff --git a/tensorlayerx/backend/ops/paddle_backend.py b/tensorlayerx/backend/ops/paddle_backend.py index ca96dd0..872f106 100644 --- a/tensorlayerx/backend/ops/paddle_backend.py +++
b/tensorlayerx/backend/ops/paddle_backend.py @@ -72,6 +72,8 @@ complex64 = paddle.complex64 complex128 = paddle.complex128 +IS_DISTRIBUTED = False + def _npairs(x, n): if isinstance(x, (paddle.Tensor, list, tuple)): return x @@ -1926,19 +1928,32 @@ def __call__(self, *args): return einsum(self.equation, *args) -def set_device(device = 'GPU', id = 0): +def set_device(device = 'GPU', id = None): device = device.lower() - if device == 'gpu': + if id is not None: device = device + ':' + str(id) paddle.device.set_device(device) def distributed_init(backend="cncl"): - raise NotImplementedError("Distributed for this backend is not supported") + from paddle.distributed import fleet + if get_device() == "cpu": + fleet.init(is_collective=False) + else: + fleet.init(is_collective=True) + global IS_DISTRIBUTED + IS_DISTRIBUTED = True + +def is_distributed(): + return IS_DISTRIBUTED def distributed_model(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False): - raise NotImplementedError("Distributed for this backend is not supported") + from paddle.distributed import fleet + return fleet.distributed_model(module) + +# def distributed_optimizer(optimizer): +# optimizer.enable_distribution() def scatter_update(tensor, indices, updates): diff --git a/tensorlayerx/backend/ops/torch_backend.py b/tensorlayerx/backend/ops/torch_backend.py index 191a078..3a6b18b 100644 --- a/tensorlayerx/backend/ops/torch_backend.py +++ b/tensorlayerx/backend/ops/torch_backend.py @@ -1751,20 +1751,31 @@ def __init__(self, equation): def __call__(self, *args): return torch.einsum(self.equation, *args) -def set_device(device = 'GPU', id = 0): +def set_device(device = 'GPU', id = None): + device = device.upper() + if id is None: + import os + id = int(os.environ["LOCAL_RANK"]) if device == 'GPU': - torch.set_default_tensor_type('torch.cuda.FloatTensor') + # torch.set_default_tensor_type('torch.cuda.FloatTensor') torch.cuda.set_device(id) if device == 'MLU': - torch.set_default_tensor_type('torch.mlu.FloatTensor') + # torch.set_default_tensor_type('torch.mlu.FloatTensor') torch.mlu.set_device(id) +def is_distributed(): + return torch.distributed.is_initialized() + def distributed_init(backend="cncl"): torch.distributed.init_process_group(backend=backend) def distributed_model(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False): + if device_ids is None: + import os + device_ids = [int(os.environ["LOCAL_RANK"])] + module = module.to(device_ids[0]) return torch.nn.parallel.DistributedDataParallel(module, device_ids=device_ids, output_device=output_device, dim=dim, broadcast_buffers=broadcast_buffers, @@ -1783,15 +1794,13 @@ def scatter_update(tensor, indices, updates): def get_device(): try: id = torch.cuda.current_device() - device = 'GPU:' + str(id) - except: - device = 'CPU' - - try: - id = torch.mlu.current_device() - device = 'MLU:' + str(id) + device = 'GPU:' + str(id) except: - device = 'CPU' + try: + id = torch.mlu.current_device() + device = 'MLU:' + str(id) + except: + device = 'CPU' return device def to_device(tensor, device='MLU', id=0): diff --git a/tensorlayerx/dataflow/__init__.py b/tensorlayerx/dataflow/__init__.py index 2279ed7..a2abe4a 100644 --- a/tensorlayerx/dataflow/__init__.py +++ b/tensorlayerx/dataflow/__init__.py @@ -1,6 
+1,37 @@ #! /usr/bin/python # -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function -from .dataloader import * +from tensorlayerx.backend import BACKEND + +class DataLoader: + def __new__( + cls, + dataset, + batch_size=1, + shuffle=False, + drop_last=False, + sampler=None, + batch_sampler=None, + num_workers=0, + collate_fn=None, + time_out=0, + worker_init_fn=None, + prefetch_factor=2, + persistent_workers=False, + pin_memory=False, + ): + if BACKEND == 'paddle': + from paddle.io import DataLoader + return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last, batch_sampler=batch_sampler, num_workers=num_workers, collate_fn=collate_fn, timeout=time_out, worker_init_fn=worker_init_fn, prefetch_factor=prefetch_factor, persistent_workers=persistent_workers) + elif BACKEND == 'torch': + from torch.utils.data import DataLoader + return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last, sampler=sampler, batch_sampler=batch_sampler, num_workers=num_workers, collate_fn=collate_fn, pin_memory=pin_memory) + elif BACKEND == 'mindspore': + from tensorlayerx import is_distributed + if is_distributed(): + from .dataloader import MS_DataLoader + return MS_DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last, batch_sampler=batch_sampler, num_workers=num_workers) + from .dataloader import DataLoader + return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last, sampler=sampler, batch_sampler=batch_sampler, num_workers=num_workers, collate_fn=collate_fn, persistent_workers=persistent_workers) from .sampler import * from .dataset import * \ No newline at end of file diff --git a/tensorlayerx/dataflow/dataloader.py b/tensorlayerx/dataflow/dataloader.py index a4f0ec8..d09bfa4 100644 --- a/tensorlayerx/dataflow/dataloader.py +++ b/tensorlayerx/dataflow/dataloader.py @@ -170,3 +170,46 @@ def __len__(self): return length else: return len(self._index_sampler) + + +from tensorlayerx.backend import BACKEND +if BACKEND == 'mindspore': + __all__.append('MS_DataLoader') + +def MS_DataLoader(dataset, + batch_size=1, + shuffle=False, + drop_last=False, + batch_sampler=None, + num_workers=0,): + if batch_sampler is not None: + assert batch_size == 1 and not shuffle and not drop_last, ( + "batch_size/shuffle/drop_last should not be set when " + "batch_sampler is given" + ) + batch_size = batch_sampler.batch_size + drop_last = batch_sampler.drop_last + shuffle = batch_sampler.shuffle + else: + assert batch_size > 0, ( + "batch_size should be None or a positive value when " + "batch_sampler is not given" + ) + if num_workers == 0: + num_workers = 1 + + from mindspore.communication import get_rank, get_group_size + import mindspore as ms + ms_dataset = ms.dataset.GeneratorDataset( + source=list(dataset), + column_names=["image", "label"], + shuffle=shuffle, + num_parallel_workers=num_workers, + num_shards=get_group_size(), + shard_id=get_rank() + ) + ms_dataset = ms_dataset.batch( + batch_size, + drop_remainder=True + ) + return ms_dataset diff --git a/tensorlayerx/dataflow/sampler.py b/tensorlayerx/dataflow/sampler.py index d24ba66..d128fc6 100644 --- a/tensorlayerx/dataflow/sampler.py +++ b/tensorlayerx/dataflow/sampler.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import numpy as np +import math __all__ = [ 'Sampler', @@ -10,6 +11,7 @@ 'SequentialSampler', 'WeightedRandomSampler', 'SubsetRandomSampler', + 'DistributedBatchSampler', ] @@ -286,3 +288,209 
@@ def __iter__(self): def __len__(self): return len(self.indices) + +class DistributedBatchSampler(BatchSampler): + """Sampler that restricts data loading to a subset of the dataset. + + In distributed training, each process can pass a DistributedBatchSampler instance + as a DataLoader batch sampler, and load a subset of the original dataset that + is exclusive to it. + + .. note:: + Dataset is assumed to be of constant size. + + Parameters + ------------- + data : Dataset + dataset to sample + batch_size : int + the number of sample indices in a mini-batch. + num_replicas : int, optional + the number of processes in distributed training. + If :attr:`num_replicas` is None, :attr:`num_replicas` will be + retrieved from :code:`paddle.distributed.ParallelEnv`. + Default None. + rank : int, optional + the rank of the current process among :attr:`num_replicas` + processes. If :attr:`rank` is None, :attr:`rank` is retrieved from + :code:`paddle.distributed.ParallelEnv`. Default None. + shuffle : bool + whether to shuffle the indices before generating + batch indices. Default False. + drop_last : bool + whether to drop the last incomplete batch when the dataset size + is not divisible by the batch size. Default False + + Examples: + .. code-block:: python + + import numpy as np + + from tensorlayerx.dataflow import Dataset, DistributedBatchSampler + + # init with dataset + class RandomDataset(Dataset): + def __init__(self, num_samples): + self.num_samples = num_samples + + def __getitem__(self, idx): + image = np.random.random([784]).astype('float32') + label = np.random.randint(0, 9, (1, )).astype('int64') + return image, label + + def __len__(self): + return self.num_samples + + dataset = RandomDataset(100) + sampler = DistributedBatchSampler(dataset, batch_size=64) + + for data in sampler: + # do something + break + """ + + def __init__(self, + data, + batch_size, + num_replicas=None, + rank=None, + shuffle=False, + drop_last=False): + self.data = data + + assert isinstance(batch_size, int) and batch_size > 0, \ + "batch_size should be a positive integer" + self.batch_size = batch_size + assert isinstance(shuffle, bool), \ + "shuffle should be a boolean value" + self.shuffle = shuffle + assert isinstance(drop_last, bool), \ + "drop_last should be a boolean value" + + self.drop_last = drop_last + self.epoch = 0 + + from ..backend import BACKEND + if BACKEND == 'mindspore': + return + elif BACKEND == 'torch': + from torch import distributed as dist + if num_replicas is None: + if not dist.is_available(): + raise RuntimeError("Requires distributed package to be available") + num_replicas = dist.get_world_size() + if rank is None: + if not dist.is_available(): + raise RuntimeError("Requires distributed package to be available") + rank = dist.get_rank() + if rank >= num_replicas or rank < 0: + raise ValueError( + "Invalid rank {}, rank should be in the interval" + " [0, {}]".format(rank, num_replicas - 1)) + self.nranks = num_replicas + self.local_rank = rank + elif BACKEND == 'paddle': + from paddle.fluid.dygraph.parallel import ParallelEnv + + if num_replicas is not None: + assert isinstance(num_replicas, int) and num_replicas > 0, \ + "num_replicas should be a positive integer" + self.nranks = num_replicas + else: + self.nranks = ParallelEnv().nranks + + if rank is not None: + assert isinstance(rank, int) and rank >= 0, \ + "rank should be a non-negative integer" + self.local_rank = rank + else: + self.local_rank = ParallelEnv().local_rank + + self.num_samples = int(math.ceil(len(self.data) * 1.0 / self.nranks)) + self.total_size
= self.num_samples * self.nranks + + def __iter__(self): + num_samples = len(self.data) + indices = np.arange(num_samples).tolist() + indices += indices[:(self.total_size - len(indices))] + assert len(indices) == self.total_size + if self.shuffle: + np.random.RandomState(self.epoch).shuffle(indices) + self.epoch += 1 + + # subsample + def _get_indices_by_batch_size(indices): + subsampled_indices = [] + last_batch_size = self.total_size % (self.batch_size * self.nranks) + assert last_batch_size % self.nranks == 0 + last_local_batch_size = last_batch_size // self.nranks + + for i in range(self.local_rank * self.batch_size, + len(indices) - last_batch_size, + self.batch_size * self.nranks): + subsampled_indices.extend(indices[i:i + self.batch_size]) + + indices = indices[len(indices) - last_batch_size:] + subsampled_indices.extend(indices[ + self.local_rank * last_local_batch_size:( + self.local_rank + 1) * last_local_batch_size]) + return subsampled_indices + + if self.nranks > 1: + indices = _get_indices_by_batch_size(indices) + + assert len(indices) == self.num_samples + _sample_iter = iter(indices) + + batch_indices = [] + for idx in _sample_iter: + batch_indices.append(idx) + if len(batch_indices) == self.batch_size: + yield batch_indices + batch_indices = [] + if not self.drop_last and len(batch_indices) > 0: + yield batch_indices + + def __len__(self): + num_samples = self.num_samples + num_samples += int(not self.drop_last) * (self.batch_size - 1) + return num_samples // self.batch_size + + def set_epoch(self, epoch): + """ + Sets the epoch number. When :attr:`shuffle=True`, this number is used + as the seed for the random ordering. By default, users may not set this; all + replicas (workers) use a different random ordering for each epoch. + If the same number is set for each epoch, this sampler will yield the same + ordering for all epochs. + + Arguments: + epoch (int): Epoch number. + + Examples: + ..
code-block:: python + + import numpy as np + + from tensorlayerx.dataflow import Dataset, DistributedBatchSampler + + # init with dataset + class RandomDataset(Dataset): + def __init__(self, num_samples): + self.num_samples = num_samples + + def __getitem__(self, idx): + image = np.random.random([784]).astype('float32') + label = np.random.randint(0, 9, (1, )).astype('int64') + return image, label + + def __len__(self): + return self.num_samples + + dataset = RandomDataset(100) + sampler = DistributedBatchSampler(dataset, batch_size=64) + + for epoch in range(10): + sampler.set_epoch(epoch) + """ + self.epoch = epoch diff --git a/tensorlayerx/model/core.py b/tensorlayerx/model/core.py index 02ef47f..92181e6 100644 --- a/tensorlayerx/model/core.py +++ b/tensorlayerx/model/core.py @@ -89,8 +89,15 @@ def __init__(self, network, loss_fn=None, optimizer=None, metrics=None, **kwargs self.loss_fn = loss_fn self.optimizer = optimizer self.metrics = metrics - self.all_weights = network.all_weights - self.train_weights = self.network.trainable_weights + if tlx.BACKEND == 'paddle' and tlx.is_distributed(): + self.all_weights = network.parameters() + self.train_weights = network.parameters() + elif tlx.BACKEND == 'torch' and tlx.is_distributed(): + self.all_weights = network.parameters() + self.train_weights = network.parameters() + else: + self.all_weights = network.all_weights + self.train_weights = self.network.trainable_weights def train(self, n_epoch, train_dataset=None, test_dataset=False, print_train_batch=False, print_freq=5): if not isinstance(train_dataset, Iterable): @@ -109,18 +116,33 @@ def train(self, n_epoch, train_dataset=None, test_dataset=False, print_train_bat print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset ) elif tlx.BACKEND == 'paddle': - self.pd_train( - n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, - train_weights=self.train_weights, optimizer=self.optimizer, metrics=self.metrics, - print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset - ) + if tlx.is_distributed(): + print("Distributed training is enabled.") + self.pd_dist_train( + n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, + train_weights=self.train_weights, optimizer=self.optimizer, metrics=self.metrics, + print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset + ) + else: + self.pd_train( + n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, + train_weights=self.train_weights, optimizer=self.optimizer, metrics=self.metrics, + print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset + ) elif tlx.BACKEND == 'torch': - self.th_train( - n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, - train_weights=self.train_weights, optimizer=self.optimizer, metrics=self.metrics, - print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset - ) - + if tlx.is_distributed(): + print("Distributed training is enabled.") + self.th_dist_train( + n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, + train_weights=self.train_weights, optimizer=self.optimizer, metrics=self.metrics, + print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset + ) + else: + self.th_train( + n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, + 
train_weights=self.train_weights, optimizer=self.optimizer, metrics=self.metrics, + print_train_batch=print_train_batch, print_freq=print_freq, test_dataset=test_dataset + ) elif tlx.BACKEND == "oneflow": self.of_train( n_epoch=n_epoch, train_dataset=train_dataset, network=self.network, loss_fn=self.loss_fn, @@ -368,7 +390,7 @@ def ms_train( TimeRemainingColumn(), TimeElapsedColumn()) as progress: - n_batch = len(train_dataset) + n_batch = len(list(train_dataset)) epoch_tqdm = progress.add_task(description="[red]Epoch progress 0/{}".format(n_epoch), total=n_epoch) batch_tqdm = progress.add_task(description="[green]Batch progress 0/{}".format(n_batch), total=n_batch) @@ -496,6 +518,88 @@ def pd_train( progress.advance(epoch_tqdm, advance=1) progress.reset(batch_tqdm) + def pd_dist_train( + self, n_epoch, train_dataset, network, loss_fn, train_weights, optimizer, metrics, print_train_batch, + print_freq, test_dataset + ): + with Progress(TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeRemainingColumn(), + TimeElapsedColumn()) as progress: + + n_batch = len(train_dataset) + epoch_tqdm = progress.add_task(description="[red]Epoch progress 0/{}".format(n_epoch), total=n_epoch) + batch_tqdm = progress.add_task(description="[green]Batch progress 0/{}".format(n_batch), total=n_batch) + + for epoch in range(n_epoch): + start_time = time.time() + train_loss, train_acc, n_iter = 0, 0, 0 + network.train() + for batch, (X_batch, y_batch) in enumerate(train_dataset): + output = network(X_batch) + loss = loss_fn(output, y_batch) + loss_ce = loss.numpy() + # loss.backward() + # optimizer.step() + # optimizer.clear_grad() + + grads = optimizer.gradient(loss, train_weights) + optimizer.apply_gradients(zip(grads, train_weights)) + + train_loss += loss_ce + if metrics: + metrics.update(output, y_batch) + train_acc += metrics.result() + metrics.reset() + else: + train_acc += pd.metric.accuracy(output, y_batch) + n_iter += 1 + + if print_train_batch: + print("Epoch {} of {} took {}".format(epoch + 1, n_epoch, time.time() - start_time)) + print(" train loss: {}".format(train_loss / n_iter)) + print(" train acc: {}".format(train_acc / n_iter)) + progress.advance(batch_tqdm, advance=1) + progress.update(batch_tqdm, description="[green]Batch progress {}/{}".format(batch + 1, n_batch)) + + if epoch + 1 == 1 or (epoch + 1) % print_freq == 0: + + print("Epoch {} of {} took {}".format(epoch + 1, n_epoch, time.time() - start_time)) + print(" train loss: {}".format(train_loss / n_iter)) + print(" train acc: {}".format(train_acc / n_iter)) + + if test_dataset: + # use training and evaluation sets to evaluate the model every print_freq epoch + val_acc_history = [] + val_loss_history = [] + network.eval() + accuracies = [] + losses = [] + val_loss, val_acc, n_iter = 0, 0, 0 + for batch_id, data in enumerate(test_dataset): + x_data = data[0] + y_data = data[1] + # y_data = paddle.unsqueeze(y_data, 1) + + logits = network(x_data) + loss = loss_fn(logits, y_data) + if metrics: + metrics.update(logits, y_data) + val_acc += metrics.result() + metrics.reset() + else: + val_acc += np.mean(np.equal(np.argmax(logits, 1), y_data)) + accuracies.append(val_acc.numpy()) + losses.append(loss.numpy()) + + avg_acc, avg_loss = np.mean(accuracies), np.mean(losses) + print("[validation] accuracy/loss: {}/{}".format(avg_acc, avg_loss)) + val_acc_history.append(avg_acc) + val_loss_history.append(avg_loss) + progress.update(epoch_tqdm, 
description="[red]Epoch progress {}/{}".format(epoch + 1, n_epoch)) + progress.advance(epoch_tqdm, advance=1) + progress.reset(batch_tqdm) def th_train( self, n_epoch, train_dataset, network, loss_fn, train_weights, optimizer, metrics, print_train_batch, @@ -567,6 +671,83 @@ def th_train( progress.advance(epoch_tqdm, advance=1) progress.reset(batch_tqdm) + def th_dist_train( + self, n_epoch, train_dataset, network, loss_fn, train_weights, optimizer, metrics, print_train_batch, + print_freq, test_dataset + ): + # device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') + # network = network.to(device) + with Progress(TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeRemainingColumn(), + TimeElapsedColumn()) as progress: + + n_batch = len(train_dataset) + epoch_tqdm = progress.add_task(description="[red]Epoch progress 0/{}".format(n_epoch), total=n_epoch) + batch_tqdm = progress.add_task(description="[green]Batch progress 0/{}".format(n_batch), total=n_batch) + import os + gpu_id = int(os.environ["LOCAL_RANK"]) + + for epoch in range(n_epoch): + start_time = time.time() + train_loss, train_acc, n_iter = 0, 0, 0 + # print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") + # train_dataset.sampler.set_epoch(epoch) + for batch, (X_batch, y_batch) in enumerate(train_dataset): + # for source, targets in train_dataset: + X_batch = X_batch.to(gpu_id) + y_batch = y_batch.to(gpu_id) + # optimizer.zero_grad() + output = network(X_batch) + loss = loss_fn(output, y_batch) + # loss.backward() + # self.optimizer.step() + grads = optimizer.gradient(loss, train_weights) + optimizer.apply_gradients(zip(grads, train_weights)) + + train_loss += loss + if metrics: + metrics.update(output, y_batch) + train_acc += metrics.result() + metrics.reset() + else: + train_acc += (output.argmax(1) == y_batch).type(torch.float).mean().item() + n_iter += 1 + + if print_train_batch: + print("Epoch {} of {} took {}".format(epoch + 1, n_epoch, time.time() - start_time)) + print(" train loss: {}".format(train_loss / n_iter)) + print(" train acc: {}".format(train_acc / n_iter)) + progress.advance(batch_tqdm, advance=1) + progress.update(batch_tqdm, description="[green]Batch progress {}/{}".format(batch + 1, n_batch)) + + if epoch + 1 == 1 or (epoch + 1) % print_freq == 0: + + print("Epoch {} of {} took {}".format(epoch + 1, n_epoch, time.time() - start_time)) + print(" train loss: {}".format(train_loss / n_iter)) + print(" train acc: {}".format(train_acc / n_iter)) + + # if test_dataset: + # # use training and evaluation sets to evaluate the model every print_freq epoch + # if epoch + 1 == 1 or (epoch + 1) % print_freq == 0: + # network.set_eval() + # val_loss, val_acc, n_iter = 0, 0, 0 + # for X_batch, y_batch in test_dataset: + # _logits = network(X_batch) # is_train=False, disable dropout + # val_loss += loss_fn(_logits, y_batch) + # if metrics: + # metrics.update(_logits, y_batch) + # val_acc += metrics.result() + # metrics.reset() + # else: + # val_acc += (_logits.argmax(1) == y_batch).type(torch.float).mean().item() + # n_iter += 1 + # print(" val loss: {}".format(val_loss / n_iter)) + # print(" val acc: {}".format(val_acc / n_iter)) + progress.update(epoch_tqdm, description="[red]Epoch progress {}/{}".format(epoch + 1, n_epoch)) + progress.advance(epoch_tqdm, advance=1) + progress.reset(batch_tqdm) def of_train( diff --git a/tensorlayerx/optimizers/paddle_optimizers.py 
b/tensorlayerx/optimizers/paddle_optimizers.py
index 95c31ff..24ad127 100644
--- a/tensorlayerx/optimizers/paddle_optimizers.py
+++ b/tensorlayerx/optimizers/paddle_optimizers.py
@@ -2,30 +2,18 @@
 # -*- coding: utf-8 -*-

 from __future__ import absolute_import, division, print_function
+from abc import ABC, abstractmethod

 import paddle
 from paddle.optimizer import Optimizer
 import warnings
+from .. import is_distributed

 __all__ = ['Adadelta', 'Adagrad', 'Adam', 'Adamax', 'Ftrl', 'Nadam', 'RMSprop', 'SGD', 'Momentum', 'Lamb', 'LARS']

-
-class Adadelta(Optimizer):
-
-    def __init__(self, lr=0.001, eps=1.0e-6, rho=0.95, weight_decay=0.0, grad_clip=None):
-        if lr is None:
-            raise ValueError('learn_rate is not set.')
-        if eps is None:
-            raise ValueError('eps is not set.')
-        if rho is None:
-            raise ValueError('rho is not set')
-        self.lr = lr
-        self.eps = eps
-        self.rho = rho
-        self.grad_succeed = True
-        self.init_optim = False
-        self.weight_decay = weight_decay
-        self.grad_clip = grad_clip
-        self._grad_clip = grad_clip
+class TLX_Optimizer(Optimizer, ABC):
+    def __init__(self):
+        self._optimizer = None
+        self._distribution = is_distributed()

     def gradient(self, loss, weights):
         if loss is None:
@@ -33,30 +21,61 @@ def gradient(self, loss, weights):
         if weights is None:
             raise ValueError('weights is not set.')
         if not self.init_optim:
-            self.adadelta = paddle.optimizer.Adadelta(
-                learning_rate=self.lr, epsilon=self.eps, rho=self.rho, parameters=weights,
-                grad_clip=self.grad_clip, weight_decay=self.weight_decay
-            )
+            self._optimizer = self._create_optimizer(weights)
+            if self._distribution:
+                self._optimizer = paddle.distributed.fleet.distributed_optimizer(self._optimizer)
             self.init_optim = True
+
         loss.backward()
-        grads_and_vars = self.adadelta.backward(loss=loss, parameters=weights)
+        grads_and_vars = self._optimizer.backward(loss=loss, parameters=weights)

         params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
         self.grads_and_vars = grads_and_vars
         return grads

+    @abstractmethod
+    def _create_optimizer(self, weights):
+        pass
+
     def apply_gradients(self, grads_and_vars):
         grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
         if grads_and_vars is None:
             raise ValueError('grads_and_vars is not set.')
-        self.adadelta._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.adadelta.clear_grad()
+        self._optimizer._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
+        self._optimizer.clear_grad()


-class Adagrad(Optimizer):
+class Adadelta(TLX_Optimizer):
+
+    def __init__(self, lr=0.001, eps=1.0e-6, rho=0.95, weight_decay=0.0, grad_clip=None):
+        super().__init__()
+        if lr is None:
+            raise ValueError('learn_rate is not set.')
+        if eps is None:
+            raise ValueError('eps is not set.')
+        if rho is None:
+            raise ValueError('rho is not set')
+        self.lr = lr
+        self.eps = eps
+        self.rho = rho
+        self.grad_succeed = True
+        self.init_optim = False
+        self.weight_decay = weight_decay
+        self.grad_clip = grad_clip
+        self._grad_clip = grad_clip
+
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.Adadelta(
+            learning_rate=self.lr, epsilon=self.eps, rho=self.rho, parameters=weights,
+            grad_clip=self.grad_clip, weight_decay=self.weight_decay
+        )
+
+
+class Adagrad(TLX_Optimizer):

     def __init__(self, lr=0.001, initial_accumulator_value=0.0, eps=1.0e-6, weight_decay=0.0, grad_clip=None):
+        super().__init__()
         if lr is None:
             raise ValueError('lr is not set.')
         if initial_accumulator_value is None:
@@ -73,37 +92,18 @@ def __init__(self, lr=0.001, initial_accumulator_value=0.0, eps=1.0e-6, weight_d
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.adagrad = paddle.optimizer.Adagrad(
-                learning_rate=self.lr, epsilon=self.eps,
-                initial_accumulator_value=self.initial_accumulator_value, parameters=weights, grad_clip=self.grad_clip,
-                weight_decay=self.weight_decay
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.adagrad.backward(loss=loss, parameters=weights)
-
-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.adagrad._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.adagrad.clear_grad()
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.Adagrad(
+            learning_rate=self.lr, epsilon=self.eps, initial_accumulator_value=self.initial_accumulator_value,
+            parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
+        )


-class Adam(Optimizer):
+class Adam(TLX_Optimizer):

     def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1.0e-8, weight_decay=0.0, grad_clip=None):
+        super().__init__()
         if lr is None:
             raise ValueError('lr is not set.')
         if beta_1 is None:
@@ -127,38 +127,20 @@ def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1.0e-8, weight_decay=
         self.weight_decay = weight_decay
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip
+        self._parameter_list = None

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.adam = paddle.optimizer.Adam(
-                learning_rate=self.lr, beta1=self.beta_1, beta2=self.beta_2, epsilon=self.eps,
-                parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.adam.backward(loss, parameters=weights)
-
-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.adam._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.adam.clear_grad()
-
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.Adam(
+            learning_rate=self.lr, beta1=self.beta_1, beta2=self.beta_2, epsilon=self.eps,
+            parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
+        )

-class Adamax(Optimizer):
+class Adamax(TLX_Optimizer):

     def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1.0e-8, weight_decay=0.0, grad_clip=None):
+        super().__init__()
         if lr is None:
             raise ValueError('lr is not set.')
         if beta_1 is None:
@@ -183,52 +165,34 @@ def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1.0e-8, weight_decay=
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.adamax = paddle.optimizer.Adamax(
-                learning_rate=self.lr, beta1=self.beta_1, beta2=self.beta_2, epsilon=self.eps,
-                parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.adamax.backward(loss=loss, parameters=weights)
-
-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.adamax._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.adamax.clear_grad()
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.Adamax(
+            learning_rate=self.lr, beta1=self.beta_1, beta2=self.beta_2, epsilon=self.eps,
+            parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
+        )


-class Ftrl(Optimizer):
+class Ftrl(TLX_Optimizer):

     def __init__(self):
         raise Exception('Ftrl optimizer function not implemented')


-class Nadam(Optimizer):
+class Nadam(TLX_Optimizer):

     def __init__(self):
         raise Exception('Nadam optimizer function not implemented')


-class RMSprop(Optimizer):
+class RMSprop(TLX_Optimizer):

     def __init__(
         self, lr=0.001, rho=0.95, eps=1.0e-6, momentum=0.0, centered=False, weight_decay=0.0, grad_clip=None
     ):
+        super().__init__()
         if lr is None:
             raise ValueError("lr is not set.")
         if rho is None:
@@ -255,35 +219,17 @@ def __init__(
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.rmsprop = paddle.optimizer.RMSProp(
-                learning_rate=self.lr, epsilon=self.eps, rho=self.rho, momentum=self.momentum,
-                parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.rmsprop.backward(loss=loss, parameters=weights)
-
-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.rmsprop._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.rmsprop.clear_grad()
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.RMSProp(
+            learning_rate=self.lr, epsilon=self.eps, rho=self.rho, momentum=self.momentum,
+            parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
+        )


-class SGD(Optimizer):
+class SGD(TLX_Optimizer):

     def __init__(self, lr=0.1, momentum=0.0, weight_decay=0.0, grad_clip=None):
+        super().__init__()
         if lr is None:
             raise ValueError("lr is not set.")

@@ -294,35 +240,17 @@ def __init__(self, lr=0.1, momentum=0.0, weight_decay=0.0, grad_clip=None):
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.sgd = paddle.optimizer.SGD(
-                learning_rate=self.lr, parameters=weights, grad_clip=self.grad_clip,
-                weight_decay=self.weight_decay
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.sgd.backward(loss=loss, parameters=weights)
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.SGD(
+            learning_rate=self.lr, parameters=weights, grad_clip=self.grad_clip,
+            weight_decay=self.weight_decay
+        )

-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.sgd._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.sgd.clear_grad()
-
-class Momentum(Optimizer):
+class Momentum(TLX_Optimizer):

     def __init__(self, lr=0.001, momentum=0.9, weight_decay=0.0, nesterov=False, grad_clip=None):
+        super().__init__()
         if lr is None:
             raise ValueError("lr is not set")
         if momentum is None:
@@ -337,38 +265,20 @@ def __init__(self, lr=0.001, momentum=0.9, weight_decay=0.0, nesterov=False, gr
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.moment = paddle.optimizer.Momentum(
-                learning_rate=self.lr, momentum=self.momentum, parameters=weights,
-                use_nesterov=self.nesterov, grad_clip=self.grad_clip, weight_decay=self.weight_decay
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.moment.backward(loss=loss, parameters=weights)
-
-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.moment._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.moment.clear_grad()
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.Momentum(
+            learning_rate=self.lr, momentum=self.momentum, parameters=weights,
+            use_nesterov=self.nesterov, grad_clip=self.grad_clip, weight_decay=self.weight_decay
+        )


-class Lamb(Optimizer):
+class Lamb(TLX_Optimizer):

     def __init__(
         self, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1.0e-6, weight_decay=0.01, grad_clip=None
     ):
+        super().__init__()
         if lr is None:
             raise ValueError('lr is not set.')
         if beta_1 is None:
@@ -393,33 +303,14 @@ def __init__(
         self.grad_clip = grad_clip
         self._grad_clip = grad_clip

-    def gradient(self, loss, weights):
-        if loss is None:
-            raise ValueError('loss is not set.')
-        if weights is None:
-            raise ValueError('weights is not set.')
-        if not self.init_optim:
-            self.lamb = paddle.optimizer.Lamb(
-                learning_rate=self.lr, lamb_weight_decay=self.lamb_weight_decay, beta1=self.beta_1,
-                beta2=self.beta_2, epsilon=self.eps, parameters=weights, grad_clip=self.grad_clip
-            )
-            self.init_optim = True
-        loss.backward()
-        grads_and_vars = self.lamb.backward(loss=loss, parameters=weights)
-
-        params, grads, self.grad_succeed = filter_grads(grads_and_vars, weights)
-        self.grads_and_vars = grads_and_vars
-        return grads
-
-    def apply_gradients(self, grads_and_vars):
-        grads_and_vars = zip_grads_and_params(grads_and_vars, self.grad_succeed, self.grads_and_vars)
-        if grads_and_vars is None:
-            raise ValueError('grads_and_vars is not set.')
-        self.lamb._apply_optimize(loss=None, startup_program=None, params_grads=grads_and_vars)
-        self.lamb.clear_grad()
+    def _create_optimizer(self, weights):
+        return paddle.optimizer.Lamb(
+            learning_rate=self.lr, lamb_weight_decay=self.lamb_weight_decay, beta1=self.beta_1,
+            beta2=self.beta_2, epsilon=self.eps, parameters=weights, grad_clip=self.grad_clip
+        )


-class LARS(Optimizer):
+class LARS(TLX_Optimizer):

     def __init__(self):
@@ -429,6 +320,9 @@ def gradient(self):

         pass

+    def _create_optimizer(self, weights):
+        raise Exception('LARS optimizer function not implemented')
+
     def apply_gradients(self, grads_and_vars):
         raise Exception('LARS optimizer function not implemented')
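
With the shared TLX_Optimizer base class above, the lazy optimizer construction, the paddle.distributed.fleet.distributed_optimizer wrapping, backward(), and apply_gradients() are all inherited, so a new Paddle-backed optimizer only has to store its hyper-parameters and implement _create_optimizer. The sketch below is a hypothetical AdamW wrapper that is not part of this diff; it assumes paddle.optimizer.AdamW and mirrors the attribute conventions (grad_succeed, init_optim, _grad_clip) that the subclasses above set in their __init__.

# Hypothetical sketch, not part of this patch: extending the refactored base class
# with one more optimizer by implementing only _create_optimizer().
import paddle

from tensorlayerx.optimizers.paddle_optimizers import TLX_Optimizer


class AdamW(TLX_Optimizer):

    def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1.0e-8, weight_decay=0.01, grad_clip=None):
        super().__init__()
        if lr is None:
            raise ValueError('lr is not set.')
        self.lr = lr
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.eps = eps
        self.weight_decay = weight_decay
        self.grad_clip = grad_clip
        self._grad_clip = grad_clip
        # Flags read by TLX_Optimizer.gradient() and apply_gradients().
        self.grad_succeed = True
        self.init_optim = False

    def _create_optimizer(self, weights):
        # Called once, on the first gradient() call; when distributed training is
        # initialized, the base class wraps the returned optimizer with
        # paddle.distributed.fleet.distributed_optimizer before using it.
        return paddle.optimizer.AdamW(
            learning_rate=self.lr, beta1=self.beta_1, beta2=self.beta_2, epsilon=self.eps,
            parameters=weights, grad_clip=self.grad_clip, weight_decay=self.weight_decay
        )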