From 68a19351f060dac1253021ac6671bc17f804d6ec Mon Sep 17 00:00:00 2001 From: howardchanth Date: Thu, 10 Jun 2021 22:32:51 +0800 Subject: [PATCH 01/12] * Initial ideas of adding incremental training feature --- torchbiggraph/checkpoint_enlarge.py | 84 +++++++++++++++++++++++++++++ update_plan.txt | 5 ++ 2 files changed, 89 insertions(+) create mode 100644 torchbiggraph/checkpoint_enlarge.py create mode 100644 update_plan.txt diff --git a/torchbiggraph/checkpoint_enlarge.py b/torchbiggraph/checkpoint_enlarge.py new file mode 100644 index 00000000..be1de09b --- /dev/null +++ b/torchbiggraph/checkpoint_enlarge.py @@ -0,0 +1,84 @@ +""" +Python Script enlarging existing checkpoint files +""" + +import json +import h5py + +#!/usr/bin/env python3 + +# Copyright (c) Facebook, Inc. and its affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE.txt file in the root directory of this source tree. + +import argparse +import logging +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple + +from torchbiggraph.config import ConfigFileLoader, ConfigSchema, add_to_sys_path +from torchbiggraph.types import SINGLE_TRAINER, Rank +from torchbiggraph.util import ( + setup_logging, +) + + +logger = logging.getLogger("torchbiggraph") +dist_logger = logging.LoggerAdapter(logger, {"distributed": True}) + + +# TODO: Read the updated count files to initialize new embeddings +# TODO: Can merge this feature to the beginning of the training +# TODO: Load the previous entity mapping and append new entities in new partition +def enlarge_checkpoint_files(config_dict): + """ + :param: + + :return: + """ + checkpoint_path = config_dict.checkpoint_path + ( + entity_configs, + relation_configs, + entity_path, + edge_paths, + dynamic_relations, + ) = parse_config_partial( # noqa + config_dict + ) + + # Load specified embedding + embeddings_paths = [checkpoint_path / f"embeddings_{entity_name}_{partition}.v60.h5" + for entity_name, entity_config in entity_configs.items() + for partition in range(entity_config.num_partitions) + ] + + for emb_path in embeddings_paths: + with h5py.File(emb_path, "r") as hf: + print(hf["embeddings"][:]) + + +def main(): + setup_logging() + config_help = "\n\nConfig parameters:\n\n" + "\n".join(ConfigSchema.help()) + parser = argparse.ArgumentParser( + epilog=config_help, + # Needed to preserve line wraps in epilog. + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("config", help="Path to config file") + parser.add_argument("-p", "--param", action="append", nargs="*") + parser.add_argument( + "--rank", + type=int, + default=SINGLE_TRAINER, + help="For multi-machine, this machine's rank", + ) + opt = parser.parse_args() + + loader = ConfigFileLoader() + config = loader.load_config(opt.config, opt.param) + + enlarge_checkpoint_files(config) + diff --git a/update_plan.txt b/update_plan.txt new file mode 100644 index 00000000..f3cce3af --- /dev/null +++ b/update_plan.txt @@ -0,0 +1,5 @@ +1. Load entity mapping from previously pbg_input names +2. Append new entities to the end of dictionary +3. modify load_entities_by_type to preserve the previous mapping +4. 
Enlarge the embeddings before training - add script to train_cpu.py + -> Read the count files from newly partitioned data \ No newline at end of file From affbd9e95e751a2c5d503668caaa76021dfaabea Mon Sep 17 00:00:00 2001 From: howardchanth Date: Sat, 12 Jun 2021 23:02:44 +0800 Subject: [PATCH 02/12] * Initial version of incremental training --- test/test_checkpoint_manager.py | 1 + torchbiggraph/config.py | 7 +++++++ torchbiggraph/train_cpu.py | 27 +++++++++++++++++++++++++++ 3 files changed, 35 insertions(+) diff --git a/test/test_checkpoint_manager.py b/test/test_checkpoint_manager.py index 8f86dc88..a545528e 100644 --- a/test/test_checkpoint_manager.py +++ b/test/test_checkpoint_manager.py @@ -58,6 +58,7 @@ def test_basic(self): entity_path="foo", edge_paths=["bar"], checkpoint_path="baz", + init_entity_path="foo" ) metadata = ConfigMetadataProvider(config).get_checkpoint_metadata() self.assertIsInstance(metadata, dict) diff --git a/torchbiggraph/config.py b/torchbiggraph/config.py index 5e8854f7..a44fe572 100644 --- a/torchbiggraph/config.py +++ b/torchbiggraph/config.py @@ -256,6 +256,13 @@ class ConfigSchema(Schema): "the entities of some types." }, ) + init_entity_path: Optional[str] = attr.ib( + default=None, + metadata={ + "help": "If set, it must be a path to a directory that " + "contains initial values of the entities and their offsets " + }, + ) checkpoint_preservation_interval: Optional[int] = attr.ib( default=None, metadata={ diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index b87f0349..7bf9b884 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -472,6 +472,19 @@ def __init__( # noqa else: self.loadpath_manager = None + # TODO: Need to read new offsets from entity_storage here + + # Load previously partitioned entities and their offsets + if config.init_entity_path: + init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) + self.init_entity_offsets: Dict[str, List[str]] = {} + for entity, econf in config.entities.items(): + for part in range(econf.num_partitions): + self.init_entity_offsets[entity].\ + append(init_entity_storage.load_names(entity, part)) + else: + self.init_entity_offsets = None + # load model from checkpoint or loadpath, if available state_dict, optim_state = checkpoint_manager.maybe_read_model() if state_dict is None and self.loadpath_manager is not None: @@ -488,6 +501,20 @@ def __init__( # noqa dimension = config.entity_dimension(entity) embs = torch.FloatTensor(s).view(-1, dimension)[:count] embs, optimizer = self._load_embeddings(entity, UNPARTITIONED, out=embs) + """ + Enlarging the embeddings from previous run. 
The function takes in + the trained N old embeddings from init_path defined, and enlarge it to + N + M embeddings, with M be the number of new entities joining the training, + and initialize the M embeddings with random numbers + """ + if self.init_entity_offsets: + new_embs = torch.FloatTensor(s).view(-1, dimension)[:count] + new_names = entity_storage.load_names(entity, UNPARTITIONED) + init_subset = [new_names.index(name) + for name in self.init_entity_offsets[entity]] + # Initialize embeddings from previous checkpoint + new_embs[init_subset, :] = embs + embs = new_embs holder.unpartitioned_embeddings[entity] = embs trainer.unpartitioned_optimizers[entity] = optimizer From 010ddfa439e3df6e9c53a6b938a47c2e807d0298 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Tue, 15 Jun 2021 11:42:39 +0800 Subject: [PATCH 03/12] * Initial workable version of incremental training --- torchbiggraph/train_cpu.py | 50 ++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index 7bf9b884..546bfd4f 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -255,6 +255,7 @@ def __init__( # noqa logger.info("Loading entity counts...") entity_storage = ENTITY_STORAGES.make_instance(config.entity_path) + self.entity_storage = entity_storage entity_counts: Dict[str, List[int]] = {} for entity, econf in config.entities.items(): entity_counts[entity] = [] @@ -472,16 +473,20 @@ def __init__( # noqa else: self.loadpath_manager = None - # TODO: Need to read new offsets from entity_storage here - # Load previously partitioned entities and their offsets + # TODO: Bring this tp importer level so that train gpu can also use if config.init_entity_path: init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) self.init_entity_offsets: Dict[str, List[str]] = {} + self.init_entity_counts: Dict[str, List[int]] = {} for entity, econf in config.entities.items(): + self.init_entity_offsets[entity] = [] + self.init_entity_counts[entity] = [] for part in range(econf.num_partitions): self.init_entity_offsets[entity].\ append(init_entity_storage.load_names(entity, part)) + self.init_entity_counts[entity].\ + append(init_entity_storage.load_count(entity, part)) else: self.init_entity_offsets = None @@ -496,7 +501,10 @@ def __init__( # noqa logger.debug("Loading unpartitioned entities...") for entity in holder.lhs_unpartitioned_types | holder.rhs_unpartitioned_types: - count = entity_counts[entity][0] + if self.init_entity_offsets is not None: + count = self.init_entity_counts[entity][0] + else: + count = entity_counts[entity][0] s = embedding_storage_freelist[entity].pop() dimension = config.entity_dimension(entity) embs = torch.FloatTensor(s).view(-1, dimension)[:count] @@ -508,12 +516,15 @@ def __init__( # noqa and initialize the M embeddings with random numbers """ if self.init_entity_offsets: - new_embs = torch.FloatTensor(s).view(-1, dimension)[:count] - new_names = entity_storage.load_names(entity, UNPARTITIONED) + count = self.entity_counts[entity][0] + new_embs = torch.rand((count, dimension)) + new_names = self.entity_storage.load_names(entity, part) init_subset = [new_names.index(name) - for name in self.init_entity_offsets[entity]] + for name in self.init_entity_offsets[entity][part]] # Initialize embeddings from previous checkpoint - new_embs[init_subset, :] = embs + new_embs[init_subset, :] = embs.clone() + # Test case 1: Whether the embeddings are correctly mapped into the new 
embeddings + assert torch.equal(new_embs[init_subset[0], :], embs[0]) embs = new_embs holder.unpartitioned_embeddings[entity] = embs trainer.unpartitioned_optimizers[entity] = optimizer @@ -842,13 +853,36 @@ def _swap_partitioned_embeddings( for entity, part in new_parts - old_parts: logger.debug(f"Loading ({entity} {part})") force_dirty = self.bucket_scheduler.check_and_set_dirty(entity, part) - count = self.entity_counts[entity][part] + # There are trained embeddings already, updating the + # offsets here + if self.init_entity_offsets is not None: + count = self.init_entity_counts[entity][part] + else: + count = self.entity_counts[entity][part] s = self.embedding_storage_freelist[entity].pop() dimension = self.config.entity_dimension(entity) embs = torch.FloatTensor(s).view(-1, dimension)[:count] embs, optimizer = self._load_embeddings( entity, part, out=embs, strict=self.strict, force_dirty=force_dirty ) + """ + Enlarging the embeddings from previously trained embeddings. We take in + the trained N old embeddings from init_path defined, and enlarge it to + N + M embeddings, with M be the number of new entities joining the training, + and initialize the M embeddings with random numbers + """ + if self.init_entity_offsets: + count = self.entity_counts[entity][part] + # Initialize an (N + M) X (emb_dim) enlarged embeddings storage + new_embs = torch.rand((count, dimension)) + new_names = self.entity_storage.load_names(entity, part) + init_subset = [new_names.index(name) + for name in self.init_entity_offsets[entity][part]] + # Initialize embeddings from previous checkpoint + new_embs[init_subset, :] = embs.clone() + # Test case 1: Whether the embeddings are correctly mapped into the new embeddings + assert torch.equal(new_embs[init_subset[0], :], embs[0]) + embs = new_embs holder.partitioned_embeddings[entity, part] = embs self.trainer.partitioned_optimizers[entity, part] = optimizer io_bytes += embs.numel() * embs.element_size() # ignore optim state From 722361811c2e8c46869332cc820ea87801612e65 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Tue, 15 Jun 2021 16:13:11 +0800 Subject: [PATCH 04/12] * Faster loading of pretrained embeddings in enlargements --- torchbiggraph/train_cpu.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index 546bfd4f..a85a4f81 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -872,21 +872,34 @@ def _swap_partitioned_embeddings( and initialize the M embeddings with random numbers """ if self.init_entity_offsets: + logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") count = self.entity_counts[entity][part] # Initialize an (N + M) X (emb_dim) enlarged embeddings storage new_embs = torch.rand((count, dimension)) new_names = self.entity_storage.load_names(entity, part) - init_subset = [new_names.index(name) - for name in self.init_entity_offsets[entity][part]] + + subset_idxs = set() + for name in self.init_entity_offsets[entity][part]: + subset_idxs.add(new_names.index(name)) + subset_idxs = list(subset_idxs) + # subset_idxs = [new_names.index(name) for name in self.init_entity_offsets[entity][part]] + # Initialize embeddings from previous checkpoint - new_embs[init_subset, :] = embs.clone() + new_embs[subset_idxs, :] = embs.detach().clone() + # Test case 1: Whether the embeddings are correctly mapped into the new embeddings - assert torch.equal(new_embs[init_subset[0], :], embs[0]) + assert 
torch.equal(new_embs[subset_idxs, :], embs) + embs = new_embs + logger.debug(f"Loaded {entity} embeddings of shape {embs.shape}") holder.partitioned_embeddings[entity, part] = embs self.trainer.partitioned_optimizers[entity, part] = optimizer io_bytes += embs.numel() * embs.element_size() # ignore optim state + # Load the pretrained embeddings only once + # TODO: Temporary solution, refactor later + self.init_entity_offsets = None + assert new_parts == holder.partitioned_embeddings.keys() return io_bytes From 0ea84d987e88090d00d79924feeec2bf7f72d0b8 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Wed, 16 Jun 2021 16:17:41 +0800 Subject: [PATCH 05/12] * Workable version of recurrent training --- torchbiggraph/train_cpu.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index a85a4f81..7e8ecb61 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -874,23 +874,30 @@ def _swap_partitioned_embeddings( if self.init_entity_offsets: logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") count = self.entity_counts[entity][part] - # Initialize an (N + M) X (emb_dim) enlarged embeddings storage new_embs = torch.rand((count, dimension)) - new_names = self.entity_storage.load_names(entity, part) + # Initialize an (N + M) X (emb_dim) enlarged embeddings storage + init_names: Set = set(self.init_entity_offsets[entity][part]) + new_names: List = self.entity_storage.load_names(entity, part) + subset_idxs = {name: j for (j, name) in enumerate(init_names)} + + for i, new_name in enumerate(new_names): - subset_idxs = set() - for name in self.init_entity_offsets[entity][part]: - subset_idxs.add(new_names.index(name)) - subset_idxs = list(subset_idxs) - # subset_idxs = [new_names.index(name) for name in self.init_entity_offsets[entity][part]] + if new_name in init_names: + subset_idxs[new_name] = i - # Initialize embeddings from previous checkpoint + if (i+1) % 1000000 == 0: + logger.debug(f"Mapped {i} entities...") + + subset_idxs = list(subset_idxs.values()) new_embs[subset_idxs, :] = embs.detach().clone() # Test case 1: Whether the embeddings are correctly mapped into the new embeddings + logger.debug(f"{new_embs[subset_idxs[0], :]}") + logger.debug(embs[0]) assert torch.equal(new_embs[subset_idxs, :], embs) embs = new_embs + logger.debug(f"Loaded {entity} embeddings of shape {embs.shape}") holder.partitioned_embeddings[entity, part] = embs self.trainer.partitioned_optimizers[entity, part] = optimizer From 3f0a9d6a8a20ada32c09a661326c3baedb8da122 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Thu, 17 Jun 2021 14:39:45 +0800 Subject: [PATCH 06/12] * Fix bugs in recurrent training * Optimizer state not loaded --- torchbiggraph/train_cpu.py | 73 +++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index 7e8ecb61..9a9fff40 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -802,6 +802,39 @@ def _load_embeddings( fast_approx_rand(embs) embs.mul_(self.config.init_scale) optim_state = None + """ + Enlarging the embeddings from previously trained embeddings. 
We take in + the trained N old embeddings from init_path defined, and enlarge it to + N + M embeddings, with M be the number of new entities joining the training, + and initialize the M embeddings with random numbers + """ + if self.init_entity_offsets is not None: + logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") + count = self.entity_counts[entity][part] + dimension = self.config.entity_dimension(entity) + new_embs = torch.rand((count, dimension)) + # Initialize an (N + M) X (emb_dim) enlarged embeddings storage + init_names: Set = set(self.init_entity_offsets[entity][part]) + new_names: List = self.entity_storage.load_names(entity, part) + subset_idxs = {name: j for (j, name) in enumerate(init_names)} + + for i, new_name in enumerate(new_names): + if new_name in init_names: + subset_idxs[new_name] = i + + if (i + 1) % 1000000 == 0: + logger.debug(f"Mapped {i+1} entities...") + + subset_idxs = list(subset_idxs.values()) + new_embs[subset_idxs, :] = embs.detach().clone() + + # Test case 1: Whether the embeddings are correctly mapped into the new embeddings + assert torch.equal(new_embs[subset_idxs, :], embs) + + embs = new_embs + optim_state = None + logger.debug(f"Loaded {entity} embeddings of shape {embs.shape}") + embs = torch.nn.Parameter(embs) optimizer = make_optimizer(self.config, [embs], True) if optim_state is not None: @@ -865,47 +898,15 @@ def _swap_partitioned_embeddings( embs, optimizer = self._load_embeddings( entity, part, out=embs, strict=self.strict, force_dirty=force_dirty ) - """ - Enlarging the embeddings from previously trained embeddings. We take in - the trained N old embeddings from init_path defined, and enlarge it to - N + M embeddings, with M be the number of new entities joining the training, - and initialize the M embeddings with random numbers - """ - if self.init_entity_offsets: - logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") - count = self.entity_counts[entity][part] - new_embs = torch.rand((count, dimension)) - # Initialize an (N + M) X (emb_dim) enlarged embeddings storage - init_names: Set = set(self.init_entity_offsets[entity][part]) - new_names: List = self.entity_storage.load_names(entity, part) - subset_idxs = {name: j for (j, name) in enumerate(init_names)} - for i, new_name in enumerate(new_names): - - if new_name in init_names: - subset_idxs[new_name] = i - - if (i+1) % 1000000 == 0: - logger.debug(f"Mapped {i} entities...") - - subset_idxs = list(subset_idxs.values()) - new_embs[subset_idxs, :] = embs.detach().clone() - - # Test case 1: Whether the embeddings are correctly mapped into the new embeddings - logger.debug(f"{new_embs[subset_idxs[0], :]}") - logger.debug(embs[0]) - assert torch.equal(new_embs[subset_idxs, :], embs) - - embs = new_embs - - logger.debug(f"Loaded {entity} embeddings of shape {embs.shape}") holder.partitioned_embeddings[entity, part] = embs self.trainer.partitioned_optimizers[entity, part] = optimizer io_bytes += embs.numel() * embs.element_size() # ignore optim state - # Load the pretrained embeddings only once - # TODO: Temporary solution, refactor later - self.init_entity_offsets = None + # Load the pretrained embeddings only once + # TODO: Temporary solution, refactor later + self.init_entity_offsets = None + assert new_parts == holder.partitioned_embeddings.keys() From 0ecff68278577ee528aebc3ef449808403b810d7 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Mon, 28 Jun 2021 18:16:33 +0800 Subject: [PATCH 07/12] * Develop 
workable v2, still investigating bugs --- torchbiggraph/checkpoint_enlarge.py | 84 ---------------- torchbiggraph/checkpoint_manager.py | 81 +++++++++++++++ torchbiggraph/train_cpu.py | 146 ++++++++++++++-------------- 3 files changed, 155 insertions(+), 156 deletions(-) delete mode 100644 torchbiggraph/checkpoint_enlarge.py diff --git a/torchbiggraph/checkpoint_enlarge.py b/torchbiggraph/checkpoint_enlarge.py deleted file mode 100644 index be1de09b..00000000 --- a/torchbiggraph/checkpoint_enlarge.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -Python Script enlarging existing checkpoint files -""" - -import json -import h5py - -#!/usr/bin/env python3 - -# Copyright (c) Facebook, Inc. and its affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE.txt file in the root directory of this source tree. - -import argparse -import logging -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple - -from torchbiggraph.config import ConfigFileLoader, ConfigSchema, add_to_sys_path -from torchbiggraph.types import SINGLE_TRAINER, Rank -from torchbiggraph.util import ( - setup_logging, -) - - -logger = logging.getLogger("torchbiggraph") -dist_logger = logging.LoggerAdapter(logger, {"distributed": True}) - - -# TODO: Read the updated count files to initialize new embeddings -# TODO: Can merge this feature to the beginning of the training -# TODO: Load the previous entity mapping and append new entities in new partition -def enlarge_checkpoint_files(config_dict): - """ - :param: - - :return: - """ - checkpoint_path = config_dict.checkpoint_path - ( - entity_configs, - relation_configs, - entity_path, - edge_paths, - dynamic_relations, - ) = parse_config_partial( # noqa - config_dict - ) - - # Load specified embedding - embeddings_paths = [checkpoint_path / f"embeddings_{entity_name}_{partition}.v60.h5" - for entity_name, entity_config in entity_configs.items() - for partition in range(entity_config.num_partitions) - ] - - for emb_path in embeddings_paths: - with h5py.File(emb_path, "r") as hf: - print(hf["embeddings"][:]) - - -def main(): - setup_logging() - config_help = "\n\nConfig parameters:\n\n" + "\n".join(ConfigSchema.help()) - parser = argparse.ArgumentParser( - epilog=config_help, - # Needed to preserve line wraps in epilog. 
- formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("config", help="Path to config file") - parser.add_argument("-p", "--param", action="append", nargs="*") - parser.add_argument( - "--rank", - type=int, - default=SINGLE_TRAINER, - help="For multi-machine, this machine's rank", - ) - opt = parser.parse_args() - - loader = ConfigFileLoader() - config = loader.load_config(opt.config, opt.param) - - enlarge_checkpoint_files(config) - diff --git a/torchbiggraph/checkpoint_manager.py b/torchbiggraph/checkpoint_manager.py index 40402bcf..e8ea5767 100644 --- a/torchbiggraph/checkpoint_manager.py +++ b/torchbiggraph/checkpoint_manager.py @@ -35,6 +35,7 @@ Partition, Rank, ) +from torchbiggraph.graph_storages import AbstractEntityStorage from torchbiggraph.util import CouldNotLoadData @@ -448,6 +449,86 @@ def preserve_current_version(self, config: ConfigSchema, epoch_idx: int) -> None self.storage.copy_model_to_snapshot(version, epoch_idx) self.storage.copy_version_to_snapshot(version, epoch_idx) + def enlarge( + self, + config: ConfigSchema, + init_entity_storage: AbstractEntityStorage, + entity_storage: AbstractEntityStorage, + entity_counts: Dict[str, List[int]], + ) -> None: + """ + Enlarge a checkpoint to the new checkpoint path + + * Read new entity counts and offsets from the updated partitioned data + * Enlarge previous N embeddings to N + M with M new entities + - Map the previous N embeddings to according to the new offsets + - Initialize the rest M embeddings to with random vectors + @param config: Config dictionary for the PBG run + @param init_entity_storage: + @param entity_storage: + @param entity_counts: + @return: None + """ + logger.debug(f"Enlarging checkpoint from {config.init_path} to {config.checkpoint_path}") + + init_entity_offsets: Dict[str, List[str]] = {} + init_entity_counts: Dict[str, List[int]] = {} + init_checkpoint_storage: AbstractCheckpointStorage = CHECKPOINT_STORAGES.make_instance(config.init_path) + init_version: int = init_checkpoint_storage.load_version() + metadata = self.collect_metadata() + # Load offsets from initial entities + for entity, econf in config.entities.items(): + init_entity_offsets[entity] = [] + init_entity_counts[entity] = [] + for part in range(econf.num_partitions): + init_entity_offsets[entity]. \ + append(init_entity_storage.load_names(entity, part)) + init_entity_counts[entity]. 
\ + append(init_entity_storage.load_count(entity, part)) + + # Enlarge embeddings to the new check point + for entity, econf in config.entities.items(): + for part in range(econf.num_partitions): + + embs, _ = init_checkpoint_storage.load_entity_partition(init_version, entity, part) + + new_count = entity_counts[entity][part] + dimension = config.entity_dimension(entity) + + new_embs = torch.randn((new_count, dimension)) + + logger.debug(f"Loaded old {entity} embeddings of shape {embs.shape}") + logger.debug(f"Loading {entity} embeddings of shape {new_embs.shape}") + logger.debug(f"Old embeddings {entity}{embs[0]}") + + # Initialize an (N + M) X (emb_dim) enlarged embeddings storage + init_names: Set = set(init_entity_offsets[entity][part]) + new_names: List = entity_storage.load_names(entity, part) + subset_idxs = {name: j for (j, name) in enumerate(init_names)} + + for i, new_name in enumerate(new_names): + if new_name in init_names: + subset_idxs[new_name] = i + + if (i + 1) % 1000000 == 0: + logger.debug(f"Mapped {i + 1} entities...") + + subset_idxs = list(subset_idxs.values()) + # Enlarged embeddings with the offsets obtained from previous training + # Initialize new embeddings with random numbers + old_embs = embs.clone() + new_embs[subset_idxs, :] = embs.clone() + + # Test case 1: Whether the embeddings are correctly mapped into the new embeddings + logger.debug(f"New embs at index {0} \n {new_embs[0, :]}") + + assert torch.equal(new_embs[subset_idxs, :], old_embs) + + embs = new_embs + optim_state = None + + self.storage.save_entity_partition(0, entity, part, embs, optim_state, metadata) + def close(self) -> None: self.join() diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index 9a9fff40..50a85842 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -470,25 +470,34 @@ def __init__( # noqa if config.init_path is not None: self.loadpath_manager = CheckpointManager(config.init_path) + if config.init_entity_path is not None: + # Enlarge the embeddings if there is a change in entity counts + # Checkpoint will be enlarge from init_path to new checkpoint path + init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) + self.checkpoint_manager.enlarge(config, + init_entity_storage, + self.entity_storage, + entity_counts + ) else: self.loadpath_manager = None - # Load previously partitioned entities and their offsets - # TODO: Bring this tp importer level so that train gpu can also use - if config.init_entity_path: - init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) - self.init_entity_offsets: Dict[str, List[str]] = {} - self.init_entity_counts: Dict[str, List[int]] = {} - for entity, econf in config.entities.items(): - self.init_entity_offsets[entity] = [] - self.init_entity_counts[entity] = [] - for part in range(econf.num_partitions): - self.init_entity_offsets[entity].\ - append(init_entity_storage.load_names(entity, part)) - self.init_entity_counts[entity].\ - append(init_entity_storage.load_count(entity, part)) - else: - self.init_entity_offsets = None + # # Load previously partitioned entities and their offsets + # if config.init_entity_path is not None: + # init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) + # self.init_entity_offsets: Dict[str, List[str]] = {} + # self.init_entity_counts: Dict[str, List[int]] = {} + # for entity, econf in config.entities.items(): + # self.init_entity_offsets[entity] = [] + # self.init_entity_counts[entity] = [] + # for part in 
range(econf.num_partitions): + # self.init_entity_offsets[entity].\ + # append(init_entity_storage.load_names(entity, part)) + # self.init_entity_counts[entity].\ + # append(init_entity_storage.load_count(entity, part)) + # else: + # self.init_entity_offsets = None + # self.init_entity_counts = None # load model from checkpoint or loadpath, if available state_dict, optim_state = checkpoint_manager.maybe_read_model() @@ -498,34 +507,20 @@ def __init__( # noqa model.load_state_dict(state_dict, strict=False) if optim_state is not None: trainer.model_optimizer.load_state_dict(optim_state) - + # Nullify the loadpath manager after reading model + # if the initial and current entity counts are different + if config.init_entity_path is not None: + self.loadpath_manager = None logger.debug("Loading unpartitioned entities...") for entity in holder.lhs_unpartitioned_types | holder.rhs_unpartitioned_types: - if self.init_entity_offsets is not None: - count = self.init_entity_counts[entity][0] - else: - count = entity_counts[entity][0] + # if self.init_entity_offsets is not None: + # count = self.init_entity_counts[entity][0] + # else: + count = entity_counts[entity][0] s = embedding_storage_freelist[entity].pop() dimension = config.entity_dimension(entity) embs = torch.FloatTensor(s).view(-1, dimension)[:count] embs, optimizer = self._load_embeddings(entity, UNPARTITIONED, out=embs) - """ - Enlarging the embeddings from previous run. The function takes in - the trained N old embeddings from init_path defined, and enlarge it to - N + M embeddings, with M be the number of new entities joining the training, - and initialize the M embeddings with random numbers - """ - if self.init_entity_offsets: - count = self.entity_counts[entity][0] - new_embs = torch.rand((count, dimension)) - new_names = self.entity_storage.load_names(entity, part) - init_subset = [new_names.index(name) - for name in self.init_entity_offsets[entity][part]] - # Initialize embeddings from previous checkpoint - new_embs[init_subset, :] = embs.clone() - # Test case 1: Whether the embeddings are correctly mapped into the new embeddings - assert torch.equal(new_embs[init_subset[0], :], embs[0]) - embs = new_embs holder.unpartitioned_embeddings[entity] = embs trainer.unpartitioned_optimizers[entity] = optimizer @@ -808,32 +803,42 @@ def _load_embeddings( N + M embeddings, with M be the number of new entities joining the training, and initialize the M embeddings with random numbers """ - if self.init_entity_offsets is not None: - logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") - count = self.entity_counts[entity][part] - dimension = self.config.entity_dimension(entity) - new_embs = torch.rand((count, dimension)) - # Initialize an (N + M) X (emb_dim) enlarged embeddings storage - init_names: Set = set(self.init_entity_offsets[entity][part]) - new_names: List = self.entity_storage.load_names(entity, part) - subset_idxs = {name: j for (j, name) in enumerate(init_names)} - - for i, new_name in enumerate(new_names): - if new_name in init_names: - subset_idxs[new_name] = i - - if (i + 1) % 1000000 == 0: - logger.debug(f"Mapped {i+1} entities...") - - subset_idxs = list(subset_idxs.values()) - new_embs[subset_idxs, :] = embs.detach().clone() - - # Test case 1: Whether the embeddings are correctly mapped into the new embeddings - assert torch.equal(new_embs[subset_idxs, :], embs) - - embs = new_embs - optim_state = None - logger.debug(f"Loaded {entity} embeddings of shape {embs.shape}") + # if 
self.init_entity_offsets is not None: + # logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") + # + # count = self.entity_counts[entity][part] + # dimension = self.config.entity_dimension(entity) + # + # new_embs = torch.FloatTensor(s).view(-1, dimension)[:count] + # new_embs = torch.randn(new_embs.shape) + # logger.debug(f"Loaded old {entity} embeddings of shape {embs.shape}") + # logger.debug(f"Loading {entity} embeddings of shape {new_embs.shape}") + # # Initialize an (N + M) X (emb_dim) enlarged embeddings storage + # init_names: Set = set(self.init_entity_offsets[entity][part]) + # new_names: List = self.entity_storage.load_names(entity, part) + # subset_idxs = {name: j for (j, name) in enumerate(init_names)} + # + # # Map the indices of initial names to match the offsets in the current enlarged sets + # for i, new_name in enumerate(new_names): + # if new_name in init_names: + # subset_idxs[new_name] = i + # + # if (i + 1) % 1000000 == 0: + # logger.debug(f"Mapped {i+1} entities...") + # + # subset_idxs = list(subset_idxs.values()) + # # Enlarged embeddings with the offsets obtained from previous training + # # Initialize new embeddings with random numbers + # old_embs = embs.clone() + # new_embs[subset_idxs, :] = embs.clone() + # + # # Test case 1: Whether the embeddings are correctly mapped into the new embeddings + # logger.debug(f"New embs at index {0} \n {new_embs[0, :]}") + # # logger.debug(f"New embs {old_embs[0]}") + # assert torch.equal(new_embs[subset_idxs, :], old_embs) + # + # embs = new_embs + # optim_state = None embs = torch.nn.Parameter(embs) optimizer = make_optimizer(self.config, [embs], True) @@ -886,12 +891,11 @@ def _swap_partitioned_embeddings( for entity, part in new_parts - old_parts: logger.debug(f"Loading ({entity} {part})") force_dirty = self.bucket_scheduler.check_and_set_dirty(entity, part) - # There are trained embeddings already, updating the - # offsets here - if self.init_entity_offsets is not None: - count = self.init_entity_counts[entity][part] - else: - count = self.entity_counts[entity][part] + # # There are trained embeddings already, updating the offsets here + # if self.init_entity_offsets is not None: + # count = self.init_entity_counts[entity][part] + # else: + count = self.entity_counts[entity][part] s = self.embedding_storage_freelist[entity].pop() dimension = self.config.entity_dimension(entity) embs = torch.FloatTensor(s).view(-1, dimension)[:count] @@ -904,10 +908,8 @@ def _swap_partitioned_embeddings( io_bytes += embs.numel() * embs.element_size() # ignore optim state # Load the pretrained embeddings only once - # TODO: Temporary solution, refactor later self.init_entity_offsets = None - assert new_parts == holder.partitioned_embeddings.keys() return io_bytes From c337299b913e23858c9652846f969ed4f68a75a8 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Mon, 28 Jun 2021 18:22:26 +0800 Subject: [PATCH 08/12] Delete update_plan.txt --- update_plan.txt | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 update_plan.txt diff --git a/update_plan.txt b/update_plan.txt deleted file mode 100644 index f3cce3af..00000000 --- a/update_plan.txt +++ /dev/null @@ -1,5 +0,0 @@ -1. Load entity mapping from previously pbg_input names -2. Append new entities to the end of dictionary -3. modify load_entities_by_type to preserve the previous mapping -4. 
Enlarge the embeddings before training - add script to train_cpu.py - -> Read the count files from newly partitioned data \ No newline at end of file From b5969cd388759571a15d7eb469210e3096ca328c Mon Sep 17 00:00:00 2001 From: howardchanth Date: Mon, 28 Jun 2021 23:17:11 +0800 Subject: [PATCH 09/12] * Bug fixed; recurrent training implemented --- torchbiggraph/checkpoint_manager.py | 15 +++++-- torchbiggraph/train_cpu.py | 62 ----------------------------- 2 files changed, 11 insertions(+), 66 deletions(-) diff --git a/torchbiggraph/checkpoint_manager.py b/torchbiggraph/checkpoint_manager.py index e8ea5767..cfc67e7a 100644 --- a/torchbiggraph/checkpoint_manager.py +++ b/torchbiggraph/checkpoint_manager.py @@ -470,7 +470,7 @@ def enlarge( @return: None """ logger.debug(f"Enlarging checkpoint from {config.init_path} to {config.checkpoint_path}") - + # TODO init_entity_offsets: Dict[str, List[str]] = {} init_entity_counts: Dict[str, List[int]] = {} init_checkpoint_storage: AbstractCheckpointStorage = CHECKPOINT_STORAGES.make_instance(config.init_path) @@ -502,18 +502,25 @@ def enlarge( logger.debug(f"Old embeddings {entity}{embs[0]}") # Initialize an (N + M) X (emb_dim) enlarged embeddings storage - init_names: Set = set(init_entity_offsets[entity][part]) + init_names: Dict = {j: init_name for (j, init_name) in enumerate(init_entity_offsets[entity][part])} new_names: List = entity_storage.load_names(entity, part) - subset_idxs = {name: j for (j, name) in enumerate(init_names)} + subset_idxs = {name: None for (_, name) in init_names.items()} + + logger.debug(f"{list(init_names.values())[:100]}") + logger.debug(f"{new_names[:100]}") + + init_name_set = set(init_names.values()) for i, new_name in enumerate(new_names): - if new_name in init_names: + if new_name in init_name_set: subset_idxs[new_name] = i if (i + 1) % 1000000 == 0: logger.debug(f"Mapped {i + 1} entities...") subset_idxs = list(subset_idxs.values()) + logger.debug(f"{subset_idxs[:100]}") + # Enlarged embeddings with the offsets obtained from previous training # Initialize new embeddings with random numbers old_embs = embs.clone() diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index 50a85842..7b953338 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -482,23 +482,6 @@ def __init__( # noqa else: self.loadpath_manager = None - # # Load previously partitioned entities and their offsets - # if config.init_entity_path is not None: - # init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) - # self.init_entity_offsets: Dict[str, List[str]] = {} - # self.init_entity_counts: Dict[str, List[int]] = {} - # for entity, econf in config.entities.items(): - # self.init_entity_offsets[entity] = [] - # self.init_entity_counts[entity] = [] - # for part in range(econf.num_partitions): - # self.init_entity_offsets[entity].\ - # append(init_entity_storage.load_names(entity, part)) - # self.init_entity_counts[entity].\ - # append(init_entity_storage.load_count(entity, part)) - # else: - # self.init_entity_offsets = None - # self.init_entity_counts = None - # load model from checkpoint or loadpath, if available state_dict, optim_state = checkpoint_manager.maybe_read_model() if state_dict is None and self.loadpath_manager is not None: @@ -797,48 +780,6 @@ def _load_embeddings( fast_approx_rand(embs) embs.mul_(self.config.init_scale) optim_state = None - """ - Enlarging the embeddings from previously trained embeddings. 
We take in - the trained N old embeddings from init_path defined, and enlarge it to - N + M embeddings, with M be the number of new entities joining the training, - and initialize the M embeddings with random numbers - """ - # if self.init_entity_offsets is not None: - # logger.debug(f"Enlarging from pretrained embeddings of entity {entity} in partition {part}") - # - # count = self.entity_counts[entity][part] - # dimension = self.config.entity_dimension(entity) - # - # new_embs = torch.FloatTensor(s).view(-1, dimension)[:count] - # new_embs = torch.randn(new_embs.shape) - # logger.debug(f"Loaded old {entity} embeddings of shape {embs.shape}") - # logger.debug(f"Loading {entity} embeddings of shape {new_embs.shape}") - # # Initialize an (N + M) X (emb_dim) enlarged embeddings storage - # init_names: Set = set(self.init_entity_offsets[entity][part]) - # new_names: List = self.entity_storage.load_names(entity, part) - # subset_idxs = {name: j for (j, name) in enumerate(init_names)} - # - # # Map the indices of initial names to match the offsets in the current enlarged sets - # for i, new_name in enumerate(new_names): - # if new_name in init_names: - # subset_idxs[new_name] = i - # - # if (i + 1) % 1000000 == 0: - # logger.debug(f"Mapped {i+1} entities...") - # - # subset_idxs = list(subset_idxs.values()) - # # Enlarged embeddings with the offsets obtained from previous training - # # Initialize new embeddings with random numbers - # old_embs = embs.clone() - # new_embs[subset_idxs, :] = embs.clone() - # - # # Test case 1: Whether the embeddings are correctly mapped into the new embeddings - # logger.debug(f"New embs at index {0} \n {new_embs[0, :]}") - # # logger.debug(f"New embs {old_embs[0]}") - # assert torch.equal(new_embs[subset_idxs, :], old_embs) - # - # embs = new_embs - # optim_state = None embs = torch.nn.Parameter(embs) optimizer = make_optimizer(self.config, [embs], True) @@ -907,9 +848,6 @@ def _swap_partitioned_embeddings( self.trainer.partitioned_optimizers[entity, part] = optimizer io_bytes += embs.numel() * embs.element_size() # ignore optim state - # Load the pretrained embeddings only once - self.init_entity_offsets = None - assert new_parts == holder.partitioned_embeddings.keys() return io_bytes From b44f6c361bc3078a0f8b2429cb5b214dd65df1b6 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Tue, 3 Aug 2021 10:50:25 +0800 Subject: [PATCH 10/12] * Bug fixed; recurrent training implemented --- torchbiggraph/checkpoint_manager.py | 27 +++--- torchbiggraph/converters/importers.py | 30 +++++-- torchbiggraph/train_cpu.py | 122 +++++++++++++------------- 3 files changed, 94 insertions(+), 85 deletions(-) diff --git a/torchbiggraph/checkpoint_manager.py b/torchbiggraph/checkpoint_manager.py index cfc67e7a..e1c1d87a 100644 --- a/torchbiggraph/checkpoint_manager.py +++ b/torchbiggraph/checkpoint_manager.py @@ -417,8 +417,9 @@ def switch_to_new_version(self) -> None: def remove_old_version(self, config: ConfigSchema) -> None: old_version = self.checkpoint_version - 1 - # We never create a v0 checkpoint, so if there is one we leave it there. - if old_version == 0: + # We almost never create a v0 checkpoint, so if there is one we leave it there. 
+ # Checkpoint of version 0 will be created in incremental training, so need to remove it + if old_version == 0 and config.init_entity_path is None: return for entity, econf in config.entities.items(): for part in range(self.rank, econf.num_partitions, self.num_machines): @@ -470,7 +471,11 @@ def enlarge( @return: None """ logger.debug(f"Enlarging checkpoint from {config.init_path} to {config.checkpoint_path}") - # TODO + # Checkpoint exist, not going to enlarge + if self.checkpoint_version > 0: + logger.info(f"Checkpoint with version {self.checkpoint_version} found at {config.checkpoint_path}" + f", not enlarging") + return init_entity_offsets: Dict[str, List[str]] = {} init_entity_counts: Dict[str, List[int]] = {} init_checkpoint_storage: AbstractCheckpointStorage = CHECKPOINT_STORAGES.make_instance(config.init_path) @@ -497,29 +502,20 @@ def enlarge( new_embs = torch.randn((new_count, dimension)) - logger.debug(f"Loaded old {entity} embeddings of shape {embs.shape}") logger.debug(f"Loading {entity} embeddings of shape {new_embs.shape}") - logger.debug(f"Old embeddings {entity}{embs[0]}") # Initialize an (N + M) X (emb_dim) enlarged embeddings storage init_names: Dict = {j: init_name for (j, init_name) in enumerate(init_entity_offsets[entity][part])} new_names: List = entity_storage.load_names(entity, part) subset_idxs = {name: None for (_, name) in init_names.items()} - logger.debug(f"{list(init_names.values())[:100]}") - logger.debug(f"{new_names[:100]}") - - init_name_set = set(init_names.values()) + init_names_set = set(init_names.values()) for i, new_name in enumerate(new_names): - if new_name in init_name_set: + if new_name in init_names_set: subset_idxs[new_name] = i - if (i + 1) % 1000000 == 0: - logger.debug(f"Mapped {i + 1} entities...") - subset_idxs = list(subset_idxs.values()) - logger.debug(f"{subset_idxs[:100]}") # Enlarged embeddings with the offsets obtained from previous training # Initialize new embeddings with random numbers @@ -527,13 +523,12 @@ def enlarge( new_embs[subset_idxs, :] = embs.clone() # Test case 1: Whether the embeddings are correctly mapped into the new embeddings - logger.debug(f"New embs at index {0} \n {new_embs[0, :]}") - assert torch.equal(new_embs[subset_idxs, :], old_embs) embs = new_embs optim_state = None + # Save the previous embeddings as the first version (v0) self.storage.save_entity_partition(0, entity, part, embs, optim_state, metadata) def close(self) -> None: diff --git a/torchbiggraph/converters/importers.py b/torchbiggraph/converters/importers.py index b8a3ee7b..984bba42 100644 --- a/torchbiggraph/converters/importers.py +++ b/torchbiggraph/converters/importers.py @@ -86,15 +86,29 @@ def read(self, path: Path): "'pip install parquet'" ) - with path.open("rb") as tf: - columns = [self.lhs_col, self.rhs_col] - if self.rel_col is not None: - columns.append(self.rel_col) - for row in parquet.reader(tf, columns=columns): + if path.is_dir(): + files = [p for p in path.glob('*.parquet')] + random.shuffle(files) + for pq in files: + with pq.open("rb") as tf: + columns = [self.lhs_col, self.rhs_col] + if self.rel_col is not None: + columns.append(self.rel_col) + for row in parquet.reader(tf, columns=columns): + if self.rel_col is not None: + yield row + else: + yield row[0], row[1], None + else: + with path.open("rb") as tf: + columns = [self.lhs_col, self.rhs_col] if self.rel_col is not None: - yield row - else: - yield row[0], row[1], None + columns.append(self.rel_col) + for row in parquet.reader(tf, columns=columns): + if self.rel_col is 
not None: + yield row + else: + yield row[0], row[1], None def collect_relation_types( diff --git a/torchbiggraph/train_cpu.py b/torchbiggraph/train_cpu.py index 7b953338..66d7165d 100644 --- a/torchbiggraph/train_cpu.py +++ b/torchbiggraph/train_cpu.py @@ -65,17 +65,16 @@ tag_logs_with_process_name, ) - logger = logging.getLogger("torchbiggraph") dist_logger = logging.LoggerAdapter(logger, {"distributed": True}) class Trainer(AbstractBatchProcessor): def __init__( - self, - model_optimizer: Optimizer, - loss_fn: AbstractLossFunction, - relation_weights: List[float], + self, + model_optimizer: Optimizer, + loss_fn: AbstractLossFunction, + relation_weights: List[float], ) -> None: super().__init__(loss_fn, relation_weights) self.model_optimizer = model_optimizer @@ -83,7 +82,7 @@ def __init__( self.partitioned_optimizers: Dict[Tuple[EntityName, Partition], Optimizer] = {} def _process_one_batch( - self, model: MultiRelationEmbedder, batch_edges: EdgeList + self, model: MultiRelationEmbedder, batch_edges: EdgeList ) -> Stats: model.zero_grad() @@ -113,12 +112,12 @@ def _process_one_batch( class IterationManager(MetadataProvider): def __init__( - self, - num_epochs: int, - edge_paths: List[str], - num_edge_chunks: int, - *, - iteration_idx: int = 0, + self, + num_epochs: int, + edge_paths: List[str], + num_edge_chunks: int, + *, + iteration_idx: int = 0, ) -> None: self.num_epochs = num_epochs self.edge_paths = edge_paths @@ -171,7 +170,7 @@ def __add__(self, delta: int) -> "IterationManager": def should_preserve_old_checkpoint( - iteration_manager: IterationManager, interval: Optional[int] + iteration_manager: IterationManager, interval: Optional[int] ) -> bool: """Whether the checkpoint consumed by the current iteration should be kept @@ -212,7 +211,7 @@ def get_num_edge_chunks(config: ConfigSchema) -> int: def make_optimizer( - config: ConfigSchema, params: Iterable[torch.nn.Parameter], is_emb: bool + config: ConfigSchema, params: Iterable[torch.nn.Parameter], is_emb: bool ) -> Optimizer: params = list(params) if len(params) == 0: @@ -234,14 +233,14 @@ def make_optimizer( class TrainingCoordinator: def __init__( # noqa - self, - config: ConfigSchema, - model: Optional[MultiRelationEmbedder] = None, - trainer: Optional[AbstractBatchProcessor] = None, - evaluator: Optional[AbstractBatchProcessor] = None, - rank: Rank = SINGLE_TRAINER, - subprocess_init: Optional[Callable[[], None]] = None, - stats_handler: StatsHandler = NOOP_STATS_HANDLER, + self, + config: ConfigSchema, + model: Optional[MultiRelationEmbedder] = None, + trainer: Optional[AbstractBatchProcessor] = None, + evaluator: Optional[AbstractBatchProcessor] = None, + rank: Rank = SINGLE_TRAINER, + subprocess_init: Optional[Callable[[], None]] = None, + stats_handler: StatsHandler = NOOP_STATS_HANDLER, ): """Each epoch/pass, for each partition pair, loads in embeddings and edgelist from disk, runs HOGWILD training on them, and writes partitions back to disk. 
@@ -284,17 +283,17 @@ def __init__( # noqa num_sides = 1 else: num_sides = ( - (1 if entity_type in holder.lhs_partitioned_types else 0) - + (1 if entity_type in holder.rhs_partitioned_types else 0) - + ( - 1 - if entity_type - in ( - holder.lhs_unpartitioned_types - | holder.rhs_unpartitioned_types + (1 if entity_type in holder.lhs_partitioned_types else 0) + + (1 if entity_type in holder.rhs_partitioned_types else 0) + + ( + 1 + if entity_type + in ( + holder.lhs_unpartitioned_types + | holder.rhs_unpartitioned_types + ) + else 0 ) - else 0 - ) ) for _ in range(num_sides): embedding_storage_freelist[entity_type].add( @@ -320,8 +319,8 @@ def __init__( # noqa num_ps_groups = config.num_groups_for_partition_server groups: List[List[int]] = [ranks.trainers] # barrier group groups += [ - ranks.trainers + ranks.partition_servers - ] * num_ps_groups # ps groups + ranks.trainers + ranks.partition_servers + ] * num_ps_groups # ps groups group_idxs_for_partition_servers = range(1, len(groups)) if rank == SINGLE_TRAINER: @@ -474,11 +473,12 @@ def __init__( # noqa # Enlarge the embeddings if there is a change in entity counts # Checkpoint will be enlarge from init_path to new checkpoint path init_entity_storage = ENTITY_STORAGES.make_instance(config.init_entity_path) - self.checkpoint_manager.enlarge(config, - init_entity_storage, - self.entity_storage, - entity_counts - ) + self.checkpoint_manager.enlarge( + config, + init_entity_storage, + self.entity_storage, + entity_counts + ) else: self.loadpath_manager = None @@ -620,9 +620,9 @@ def train(self) -> None: self.model.set_all_embeddings(holder, cur_b) current_index = ( - (iteration_manager.iteration_idx + 1) * total_buckets - - remaining - - 1 + (iteration_manager.iteration_idx + 1) * total_buckets + - remaining + - 1 ) bucket_logger.debug("Loading edges") @@ -753,12 +753,12 @@ def _barrier(self) -> None: td.barrier(group=self.barrier_group) def _load_embeddings( - self, - entity: EntityName, - part: Partition, - out: FloatTensorType, - strict: bool = False, - force_dirty: bool = False, + self, + entity: EntityName, + part: Partition, + out: FloatTensorType, + strict: bool = False, + force_dirty: bool = False, ) -> Tuple[torch.nn.Parameter, Optimizer]: if strict: embs, optim_state = self.checkpoint_manager.read( @@ -788,10 +788,10 @@ def _load_embeddings( return embs, optimizer def _swap_partitioned_embeddings( - self, - old_b: Optional[Bucket], - new_b: Optional[Bucket], - old_stats: Optional[BucketStats], + self, + old_b: Optional[Bucket], + new_b: Optional[Bucket], + old_stats: Optional[BucketStats], ) -> int: io_bytes = 0 logger.info(f"Swapping partitioned embeddings {old_b} {new_b}") @@ -880,8 +880,8 @@ def _coordinate_train(self, edges, eval_edge_idxs, epoch_idx) -> Stats: else 0, ) for rank, s in enumerate( - split_almost_equally(edge_perm.size(0), num_parts=self.num_workers) - ) + split_almost_equally(edge_perm.size(0), num_parts=self.num_workers) + ) ], ) all_stats = get_async_result(future_all_stats, self.pool) @@ -905,8 +905,8 @@ def _coordinate_eval(self, edges, eval_edge_idxs) -> Optional[Stats]: indices=eval_edge_idxs[s], ) for s in split_almost_equally( - eval_edge_idxs.size(0), num_parts=self.num_workers - ) + eval_edge_idxs.size(0), num_parts=self.num_workers + ) ], ) all_eval_stats = get_async_result(future_all_eval_stats, self.pool) @@ -915,11 +915,11 @@ def _coordinate_eval(self, edges, eval_edge_idxs) -> Optional[Stats]: return None def _maybe_write_checkpoint( - self, - epoch_idx: int, - edge_path_idx: int, - edge_chunk_idx: 
int, - current_index: int, + self, + epoch_idx: int, + edge_path_idx: int, + edge_chunk_idx: int, + current_index: int, ) -> None: config = self.config From b9e2cf6eafba744dfbeb7d6ad432c310d937e171 Mon Sep 17 00:00:00 2001 From: howardchanth Date: Wed, 4 Aug 2021 16:00:11 +0800 Subject: [PATCH 11/12] * bug fixes on recurrent training --- torchbiggraph/checkpoint_manager.py | 15 +++++++++------ torchbiggraph/train_gpu.py | 4 ++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/torchbiggraph/checkpoint_manager.py b/torchbiggraph/checkpoint_manager.py index e1c1d87a..d09ef9ca 100644 --- a/torchbiggraph/checkpoint_manager.py +++ b/torchbiggraph/checkpoint_manager.py @@ -505,22 +505,25 @@ def enlarge( logger.debug(f"Loading {entity} embeddings of shape {new_embs.shape}") # Initialize an (N + M) X (emb_dim) enlarged embeddings storage - init_names: Dict = {j: init_name for (j, init_name) in enumerate(init_entity_offsets[entity][part])} + init_names: Dict = {init_name: j for (j, init_name) in enumerate(init_entity_offsets[entity][part])} new_names: List = entity_storage.load_names(entity, part) - subset_idxs = {name: None for (_, name) in init_names.items()} + subset_idxs = {name: None for name in init_names.keys()} + old_subset_idxs = {name: None for name in init_names.keys()} - init_names_set = set(init_names.values()) + init_names_set = set(init_names.keys()) for i, new_name in enumerate(new_names): if new_name in init_names_set: subset_idxs[new_name] = i + old_subset_idxs[new_name] = init_names[new_name] - subset_idxs = list(subset_idxs.values()) + subset_idxs = [v for v in subset_idxs.values() if v is not None] + old_subset_idxs = [v for v in old_subset_idxs.values() if v is not None] # Enlarged embeddings with the offsets obtained from previous training # Initialize new embeddings with random numbers - old_embs = embs.clone() - new_embs[subset_idxs, :] = embs.clone() + old_embs = embs[old_subset_idxs].clone() + new_embs[subset_idxs, :] = embs[old_subset_idxs].clone() # Test case 1: Whether the embeddings are correctly mapped into the new embeddings assert torch.equal(new_embs[subset_idxs, :], old_embs) diff --git a/torchbiggraph/train_gpu.py b/torchbiggraph/train_gpu.py index aab31b15..571bd97c 100644 --- a/torchbiggraph/train_gpu.py +++ b/torchbiggraph/train_gpu.py @@ -509,6 +509,10 @@ def _coordinate_train(self, edges, eval_edge_idxs, epoch_idx) -> Stats: f"Time spent removing eval edges: {tk.stop('remove_eval'):.4f} s" ) + # edges_lhs = edges_lhs[torch.randperm(edges_lhs.size()[0])] + # edges_rhs = edges_rhs[torch.randperm(edges_rhs.size()[0])] + # edges_rel = edges_rel[torch.randperm(edges_rel.size()[0])] + bucket_logger.debug("Splitting edges into sub-buckets") tk.start("mapping_edges") # randomly permute the entities, to get a random subbucketing From 38feaa7d6ffe037fe8c6a4ca44333a16574aa7ca Mon Sep 17 00:00:00 2001 From: howardchanth Date: Wed, 11 Aug 2021 11:30:31 +0800 Subject: [PATCH 12/12] fixed overfitting to sub-buckets in GPU Training --- torchbiggraph/train_gpu.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/torchbiggraph/train_gpu.py b/torchbiggraph/train_gpu.py index 571bd97c..0f35a0c6 100644 --- a/torchbiggraph/train_gpu.py +++ b/torchbiggraph/train_gpu.py @@ -495,13 +495,25 @@ def _coordinate_train(self, edges, eval_edge_idxs, epoch_idx) -> Stats: edges_lhs = edges.lhs.tensor edges_rhs = edges.rhs.tensor edges_rel = edges.rel + eval_edges_lhs = None + eval_edges_rhs = None + eval_edges_rel = None + full_edges_lhs 
= None + full_edges_rhs = None + full_edges_rel = None if eval_edge_idxs is not None: bucket_logger.debug("Removing eval edges") tk.start("remove_eval") num_eval_edges = len(eval_edge_idxs) + eval_edges_lhs = edges_lhs[eval_edge_idxs] + eval_edges_rhs = edges_rhs[eval_edge_idxs] + eval_edges_rel = edges_rel[eval_edge_idxs] edges_lhs[eval_edge_idxs] = edges_lhs[-num_eval_edges:].clone() edges_rhs[eval_edge_idxs] = edges_rhs[-num_eval_edges:].clone() edges_rel[eval_edge_idxs] = edges_rel[-num_eval_edges:].clone() + full_edges_lhs = edges_lhs + full_edges_rhs = edges_rhs + full_edges_rel = edges_rel edges_lhs = edges_lhs[:-num_eval_edges] edges_rhs = edges_rhs[:-num_eval_edges] edges_rel = edges_rel[:-num_eval_edges] @@ -509,10 +521,6 @@ def _coordinate_train(self, edges, eval_edge_idxs, epoch_idx) -> Stats: f"Time spent removing eval edges: {tk.stop('remove_eval'):.4f} s" ) - # edges_lhs = edges_lhs[torch.randperm(edges_lhs.size()[0])] - # edges_rhs = edges_rhs[torch.randperm(edges_rhs.size()[0])] - # edges_rel = edges_rel[torch.randperm(edges_rel.size()[0])] - bucket_logger.debug("Splitting edges into sub-buckets") tk.start("mapping_edges") # randomly permute the entities, to get a random subbucketing @@ -657,6 +665,11 @@ def schedule(gpu_idx: GPURank) -> None: f"Time spent mapping embeddings back from sub-buckets: {tk.stop('rev_perm'):.4f} s" ) + if eval_edge_idxs is not None: + full_edges_lhs[eval_edge_idxs] = eval_edges_lhs + full_edges_rhs[eval_edge_idxs] = eval_edges_rhs + full_edges_rel[eval_edge_idxs] = eval_edges_rel + logger.debug( f"_coordinate_train: Time unaccounted for: {tk.unaccounted():.4f} s" )