Test fails with Dask 2024.11.0+ #10994

Open
hcho3 opened this issue Nov 12, 2024 · 8 comments

@hcho3 (Collaborator) commented Nov 12, 2024

https://github.com/dmlc/xgboost/actions/runs/11753771153/job/32747003155

E                       distributed.client.FutureCancelledError: ('_argmax-06657a445bd2e0d811c6ff48d5860817', 24) cancelled for reason: unknown.

../../mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/client.py:2427: FutureCancelledError
-------------------------------------------------- Captured log setup --------------------------------------------------
INFO     distributed.http.proxy:proxy.py:85 To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO     distributed.scheduler:scheduler.py:1755 State start
INFO     distributed.scheduler:scheduler.py:4225   Scheduler at:     tcp://127.0.0.1:45365
INFO     distributed.scheduler:scheduler.py:4240   dashboard at:  http://127.0.0.1:35259/status
INFO     distributed.scheduler:scheduler.py:8115 Registering Worker plugin shuffle
INFO     distributed.nanny:nanny.py:368         Start Nanny at: 'tcp://127.0.0.1:44537'
INFO     distributed.nanny:nanny.py:368         Start Nanny at: 'tcp://127.0.0.1:32881'
INFO     distributed.scheduler:scheduler.py:4579 Register worker <WorkerState 'tcp://127.0.0.1:41799', name: 0, status: init, memory: 0, processing: 0>
INFO     distributed.scheduler:scheduler.py:6171 Starting worker compute stream, tcp://127.0.0.1:41799
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51116
INFO     distributed.scheduler:scheduler.py:4579 Register worker <WorkerState 'tcp://127.0.0.1:42755', name: 1, status: init, memory: 0, processing: 0>
INFO     distributed.scheduler:scheduler.py:6171 Starting worker compute stream, tcp://127.0.0.1:42755
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51122
INFO     distributed.scheduler:scheduler.py:5925 Receive client connection: Client-1b5e6d99-a113-11ef-bd45-00c0cab020ae
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51132
-------------------------------------------------- Captured log call ---------------------------------------------------
INFO     distributed.worker:worker.py:3171 Run out-of-band function '_start_tracker'
INFO     distributed.scheduler:scheduler.py:5925 Receive client connection: Client-worker-1c041fdc-a113-11ef-bd59-00c0cab020ae
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51160
INFO     distributed.scheduler:scheduler.py:5925 Receive client connection: Client-worker-1c049a95-a113-11ef-bd5c-00c0cab020ae
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51200
ERROR    distributed.scheduler:scheduler.py:4956 No keys provided
ERROR    distributed.scheduler:scheduler.py:4956 No keys provided
ERROR    distributed.scheduler:scheduler.py:4956 No keys provided
INFO     distributed.scheduler:scheduler.py:4637 User asked for computation on lost data, ('_argmax-06657a445bd2e0d811c6ff48d5860817', 24)
ERROR    distributed.scheduler:scheduler.py:2091 Error transitioning 'Booster-86632f99bd6a485787c42e9e625f998e' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 2010, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 2461, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 3407, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 3579, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState 'Booster-86632f99bd6a485787c42e9e625f998e' processing>
ERROR    distributed.scheduler:scheduler.py:4956 <TaskState 'Booster-86632f99bd6a485787c42e9e625f998e' processing>
ERROR    distributed.protocol.pickle:pickle.py:79 Failed to serialize <TaskState 'Booster-86632f99bd6a485787c42e9e625f998e' processing>.
Traceback (most recent call last):
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 4911, in update_graph
    metrics = self._create_taskstate_from_graph(
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 4788, in _create_taskstate_from_graph
    self.transitions(recommendations, stimulus_id)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 8227, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 2127, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 2010, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 2461, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 3407, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/scheduler.py", line 3579, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState 'Booster-86632f99bd6a485787c42e9e625f998e' processing>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object '_inplace_predict_async.<locals>.mapped_predict'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
AttributeError: Can't pickle local object '_inplace_predict_async.<locals>.mapped_predict'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1529, in dumps
    cp.dump(obj)
  File "/home/phcho/mambaforge/envs/linux_cpu_test/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1295, in dump
    return super().dump(obj)
TypeError: cannot pickle 'weakref.ReferenceType' object
@trivialfis (Member)

Looking at the error, I'm not sure how to create a minimal example for Dask to debug it.

@hcho3 (Collaborator, Author) commented Nov 13, 2024

Were you able to reproduce the failure in the XGBoost pytest?

@trivialfis (Member)

Yes, on my local machine with the latest dask/distributed, running only the classification tests.

$ pytest -s -v ./tests/test_distributed/test_gpu_with_dask/ -k test_dask_classifier

@trivialfis (Member)

Dask is getting flakier with the new dask-expr and the new shuffle engine; it might take some time to debug these.

@TomAugspurger

This patch lets us progress a bit further in the tests by avoiding the issue in dask/distributed#8998 (we don't want to disable optimization, so this is just for debugging).

diff --git a/python-package/xgboost/dask/__init__.py b/python-package/xgboost/dask/__init__.py
index 635bedc7d..f107d5a1e 100644
--- a/python-package/xgboost/dask/__init__.py
+++ b/python-package/xgboost/dask/__init__.py
@@ -397,7 +397,7 @@ class DaskDMatrix:
 
             """
             d = client.persist(d)
-            delayed_obj = d.to_delayed()
+            delayed_obj = d.to_delayed(optimize_graph=False)  # debugging.
             if isinstance(delayed_obj, numpy.ndarray):
                 # da.Array returns an array to delayed objects
                 check_columns(delayed_obj)

I believe that lets training finish, and it then errors during predict:

__________________________________________________________________________________________________________________________________ test_dask_classifier[boosting] __________________________________________________________________________________________________________________________________
tests/test_distributed/test_with_dask/test_with_dask.py:738: in test_dask_classifier
    run_dask_classifier(X, y, w, model, None, "cpu", client, 10)
tests/test_distributed/test_with_dask/test_with_dask.py:723: in run_dask_classifier
    prediction_df = classifier.predict(X_d).compute()
/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/dask/base.py:374: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/dask/base.py:662: in compute
    results = schedule(dsk, keys, **kwargs)
/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/client.py:2426: in _gather
    raise exception.with_traceback(traceback)
E   distributed.client.FutureCancelledError: ('_argmax-066227e92a16f05ab1583bdbd8d7a30e', 3) cancelled for reason: unknown.

The scheduler logs reveal a couple of errors

INFO     distributed.worker:worker.py:3171 Run out-of-band function '_start_tracker'
INFO     distributed.scheduler:scheduler.py:5927 Receive client connection: Client-worker-0d01b752-e22d-11ef-9366-d8c49764f61b
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:39370
INFO     distributed.scheduler:scheduler.py:5927 Receive client connection: Client-worker-0d0390fb-e22d-11ef-9368-d8c49764f61b
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:39396
INFO     distributed.scheduler:scheduler.py:4637 User asked for computation on lost data, ('_argmax-066227e92a16f05ab1583bdbd8d7a30e', 3)
ERROR    distributed.scheduler:scheduler.py:2088 Error transitioning 'Booster-d7dc156c43ba4c319134fa81633b34f2' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 2007, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 2458, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 3407, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
                                 ^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 3579, in _task_to_msg
    assert ts.priority, ts
           ^^^^^^^^^^^
AssertionError: <TaskState 'Booster-d7dc156c43ba4c319134fa81633b34f2' processing>
ERROR    distributed.scheduler:scheduler.py:4956 <TaskState 'Booster-d7dc156c43ba4c319134fa81633b34f2' processing>
ERROR    distributed.protocol.pickle:pickle.py:79 Failed to serialize <TaskState 'Booster-d7dc156c43ba4c319134fa81633b34f2' processing>.
Traceback (most recent call last):
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 4911, in update_graph
    metrics = self._create_taskstate_from_graph(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 4788, in _create_taskstate_from_graph
    self.transitions(recommendations, stimulus_id)
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 8231, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 2124, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 2007, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 2458, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 3407, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
                                 ^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/scheduler.py", line 3579, in _task_to_msg
    assert ts.priority, ts
           ^^^^^^^^^^^
AssertionError: <TaskState 'Booster-d7dc156c43ba4c319134fa81633b34f2' processing>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get local object '_inplace_predict_async.<locals>.mapped_predict'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
AttributeError: Can't get local object '_inplace_predict_async.<locals>.mapped_predict'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle 'weakref.ReferenceType' object

The log message

INFO     distributed.scheduler:scheduler.py:4637 User asked for computation on lost data, ('_argmax-066227e92a16f05ab1583bdbd8d7a30e', 3)

is probably relevant. It seems like Dask lost track of some data unexpectedly. After that, we have an issue with this Booster task not having a priority set, which causes the AssertionError.

The final

ERROR    distributed.protocol.pickle:pickle.py:79 Failed to serialize <TaskState 'Booster-d7dc156c43ba4c319134fa81633b34f2' processing>.

might be irrelevant. That could just be an issue with Dask attempting to recreate the error. I'm not sure yet.
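
One way to narrow down the "lost data" message is a minimal sketch along these lines (names like X_persisted are illustrative, standing in for the collection DaskDMatrix persists; client is the test's client):

# Hypothetical debugging sketch: ask the scheduler which of the persisted
# chunks it still tracks before calling predict. `client` and `X_persisted`
# are assumed to exist from the surrounding test setup.
from distributed import futures_of

futures = futures_of(X_persisted)    # Futures backing the persisted chunks
locations = client.who_has(futures)  # maps each key to the workers holding it
lost = [key for key, workers in locations.items() if not workers]
print("keys with no workers:", lost)  # non-empty would match the "lost data" log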

@hcho3 (Collaborator, Author) commented Feb 4, 2025

@TomAugspurger Are you working with Dask 2024.11.2 or 2024.12.1? I get different error messages depending on the Dask version I use.

Dask 2024.11.2: distributed.client.FutureCancelledError: ('_argmax-xxx', 24) cancelled for reason: unknown.
Dask 2024.12.1: ValueError: Please reshape the input data into 2-dimensional matrix.
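
A quick way to confirm which versions are active in a given environment (assuming standard dask/distributed installs):

# Print the dask and distributed versions in the current environment
import dask
import distributed

print(dask.__version__, distributed.__version__)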

@TomAugspurger

I've been testing multiple versions of dask, and my error messages match yours.

Right now, my suspicion is that there are multiple issues to work through:

  1. Behavior change in Future / task resolution for Client.submit with complicated arguments (dask/distributed#8998), which is preventing us from mixing Futures (persisted arrays) and Delayed objects. This is the error we're hitting with dask>=2024.12.1, because of a change to optimize the task graph in dask==2024.12.1. (A minimal sketch of the Future/Delayed mixing pattern follows the diff below.)
  2. distributed.client.FutureCancelledError, coming from something I don't understand yet. We hit this with Dask 2024.11.2 (so it's been around for a little while), or with dask>=2024.12.1 when changing to_delayed to set optimize_graph=False:
diff --git a/python-package/xgboost/dask/__init__.py b/python-package/xgboost/dask/__init__.py
index 635bedc7d..20da9fcc4 100644
--- a/python-package/xgboost/dask/__init__.py
+++ b/python-package/xgboost/dask/__init__.py
@@ -397,7 +397,7 @@ class DaskDMatrix:
 
             """
             d = client.persist(d)
-            delayed_obj = d.to_delayed()
+            delayed_obj = d.to_delayed(optimize_graph=False)
             if isinstance(delayed_obj, numpy.ndarray):
                 # da.Array returns an array to delayed objects
                 check_columns(delayed_obj)
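
To make issue 1 concrete, here is a minimal sketch (with illustrative names, not taken from the failing test) of the pattern DaskDMatrix uses: persist a collection so its chunks are backed by Futures, then wrap those chunks in Delayed objects and compute them.

# Minimal sketch of mixing persisted chunks (Futures) with Delayed objects.
# All names here are illustrative.
import dask.array as da
from dask.distributed import Client

client = Client(processes=False)

X = da.random.random(size=(100, 2), chunks=(50, 2))
X = client.persist(X)                      # chunks are now backed by Futures
parts = X.to_delayed().flatten().tolist()  # Delayed objects wrapping those Futures

# Computing the Delayed parts sends the wrapped Futures back through the
# scheduler, the Future/Delayed mix that dask/distributed#8998 is about.
futures = client.compute(parts)
print([chunk.shape for chunk in client.gather(futures)])

With optimize_graph left at its default in to_delayed(), this is the code path where the behavior change shows up; the optimize_graph=False patch above sidesteps it for debugging.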

@TomAugspurger

One other relevant point: this only fails when passing a Dask DataFrame into classifier.predict (after including that optimize_graph=False change). Things work fine with a Dask Array.

Things also work fine with a simpler predict.

# The key components
import dask.distributed
import xgboost.dask
import dask.array as da
import xgboost
import numpy as np

X = da.random.random(size=(100, 2))
y = da.random.choice(range(2), size=(100,))
X_d = X.to_dask_dataframe()

cluster = dask.distributed.LocalCluster()
client = dask.distributed.Client(cluster)

classifier = xgboost.dask.DaskXGBClassifier(eval_metric="merror")
classifier.fit(X, y)
prediction = classifier.predict_proba(X).compute()  # OK!
print("array OK!")


def mapped_predict(partition, *, booster: xgboost.Booster) -> np.ndarray:
    m = xgboost.DMatrix(partition)
    predt = booster.predict(m)
    return predt

# works fine
booster = classifier.get_booster()
X_d.map_partitions(mapped_predict, booster=booster).compute()[:1]

booster_f = client.scatter(booster, hash=False, broadcast=True)
meta = np.empty((0, classifier.n_classes_))
X_d.map_partitions(mapped_predict, booster=booster_f, meta=meta).compute()[:1]

It's only when using classifier.predict with a dask dataframe that we get the error:

prediction_df = classifier.predict_proba(X_d).compute()

hcho3 added a commit to hcho3/xgboost that referenced this issue Feb 4, 2025
hcho3 added a commit to hcho3/xgboost that referenced this issue Feb 5, 2025