From 7304119ce1f5804943a2e5ef65a98288f7d79136 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sat, 9 Nov 2024 11:47:52 -0500 Subject: [PATCH 01/30] fix GPTMemmapDataset --- fast_llm/data/gpt/memmap.py | 2 +- setup.cfg | 1 + tests/test_memmap_dataset.py | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 tests/test_memmap_dataset.py diff --git a/fast_llm/data/gpt/memmap.py b/fast_llm/data/gpt/memmap.py index b49bb9a5..0ff4857b 100644 --- a/fast_llm/data/gpt/memmap.py +++ b/fast_llm/data/gpt/memmap.py @@ -106,7 +106,7 @@ def write_dataset(cls, prefix: pathlib.Path | str, documents: list[np.ndarray]): dtype = documents[0].dtype num_documents = len(documents) lengths = np.array([len(document) for document in documents], dtype=np.int32) - pointers = padded_cumsum(lengths[:-1].astype(np.int64) * 2) + pointers = padded_cumsum(lengths[:-1].astype(np.int64)) * np.dtype(dtype).itemsize prefix.parent.mkdir(parents=True, exist_ok=True) with prefix.with_suffix(".idx").open("wb") as stream: stream.write(cls._INDEX_HEADER) diff --git a/setup.cfg b/setup.cfg index a353151c..d45144a5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,6 +42,7 @@ OPTIONAL = DEV = pytest>=8.3.2 pytest-depends>=1.0.1 + hypothesis>=6.118.1 # Required for building the documentation DOCS = diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py new file mode 100644 index 00000000..af7153ca --- /dev/null +++ b/tests/test_memmap_dataset.py @@ -0,0 +1,33 @@ +from hypothesis import given, strategies as st +from hypothesis.extra import numpy as npst +import numpy as np +from tempfile import TemporaryDirectory +from pathlib import Path +from fast_llm.data.gpt.memmap import GPTMemmapDataset + +def dtype_arrays(dtype: type[np.integer], min_size: int=1, max_size: int=100) -> st.SearchStrategy: + return st.lists( + npst.arrays( + dtype=dtype, + shape=st.integers(1, 1000), + elements=st.integers( + min_value=np.iinfo(dtype).min, + max_value=np.iinfo(dtype).max, + ), + ), + min_size=min_size, + max_size=max_size, + ) + +for dtype in [np.int8, np.uint16, np.int16, np.int32, np.int64]: + @given(arrays=dtype_arrays(dtype)) + def test_gpt_memmap_dataset(arrays: list[np.ndarray]): + run_gpt_memmap_dataset_test(documents=arrays) + +def run_gpt_memmap_dataset_test(documents: list[np.ndarray]) -> None: + with TemporaryDirectory() as temp_dir: + prefix = Path(temp_dir) + GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) + dataset = GPTMemmapDataset(name="foo", prefix=prefix) + for i, document in enumerate(documents): + assert np.array_equal(dataset.get(i), document), f"Mismatch at index {i}" From 47d453b6b7482c63e5469b26dc5bb990ea736a42 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sat, 9 Nov 2024 12:03:14 -0500 Subject: [PATCH 02/30] fix GPTMemmapDataset --- tests/test_memmap_dataset.py | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py index af7153ca..49ca44d1 100644 --- a/tests/test_memmap_dataset.py +++ b/tests/test_memmap_dataset.py @@ -4,30 +4,24 @@ from tempfile import TemporaryDirectory from pathlib import Path from fast_llm.data.gpt.memmap import GPTMemmapDataset +import pytest -def dtype_arrays(dtype: type[np.integer], min_size: int=1, max_size: int=100) -> st.SearchStrategy: +def dtype_arrays(dtype: np.dtype, min_size: int=1, max_size: int=100) -> st.SearchStrategy: return st.lists( - npst.arrays( - dtype=dtype, - 
shape=st.integers(1, 1000), - elements=st.integers( - min_value=np.iinfo(dtype).min, - max_value=np.iinfo(dtype).max, - ), - ), + npst.arrays(dtype=dtype, shape=st.integers(1, 1000)), min_size=min_size, max_size=max_size, ) -for dtype in [np.int8, np.uint16, np.int16, np.int32, np.int64]: - @given(arrays=dtype_arrays(dtype)) - def test_gpt_memmap_dataset(arrays: list[np.ndarray]): - run_gpt_memmap_dataset_test(documents=arrays) - -def run_gpt_memmap_dataset_test(documents: list[np.ndarray]) -> None: - with TemporaryDirectory() as temp_dir: - prefix = Path(temp_dir) - GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) - dataset = GPTMemmapDataset(name="foo", prefix=prefix) - for i, document in enumerate(documents): - assert np.array_equal(dataset.get(i), document), f"Mismatch at index {i}" +@pytest.mark.parametrize("dtype", GPTMemmapDataset._DTYPES.values()) +def test_gpt_memmap_dataset(dtype): + @given(documents=dtype_arrays(dtype)) + def inner_test(documents): + with TemporaryDirectory() as temp_dir: + prefix = Path(temp_dir) + GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) + dataset = GPTMemmapDataset(name="foo", prefix=prefix) + for i, document in enumerate(documents): + assert np.array_equal(dataset.get(i), document, equal_nan=True), f"Mismatch at index {i}" + + inner_test() From bef3a723f8d014a2e24d3546198d18d54234a69f Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sat, 9 Nov 2024 23:00:47 -0500 Subject: [PATCH 03/30] add prepare-dataset command --- fast_llm/data/config.py | 1 - fast_llm/data/tokenizer.py | 2 +- fast_llm/tools/cli.py | 4 +- fast_llm/tools/prepare_dataset.py | 345 ++++++++++++++++++++++++++++++ fast_llm/utils.py | 8 +- setup.cfg | 3 + 6 files changed, 358 insertions(+), 5 deletions(-) create mode 100644 fast_llm/tools/prepare_dataset.py diff --git a/fast_llm/data/config.py b/fast_llm/data/config.py index 836a6d17..5e829150 100644 --- a/fast_llm/data/config.py +++ b/fast_llm/data/config.py @@ -106,7 +106,6 @@ def _validate(self): class TokenizerConfig(Config): """ Configuration for the tokenizer. - Currently, the tokenizer is only needed for FIM. """ format: str = Field( diff --git a/fast_llm/data/tokenizer.py b/fast_llm/data/tokenizer.py index d75aab7f..2061d6b6 100644 --- a/fast_llm/data/tokenizer.py +++ b/fast_llm/data/tokenizer.py @@ -6,7 +6,7 @@ class Tokenizer: """ - A Huggingface (transformers) tokenizer. + A wrapper around Huggingface (transformers) tokenizer. 
""" def __init__(self, config: TokenizerConfig): diff --git a/fast_llm/tools/cli.py b/fast_llm/tools/cli.py index 7b338953..d3ac5e6d 100644 --- a/fast_llm/tools/cli.py +++ b/fast_llm/tools/cli.py @@ -15,13 +15,15 @@ def fast_llm(args=None): # (Pre-)configure logging configure_logging() parser = argparse.ArgumentParser(add_help=False) - parser.add_argument("subcommand", choices=["train", "convert"]) + parser.add_argument("subcommand", choices=["train", "convert", "prepare_dataset"]) parsed, unparsed = parser.parse_known_args(args) try: if parsed.subcommand == "train": from fast_llm.tools.train import CliTrainingConfig as Runnable elif parsed.subcommand == "convert": from fast_llm.tools.convert import ConversionConfig as Runnable + elif parsed.subcommand == "prepare_dataset": + from fast_llm.tools.prepare_dataset import PrepareDatasetConfig as Runnable else: raise RuntimeError("Unknown subcommand") Runnable.parse_and_run(unparsed) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py new file mode 100644 index 00000000..80dc1426 --- /dev/null +++ b/fast_llm/tools/prepare_dataset.py @@ -0,0 +1,345 @@ +import abc +import argparse +import json +import os +import pathlib +import typing +from multiprocessing import Pool + +import numpy as np +import torch.distributed + +from fast_llm.config import Config, Field, FieldHint, check_field, config_class +from fast_llm.data.config import TokenizerConfig +from fast_llm.data.gpt.memmap import GPTMemmapDataset +from fast_llm.data.tokenizer import Tokenizer +from fast_llm.engine.config_utils.data_type import DataType +from fast_llm.engine.config_utils.runnable import RunnableConfig +from fast_llm.utils import Assert, Registry + + +@config_class +class DistributedConfig(Config): + default_world_size: typing.ClassVar[int] = int(os.environ.get("WORLD_SIZE", 1)) + default_rank: typing.ClassVar[int] = int(os.environ.get("RANK", 0)) + world_size: int = Field( + default=None, + desc="Size of the world group. Typically provided by torchrun or equivalent through the `WORLD_SIZE` environment variable.", + hint=FieldHint.expert, + valid=check_field(Assert.gt, 0), + ) + rank: int = Field( + default=None, + desc="Rank of the local process. 
Typically provided by torchrun or equivalent through the `RANK` environment variable.", + hint=FieldHint.expert, + valid=check_field(Assert.geq, 0), + ) + backend: str = Field( + default="gloo", + desc="Distributed backend to use.", + hint=FieldHint.optional, + valid=check_field(Assert.incl, torch.distributed.Backend.backend_list), + ) + + def _validate(self): + if self.world_size is None: + self.world_size = self.default_world_size + if self.rank is None: + self.rank = self.default_rank + super()._validate() + Assert.in_range(self.rank, 0, self.world_size) + + +@config_class() +class DatasetPreparatorConfig(RunnableConfig): + _abstract = True + model_name: typing.ClassVar[str] + + output_path: pathlib.Path = Field( + desc="Output directory for the processed dataset.", + hint=FieldHint.core, + ) + distributed: DistributedConfig = Field( + default_factory=DistributedConfig, + desc="Configuration for distributed processing.", + hint=FieldHint.feature, + ) + + @classmethod + def get_dataset_preparator_class(cls) -> typing.Type["DatasetPreparator"]: + raise NotImplementedError + + def _get_runnable(self, parsed: argparse.Namespace) -> typing.Callable[[], None]: + dataset_preparator = self.get_dataset_preparator_class()(config=self) + return dataset_preparator.run + + +class DatasetPreparator(abc.ABC): + _abstract = True + _config: DatasetPreparatorConfig + config_class: typing.ClassVar[type[DatasetPreparatorConfig]] = DatasetPreparatorConfig + + def __init__(self, config: DatasetPreparatorConfig) -> None: + Assert.custom(isinstance, config, self.config_class) + config.validate() + self._config = config + + def run(self) -> None: + raise NotImplementedError + + +@config_class +class GPTDatasetConfig(Config): + name_or_path: str = Field( + desc="Name or path of the dataset.", + hint=FieldHint.core, + ) + config_name: None | str = Field( + default=None, + desc="Specific configuration name for the dataset.", + hint=FieldHint.optional, + ) + split: str = Field( + default="train", + desc="Split of the dataset to use.", + hint=FieldHint.optional, + ) + field: str = Field( + default="text", + desc="Field of the dataset to use.", + hint=FieldHint.optional, + ) + data_type: DataType = Field( + default=None, + desc="Data type of the dataset field.", + hint=FieldHint.derived, + ) + trust_remote_code: bool = Field( + default=False, + desc="Trust remote code when downloading the dataset.", + hint=FieldHint.optional, + ) + disable_disk_space_check: bool = Field( + default=False, + desc="Disable disk space check. 
Useful for environments where disk space is not accurately reported.", + hint=FieldHint.optional, + ) + + +@config_class() +class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): + _abstract = False + model_name: typing.ClassVar[str] = "gpt" + + tokens_per_shard: int = Field( + default=1_000_000_000, + desc="Approximate number of tokens per shard.", + hint=FieldHint.feature, + valid=check_field(Assert.geq, 100_000), + ) + loading_workers: int = Field( + default=1, + desc="Number of workers in load_dataset() call.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + tokenize_workers: int = Field( + default=1, + desc="Number of workers for tokenization.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + saving_workers: int = Field( + default=1, + desc="Number of processes for saving the data.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + dataset: GPTDatasetConfig = Field( + default_factory=GPTDatasetConfig, + desc="Configuration for the dataset.", + hint=FieldHint.feature, + ) + tokenizer: TokenizerConfig = Field( + default_factory=TokenizerConfig, + desc="Configuration for the tokenizer.", + hint=FieldHint.feature, + ) + _tokenizer: Tokenizer = Field( + init=False, + desc="The tokenizer instance.", + hint=FieldHint.derived, + ) + + def _validate(self): + Assert.not_none(self.tokenizer.path) + self._tokenizer = Tokenizer(config=self.tokenizer) + if self.dataset.data_type is None: + # Decide the datatype based on the tokenizer vocabulary size + vocab_size = self._tokenizer.vocab_size + if vocab_size <= np.iinfo(np.int16).max: + self.dataset.data_type = DataType.int16 + # elif vocab_size <= np.iinfo(np.uint16).max: + # self.dataset.data_type = DataType.uint16 # Not supported by Fast-LLM's DataType + elif vocab_size <= np.iinfo(np.int32).max: + self.dataset.data_type = DataType.int32 + else: + raise ValueError(f"Tokenizer vocabulary size {vocab_size} is too large. 
This is likely an error.") + super()._validate() + + @classmethod + def get_dataset_preparator_class(cls): + return GPTDatasetPreparator + + +class GPTDatasetPreparator(DatasetPreparator): + _abstract = False + _config: GPTDatasetPreparatorConfig + config_class = GPTDatasetPreparatorConfig + + def _tokenize_batch(self, batch): + input_ids = [ + np.array(self._tokenizer.tokenize(text), dtype=self.dataset.data_type.numpy) + for text in batch[self.dataset.field] + ] + num_tokens = [len(x) for x in input_ids] + return { + "input_ids": input_ids, + "num_tokens": num_tokens, + } + + def _save_shard(self, args) -> dict: + from tqdm import tqdm + + shard_idx, shard_dataset = args + prefix = f"shard_{self.rank}_{shard_idx}" + shard_output_path = self._config.output_path / prefix + documents = [ + np.array(item["input_ids"], dtype=self.dataset.data_type.numpy) + for item in tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") + ] + GPTMemmapDataset.write_dataset(prefix=shard_output_path, documents=documents) + dataset_dict = { + "prefix": prefix, + "num_documents": len(documents), + "num_tokens": sum(len(doc) for doc in documents), + } + return dataset_dict + + def run(self): + import datasets + import transformers + from tqdm import tqdm + + # Set transformers logging verbosity + transformers.logging.set_verbosity_error() + + if self._config.dataset.disable_disk_space_check: + datasets.builder.has_sufficient_disk_space = lambda needed_bytes, directory=".": True + + # Initialize distributed processing + if self._config.distributed.world_size > 1: + torch.distributed.init_process_group( + backend=self._config.distributed.backend, + rank=self._config.distributed.rank, + world_size=self._config.distributed.world_size, + ) + + # Prepare output directory + self._config.output_path.mkdir(parents=True, exist_ok=True) + + # Download dataset + download_path = self._config.output_path / "downloaded_dataset" + if self._config.distributed.rank == 0: + datasets.load_dataset( + path=self._config.dataset.name_or_path, + name=self._config.dataset.config_name, + split=self._config.dataset.split, + num_proc=self._config.loading_workers, + trust_remote_code=self._config.dataset.trust_remote_code, + ).save_to_disk(download_path, num_proc=self._config.saving_workers) + + # Synchronize processes to wait for the download + if self._config.distributed.world_size > 1: + torch.distributed.barrier() + + # Load and shard the dataset + dataset = datasets.load_from_disk(download_path).shard( + num_shards=self._config.distributed.world_size, + index=self._config.distribted.rank, + ) + if self._config.dataset.field not in dataset.column_names: + raise ValueError(f"Dataset does not have field '{self._config.dataset.field}'.") + + # Tokenize the dataset + tokenized_dataset = dataset.map( + self._tokenize_batch, + batched=True, + num_proc=self._config.tokenize_workers, + desc="Tokenizing batches", + ) + + # Calculate total number of tokens + total_tokens = sum(tqdm(tokenized_dataset["num_tokens"], desc="Counting tokens", unit="tokens")) + + # Split dataset into shards + num_shards = int(np.ceil(total_tokens / self._config.tokens_per_shard)) + shards = [ + (i, tokenized_dataset.shard(num_shards=num_shards, index=i)) + for i in tqdm(range(num_shards), desc="Creating shards") + ] + + # Use multiprocessing to save each shard in parallel + with Pool(processes=self._config.saving_workers) as pool: + dataset_dicts = pool.map(self._save_shard, shards) + + # Gather dataset_dicts from all ranks to rank 0 + if 
self._config.distributed.world_size > 1: + all_dataset_dicts = [None] * self._config.distributed.world_size + torch.distributed.gather_object(dataset_dicts, all_dataset_dicts, dst=0) + if self._config.distributed.rank == 0: + dataset_dicts = [item for sublist in all_dataset_dicts for item in sublist] + + # Create a metadata file + if self._config.distributed.rank == 0: + total_tokens = sum(dataset_dict["num_tokens"] for dataset_dict in dataset_dicts) + for dataset_dict in dataset_dicts: + dataset_dict["weight"] = float(dataset_dict["num_tokens"]) / float(total_tokens) + output_file = self._config.output_path / "fast_llm_dataset.json" + json.dump({"datasets": dataset_dicts}, output_file.open("w")) + + # Finalize distributed processing + if self._config.distributed.world_size > 1: + torch.distributed.barrier() + torch.distributed.destroy_process_group() + + +dataset_preparator_registry = Registry( + "DatasetPreparator", + { + dataset_preparator.model_name: dataset_preparator + for dataset_preparator in [ + GPTDatasetPreparatorConfig, + ] + }, +) + + +class PrepareDatasetConfig(RunnableConfig): + @classmethod + def _get_parser(cls): + parser = super()._get_parser() + parser.add_argument( + "model_type", + choices=dataset_preparator_registry.keys(), + help="The Fast-LLM model type to use. Must be defined in the model registry in `fast_llm.models.auto`.", + ) + return parser + + @classmethod + def _from_parsed_args(cls, parsed: argparse.Namespace, unparsed: list[str]): + return dataset_preparator_registry[parsed.model_type]._from_parsed_args(parsed, unparsed) + + +if __name__ == "__main__": + PrepareDatasetConfig.parse_and_run() diff --git a/fast_llm/utils.py b/fast_llm/utils.py index 937aacd8..111d5ca1 100644 --- a/fast_llm/utils.py +++ b/fast_llm/utils.py @@ -116,6 +116,10 @@ def in_range_incl(x, low, high): @staticmethod def none(x): assert x is None, f"Object of type {type(x)} is not None ({str(x)})" + + @staticmethod + def not_none(x): + assert x is not None, "Object is None" @staticmethod def empty(x): @@ -175,8 +179,8 @@ def not_custom(fn, *args, **kwargs): ), f"Assertion failed: not fn({', '.join(itertools.chain((str(x) for x in args),(f'{str(k)}={str(v)}' for k,v in kwargs.items())))})" -class Registry: - def __init__(self, name, data: dict): +class Registry[_KT, _VT]: + def __init__(self, name: str, data: dict[_KT, _VT]): self._name = name self._data = data.copy() diff --git a/setup.cfg b/setup.cfg index d45144a5..1cf0a541 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,11 +32,14 @@ OPTIONAL = # Huggingface tools transformers>=4.44.2 hf-transfer>=0.1.8 + datasets>=3.1.0 # Weights and biases wandb>=0.17.7 # Hydra hydra-core>=1.3.2 omegaconf>=2.3.0 + # Miscaleaneous + tqdm>=4.66.3 # Required for testing DEV = From 0ffc75c6e3039e2456810749d506e277e84b890b Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sat, 9 Nov 2024 23:39:01 -0500 Subject: [PATCH 04/30] add prepare-dataset command --- fast_llm/tools/prepare_dataset.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index 80dc1426..67f82afb 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -199,8 +199,8 @@ class GPTDatasetPreparator(DatasetPreparator): def _tokenize_batch(self, batch): input_ids = [ - np.array(self._tokenizer.tokenize(text), dtype=self.dataset.data_type.numpy) - for text in batch[self.dataset.field] + np.array(self._config._tokenizer.tokenize(text), 
dtype=self._config.dataset.data_type.numpy) + for text in batch[self._config.dataset.field] ] num_tokens = [len(x) for x in input_ids] return { @@ -266,7 +266,7 @@ def run(self): # Load and shard the dataset dataset = datasets.load_from_disk(download_path).shard( num_shards=self._config.distributed.world_size, - index=self._config.distribted.rank, + index=self._config.distributed.rank, ) if self._config.dataset.field not in dataset.column_names: raise ValueError(f"Dataset does not have field '{self._config.dataset.field}'.") From fda6386538516774efed05d6a2540d708c6eb67b Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 09:42:54 -0500 Subject: [PATCH 05/30] add prepare-dataset command --- fast_llm/tools/prepare_dataset.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index 67f82afb..f0437dd0 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -212,10 +212,10 @@ def _save_shard(self, args) -> dict: from tqdm import tqdm shard_idx, shard_dataset = args - prefix = f"shard_{self.rank}_{shard_idx}" + prefix = f"shard_{self._config.distributed.rank}_{shard_idx}" shard_output_path = self._config.output_path / prefix documents = [ - np.array(item["input_ids"], dtype=self.dataset.data_type.numpy) + np.array(item["input_ids"], dtype=self._config.dataset.data_type.numpy) for item in tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") ] GPTMemmapDataset.write_dataset(prefix=shard_output_path, documents=documents) @@ -295,10 +295,12 @@ def run(self): # Gather dataset_dicts from all ranks to rank 0 if self._config.distributed.world_size > 1: - all_dataset_dicts = [None] * self._config.distributed.world_size - torch.distributed.gather_object(dataset_dicts, all_dataset_dicts, dst=0) if self._config.distributed.rank == 0: + all_dataset_dicts = [None] * self._config.distributed.world_size + torch.distributed.gather_object(dataset_dicts, all_dataset_dicts, dst=0) dataset_dicts = [item for sublist in all_dataset_dicts for item in sublist] + else: + torch.distributed.gather_object(dataset_dicts, [], dst=0) # Create a metadata file if self._config.distributed.rank == 0: From acae7d92960ac11eb9d05035a8a41a5f8c3a0a69 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 09:58:00 -0500 Subject: [PATCH 06/30] add prepare-dataset command --- fast_llm/tools/prepare_dataset.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index f0437dd0..d172c7c6 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -155,6 +155,11 @@ class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): hint=FieldHint.optional, valid=check_field(Assert.geq, 1), ) + clean_output: bool = Field( + default=False, + desc="Remove downloaded dataset after processing.", + hint=FieldHint.optional, + ) dataset: GPTDatasetConfig = Field( default_factory=GPTDatasetConfig, desc="Configuration for the dataset.", @@ -248,9 +253,10 @@ def run(self): # Prepare output directory self._config.output_path.mkdir(parents=True, exist_ok=True) - # Download dataset + # Download dataset if necessary on rank 0 download_path = self._config.output_path / "downloaded_dataset" - if self._config.distributed.rank == 0: + download_path_ok = download_path / "ok" + if self._config.distributed.rank == 0 and not download_path_ok.exists(): 
datasets.load_dataset( path=self._config.dataset.name_or_path, name=self._config.dataset.config_name, @@ -258,12 +264,13 @@ def run(self): num_proc=self._config.loading_workers, trust_remote_code=self._config.dataset.trust_remote_code, ).save_to_disk(download_path, num_proc=self._config.saving_workers) + download_path_ok.touch() - # Synchronize processes to wait for the download + # Synchronize processes to wait for the download to finish if self._config.distributed.world_size > 1: torch.distributed.barrier() - # Load and shard the dataset + # Load and shard the dataset on each rank dataset = datasets.load_from_disk(download_path).shard( num_shards=self._config.distributed.world_size, index=self._config.distributed.rank, @@ -271,7 +278,7 @@ def run(self): if self._config.dataset.field not in dataset.column_names: raise ValueError(f"Dataset does not have field '{self._config.dataset.field}'.") - # Tokenize the dataset + # Tokenize the dataset in parallel tokenized_dataset = dataset.map( self._tokenize_batch, batched=True, @@ -282,14 +289,14 @@ def run(self): # Calculate total number of tokens total_tokens = sum(tqdm(tokenized_dataset["num_tokens"], desc="Counting tokens", unit="tokens")) - # Split dataset into shards + # Split dataset into shards based on number of tokens num_shards = int(np.ceil(total_tokens / self._config.tokens_per_shard)) shards = [ (i, tokenized_dataset.shard(num_shards=num_shards, index=i)) for i in tqdm(range(num_shards), desc="Creating shards") ] - # Use multiprocessing to save each shard in parallel + # Use multiprocessing to save each shard in parallel on all ranks with Pool(processes=self._config.saving_workers) as pool: dataset_dicts = pool.map(self._save_shard, shards) @@ -302,7 +309,7 @@ def run(self): else: torch.distributed.gather_object(dataset_dicts, [], dst=0) - # Create a metadata file + # Create a metadata file on rank 0 if self._config.distributed.rank == 0: total_tokens = sum(dataset_dict["num_tokens"] for dataset_dict in dataset_dicts) for dataset_dict in dataset_dicts: @@ -314,6 +321,10 @@ def run(self): if self._config.distributed.world_size > 1: torch.distributed.barrier() torch.distributed.destroy_process_group() + + # Clean up downloaded dataset + if self._config.clean_output and self._config.distributed.rank == 0: + download_path.unlink(missing_ok=True) dataset_preparator_registry = Registry( From eb7da598608719db6f995422b3fe28e8e72719ff Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 10:01:14 -0500 Subject: [PATCH 07/30] add prepare-dataset command --- fast_llm/tools/prepare_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index d172c7c6..b8d18fa8 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -155,7 +155,7 @@ class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): hint=FieldHint.optional, valid=check_field(Assert.geq, 1), ) - clean_output: bool = Field( + remove_downloads: bool = Field( default=False, desc="Remove downloaded dataset after processing.", hint=FieldHint.optional, @@ -323,7 +323,7 @@ def run(self): torch.distributed.destroy_process_group() # Clean up downloaded dataset - if self._config.clean_output and self._config.distributed.rank == 0: + if self._config.remove_downloads and self._config.distributed.rank == 0: download_path.unlink(missing_ok=True) From b5ed2f0535fafff54e77ea1e93217cb89b358d0e Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 
10:05:14 -0500 Subject: [PATCH 08/30] add prepare-dataset command --- fast_llm/tools/prepare_dataset.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index b8d18fa8..38a79221 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -3,6 +3,7 @@ import json import os import pathlib +import shutil import typing from multiprocessing import Pool @@ -321,10 +322,10 @@ def run(self): if self._config.distributed.world_size > 1: torch.distributed.barrier() torch.distributed.destroy_process_group() - + # Clean up downloaded dataset if self._config.remove_downloads and self._config.distributed.rank == 0: - download_path.unlink(missing_ok=True) + shutil.rmtree(download_path) dataset_preparator_registry = Registry( From c8f746a55a871652352e6e017f2d65cbdfd38998 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 13:40:18 -0500 Subject: [PATCH 09/30] only push latest tag for commits to main --- .github/workflows/ci.yaml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8629c06b..7b3accb2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -57,12 +57,9 @@ jobs: ghcr.io/servicenow/fast-llm tags: | type=schedule - type=ref,event=branch - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - type=semver,pattern={{major}} + type=pep440,pattern={{version}} type=sha - type=raw,value=latest,enabled={{github.ref == 'refs/heads/main'}} + type=raw,value=latest,enable={{is_default_branch}} - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 @@ -78,7 +75,6 @@ jobs: uses: docker/build-push-action@v6 with: context: . 
- # push: ${{ github.event_name != 'pull_request' }} push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} From e0f813ce2e4d944ad91e4c2fdd9ab93022bb187f Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 14:01:49 -0500 Subject: [PATCH 10/30] use older generics syntax --- fast_llm/utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fast_llm/utils.py b/fast_llm/utils.py index 111d5ca1..285c2272 100644 --- a/fast_llm/utils.py +++ b/fast_llm/utils.py @@ -116,7 +116,7 @@ def in_range_incl(x, low, high): @staticmethod def none(x): assert x is None, f"Object of type {type(x)} is not None ({str(x)})" - + @staticmethod def not_none(x): assert x is not None, "Object is None" @@ -179,7 +179,11 @@ def not_custom(fn, *args, **kwargs): ), f"Assertion failed: not fn({', '.join(itertools.chain((str(x) for x in args),(f'{str(k)}={str(v)}' for k,v in kwargs.items())))})" -class Registry[_KT, _VT]: +_KT = typing.TypeVar("_KT") +_VT = typing.TypeVar("_VT") + + +class Registry(typing.Generic[_KT, _VT]): def __init__(self, name: str, data: dict[_KT, _VT]): self._name = name self._data = data.copy() From b88c9d3eae97e63d9e1d67ebbf99fe18d4839ccd Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 14:15:10 -0500 Subject: [PATCH 11/30] remove user and install Fast-LLM globally --- Dockerfile | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9c3ecf49..956cafb7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,28 +7,32 @@ RUN apt-get update \ && rm -rf /var/lib/apt/lists/* \ && git lfs install -# Add a user for Fast-LLM with sudo privileges for runtime adjustments -ARG FAST_LLM_USER_ID=1000 -RUN useradd -m -u $FAST_LLM_USER_ID -s /bin/bash fast_llm \ - && echo 'fast_llm ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers +# Create a generic writable home directory for arbitrary users +RUN mkdir -p /home/user && chmod -R a+w /home/user -USER fast_llm +# Set the working directory WORKDIR /app -# Environment settings for Python and PATH +# Environment settings for Python and the user ENV PYTHONPATH=/app:/app/Megatron-LM \ - PATH=$PATH:/home/fast_llm/.local/bin/ + HOME=/home/user -# Copy the dependency files and install dependencies -COPY --chown=fast_llm setup.py setup.cfg pyproject.toml ./ -COPY --chown=fast_llm ./fast_llm/csrc/ fast_llm/csrc/ +# Copy the dependency files and install dependencies globally +COPY setup.py setup.cfg pyproject.toml ./ +COPY ./fast_llm/csrc/ fast_llm/csrc/ RUN PIP_NO_INPUT=1 pip3 install --no-cache-dir --no-build-isolation -e ".[CORE,OPTIONAL,DEV]" # Copy the rest of the code -COPY --chown=fast_llm ./Megatron-LM Megatron-LM -COPY --chown=fast_llm ./examples examples -COPY --chown=fast_llm ./tests tests -COPY --chown=fast_llm ./tools tools +COPY ./Megatron-LM Megatron-LM +COPY ./examples examples +COPY ./tests tests +COPY ./tools tools -# Copy the main source code for Fast-LLM -COPY --exclude=./fast_llm/csrc/ --chown=fast_llm ./fast_llm/ fast_llm/ +# Copy the main source code +COPY --exclude=./fast_llm/csrc/ ./fast_llm/ fast_llm/ + +# Ensure the source code files are writable +RUN chmod -R a+w /app + +# Ensure the user can write to the home directory +ENTRYPOINT ["/bin/bash", "-c", "export HOME=${HOME} && exec \"$@\"", "--"] From 4df12d964b2e6a016b526f898da82a10b2256b68 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Sun, 10 Nov 2024 19:39:11 -0500 Subject: [PATCH 12/30] simplify Dockerfile --- Dockerfile | 9 
+-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 956cafb7..e3804d59 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,15 +7,11 @@ RUN apt-get update \ && rm -rf /var/lib/apt/lists/* \ && git lfs install -# Create a generic writable home directory for arbitrary users -RUN mkdir -p /home/user && chmod -R a+w /home/user - # Set the working directory WORKDIR /app # Environment settings for Python and the user -ENV PYTHONPATH=/app:/app/Megatron-LM \ - HOME=/home/user +ENV PYTHONPATH=/app:/app/Megatron-LM # Copy the dependency files and install dependencies globally COPY setup.py setup.cfg pyproject.toml ./ @@ -33,6 +29,3 @@ COPY --exclude=./fast_llm/csrc/ ./fast_llm/ fast_llm/ # Ensure the source code files are writable RUN chmod -R a+w /app - -# Ensure the user can write to the home directory -ENTRYPOINT ["/bin/bash", "-c", "export HOME=${HOME} && exec \"$@\"", "--"] From 3737bc0d7fc99954842bf2cb2628fc325d740053 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 18:14:41 -0500 Subject: [PATCH 13/30] improvements --- .dockerignore | 7 +++++++ Dockerfile | 57 ++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/.dockerignore b/.dockerignore index 2022ee39..0ed5480a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,4 +1,7 @@ +# Ignore everything by default * + +# Allow specific files and directories !setup.py !setup.cfg !Megatron-LM @@ -7,3 +10,7 @@ !tools !tests !pyproject.toml + +# Exclude Python cache directories and shared object files within included directories +**/__pycache__/ +**/*.so diff --git a/Dockerfile b/Dockerfile index e3804d59..41fbd2c9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,31 +1,60 @@ # syntax=docker/dockerfile:1.7-labs -FROM nvcr.io/nvidia/pytorch:24.07-py3 +FROM nvcr.io/nvidia/pytorch:24.07-py3 as base -# Install git-lfs for Huggingface hub interaction and sudo for system adjustments +# Install dependencies RUN apt-get update \ - && apt-get install --no-install-recommends -y git-lfs sudo util-linux \ + && apt-get install --no-install-recommends -y acl python3.10-venv git-lfs \ && rm -rf /var/lib/apt/lists/* \ && git lfs install # Set the working directory WORKDIR /app -# Environment settings for Python and the user -ENV PYTHONPATH=/app:/app/Megatron-LM +# Set the setgid bit and default ACL for /app +RUN chmod g+s /app && \ + setfacl -d -m u::rwx,g::rwx,o::rwx /app && \ + setfacl -d -m u::rw-,g::rw-,o::rw- /app -# Copy the dependency files and install dependencies globally -COPY setup.py setup.cfg pyproject.toml ./ -COPY ./fast_llm/csrc/ fast_llm/csrc/ -RUN PIP_NO_INPUT=1 pip3 install --no-cache-dir --no-build-isolation -e ".[CORE,OPTIONAL,DEV]" +# Environment settings for the virtual environment +ENV VIRTUAL_ENV=/app/venv +ENV PATH="$VIRTUAL_ENV/bin:$PATH" -# Copy the rest of the code +# Create the virtual environment with system site packages +RUN python3 -m venv $VIRTUAL_ENV --system-site-packages + +# Copy dependency files with universal write permissions for all users +COPY --chmod=666 setup.py setup.cfg pyproject.toml ./ +COPY --chmod=666 ./fast_llm/csrc/ fast_llm/csrc/ + +# Install dependencies within the virtual environment +RUN pip install --no-cache-dir --no-build-isolation -e ".[CORE,OPTIONAL,DEV]" + +# Use intermediate build stage to copy the remaining source code +FROM alpine as copy_source + +# Set the working directory +WORKDIR /app + +# Copy remaining source code with universal write permissions COPY ./Megatron-LM 
Megatron-LM COPY ./examples examples COPY ./tests tests COPY ./tools tools - -# Copy the main source code COPY --exclude=./fast_llm/csrc/ ./fast_llm/ fast_llm/ -# Ensure the source code files are writable -RUN chmod -R a+w /app +RUN find Megatron-LM -type f -exec chmod 666 {} \; && \ + find examples -type f -exec chmod 666 {} \; && \ + find tests -type f -exec chmod 666 {} \; && \ + find tools -type f -exec chmod 666 {} \; && \ + find fast_llm -type f -exec chmod 666 {} \; && \ + find . -type d -exec chmod 777 {} \; + +# Create a tar archive of /app with permissions preserved +RUN tar -cf /app.tar -C /app . + +# Continue with the base stage +FROM base + +# Copy the remaining source code from the intermediate build stage +COPY --from=copy_source /app.tar / +RUN tar -xf /app.tar -C /app && rm /app.tar From 4b6b195548235d38ee85d866c0893083e8df94e8 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 18:26:38 -0500 Subject: [PATCH 14/30] add docstring --- fast_llm/data/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fast_llm/data/config.py b/fast_llm/data/config.py index 5e829150..32d48fab 100644 --- a/fast_llm/data/config.py +++ b/fast_llm/data/config.py @@ -106,6 +106,7 @@ def _validate(self): class TokenizerConfig(Config): """ Configuration for the tokenizer. + The tokenizer is needed for FIM and dataset preparation. """ format: str = Field( From 52a6f0be9719e23f5a589a9767c1b8dd33f49659 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 18:28:37 -0500 Subject: [PATCH 15/30] use full imports --- tests/test_memmap_dataset.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py index 49ca44d1..6d27faf3 100644 --- a/tests/test_memmap_dataset.py +++ b/tests/test_memmap_dataset.py @@ -1,21 +1,24 @@ -from hypothesis import given, strategies as st -from hypothesis.extra import numpy as npst +import hypothesis +import hypothesis.strategies +import hypothesis.extra.numpy import numpy as np from tempfile import TemporaryDirectory from pathlib import Path from fast_llm.data.gpt.memmap import GPTMemmapDataset import pytest -def dtype_arrays(dtype: np.dtype, min_size: int=1, max_size: int=100) -> st.SearchStrategy: - return st.lists( - npst.arrays(dtype=dtype, shape=st.integers(1, 1000)), + +def dtype_arrays(dtype: np.dtype, min_size: int = 1, max_size: int = 100) -> hypothesis.strategies.SearchStrategy: + return hypothesis.strategies.lists( + hypothesis.extra.numpy.arrays(dtype=dtype, shape=hypothesis.strategies.integers(1, 1000)), min_size=min_size, max_size=max_size, ) + @pytest.mark.parametrize("dtype", GPTMemmapDataset._DTYPES.values()) def test_gpt_memmap_dataset(dtype): - @given(documents=dtype_arrays(dtype)) + @hypothesis.given(documents=dtype_arrays(dtype)) def inner_test(documents): with TemporaryDirectory() as temp_dir: prefix = Path(temp_dir) @@ -23,5 +26,5 @@ def inner_test(documents): dataset = GPTMemmapDataset(name="foo", prefix=prefix) for i, document in enumerate(documents): assert np.array_equal(dataset.get(i), document, equal_nan=True), f"Mismatch at index {i}" - + inner_test() From 55b0b88ea466946bb4f13d8c8823cc442fd9cf81 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 18:29:17 -0500 Subject: [PATCH 16/30] use full imports --- tests/test_memmap_dataset.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py index 6d27faf3..8cef40a8 100644 
--- a/tests/test_memmap_dataset.py +++ b/tests/test_memmap_dataset.py @@ -1,12 +1,14 @@ +import pathlib +from tempfile import TemporaryDirectory + import hypothesis -import hypothesis.strategies import hypothesis.extra.numpy +import hypothesis.strategies import numpy as np -from tempfile import TemporaryDirectory -from pathlib import Path -from fast_llm.data.gpt.memmap import GPTMemmapDataset import pytest +from fast_llm.data.gpt.memmap import GPTMemmapDataset + def dtype_arrays(dtype: np.dtype, min_size: int = 1, max_size: int = 100) -> hypothesis.strategies.SearchStrategy: return hypothesis.strategies.lists( @@ -21,7 +23,7 @@ def test_gpt_memmap_dataset(dtype): @hypothesis.given(documents=dtype_arrays(dtype)) def inner_test(documents): with TemporaryDirectory() as temp_dir: - prefix = Path(temp_dir) + prefix = pathlib.Path(temp_dir) GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) dataset = GPTMemmapDataset(name="foo", prefix=prefix) for i, document in enumerate(documents): From 1f975d227ad586e577557a42f366ca50310efa32 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 18:30:51 -0500 Subject: [PATCH 17/30] use full imports --- fast_llm/tools/prepare_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index 38a79221..16a7d930 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -5,7 +5,7 @@ import pathlib import shutil import typing -from multiprocessing import Pool +import multiprocessing import numpy as np import torch.distributed @@ -298,7 +298,7 @@ def run(self): ] # Use multiprocessing to save each shard in parallel on all ranks - with Pool(processes=self._config.saving_workers) as pool: + with multiprocessing.Pool(processes=self._config.saving_workers) as pool: dataset_dicts = pool.map(self._save_shard, shards) # Gather dataset_dicts from all ranks to rank 0 From b665e914f8bff9be4b20ba93081450cc44c7d4e5 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 18:37:44 -0500 Subject: [PATCH 18/30] don't load tokenizer during validatin --- fast_llm/tools/prepare_dataset.py | 39 ++++++++++++++++++------------- fast_llm/utils.py | 4 ---- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index 16a7d930..cc5dc912 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -110,10 +110,10 @@ class GPTDatasetConfig(Config): desc="Field of the dataset to use.", hint=FieldHint.optional, ) - data_type: DataType = Field( + data_type: DataType | None = Field( default=None, - desc="Data type of the dataset field.", - hint=FieldHint.derived, + desc="Data type of the dataset field. 
If not provided, it will be inferred based on the tokenizer vocabulary size.", + hint=FieldHint.optional, ) trust_remote_code: bool = Field( default=False, @@ -178,19 +178,9 @@ class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): ) def _validate(self): - Assert.not_none(self.tokenizer.path) - self._tokenizer = Tokenizer(config=self.tokenizer) - if self.dataset.data_type is None: - # Decide the datatype based on the tokenizer vocabulary size - vocab_size = self._tokenizer.vocab_size - if vocab_size <= np.iinfo(np.int16).max: - self.dataset.data_type = DataType.int16 - # elif vocab_size <= np.iinfo(np.uint16).max: - # self.dataset.data_type = DataType.uint16 # Not supported by Fast-LLM's DataType - elif vocab_size <= np.iinfo(np.int32).max: - self.dataset.data_type = DataType.int32 - else: - raise ValueError(f"Tokenizer vocabulary size {vocab_size} is too large. This is likely an error.") + assert self.tokenizer.path is not None + if self.dataset.data_type is not None: + Assert.incl(self.dataset.data_type.numpy, GPTMemmapDataset._DTYPES.values()) super()._validate() @classmethod @@ -240,9 +230,26 @@ def run(self): # Set transformers logging verbosity transformers.logging.set_verbosity_error() + # Disable disk space check if requested if self._config.dataset.disable_disk_space_check: datasets.builder.has_sufficient_disk_space = lambda needed_bytes, directory=".": True + # Load tokenizer + self._tokenizer = Tokenizer(config=self.tokenizer) + + # Set data type if not provided + if self.dataset.data_type is None: + # Decide the datatype based on the tokenizer vocabulary size + vocab_size = self._tokenizer.vocab_size + if vocab_size <= np.iinfo(np.int16).max: + self.dataset.data_type = DataType.int16 + # elif vocab_size <= np.iinfo(np.uint16).max: + # self.dataset.data_type = DataType.uint16 # Not supported by Fast-LLM's DataType + elif vocab_size <= np.iinfo(np.int32).max: + self.dataset.data_type = DataType.int32 + else: + raise ValueError(f"Tokenizer vocabulary size {vocab_size} is too large. This is likely an error.") + # Initialize distributed processing if self._config.distributed.world_size > 1: torch.distributed.init_process_group( diff --git a/fast_llm/utils.py b/fast_llm/utils.py index 285c2272..66539efb 100644 --- a/fast_llm/utils.py +++ b/fast_llm/utils.py @@ -117,10 +117,6 @@ def in_range_incl(x, low, high): def none(x): assert x is None, f"Object of type {type(x)} is not None ({str(x)})" - @staticmethod - def not_none(x): - assert x is not None, "Object is None" - @staticmethod def empty(x): assert len(x) == 0, f"Not empty (len={len(x)}), {x}" From e51677f54992abf5f861c7acd17873bacb60472d Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Mon, 11 Nov 2024 19:10:17 -0500 Subject: [PATCH 19/30] simplify --- Dockerfile | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 41fbd2c9..99611493 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,12 +42,8 @@ COPY ./tests tests COPY ./tools tools COPY --exclude=./fast_llm/csrc/ ./fast_llm/ fast_llm/ -RUN find Megatron-LM -type f -exec chmod 666 {} \; && \ - find examples -type f -exec chmod 666 {} \; && \ - find tests -type f -exec chmod 666 {} \; && \ - find tools -type f -exec chmod 666 {} \; && \ - find fast_llm -type f -exec chmod 666 {} \; && \ - find . -type d -exec chmod 777 {} \; +# Set permissions for all users to write to /app +RUN chmod -R a+w /app # Create a tar archive of /app with permissions preserved RUN tar -cf /app.tar -C /app . 
From 1f447bbb1ea81525c4bd76852e80bd3d8ba14e76 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Tue, 12 Nov 2024 08:06:38 -0500 Subject: [PATCH 20/30] simplify --- Dockerfile | 39 +++++++++------------------------------ 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/Dockerfile b/Dockerfile index 99611493..64d2bb36 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,10 +10,8 @@ RUN apt-get update \ # Set the working directory WORKDIR /app -# Set the setgid bit and default ACL for /app -RUN chmod g+s /app && \ - setfacl -d -m u::rwx,g::rwx,o::rwx /app && \ - setfacl -d -m u::rw-,g::rw-,o::rw- /app +# Set the default ACL for /app to rwx for all users +RUN setfacl -d -m u::rwx,g::rwx,o::rwx /app # Environment settings for the virtual environment ENV VIRTUAL_ENV=/app/venv @@ -23,34 +21,15 @@ ENV PATH="$VIRTUAL_ENV/bin:$PATH" RUN python3 -m venv $VIRTUAL_ENV --system-site-packages # Copy dependency files with universal write permissions for all users -COPY --chmod=666 setup.py setup.cfg pyproject.toml ./ -COPY --chmod=666 ./fast_llm/csrc/ fast_llm/csrc/ +COPY --chmod=777 setup.py setup.cfg pyproject.toml ./ +COPY --chmod=777 ./fast_llm/csrc/ fast_llm/csrc/ # Install dependencies within the virtual environment RUN pip install --no-cache-dir --no-build-isolation -e ".[CORE,OPTIONAL,DEV]" -# Use intermediate build stage to copy the remaining source code -FROM alpine as copy_source - -# Set the working directory -WORKDIR /app - # Copy remaining source code with universal write permissions -COPY ./Megatron-LM Megatron-LM -COPY ./examples examples -COPY ./tests tests -COPY ./tools tools -COPY --exclude=./fast_llm/csrc/ ./fast_llm/ fast_llm/ - -# Set permissions for all users to write to /app -RUN chmod -R a+w /app - -# Create a tar archive of /app with permissions preserved -RUN tar -cf /app.tar -C /app . 
- -# Continue with the base stage -FROM base - -# Copy the remaining source code from the intermediate build stage -COPY --from=copy_source /app.tar / -RUN tar -xf /app.tar -C /app && rm /app.tar +COPY --chmod=777 ./Megatron-LM Megatron-LM +COPY --chmod=777 ./examples examples +COPY --chmod=777 ./tests tests +COPY --chmod=777 ./tools tools +COPY --chmod=777 --exclude=./fast_llm/csrc/ ./fast_llm/ fast_llm/ From fb50c13c0fae7ffecb33859383cd97566fd45010 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Tue, 12 Nov 2024 08:35:19 -0500 Subject: [PATCH 21/30] address comments --- fast_llm/data/config.py | 72 +++++++++++++++++++++++++++++ fast_llm/data/prepare.py | 0 fast_llm/tools/prepare_dataset.py | 75 ++----------------------------- fast_llm/utils.py | 8 ++-- setup.cfg | 2 +- 5 files changed, 80 insertions(+), 77 deletions(-) create mode 100644 fast_llm/data/prepare.py diff --git a/fast_llm/data/config.py b/fast_llm/data/config.py index 59476eb4..7add6092 100644 --- a/fast_llm/data/config.py +++ b/fast_llm/data/config.py @@ -1,9 +1,12 @@ import abc +import argparse import enum +import os import pathlib import typing from fast_llm.config import Config, Field, FieldHint, check_field, config_class, skip_valid_if_none +from fast_llm.engine.config_utils.runnable import RunnableConfig from fast_llm.engine.distributed.config import PhaseType from fast_llm.engine.schedule.config import BatchConfig from fast_llm.utils import Assert @@ -186,3 +189,72 @@ def __getitem__(self, index: int): @abc.abstractmethod def __len__(self): pass + + +@config_class +class _DistributedConfig(Config): + # TODO: Unify with fast_llm.engine.distributed.config.DistributedConfig + + default_world_size: typing.ClassVar[int] = int(os.environ.get("WORLD_SIZE", 1)) + default_rank: typing.ClassVar[int] = int(os.environ.get("RANK", 0)) + world_size: int = Field( + default=None, + desc="Size of the world group. Typically provided by torchrun or equivalent through the `WORLD_SIZE` environment variable.", + hint=FieldHint.expert, + valid=check_field(Assert.gt, 0), + ) + rank: int = Field( + default=None, + desc="Rank of the local process. 
Typically provided by torchrun or equivalent through the `RANK` environment variable.", + hint=FieldHint.expert, + valid=check_field(Assert.geq, 0), + ) + backend: str = Field( + default="gloo", + desc="Distributed backend to use.", + hint=FieldHint.optional, + ) + + def _validate(self): + if self.world_size is None: + self.world_size = self.default_world_size + if self.rank is None: + self.rank = self.default_rank + super()._validate() + Assert.in_range(self.rank, 0, self.world_size) + + +@config_class() +class DatasetPreparatorConfig(RunnableConfig): + preparator_name: typing.ClassVar[str] + + output_path: pathlib.Path = Field( + desc="Output directory for the processed dataset.", + hint=FieldHint.core, + ) + distributed: _DistributedConfig = Field( + default_factory=_DistributedConfig, + desc="Configuration for distributed processing.", + hint=FieldHint.feature, + ) + + @classmethod + def get_dataset_preparator_class(cls) -> typing.Type["DatasetPreparator"]: + raise NotImplementedError + + def _get_runnable(self, parsed: argparse.Namespace) -> typing.Callable[[], None]: + dataset_preparator = self.get_dataset_preparator_class()(config=self) + return dataset_preparator.run + + +class DatasetPreparator(abc.ABC): + _config: DatasetPreparatorConfig + config_class: typing.ClassVar[type[DatasetPreparatorConfig]] = DatasetPreparatorConfig + + def __init__(self, config: DatasetPreparatorConfig) -> None: + Assert.custom(isinstance, config, self.config_class) + config.validate() + self._config = config + + def run(self) -> None: + raise NotImplementedError diff --git a/fast_llm/data/prepare.py b/fast_llm/data/prepare.py new file mode 100644 index 00000000..e69de29b diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index cc5dc912..8e4283aa 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -19,75 +19,6 @@ from fast_llm.utils import Assert, Registry -@config_class -class DistributedConfig(Config): - default_world_size: typing.ClassVar[int] = int(os.environ.get("WORLD_SIZE", 1)) - default_rank: typing.ClassVar[int] = int(os.environ.get("RANK", 0)) - world_size: int = Field( - default=None, - desc="Size of the world group. Typically provided by torchrun or equivalent through the `WORLD_SIZE` environment variable.", - hint=FieldHint.expert, - valid=check_field(Assert.gt, 0), - ) - rank: int = Field( - default=None, - desc="Rank of the local process. 
Typically provided by torchrun or equivalent through the `RANK` environment variable.", - hint=FieldHint.expert, - valid=check_field(Assert.geq, 0), - ) - backend: str = Field( - default="gloo", - desc="Distributed backend to use.", - hint=FieldHint.optional, - valid=check_field(Assert.incl, torch.distributed.Backend.backend_list), - ) - - def _validate(self): - if self.world_size is None: - self.world_size = self.default_world_size - if self.rank is None: - self.rank = self.default_rank - super()._validate() - Assert.in_range(self.rank, 0, self.world_size) - - -@config_class() -class DatasetPreparatorConfig(RunnableConfig): - _abstract = True - model_name: typing.ClassVar[str] - - output_path: pathlib.Path = Field( - desc="Output directory for the processed dataset.", - hint=FieldHint.core, - ) - distributed: DistributedConfig = Field( - default_factory=DistributedConfig, - desc="Configuration for distributed processing.", - hint=FieldHint.feature, - ) - - @classmethod - def get_dataset_preparator_class(cls) -> typing.Type["DatasetPreparator"]: - raise NotImplementedError - - def _get_runnable(self, parsed: argparse.Namespace) -> typing.Callable[[], None]: - dataset_preparator = self.get_dataset_preparator_class()(config=self) - return dataset_preparator.run - - -class DatasetPreparator(abc.ABC): - _abstract = True - _config: DatasetPreparatorConfig - config_class: typing.ClassVar[type[DatasetPreparatorConfig]] = DatasetPreparatorConfig - - def __init__(self, config: DatasetPreparatorConfig) -> None: - Assert.custom(isinstance, config, self.config_class) - config.validate() - self._config = config - - def run(self) -> None: - raise NotImplementedError - @config_class class GPTDatasetConfig(Config): @@ -130,13 +61,13 @@ class GPTDatasetConfig(Config): @config_class() class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): _abstract = False - model_name: typing.ClassVar[str] = "gpt" + preparator_name: typing.ClassVar[str] = "gpt_memmap" tokens_per_shard: int = Field( - default=1_000_000_000, + default=10**9, desc="Approximate number of tokens per shard.", hint=FieldHint.feature, - valid=check_field(Assert.geq, 100_000), + valid=check_field(Assert.geq, 10**5), ) loading_workers: int = Field( default=1, diff --git a/fast_llm/utils.py b/fast_llm/utils.py index 66539efb..5ae1c5d0 100644 --- a/fast_llm/utils.py +++ b/fast_llm/utils.py @@ -175,12 +175,12 @@ def not_custom(fn, *args, **kwargs): ), f"Assertion failed: not fn({', '.join(itertools.chain((str(x) for x in args),(f'{str(k)}={str(v)}' for k,v in kwargs.items())))})" -_KT = typing.TypeVar("_KT") -_VT = typing.TypeVar("_VT") +_KeyType = typing.TypeVar("_KeyType") +_ValueType = typing.TypeVar("_ValueType") -class Registry(typing.Generic[_KT, _VT]): - def __init__(self, name: str, data: dict[_KT, _VT]): +class Registry(typing.Generic[_KeyType, _ValueType]): + def __init__(self, name: str, data: dict[_KeyType, _ValueType]): self._name = name self._data = data.copy() diff --git a/setup.cfg b/setup.cfg index 1cf0a541..68f3a064 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,7 +38,7 @@ OPTIONAL = # Hydra hydra-core>=1.3.2 omegaconf>=2.3.0 - # Miscaleaneous + # Miscellanous tqdm>=4.66.3 # Required for testing From 33067c81cc87a734272620c0dab24737575bfcf8 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Tue, 12 Nov 2024 08:37:00 -0500 Subject: [PATCH 22/30] address comments --- fast_llm/tools/prepare_dataset.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index 
8e4283aa..3339636f 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -1,8 +1,5 @@ -import abc import argparse import json -import os -import pathlib import shutil import typing import multiprocessing From dbc221c6dcfc17d5e6e0a59701a2d23cc192407a Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Tue, 12 Nov 2024 08:48:59 -0500 Subject: [PATCH 23/30] address comments --- fast_llm/data/auto.py | 12 ++ fast_llm/data/gpt/prepare.py | 260 +++++++++++++++++++++++++++++++++++ fast_llm/data/prepare.py | 0 3 files changed, 272 insertions(+) create mode 100644 fast_llm/data/auto.py create mode 100644 fast_llm/data/gpt/prepare.py delete mode 100644 fast_llm/data/prepare.py diff --git a/fast_llm/data/auto.py b/fast_llm/data/auto.py new file mode 100644 index 00000000..c7467fba --- /dev/null +++ b/fast_llm/data/auto.py @@ -0,0 +1,12 @@ +from fast_llm.data.gpt.prepare import GPTDatasetPreparatorConfig +from fast_llm.utils import Registry + +dataset_preparator_registry = Registry( + "DatasetPreparator", + { + dataset_preparator.model_name: dataset_preparator + for dataset_preparator in [ + GPTDatasetPreparatorConfig, + ] + }, +) diff --git a/fast_llm/data/gpt/prepare.py b/fast_llm/data/gpt/prepare.py new file mode 100644 index 00000000..6a7ad6d0 --- /dev/null +++ b/fast_llm/data/gpt/prepare.py @@ -0,0 +1,260 @@ +import json +import multiprocessing +import shutil +import typing + +import numpy as np +import torch.distributed + +from fast_llm.config import Config, Field, FieldHint, check_field, config_class +from fast_llm.data.config import DatasetPreparator, DatasetPreparatorConfig, TokenizerConfig +from fast_llm.data.gpt.memmap import GPTMemmapDataset +from fast_llm.data.tokenizer import Tokenizer +from fast_llm.engine.config_utils.data_type import DataType +from fast_llm.utils import Assert + + +@config_class +class GPTDatasetConfig(Config): + name_or_path: str = Field( + desc="Name or path of the dataset.", + hint=FieldHint.core, + ) + config_name: None | str = Field( + default=None, + desc="Specific configuration name for the dataset.", + hint=FieldHint.optional, + ) + split: str = Field( + default="train", + desc="Split of the dataset to use.", + hint=FieldHint.optional, + ) + field: str = Field( + default="text", + desc="Field of the dataset to use.", + hint=FieldHint.optional, + ) + data_type: DataType | None = Field( + default=None, + desc="Data type of the dataset field. If not provided, it will be inferred based on the tokenizer vocabulary size.", + hint=FieldHint.optional, + ) + trust_remote_code: bool = Field( + default=False, + desc="Trust remote code when downloading the dataset.", + hint=FieldHint.optional, + ) + disable_disk_space_check: bool = Field( + default=False, + desc="Disable disk space check. 
Useful for environments where disk space is not accurately reported.", + hint=FieldHint.optional, + ) + + +@config_class() +class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): + _abstract = False + preparator_name: typing.ClassVar[str] = "gpt_memmap" + + tokens_per_shard: int = Field( + default=10**9, + desc="Approximate number of tokens per shard.", + hint=FieldHint.feature, + valid=check_field(Assert.geq, 10**5), + ) + loading_workers: int = Field( + default=1, + desc="Number of workers in load_dataset() call.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + tokenize_workers: int = Field( + default=1, + desc="Number of workers for tokenization.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + saving_workers: int = Field( + default=1, + desc="Number of processes for saving the data.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + remove_downloads: bool = Field( + default=False, + desc="Remove downloaded dataset after processing.", + hint=FieldHint.optional, + ) + dataset: GPTDatasetConfig = Field( + default_factory=GPTDatasetConfig, + desc="Configuration for the dataset.", + hint=FieldHint.feature, + ) + tokenizer: TokenizerConfig = Field( + default_factory=TokenizerConfig, + desc="Configuration for the tokenizer.", + hint=FieldHint.feature, + ) + _tokenizer: Tokenizer = Field( + init=False, + desc="The tokenizer instance.", + hint=FieldHint.derived, + ) + + def _validate(self): + assert self.tokenizer.path is not None + if self.dataset.data_type is not None: + Assert.incl(self.dataset.data_type.numpy, GPTMemmapDataset._DTYPES.values()) + super()._validate() + + @classmethod + def get_dataset_preparator_class(cls): + return GPTDatasetPreparator + + +class GPTDatasetPreparator(DatasetPreparator): + _abstract = False + _config: GPTDatasetPreparatorConfig + config_class = GPTDatasetPreparatorConfig + + def _tokenize_batch(self, batch): + input_ids = [ + np.array(self._config._tokenizer.tokenize(text), dtype=self._config.dataset.data_type.numpy) + for text in batch[self._config.dataset.field] + ] + num_tokens = [len(x) for x in input_ids] + return { + "input_ids": input_ids, + "num_tokens": num_tokens, + } + + def _save_shard(self, args) -> dict: + from tqdm import tqdm + + shard_idx, shard_dataset = args + prefix = f"shard_{self._config.distributed.rank}_{shard_idx}" + shard_output_path = self._config.output_path / prefix + documents = [ + np.array(item["input_ids"], dtype=self._config.dataset.data_type.numpy) + for item in tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") + ] + GPTMemmapDataset.write_dataset(prefix=shard_output_path, documents=documents) + dataset_dict = { + "prefix": prefix, + "num_documents": len(documents), + "num_tokens": sum(len(doc) for doc in documents), + } + return dataset_dict + + def run(self): + import datasets + import transformers + from tqdm import tqdm + + # Set transformers logging verbosity + transformers.logging.set_verbosity_error() + + # Disable disk space check if requested + if self._config.dataset.disable_disk_space_check: + datasets.builder.has_sufficient_disk_space = lambda needed_bytes, directory=".": True + + # Load tokenizer + self._tokenizer = Tokenizer(config=self.tokenizer) + + # Set data type if not provided + if self.dataset.data_type is None: + # Decide the datatype based on the tokenizer vocabulary size + vocab_size = self._tokenizer.vocab_size + if vocab_size <= np.iinfo(np.int16).max: + self.dataset.data_type = DataType.int16 + # elif vocab_size 
<= np.iinfo(np.uint16).max: + # self.dataset.data_type = DataType.uint16 # Not supported by Fast-LLM's DataType + elif vocab_size <= np.iinfo(np.int32).max: + self.dataset.data_type = DataType.int32 + else: + raise ValueError(f"Tokenizer vocabulary size {vocab_size} is too large. This is likely an error.") + + # Initialize distributed processing + if self._config.distributed.world_size > 1: + torch.distributed.init_process_group( + backend=self._config.distributed.backend, + rank=self._config.distributed.rank, + world_size=self._config.distributed.world_size, + ) + + # Prepare output directory + self._config.output_path.mkdir(parents=True, exist_ok=True) + + # Download dataset if necessary on rank 0 + download_path = self._config.output_path / "downloaded_dataset" + download_path_ok = download_path / "ok" + if self._config.distributed.rank == 0 and not download_path_ok.exists(): + datasets.load_dataset( + path=self._config.dataset.name_or_path, + name=self._config.dataset.config_name, + split=self._config.dataset.split, + num_proc=self._config.loading_workers, + trust_remote_code=self._config.dataset.trust_remote_code, + ).save_to_disk(download_path, num_proc=self._config.saving_workers) + download_path_ok.touch() + + # Synchronize processes to wait for the download to finish + if self._config.distributed.world_size > 1: + torch.distributed.barrier() + + # Load and shard the dataset on each rank + dataset = datasets.load_from_disk(download_path).shard( + num_shards=self._config.distributed.world_size, + index=self._config.distributed.rank, + ) + if self._config.dataset.field not in dataset.column_names: + raise ValueError(f"Dataset does not have field '{self._config.dataset.field}'.") + + # Tokenize the dataset in parallel + tokenized_dataset = dataset.map( + self._tokenize_batch, + batched=True, + num_proc=self._config.tokenize_workers, + desc="Tokenizing batches", + ) + + # Calculate total number of tokens + total_tokens = sum(tqdm(tokenized_dataset["num_tokens"], desc="Counting tokens", unit="tokens")) + + # Split dataset into shards based on number of tokens + num_shards = int(np.ceil(total_tokens / self._config.tokens_per_shard)) + shards = [ + (i, tokenized_dataset.shard(num_shards=num_shards, index=i)) + for i in tqdm(range(num_shards), desc="Creating shards") + ] + + # Use multiprocessing to save each shard in parallel on all ranks + with multiprocessing.Pool(processes=self._config.saving_workers) as pool: + dataset_dicts = pool.map(self._save_shard, shards) + + # Gather dataset_dicts from all ranks to rank 0 + if self._config.distributed.world_size > 1: + if self._config.distributed.rank == 0: + all_dataset_dicts = [None] * self._config.distributed.world_size + torch.distributed.gather_object(dataset_dicts, all_dataset_dicts, dst=0) + dataset_dicts = [item for sublist in all_dataset_dicts for item in sublist] + else: + torch.distributed.gather_object(dataset_dicts, [], dst=0) + + # Create a metadata file on rank 0 + if self._config.distributed.rank == 0: + total_tokens = sum(dataset_dict["num_tokens"] for dataset_dict in dataset_dicts) + for dataset_dict in dataset_dicts: + dataset_dict["weight"] = float(dataset_dict["num_tokens"]) / float(total_tokens) + output_file = self._config.output_path / "fast_llm_dataset.json" + json.dump({"datasets": dataset_dicts}, output_file.open("w")) + + # Finalize distributed processing + if self._config.distributed.world_size > 1: + torch.distributed.barrier() + torch.distributed.destroy_process_group() + + # Clean up downloaded dataset + if 
self._config.remove_downloads and self._config.distributed.rank == 0: + shutil.rmtree(download_path) diff --git a/fast_llm/data/prepare.py b/fast_llm/data/prepare.py deleted file mode 100644 index e69de29b..00000000 From a2ae05109f82d55507c86ad01be7a8d88d8d3b06 Mon Sep 17 00:00:00 2001 From: Torsten Scholak Date: Tue, 12 Nov 2024 09:12:37 -0500 Subject: [PATCH 24/30] address comments --- fast_llm/config.py | 4 +- fast_llm/data/auto.py | 6 +- fast_llm/data/config.py | 1 + fast_llm/data/gpt/prepare.py | 42 +++-- fast_llm/tools/prepare_dataset.py | 272 +----------------------------- 5 files changed, 27 insertions(+), 298 deletions(-) diff --git a/fast_llm/config.py b/fast_llm/config.py index 66d9c530..a3211de7 100644 --- a/fast_llm/config.py +++ b/fast_llm/config.py @@ -301,7 +301,7 @@ def __setattr__(self, key, value): # Allow setting the exact same object to facilitate setup of cross-dependencies. # Ex. allow re-setting cross-dependencies of already validated sub-configs. return - raise RuntimeError() + raise RuntimeError(f"Cannot set attribute `{key}` after validation.") super().__setattr__(key, value) def __delattr__(self, key): @@ -309,7 +309,7 @@ def __delattr__(self, key): Make the class read-only after validation. """ if getattr(self, "_validated", False): - raise RuntimeError() + raise RuntimeError(f"Cannot delete attribute `{key}` after validation.") super().__delattr__(key) def validate(self, *, _is_validating=False): diff --git a/fast_llm/data/auto.py b/fast_llm/data/auto.py index c7467fba..68205696 100644 --- a/fast_llm/data/auto.py +++ b/fast_llm/data/auto.py @@ -1,12 +1,12 @@ -from fast_llm.data.gpt.prepare import GPTDatasetPreparatorConfig +from fast_llm.data.gpt.prepare import GPTMemmapDatasetPreparatorConfig from fast_llm.utils import Registry dataset_preparator_registry = Registry( "DatasetPreparator", { - dataset_preparator.model_name: dataset_preparator + dataset_preparator.preparator_name: dataset_preparator for dataset_preparator in [ - GPTDatasetPreparatorConfig, + GPTMemmapDatasetPreparatorConfig, ] }, ) diff --git a/fast_llm/data/config.py b/fast_llm/data/config.py index 7add6092..89107670 100644 --- a/fast_llm/data/config.py +++ b/fast_llm/data/config.py @@ -256,5 +256,6 @@ def __init__(self, config: DatasetPreparatorConfig) -> None: config.validate() self._config = config + @abc.abstractmethod def run(self) -> None: raise NotImplementedError diff --git a/fast_llm/data/gpt/prepare.py b/fast_llm/data/gpt/prepare.py index 6a7ad6d0..fd1e82f5 100644 --- a/fast_llm/data/gpt/prepare.py +++ b/fast_llm/data/gpt/prepare.py @@ -15,7 +15,7 @@ @config_class -class GPTDatasetConfig(Config): +class GPTHuggingfaceDatasetConfig(Config): name_or_path: str = Field( desc="Name or path of the dataset.", hint=FieldHint.core, @@ -53,8 +53,7 @@ class GPTDatasetConfig(Config): @config_class() -class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): - _abstract = False +class GPTMemmapDatasetPreparatorConfig(DatasetPreparatorConfig): preparator_name: typing.ClassVar[str] = "gpt_memmap" tokens_per_shard: int = Field( @@ -86,8 +85,8 @@ class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): desc="Remove downloaded dataset after processing.", hint=FieldHint.optional, ) - dataset: GPTDatasetConfig = Field( - default_factory=GPTDatasetConfig, + dataset: GPTHuggingfaceDatasetConfig = Field( + default_factory=GPTHuggingfaceDatasetConfig, desc="Configuration for the dataset.", hint=FieldHint.feature, ) @@ -96,11 +95,6 @@ class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): 
desc="Configuration for the tokenizer.", hint=FieldHint.feature, ) - _tokenizer: Tokenizer = Field( - init=False, - desc="The tokenizer instance.", - hint=FieldHint.derived, - ) def _validate(self): assert self.tokenizer.path is not None @@ -110,17 +104,19 @@ def _validate(self): @classmethod def get_dataset_preparator_class(cls): - return GPTDatasetPreparator + return GPTMemmapDatasetPreparator + +class GPTMemmapDatasetPreparator(DatasetPreparator): + _config: GPTMemmapDatasetPreparatorConfig + config_class = GPTMemmapDatasetPreparatorConfig -class GPTDatasetPreparator(DatasetPreparator): - _abstract = False - _config: GPTDatasetPreparatorConfig - config_class = GPTDatasetPreparatorConfig + _tokenizer: Tokenizer + _data_type: DataType def _tokenize_batch(self, batch): input_ids = [ - np.array(self._config._tokenizer.tokenize(text), dtype=self._config.dataset.data_type.numpy) + np.array(self._tokenizer.tokenize(text), dtype=self._data_type.numpy) for text in batch[self._config.dataset.field] ] num_tokens = [len(x) for x in input_ids] @@ -136,7 +132,7 @@ def _save_shard(self, args) -> dict: prefix = f"shard_{self._config.distributed.rank}_{shard_idx}" shard_output_path = self._config.output_path / prefix documents = [ - np.array(item["input_ids"], dtype=self._config.dataset.data_type.numpy) + np.array(item["input_ids"], dtype=self._data_type.numpy) for item in tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") ] GPTMemmapDataset.write_dataset(prefix=shard_output_path, documents=documents) @@ -160,20 +156,22 @@ def run(self): datasets.builder.has_sufficient_disk_space = lambda needed_bytes, directory=".": True # Load tokenizer - self._tokenizer = Tokenizer(config=self.tokenizer) + self._tokenizer = Tokenizer(config=self._config.tokenizer) # Set data type if not provided - if self.dataset.data_type is None: + if self._config.dataset.data_type is None: # Decide the datatype based on the tokenizer vocabulary size vocab_size = self._tokenizer.vocab_size if vocab_size <= np.iinfo(np.int16).max: - self.dataset.data_type = DataType.int16 + self._data_type = DataType.int16 # elif vocab_size <= np.iinfo(np.uint16).max: - # self.dataset.data_type = DataType.uint16 # Not supported by Fast-LLM's DataType + # self._data_type = DataType.uint16 # Not supported by Fast-LLM's DataType elif vocab_size <= np.iinfo(np.int32).max: - self.dataset.data_type = DataType.int32 + self._data_type = DataType.int32 else: raise ValueError(f"Tokenizer vocabulary size {vocab_size} is too large. 
This is likely an error.") + else: + self._data_type = self._config.dataset.data_type # Initialize distributed processing if self._config.distributed.world_size > 1: diff --git a/fast_llm/tools/prepare_dataset.py b/fast_llm/tools/prepare_dataset.py index 3339636f..aafe2690 100644 --- a/fast_llm/tools/prepare_dataset.py +++ b/fast_llm/tools/prepare_dataset.py @@ -1,277 +1,7 @@ import argparse -import json -import shutil -import typing -import multiprocessing -import numpy as np -import torch.distributed - -from fast_llm.config import Config, Field, FieldHint, check_field, config_class -from fast_llm.data.config import TokenizerConfig -from fast_llm.data.gpt.memmap import GPTMemmapDataset -from fast_llm.data.tokenizer import Tokenizer -from fast_llm.engine.config_utils.data_type import DataType +from fast_llm.data.auto import dataset_preparator_registry from fast_llm.engine.config_utils.runnable import RunnableConfig -from fast_llm.utils import Assert, Registry - - - -@config_class -class GPTDatasetConfig(Config): - name_or_path: str = Field( - desc="Name or path of the dataset.", - hint=FieldHint.core, - ) - config_name: None | str = Field( - default=None, - desc="Specific configuration name for the dataset.", - hint=FieldHint.optional, - ) - split: str = Field( - default="train", - desc="Split of the dataset to use.", - hint=FieldHint.optional, - ) - field: str = Field( - default="text", - desc="Field of the dataset to use.", - hint=FieldHint.optional, - ) - data_type: DataType | None = Field( - default=None, - desc="Data type of the dataset field. If not provided, it will be inferred based on the tokenizer vocabulary size.", - hint=FieldHint.optional, - ) - trust_remote_code: bool = Field( - default=False, - desc="Trust remote code when downloading the dataset.", - hint=FieldHint.optional, - ) - disable_disk_space_check: bool = Field( - default=False, - desc="Disable disk space check. 
Useful for environments where disk space is not accurately reported.", - hint=FieldHint.optional, - ) - - -@config_class() -class GPTDatasetPreparatorConfig(DatasetPreparatorConfig): - _abstract = False - preparator_name: typing.ClassVar[str] = "gpt_memmap" - - tokens_per_shard: int = Field( - default=10**9, - desc="Approximate number of tokens per shard.", - hint=FieldHint.feature, - valid=check_field(Assert.geq, 10**5), - ) - loading_workers: int = Field( - default=1, - desc="Number of workers in load_dataset() call.", - hint=FieldHint.optional, - valid=check_field(Assert.geq, 1), - ) - tokenize_workers: int = Field( - default=1, - desc="Number of workers for tokenization.", - hint=FieldHint.optional, - valid=check_field(Assert.geq, 1), - ) - saving_workers: int = Field( - default=1, - desc="Number of processes for saving the data.", - hint=FieldHint.optional, - valid=check_field(Assert.geq, 1), - ) - remove_downloads: bool = Field( - default=False, - desc="Remove downloaded dataset after processing.", - hint=FieldHint.optional, - ) - dataset: GPTDatasetConfig = Field( - default_factory=GPTDatasetConfig, - desc="Configuration for the dataset.", - hint=FieldHint.feature, - ) - tokenizer: TokenizerConfig = Field( - default_factory=TokenizerConfig, - desc="Configuration for the tokenizer.", - hint=FieldHint.feature, - ) - _tokenizer: Tokenizer = Field( - init=False, - desc="The tokenizer instance.", - hint=FieldHint.derived, - ) - - def _validate(self): - assert self.tokenizer.path is not None - if self.dataset.data_type is not None: - Assert.incl(self.dataset.data_type.numpy, GPTMemmapDataset._DTYPES.values()) - super()._validate() - - @classmethod - def get_dataset_preparator_class(cls): - return GPTDatasetPreparator - - -class GPTDatasetPreparator(DatasetPreparator): - _abstract = False - _config: GPTDatasetPreparatorConfig - config_class = GPTDatasetPreparatorConfig - - def _tokenize_batch(self, batch): - input_ids = [ - np.array(self._config._tokenizer.tokenize(text), dtype=self._config.dataset.data_type.numpy) - for text in batch[self._config.dataset.field] - ] - num_tokens = [len(x) for x in input_ids] - return { - "input_ids": input_ids, - "num_tokens": num_tokens, - } - - def _save_shard(self, args) -> dict: - from tqdm import tqdm - - shard_idx, shard_dataset = args - prefix = f"shard_{self._config.distributed.rank}_{shard_idx}" - shard_output_path = self._config.output_path / prefix - documents = [ - np.array(item["input_ids"], dtype=self._config.dataset.data_type.numpy) - for item in tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") - ] - GPTMemmapDataset.write_dataset(prefix=shard_output_path, documents=documents) - dataset_dict = { - "prefix": prefix, - "num_documents": len(documents), - "num_tokens": sum(len(doc) for doc in documents), - } - return dataset_dict - - def run(self): - import datasets - import transformers - from tqdm import tqdm - - # Set transformers logging verbosity - transformers.logging.set_verbosity_error() - - # Disable disk space check if requested - if self._config.dataset.disable_disk_space_check: - datasets.builder.has_sufficient_disk_space = lambda needed_bytes, directory=".": True - - # Load tokenizer - self._tokenizer = Tokenizer(config=self.tokenizer) - - # Set data type if not provided - if self.dataset.data_type is None: - # Decide the datatype based on the tokenizer vocabulary size - vocab_size = self._tokenizer.vocab_size - if vocab_size <= np.iinfo(np.int16).max: - self.dataset.data_type = DataType.int16 - # elif vocab_size 
<= np.iinfo(np.uint16).max: - # self.dataset.data_type = DataType.uint16 # Not supported by Fast-LLM's DataType - elif vocab_size <= np.iinfo(np.int32).max: - self.dataset.data_type = DataType.int32 - else: - raise ValueError(f"Tokenizer vocabulary size {vocab_size} is too large. This is likely an error.") - - # Initialize distributed processing - if self._config.distributed.world_size > 1: - torch.distributed.init_process_group( - backend=self._config.distributed.backend, - rank=self._config.distributed.rank, - world_size=self._config.distributed.world_size, - ) - - # Prepare output directory - self._config.output_path.mkdir(parents=True, exist_ok=True) - - # Download dataset if necessary on rank 0 - download_path = self._config.output_path / "downloaded_dataset" - download_path_ok = download_path / "ok" - if self._config.distributed.rank == 0 and not download_path_ok.exists(): - datasets.load_dataset( - path=self._config.dataset.name_or_path, - name=self._config.dataset.config_name, - split=self._config.dataset.split, - num_proc=self._config.loading_workers, - trust_remote_code=self._config.dataset.trust_remote_code, - ).save_to_disk(download_path, num_proc=self._config.saving_workers) - download_path_ok.touch() - - # Synchronize processes to wait for the download to finish - if self._config.distributed.world_size > 1: - torch.distributed.barrier() - - # Load and shard the dataset on each rank - dataset = datasets.load_from_disk(download_path).shard( - num_shards=self._config.distributed.world_size, - index=self._config.distributed.rank, - ) - if self._config.dataset.field not in dataset.column_names: - raise ValueError(f"Dataset does not have field '{self._config.dataset.field}'.") - - # Tokenize the dataset in parallel - tokenized_dataset = dataset.map( - self._tokenize_batch, - batched=True, - num_proc=self._config.tokenize_workers, - desc="Tokenizing batches", - ) - - # Calculate total number of tokens - total_tokens = sum(tqdm(tokenized_dataset["num_tokens"], desc="Counting tokens", unit="tokens")) - - # Split dataset into shards based on number of tokens - num_shards = int(np.ceil(total_tokens / self._config.tokens_per_shard)) - shards = [ - (i, tokenized_dataset.shard(num_shards=num_shards, index=i)) - for i in tqdm(range(num_shards), desc="Creating shards") - ] - - # Use multiprocessing to save each shard in parallel on all ranks - with multiprocessing.Pool(processes=self._config.saving_workers) as pool: - dataset_dicts = pool.map(self._save_shard, shards) - - # Gather dataset_dicts from all ranks to rank 0 - if self._config.distributed.world_size > 1: - if self._config.distributed.rank == 0: - all_dataset_dicts = [None] * self._config.distributed.world_size - torch.distributed.gather_object(dataset_dicts, all_dataset_dicts, dst=0) - dataset_dicts = [item for sublist in all_dataset_dicts for item in sublist] - else: - torch.distributed.gather_object(dataset_dicts, [], dst=0) - - # Create a metadata file on rank 0 - if self._config.distributed.rank == 0: - total_tokens = sum(dataset_dict["num_tokens"] for dataset_dict in dataset_dicts) - for dataset_dict in dataset_dicts: - dataset_dict["weight"] = float(dataset_dict["num_tokens"]) / float(total_tokens) - output_file = self._config.output_path / "fast_llm_dataset.json" - json.dump({"datasets": dataset_dicts}, output_file.open("w")) - - # Finalize distributed processing - if self._config.distributed.world_size > 1: - torch.distributed.barrier() - torch.distributed.destroy_process_group() - - # Clean up downloaded dataset - if 
self._config.remove_downloads and self._config.distributed.rank == 0: - shutil.rmtree(download_path) - - -dataset_preparator_registry = Registry( - "DatasetPreparator", - { - dataset_preparator.model_name: dataset_preparator - for dataset_preparator in [ - GPTDatasetPreparatorConfig, - ] - }, -) class PrepareDatasetConfig(RunnableConfig): From 81162b33f45debdc2ba7716a7e8686302ce25c7f Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Tue, 12 Nov 2024 16:16:32 -0500 Subject: [PATCH 25/30] fixes --- Dockerfile | 26 +-- fast_llm/config.py | 10 +- fast_llm/data/auto.py | 2 +- fast_llm/data/config.py | 73 -------- fast_llm/data/gpt/memmap.py | 22 +-- fast_llm/data/preparator/__init__.py | 0 fast_llm/data/preparator/config.py | 34 ++++ .../data/preparator/gpt_memmap/__init__.py | 0 fast_llm/data/preparator/gpt_memmap/config.py | 162 ++++++++++++++++++ .../{gpt => preparator/gpt_memmap}/prepare.py | 114 +----------- fast_llm/engine/config_utils/data_type.py | 4 +- fast_llm/tools/cli.py | 4 +- tests/test_config.py | 14 +- tests/test_memmap_dataset.py | 3 +- 14 files changed, 254 insertions(+), 214 deletions(-) create mode 100644 fast_llm/data/preparator/__init__.py create mode 100644 fast_llm/data/preparator/config.py create mode 100644 fast_llm/data/preparator/gpt_memmap/__init__.py create mode 100644 fast_llm/data/preparator/gpt_memmap/config.py rename fast_llm/data/{gpt => preparator/gpt_memmap}/prepare.py (63%) diff --git a/Dockerfile b/Dockerfile index 64d2bb36..2a2eedc3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,33 +1,37 @@ # syntax=docker/dockerfile:1.7-labs -FROM nvcr.io/nvidia/pytorch:24.07-py3 as base +FROM nvcr.io/nvidia/pytorch:24.07-py3 -# Install dependencies +# Install dependencies. RUN apt-get update \ && apt-get install --no-install-recommends -y acl python3.10-venv git-lfs \ && rm -rf /var/lib/apt/lists/* \ && git lfs install -# Set the working directory -WORKDIR /app +# Set the permission to 777 for all files and directories in `/app` and `/home`: +# 1. Create directories explicitly because docker use the wrong permission for explicit creation. +# 2. For the rest, set the default ACL to 777 for all users. +RUN mkdir -m 777 /app /app/Megatron-LM /app/examples /app/fast_llm /app/tests /app/tools \ + && chmod -R 777 /home \ + && setfacl -d -m u::rwx,g::rwx,o::rwx /app /home -# Set the default ACL for /app to rwx for all users -RUN setfacl -d -m u::rwx,g::rwx,o::rwx /app +# Set the working directory. +WORKDIR /app -# Environment settings for the virtual environment +# Environment settings for the virtual environment. ENV VIRTUAL_ENV=/app/venv ENV PATH="$VIRTUAL_ENV/bin:$PATH" -# Create the virtual environment with system site packages +# Create the virtual environment with system site packages. RUN python3 -m venv $VIRTUAL_ENV --system-site-packages -# Copy dependency files with universal write permissions for all users +# Copy dependency files with universal write permissions for all users. COPY --chmod=777 setup.py setup.cfg pyproject.toml ./ COPY --chmod=777 ./fast_llm/csrc/ fast_llm/csrc/ -# Install dependencies within the virtual environment +# Install dependencies within the virtual environment. RUN pip install --no-cache-dir --no-build-isolation -e ".[CORE,OPTIONAL,DEV]" -# Copy remaining source code with universal write permissions +# Copy the remaining source code with universal write permissions. 
COPY --chmod=777 ./Megatron-LM Megatron-LM COPY --chmod=777 ./examples examples COPY --chmod=777 ./tests tests diff --git a/fast_llm/config.py b/fast_llm/config.py index a3211de7..a7b237c0 100644 --- a/fast_llm/config.py +++ b/fast_llm/config.py @@ -301,7 +301,10 @@ def __setattr__(self, key, value): # Allow setting the exact same object to facilitate setup of cross-dependencies. # Ex. allow re-setting cross-dependencies of already validated sub-configs. return - raise RuntimeError(f"Cannot set attribute `{key}` after validation.") + raise RuntimeError( + f"Cannot set attribute `{key}`" + f" in configuration class `{get_type_name(type(self))}` after validation." + ) super().__setattr__(key, value) def __delattr__(self, key): @@ -309,7 +312,10 @@ def __delattr__(self, key): Make the class read-only after validation. """ if getattr(self, "_validated", False): - raise RuntimeError(f"Cannot delete attribute `{key}` after validation.") + raise RuntimeError( + f"Cannot delete attribute `{key}`" + f" in configuration class `{get_type_name(type(self))}` after validation." + ) super().__delattr__(key) def validate(self, *, _is_validating=False): diff --git a/fast_llm/data/auto.py b/fast_llm/data/auto.py index 68205696..1a5b7986 100644 --- a/fast_llm/data/auto.py +++ b/fast_llm/data/auto.py @@ -1,4 +1,4 @@ -from fast_llm.data.gpt.prepare import GPTMemmapDatasetPreparatorConfig +from fast_llm.data.preparator.gpt_memmap.config import GPTMemmapDatasetPreparatorConfig from fast_llm.utils import Registry dataset_preparator_registry = Registry( diff --git a/fast_llm/data/config.py b/fast_llm/data/config.py index 89107670..59476eb4 100644 --- a/fast_llm/data/config.py +++ b/fast_llm/data/config.py @@ -1,12 +1,9 @@ import abc -import argparse import enum -import os import pathlib import typing from fast_llm.config import Config, Field, FieldHint, check_field, config_class, skip_valid_if_none -from fast_llm.engine.config_utils.runnable import RunnableConfig from fast_llm.engine.distributed.config import PhaseType from fast_llm.engine.schedule.config import BatchConfig from fast_llm.utils import Assert @@ -189,73 +186,3 @@ def __getitem__(self, index: int): @abc.abstractmethod def __len__(self): pass - - -@config_class -class _DistributedConfig(Config): - # TODO: Unify with fast_llm.engine.distributed.config.DistributedConfig - - default_world_size: typing.ClassVar[int] = int(os.environ.get("WORLD_SIZE", 1)) - default_rank: typing.ClassVar[int] = int(os.environ.get("RANK", 0)) - world_size: int = Field( - default=None, - desc="Size of the world group. Typically provided by torchrun or equivalent through the `WORLD_SIZE` environment variable.", - hint=FieldHint.expert, - valid=check_field(Assert.gt, 0), - ) - rank: int = Field( - default=None, - desc="Rank of the local process. 
Typically provided by torchrun or equivalent through the `RANK` environment variable.", - hint=FieldHint.expert, - valid=check_field(Assert.geq, 0), - ) - backend: str = Field( - default="gloo", - desc="Distributed backend to use.", - hint=FieldHint.optional, - ) - - def _validate(self): - if self.world_size is None: - self.world_size = self.default_world_size - if self.rank is None: - self.rank = self.default_rank - super()._validate() - Assert.in_range(self.rank, 0, self.world_size) - - -@config_class() -class DatasetPreparatorConfig(RunnableConfig): - preparator_name: typing.ClassVar[str] - - output_path: pathlib.Path = Field( - desc="Output directory for the processed dataset.", - hint=FieldHint.core, - ) - distributed: _DistributedConfig = Field( - default_factory=_DistributedConfig, - desc="Configuration for distributed processing.", - hint=FieldHint.feature, - ) - - @classmethod - def get_dataset_preparator_class(cls) -> typing.Type["DatasetPreparator"]: - raise NotImplementedError - - def _get_runnable(self, parsed: argparse.Namespace) -> typing.Callable[[], None]: - dataset_preparator = self.get_dataset_preparator_class()(config=self) - return dataset_preparator.run - - -class DatasetPreparator(abc.ABC): - _config: DatasetPreparatorConfig - config_class: typing.ClassVar[type[DatasetPreparatorConfig]] = DatasetPreparatorConfig - - def __init__(self, config: DatasetPreparatorConfig) -> None: - Assert.custom(isinstance, config, self.config_class) - config.validate() - self._config = config - - @abc.abstractmethod - def run(self) -> None: - raise NotImplementedError diff --git a/fast_llm/data/gpt/memmap.py b/fast_llm/data/gpt/memmap.py index 0ff4857b..a2a57271 100644 --- a/fast_llm/data/gpt/memmap.py +++ b/fast_llm/data/gpt/memmap.py @@ -4,6 +4,8 @@ import numpy as np from fast_llm.data.gpt.dataset import GPTIndexedDataset +from fast_llm.data.preparator.gpt_memmap.config import MEMMAP_DTYPES, MEMMAP_DTYPES_INV, MEMMAP_INDEX_HEADER +from fast_llm.engine.config_utils.data_type import DataType from fast_llm.utils import Assert, div, padded_cumsum @@ -16,18 +18,6 @@ class GPTMemmapDataset(GPTIndexedDataset): See https://github.com/NVIDIA/Megatron-LM?tab=readme-ov-file#data-preprocessing for more details. 
""" - _DTYPES = { - 1: np.uint8, - 2: np.int8, - 3: np.int16, - 4: np.int32, - 5: np.int64, - 6: np.float32, - 7: np.float64, - 8: np.uint16, - } - _INDEX_HEADER = b"MMIDIDX\x00\x00" - def __init__(self, name: str, prefix: pathlib.Path | str): self._init(name, prefix) @@ -37,10 +27,10 @@ def _init(self, name: str, prefix: pathlib.Path | str): self._prefix = pathlib.Path(prefix) with self._prefix.with_suffix(".idx").open("rb") as stream: - Assert.eq(stream.read(9), self._INDEX_HEADER) + Assert.eq(stream.read(9), MEMMAP_INDEX_HEADER) Assert.eq(struct.unpack(" type["DatasetPreparator"]: + raise NotImplementedError + + def _get_runnable(self, parsed: argparse.Namespace) -> typing.Callable[[], None]: + dataset_preparator = self.get_dataset_preparator_class()(config=self) + return dataset_preparator.run + + +class DatasetPreparator(abc.ABC): + _config: DatasetPreparatorConfig + config_class: typing.ClassVar[type[DatasetPreparatorConfig]] = DatasetPreparatorConfig + + def __init__(self, config: DatasetPreparatorConfig) -> None: + Assert.custom(isinstance, config, self.config_class) + config.validate() + self._config = config + + @abc.abstractmethod + def run(self) -> None: + raise NotImplementedError diff --git a/fast_llm/data/preparator/gpt_memmap/__init__.py b/fast_llm/data/preparator/gpt_memmap/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fast_llm/data/preparator/gpt_memmap/config.py b/fast_llm/data/preparator/gpt_memmap/config.py new file mode 100644 index 00000000..9188a14c --- /dev/null +++ b/fast_llm/data/preparator/gpt_memmap/config.py @@ -0,0 +1,162 @@ +import os +import pathlib +import typing + +from fast_llm.config import Config, Field, FieldHint, check_field, config_class +from fast_llm.data.config import TokenizerConfig +from fast_llm.data.preparator.config import DatasetPreparatorConfig +from fast_llm.engine.config_utils.data_type import DataType +from fast_llm.utils import Assert + +MEMMAP_DTYPES = { + 1: DataType.uint8, + 2: DataType.int8, + 3: DataType.int16, + 4: DataType.int32, + 5: DataType.int64, + 6: DataType.float32, + 7: DataType.float64, + 8: DataType.uint16, +} +MEMMAP_DTYPES_INV = {y: x for x, y in MEMMAP_DTYPES.items()} +MEMMAP_INDEX_HEADER = b"MMIDIDX\x00\x00" + + +@config_class +class GPTHuggingfaceDatasetConfig(Config): + path: str = Field( + default=None, + desc="Name or path of the dataset.", + hint=FieldHint.core, + ) + config_name: None | str = Field( + default=None, + desc="Specific configuration name for the dataset.", + hint=FieldHint.optional, + ) + split: str = Field( + default="train", + desc="Split of the dataset to use.", + hint=FieldHint.optional, + ) + field: str = Field( + default="text", + desc="Field of the dataset to use.", + hint=FieldHint.optional, + ) + data_type: DataType | None = Field( + default=None, + desc="Data type of the dataset field." + " If not provided, it will be inferred based on the tokenizer vocabulary size.", + hint=FieldHint.optional, + ) + trust_remote_code: bool = Field( + default=False, + desc="Trust remote code when downloading the dataset.", + hint=FieldHint.optional, + ) + disable_disk_space_check: bool = Field( + default=False, + desc="Disable disk space check. 
Useful for environments where disk space is not accurately reported.", + hint=FieldHint.optional, + ) + + +@config_class +class DatasetPreparatorDistributedConfig(Config): + # TODO: Unify with fast_llm.engine.distributed.config.DistributedConfig + + default_world_size: typing.ClassVar[int] = int(os.environ.get("WORLD_SIZE", 1)) + default_rank: typing.ClassVar[int] = int(os.environ.get("RANK", 0)) + world_size: int = Field( + default=None, + desc="Size of the world group. Typically provided by torchrun or equivalent through the `WORLD_SIZE` environment variable.", + hint=FieldHint.expert, + valid=check_field(Assert.gt, 0), + ) + rank: int = Field( + default=None, + desc="Rank of the local process. Typically provided by torchrun or equivalent through the `RANK` environment variable.", + hint=FieldHint.expert, + valid=check_field(Assert.geq, 0), + ) + backend: str = Field( + default="gloo", + desc="Distributed backend to use.", + hint=FieldHint.optional, + ) + + def _validate(self): + if self.world_size is None: + self.world_size = self.default_world_size + if self.rank is None: + self.rank = self.default_rank + super()._validate() + Assert.in_range(self.rank, 0, self.world_size) + + +@config_class() +class GPTMemmapDatasetPreparatorConfig(DatasetPreparatorConfig): + preparator_name: typing.ClassVar[str] = "gpt_memmap" + + output_path: pathlib.Path = Field( + default=None, + desc="Output directory for the processed dataset.", + hint=FieldHint.core, + ) + distributed: DatasetPreparatorDistributedConfig = Field( + default_factory=DatasetPreparatorDistributedConfig, + desc="Configuration for distributed processing.", + hint=FieldHint.feature, + ) + tokens_per_shard: int = Field( + default=10**9, + desc="Approximate number of tokens per shard.", + hint=FieldHint.feature, + valid=check_field(Assert.geq, 10**5), + ) + loading_workers: int = Field( + default=1, + desc="Number of workers in load_dataset() call.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + tokenize_workers: int = Field( + default=1, + desc="Number of workers for tokenization.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + saving_workers: int = Field( + default=1, + desc="Number of processes for saving the data.", + hint=FieldHint.optional, + valid=check_field(Assert.geq, 1), + ) + remove_downloads: bool = Field( + default=False, + desc="Remove downloaded dataset after processing.", + hint=FieldHint.optional, + ) + dataset: GPTHuggingfaceDatasetConfig = Field( + default_factory=GPTHuggingfaceDatasetConfig, + desc="Configuration for the dataset.", + hint=FieldHint.feature, + ) + tokenizer: TokenizerConfig = Field( + default_factory=TokenizerConfig, + desc="Configuration for the tokenizer.", + hint=FieldHint.feature, + ) + + def _validate(self): + assert self.tokenizer.path is not None + if self.dataset.data_type is not None: + Assert.incl(DataType.from_numpy(self.dataset.data_type.numpy), MEMMAP_DTYPES_INV) + super()._validate() + + @classmethod + def get_dataset_preparator_class(cls): + from fast_llm.data.preparator.gpt_memmap.prepare import GPTMemmapDatasetPreparator + + return GPTMemmapDatasetPreparator diff --git a/fast_llm/data/gpt/prepare.py b/fast_llm/data/preparator/gpt_memmap/prepare.py similarity index 63% rename from fast_llm/data/gpt/prepare.py rename to fast_llm/data/preparator/gpt_memmap/prepare.py index fd1e82f5..c51bd4a7 100644 --- a/fast_llm/data/gpt/prepare.py +++ b/fast_llm/data/preparator/gpt_memmap/prepare.py @@ -1,110 +1,18 @@ import json import multiprocessing 
import shutil -import typing +import datasets import numpy as np import torch.distributed +import tqdm +import transformers -from fast_llm.config import Config, Field, FieldHint, check_field, config_class -from fast_llm.data.config import DatasetPreparator, DatasetPreparatorConfig, TokenizerConfig from fast_llm.data.gpt.memmap import GPTMemmapDataset +from fast_llm.data.preparator.config import DatasetPreparator +from fast_llm.data.preparator.gpt_memmap.config import GPTMemmapDatasetPreparatorConfig from fast_llm.data.tokenizer import Tokenizer from fast_llm.engine.config_utils.data_type import DataType -from fast_llm.utils import Assert - - -@config_class -class GPTHuggingfaceDatasetConfig(Config): - name_or_path: str = Field( - desc="Name or path of the dataset.", - hint=FieldHint.core, - ) - config_name: None | str = Field( - default=None, - desc="Specific configuration name for the dataset.", - hint=FieldHint.optional, - ) - split: str = Field( - default="train", - desc="Split of the dataset to use.", - hint=FieldHint.optional, - ) - field: str = Field( - default="text", - desc="Field of the dataset to use.", - hint=FieldHint.optional, - ) - data_type: DataType | None = Field( - default=None, - desc="Data type of the dataset field. If not provided, it will be inferred based on the tokenizer vocabulary size.", - hint=FieldHint.optional, - ) - trust_remote_code: bool = Field( - default=False, - desc="Trust remote code when downloading the dataset.", - hint=FieldHint.optional, - ) - disable_disk_space_check: bool = Field( - default=False, - desc="Disable disk space check. Useful for environments where disk space is not accurately reported.", - hint=FieldHint.optional, - ) - - -@config_class() -class GPTMemmapDatasetPreparatorConfig(DatasetPreparatorConfig): - preparator_name: typing.ClassVar[str] = "gpt_memmap" - - tokens_per_shard: int = Field( - default=10**9, - desc="Approximate number of tokens per shard.", - hint=FieldHint.feature, - valid=check_field(Assert.geq, 10**5), - ) - loading_workers: int = Field( - default=1, - desc="Number of workers in load_dataset() call.", - hint=FieldHint.optional, - valid=check_field(Assert.geq, 1), - ) - tokenize_workers: int = Field( - default=1, - desc="Number of workers for tokenization.", - hint=FieldHint.optional, - valid=check_field(Assert.geq, 1), - ) - saving_workers: int = Field( - default=1, - desc="Number of processes for saving the data.", - hint=FieldHint.optional, - valid=check_field(Assert.geq, 1), - ) - remove_downloads: bool = Field( - default=False, - desc="Remove downloaded dataset after processing.", - hint=FieldHint.optional, - ) - dataset: GPTHuggingfaceDatasetConfig = Field( - default_factory=GPTHuggingfaceDatasetConfig, - desc="Configuration for the dataset.", - hint=FieldHint.feature, - ) - tokenizer: TokenizerConfig = Field( - default_factory=TokenizerConfig, - desc="Configuration for the tokenizer.", - hint=FieldHint.feature, - ) - - def _validate(self): - assert self.tokenizer.path is not None - if self.dataset.data_type is not None: - Assert.incl(self.dataset.data_type.numpy, GPTMemmapDataset._DTYPES.values()) - super()._validate() - - @classmethod - def get_dataset_preparator_class(cls): - return GPTMemmapDatasetPreparator class GPTMemmapDatasetPreparator(DatasetPreparator): @@ -126,14 +34,13 @@ def _tokenize_batch(self, batch): } def _save_shard(self, args) -> dict: - from tqdm import tqdm shard_idx, shard_dataset = args prefix = f"shard_{self._config.distributed.rank}_{shard_idx}" shard_output_path = 
self._config.output_path / prefix documents = [ np.array(item["input_ids"], dtype=self._data_type.numpy) - for item in tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") + for item in tqdm.tqdm(shard_dataset, desc=f"Saving shard {shard_idx}", unit="docs") ] GPTMemmapDataset.write_dataset(prefix=shard_output_path, documents=documents) dataset_dict = { @@ -144,9 +51,6 @@ def _save_shard(self, args) -> dict: return dataset_dict def run(self): - import datasets - import transformers - from tqdm import tqdm # Set transformers logging verbosity transformers.logging.set_verbosity_error() @@ -189,7 +93,7 @@ def run(self): download_path_ok = download_path / "ok" if self._config.distributed.rank == 0 and not download_path_ok.exists(): datasets.load_dataset( - path=self._config.dataset.name_or_path, + path=self._config.dataset.path, name=self._config.dataset.config_name, split=self._config.dataset.split, num_proc=self._config.loading_workers, @@ -218,13 +122,13 @@ def run(self): ) # Calculate total number of tokens - total_tokens = sum(tqdm(tokenized_dataset["num_tokens"], desc="Counting tokens", unit="tokens")) + total_tokens = sum(tqdm.tqdm(tokenized_dataset["num_tokens"], desc="Counting tokens", unit="tokens")) # Split dataset into shards based on number of tokens num_shards = int(np.ceil(total_tokens / self._config.tokens_per_shard)) shards = [ (i, tokenized_dataset.shard(num_shards=num_shards, index=i)) - for i in tqdm(range(num_shards), desc="Creating shards") + for i in tqdm.tqdm(range(num_shards), desc="Creating shards") ] # Use multiprocessing to save each shard in parallel on all ranks diff --git a/fast_llm/engine/config_utils/data_type.py b/fast_llm/engine/config_utils/data_type.py index 25aa1ea4..f4ae9307 100644 --- a/fast_llm/engine/config_utils/data_type.py +++ b/fast_llm/engine/config_utils/data_type.py @@ -25,6 +25,7 @@ class DataType(str, enum.Enum): int16 = "int16" int8 = "int8" uint8 = "uint8" + uint16 = "uint16" @classmethod def _missing_(cls, dtype: str) -> "DataType": @@ -128,8 +129,9 @@ def _set_numpy_dtype_map(): DataType.int16: np.int16, DataType.int8: np.int8, DataType.uint8: np.uint8, + DataType.uint16: np.uint16, } - _TORCH_DTYPE_MAP_INV = {y: x for x, y in _NUMPY_DTYPE_MAP.items()} + _NUMPY_DTYPE_MAP_INV = {y: x for x, y in _NUMPY_DTYPE_MAP.items()} _TRITON_DTYPE_MAP: dict[DataType, "tl.dtype"] = {} diff --git a/fast_llm/tools/cli.py b/fast_llm/tools/cli.py index d3ac5e6d..e9df18ed 100644 --- a/fast_llm/tools/cli.py +++ b/fast_llm/tools/cli.py @@ -15,14 +15,14 @@ def fast_llm(args=None): # (Pre-)configure logging configure_logging() parser = argparse.ArgumentParser(add_help=False) - parser.add_argument("subcommand", choices=["train", "convert", "prepare_dataset"]) + parser.add_argument("subcommand", choices=["train", "convert", "prepare"]) parsed, unparsed = parser.parse_known_args(args) try: if parsed.subcommand == "train": from fast_llm.tools.train import CliTrainingConfig as Runnable elif parsed.subcommand == "convert": from fast_llm.tools.convert import ConversionConfig as Runnable - elif parsed.subcommand == "prepare_dataset": + elif parsed.subcommand == "prepare": from fast_llm.tools.prepare_dataset import PrepareDatasetConfig as Runnable else: raise RuntimeError("Unknown subcommand") diff --git a/tests/test_config.py b/tests/test_config.py index a9f2aeaf..c382aedb 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -6,7 +6,7 @@ from fast_llm.models.auto import trainer_registry -def test_validate_without_import(): +def 
run_without_import(cmd: str): # Make sure validation imports only the bare minimum. # Run the test in a separate process since lots of things are already imported in this one. repo_path = pathlib.Path(__file__).parents[1].resolve() @@ -22,7 +22,7 @@ def test_validate_without_import(): # We still want to enable imports from within Fast-llm f"sys.path.append('{repo_path}')", "from fast_llm.tools.cli import fast_llm as main", - "main(['train', 'gpt', '-v'])", + cmd, ] ), ] @@ -32,6 +32,16 @@ def test_validate_without_import(): raise RuntimeError(f"Process failed with return code {completed_proc.returncode}") +def test_validate_train_gpt_without_import(): + run_without_import("main(['train', 'gpt', '-v'])") + + +def test_validate_prepare_gpt_memmap_without_import(): + run_without_import( + "main(['prepare', 'gpt_memmap', '-v', 'dataset.path=test', 'output_path=test', 'tokenizer.path=test'])" + ) + + def test_validate_example_config(): fast_llm_config_dict = yaml.safe_load( (pathlib.Path(__file__).parents[1] / "examples" / "mistral-4-node-benchmark.yaml").read_text() diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py index 8cef40a8..414f7acc 100644 --- a/tests/test_memmap_dataset.py +++ b/tests/test_memmap_dataset.py @@ -8,6 +8,7 @@ import pytest from fast_llm.data.gpt.memmap import GPTMemmapDataset +from fast_llm.data.preparator.gpt_memmap.config import MEMMAP_DTYPES def dtype_arrays(dtype: np.dtype, min_size: int = 1, max_size: int = 100) -> hypothesis.strategies.SearchStrategy: @@ -18,7 +19,7 @@ def dtype_arrays(dtype: np.dtype, min_size: int = 1, max_size: int = 100) -> hyp ) -@pytest.mark.parametrize("dtype", GPTMemmapDataset._DTYPES.values()) +@pytest.mark.parametrize("dtype", MEMMAP_DTYPES.values()) def test_gpt_memmap_dataset(dtype): @hypothesis.given(documents=dtype_arrays(dtype)) def inner_test(documents): From a134a520c54ea9bfbb54936be6d559802996f867 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Tue, 12 Nov 2024 16:18:23 -0500 Subject: [PATCH 26/30] fix --- tests/test_memmap_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py index 414f7acc..b51b3b92 100644 --- a/tests/test_memmap_dataset.py +++ b/tests/test_memmap_dataset.py @@ -1,5 +1,5 @@ import pathlib -from tempfile import TemporaryDirectory +import tempfile import hypothesis import hypothesis.extra.numpy @@ -23,7 +23,7 @@ def dtype_arrays(dtype: np.dtype, min_size: int = 1, max_size: int = 100) -> hyp def test_gpt_memmap_dataset(dtype): @hypothesis.given(documents=dtype_arrays(dtype)) def inner_test(documents): - with TemporaryDirectory() as temp_dir: + with tempfile.TemporaryDirectory() as temp_dir: prefix = pathlib.Path(temp_dir) GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) dataset = GPTMemmapDataset(name="foo", prefix=prefix) From fbb011a4f527d232105ab54e02cd58a897f44354 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Tue, 12 Nov 2024 18:17:00 -0500 Subject: [PATCH 27/30] No venv --- Dockerfile | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2a2eedc3..b86aa749 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,22 +7,22 @@ RUN apt-get update \ && rm -rf /var/lib/apt/lists/* \ && git lfs install -# Set the permission to 777 for all files and directories in `/app` and `/home`: -# 1. Create directories explicitly because docker use the wrong permission for explicit creation. -# 2. 
For the rest, set the default ACL to 777 for all users. -RUN mkdir -m 777 /app /app/Megatron-LM /app/examples /app/fast_llm /app/tests /app/tools \ - && chmod -R 777 /home \ - && setfacl -d -m u::rwx,g::rwx,o::rwx /app /home - # Set the working directory. WORKDIR /app - -# Environment settings for the virtual environment. -ENV VIRTUAL_ENV=/app/venv -ENV PATH="$VIRTUAL_ENV/bin:$PATH" - -# Create the virtual environment with system site packages. -RUN python3 -m venv $VIRTUAL_ENV --system-site-packages +# Set the permission to 777 for all files and directories in `/app`, `/home` and python install directories: +# 1. Create directories explicitly because docker use the wrong permission for explicit creation. +# 2. For the rest, set the default ACL to 777 for all users. +RUN mkdir -m 777 /app/Megatron-LM /app/examples /app/fast_llm /app/tests /app/tools \ + && setfacl -m d:u::rwx,d:g::rwx,d:o::rwx,u::rwx,g::rwx,o::rwx \ + /app \ + /home \ + /usr \ + /usr/local \ + /usr/local/bin \ + /usr/local/lib \ + /usr/local/lib/python3.10 \ + /usr/local/lib/python3.10/dist-packages \ + /usr/local/lib/python3.10/dist-packages/__pycache__ # Copy dependency files with universal write permissions for all users. COPY --chmod=777 setup.py setup.cfg pyproject.toml ./ From 4827f492ff668830afe942fa2e48f677876ece23 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Tue, 12 Nov 2024 18:31:32 -0500 Subject: [PATCH 28/30] Faster tests --- setup.cfg | 1 - tests/test_memmap_dataset.py | 30 +++++++++--------------------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/setup.cfg b/setup.cfg index 68f3a064..51f87ac5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -45,7 +45,6 @@ OPTIONAL = DEV = pytest>=8.3.2 pytest-depends>=1.0.1 - hypothesis>=6.118.1 # Required for building the documentation DOCS = diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py index b51b3b92..f516083e 100644 --- a/tests/test_memmap_dataset.py +++ b/tests/test_memmap_dataset.py @@ -1,9 +1,6 @@ import pathlib import tempfile -import hypothesis -import hypothesis.extra.numpy -import hypothesis.strategies import numpy as np import pytest @@ -11,23 +8,14 @@ from fast_llm.data.preparator.gpt_memmap.config import MEMMAP_DTYPES -def dtype_arrays(dtype: np.dtype, min_size: int = 1, max_size: int = 100) -> hypothesis.strategies.SearchStrategy: - return hypothesis.strategies.lists( - hypothesis.extra.numpy.arrays(dtype=dtype, shape=hypothesis.strategies.integers(1, 1000)), - min_size=min_size, - max_size=max_size, - ) - - @pytest.mark.parametrize("dtype", MEMMAP_DTYPES.values()) def test_gpt_memmap_dataset(dtype): - @hypothesis.given(documents=dtype_arrays(dtype)) - def inner_test(documents): - with tempfile.TemporaryDirectory() as temp_dir: - prefix = pathlib.Path(temp_dir) - GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) - dataset = GPTMemmapDataset(name="foo", prefix=prefix) - for i, document in enumerate(documents): - assert np.array_equal(dataset.get(i), document, equal_nan=True), f"Mismatch at index {i}" - - inner_test() + documents = [np.random.randint(1000, size=np.random.randint(1, 100)) for _ in range(100)] + with tempfile.TemporaryDirectory() as temp_dir: + prefix = pathlib.Path(temp_dir) + GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents) + dataset = GPTMemmapDataset(name="foo", prefix=prefix) + for i, document in enumerate(documents): + assert np.array_equal( + dataset.get(i), document, equal_nan=True + ), f"Mismatch for document {i}: {document} != {dataset.get(i)}." 
From f8c328fd6695283636197316a96fea70f2a211e9 Mon Sep 17 00:00:00 2001
From: Torsten Scholak
Date: Tue, 12 Nov 2024 20:37:01 -0500
Subject: [PATCH 29/30] use dtype

---
 tests/test_memmap_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_memmap_dataset.py b/tests/test_memmap_dataset.py
index f516083e..413c2648 100644
--- a/tests/test_memmap_dataset.py
+++ b/tests/test_memmap_dataset.py
@@ -10,7 +10,7 @@

 @pytest.mark.parametrize("dtype", MEMMAP_DTYPES.values())
 def test_gpt_memmap_dataset(dtype):
-    documents = [np.random.randint(1000, size=np.random.randint(1, 100)) for _ in range(100)]
+    documents = [np.random.randint(1000, size=np.random.randint(1, 100)).astype(dtype) for _ in range(100)]
     with tempfile.TemporaryDirectory() as temp_dir:
         prefix = pathlib.Path(temp_dir)
         GPTMemmapDataset.write_dataset(prefix=prefix, documents=documents)

From ded3027fdf38e0a4503e17a9aab2f7f8808acf57 Mon Sep 17 00:00:00 2001
From: Torsten Scholak
Date: Tue, 12 Nov 2024 20:37:13 -0500
Subject: [PATCH 30/30] remove unused venv package

---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index b86aa749..b3b3cf13 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,7 +3,7 @@ FROM nvcr.io/nvidia/pytorch:24.07-py3

 # Install dependencies.
 RUN apt-get update \
-    && apt-get install --no-install-recommends -y acl python3.10-venv git-lfs \
+    && apt-get install --no-install-recommends -y acl git-lfs \
     && rm -rf /var/lib/apt/lists/* \
     && git lfs install
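
A minimal usage sketch for the `prepare` subcommand introduced in this series (illustrative only, not part of the patches above): the call below mirrors the override style exercised in `tests/test_config.py`, but the dataset name, tokenizer path, output path, and shard size are placeholders that would need to point at real resources.

    from fast_llm.tools.cli import fast_llm as main

    # Dispatches to GPTMemmapDatasetPreparator via the "gpt_memmap" entry in
    # dataset_preparator_registry; all values below are illustrative placeholders.
    main([
        "prepare",
        "gpt_memmap",
        "output_path=/tmp/gpt_memmap_output",      # shards and fast_llm_dataset.json are written here
        "dataset.path=openwebtext",                # any Hugging Face dataset with a text field
        "dataset.field=text",
        "tokenizer.path=/path/to/tokenizer.json",  # required; checked in GPTMemmapDatasetPreparatorConfig._validate
        "tokens_per_shard=100000",                 # smallest value allowed by the Assert.geq(10**5) check
    ])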