Skip to content

Commit 466800e

Browse files
authored
Dataset versioning (#218)
1 parent 4de475a commit 466800e

File tree

6 files changed

+120
-18
lines changed

6 files changed

+120
-18
lines changed

.env.example

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ EVALUATE_SCRIPT_PATH = 'evaluate/evaluate_model.py'
3232
REGISTER_SCRIPT_PATH = 'register/register_model.py'
3333
SOURCES_DIR_TRAIN = 'diabetes_regression'
3434
DATASET_NAME = 'diabetes_ds'
35+
DATASET_VERSION = 'latest'
36+
# Optional. Set it if you have configured a non-default datastore to point to your data
37+
DATASTORE_NAME = ''
3538
SCORE_SCRIPT = 'scoring/score.py'
3639

3740
# Optional. Used by a training pipeline with R on Databricks

.pipelines/diabetes_regression-variables-template.yml

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ variables:
2323
value: mlopspython
2424
- name: DATASET_NAME
2525
value: diabetes_ds
26+
# Uncomment DATASTORE_NAME if you have configured a non-default datastore to point to your data
27+
# - name: DATASTORE_NAME
28+
# value: datablobstore
29+
- name: DATASET_VERSION
30+
value: latest
2631
- name: TRAINING_PIPELINE_NAME
2732
value: "diabetes-Training-Pipeline"
2833
- name: MODEL_NAME

diabetes_regression/register/register_model.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import argparse
2929
import traceback
3030
import joblib
31-
from azureml.core import Run, Experiment, Workspace
31+
from azureml.core import Run, Experiment, Workspace, Dataset
3232
from azureml.core.model import Model as AMLModel
3333

3434

@@ -105,15 +105,23 @@ def main():
105105
print("Tags present: {parent_tags}")
106106

107107
if (model is not None):
108+
dataset_id = parent_tags["dataset_id"]
108109
if (build_id is None):
109-
register_aml_model(model_file, model_name, model_mse, exp, run_id)
110+
register_aml_model(
111+
model_file,
112+
model_name,
113+
model_mse,
114+
exp,
115+
run_id,
116+
dataset_id)
110117
elif (build_uri is None):
111118
register_aml_model(
112119
model_file,
113120
model_name,
114121
model_mse,
115122
exp,
116123
run_id,
124+
dataset_id,
117125
build_id)
118126
else:
119127
register_aml_model(
@@ -122,6 +130,7 @@ def main():
122130
model_mse,
123131
exp,
124132
run_id,
133+
dataset_id,
125134
build_id,
126135
build_uri)
127136
else:
@@ -146,6 +155,7 @@ def register_aml_model(
146155
model_mse,
147156
exp,
148157
run_id,
158+
dataset_id,
149159
build_id: str = 'none',
150160
build_uri=None
151161
):
@@ -164,7 +174,9 @@ def register_aml_model(
164174
workspace=exp.workspace,
165175
model_name=model_name,
166176
model_path=model_path,
167-
tags=tagsValue)
177+
tags=tagsValue,
178+
datasets=[('training data',
179+
Dataset.get_by_id(exp.workspace, dataset_id))])
168180
os.chdir("..")
169181
print(
170182
"Model registered: {} \nModel Description: {} "

diabetes_regression/training/train.py

+66-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,22 @@
3131
from sklearn.model_selection import train_test_split
3232
import joblib
3333
import json
34+
from azureml.core import Dataset, Datastore, Workspace
35+
36+
37+
def register_dataset(
    aml_workspace: Workspace,
    dataset_name: str,
    datastore_name: str,
    file_path: str
) -> Dataset:
    """Register (or add a new version of) a tabular dataset.

    Reads the delimited file at *file_path* from the named datastore and
    registers it as a tabular dataset in the workspace. If a dataset with
    the same name already exists, a new version is created.

    :param aml_workspace: workspace in which to register the dataset.
    :param dataset_name: name to register the dataset under.
    :param datastore_name: name of the datastore containing the file.
        Falls back to the workspace default datastore when empty or None
        (DATASTORE_NAME is optional and may be unset in the environment).
    :param file_path: path to the delimited data file on the datastore.
    :return: the registered Dataset (latest version).
    """
    if datastore_name:
        datastore = Datastore.get(aml_workspace, datastore_name)
    else:
        # Consistent with the build pipeline, which defaults to the
        # workspace default datastore when DATASTORE_NAME is not set.
        datastore = aml_workspace.get_default_datastore()
    dataset = Dataset.Tabular.from_delimited_files(
        path=(datastore, file_path))
    dataset = dataset.register(workspace=aml_workspace,
                               name=dataset_name,
                               create_new_version=True)

    return dataset
3450

3551

3652
def train_model(run, data, alpha):
@@ -64,13 +80,47 @@ def main():
6480
help=("output for passing data to next step")
6581
)
6682

