Commit 7dd7d7b: per worker var for shadow and trainable wrapper
Parent: 2d373e1

11 files changed, 904 additions and 194 deletions
demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py
Lines changed: 9 additions & 5 deletions

@@ -6,8 +6,10 @@
 
 try:
   from tensorflow.keras.optimizers.legacy import Adam
+  from tensorflow.keras.optimizers.legacy import Adagrad
 except:
   from tensorflow.keras.optimizers import Adam
+  from tensorflow.keras.optimizers import Adagrad
 
 from tensorflow import distribute as tf_dist
 
@@ -130,7 +132,7 @@ def __init__(self, strategy, train_bs, test_bs, epochs, steps_per_epoch,
         "/job:ps/replica:0/task:{}/device:CPU:0".format(idx)
         for idx in range(self.num_ps)
     ]
-    self.embedding_size = 4
+    self.embedding_size = 1
     self.train_bs = train_bs
     self.test_bs = test_bs
     self.epochs = epochs
@@ -148,7 +150,7 @@ def get_dataset(self, batch_size=1):
     ratings = dataset.map(
         lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
     dataset = dataset.zip((features, ratings))
-    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
+    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False).repeat()
     if batch_size > 1:
       dataset = dataset.batch(batch_size)
     return dataset
@@ -161,6 +163,8 @@ def train(self):
         self.ps_devices, self.embedding_size, self.embedding_size,
         tf.keras.initializers.RandomNormal(0.0, 0.5))
     optimizer = Adam(1E-3)
+
+    # optimizer = Adagrad(1E-3)
     optimizer = de.DynamicEmbeddingOptimizer(optimizer)
 
     auc = tf.keras.metrics.AUC(num_thresholds=1000)
@@ -176,7 +180,7 @@ def train(self):
      model.load_weights(self.model_dir)
 
     model.fit(dataset, epochs=self.epochs, steps_per_epoch=self.steps_per_epoch)
-
+    print(f"model: {model.trainable_variables}")
     if self.model_dir:
       save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
       model.save(self.model_dir, options=save_options)
@@ -255,10 +259,10 @@ def start_chief(config):
       cluster_spec, task_type="chief", task_id=0)
   strategy = tf_dist.experimental.ParameterServerStrategy(cluster_resolver)
   runner = Runner(strategy=strategy,
-                  train_bs=4,
+                  train_bs=2,
                   test_bs=1,
                   epochs=1,
-                  steps_per_epoch=4,
+                  steps_per_epoch=2,
                   model_dir=None,
                   export_dir=None)
   runner.train()
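
Note: the .repeat() added in get_dataset matters now that the chief trains with an explicit steps_per_epoch. Without repetition a small shuffled dataset can be exhausted before the requested number of steps and Keras ends the epoch early. A minimal, self-contained sketch of the pattern (the toy dataset and model below are illustrative stand-ins, not the demo's Runner):

import tensorflow as tf

# Toy stand-ins for the demo's MovieLens pipeline (hypothetical data).
features = tf.random.uniform((8, 4))
labels = tf.random.uniform((8, 1))

# Without .repeat(), 8 samples at batch size 2 yield only 4 batches;
# with .repeat(), steps_per_epoch can exceed one pass over the data.
dataset = (tf.data.Dataset.from_tensor_slices((features, labels))
           .shuffle(8, reshuffle_each_iteration=False)
           .repeat()
           .batch(2))

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(optimizer="adam", loss="mse")
model.fit(dataset, epochs=1, steps_per_epoch=10)  # more steps than one pass over the raw data
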
(new shell script; file path not captured in this view)
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+#!/bin/bash
+rm -rf ./ckpt
+sh stop.sh
+sleep 1
+python movielens-1m-keras-ps.py --ps_list="localhost:2220" --worker_list="localhost:2231" --chief="localhost:2230" --task_mode="ps" --task_id=0 &
+sleep 1
+python movielens-1m-keras-ps.py --ps_list="localhost:2220" --worker_list="localhost:2231" --chief="localhost:2230" --task_mode="worker" --task_id=0 &
+sleep 1
+python movielens-1m-keras-ps.py --ps_list="localhost:2220" --worker_list="localhost:2231" --chief="localhost:2230" --task_mode="chief" --task_id=0
+echo "ok"

demo/dynamic_embedding/movielens-1m-keras/movielens-1m-keras.py
Lines changed: 6 additions & 2 deletions

@@ -6,9 +6,12 @@
 from absl import app
 from tensorflow_recommenders_addons import dynamic_embedding as de
 try:
-  from tensorflow.keras.legacy.optimizers import Adam
+  from tensorflow.keras.optimizers.legacy import Adam
+  from tensorflow.keras.optimizers.legacy import Adagrad
 except:
   from tensorflow.keras.optimizers import Adam
+  from tensorflow.keras.optimizers import Adagrad
+
 
 flags.DEFINE_string('mode', 'train', 'Select the running mode: train or test.')
 flags.DEFINE_string('model_dir', 'model_dir',
@@ -119,7 +122,8 @@ def train():
   dataset = get_dataset(batch_size=32)
   model = DualChannelsDeepModel(FLAGS.embedding_size, FLAGS.embedding_size,
                                 tf.keras.initializers.RandomNormal(0.0, 0.5))
-  optimizer = Adam(1E-3)
+  # optimizer = Adam(1E-3)
+  optimizer = Adagrad(1E-3)
   optimizer = de.DynamicEmbeddingOptimizer(optimizer)
 
   auc = tf.keras.metrics.AUC(num_thresholds=1000)

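Note: the old import path tensorflow.keras.legacy.optimizers does not exist; the corrected path is tensorflow.keras.optimizers.legacy, which newer TF releases use for the classic optimizer implementations, while older releases only expose tensorflow.keras.optimizers. The try/except keeps the demo working on both, and the chosen optimizer is then wrapped for dynamic embeddings. A compact sketch of that pattern (de.DynamicEmbeddingOptimizer is the wrapper the demo already uses; the rest is illustrative):

from tensorflow_recommenders_addons import dynamic_embedding as de

try:
  # Newer TF releases keep the classic implementations under the legacy namespace.
  from tensorflow.keras.optimizers.legacy import Adagrad
except ImportError:
  # Older releases expose them directly.
  from tensorflow.keras.optimizers import Adagrad

optimizer = Adagrad(1E-3)
# Wrap the Keras optimizer so it can also update TFRA dynamic embedding tables.
optimizer = de.DynamicEmbeddingOptimizer(optimizer)
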
tensorflow_recommenders_addons/dynamic_embedding/python/keras/layers/embedding.py
Lines changed: 11 additions & 7 deletions

@@ -27,6 +27,7 @@
 
 from tensorflow.python.keras.utils import tf_utils
 
+from tensorflow_recommenders_addons.dynamic_embedding.python.ops.parameter_server import create_ps_shadow_variable
 from tensorflow_recommenders_addons.dynamic_embedding.python.ops.shadow_embedding_ops import HvdVariable
 from tensorflow_recommenders_addons.dynamic_embedding.python.train.utils import \
     is_parameter_server_strategy
@@ -246,11 +247,11 @@ def __init__(self,
     else:
       if is_parameter_server_strategy(self.distribute_strategy):
         self.shadow_impl = tf_utils.ListWrapper([
-            de.shadow_ops.ShadowVariable(
-                self.params,
+            create_ps_shadow_variable(
+                params=self.params,
                 name=shadow_name,
                 max_norm=self.max_norm,
-                distribute_strategy=self.distribute_strategy,
+                strategy=self.distribute_strategy,
                 trainable=trainable)
         ])
       else:
@@ -303,13 +304,16 @@ def call(self, ids):
     Returns:
       A embedding output with shape (shape(ids), embedding_size).
     """
-    tfprint = tf.print("ids_8a:", ids, output_stream=tf.compat.v1.logging.error)
-    with tf.control_dependencies([tfprint]):
-      pass
-    return de.shadow_ops.embedding_lookup_unique(self.shadow, ids,
+
+    r = de.shadow_ops.embedding_lookup_unique(self.shadow, ids,
                                                  self.embedding_size,
                                                  self.with_unique, self.name)
 
+    tfprint = tf.print("ids_8a:", r, ids, self.shadow.ids, output_stream=tf.compat.v1.logging.error)
+    with tf.control_dependencies([tfprint]):
+      pass
+    return r
+
   def get_config(self):
     _initializer = self.params.initializer
     if _initializer is None:

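Note: under ParameterServerStrategy the Embedding layer now delegates shadow-variable creation to the new create_ps_shadow_variable helper instead of constructing de.shadow_ops.ShadowVariable directly, and the keyword changes from distribute_strategy to strategy; call() also looks up first and prints the result alongside the ids. A schematic of the selection logic, paraphrased from the hunk above (it assumes a TFRA build that includes this commit's parameter_server module; the helper's internals are not shown):

from tensorflow_recommenders_addons import dynamic_embedding as de
from tensorflow_recommenders_addons.dynamic_embedding.python.ops.parameter_server import \
    create_ps_shadow_variable
from tensorflow_recommenders_addons.dynamic_embedding.python.train.utils import \
    is_parameter_server_strategy


def make_shadow(params, shadow_name, max_norm, strategy, trainable):
  """Pick the shadow-variable implementation for the active distribute strategy."""
  if is_parameter_server_strategy(strategy):
    # Parameter-server path: per-worker shadow variable built by the new helper.
    return create_ps_shadow_variable(params=params,
                                     name=shadow_name,
                                     max_norm=max_norm,
                                     strategy=strategy,
                                     trainable=trainable)
  # Default path: an ordinary shadow variable, as before this commit.
  return de.shadow_ops.ShadowVariable(params,
                                      name=shadow_name,
                                      max_norm=max_norm,
                                      trainable=trainable)
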
tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/parameter_server_bzl.py
Lines changed: 112 additions & 73 deletions

@@ -2,7 +2,6 @@
 import os
 import sys
 
-from absl.testing import parameterized
 from tensorflow.python.distribute import multi_process_lib
 import multiprocessing
 import tensorflow as tf
@@ -73,79 +72,119 @@ def tearDownClass(cls):
     super(ParameterServerStrategyV2Test, cls).tearDownClass()
     cls.cluster.stop()
 
-  #@parameterized.parameters(True, False)
-  def testPerWorkerVariableCreation(self):
+  def testPerWorkerTraining(self):
     var_dtype = tf.dtypes.float32
     var_name = 'var'
-    shape = [1]  #if define_shape else None
-
-    # with self.strategy.scope():
-    var = variables.Variable(initial_value=[0.0],
-                             shape=shape,
-                             dtype=var_dtype,
-                             name=var_name,
-                             per_worker_de_variable=True)
-
-    # Use per-worker variable as a capture
-    @def_function.function
-    def worker_fn():
-      var.assign_add(constant_op.constant([1.0]))
-      return var
-
-    num_closures = 10
-    for ix in range(num_closures):
-      self.coordinator.schedule(worker_fn)
-      # Read the PWV many times to ensure result is up-to-date
-      self.coordinator.join()
-      result_sum = sum(var.read_all()).numpy()
-      self.assertEqual(result_sum, ix + 1)
-
-    for _ in range(num_closures):
-      self.coordinator.schedule(worker_fn)
-    self.coordinator.join()
-
-    # Verify placement of variables
-    devices = [wv._get_values().device for wv in var._per_worker_vars._values]
-    expected_devices = [
-        f'/job:worker/replica:0/task:{ix}/device:CPU:0'
-        for ix in range(self.strategy._num_workers)
-    ]  # pylint: disable=protected-access
-    self.assertAllEqual(devices, expected_devices)
-
-    result_sum = sum(var.read_all()).numpy()
-    self.assertEqual(result_sum, num_closures * 2)
-
-  def testKerasFit(self):
-    embed_dim = 8
+    shape = [1]
     with self.strategy.scope():
-      model = Sequential([
-          layers.Input(shape=(1,), dtype=tf.int32),
-          de.keras.layers.Embedding(embed_dim, key_dtype=tf.int32),
-          layers.Flatten(),
-          layers.Dense(1, activation='sigmoid')
-      ])
-      optimizer = Adam(1E-3)
-      optimizer = de.DynamicEmbeddingOptimizer(optimizer)
-      model.compile(loss='binary_crossentropy',
-                    optimizer=optimizer,
-                    metrics=['accuracy'])
-
-    ids = np.random.randint(0, 100, size=(64 * 2, 1))
-    labels = np.random.randint(0, 2, size=(64 * 2, 1))
-
-    def dataset_fn(input_context):
-      global_batch_size = 32
-      batch_size = input_context.get_per_replica_batch_size(global_batch_size)
-      dataset = tf.data.Dataset.from_tensor_slices((ids, labels))
-      dataset = dataset.shard(input_context.num_input_pipelines,
-                              input_context.input_pipeline_id)
-      dataset = dataset.batch(batch_size).repeat()
-      return dataset
-
-    dataset = self.strategy.distribute_datasets_from_function(dataset_fn)
-
-    history = model.fit(dataset, epochs=1, steps_per_epoch=len(ids) // 64)
-    self.assertIn('loss', history.history)
+      var = variables.Variable(initial_value=[0.0],
+                               shape=shape,
+                               dtype=var_dtype,
+                               name=var_name,
+                               per_worker_variable=True)
+      var._trainable = True
+      with backprop.GradientTape(persistent=True) as tape:
+
+        # Define the training step
+        @tf.function
+        def train_step():
+          with tf.GradientTape() as tape:
+            # var._maybe_create_per_worker_vars()
+            value = var.read_value()
+            # if not var.trainable:
+            tape.watch(value)  # still need this with var._trainable = True set.
+            y = value * 2.0
+          grad = tape.gradient(y, value)
+          return grad
+
+        @tf.function
+        def train_step2():
+          with tf.GradientTape() as tape:
+            var._maybe_create_per_worker_vars()
+            value = var.value()
+            # if not var.trainable:
+            tape.watch(value)  # still need this with var._trainable = True set.
+            y = value * 2.0
+          grad = tape.gradient(y, value)
+          return grad
+
+        # Run and check the results
+        grads = self.strategy.run(train_step2)
+        print(f"grads :{grads}")
+        print(f"var.read_all() {var.read_all()}")
+  #@parameterized.parameters(True, False)
+  # def testPerWorkerVariableCreation(self):
+  #   var_dtype = tf.dtypes.float32
+  #   var_name = 'var'
+  #   shape = [1] #if define_shape else None
+  #
+  #   with self.strategy.scope():
+  #     var = variables.Variable(initial_value=[0.0],
+  #                              shape=shape,
+  #                              dtype=var_dtype,
+  #                              name=var_name,
+  #                              per_worker_de_variable=True)
+  #
+  #     # Use per-worker variable as a capture
+  #     @def_function.function
+  #     def worker_fn():
+  #       var.assign_add(constant_op.constant([1.0]))
+  #       return var
+  #
+  #     num_closures = 10
+  #     for ix in range(num_closures):
+  #       self.coordinator.schedule(worker_fn)
+  #       # Read the PWV many times to ensure result is up-to-date
+  #       self.coordinator.join()
+  #       result_sum = sum(var.read_all()).numpy()
+  #       self.assertEqual(result_sum, ix + 1)
+  #
+  #     for _ in range(num_closures):
+  #       self.coordinator.schedule(worker_fn)
+  #     self.coordinator.join()
+  #
+  #     # Verify placement of variables
+  #     devices = [wv._get_values().device for wv in var._per_worker_vars._values]
+  #     expected_devices = [
+  #         f'/job:worker/replica:0/task:{ix}/device:CPU:0'
+  #         for ix in range(self.strategy._num_workers)
+  #     ]  # pylint: disable=protected-access
+  #     self.assertAllEqual(devices, expected_devices)
+  #
+  #     result_sum = sum(var.read_all()).numpy()
+  #     self.assertEqual(result_sum, num_closures * 2)
++
+  # def testKerasFit(self):
+  #   embed_dim = 8
+  #   with self.strategy.scope():
+  #     model = Sequential([
+  #         layers.Input(shape=(1,), dtype=tf.int32),
+  #         de.keras.layers.Embedding(embed_dim, key_dtype=tf.int32),
+  #         layers.Flatten(),
+  #         layers.Dense(1, activation='sigmoid')
+  #     ])
+  #     optimizer = Adam(1E-3)
+  #     optimizer = de.DynamicEmbeddingOptimizer(optimizer)
+  #     model.compile(loss='binary_crossentropy',
+  #                   optimizer=optimizer,
+  #                   metrics=['accuracy'])
+  #
+  #   ids = np.random.randint(0, 100, size=(64 * 2, 1))
+  #   labels = np.random.randint(0, 2, size=(64 * 2, 1))
+  #
+  #   def dataset_fn(input_context):
+  #     global_batch_size = 32
+  #     batch_size = input_context.get_per_replica_batch_size(global_batch_size)
+  #     dataset = tf.data.Dataset.from_tensor_slices((ids, labels))
+  #     dataset = dataset.shard(input_context.num_input_pipelines,
+  #                             input_context.input_pipeline_id)
+  #     dataset = dataset.batch(batch_size).repeat()
+  #     return dataset
+  #
+  #   dataset = self.strategy.distribute_datasets_from_function(dataset_fn)
+  #
+  #   history = model.fit(dataset, epochs=1, steps_per_epoch=len(ids) // 64)
+  #   self.assertIn('loss', history.history)
 
 
 # borrow from multi_process_lib._set_spawn_exe_path and modify it for tf_recommenders_addons
@@ -175,8 +214,8 @@ def guess_path(package_root):
   multiprocessing.get_context().set_executable(sys.argv[0])
 
 
-# This is not for pytest
-# bazel test //tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests:parameter_server_bzl
+# This is not for pytest bazel clean --expunge
+# bazel test --test_output=all //tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests:parameter_server_bzl
 if __name__ == "__main__":
   multi_process_lib._set_spawn_exe_path = custom_set_spawn_exe_path
   v2_compat.enable_v2_behavior()

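Note: the new testPerWorkerTraining checks that a per-worker variable can participate in gradient computation under ParameterServerStrategy. A condensed sketch of the pattern it exercises follows; per_worker_variable=True, _maybe_create_per_worker_vars(), and read_all() come from this patch set's modified Variable and are not part of stock TensorFlow, and the strategy/cluster setup of the test fixture is assumed:

import tensorflow as tf
from tensorflow.python.ops import variables


def run_per_worker_step(strategy):
  """Compute a gradient against a worker-local copy of a per-worker variable."""
  with strategy.scope():
    var = variables.Variable(initial_value=[0.0],
                             shape=[1],
                             dtype=tf.dtypes.float32,
                             name='var',
                             per_worker_variable=True)  # patch-set-specific argument
    var._trainable = True  # mark the per-worker value as trainable

    @tf.function
    def train_step():
      with tf.GradientTape() as tape:
        var._maybe_create_per_worker_vars()  # lazily create this worker's copy
        value = var.value()
        tape.watch(value)  # still needed even with _trainable set, per the test
        y = value * 2.0
      return tape.gradient(y, value)

    # Dispatch through the strategy; the gradient is taken w.r.t. the worker-local value.
    return strategy.run(train_step)
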