From c497a1e9d2d9faef96070db688089d99d786299c Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Mon, 23 Aug 2021 16:58:02 +0800 Subject: [PATCH] [BACKPORT] Fix KeyError when remote function returns None (#2371) (#2375) --- LICENSE | 8 ++++++++ mars/dataframe/utils.py | 11 ++++++----- mars/lib/filesystem/_oss_lib/common.py | 4 ++-- mars/lib/filesystem/_oss_lib/handle.py | 2 +- mars/lib/filesystem/oss.py | 4 ++-- mars/remote/tests/test_remote_function.py | 12 ++++++++++++ mars/services/subtask/worker/processor.py | 10 +++++++--- mars/services/task/tests/test_service.py | 1 - mars/tensor/images/imread.py | 11 ++++++----- mars/tensor/linalg/cholesky.py | 9 +++------ mars/tensor/special/bessel.py | 4 +++- mars/tensor/special/convenience.py | 4 +++- mars/tensor/special/core.py | 14 -------------- mars/tensor/special/err_fresnel.py | 4 +++- mars/tensor/special/gamma_funcs.py | 4 +++- mars/tensor/special/info_theory.py | 5 ++++- mars/tests/test_utils.py | 9 ++++----- mars/utils.py | 14 ++++++++++---- 18 files changed, 77 insertions(+), 53 deletions(-) diff --git a/LICENSE b/LICENSE index 216a0ad5bc..9cf6751916 100644 --- a/LICENSE +++ b/LICENSE @@ -225,6 +225,7 @@ Apache Software Foundation License 2.0 - vineyard:0.1.2 - ray:1.4.0 - uvloop:0.15.2 +- oss2:2.12.0 BSD 3-Clause @@ -263,6 +264,13 @@ MIT License - react-router-dom:5.2.0 - material-ui:4.0.0 - lodash:4.17.0 +- dagre-d3:0.6.4 + + +ISC License +----------- + +- d3:7.0.0 CC0 1.0 Universal (CC0 1.0) diff --git a/mars/dataframe/utils.py b/mars/dataframe/utils.py index 96b1a36a9c..261d0f6a34 100644 --- a/mars/dataframe/utils.py +++ b/mars/dataframe/utils.py @@ -23,15 +23,16 @@ from pandas.api.types import is_string_dtype from pandas.api.extensions import ExtensionDtype from pandas.core.dtypes.cast import find_common_type -try: - import pyarrow as pa -except ImportError: # pragma: no cover - pass from ..core import Entity, ExecutableTuple from ..lib.mmh3 import hash as mmh_hash from ..tensor.utils import dictify_chunk_size, normalize_chunk_sizes -from ..utils import tokenize, sbytes, lazy_import +from ..utils import tokenize, sbytes, lazy_import, ModulePlaceholder + +try: + import pyarrow as pa +except ImportError: # pragma: no cover + pa = ModulePlaceholder('pyarrow') cudf = lazy_import('cudf', globals=globals(), rename='cudf') diff --git a/mars/lib/filesystem/_oss_lib/common.py b/mars/lib/filesystem/_oss_lib/common.py index c36601bf92..73b534e135 100644 --- a/mars/lib/filesystem/_oss_lib/common.py +++ b/mars/lib/filesystem/_oss_lib/common.py @@ -17,12 +17,12 @@ import os from ..base import path_type, stringify_path -from ....utils import ImportErrorHandler +from ....utils import ModulePlaceholder try: import oss2 except ImportError: - oss2 = ImportErrorHandler('oss2') + oss2 = ModulePlaceholder('oss2') # OSS api time out _oss_time_out = 10 diff --git a/mars/lib/filesystem/_oss_lib/handle.py b/mars/lib/filesystem/_oss_lib/handle.py index 5cd54ea8fb..1b7128336d 100644 --- a/mars/lib/filesystem/_oss_lib/handle.py +++ b/mars/lib/filesystem/_oss_lib/handle.py @@ -19,7 +19,7 @@ try: import oss2 except ImportError: - oss2 = ImportErrorHandler('oss2') + oss2 = ModulePlaceholder('oss2') class OSSIOBase(IOBase): diff --git a/mars/lib/filesystem/oss.py b/mars/lib/filesystem/oss.py index ff41cb6ec7..5f0ba549a6 100644 --- a/mars/lib/filesystem/oss.py +++ b/mars/lib/filesystem/oss.py @@ -19,12 +19,12 @@ from ._oss_lib.glob import glob from ._oss_lib.handle import OSSIOBase, dict_to_url from .base import FileSystem, path_type -from ...utils import implements, ImportErrorHandler +from ...utils import implements, ModulePlaceholder try: import oss2 except ImportError: - oss2 = ImportErrorHandler('oss2') + oss2 = ModulePlaceholder('oss2') _oss_time_out = 10 diff --git a/mars/remote/tests/test_remote_function.py b/mars/remote/tests/test_remote_function.py index 3b1c7edddd..833e357535 100644 --- a/mars/remote/tests/test_remote_function.py +++ b/mars/remote/tests/test_remote_function.py @@ -177,3 +177,15 @@ def f(t, x): result = s.execute().fetch() expected = (raw[raw > 0] * 3).sum() assert pytest.approx(result) == expected + + +def test_none_outputs(setup): + def f(*_args): + pass + + r1 = spawn(f, args=(0,)) + r2 = spawn(f, args=(r1, 1)) + r3 = spawn(f, args=(r1, 2)) + r4 = spawn(f, args=(r2, r3)) + + assert r4.execute().fetch() is None diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py index b4a0a1aa7f..f48d160222 100644 --- a/mars/services/subtask/worker/processor.py +++ b/mars/services/subtask/worker/processor.py @@ -97,21 +97,25 @@ def subtask_id(self): return self.subtask.subtask_id async def _load_input_data(self): - keys = [] - gets = [] + keys, gets, accept_nones = [], [], [] for chunk in self._chunk_graph.iter_indep(): if isinstance(chunk.op, Fetch): keys.append(chunk.key) gets.append(self._storage_api.get.delay(chunk.key)) + accept_nones.append(True) elif isinstance(chunk.op, FetchShuffle): for key in self._chunk_key_to_data_keys[chunk.key]: keys.append(key) gets.append(self._storage_api.get.delay(key, error='ignore')) + accept_nones.append(False) if keys: logger.debug('Start getting input data, keys: %s, ' 'subtask id: %s', keys, self.subtask.subtask_id) inputs = await self._storage_api.get.batch(*gets) - self._datastore.update({key: get for key, get in zip(keys, inputs) if get is not None}) + self._datastore.update( + {key: get for key, get, accept_none in zip(keys, inputs, accept_nones) + if accept_none or get is not None} + ) logger.debug('Finish getting input data keys: %s, ' 'subtask id: %s', keys, self.subtask.subtask_id) return keys diff --git a/mars/services/task/tests/test_service.py b/mars/services/task/tests/test_service.py index 38344f506e..9bc5854246 100644 --- a/mars/services/task/tests/test_service.py +++ b/mars/services/task/tests/test_service.py @@ -318,7 +318,6 @@ def f(*_args, raises=False): progress_controller = get_context().get_remote_object('progress_controller') progress_controller.wait() get_context().set_progress(1.0) - return 'RET' # test non-fused DAGs r1 = mr.spawn(f) diff --git a/mars/tensor/images/imread.py b/mars/tensor/images/imread.py index bb8622bc66..c97e71e50b 100644 --- a/mars/tensor/images/imread.py +++ b/mars/tensor/images/imread.py @@ -13,18 +13,19 @@ # limitations under the License. import numpy as np -try: - from PIL import Image -except ImportError: - Image = None from ... import opcodes as OperandDef from ...serialization.serializables import AnyField from ...config import options from ...lib.filesystem import open_file, glob, file_size -from ...utils import ceildiv +from ...utils import ceildiv, ModulePlaceholder from ..operands import TensorOperandMixin, TensorOperand +try: + from PIL import Image +except ImportError: + Image = ModulePlaceholder('PIL') + def _read_image(fpath): return np.asarray(Image.open(fpath)) diff --git a/mars/tensor/linalg/cholesky.py b/mars/tensor/linalg/cholesky.py index 0f9925f5ed..b5ccd89c9b 100644 --- a/mars/tensor/linalg/cholesky.py +++ b/mars/tensor/linalg/cholesky.py @@ -139,13 +139,10 @@ def execute(cls, ctx, op): with device(device_id): if xp is np: - try: - import scipy.linalg + import scipy.linalg - ctx[chunk.key] = scipy.linalg.cholesky(a, lower=op.lower) - return - except ImportError: # pragma: no cover - pass + ctx[chunk.key] = scipy.linalg.cholesky(a, lower=op.lower) + return r = xp.linalg.cholesky(a) if not chunk.op.lower: diff --git a/mars/tensor/special/bessel.py b/mars/tensor/special/bessel.py index e600cbafb7..72383574df 100644 --- a/mars/tensor/special/bessel.py +++ b/mars/tensor/special/bessel.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import scipy.special as spspecial + from ..arithmetic.utils import arithmetic_operand from ..utils import infer_dtype, implement_scipy -from .core import spspecial, TensorSpecialBinOp, _register_special_op +from .core import TensorSpecialBinOp, _register_special_op @_register_special_op diff --git a/mars/tensor/special/convenience.py b/mars/tensor/special/convenience.py index 94fce49911..17de03c9d6 100644 --- a/mars/tensor/special/convenience.py +++ b/mars/tensor/special/convenience.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import scipy.special as spspecial + from ..utils import infer_dtype, implement_scipy -from .core import spspecial, TensorSpecialBinOp, _register_special_op +from .core import TensorSpecialBinOp, _register_special_op @_register_special_op diff --git a/mars/tensor/special/core.py b/mars/tensor/special/core.py index 98a52a6fd9..51f82b7e8f 100644 --- a/mars/tensor/special/core.py +++ b/mars/tensor/special/core.py @@ -13,20 +13,6 @@ # limitations under the License. -class _EmptyStub: - def __init__(self, obj): - self._obj = obj - - def __getattr__(self, item): - return getattr(self._obj, item, None) - - -try: - import scipy.special - spspecial = _EmptyStub(scipy.special) -except ImportError: # pragma: no cover - spspecial = _EmptyStub(None) - from ... import opcodes from ..arithmetic.core import TensorUnaryOp, TensorBinOp, TensorMultiOp from ..array_utils import np, cp, sparse, convert_order, as_same_device, device diff --git a/mars/tensor/special/err_fresnel.py b/mars/tensor/special/err_fresnel.py index de09314aad..eb8a9a26d4 100644 --- a/mars/tensor/special/err_fresnel.py +++ b/mars/tensor/special/err_fresnel.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import scipy.special as spspecial + from ..arithmetic.utils import arithmetic_operand from ..utils import infer_dtype, implement_scipy -from .core import spspecial, TensorSpecialUnaryOp, _register_special_op +from .core import TensorSpecialUnaryOp, _register_special_op @_register_special_op diff --git a/mars/tensor/special/gamma_funcs.py b/mars/tensor/special/gamma_funcs.py index 588cc21285..0e217d369b 100644 --- a/mars/tensor/special/gamma_funcs.py +++ b/mars/tensor/special/gamma_funcs.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import scipy.special as spspecial + from ..arithmetic.utils import arithmetic_operand from ..utils import infer_dtype, implement_scipy -from .core import spspecial, TensorSpecialUnaryOp, TensorSpecialMultiOp, \ +from .core import TensorSpecialUnaryOp, TensorSpecialMultiOp, \ TensorSpecialBinOp, _register_special_op diff --git a/mars/tensor/special/info_theory.py b/mars/tensor/special/info_theory.py index 0ae356ce0f..95c1842062 100644 --- a/mars/tensor/special/info_theory.py +++ b/mars/tensor/special/info_theory.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import scipy.special as spspecial + from ..arithmetic.utils import arithmetic_operand from ..utils import infer_dtype, implement_scipy -from .core import spspecial, TensorSpecialUnaryOp, TensorSpecialBinOp, _register_special_op +from .core import TensorSpecialUnaryOp, TensorSpecialBinOp, \ + _register_special_op @_register_special_op diff --git a/mars/tests/test_utils.py b/mars/tests/test_utils.py index 1cbdd8d27f..5add44e220 100644 --- a/mars/tests/test_utils.py +++ b/mars/tests/test_utils.py @@ -398,12 +398,11 @@ async def main(): asyncio.run(main()) -def test_import_error_handler(): - try: - import required_module - except ImportError: - required_module = utils.ImportErrorHandler('required_module') +def test_module_placeholder(): + required_module = utils.ModulePlaceholder('required_module') + with pytest.raises(AttributeError): + required_module() with pytest.raises(AttributeError) as e: required_module.method() msg = e.value.args[0] diff --git a/mars/utils.py b/mars/utils.py index 8ece068bc2..66c01cad08 100644 --- a/mars/utils.py +++ b/mars/utils.py @@ -1262,9 +1262,15 @@ def get_chunk_key_to_data_keys(chunk_graph): return chunk_key_to_data_keys -class ImportErrorHandler: - def __init__(self, name: str): - self._name = name +class ModulePlaceholder: + def __init__(self, mod_name: str): + self._mod_name = mod_name + + def _raises(self): + raise AttributeError(f'{self._mod_name} is required but not installed.') def __getattr__(self, key): - raise AttributeError(f'{self._name} is required but not installed.') + self._raises() + + def __call__(self, *_args, **_kwargs): + self._raises()