Replies: 7 comments 7 replies
-
Thanks for clarifying the exact network topology. This is an incredibly robust setup. Your laptop acts as the Permanent Data Lake (exposed via VS Code public port forwarding), while Lightning AI acts as the Heavy Compute Engine & API Gateway (exposed via Cloudflare Tunnel). If Lightning AI resets, you just spin up a new instance, reconnect the Cloudflare Tunnel, and it instantly reconnects to your laptop's MinIO. Zero data loss. Here is the corrected architecture diagram and the precise prompt to give your AI developer.

🗺️ Corrected System Architecture (v0.12.0)

📋 Copy & Paste This Prompt to Your AI Developer:

Step 2: Refactor the JobWorker (The Poller)

Modify the JobWorker. Add state tracking to its initializer:

```python
self._running = True
self.active_futures = {}  # { ray_ref: str(job_id) }
self.job_metadata = {}    # { str(job_id): dict }
```

Replace `run_once` with the version below, and add its two helpers, `_finalize_job` and `_fail_job` (the snippet assumes the module's existing imports: `json`, `logger`, the `Job`/`JobStatus` models, typing's `Dict`/`Any`, and `send_job_completed`):

```python
def run_once(self) -> bool:
    import ray

    from .ray_tasks import execute_pipeline_remote

    processed_something = False

    # 1. Poll active Ray futures (non-blocking)
    if self.active_futures:
        ready_refs, _ = ray.wait(
            list(self.active_futures.keys()),
            num_returns=len(self.active_futures),
            timeout=0,
        )
        for ref in ready_refs:
            job_id = self.active_futures.pop(ref)
            meta = self.job_metadata.pop(job_id)
            try:
                results = ray.get(ref)
                self._finalize_job(job_id, meta, results)
            except Exception as e:
                logger.error(f"Ray task failed for job {job_id}: {e}", exc_info=True)
                self._fail_job(job_id, str(e))
            processed_something = True

    # 2. Dispatch new jobs (limit concurrency to avoid OOM)
    if len(self.active_futures) < 2:
        db = self._session_factory()
        try:
            job = (
                db.query(Job)
                .filter(Job.status == JobStatus.pending)
                .order_by(Job.created_at.asc())
                .first()
            )
            if job:
                job.status = JobStatus.running
                db.commit()
                db.refresh(job)
                is_multi = job.job_type in ("image_multi", "video_multi")
                tools_to_run = json.loads(job.tool_list) if is_multi else [job.tool]
                self.job_metadata[str(job.job_id)] = {
                    "plugin_id": job.plugin_id,
                    "job_type": job.job_type,
                    "tools_to_run": tools_to_run,
                    "is_multi": is_multi,
                }
                # Dispatch to the Ray cluster
                future = execute_pipeline_remote.remote(
                    plugin_id=job.plugin_id,
                    tools_to_run=tools_to_run,
                    input_path=job.input_path,
                    job_type=job.job_type,
                )
                self.active_futures[future] = str(job.job_id)
                logger.info(f"Job {job.job_id} dispatched to Ray cluster")
                processed_something = True
        except Exception as e:
            logger.error(f"Error dispatching job: {e}")
        finally:
            db.close()

    return processed_something

def _finalize_job(self, job_id: str, meta: dict, results: dict):
    from io import BytesIO

    db = self._session_factory()
    try:
        job = db.query(Job).filter(Job.job_id == job_id).first()
        if not job:
            return
        tools_to_run = meta["tools_to_run"]
        output_data: Dict[str, Any]
        if job.job_type in ("video", "video_multi"):
            first_tool_output = results[tools_to_run[0]]
            if isinstance(first_tool_output, dict):
                output_data = {
                    "job_id": str(job.job_id),
                    "status": "completed",
                    "total_frames": first_tool_output.get("total_frames"),
                    "frames": first_tool_output.get("frames", []),
                }
                for key in first_tool_output:
                    if key not in ("total_frames", "frames"):
                        output_data[key] = first_tool_output[key]
            elif isinstance(first_tool_output, list):
                output_data = {
                    "job_id": str(job.job_id),
                    "status": "completed",
                    "total_frames": len(first_tool_output),
                    "frames": first_tool_output,
                }
            else:
                output_data = {
                    "job_id": str(job.job_id),
                    "status": "completed",
                    "results": first_tool_output,
                }
        elif meta["is_multi"]:
            output_data = {"plugin_id": job.plugin_id, "tools": results}
        else:
            output_data = {
                "plugin_id": job.plugin_id,
                "tool": tools_to_run[0],
                "results": results[tools_to_run[0]],
            }
        output_json = json.dumps(output_data)
        output_path = self._storage.save_file(
            BytesIO(output_json.encode()),
            f"{job.job_type}/output/{job.job_id}.json",
        )
        job.status = JobStatus.completed
        job.output_path = output_path
        if job.job_type in ("video", "video_multi"):
            job.progress = 100
        db.commit()
        send_job_completed(str(job.job_id))
        logger.info(f"Job {job.job_id} completed successfully via Ray")
    finally:
        db.close()

def _fail_job(self, job_id: str, error_msg: str):
    db = self._session_factory()
    try:
        job = db.query(Job).filter(Job.job_id == job_id).first()
        if job:
            job.status = JobStatus.failed
            job.error_message = error_msg
            db.commit()
    finally:
        db.close()
```

(You can safely remove the old … .)

Step 3: Initialize Ray on Worker Boot

Modify the worker entry point:

```python
import ray

def run_worker_forever(plugin_manager: PluginRegistry):
    # ... existing setup ...
    ray.init(ignore_reinit_error=True)
    logger.info("Ray initialized locally for workers")
    worker.run_forever()
```

(Apply the same … .)

Step 4: Verification

Before committing, ensure … Run the governance checks:

```bash
uv run pre-commit run --all-files
cd server && uv run pytest tests/ -v
python scripts/scan_execution_violations.py
cd web-ui && npm run type-check
```

Confirm when analyzed. Do not proceed until you understand the Ray transition logic.
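For reference, the prompt assumes a `ray_tasks` module exposing `execute_pipeline_remote`, which this thread never shows. Here is a minimal sketch of what that task could look like; the `load_plugin` helper and `run_tool` signature are assumptions, not the actual module, and only the return shape (`{tool_name: output}`) is dictated by the poller above:

```python
# ray_tasks.py — hypothetical sketch, not the real module
import ray


@ray.remote
def execute_pipeline_remote(
    plugin_id: str,
    tools_to_run: list,
    input_path: str,
    job_type: str,
) -> dict:
    """Run each requested tool and return {tool_name: output}."""
    # Assumption: some registry helper can resolve plugin_id inside
    # the Ray worker process.
    from .plugins import load_plugin  # hypothetical import

    plugin = load_plugin(plugin_id)
    results = {}
    for tool in tools_to_run:
        # Heavy GPU work happens here, on whichever Ray node runs this task.
        results[tool] = plugin.run_tool(tool, input_path=input_path, job_type=job_type)
    return results
```

The JobWorker only depends on that return contract (it reads `results[tools_to_run[0]]`), so the internals can differ as long as the dict is keyed by tool name.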
-
The forbidden_vocabulary.yaml marks 'distributed' and 'worker' as Phase 17+ concepts. Should I update this …
-
For Ray initialization: should the worker init Ray locally (`ray.init()`) or connect to an existing head node …
-
You just hit the exact reason why Platform Engineers love Tailscale. Yes, Ray brings Tailscale back into play. If you use Tailscale, you can completely bypass the ephemeral nature of Lightning AI's VMs.

Because you are bringing Tailscale back to enable a true multi-node cluster (Laptop = Head, Lightning = Worker), I am changing my recommendation for the script. Change your vote to:

Option 1: Yes - Add Script

Since your laptop will now act as the Ray Head Node, having a simple script to boot it up with the correct ports and Tailscale IP binding will save you a ton of time.

What to tell your AI Developer:

Copy and paste this directly to your AI to finalize the plan for v0.12.0:

"Let's go with the following choices:
…
Please execute v0.12.0 with these exact parameters."

This is going to be an incredibly powerful setup. Your laptop manages the queue and the state, and Lightning AI does nothing but crunch numbers on the GPU over the VPN. Perfect architecture.
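Once both nodes are up, a quick sanity check from the laptop confirms the Lightning worker actually joined. A minimal sketch, assuming the head was started via `ray start --head` on the laptop so `address="auto"` resolves to it:

```python
import ray

# Attach to the cluster the laptop's head node is already running.
ray.init(address="auto", ignore_reinit_error=True)

# One entry per node; the Lightning VM should appear alongside the laptop.
for node in ray.nodes():
    print(node["NodeManagerAddress"], "alive:", node["Alive"], "resources:", node["Resources"])

# Aggregate view; expect a nonzero "GPU" count once Lightning has joined.
print(ray.cluster_resources())
```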
-
Plan Ready for Review

Phase B — v0.12.0 Asynchronous Ray Jobs

Goal: Move batch/video jobs onto Lightning's GPU under your laptop's Ray head.

Infrastructure: …

Scope: …

Implementation Phases

Phase 1: Dependencies & Governance
Tasks: …
Verification:
```bash
cd server && uv sync
python scripts/scan_execution_violations.py
uv run pre-commit run --all-files 2>&1 | tee phase1.log
```
Commit: …

Phase 2: Ray Tasks Module (Step 1 from discussion)
Create …
Tests (TDD — Write First): …
Verification:
```bash
cd server && uv run pytest tests/test_ray_tasks.py -v 2>&1 | tee phase2.log
```
Commit: …

Phase 3: JobWorker Refactor (Step 2 from discussion)
Modify …
Tests (TDD — Write First): …
Verification:
```bash
cd server && uv run pytest tests/workers/ -v 2>&1 | tee phase3.log
cd server && uv run pytest tests/ -v --tb=short 2>&1 | tee phase3_full.log
```
Commit: …

Phase 4: Ray Initialization (Step 3 from discussion)
Modify worker entry point: …
Tests (TDD — Write First): …
Verification:
```bash
cd server && uv run pytest tests/workers/test_ray_init.py -v 2>&1 | tee phase4.log
```
Commit: …

Phase 5: Scripts
Create scripts: …
Verification:
```bash
bash server/scripts/forge_ray_head.sh --help 2>&1 | tee phase5.log
```
Commit: …

Phase 6: Integration Tests
Create …
Verification:
```bash
cd server && uv run pytest tests/integration/test_ray_integration.py -v 2>&1 | tee phase6.log
```
Commit: …

Phase 7: Full Verification (Step 4 from discussion)
Run all checks:
```bash
uv run pre-commit run --all-files 2>&1 | tee phase7_precommit.log
cd server && uv run pytest tests/ -v 2>&1 | tee phase7_tests.log
python scripts/scan_execution_violations.py 2>&1 | tee phase7_governance.log
cd web-ui && npm run lint && npm run type-check && npm run test -- --run 2>&1 | tee phase7_webui.log
```
Commit: …

Summary

Total Commits: 7. Each phase commit requires GREEN status on all tests before proceeding. Would you like to proceed?
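As a flavor of what the Phase 2 TDD step could assert, here is a minimal sketch. The import path and fixture style are assumptions; only the `.remote()` contract and the `ray.wait`/`ray.get` round trip come from the snippets earlier in the thread:

```python
# tests/test_ray_tasks.py — hypothetical sketch
import pytest
import ray


@pytest.fixture(scope="module")
def local_ray():
    # A tiny local cluster is enough to exercise the task contract.
    ray.init(num_cpus=1, ignore_reinit_error=True)
    yield
    ray.shutdown()


def test_execute_pipeline_remote_is_dispatchable():
    from workers.ray_tasks import execute_pipeline_remote  # assumed path

    # The JobWorker only ever calls .remote(...), so the contract is
    # simply "this is a Ray remote function".
    assert hasattr(execute_pipeline_remote, "remote")


def test_poller_wait_get_round_trip(local_ray):
    # Documents the ray.wait/ray.get pattern run_once relies on,
    # using a stand-in task instead of a real plugin.
    @ray.remote
    def fake_pipeline():
        return {"detect": {"frames": []}}

    ref = fake_pipeline.remote()
    ready, _ = ray.wait([ref], num_returns=1, timeout=10)
    assert ready and ray.get(ready[0]) == {"detect": {"frames": []}}
```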
-
Since Lightning AI creates a fresh VM every time it wakes up, you need a setup process that takes less than 10 seconds and requires zero manual configuration. Because we are bringing Tailscale back to enable the Ray cluster, you can use a Tailscale Reusable Auth Key. This allows the fresh Lightning VM to silently rejoin your private network the moment it boots. Here is the exact step-by-step guide and the automated script to set up the Lightning AI side.

Step 1: Get your Tailscale Reusable Auth Key

To stop Tailscale from asking you to log in every time Lightning restarts: … (in short: generate an auth key in the Tailscale admin console and mark it Reusable).
Step 2: Create the Lightning Startup Script

On Lightning AI, create a script named /workspace/start_worker.sh (per the chmod step below):

```bash
#!/bin/bash
# --- CONFIGURATION ---
TAILSCALE_KEY="tskey-auth-YOUR_KEY_HERE"  # Paste your key here
LAPTOP_IP="100.x.x.A"                     # Your laptop's Tailscale IP

echo "🚀 Setting up Lightning AI GPU Worker..."

# 1. Install and start Tailscale
if ! command -v tailscale &> /dev/null; then
    echo "🦎 Installing Tailscale..."
    curl -fsSL https://tailscale.com/install.sh | sh
fi

echo "🦎 Starting Tailscale daemon..."
sudo tailscaled > /workspace/tailscaled.log 2>&1 &
sleep 2

echo "🦎 Authenticating Tailscale..."
sudo tailscale up --authkey=${TAILSCALE_KEY} --hostname=lightning-gpu
echo "✅ Tailscale connected! IP: $(tailscale ip -4)"

# 2. Install Ray (if not already in the environment)
echo "📦 Installing Ray..."
pip install -q "ray[default]"

# 3. Connect to the laptop's Ray head node
echo "🧠 Joining Ray Cluster..."
# Stop any existing local Ray instances just in case
ray stop

# Start Ray and point it at the laptop
ray start --address="${LAPTOP_IP}:6379"

echo "✅ Lightning AI is now a Ray GPU Worker!"
```

Make it executable:

```bash
chmod +x /workspace/start_worker.sh
```

Step 3: Update the …
-
Here is your fully updated Lightning AI startup script (…). I have integrated your exact Docker command. I also added a smart cleanup step (…).

📝 Create / Update …
-
Phase B — v0.12.0 Asynchronous Ray jobs (video/batch)
Goal: Move batch/video jobs onto Lightning’s GPU under your laptop’s Ray head.
Infra:
Laptop: ray start --head ...
Lightning: ray start --address='HEAD_ADDR:6379' ...
Use Tailscale or Cloudflare Tunnel TCP so Lightning can reach the head.
Code:
Introduce RayExecutionService implementing your existing ExecutionService/job protocol.
Wrap heavy plugins in @ray.remote(num_gpus=1) functions.
Job submission: store job_uuid -> ray_ref in your existing JobStore.
Background poller: ray.wait + ray.get, then write results to S3/MinIO and mark job complete.
Scope: Only non‑interactive jobs (video, batch). WebSockets untouched.
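To make the "Code" bullets concrete, here is one way RayExecutionService could wire those pieces together. This is a sketch under assumptions: the ExecutionService protocol, the JobStore's mark_* methods, and the storage client are stand-ins for whatever the codebase actually defines:

```python
# Hypothetical sketch of the RayExecutionService described above.
import json
from io import BytesIO

import ray


class RayExecutionService:
    def __init__(self, job_store, storage):
        self.job_store = job_store  # assumed JobStore with mark_* methods
        self.storage = storage      # assumed S3/MinIO-style client
        self.refs = {}              # job_uuid -> Ray ObjectRef

    def submit(self, job_uuid: str, plugin_fn, **kwargs) -> None:
        # plugin_fn is a heavy plugin wrapped as @ray.remote(num_gpus=1).
        self.refs[job_uuid] = plugin_fn.remote(**kwargs)

    def poll(self) -> None:
        """Background poller: ray.wait + ray.get, then persist results."""
        if not self.refs:
            return
        ready, _ = ray.wait(list(self.refs.values()), num_returns=len(self.refs), timeout=0)
        done = [(uid, ref) for uid, ref in self.refs.items() if ref in ready]
        for uid, ref in done:
            del self.refs[uid]
            try:
                result = ray.get(ref)
            except Exception as exc:
                self.job_store.mark_failed(uid, str(exc))  # assumed API
                continue
            # Write results to S3/MinIO, then mark the job complete.
            payload = BytesIO(json.dumps(result).encode())
            path = self.storage.save_file(payload, f"output/{uid}.json")
            self.job_store.mark_completed(uid, output_path=path)  # assumed API
```

The design choice mirrored from the JobWorker refactor earlier in the thread: poll() never blocks, because timeout=0 keeps the loop responsive while the GPU node does the actual work.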