Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions tests/sft/checkpoint_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import jax.sharding as shd
import numpy as np
import qwix
import orbax.checkpoint as ocp
from tunix.sft import checkpoint_manager

os.environ['XLA_FLAGS'] = '--xla_force_host_platform_device_count=4'
Expand Down Expand Up @@ -252,6 +253,67 @@ def test_save_and_restore_with_custom_metadata(self):
self.assertEqual(restored_step, 1)
self.assertEqual(restored_metadata, custom_metadata)

def test_save_with_metrics(self):
options = ocp.CheckpointManagerOptions(
best_fn=lambda x: x['accuracy'],
best_mode='max',
)
ckpt_manager = checkpoint_manager.CheckpointManager(
self.temp_path, options=options
)
model, _ = create_sharded_model(TestModel, nnx.Rngs(0), self.mesh)
metrics = {'accuracy': 0.9}
self.assertTrue(ckpt_manager.save(1, model, metrics=metrics))
ckpt_manager._checkpoint_manager.wait_until_finished()
self.assertEqual(ckpt_manager.latest_step(), 1)
self.assertEqual(
ckpt_manager._checkpoint_manager.metadata(1).metrics, metrics
)

metrics_low = {'accuracy': 0.8}
self.assertTrue(ckpt_manager.save(2, model, metrics=metrics_low))
ckpt_manager._checkpoint_manager.wait_until_finished()
self.assertEqual(ckpt_manager.latest_step(), 2)
self.assertEqual(
ckpt_manager._checkpoint_manager.metadata(2).metrics, metrics_low
)

def test_best_checkpoint(self):
options = ocp.CheckpointManagerOptions(
max_to_keep=2,
best_fn=lambda x: x['accuracy'],
best_mode='max',
)
ckpt_manager = checkpoint_manager.CheckpointManager(
self.temp_path, options=options
)
model, _ = create_sharded_model(TestModel, nnx.Rngs(0), self.mesh)

# Save step 1, accuracy 0.5.
ckpt_manager.save(1, model, metrics={'accuracy': 0.5})
ckpt_manager._checkpoint_manager.wait_until_finished()

# Save step 2, accuracy 0.9. Best.
ckpt_manager.save(2, model, metrics={'accuracy': 0.9})
ckpt_manager._checkpoint_manager.wait_until_finished()

# Save step 3, accuracy 0.1. Worst.
ckpt_manager.save(3, model, metrics={'accuracy': 0.1})
ckpt_manager._checkpoint_manager.wait_until_finished()

# Save step 4, accuracy 0.8. Second Best.
ckpt_manager.save(4, model, metrics={'accuracy': 0.8})
ckpt_manager._checkpoint_manager.wait_until_finished()

self.assertEqual(ckpt_manager._checkpoint_manager.best_step(), 2)

# With max_to_keep=2, we expect step 2 (0.9) and step 4 (0.8) to be kept.
# Step 1 (0.5) and Step 3 (0.1) should be deleted.
self.assertTrue((epath.Path(self.temp_path) / '2').exists())
self.assertTrue((epath.Path(self.temp_path) / '4').exists())
self.assertFalse((epath.Path(self.temp_path) / '1').exists())
self.assertFalse((epath.Path(self.temp_path) / '3').exists())

@parameterized.parameters(['test_data/checkpoints'])
def test_restore_with_backward_compatibility(self, ckpt_path):
# The checkpoints in test_data is saved with StandardSave. The test is to
Expand Down
33 changes: 27 additions & 6 deletions tests/sft/metrics_logger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ def setUp(self):

if env_utils.is_internal_env():
self.mock_backends.append(
self.enter_context(
mock.patch.object(metrics_logger, "CluBackend")
)
self.enter_context(mock.patch.object(metrics_logger, "CluBackend"))
)
else:
self.mock_backends.append(
Expand All @@ -33,9 +31,7 @@ def setUp(self):
)
)
self.mock_backends.append(
self.enter_context(
mock.patch.object(metrics_logger, "WandbBackend")
)
self.enter_context(mock.patch.object(metrics_logger, "WandbBackend"))
)

@mock.patch.object(jax.monitoring, "register_scalar_listener")
Expand Down Expand Up @@ -159,6 +155,31 @@ def test_log_perplexity(self, mock_record_scalar):
logger.get_metric("test_prefix", "perplexity", "eval"), 31.6227766
)

@mock.patch.object(jax.monitoring, "record_scalar")
def test_get_latest_metrics(self, mock_record_scalar):
options = metrics_logger.MetricsLoggerOptions(
log_dir=self.log_dir, backend_factories=[]
)
logger = metrics_logger.MetricsLogger(metrics_logger_options=options)
logger.log("test_prefix", "loss", 0.1, metrics_logger.Mode.TRAIN, 1)
logger.log("test_prefix", "loss", 0.05, metrics_logger.Mode.TRAIN, 2)
logger.log("test_prefix", "accuracy", 0.9, metrics_logger.Mode.TRAIN, 2)

latest_metrics = logger.get_latest_metrics(
"test_prefix", metrics_logger.Mode.TRAIN
)
self.assertEqual(
latest_metrics,
{
"test_prefix/train/loss": 0.05,
"test_prefix/train/accuracy": 0.9,
},
)

