[Pipeline RL] Add support for PipelineRL #428
jlamypoirier wants to merge 109 commits into jlp_entropy_loss_tweaks from …
Conversation
fast_llm/data/dataset/streaming.py (outdated)
assert stream_key == REDIS_DATA_STREAM.encode()
for msg_id, msg_data in msgs:
    processed += 1
    # TODO: or do it after processing all received messages, then count > 1?
We do not need this for now if we stick with consumer groups only. The producer can rely on the group lag to control the production rate and on last-delivered-id to safely trim the stream.
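For reference, a minimal sketch of that producer-side pattern with redis-py; the stream and group names, the `MAX_LAG` threshold, and the polling interval are placeholders rather than values from this PR:

```python
import time

import redis

# Placeholder names and limits; the PR's actual REDIS_DATA_STREAM, group name,
# and thresholds may differ.
STREAM = "fastllm:rollouts"
GROUP = "trainer"
MAX_LAG = 1000  # pause production once the consumer group is this far behind

client = redis.Redis(decode_responses=True)


def _group_info(name: str) -> dict:
    # XINFO GROUPS reports "last-delivered-id" and, on Redis >= 7.0, "lag".
    return next(g for g in client.xinfo_groups(STREAM) if g["name"] == name)


def produce(sample: dict[str, str]) -> None:
    # Throttle: wait until the group's lag drops below the threshold.
    while (_group_info(GROUP).get("lag") or 0) >= MAX_LAG:
        time.sleep(0.5)

    client.xadd(STREAM, sample)

    # Everything up to last-delivered-id has been handed to the group, so older
    # entries can be trimmed safely (XTRIM ... MINID requires Redis >= 6.2).
    client.xtrim(STREAM, minid=_group_info(GROUP)["last-delivered-id"], approximate=True)
```

With this, no explicit acknowledgement interval is needed on the consumer side.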
| backend="nccl", | ||
| init_method=init_method, | ||
| world_size=config.broadcast.external_world_size + 1, | ||
| rank=0, |
Maybe this should be configurable, since the external system may not treat us as rank 0.
We also have control over the external system, so we can make it rank 0. It's easier to keep the two programs in sync with hard-coded values than by syncing config files.
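If we do want to make it configurable later, it is a small change. A sketch assuming a hypothetical `rank` field on the broadcast config, defaulting to 0 so the current hard-coded behaviour is kept unless the external system overrides it:

```python
import torch.distributed as dist

# Hypothetical: `config.broadcast.rank` does not exist in this PR; rank is hard-coded to 0.
rank = getattr(config.broadcast, "rank", 0)

dist.init_process_group(
    backend="nccl",
    init_method=init_method,  # as in the surrounding diff
    world_size=config.broadcast.external_world_size + 1,
    rank=rank,
)
```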
Maybe name it tests/models/test_streaming_training_callbacks.py or something similar?
Or perhaps it would be better under tests/engine/trainer/ or tests/trainer/, since this is testing trainer callbacks with a streaming dataset rather than model logic (with the exception of the tensor iterator).
This needs to be in tests/models because it uses the model_configs machinery. I tend to prefer concise names, so test_streaming seems appropriate.
    worker_resources: WorkerResources,
    report_subtest,
):
    report_subtest(path := run_test_script_base_path / config.name, config.total_gpus)
Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
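A minimal sketch of such a guard, using `torch.cuda.device_count()` for the check (the helper name is made up; the existing fixtures may already provide something equivalent):

```python
import pytest
import torch


def require_gpus(required: int) -> None:
    # Skip instead of reporting a phantom "did not run" failure when the
    # machine has fewer GPUs than the distributed subtest needs.
    available = torch.cuda.device_count()
    if available < required:
        pytest.skip(f"Needs {required} GPUs, only {available} available.")
```

Calling it at the top of the test, before `report_subtest(...)`, would skip the subtest cleanly instead of marking it as failed.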
tests/data/test_streaming.py (outdated)
@pytest.mark.depends_on(on=["test_data_streaming"])
@pytest.mark.parametrize(("name", "num_gpus", "distributed_config_dict"), _DISTRIBUTED_TESTING_CONFIGS)
def test_data_streaming_distributed(result_path, name, num_gpus, distributed_config_dict, report_subtest):
    report_subtest(path := result_path / f"data_streaming/{name}", num_gpus)
Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
fast_llm/data/dataset/config.py (outdated)
    _abstract = False

    acknowledge_interval: int = Field(
This is also not needed if we only use consumer groups; see the comment above for the implementation.
What does this do? Is it feature complete? What is the next step?
Co-authored-by: oleksost <[email protected]>
This PR provides the initial integration with PipelineRL, using the GRPO loss.
It introduces:

- Trainer callbacks `training_started`, `step_finished`, and `training_finished`.

This enables seamless coordination between Fast-LLM training and PipelineRL-based inference or orchestration components.
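A rough sketch of what the callback interface could look like; the hook names come from the description above, while the class name, signatures, and docstrings are assumptions, not the PR's actual API:

```python
class TrainingCallback:
    """Hooks invoked by the Fast-LLM trainer; an external component such as a
    PipelineRL orchestrator can subclass this to react to training progress."""

    def training_started(self) -> None:
        """Called once, before the first training step."""

    def step_finished(self, step: int) -> None:
        """Called after each training step, e.g. to trigger a weight broadcast."""

    def training_finished(self) -> None:
        """Called once, after the last step or on shutdown."""
```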