Replies: 2 comments 1 reply
-
|
Here is a complete, mechanical explanation of exactly why the […] (inline code in this reply was lost when the page was extracted)

🧠 The Root Cause

The […] When the background worker attempts to load the […] However, your […]

🛠️ The Fix

We will do two things:
Apply the search/replace blocks below, grouped into three steps:

1. Surface silent plugin load errors

Update <<<<
server/app/ray_tasks.py
====
def _get_plugin_service() -> PluginServiceProtocol:
    """Get plugin service instance (lazy import to avoid circular deps)."""
    from .plugin_loader import PluginRegistry
    from .services.plugin_management_service import PluginManagementService
    registry = PluginRegistry()
    registry.load_plugins() # Issue #304: Load plugins from entry points
    return PluginManagementService(registry)
>>>>
<<<<
server/app/ray_tasks.py
====
def _get_plugin_service() -> PluginServiceProtocol:
    """Get plugin service instance (lazy import to avoid circular deps).

    Builds a fresh ``PluginRegistry``, loads plugins into it and wraps it in a
    ``PluginManagementService``. When this runs inside a Ray worker process,
    plugin load errors are logged here; otherwise they would only surface
    later as a bare "Plugin '<id>' not found" error.

    Returns:
        A ``PluginManagementService`` backed by the freshly loaded registry.
    """
    import logging
    from collections.abc import Mapping

    from .plugin_loader import PluginRegistry
    from .services.plugin_management_service import PluginManagementService

    registry = PluginRegistry()
    # Issue #304: Load plugins from entry points
    load_result = registry.load_plugins()
    # NOTE(review): the return type of load_plugins() is not visible here.
    # Only inspect it when it is a mapping, so this purely diagnostic step can
    # never raise and prevent the plugin service from being built.
    errors = load_result.get("errors") if isinstance(load_result, Mapping) else None
    if errors:
        # Local logger: does not depend on a module-level `logger` existing.
        logging.getLogger(__name__).error(
            "Ray Worker Plugin Load Errors: %s", errors
        )
    return PluginManagementService(registry)
>>>><<<<
server/app/ray_tasks.py
====
args: Dict[str, Any] = {}
manifest = plugin_service.get_plugin_manifest(plugin_id)
if not manifest:
raise RuntimeError(f"Plugin '{plugin_id}' not found")
>>>>
<<<<
server/app/ray_tasks.py
====
args: Dict[str, Any] = {}
manifest = plugin_service.get_plugin_manifest(plugin_id)
if not manifest:
# Provide detailed error instead of just "not found"
available =[]
if hasattr(plugin_service, "registry") and hasattr(plugin_service.registry, "list"):
available = list(plugin_service.registry.list().keys())
raise RuntimeError(
f"Plugin '{plugin_id}' not found in Ray Worker process. "
f"Available plugins loaded in this worker: {available}. "
f"If the plugin is installed but missing here, it crashed during __init__. "
f"Check Ray worker logs above for 'Ray Worker Plugin Load Errors'."
)
>>>>

2. Sync the Working Directory for Job Workers

Update <<<<
server/app/workers/worker.py
====
def init_ray() -> bool:
    """Initialize Ray with configurable mode.
    v0.12.0: Supports both local and remote Ray initialization.
    - If RAY_ADDRESS env var is set, connects to existing cluster
    - Otherwise, starts local Ray instance
    Returns:
        True if initialization succeeded, False otherwise
    """
    import os
    import ray
    try:
        ray_address = os.environ.get("RAY_ADDRESS")
        if ray_address:
            ray.init(address=ray_address, ignore_reinit_error=True)
            logger.info(f"Ray connected to cluster at {ray_address}")
        else:
            ray.init(ignore_reinit_error=True)
            logger.info("Ray initialized locally")
        return True
    except Exception as e:
        logger.error(f"Failed to initialize Ray: {e}")
        return False
>>>>
<<<<
server/app/workers/worker.py
====
def init_ray() -> bool:
    """Start a local Ray instance or attach to a cluster, forwarding environment.

    v0.12.0: Supports both local and remote Ray initialization.
    - RAY_ADDRESS set: connect to that existing cluster.
    - RAY_ADDRESS unset: start a local Ray instance.

    Ray is given a ``runtime_env`` holding the resolved current directory as
    ``working_dir`` plus this process's FORGESYTE_*, RAY_*, PYTHONPATH and
    PATH environment variables.

    Returns:
        True when Ray initialized, False when initialization raised.
    """
    import os
    from pathlib import Path
    import ray

    passthrough_names = ("PYTHONPATH", "PATH")
    try:
        # NOTE(review): per Ray's runtime_env docs, ``working_dir`` is uploaded
        # and workers start in a node-local copy of it -- a copy of this
        # directory, not this directory itself.
        project_dir = str(Path.cwd().resolve())
        forwarded = {}
        for name, value in os.environ.items():
            if name in passthrough_names or name.startswith(("FORGESYTE_", "RAY_")):
                forwarded[name] = value
        runtime_env = {"working_dir": project_dir, "env_vars": forwarded}

        init_kwargs = {"ignore_reinit_error": True, "runtime_env": runtime_env}
        ray_address = os.environ.get("RAY_ADDRESS")
        if not ray_address:
            ray.init(**init_kwargs)
            logger.info("Ray initialized locally with synced runtime_env")
        else:
            ray.init(address=ray_address, **init_kwargs)
            logger.info(f"Ray connected to cluster at {ray_address}")
        return True
    except Exception as exc:
        logger.error(f"Failed to initialize Ray: {exc}")
        return False