# Test with non-existent mode/prefix
self.assertEqual(logger.get_latest_metrics("test_prefix", "eval"), {})
self.assertEqual(logger.get_latest_metrics("wrong_prefix", "train"), {})

@mock.patch.object(jax, "process_index", return_value=1)
@mock.patch.object(jax.monitoring, "register_scalar_listener")
def test_no_backends_on_secondary_process(
Expand Down
6 changes: 5 additions & 1 deletion tests/sft/peft_trainer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,11 @@ def test_checkpointing(
mock.call.maybe_restore(mock.ANY, restore_only_lora_params=True),
*[
mock.call.save(
i, mock.ANY, save_only_lora_params=True, custom_metadata={}
i,
mock.ANY,
save_only_lora_params=True,
custom_metadata={},
metrics=mock.ANY,
)
for i in expected_save_steps
],
Expand Down
3 changes: 3 additions & 0 deletions tunix/sft/checkpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def save(
save_only_lora_params: bool = False,
force: bool = False,
custom_metadata: dict[str, Any] | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we store the metrics to custom_metadata? It's storing global_steps, and metrics probably belongs here too.

Copy link
Author

@kamalkraj kamalkraj Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could save in custom metadata as well. The reason for saving metrics along with checkpoint is it make use of the orbax checkpointer builtin functionality to keep only the best checkpoints.
https://orbax.readthedocs.io/en/latest/api_reference/checkpoint.checkpoint_manager.html#checkpointmanageroptions
using the best_fn and best_mode

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah makes sense, if that's case, can you add a test for the best_step?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

metrics: dict[str, Any] | None = None,
) -> bool:
"""Saves the params for the given step.

Expand All @@ -92,6 +93,7 @@ def save(
force: Whether to save the checkpoint regardless of the save decision
policy.
custom_metadata: Custom metadata to save with the checkpoint.
metrics: Metrics to save with the checkpoint.

Returns:
Whether the checkpoint was saved.
Expand All @@ -113,6 +115,7 @@ def save(
model_params=checkpoint_args,
),
custom_metadata=custom_metadata or {},
metrics=metrics,
force=force,
)

Expand Down
18 changes: 18 additions & 0 deletions tunix/sft/metrics_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,24 @@ def get_metric_history(
)
return np.stack(self._metrics[metrics_prefix][mode][metric_name])

def get_latest_metrics(
self, metrics_prefix: str, mode: Mode | str
) -> dict[str, float]:
"""Returns the latest metrics for the given mode."""
if metrics_prefix not in self._metrics:
return {}
if mode not in self._metrics[metrics_prefix]:
return {}

metrics = {}
for metric_name in self._metrics[metrics_prefix][mode]:
values = self._metrics[metrics_prefix][mode][metric_name]
if values:
metrics[f"{metrics_prefix}/{mode}/{metric_name}"] = np.array(
values[-1]
).item()
return metrics

def close(self):
"""Closes all registered logging backends."""
for backend in self._backends:
Expand Down
34 changes: 23 additions & 11 deletions tunix/sft/peft_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,21 +663,26 @@ def train(
):
self._train_steps += 1
self._write_train_metrics()
train_metrics = self.metrics_logger.get_latest_metrics(
self.metrics_prefix, sft_metrics_logger.Mode.TRAIN
)

eval_metrics = {}
if (
eval_ds
and self._train_steps % self.config.eval_every_n_steps == 0
):
eval_metrics = self._run_eval(eval_ds, eval_step)

# Checkpoint frequency is configured by checkpointing_options.
self.checkpoint_manager.save(
self._train_steps,
self.model,
save_only_lora_params=self._lora_enabled,
custom_metadata=self.custom_checkpoint_metadata(),
metrics={**train_metrics, **eval_metrics},
)

if (
eval_ds
and self._train_steps % self.config.eval_every_n_steps == 0
):
self._run_eval(eval_ds, eval_step)

self._prof.maybe_deactivate(self._iter_steps)

self._throttler.wait_for_all()
Expand Down Expand Up @@ -728,7 +733,7 @@ def _run_eval(
self,
eval_ds: Iterable[Any],
eval_step_fn: Callable[..., Any],
) -> None:
) -> dict[str, Any]:
"""Runs evaluation loop."""
logging.info("Running evaluation on train step %d.", self._train_steps)
eval_iterator = iter(eval_ds)
Expand Down Expand Up @@ -765,20 +770,27 @@ def _run_eval(
logging.warning(
"No eval examples found. Skipping eval metrics logging."
)
return
return {}

self._write_metrics(self._buffered_eval_metrics)
metrics = self.metrics_logger.get_latest_metrics(
self.metrics_prefix, sft_metrics_logger.Mode.EVAL
)
logging.info(
"Train step %d eval loss: %f - eval perplexity: %f",
self._train_steps,
self.metrics_logger.get_metric(self.metrics_prefix, "loss", "eval"),
self.metrics_logger.get_metric(
self.metrics_prefix, "perplexity", "eval"
metrics.get(
f"{self.metrics_prefix}/{sft_metrics_logger.Mode.EVAL}/loss", 0.0
),
metrics.get(
f"{self.metrics_prefix}/{sft_metrics_logger.Mode.EVAL}/perplexity",
0.0,
),
)
self._buffered_eval_metrics = None
if self.training_hooks:
self.training_hooks.on_eval_step_end(self, eval_loss)
return metrics


def _default_loss_fn(
Expand Down