Skip to content

Commit

Permalink
[BACKPORT] Fix KeyError when remote function returns None (#2371) (#2375
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wjsi authored Aug 23, 2021
1 parent b35bd9d commit c497a1e
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 53 deletions.
8 changes: 8 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ Apache Software Foundation License 2.0
- vineyard:0.1.2
- ray:1.4.0
- uvloop:0.15.2
- oss2:2.12.0


BSD 3-Clause
Expand Down Expand Up @@ -263,6 +264,13 @@ MIT License
- react-router-dom:5.2.0
- material-ui:4.0.0
- lodash:4.17.0
- dagre-d3:0.6.4


ISC License
-----------

- d3:7.0.0


CC0 1.0 Universal (CC0 1.0)
Expand Down
11 changes: 6 additions & 5 deletions mars/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
from pandas.api.types import is_string_dtype
from pandas.api.extensions import ExtensionDtype
from pandas.core.dtypes.cast import find_common_type
try:
import pyarrow as pa
except ImportError: # pragma: no cover
pass

from ..core import Entity, ExecutableTuple
from ..lib.mmh3 import hash as mmh_hash
from ..tensor.utils import dictify_chunk_size, normalize_chunk_sizes
from ..utils import tokenize, sbytes, lazy_import
from ..utils import tokenize, sbytes, lazy_import, ModulePlaceholder

try:
import pyarrow as pa
except ImportError: # pragma: no cover
pa = ModulePlaceholder('pyarrow')

cudf = lazy_import('cudf', globals=globals(), rename='cudf')

Expand Down
4 changes: 2 additions & 2 deletions mars/lib/filesystem/_oss_lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import os

from ..base import path_type, stringify_path
from ....utils import ImportErrorHandler
from ....utils import ModulePlaceholder

try:
import oss2
except ImportError:
oss2 = ImportErrorHandler('oss2')
oss2 = ModulePlaceholder('oss2')

# OSS api time out
_oss_time_out = 10
Expand Down
2 changes: 1 addition & 1 deletion mars/lib/filesystem/_oss_lib/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
try:
import oss2
except ImportError:
oss2 = ImportErrorHandler('oss2')
oss2 = ModulePlaceholder('oss2')


class OSSIOBase(IOBase):
Expand Down
4 changes: 2 additions & 2 deletions mars/lib/filesystem/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from ._oss_lib.glob import glob
from ._oss_lib.handle import OSSIOBase, dict_to_url
from .base import FileSystem, path_type
from ...utils import implements, ImportErrorHandler
from ...utils import implements, ModulePlaceholder

try:
import oss2
except ImportError:
oss2 = ImportErrorHandler('oss2')
oss2 = ModulePlaceholder('oss2')

_oss_time_out = 10

Expand Down
12 changes: 12 additions & 0 deletions mars/remote/tests/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,15 @@ def f(t, x):
result = s.execute().fetch()
expected = (raw[raw > 0] * 3).sum()
assert pytest.approx(result) == expected


def test_none_outputs(setup):
def f(*_args):
pass

r1 = spawn(f, args=(0,))
r2 = spawn(f, args=(r1, 1))
r3 = spawn(f, args=(r1, 2))
r4 = spawn(f, args=(r2, r3))

assert r4.execute().fetch() is None
10 changes: 7 additions & 3 deletions mars/services/subtask/worker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,25 @@ def subtask_id(self):
return self.subtask.subtask_id

async def _load_input_data(self):
keys = []
gets = []
keys, gets, accept_nones = [], [], []
for chunk in self._chunk_graph.iter_indep():
if isinstance(chunk.op, Fetch):
keys.append(chunk.key)
gets.append(self._storage_api.get.delay(chunk.key))
accept_nones.append(True)
elif isinstance(chunk.op, FetchShuffle):
for key in self._chunk_key_to_data_keys[chunk.key]:
keys.append(key)
gets.append(self._storage_api.get.delay(key, error='ignore'))
accept_nones.append(False)
if keys:
logger.debug('Start getting input data, keys: %s, '
'subtask id: %s', keys, self.subtask.subtask_id)
inputs = await self._storage_api.get.batch(*gets)
self._datastore.update({key: get for key, get in zip(keys, inputs) if get is not None})
self._datastore.update(
{key: get for key, get, accept_none in zip(keys, inputs, accept_nones)
if accept_none or get is not None}
)
logger.debug('Finish getting input data keys: %s, '
'subtask id: %s', keys, self.subtask.subtask_id)
return keys
Expand Down
1 change: 0 additions & 1 deletion mars/services/task/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ def f(*_args, raises=False):
progress_controller = get_context().get_remote_object('progress_controller')
progress_controller.wait()
get_context().set_progress(1.0)
return 'RET'

# test non-fused DAGs
r1 = mr.spawn(f)
Expand Down
11 changes: 6 additions & 5 deletions mars/tensor/images/imread.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
# limitations under the License.

import numpy as np
try:
from PIL import Image
except ImportError:
Image = None

from ... import opcodes as OperandDef
from ...serialization.serializables import AnyField
from ...config import options
from ...lib.filesystem import open_file, glob, file_size
from ...utils import ceildiv
from ...utils import ceildiv, ModulePlaceholder
from ..operands import TensorOperandMixin, TensorOperand

try:
from PIL import Image
except ImportError:
Image = ModulePlaceholder('PIL')


def _read_image(fpath):
return np.asarray(Image.open(fpath))
Expand Down
9 changes: 3 additions & 6 deletions mars/tensor/linalg/cholesky.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,10 @@ def execute(cls, ctx, op):

with device(device_id):
if xp is np:
try:
import scipy.linalg
import scipy.linalg

ctx[chunk.key] = scipy.linalg.cholesky(a, lower=op.lower)
return
except ImportError: # pragma: no cover
pass
ctx[chunk.key] = scipy.linalg.cholesky(a, lower=op.lower)
return

r = xp.linalg.cholesky(a)
if not chunk.op.lower:
Expand Down
4 changes: 3 additions & 1 deletion mars/tensor/special/bessel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import scipy.special as spspecial

from ..arithmetic.utils import arithmetic_operand
from ..utils import infer_dtype, implement_scipy
from .core import spspecial, TensorSpecialBinOp, _register_special_op
from .core import TensorSpecialBinOp, _register_special_op


@_register_special_op
Expand Down
4 changes: 3 additions & 1 deletion mars/tensor/special/convenience.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import scipy.special as spspecial

from ..utils import infer_dtype, implement_scipy
from .core import spspecial, TensorSpecialBinOp, _register_special_op
from .core import TensorSpecialBinOp, _register_special_op


@_register_special_op
Expand Down
14 changes: 0 additions & 14 deletions mars/tensor/special/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,6 @@
# limitations under the License.


class _EmptyStub:
def __init__(self, obj):
self._obj = obj

def __getattr__(self, item):
return getattr(self._obj, item, None)


try:
import scipy.special
spspecial = _EmptyStub(scipy.special)
except ImportError: # pragma: no cover
spspecial = _EmptyStub(None)

from ... import opcodes
from ..arithmetic.core import TensorUnaryOp, TensorBinOp, TensorMultiOp
from ..array_utils import np, cp, sparse, convert_order, as_same_device, device
Expand Down
4 changes: 3 additions & 1 deletion mars/tensor/special/err_fresnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import scipy.special as spspecial

from ..arithmetic.utils import arithmetic_operand
from ..utils import infer_dtype, implement_scipy
from .core import spspecial, TensorSpecialUnaryOp, _register_special_op
from .core import TensorSpecialUnaryOp, _register_special_op


@_register_special_op
Expand Down
4 changes: 3 additions & 1 deletion mars/tensor/special/gamma_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import scipy.special as spspecial

from ..arithmetic.utils import arithmetic_operand
from ..utils import infer_dtype, implement_scipy
from .core import spspecial, TensorSpecialUnaryOp, TensorSpecialMultiOp, \
from .core import TensorSpecialUnaryOp, TensorSpecialMultiOp, \
TensorSpecialBinOp, _register_special_op


Expand Down
5 changes: 4 additions & 1 deletion mars/tensor/special/info_theory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import scipy.special as spspecial

from ..arithmetic.utils import arithmetic_operand
from ..utils import infer_dtype, implement_scipy
from .core import spspecial, TensorSpecialUnaryOp, TensorSpecialBinOp, _register_special_op
from .core import TensorSpecialUnaryOp, TensorSpecialBinOp, \
_register_special_op


@_register_special_op
Expand Down
9 changes: 4 additions & 5 deletions mars/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,11 @@ async def main():
asyncio.run(main())


def test_import_error_handler():
try:
import required_module
except ImportError:
required_module = utils.ImportErrorHandler('required_module')
def test_module_placeholder():
required_module = utils.ModulePlaceholder('required_module')

with pytest.raises(AttributeError):
required_module()
with pytest.raises(AttributeError) as e:
required_module.method()
msg = e.value.args[0]
Expand Down
14 changes: 10 additions & 4 deletions mars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1262,9 +1262,15 @@ def get_chunk_key_to_data_keys(chunk_graph):
return chunk_key_to_data_keys


class ImportErrorHandler:
def __init__(self, name: str):
self._name = name
class ModulePlaceholder:
def __init__(self, mod_name: str):
self._mod_name = mod_name

def _raises(self):
raise AttributeError(f'{self._mod_name} is required but not installed.')

def __getattr__(self, key):
raise AttributeError(f'{self._name} is required but not installed.')
self._raises()

def __call__(self, *_args, **_kwargs):
self._raises()

0 comments on commit c497a1e

Please sign in to comment.