>>>>

3. Sync the Working Directory for WebSocket Actors

Also update <<<<
server/app/main.py
====
try:
if not ray.is_initialized():
ray_address = os.environ.get("RAY_ADDRESS")
if ray_address:
ray.init(address=ray_address, ignore_reinit_error=True)
logger.info(
f"Main process connected to Ray cluster at {ray_address}"
)
else:
ray.init(ignore_reinit_error=True)
logger.info("Main process initialized Ray locally")
except Exception as e:
>>>>
<<<<
server/app/main.py
====
try:
if not ray.is_initialized():
from pathlib import Path
ray_address = os.environ.get("RAY_ADDRESS")
# Sync environment to Ray actors (fixes missing plugins in WebSockets)
runtime_env = {
"working_dir": str(Path.cwd().resolve()),
"env_vars": {
k: v for k, v in os.environ.items()
if k.startswith(("FORGESYTE_", "RAY_")) or k in ["PYTHONPATH", "PATH"]
}
}
if ray_address:
ray.init(address=ray_address, ignore_reinit_error=True, runtime_env=runtime_env)
logger.info(
f"Main process connected to Ray cluster at {ray_address}"
)
else:
ray.init(ignore_reinit_error=True, runtime_env=runtime_env)
logger.info("Main process initialized Ray locally with runtime_env")
except Exception as e:
>>>>

Next Steps After Applying
|
Beta Was this translation helpful? Give feedback.
-
|
You make a crucial architectural point. Using [`working_dir` …] We absolutely want Option 2: env vars only. Because Lightning Studio shares the filesystem across all processes, the Ray worker doesn't need the files copied to it. It just needs its [`PYTHONPATH` …]

Here is the refined, high-performance solution. We will inject the current working directory directly into the worker's [`PYTHONPATH`]. (Bracketed fragments mark inline code that was lost when this page was extracted; they are reconstructed from the code comments below.)

1. Update Worker Initialization (Lightweight Env Sync)

<<<<
server/app/workers/worker.py
====
def init_ray() -> bool:
    """Initialize Ray with configurable mode.
    v0.12.0: Supports both local and remote Ray initialization.
    - If RAY_ADDRESS env var is set, connects to existing cluster
    - Otherwise, starts local Ray instance
    Returns:
        True if initialization succeeded, False otherwise
    """
    import os
    import ray
    try:
        ray_address = os.environ.get("RAY_ADDRESS")
        if ray_address:
            ray.init(address=ray_address, ignore_reinit_error=True)
            logger.info(f"Ray connected to cluster at {ray_address}")
        else:
            ray.init(ignore_reinit_error=True)
            logger.info("Ray initialized locally")
        return True
    except Exception as e:
        logger.error(f"Failed to initialize Ray: {e}")
        return False
>>>>
<<<<
server/app/workers/worker.py
====
def init_ray() -> bool:
    """Initialize Ray with configurable mode.

    v0.12.0: Supports both local and remote Ray initialization.
    - If RAY_ADDRESS env var is set, connects to existing cluster
    - Otherwise, starts local Ray instance

    Workers get a lightweight ``runtime_env``:
    - this process's FORGESYTE_* and RAY_* variables and PATH;
    - PYTHONPATH with the resolved current working directory prepended.

    Nothing is uploaded, so this only helps workers that see the same
    filesystem as this process.

    Returns:
        True if initialization succeeded, False otherwise
    """
    import os
    from pathlib import Path
    import ray

    try:
        # Lightweight sync: avoid the cost of Ray's 'working_dir' upload while
        # still letting workers on a shared filesystem import local plugins.
        cwd = str(Path.cwd().resolve())
        current_pythonpath = os.environ.get("PYTHONPATH", "")
        # PYTHONPATH entries are separated by os.pathsep (":" on POSIX, ";" on
        # Windows); a hard-coded ":" produces a broken path on Windows.
        new_pythonpath = (
            os.pathsep.join((cwd, current_pythonpath)) if current_pythonpath else cwd
        )
        env_vars = {
            k: v
            for k, v in os.environ.items()
            if k.startswith(("FORGESYTE_", "RAY_")) or k == "PATH"
        }
        env_vars["PYTHONPATH"] = new_pythonpath
        runtime_env = {"env_vars": env_vars}

        # NOTE: with ignore_reinit_error=True a repeated call does not
        # re-initialize Ray, so this runtime_env only applies on the first call.
        ray_address = os.environ.get("RAY_ADDRESS")
        if ray_address:
            ray.init(
                address=ray_address,
                ignore_reinit_error=True,
                runtime_env=runtime_env,
            )
            logger.info(f"Ray connected to cluster at {ray_address}")
        else:
            ray.init(ignore_reinit_error=True, runtime_env=runtime_env)
            logger.info("Ray initialized locally with synced env_vars")
        return True
    except Exception as e:
        logger.error(f"Failed to initialize Ray: {e}")
        return False
