[WIP] Add HuggingFaceDataSource #23

Draft · wants to merge 4 commits into base: main
18 changes: 18 additions & 0 deletions bigscience/beam/cache_datasets.sh
@@ -0,0 +1,18 @@
# Need to install seqio
# gcloud auth application-default login

MODULE_IMPORT=beam.task
TASK_NAME=mt0.oscar
JOB_NAME=mt0oscar # the name must consist of only the characters [-a-z0-9], starting with a letter and ending with a letter or number
BUCKET=gs://bigscience-t5x # Not sure whether the cache needs to be task-specific or not ...
PROJECT=bigscience
REGION=us-central1 # TODO: Check if we can have a generic us region
NUM_WORKERS=1000 # TODO: We might need a lot more than this

# TODO: One thing we need to figure out is how this handles the HF datasets cache. If every worker needs to download it, that's a big no-no (one possible workaround is sketched after this script).

seqio_cache_tasks \
--module_import=$MODULE_IMPORT \
--tasks=${TASK_NAME} \
--output_cache_dir=${BUCKET}/multilingual_t0/v0.3 \
--pipeline_options="--runner=DataflowRunner,--project=$PROJECT,--region=$REGION,--job_name=$JOB_NAME,--staging_location=$BUCKET/binaries,--temp_location=$BUCKET/tmp,--setup_file=$PWD/setup.py,--num_workers=$NUM_WORKERS,--autoscaling_algorithm=NONE,--machine_type=n1-highmem-2"
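
One possible answer to the HF datasets cache TODO above, sketched here under stated assumptions rather than as part of this change, is to point datasets at a worker-local cache directory so each Dataflow worker downloads into its own scratch space instead of contending for a shared cache. The environment variable fallback and the /tmp path below are assumptions; the sketch only relies on cache_dir being a supported load_dataset argument.

import os

import datasets

# Assumption: any writable per-worker directory works as a local cache.
worker_cache = os.environ.get("HF_DATASETS_CACHE", "/tmp/hf_datasets_cache")
os.makedirs(worker_cache, exist_ok=True)

# Same dataset/subset as beam/task.py; cache_dir keeps the download local to this worker.
ds = datasets.load_dataset(
    "oscar",
    "unshuffled_deduplicated_fr",
    split="train",
    cache_dir=worker_cache,
)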
127 changes: 127 additions & 0 deletions bigscience/beam/setup.py
@@ -0,0 +1,127 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Setup.py module for the workflow's worker utilities.
All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.
This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""

# pytype: skip-file

import subprocess
from distutils.command.build import build as _build # type: ignore

import setuptools


# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
    """A build command class that will be invoked during package install.

    The package built using the current setup.py will be staged and later
    installed in the worker using `pip install package'. This class will be
    instantiated during install for this specific scenario and will trigger
    running the custom commands specified.
    """
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Custom commands to run during setup. Here they install the Python packages the
# Dataflow workers need (seqio, t5, datasets). Each command will spawn a child
# process. Typically, these commands will include steps to install non-Python
# packages. For instance, to install a C++-based library libjpeg62 the following
# two commands will have to be added:
#
# ['apt-get', 'update'],
# ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
#
# First, note that there is no need to use the sudo command because the setup
# script runs with appropriate access.
# Second, if apt-get tool is used then the first command needs to be 'apt-get
# update' so the tool refreshes itself and initializes links to download
# repositories. Without this initial step the other apt-get install commands
# will fail with package not found errors. Note also --assume-yes option which
# shortcuts the interactive confirmation.
#
# Note that in this example custom commands will run after installing required
# packages. If you have a PyPI package that depends on one of the custom
# commands, move installation of the dependent package to the list of custom
# commands, e.g.:
#
# ['pip', 'install', 'my_package'],
#
# TODO(BEAM-3237): Output from the custom commands is missing from the logs.
# The output of custom commands (including failures) will be logged in the
# worker-startup log.
CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!'],
    ['pip', 'install', 'seqio'],
    ['pip', 'install', 't5[cache-tasks]'],
    ['pip', 'install', 'datasets'],
]


class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print('Running command: %s' % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
REQUIRED_PACKAGES = [
    'numpy',
]

setuptools.setup(
    name='beam',
    version='0.0.1',
    description='Cache datasets workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
    })
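
Before shipping this to Dataflow, one way to sanity-check that the custom commands actually run is to trigger the build command locally and look for the echo output. The invocation below is a sketch, not part of this PR, and note that it really executes the pip installs listed in CUSTOM_COMMANDS.

import subprocess

# Hypothetical local smoke test, run from bigscience/beam/:
# `python setup.py build` runs build's sub_commands, which include
# CustomCommands, which in turn executes every entry in CUSTOM_COMMANDS.
result = subprocess.run(
    ["python", "setup.py", "build"],
    capture_output=True,
    text=True,
    check=True,
)
assert "Custom command worked!" in result.stdout + result.stderr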
106 changes: 106 additions & 0 deletions bigscience/beam/task.py
@@ -0,0 +1,106 @@
import functools
from typing import Sequence, Optional

import datasets
import seqio
from seqio import TaskRegistry, ShardInfo
from t5.data import preprocessors, get_default_vocabulary
import tensorflow as tf


VOCABULARY = get_default_vocabulary()

class HuggingFaceDatasetsSource(seqio.DataSource):
    def __init__(
        self,
        dataset_name: str,
        subset_name: str,
        num_shards: int,
        caching_permitted: bool = True
    ):
        """HuggingFaceDatasetsSource constructor.

        Args:
          dataset_name: HF dataset name.
          subset_name: HF dataset subset.
          num_shards: The number of shards; useful when processing large datasets in parallel.
          caching_permitted: indicates whether this data source may be cached by seqio.
            Defaults to True.
        """
        self._dataset_name = dataset_name
        self._subset_name = subset_name
        self._num_shards = num_shards

        # Get dataset information (splits, example counts, columns) without downloading the data.
        info = datasets.get_dataset_infos(dataset_name)
        splits = list(info[subset_name].splits.keys())
        num_input_examples = {
            split_name: split_info.num_examples
            for split_name, split_info in info[subset_name].splits.items()
        }
        self._columns = list(info[subset_name].features.keys())

        super().__init__(
            splits=splits,
            num_input_examples=num_input_examples,
            caching_permitted=caching_permitted)

    @property
    def supports_arbitrary_sharding(self) -> bool:
        return False

    def get_dataset(
        self,
        split: str,
        shuffle: bool = True,
        seed: Optional[int] = None,
        shard_info: Optional[ShardInfo] = None
    ) -> tf.data.Dataset:
        dataset = datasets.load_dataset(
            self._dataset_name,
            self._subset_name,
            split=split,
        )
        if shard_info is not None:
            dataset = dataset.shard(num_shards=shard_info.num_shards, index=shard_info.index)
        if shuffle:
            dataset = dataset.shuffle(seed)
        return dataset.to_tf_dataset(
            columns=self._columns,
            batch_size=1,
            shuffle=False
        )

    def list_shards(self, split: str) -> Sequence[str]:
        return [str(i) for i in range(self._num_shards)]

TaskRegistry.add(
    "oscar_fr_lm_objective",
    source=HuggingFaceDatasetsSource(
        "oscar",
        "unshuffled_deduplicated_fr",
        num_shards=1024
    ),
    preprocessors=[
        functools.partial(
            preprocessors.rekey, key_map={
                "inputs": None,
                "targets": "text"
            }),
        seqio.preprocessors.tokenize,
        seqio.CacheDatasetPlaceholder(),
        preprocessors.targets_for_prefix_lm_objective,
        preprocessors.pack_prefix_lm_encoder_decoder,
    ],
    output_features={
        "encoder_input_tokens": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "decoder_target_tokens": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "decoder_input_tokens": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "encoder_segment_ids": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "encoder_positions": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "decoder_segment_ids": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "decoder_positions": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        "decoder_loss_weights": seqio.Feature(vocabulary=VOCABULARY, add_eos=False),
        # All but the last stage of the preprocessing uses "targets" as the key,
        # so this output feature is necessary. It is not marked required because
        # the final preprocessor drops it.
        "targets": seqio.Feature(vocabulary=VOCABULARY, add_eos=True),
    },
    metric_fns=[]
)
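
For quick local inspection of the registered task, the standard seqio entry point should work. The sequence lengths and the import path below are illustrative assumptions, not something this PR pins down.

import seqio

import task  # assumed import path; importing registers "oscar_fr_lm_objective"

ds = seqio.get_mixture_or_task("oscar_fr_lm_objective").get_dataset(
    sequence_length={"inputs": 512, "targets": 512},  # illustrative lengths
    split="train",
    shuffle=False,
    shard_info=seqio.ShardInfo(index=0, num_shards=1024),
)
for example in ds.take(1):
    print({k: v.shape for k, v in example.items()})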
2 changes: 0 additions & 2 deletions bigscience/gins/task.py
@@ -6,8 +6,6 @@

from promptsource import seqio_tasks

from t5x.partitioning import LogicalAxisRules

# --- Seqio ---
seqio.add_global_cache_dirs([
'gs://bigscience-t5x/seqio_cached_tasks',