83+
parser.add_argument(
84+
"--dataset_version",
85+
type=str,
86+
help=("dataset version")
87+
)
88+
89+
parser.add_argument(
90+
"--data_file_path",
91+
type=str,
92+
help=("data file path, if specified,\
93+
a new version of the dataset will be registered")
94+
)
95+
96+
parser.add_argument(
97+
"--caller_run_id",
98+
type=str,
99+
help=("caller run id, for example ADF pipeline run id")
100+
)
101+
102+
parser.add_argument(
103+
"--dataset_name",
104+
type=str,
105+
help=("Dataset name. Dataset must be passed by name\
106+
to always get the desired dataset version\
107+
rather than the one used while the pipeline creation")
108+
)
109+
67110
args = parser.parse_args()
68111

69112
print("Argument [model_name]: %s" % args.model_name)
70113
print("Argument [step_output]: %s" % args.step_output)
114+
print("Argument [dataset_version]: %s" % args.dataset_version)
115+
print("Argument [data_file_path]: %s" % args.data_file_path)
116+
print("Argument [caller_run_id]: %s" % args.caller_run_id)
117+
print("Argument [dataset_name]: %s" % args.dataset_name)
71118

72119
model_name = args.model_name
73120
step_output_path = args.step_output
121+
dataset_version = args.dataset_version
122+
data_file_path = args.data_file_path
123+
dataset_name = args.dataset_name
74124

75125
print("Getting training parameters")
76126

@@ -86,16 +136,27 @@ def main():
86136
run = Run.get_context()
87137

88138
# Get the dataset
89-
dataset = run.input_datasets['training_data']
90-
if (dataset):
91-
df = dataset.to_pandas_dataframe()
92-
X = df.drop('Y', axis=1).values
93-
y = df['Y'].values
139+
if (dataset_name):
140+
if (data_file_path == 'none'):
141+
dataset = Dataset.get_by_name(run.experiment.workspace, dataset_name, dataset_version) # NOQA: E402, E501
142+
else:
143+
dataset = register_dataset(run.experiment.workspace,
144+
dataset_name,
145+
os.environ.get("DATASTORE_NAME"),
146+
data_file_path)
94147
else:
95148
e = ("No dataset provided")
96149
print(e)
97150
raise Exception(e)
98151

152+
# Link dataset to the step run so it is trackable in the UI
153+
run.input_datasets['training_data'] = dataset
154+
run.parent.tag("dataset_id", value=dataset.id)
155+
156+
df = dataset.to_pandas_dataframe()
157+
X = df.drop('Y', axis=1).values
158+
y = df['Y'].values
159+
99160
X_train, X_test, y_train, y_test = train_test_split(
100161
X, y, test_size=0.2, random_state=0)
101162
data = {"train": {"X": X_train, "y": y_train},

ml_service/pipelines/diabetes_regression_build_train_pipeline.py

+21-10
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
from azureml.pipeline.core.graph import PipelineParameter
22
from azureml.pipeline.steps import PythonScriptStep
33
from azureml.pipeline.core import Pipeline, PipelineData
4-
from azureml.core import Workspace
4+
from azureml.core import Workspace, Dataset, Datastore
55
from azureml.core.runconfig import RunConfiguration
6-
from azureml.core import Dataset
76
from ml_service.util.attach_compute import get_compute
87
from ml_service.util.env_variables import Env
98
from ml_service.util.manage_environment import get_environment
@@ -39,8 +38,20 @@ def main():
3938
run_config = RunConfiguration()
4039
run_config.environment = environment
4140

41+
if (e.datastore_name):
42+
datastore_name = e.datastore_name
43+
else:
44+
datastore_name = aml_workspace.get_default_datastore().name
45+
run_config.environment.environment_variables["DATASTORE_NAME"] = datastore_name # NOQA: E501
46+
4247
model_name_param = PipelineParameter(
4348
name="model_name", default_value=e.model_name)
49+
dataset_version_param = PipelineParameter(
50+
name="dataset_version", default_value=e.dataset_version)
51+
data_file_path_param = PipelineParameter(
52+
name="data_file_path", default_value="none")
53+
caller_run_id_param = PipelineParameter(
54+
name="caller_run_id", default_value="none")
4455

4556
# Get dataset name
4657
dataset_name = e.dataset_name
@@ -57,9 +68,9 @@ def main():
5768
df.to_csv(file_name, index=False)
5869

5970
# Upload file to default datastore in workspace
60-
default_ds = aml_workspace.get_default_datastore()
71+
datatstore = Datastore.get(aml_workspace, datastore_name)
6172
target_path = 'training-data/'
62-
default_ds.upload_files(
73+
datatstore.upload_files(
6374
files=[file_name],
6475
target_path=target_path,
6576
overwrite=True,
@@ -68,17 +79,14 @@ def main():
6879
# Register dataset
6980
path_on_datastore = os.path.join(target_path, file_name)
7081
dataset = Dataset.Tabular.from_delimited_files(
71-
path=(default_ds, path_on_datastore))
82+
path=(datatstore, path_on_datastore))
7283
dataset = dataset.register(
7384
workspace=aml_workspace,
7485
name=dataset_name,
7586
description='diabetes training data',
7687
tags={'format': 'CSV'},
7788
create_new_version=True)
7889

79-
# Get the dataset
80-
dataset = Dataset.get_by_name(aml_workspace, dataset_name)
81-
8290
# Create a PipelineData to pass data between steps
8391
pipeline_data = PipelineData(
8492
'pipeline_data',
@@ -89,11 +97,14 @@ def main():
8997
script_name=e.train_script_path,
9098
compute_target=aml_compute,
9199
source_directory=e.sources_directory_train,
92-
inputs=[dataset.as_named_input('training_data')],
93100
outputs=[pipeline_data],
94101
arguments=[
95102
"--model_name", model_name_param,
96-
"--step_output", pipeline_data
103+
"--step_output", pipeline_data,
104+
"--dataset_version", dataset_version_param,
105+
"--data_file_path", data_file_path_param,
106+
"--caller_run_id", caller_run_id_param,
107+
"--dataset_name", dataset_name,
97108
],
98109
runconfig=run_config,
99110
allow_reuse=False,

ml_service/util/env_variables.py

+10
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def __init__(self):
4040
self._score_script = os.environ.get("SCORE_SCRIPT")
4141
self._build_uri = os.environ.get("BUILD_URI")
4242
self._dataset_name = os.environ.get("DATASET_NAME")
43+
self._datastore_name = os.environ.get("DATASTORE_NAME")
44+
self._dataset_version = os.environ.get("DATASET_VERSION")
4345
self._run_evaluation = os.environ.get("RUN_EVALUATION", "true")
4446
self._allow_run_cancel = os.environ.get(
4547
"ALLOW_RUN_CANCEL", "true")
@@ -145,6 +147,14 @@ def build_uri(self):
145147
def dataset_name(self):
146148
return self._dataset_name
147149

150+
@property
151+
def datastore_name(self):
152+
return self._datastore_name
153+
154+
@property
155+
def dataset_version(self):
156+
return self._dataset_version
157+
148158
@property
149159
def run_evaluation(self):
150160
return self._run_evaluation

0 commit comments

Comments
 (0)