Skip to content

Commit ab2e3f0

Browse files
svartalfsentrivana
andauthored
fix(integrations/ray): Correctly pass keyword arguments to ray.remote function (#4430)
Monkey-patched implementation was passing the provided keyword arguments incorrectly due to a typo - "*kwargs" was used instead of "**kwargs" twice. Fixed integration started hitting an assert in the Ray codebase that requires for users to use "@ray.remote" decorator either with no arguments and no parentheses, or with some of the arguments provided. An additional wrapper function was added to support both scenarios. --------- Co-authored-by: Ivana Kellyer <[email protected]>
1 parent 7804260 commit ab2e3f0

File tree

2 files changed

+81
-65
lines changed

2 files changed

+81
-65
lines changed

sentry_sdk/integrations/ray.py

Lines changed: 70 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -42,73 +42,81 @@ def _patch_ray_remote():
4242
old_remote = ray.remote
4343

4444
@functools.wraps(old_remote)
45-
def new_remote(f, *args, **kwargs):
46-
# type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any]
45+
def new_remote(f=None, *args, **kwargs):
46+
# type: (Optional[Callable[..., Any]], *Any, **Any) -> Callable[..., Any]
47+
4748
if inspect.isclass(f):
4849
# Ray Actors
4950
# (https://docs.ray.io/en/latest/ray-core/actors.html)
5051
# are not supported
5152
# (Only Ray Tasks are supported)
52-
return old_remote(f, *args, *kwargs)
53-
54-
def _f(*f_args, _tracing=None, **f_kwargs):
55-
# type: (Any, Optional[dict[str, Any]], Any) -> Any
56-
"""
57-
Ray Worker
58-
"""
59-
_check_sentry_initialized()
60-
61-
transaction = sentry_sdk.continue_trace(
62-
_tracing or {},
63-
op=OP.QUEUE_TASK_RAY,
64-
name=qualname_from_function(f),
65-
origin=RayIntegration.origin,
66-
source=TransactionSource.TASK,
67-
)
68-
69-
with sentry_sdk.start_transaction(transaction) as transaction:
70-
try:
71-
result = f(*f_args, **f_kwargs)
72-
transaction.set_status(SPANSTATUS.OK)
73-
except Exception:
74-
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
75-
exc_info = sys.exc_info()
76-
_capture_exception(exc_info)
77-
reraise(*exc_info)
78-
79-
return result
80-
81-
rv = old_remote(_f, *args, *kwargs)
82-
old_remote_method = rv.remote
83-
84-
def _remote_method_with_header_propagation(*args, **kwargs):
85-
# type: (*Any, **Any) -> Any
86-
"""
87-
Ray Client
88-
"""
89-
with sentry_sdk.start_span(
90-
op=OP.QUEUE_SUBMIT_RAY,
91-
name=qualname_from_function(f),
92-
origin=RayIntegration.origin,
93-
) as span:
94-
tracing = {
95-
k: v
96-
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
97-
}
98-
try:
99-
result = old_remote_method(*args, **kwargs, _tracing=tracing)
100-
span.set_status(SPANSTATUS.OK)
101-
except Exception:
102-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
103-
exc_info = sys.exc_info()
104-
_capture_exception(exc_info)
105-
reraise(*exc_info)
106-
107-
return result
108-
109-
rv.remote = _remote_method_with_header_propagation
110-
111-
return rv
53+
return old_remote(f, *args, **kwargs)
54+
55+
def wrapper(user_f):
56+
# type: (Callable[..., Any]) -> Any
57+
def new_func(*f_args, _tracing=None, **f_kwargs):
58+
# type: (Any, Optional[dict[str, Any]], Any) -> Any
59+
_check_sentry_initialized()
60+
61+
transaction = sentry_sdk.continue_trace(
62+
_tracing or {},
63+
op=OP.QUEUE_TASK_RAY,
64+
name=qualname_from_function(user_f),
65+
origin=RayIntegration.origin,
66+
source=TransactionSource.TASK,
67+
)
68+
69+
with sentry_sdk.start_transaction(transaction) as transaction:
70+
try:
71+
result = user_f(*f_args, **f_kwargs)
72+
transaction.set_status(SPANSTATUS.OK)
73+
except Exception:
74+
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
75+
exc_info = sys.exc_info()
76+
_capture_exception(exc_info)
77+
reraise(*exc_info)
78+
79+
return result
80+
81+
if f:
82+
rv = old_remote(new_func)
83+
else:
84+
rv = old_remote(*args, **kwargs)(new_func)
85+
old_remote_method = rv.remote
86+
87+
def _remote_method_with_header_propagation(*args, **kwargs):
88+
# type: (*Any, **Any) -> Any
89+
"""
90+
Ray Client
91+
"""
92+
with sentry_sdk.start_span(
93+
op=OP.QUEUE_SUBMIT_RAY,
94+
name=qualname_from_function(user_f),
95+
origin=RayIntegration.origin,
96+
) as span:
97+
tracing = {
98+
k: v
99+
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
100+
}
101+
try:
102+
result = old_remote_method(*args, **kwargs, _tracing=tracing)
103+
span.set_status(SPANSTATUS.OK)
104+
except Exception:
105+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
106+
exc_info = sys.exc_info()
107+
_capture_exception(exc_info)
108+
reraise(*exc_info)
109+
110+
return result
111+
112+
rv.remote = _remote_method_with_header_propagation
113+
114+
return rv
115+
116+
if f is not None:
117+
return wrapper(f)
118+
else:
119+
return wrapper
112120

113121
ray.remote = new_remote
114122

tests/integrations/ray/test_ray.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ def read_error_from_log(job_id):
5959

6060

6161
@pytest.mark.forked
62-
def test_tracing_in_ray_tasks():
62+
@pytest.mark.parametrize(
63+
"task_options", [{}, {"num_cpus": 0, "memory": 1024 * 1024 * 10}]
64+
)
65+
def test_tracing_in_ray_tasks(task_options):
6366
setup_sentry()
6467

6568
ray.init(
@@ -69,14 +72,19 @@ def test_tracing_in_ray_tasks():
6972
}
7073
)
7174

72-
# Setup ray task
73-
@ray.remote
7475
def example_task():
7576
with sentry_sdk.start_span(op="task", name="example task step"):
7677
...
7778

7879
return sentry_sdk.get_client().transport.envelopes
7980

81+
# Setup ray task, calling decorator directly instead of @,
82+
# to accommodate for test parametrization
83+
if task_options:
84+
example_task = ray.remote(**task_options)(example_task)
85+
else:
86+
example_task = ray.remote(example_task)
87+
8088
with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
8189
worker_envelopes = ray.get(example_task.remote())
8290

0 commit comments

Comments
 (0)