>>>>

2. Update Main Server Ray Initialization (Lightweight Env Sync)

<<<<
server/app/main.py
====
try:
if not ray.is_initialized():
ray_address = os.environ.get("RAY_ADDRESS")
if ray_address:
ray.init(address=ray_address, ignore_reinit_error=True)
logger.info(
f"Main process connected to Ray cluster at {ray_address}"
)
else:
ray.init(ignore_reinit_error=True)
logger.info("Main process initialized Ray locally")
except Exception as e:
>>>>
<<<<
server/app/main.py
====
try:
if not ray.is_initialized():
from pathlib import Path
ray_address = os.environ.get("RAY_ADDRESS")
# Lightweight sync: Avoid 'working_dir' upload penalty.
cwd = str(Path.cwd().resolve())
curr_pp = os.environ.get("PYTHONPATH", "")
new_pp = f"{cwd}:{curr_pp}" if curr_pp else cwd
runtime_env = {
"env_vars": {
k: v for k, v in os.environ.items()
if k.startswith(("FORGESYTE_", "RAY_")) or k == "PATH"
}
}
runtime_env["env_vars"]["PYTHONPATH"] = new_pp
if ray_address:
ray.init(address=ray_address, ignore_reinit_error=True, runtime_env=runtime_env)
logger.info(
f"Main process connected to Ray cluster at {ray_address}"
)
else:
ray.init(ignore_reinit_error=True, runtime_env=runtime_env)
logger.info("Main process initialized Ray locally with synced env_vars")
except Exception as e:
>>>>

3. Surface the actual error (crucial for debugging plugin crashes)

We still need to make sure […]

<<<<
server/app/ray_tasks.py
====
def _get_plugin_service() -> PluginServiceProtocol:
    """Get plugin service instance (lazy import to avoid circular deps)."""
    from .plugin_loader import PluginRegistry
    from .services.plugin_management_service import PluginManagementService
    registry = PluginRegistry()
    registry.load_plugins() # Issue #304: Load plugins from entry points
    return PluginManagementService(registry)
>>>>
<<<<
server/app/ray_tasks.py
====
def _get_plugin_service() -> PluginServiceProtocol:
    """Get plugin service instance (lazy import to avoid circular deps).

    Builds a fresh ``PluginRegistry``, loads plugins into it and wraps it in a
    ``PluginManagementService``. When this runs inside a Ray worker process,
    plugin load errors are logged here; otherwise they would only surface
    later as a bare "Plugin '<id>' not found" error.

    Returns:
        A ``PluginManagementService`` backed by the freshly loaded registry.
    """
    import logging
    from collections.abc import Mapping

    from .plugin_loader import PluginRegistry
    from .services.plugin_management_service import PluginManagementService

    registry = PluginRegistry()
    # Issue #304: Load plugins from entry points
    load_result = registry.load_plugins()
    # NOTE(review): the return type of load_plugins() is not visible here.
    # Only inspect it when it is a mapping, so this purely diagnostic step can
    # never raise and prevent the plugin service from being built.
    errors = load_result.get("errors") if isinstance(load_result, Mapping) else None
    if errors:
        # Local logger: does not depend on a module-level `logger` existing.
        logging.getLogger(__name__).error(
            "Ray Worker Plugin Load Errors: %s", errors
        )
    return PluginManagementService(registry)
>>>><<<<
server/app/ray_tasks.py
====
args: Dict[str, Any] = {}
manifest = plugin_service.get_plugin_manifest(plugin_id)
if not manifest:
raise RuntimeError(f"Plugin '{plugin_id}' not found")
>>>>
<<<<
server/app/ray_tasks.py
====
args: Dict[str, Any] = {}
manifest = plugin_service.get_plugin_manifest(plugin_id)
if not manifest:
# Provide detailed error instead of just "not found"
available = |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Uploading an image using the OCR plugin produces errors only when you view the jobs list and select the job...
ray::execute_pipeline_remote() (pid=71303, ip=100.66.116.64)
  File "/teamspace/studios/this_studio/forgesyte/server/app/ray_tasks.py", line 109, in execute_pipeline_remote
    return _execute_pipeline_impl(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/teamspace/studios/this_studio/forgesyte/server/app/ray_tasks.py", line 160, in _execute_pipeline_impl
    raise RuntimeError(f"Plugin '{plugin_id}' not found")
RuntimeError: Plugin 'ocr' not found
i
Beta Was this translation helpful? Give feedback.
All reactions