Skip to content

Feature/fit on ray#165

Open
pradyumna-rfai wants to merge 27 commits intomainfrom
feature/fit-on-ray
Open

Feature/fit on ray#165
pradyumna-rfai wants to merge 27 commits intomainfrom
feature/fit-on-ray

Conversation

@pradyumna-rfai
Copy link
Copy Markdown
Collaborator

@pradyumna-rfai pradyumna-rfai commented Feb 3, 2026

PR Summary: Unified Fit and Evals

This is a major refactoring PR that unifies the codebase for both fit (training) and evals (inference) modes, eliminating code duplication and enabling shared infrastructure for experiment tracking, interactive control, and metric logging and RF setup.

Fit and evals experiments can be run back to back on the same rapidfireai start session - the shared Ray cluster persists across Experiment.end() calls, so users can seamlessly switch between modes without restarting any services.

Fixed FSDP device indexing under Ray actor GPU isolation for Fit

Ray assigns each worker actor a single GPU that always appears as cuda:0, regardless of the actor's position in the group. However, the FSDP setup was passing the actor's group rank as the CUDA device index, causing ValueError: device_id cuda:N is out of range for any actor with rank > 0. This fix decouples the CUDA device index (always 0 under Ray's per-actor GPU isolation) from the FSDP group rank. trainer_config.local_rank continues to hold the FSDP group rank -- not the device index -- so rank-gated logic (e.g., "only rank 0 saves checkpoints") remains unaffected.

Changes

Major Changes

1. Unified Database Schema

  • Single experiments table for both fit and evals modes
    • Mode-specific configuration stored in JSON config column
  • Unified interactive_control table for dynamic operations
    • target_type field: 'run' (fit) or 'pipeline' (evals)
    • target_id field: holds run_id or pipeline_id
    • config_data field: holds operation-specific JSON configuration
    • Supports operations: stop, resume, delete, clone, clone_warm
  • Mode-specific tables remain separate:
    • Fit mode: runs, worker_task, controller_progress, worker_progress
    • Evals mode: pipelines, contexts, actor_tasks

2. Unified Experiment Class

  • Single entry point Experiment(name, mode="fit"|"evals") for both modes
  • Mode-specific initialization:
    • _init_fit_mode() - Sets up training infrastructure
    • _init_evals_mode() - Sets up inference infrastructure
  • Shared methods:
    • end() - Clean up resources
    • cancel_current() - Cancel current operation
    • get_log_file_path() - Get experiment logs
  • Mode-specific methods:
    • run_fit() - Execute training (fit mode only)
    • run_evals() - Execute inference (evals mode only)
    • get_results() - Get training metrics (fit mode only)
    • get_runs_info() - Get run information (fit mode only)

3. Unified Metric Logging System

4. Unified Status Enums

5. Setup

  • Unified setup for both fit and evals mode. Removed flags for --init command.
  • Added --clear command to clear all Db, logs and dashboard files.
  1. Combined duplicate Ray initialization from fit and evals into a shared _init_ray() method using a connect-or-create strategy. Experiment.end() no longer shuts down the Ray cluster, allowing sequential fit and evals experiments to reuse the same cluster. Ray is now a managed service in the start/stop scripts alongside MLflow, the API server, and the frontend.

Testing after rebases

For each of the following notebooks, IC Ops was performed - Clone, Clone with warm-start, Stop.

  • SFT-DPO-lite
  • SFT-chatqa-lite
  • SFT-GRPO-math-reasoning
  • RAG-fiqa-lite
  • RAG-gsm8k
  • RAG-fiqa-pgvector

Also verified that SFT-chatqa-lite and RAG-fiqa-lite notebooks can be run back to back without needing to re-run rapidfireai commands.

Screenshots (after rebase)

RAG-Fiqa RAG-gsm8k RAG-fiqa-pgvector RAG-fiqa-pgvector1 RAG-gsm8k1 SFT-chatqa-lite SFT-dpo SFT-GRPO-math-reasoning

Note

High Risk
High risk because it introduces a new unified SQLite schema/DB layer and a new Dispatcher REST API that affect experiment tracking and interactive-control flows across both fit and evals, plus changes dependency installation behavior in rapidfireai init.

Overview
Unifies experiment tracking across fit and evals by introducing a new rapidfireai.db module with a single SQLite schema (tables.sql) and a high-level RfDb API that covers experiments, runs, pipelines/contexts, tasks, and unified interactive-control operations.

Adds a new top-level rapidfireai.dispatcher Flask service exposing REST endpoints for interactive control of both runs and pipelines, and repoints Gunicorn to the new WSGI app.

Simplifies CLI initialization by removing the --evals split and switching to unified requirements selection (setup/rapidfireai/requirements-*.txt), adds a clear command to delete DB/log/experiment directories, and updates various imports (exceptions/logging/constants) to the new shared locations; pyproject.toml also adjusts optional deps (adds ray[default], drops loguru).

Written by Cursor Bugbot for commit 4d3ed38. This will update automatically on new commits. Configure here.

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 4 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for all 4 issues found in the latest run.

  • ✅ Fixed: Duplicate torch/torchvision/torchaudio installation in setup
    • Removed the second unconditional torch/torchvision/torchaudio append block so each package is installed only once per init run.
  • ✅ Fixed: Cannot clear ended_by field via set_run_details
    • Updated set_run_details to accept enum-or-string values for source and ended_by so empty strings are persisted and can clear stale values.
  • ✅ Fixed: Fit controller fetches pipeline IC operations from unified table
    • Changed the fit controller to call get_pending_ic_operations(target_type="run") so it ignores pipeline-targeted IC operations.
  • ✅ Fixed: Duplicate encode_payload/decode_db_payload definitions in rf_db.py
    • Removed local duplicate serializer helpers from rf_db.py and imported the canonical implementations from rapidfireai.utils.serialize.

Create PR

Or push these changes by commenting:

@cursor push 6d1da7f0c6
Preview (6d1da7f0c6)
diff --git a/rapidfireai/cli.py b/rapidfireai/cli.py
--- a/rapidfireai/cli.py
+++ b/rapidfireai/cli.py
@@ -252,9 +252,6 @@
         if get_compute_capability() >= 8.0:
             packages.append({"package": "flash-attn>=2.8.3", "extra_args": ["--upgrade", "--no-build-isolation"]})
         packages.append({"package": "transformers>=4.56.1,<5.0.0", "extra_args": ["--upgrade"]})
-        packages.append({"package": f"torch=={torch_version}", "extra_args": ["--upgrade", "--index-url", f"https://download.pytorch.org/whl/{torch_cuda}"]})
-        packages.append({"package": f"torchvision=={torchvision_version}", "extra_args": ["--upgrade", "--index-url", f"https://download.pytorch.org/whl/{torch_cuda}"]})
-        packages.append({"package": f"torchaudio=={torchaudio_version}", "extra_args": ["--upgrade", "--index-url", f"https://download.pytorch.org/whl/{torch_cuda}"]})
 
         packages.append({"package": "numpy<2.3", "extra_args": ["--upgrade"]})
 

diff --git a/rapidfireai/db/rf_db.py b/rapidfireai/db/rf_db.py
--- a/rapidfireai/db/rf_db.py
+++ b/rapidfireai/db/rf_db.py
@@ -10,6 +10,7 @@
 from typing import Any
 
 from rapidfireai.db.db_interface import DatabaseInterface
+from rapidfireai.utils.serialize import decode_db_payload, encode_payload, extract_pipeline_config_json
 from rapidfireai.utils.constants import (
     ContextStatus,
     ControllerTask,
@@ -26,20 +27,6 @@
 )
 
 
-def encode_payload(payload: object) -> str:
-    """Encode the payload for the database using dill."""
-    import base64
-    import dill
-    return base64.b64encode(dill.dumps(payload)).decode("utf-8")
-
-
-def decode_db_payload(payload: str) -> object:
-    """Decode the payload from the database using dill."""
-    import base64
-    import dill
-    return dill.loads(base64.b64decode(payload))
-
-
 class RfDb:
     """
     Database manager for RapidFire AI experiments.
@@ -787,8 +774,8 @@
         num_epochs_completed: int | None = None,
         chunk_offset: int | None = None,
         error: str | None = None,
-        source: RunSource | None = None,
-        ended_by: RunEndedBy | None = None,
+        source: RunSource | str | None = None,
+        ended_by: RunEndedBy | str | None = None,
         warm_started_from: int | None = None,
         cloned_from: int | None = None,
     ) -> None:
@@ -804,8 +791,8 @@
             "num_epochs_completed": num_epochs_completed,
             "chunk_offset": chunk_offset,
             "error": error,
-            "source": source.value if source else None,
-            "ended_by": ended_by.value if ended_by else None,
+            "source": source.value if isinstance(source, RunSource) else source,
+            "ended_by": ended_by.value if isinstance(ended_by, RunEndedBy) else ended_by,
             "warm_started_from": warm_started_from,
             "cloned_from": cloned_from,
         }
@@ -1068,7 +1055,6 @@
         encoded_config = encode_payload(pipeline_config)
 
         # Extract JSON-serializable data
-        from rapidfireai.utils.serialize import extract_pipeline_config_json
         json_config_dict = extract_pipeline_config_json(pipeline_config)
         json_config_str = json.dumps(json_config_dict) if json_config_dict else "{}"
         flattened_config_str = json.dumps(flattened_config) if flattened_config else "{}"

diff --git a/rapidfireai/fit/backend/controller.py b/rapidfireai/fit/backend/controller.py
--- a/rapidfireai/fit/backend/controller.py
+++ b/rapidfireai/fit/backend/controller.py
@@ -423,7 +423,7 @@
     ) -> tuple[dict[str, Any], list[dict[str, Any]]]:
         """Process the interactive control."""
         # get IC Ops scheduled tasks
-        ic_scheduled_tasks = self.db.get_pending_ic_operations()
+        ic_scheduled_tasks = self.db.get_pending_ic_operations(target_type="run")
 
         # track states for each task(run) and collect clone_modify tasks separately
         run_states = {}
This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Comment thread rapidfireai/cli.py Outdated
Comment thread rapidfireai/db/rf_db.py
Comment thread rapidfireai/db/rf_db.py
Comment thread rapidfireai/db/rf_db.py Outdated
Comment thread rapidfireai/dispatcher/dispatcher.py Outdated
Comment thread rapidfireai/fit/backend/controller.py Outdated
Comment thread community_notebooks/sft_child_facing_chatbot.ipynb Outdated
Comment thread rapidfireai/db/rf_db.py
@pradyumna-rfai pradyumna-rfai force-pushed the feature/fit-on-ray branch 2 times, most recently from 4dd07fc to 8e76752 Compare March 11, 2026 04:28
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Missing parentheses in compound boolean condition
    • Added explicit parentheses around the and clause to make the condition’s precedence and intent unambiguous without changing behavior.

Create PR

Or push these changes by commenting:

@cursor push 0c9d90553f
Preview (0c9d90553f)
diff --git a/rapidfireai/cli.py b/rapidfireai/cli.py
--- a/rapidfireai/cli.py
+++ b/rapidfireai/cli.py
@@ -515,7 +515,7 @@
             os.environ["RF_TRACKIO_ENABLED"] = "true"
     if args.tensorboard_log_dir:
         os.environ["RF_TENSORBOARD_LOG_DIR"] = args.tensorboard_log_dir
-    if args.colab or ColabConfig.ON_COLAB and os.getenv("RF_COLAB_MODE") is None:
+    if args.colab or (ColabConfig.ON_COLAB and os.getenv("RF_COLAB_MODE") is None):
         os.environ["RF_COLAB_MODE"] = "true"
 
     # Handle force command separately

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Comment thread rapidfireai/cli.py Outdated
Comment thread rapidfireai/cli.py Outdated
@pradyumna-rfai pradyumna-rfai force-pushed the feature/fit-on-ray branch 2 times, most recently from 96edc83 to eadd6fa Compare March 19, 2026 06:35
Copy link
Copy Markdown
Collaborator

@david-rfai david-rfai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments on minor changes

Comment on lines +36 to +37
# mlflow.set_experiment automatically creates if it doesn't exist
experiment = mlflow.set_experiment(experiment_name)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually create the experiment?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this will automatically create a new experiment if it doesn't already exist

Copy link
Copy Markdown
Collaborator

@david-rfai david-rfai Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove changes to Colab notebooks, as Colab notebooks are handled separately when tested after a release.

Copy link
Copy Markdown
Collaborator

@david-rfai david-rfai Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove changes to Colab notebooks, as Colab notebooks are handled separately when tested after a release.

@arun-rfai arun-rfai requested a review from anay-rfai March 20, 2026 04:17
The unified db/rf_db.py was missing estimated_runtime, required_workers
columns in the runs table and multi_worker_details in worker_task table.
Also fixed set_run_details missing those params, added set_estimated_runtime
method, and fixed set_experiment_error call in worker_actor.py to pass
experiment_id matching the unified API signature.

Made-with: Cursor
Updated interactive_controller import from fit.utils to fit.backend
and rf_db import from fit.db to db across all tutorial and community
notebooks.

Made-with: Cursor
Renamed MLFlowConfig to MLflowConfig in utils/__init__.py, dispatcher,
and removed duplicate stale MLFlowConfig imports in controller.py.
Fixed automl_utils.py import from fit.utils.exceptions to utils.exceptions
and callbacks.py import from fit.utils.logging to utils.logging.

Made-with: Cursor
Fixed stale imports in conftest.py and test_metric_logger.py to use
the new metrics package paths. Removed tests for DualMetricLogger and
create_metric_logger which no longer exist. Fixed callback import path
from rapidfireai.ml to rapidfireai.fit.ml. Deleted orphaned
fit/utils/serialize.py superseded by utils/serialize.py.

Made-with: Cursor
Each Ray worker actor sees only its assigned GPU as cuda:0, but the
FSDP setup was using the actor's group rank as the CUDA device index,
causing ValueError for any actor with rank > 0.

Fixed by decoupling the CUDA device index (always 0 under Ray) from
the FSDP group rank. Note that trainer_config.local_rank retains the
FSDP group rank (not the device index) and is used throughout for
conditional logic like "only rank 0 saves checkpoints."

Made-with: Cursor
When a run is deleted through Interactive Control while a worker is
still training, the MetricLoggingCallback now checks the run status
in the internal DB before logging metrics. If deleted, training stops
gracefully via control.should_training_stop. The controller also
cancels any scheduled worker tasks for the deleted run to prevent
workers from picking them up.

Made-with: Cursor
…t marker

- Remove duplicate copy() method and conflict marker in rag_pipeline.py
- Consolidate duplicate imports in fit controller, worker_actor, and evals controller

Made-with: Cursor
Consolidate duplicate ray.init() from _init_fit_mode() and
_init_evals_mode() into a single _init_ray() method that connects
to an existing cluster or starts a new one. Move Ray start/stop
to the CLI (setup/start.sh) so a single cluster persists across
sequential experiments without port conflicts. Experiment.end()
now kills named fit workers instead of shutting down the cluster.

Made-with: Cursor
Remove unused imports across 9 files (MLflowConfig, ICStatus,
PipelineStatus, RunStatus, get_dispatcher_headers, DBException,
extract_pipeline_display_metadata, RFLogger, warnings, Any, and a
duplicate contextlib import). Fix duplicate _verify_sufficient_model_size
call in shm_manager, duplicate torch package installs in cli, wrong
pkill pattern in start.sh, incorrect error message in trainer, duplicate
type hint in rag_pipeline, and stale module path references pointing to
the old rapidfireai/evals/utils/constants.py location.

Made-with: Cursor
Non-Colab environments now must run `rapidfireai start` before creating
an experiment, raising a clear ConnectionError otherwise. Colab retains
the auto-start fallback. Also reformats pgvector RAG tutorial notebook.

Made-with: Cursor
Made-with: Cursor
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Mode detection fails for sequential fit-then-evals experiments
    • The dispatcher now reads mode from the running experiment config and Experiment initialization persists that mode, with legacy table-based fallback only when config mode is missing.

Create PR

Or push these changes by commenting:

@cursor push 845d4ef266
Preview (845d4ef266)
diff --git a/rapidfireai/dispatcher/dispatcher.py b/rapidfireai/dispatcher/dispatcher.py
--- a/rapidfireai/dispatcher/dispatcher.py
+++ b/rapidfireai/dispatcher/dispatcher.py
@@ -214,10 +214,13 @@
         try:
             experiment = self.db.get_running_experiment()
 
-            # Detect mode by checking if we have runs or pipelines
-            has_runs = len(self.db.get_all_runs()) > 0
-            has_pipelines = len(self.db.get_all_pipelines()) > 0
-            mode = "fit" if has_runs else ("evals" if has_pipelines else "unknown")
+            # Prefer explicit mode persisted on the running experiment.
+            mode = experiment.get("config", {}).get("mode")
+            if mode not in ("fit", "evals"):
+                # Backward-compatible fallback for older experiments without mode in config.
+                has_runs = len(self.db.get_all_runs()) > 0
+                has_pipelines = len(self.db.get_all_pipelines()) > 0
+                mode = "fit" if has_runs else ("evals" if has_pipelines else "unknown")
 
             return jsonify({
                 "experiment_id": experiment["experiment_id"],

diff --git a/rapidfireai/experiment.py b/rapidfireai/experiment.py
--- a/rapidfireai/experiment.py
+++ b/rapidfireai/experiment.py
@@ -176,6 +176,7 @@
             )
         except Exception as e:
             raise ExperimentException(f"Error creating experiment: {e}, traceback: {traceback.format_exc()}") from e
+        self.db.update_experiment_config(self.experiment_id, mode=self.mode)
 
         # Create logger
         try:
@@ -234,6 +235,7 @@
 
         # Create database reference
         self.db = RfDb()
+        self.db.update_experiment_config(self.experiment_id, mode=self.mode)
 
         try:
             metric_loggers = RFMetricLogger.get_default_metric_loggers(experiment_name=self.experiment_name)

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

# Detect mode by checking if we have runs or pipelines
has_runs = len(self.db.get_all_runs()) > 0
has_pipelines = len(self.db.get_all_pipelines()) > 0
mode = "fit" if has_runs else ("evals" if has_pipelines else "unknown")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mode detection fails for sequential fit-then-evals experiments

Medium Severity

The get_running_experiment endpoint detects mode by checking get_all_runs() and get_all_pipelines() table contents. Since has_runs takes priority over has_pipelines, running a fit experiment followed by an evals experiment (a key use case per the PR description) will incorrectly report mode = "fit" because old runs persist in the database. The mode could instead be read from the experiment's config JSON column or determined from the current experiment's actual mode.

Fix in Cursor Fix in Web

@humaira-rf
Copy link
Copy Markdown
Collaborator

Screenshot 2026-03-24 at 9 55 13 AM

rapidfire.log is currently cluttered with training-related entries that should be directed to training.log. We should update the logging to ensure these logs are routed to the correct file for better readability.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants