From b9f49d5802e7992daaf51841d1f35fd99a8b6ae5 Mon Sep 17 00:00:00 2001
From: tzufgoogle
Date: Mon, 9 May 2022 15:13:14 +0000
Subject: [PATCH] Multi-task training with prompt

---
 cabby/model/dataset_item.py                 |  26 ++-
 cabby/model/datasets.py                     |  24 +-
 cabby/model/text/BUILD                      |  16 ++
 cabby/model/text/model_trainer.py           |  54 +++--
 cabby/model/text/model_trainer_multitask.py | 230 ++++++++++++++++++++
 cabby/model/text/models.py                  |   7 +-
 cabby/model/util.py                         |   3 +-
 run_model_multi_task.sh                     |  18 ++
 8 files changed, 342 insertions(+), 36 deletions(-)
 create mode 100644 cabby/model/text/model_trainer_multitask.py
 create mode 100644 run_model_multi_task.sh

diff --git a/cabby/model/dataset_item.py b/cabby/model/dataset_item.py
index 3ec03677..cd9bd47d 100755
--- a/cabby/model/dataset_item.py
+++ b/cabby/model/dataset_item.py
@@ -65,15 +65,25 @@ def from_TextGeoSplit(cls, train, valid, test, unique_cellids,
     )
 
   @classmethod
-  def load(cls, dataset_path: Text, train_path_dataset: Text,
-          valid_path_dataset: Text, test_path_dataset: Text,
-          unique_cellid_path: Text, tensor_cellid_path: Text,
+  def load(cls, dataset_dir: Text, model_type: Text,
+          s2_level: int, unique_cellid_path: Text, tensor_cellid_path: Text,
           label_to_cellid_path: Text):
+    dataset_model_path = os.path.join(dataset_dir, str(model_type))
+    dataset_path = os.path.join(dataset_model_path, str(s2_level))
+    train_path_dataset = os.path.join(dataset_path, 'train.pth')
+    valid_path_dataset = os.path.join(dataset_path, 'valid.pth')
+    test_path_dataset = os.path.join(dataset_path, 'test.pth')
+    unique_cellid_path = os.path.join(dataset_path, "unique_cellid.npy")
+    tensor_cellid_path = os.path.join(dataset_path, "tensor_cellid.pth")
+    label_to_cellid_path = os.path.join(dataset_path, "label_to_cellid.npy")
+
     logging.info("Loading dataset from <== {}.".format(dataset_path))
     train_dataset = torch.load(train_path_dataset)
     valid_dataset = torch.load(valid_path_dataset)
     test_dataset = torch.load(test_path_dataset)
+    logging.info(f"Size of train set: {len(train_dataset)}" +
+      f", Size of validation set: {len(valid_dataset)}, Size of test set: {len(test_dataset)}")
 
     unique_cellid = np.load(unique_cellid_path, allow_pickle='TRUE')
     label_to_cellid = np.load(
@@ -144,8 +154,16 @@ def __init__(self, text_tokenizer, s2_tokenizer, data: pd.DataFrame, s2level: in
     # Tokenize instructions.
+
+    instruction_list = data.instructions.tolist()
+    if 'T5' in model_type:
+      # Prepend the model type as a task prompt for T5 models.
+      instruction_list = [model_type + ": " + t for t in instruction_list]
+
+    logging.info(f"An example of the text encoded: '{instruction_list[0]}'")
+
     self.encodings = self.text_tokenizer(
-      data.instructions.tolist(), truncation=True,
+      instruction_list, truncation=True,
       padding=True, add_special_tokens=True)
 
     data['far_cells'] = data.cellid.apply(
diff --git a/cabby/model/datasets.py b/cabby/model/datasets.py
index 927d05f4..1b1d6ceb 100644
--- a/cabby/model/datasets.py
+++ b/cabby/model/datasets.py
@@ -101,10 +101,9 @@ def process_route(self, route_str):
       gutil.point_from_str_coord_xy(landmark_str) for landmark_str in ladmarks_str_list]
 
-  def process_landmarks(self, landmarks_str_one_line):
-    ladmarks_str_list = landmarks_str_one_line.split(';')
-    return [gutil.point_from_str_coord_yx(
-      landmark_str.split(':')[-1]) for landmark_str in ladmarks_str_list]
+  def process_landmarks(self, row):
+    points = [row['end_point'], row['start_point'], row['main_pivot'], row['near_pivot']]
+    return points
 
   def get_specific_landmark(self, landmarks_str_one_line, landmark_name):
@@ -118,7 +117,7 @@ def get_specific_landmark(self, landmarks_str_one_line, landmark_name):
     return landmark_found
 
-  def create_dataset(self, infer_only: bool = False
+  def create_dataset(self, infer_only: bool = False,
   ) -> dataset_item.TextGeoDataset:
     '''Loads data and creates datasets and train, validate and test sets.
     Returns:
@@ -219,13 +218,13 @@ def load_data(self, data_dir: str, ds_set: str, lines: bool):
     ds['main_pivot'] = ds.landmarks.apply(
       lambda x: self.get_specific_landmark(x, 'main_pivot'))
 
-    ds['landmarks'] = ds.landmarks.apply(self.process_landmarks)
+    ds['landmarks'] = ds.apply(self.process_landmarks, axis=1)
 
     if 'route' in ds:
       ds['route'] = ds.route.apply(self.process_route)
       ds['route_fixed'] = ds.route.apply(self.get_fixed_point_along_route)
-      ds['start_end'] = ds.route.apply(self.get_fixed_point_along_route)
+      ds['start_end'] = ds.route.apply(self.get_fixed_point_along_route)
 
     columns_keep = ds.columns.difference(
       [
         'instructions',
@@ -353,9 +352,14 @@ def load_data(self, data_dir: str, split: str, lines: bool):
     return ds
 
   def process_landmarks(self, landmarks_dict):
-    ladmarks_list = list(landmarks_dict.values())
-    return [gutil.point_from_list_coord_yx(
-      landmark_l[-1]) for landmark_l in ladmarks_list if landmark_l[-1]]
+    landmarks_coords = [
+      landmarks_dict['end_point'][-1],
+      landmarks_dict['start_point'][-1],
+      landmarks_dict['main_pivot'][-1],
+      landmarks_dict['near_pivot'][-1]]
+    points = [gutil.point_from_list_coord_yx(
+      coord) for coord in landmarks_coords]
+    return points
 
   def process_route(self, route_list):
     return [
diff --git a/cabby/model/text/BUILD b/cabby/model/text/BUILD
index d0b085c3..497c0e1a 100755
--- a/cabby/model/text/BUILD
+++ b/cabby/model/text/BUILD
@@ -24,6 +24,22 @@ py_binary(
   ],
 )
 
+py_binary(
+    name = 'model_trainer_multitask',
+    main = 'model_trainer_multitask.py',
+    srcs = ['model_trainer_multitask.py'],
+    deps = [
+        '//cabby/model/text:train',
+        '//cabby/model:datasets',
+        '//cabby/model:dataset_item',
+        "//cabby/model:util",
+        "//cabby/geo:util",
+        ':models'
+
+    ],
+)
+
+
 py_binary(
   name = 'models',
diff --git a/cabby/model/text/model_trainer.py b/cabby/model/text/model_trainer.py
index 3256b303..bb21e896 100755
--- a/cabby/model/text/model_trainer.py
+++ b/cabby/model/text/model_trainer.py
@@ -91,6 +91,7 @@ flags.DEFINE_string("model_path", None, "A path of a model the model to be fine
 tuned\ evaluated.")
 
+
 flags.DEFINE_integer(
   'train_batch_size', default=4,
   help=('Batch size for training.'))
@@ -107,6 +108,10 @@
   'infer_only', default=False,
   help=('Train and infer\ just infer.'))
 
+flags.DEFINE_bool(
+  'is_single_sample_train', default=False,
+  help=('Train on a single sample and do not evaluate.'))
+
 flags.DEFINE_bool(
   'is_val_loss_from_model', default=False,
@@ -150,23 +155,26 @@ def main(argv):
   else:
     sys.exit("Dataset invalid")
 
-  dataset = dataset_init(
-    data_dir = FLAGS.data_dir,
-    region = FLAGS.region,
-    s2level = FLAGS.s2_level,
-    model_type = FLAGS.model)
-
+
+  if FLAGS.is_single_sample_train:
+    FLAGS.train_batch_size = 1
+
   if os.path.exists(dataset_path):
     dataset_text = dataset_item.TextGeoDataset.load(
-      dataset_path = dataset_path,
-      train_path_dataset = train_path_dataset,
-      valid_path_dataset = valid_path_dataset,
-      test_path_dataset = test_path_dataset,
+      dataset_dir = FLAGS.dataset_dir,
+      model_type = str(FLAGS.model),
+      s2_level = FLAGS.s2_level,
       label_to_cellid_path = label_to_cellid_path,
       unique_cellid_path = unique_cellid_path,
       tensor_cellid_path = tensor_cellid_path)
 
   else:
+    dataset = dataset_init(
+      data_dir = FLAGS.data_dir,
+      region = FLAGS.region,
+      s2level = FLAGS.s2_level,
+      model_type = FLAGS.model)
+
     if not os.path.exists(dataset_model_path):
       os.mkdir(dataset_model_path)
     logging.info("Preparing data.")
@@ -204,13 +212,17 @@ def main(argv):
   if 'Dual-Encoder' in FLAGS.model:
     run_model = models.DualEncoder(device=device)
   elif FLAGS.model == 'S2-Generation-T5':
-    run_model = models.S2GenerationModel(dataset_text.label_to_cellid, device=device)
+    run_model = models.S2GenerationModel(
+      dataset_text.label_to_cellid, device=device)
   elif FLAGS.model == 'S2-Generation-T5-Landmarks':
-    run_model = models.S2GenerationModel(dataset_text.label_to_cellid, is_landmarks=True, device=device)
+    run_model = models.S2GenerationModel(
+      dataset_text.label_to_cellid, is_landmarks=True, device=device)
   elif FLAGS.model == 'S2-Generation-T5-Path':
-    run_model = models.S2GenerationModel(dataset_text.label_to_cellid, is_path=True, device=device)
+    run_model = models.S2GenerationModel(
+      dataset_text.label_to_cellid, is_path=True, device=device)
   elif FLAGS.model == 'S2-Generation-T5-Warmup-start-end':
-    run_model = models.S2GenerationModel(dataset_text.label_to_cellid, is_warmup_start_end=True, device=device)
+    run_model = models.S2GenerationModel(
+      dataset_text.label_to_cellid, is_warmup_start_end=True, device=device)
   elif FLAGS.model == 'Classification-Bert':
     run_model = models.ClassificationModel(n_cells, device=device)
   else:
@@ -250,7 +262,8 @@ def main(argv):
     cells_tensor = dataset_text.unique_cellids_binary,
     label_to_cellid = dataset_text.label_to_cellid,
     is_distance_distribution = FLAGS.is_distance_distribution,
-    best_valid_loss = run_model.best_valid_loss
+    best_valid_loss = run_model.best_valid_loss,
+    is_single_sample_train = FLAGS.is_single_sample_train
   )
   if FLAGS.infer_only:
     logging.info("Starting to infer model.")
@@ -262,17 +275,18 @@ def main(argv):
       true_points, pred_points)
 
-    accuracy = accuracy_score(true_vals, predictions)
-
     evaluator = eu.Evaluator()
     error_distances = evaluator.get_error_distances(trainer.metrics_path)
     _, mean_distance, median_distance, max_error, norm_auc = (
      evaluator.compute_metrics(error_distances))
 
-    logging.info(f"\nTest Accuracy: {accuracy}, \n" +
-      f"Mean distance: {mean_distance},\nMedian distance: {median_distance},\n" +
-      f"Max error: {max_error},\nNorm AUC: {norm_auc}")
-
+    logging.info(
+      f"Mean distance: {mean_distance}, " +
+      f"Median distance: {median_distance}, " +
+      f"Max error: {max_error}, " +
+      f"Norm AUC: {norm_auc}")
+
   else:
     logging.info("Starting to train model.")
     trainer.train_model()
diff --git a/cabby/model/text/model_trainer_multitask.py b/cabby/model/text/model_trainer_multitask.py
new file mode 100644
index 00000000..9fba4227
--- /dev/null
+++ b/cabby/model/text/model_trainer_multitask.py
@@ -0,0 +1,230 @@
+# coding=utf-8
+# Copyright 2020 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Multi-task model framework for text and S2Cellid matching.
+
+Example command line call:
+$ bazel-bin/cabby/model/text/model_trainer_multitask \
+  --dataset_dir_T5_landmarks_RVS ~/model/dataset/rvs_landmarks \
+  --dataset_dir_T5_landmarks_human ~/model/dataset/human_landmarks \
+  --dataset_dir_T5_Warmup_start_end_RVS ~/model/dataset/rvs_warmup \
+  --region Pittsburgh \
+  --s2_level 12 \
+  --output_dir ~/tmp/output/ \
+  --train_batch_size 32 \
+  --test_batch_size 32
+"""
+
+from absl import app
+from absl import flags
+
+from absl import logging
+import numpy as np
+import os
+import sys
+from sklearn.metrics import accuracy_score
+import torch
+import torch.optim as optim
+import torch.nn as nn
+from torch.utils.data import DataLoader
+from transformers import AdamW
+
+from cabby.evals import utils as eu
+from cabby.model.text import train
+from cabby.model import dataset_item
+from cabby.model.text import models
+from cabby.model import datasets
+from cabby.model import util
+from cabby.geo import regions
+
+TASKS = ["WikiGeo", "RVS", "RUN", "human"]
+
+FLAGS = flags.FLAGS
+
+flags.DEFINE_string("dataset_dir_T5_landmarks_RVS", None,
+          "The directory from which to load the RVS S2-Generation-T5-Landmarks dataset.")
+
+flags.DEFINE_string("dataset_dir_T5_landmarks_human", None,
+          "The directory from which to load the human S2-Generation-T5-Landmarks dataset.")
+
+flags.DEFINE_string("dataset_dir_T5_Warmup_start_end_RVS", None,
+          "The directory from which to load the RVS S2-Generation-T5-Warmup-start-end dataset.")
+
+flags.DEFINE_enum(
+  "region", None, regions.SUPPORTED_REGION_NAMES,
+  regions.REGION_SUPPORT_MESSAGE)
+
+flags.DEFINE_integer("s2_level", None, "S2 level of the S2Cells.")
+
+flags.DEFINE_string("output_dir", None,
+          "The directory where the model and results will be saved to.")
+
+flags.DEFINE_float(
+  'learning_rate', default=5e-5,
+  help=('The learning rate for the Adam optimizer.'))
+
+flags.DEFINE_string("model_path", None,
+          "Path of a model to be fine-tuned or evaluated.")
+
+flags.DEFINE_integer(
+  'train_batch_size', default=4,
+  help=('Batch size for training.'))
+
+flags.DEFINE_integer(
+  'test_batch_size', default=4,
+  help=('Batch size for testing and validating.'))
+
+flags.DEFINE_integer(
+  'num_epochs', default=5,
+  help=('Number of training epochs.'))
+
+flags.DEFINE_bool(
+  'infer_only', default=False,
+  help=('Train and infer, or just infer.'))
+
+flags.DEFINE_bool(
+  'is_single_sample_train', default=False,
+  help=('Train on a single sample and do not evaluate.'))
+
+flags.DEFINE_bool(
+  'is_val_loss_from_model', default=False,
+  help=("If a model is loaded, use the model's stored validation loss."))
+
+flags.DEFINE_bool(
+  'is_distance_distribution', default=False,
+  help=(
+    'Add probability over cells according to the distance from start point. ' +
+    'This is optional only for RVS and RUN.'))
+
+
+# Required flags.
+flags.mark_flag_as_required("region")
+flags.mark_flag_as_required("s2_level")
+flags.mark_flag_as_required("dataset_dir_T5_landmarks_RVS")
+flags.mark_flag_as_required("dataset_dir_T5_landmarks_human")
+flags.mark_flag_as_required("dataset_dir_T5_Warmup_start_end_RVS")
+
+
+def main(argv):
+
+  dataset_model_path = os.path.join(
+    FLAGS.dataset_dir_T5_landmarks_RVS, "S2-Generation-T5-Landmarks")
+  dataset_path = os.path.join(dataset_model_path, str(FLAGS.s2_level))
+
+  unique_cellid_path = os.path.join(dataset_path, "unique_cellid.npy")
+  tensor_cellid_path = os.path.join(dataset_path, "tensor_cellid.pth")
+  label_to_cellid_path = os.path.join(dataset_path, "label_to_cellid.npy")
+
+  dataset_dirs = [
+    FLAGS.dataset_dir_T5_landmarks_RVS,
+    FLAGS.dataset_dir_T5_landmarks_human,
+    FLAGS.dataset_dir_T5_Warmup_start_end_RVS]
+
+  # All three dataset directories must already exist.
+  if not all(os.path.isdir(d) for d in dataset_dirs):
+    sys.exit("One or more dataset directories do not exist.")
+
+  dataset_t5_rvs = dataset_item.TextGeoDataset.load(
+    dataset_dir = FLAGS.dataset_dir_T5_landmarks_RVS,
+    model_type = "S2-Generation-T5-Landmarks",
+    s2_level = FLAGS.s2_level,
+    label_to_cellid_path = label_to_cellid_path,
+    unique_cellid_path = unique_cellid_path,
+    tensor_cellid_path = tensor_cellid_path)
+
+  dataset_t5_human = dataset_item.TextGeoDataset.load(
+    dataset_dir = FLAGS.dataset_dir_T5_landmarks_human,
+    model_type = "S2-Generation-T5-Landmarks",
+    s2_level = FLAGS.s2_level,
+    label_to_cellid_path = label_to_cellid_path,
+    unique_cellid_path = unique_cellid_path,
+    tensor_cellid_path = tensor_cellid_path)
+
+  dataset_t5_warmup = dataset_item.TextGeoDataset.load(
+    dataset_dir = FLAGS.dataset_dir_T5_Warmup_start_end_RVS,
+    model_type = "S2-Generation-T5-Warmup-start-end",
+    s2_level = FLAGS.s2_level,
+    label_to_cellid_path = label_to_cellid_path,
+    unique_cellid_path = unique_cellid_path,
+    tensor_cellid_path = tensor_cellid_path)
+
+  train_loader_t5_rvs = DataLoader(
+    dataset_t5_rvs.train, batch_size=FLAGS.train_batch_size, shuffle=True)
+  train_loader_t5_human = DataLoader(
+    dataset_t5_human.train, batch_size=FLAGS.train_batch_size, shuffle=True)
+  train_loader_t5_warmup = DataLoader(
+    dataset_t5_warmup.train, batch_size=FLAGS.train_batch_size, shuffle=True)
+
+  valid_loader_t5_human = DataLoader(
+    dataset_t5_human.valid, batch_size=FLAGS.test_batch_size, shuffle=False)
+  test_loader_t5_human = DataLoader(
+    dataset_t5_human.test, batch_size=FLAGS.test_batch_size, shuffle=False)
+
+  device = torch.device(
+    'cuda') if torch.cuda.is_available() else torch.device('cpu')
+
+  run_model = models.S2GenerationModel(
+    dataset_t5_rvs.label_to_cellid, is_landmarks=True, is_warmup_start_end=True, device=device)
+
+  run_model.to(device)
+
+  optimizer = torch.optim.Adam(
+    run_model.parameters(), lr=FLAGS.learning_rate)
+
+  run_model.best_valid_loss = float("Inf")
+
+  trainer = train.Trainer(
+    model=run_model,
+    device=device,
+    num_epochs=FLAGS.num_epochs,
+    optimizer=optimizer,
+    train_loader=[train_loader_t5_rvs, train_loader_t5_human, train_loader_t5_warmup],
+    valid_loader=valid_loader_t5_human,
+    test_loader=test_loader_t5_human,
+    unique_cells = dataset_t5_human.unique_cellids,
+    file_path=FLAGS.output_dir,
+    cells_tensor = dataset_t5_human.unique_cellids_binary,
+    label_to_cellid = dataset_t5_human.label_to_cellid,
+    is_distance_distribution = FLAGS.is_distance_distribution,
+    best_valid_loss = run_model.best_valid_loss,
+    is_single_sample_train = FLAGS.is_single_sample_train
+  )
+
+  logging.info("Starting to train model.")
+  trainer.multi_train_model()
+
+
+if __name__ == '__main__':
+  app.run(main)
\ No newline at end of file
diff --git a/cabby/model/text/models.py b/cabby/model/text/models.py
index ac2ccbf7..fbaf7acc 100644
--- a/cabby/model/text/models.py
+++ b/cabby/model/text/models.py
@@ -144,7 +144,12 @@ def text_embed(self, text):
 
 class S2GenerationModel(GeneralModel):
   def __init__(
-    self, label_to_cellid, device, is_landmarks=False, is_path=False, is_warmup_start_end=False):
+    self,
+    label_to_cellid,
+    device,
+    is_landmarks=False,
+    is_path=False,
+    is_warmup_start_end=False):
     GeneralModel.__init__(self, device)
     self.model = T5ForConditionalGeneration.from_pretrained(T5_TYPE)
     self.tokenizer = T5Tokenizer.from_pretrained(T5_TYPE)
diff --git a/cabby/model/util.py b/cabby/model/util.py
index 433d7e27..131d94da 100644
--- a/cabby/model/util.py
+++ b/cabby/model/util.py
@@ -128,7 +128,6 @@ def load_checkpoint(load_path: Text, model: torch.nn.Module,
     return
   state_dict = torch.load(load_path, map_location=device)
-  logging.info(f'Model loaded from <== {load_path}')
 
   if isinstance(model, nn.DataParallel):
     model.module.load_state_dict(state_dict['model_state_dict'])
@@ -137,6 +136,8 @@ def load_checkpoint(load_path: Text, model: torch.nn.Module,
     model.load_state_dict(state_dict['model_state_dict'])
   model.best_valid_loss = state_dict['valid_loss']
 
+  logging.info(f'Model loaded from <== {load_path} with validation loss {model.best_valid_loss}')
+
   return state_dict
 
diff --git a/run_model_multi_task.sh b/run_model_multi_task.sh
new file mode 100644
index 00000000..f1afac0d
--- /dev/null
+++ b/run_model_multi_task.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+echo "** S2-Generation-T5-Landmarks - RVS **"
+bazel-bin/cabby/model/text/model_trainer --data_dir /mnt/hackney/rvs/model/manhattan --dataset_dir /mnt/hackney/rvs/model/manhattan/fixedlandmarks --region Manhattan --s2_level 18 --output_dir /mnt/hackney/rvs/multitask/18 --test_batch_size 64 --num_epochs 1 --task RVS --model S2-Generation-T5-Landmarks --is_single_sample_train True
+
+for i in {1..1000}
+do
+  echo "~~~~~~~~~~~~~~~~~~~~~~~~ Epoch $i ~~~~~~~~~~~~~~~~~~~~~~~~"
+  echo "** S2-Generation-T5-Landmarks - RVS **"
+  bazel-bin/cabby/model/text/model_trainer --data_dir /mnt/hackney/rvs/model/manhattan --dataset_dir /mnt/hackney/rvs/model/manhattan/fixedlandmarks --region Manhattan --s2_level 18 --output_dir /mnt/hackney/rvs/multitask/18 --test_batch_size 64 --num_epochs 1 --task RVS --model S2-Generation-T5-Landmarks --model_path /mnt/hackney/rvs/multitask/18/model.pt --is_val_loss_from_model True --is_single_sample_train True
+  echo "** S2-Generation-T5-Landmarks - human **"
+  bazel-bin/cabby/model/text/model_trainer --data_dir /mnt/hackney/human_data/landmarks_data --dataset_dir /mnt/hackney/human_data/fixedlandmarks --region Manhattan --s2_level 18 --output_dir /mnt/hackney/rvs/multitask/18 --test_batch_size 64 --num_epochs 1 --task human --model S2-Generation-T5-Landmarks --model_path /mnt/hackney/rvs/multitask/18/model.pt --is_val_loss_from_model True --is_single_sample_train True
+  echo "** S2-Generation-T5-Warmup-start-end - RVS **"
+  bazel-bin/cabby/model/text/model_trainer --data_dir /mnt/hackney/rvs/data --dataset_dir /mnt/hackney/rvs/model/manhattan/warmup-start-end --region Manhattan --s2_level 18 --output_dir /mnt/hackney/rvs/multitask/18 --test_batch_size 64 --num_epochs 1 --task RVS --model S2-Generation-T5-Warmup-start-end --model_path /mnt/hackney/rvs/multitask/18/model.pt --is_val_loss_from_model True --is_single_sample_train True
+done
+echo "** S2-Generation-T5-Landmarks - RVS - final **"
+bazel-bin/cabby/model/text/model_trainer --data_dir /mnt/hackney/rvs/model/manhattan --dataset_dir /mnt/hackney/rvs/model/manhattan/fixedlandmarks --region Manhattan --s2_level 18 --output_dir /mnt/hackney/rvs/multitask/18 --test_batch_size 64 --num_epochs 1 --task RVS --model S2-Generation-T5-Landmarks --model_path /mnt/hackney/rvs/multitask/18/model.pt --is_val_loss_from_model True --is_single_sample_train True
+echo "** S2-Generation-T5-Landmarks - human - final **"
+bazel-bin/cabby/model/text/model_trainer --data_dir /mnt/hackney/human_data/landmarks_data --dataset_dir /mnt/hackney/human_data/fixedlandmarks --region Manhattan --s2_level 18 --output_dir /mnt/hackney/rvs/multitask/18 --train_batch_size 16 --test_batch_size 64 --num_epochs 1 --task human --model S2-Generation-T5-Landmarks --model_path /mnt/hackney/rvs/multitask/18/model.pt --is_val_loss_from_model True
\ No newline at end of file
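
Note on trainer.multi_train_model(): the method lives in cabby/model/text/train.py, which this patch does not modify, so the multi-task schedule itself is not visible in the diff. A minimal sketch of one plausible interpretation, assuming a round-robin walk over the task loaders passed as train_loader above; multi_train_epoch and model.compute_loss are hypothetical names, not APIs from this repository:

import itertools

def multi_train_epoch(model, optimizer, train_loaders):
  # Interleave batches across tasks: zip_longest walks the task loaders in
  # lock step and keeps yielding from the longer loaders after a shorter
  # one is exhausted (exhausted slots arrive as None).
  model.train()
  for batches in itertools.zip_longest(*train_loaders):
    for batch in batches:
      if batch is None:  # this task's loader has no batches left
        continue
      optimizer.zero_grad()
      loss = model.compute_loss(batch)  # hypothetical per-batch loss hook
      loss.backward()
      optimizer.step()

A fixed interleaving like this mirrors what run_model_multi_task.sh does at a coarser grain: the three tasks alternate across separate single-epoch model_trainer invocations that reload the shared checkpoint at /mnt/hackney/rvs/multitask/18/model.pt.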