Skip to content

Commit 2732b1b

Browse files
wjsi and hekaisheng authored
[BACKPORT] Fixes md.read_csv when dtypes is not inferred correctly (#1606) (#1617)
Co-authored-by: He Kaisheng <[email protected]>
1 parent 4b2484f commit 2732b1b

File tree

5 files changed

+138
-51
lines changed

5 files changed

+138
-51
lines changed

mars/dataframe/datasource/read_csv.py

+13-16
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,6 @@ def _tile_compressed(cls, op):
185185
columns_value=df.columns_value,
186186
chunks=[new_chunk], nsplits=nsplits)
187187

188-
@classmethod
189-
def _validate_dtypes(cls, dtypes, is_gpu):
190-
dtypes = dtypes.to_dict()
191-
# CuDF doesn't support object type, turn it to 'str'.
192-
if is_gpu:
193-
dtypes = dict((n, dt.name if dt != np.dtype('object') else 'str') for n, dt in dtypes.items())
194-
return dtypes
195-
196188
@classmethod
197189
def tile(cls, op):
198190
if op.compression:
@@ -270,8 +262,9 @@ def _pandas_read_csv(cls, f, op):
270262
# will replace null value with np.nan,
271263
# which will cause failure when converting to arrow string array
272264
csv_kwargs['keep_default_na'] = False
273-
df = pd.read_csv(b, sep=op.sep, names=op.names, index_col=op.index_col, usecols=usecols,
274-
dtype=dtypes.to_dict(), nrows=op.nrows, **csv_kwargs)
265+
csv_kwargs['dtype'] = cls._select_arrow_dtype(dtypes)
266+
df = pd.read_csv(b, sep=op.sep, names=op.names, index_col=op.index_col,
267+
usecols=usecols, nrows=op.nrows, **csv_kwargs)
275268
if op.keep_usecols_order:
276269
df = df[op.usecols]
277270
return df
@@ -287,8 +280,7 @@ def _cudf_read_csv(cls, op): # pragma: no cover
287280
df = cudf.read_csv(op.path, byte_range=(op.offset, op.size), sep=op.sep, usecols=usecols, **csv_kwargs)
288281
else:
289282
df = cudf.read_csv(op.path, byte_range=(op.offset, op.size), sep=op.sep, names=op.names,
290-
usecols=usecols, dtype=cls._validate_dtypes(op.outputs[0].dtypes, op.gpu),
291-
nrows=op.nrows, **csv_kwargs)
283+
usecols=usecols, nrows=op.nrows, **csv_kwargs)
292284

293285
if op.keep_usecols_order:
294286
df = df[op.usecols]
@@ -298,6 +290,11 @@ def _cudf_read_csv(cls, op): # pragma: no cover
298290
def _contains_arrow_dtype(cls, dtypes):
299291
return any(isinstance(dtype, ArrowStringDtype) for dtype in dtypes)
300292

293+
@classmethod
294+
def _select_arrow_dtype(cls, dtypes):
295+
return dict((c, dtype) for c, dtype in dtypes.items() if
296+
isinstance(dtype, ArrowStringDtype))
297+
301298
@classmethod
302299
def execute(cls, ctx, op):
303300
xdf = cudf if op.gpu else pd
@@ -308,15 +305,15 @@ def execute(cls, ctx, op):
308305
if op.compression is not None:
309306
# As we specify names and dtype, we need to skip header rows
310307
csv_kwargs['skiprows'] = 1 if op.header == 'infer' else op.header
311-
dtypes = cls._validate_dtypes(op.outputs[0].dtypes, op.gpu)
312-
if contain_arrow_dtype(dtypes.values()):
308+
dtypes = op.outputs[0].dtypes
309+
if contain_arrow_dtype(dtypes):
313310
# when keep_default_na is True which is default,
314311
# will replace null value with np.nan,
315312
# which will cause failure when converting to arrow string array
316313
csv_kwargs['keep_default_na'] = False
314+
csv_kwargs['dtype'] = cls._select_arrow_dtype(dtypes)
317315
df = xdf.read_csv(f, sep=op.sep, names=op.names, index_col=op.index_col,
318-
usecols=op.usecols, dtype=dtypes,
319-
nrows=op.nrows, **csv_kwargs)
316+
usecols=op.usecols, nrows=op.nrows, **csv_kwargs)
320317
if op.keep_usecols_order:
321318
df = df[op.usecols]
322319
else:

mars/dataframe/datasource/tests/test_datasource_execution.py

+21
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,27 @@ def testReadCSVExecution(self):
374374
concat=True)[0]
375375
pd.testing.assert_frame_equal(pdf, mdf2)
376376

377+
# test nan
378+
with tempfile.TemporaryDirectory() as tempdir:
379+
file_path = os.path.join(tempdir, 'test.csv')
380+
381+
df = pd.DataFrame({
382+
'col1': np.random.rand(100, ),
383+
'col2': np.random.choice(['a', 'b', 'c'], (100,)),
384+
'col3': np.arange(100)
385+
})
386+
df.iloc[20:, :] = pd.NA
387+
df.to_csv(file_path)
388+
389+
pdf = pd.read_csv(file_path, index_col=0)
390+
mdf = md.read_csv(file_path, index_col=0, head_lines=10, chunk_bytes=200)
391+
result = self.executor.execute_dataframe(mdf, concat=True)[0]
392+
pd.testing.assert_frame_equal(pdf, result)
393+
394+
# dtypes is inferred as expected
395+
pd.testing.assert_series_equal(mdf.dtypes, pd.Series(['float64', 'object', 'int64'],
396+
index=df.columns))
397+
377398
# test compression
378399
with tempfile.TemporaryDirectory() as tempdir:
379400
file_path = os.path.join(tempdir, 'test.gzip')

mars/dataframe/groupby/aggregation.py

+16-6
Original file line numberDiff line numberDiff line change
@@ -645,12 +645,22 @@ def _check_if_func_available(func):
645645
def agg(groupby, func, method='auto', *args, **kwargs):
646646
"""
647647
Aggregate using one or more operations on grouped data.
648-
:param groupby: Groupby data.
649-
:param func: Aggregation functions.
650-
:param method: 'shuffle' or 'tree', 'tree' method provide a better performance, 'shuffle' is recommended
651-
if aggregated result is very large, 'auto' will use 'shuffle' method in distributed mode and use 'tree'
652-
in local mode.
653-
:return: Aggregated result.
648+
649+
Parameters
650+
----------
651+
groupby : Mars Groupby
652+
Groupby data.
653+
func : str or list-like
654+
Aggregation functions.
655+
method : {'auto', 'shuffle', 'tree'}, default 'auto'
656+
'tree' method provide a better performance, 'shuffle' is recommended
657+
if aggregated result is very large, 'auto' will use 'shuffle' method
658+
in distributed mode and use 'tree' in local mode.
659+
660+
Returns
661+
-------
662+
Series or DataFrame
663+
Aggregated result.
654664
"""
655665

656666
# When perform a computation on the grouped data, we won't shuffle

mars/dataframe/sort/sort_values.py

+45-17
Original file line numberDiff line numberDiff line change
@@ -112,31 +112,48 @@ def dataframe_sort_values(df, by, axis=0, ascending=True, inplace=False, kind='q
112112
na_position='last', ignore_index=False, parallel_kind='PSRS', psrs_kinds=None):
113113
"""
114114
Sort by the values along either axis.
115-
:param df: input DataFrame.
116-
:param by: Name or list of names to sort by.
117-
:param axis: Axis to be sorted.
118-
:param ascending: Sort ascending vs. descending. Specify list for multiple sort orders.
119-
If this is a list of bools, must match the length of the by.
120-
:param inplace: If True, perform operation in-place.
121-
:param kind: Choice of sorting algorithm. See also ndarray.np.sort for more information.
122-
mergesort is the only stable algorithm. For DataFrames, this option is only applied
123-
when sorting on a single column or label.
124-
:param na_position: Puts NaNs at the beginning if first; last puts NaNs at the end.
125-
:param ignore_index: If True, the resulting axis will be labeled 0, 1, …, n - 1.
126-
:param parallel_kind: {'PSRS'}, optional. Parallel sorting algorithm, for the details, refer to:
127-
http://csweb.cs.wfu.edu/bigiron/LittleFE-PSRS/build/html/PSRSalgorithm.html
128-
:param psrs_kinds: Sorting algorithms during PSRS algorithm.
129-
:return: sorted dataframe.
115+
116+
Parameters
117+
----------
118+
df : Mars DataFrame
119+
Input dataframe.
120+
by : str
121+
Name or list of names to sort by.
122+
axis : %(axes_single_arg)s, default 0
123+
Axis to be sorted.
124+
ascending : bool or list of bool, default True
125+
Sort ascending vs. descending. Specify list for multiple sort
126+
orders. If this is a list of bools, must match the length of
127+
the by.
128+
inplace : bool, default False
129+
If True, perform operation in-place.
130+
kind : {'quicksort', 'mergesort', 'heapsort'}, default 'quicksort'
131+
Choice of sorting algorithm. See also ndarray.np.sort for more
132+
information. `mergesort` is the only stable algorithm. For
133+
DataFrames, this option is only applied when sorting on a single
134+
column or label.
135+
na_position : {'first', 'last'}, default 'last'
136+
Puts NaNs at the beginning if `first`; `last` puts NaNs at the
137+
end.
138+
ignore_index : bool, default False
139+
If True, the resulting axis will be labeled 0, 1, …, n - 1.
140+
parallel_kind : {'PSRS'}, default 'PSRS'
141+
Parallel sorting algorithm, for the details, refer to:
142+
http://csweb.cs.wfu.edu/bigiron/LittleFE-PSRS/build/html/PSRSalgorithm.html
143+
144+
Returns
145+
-------
146+
sorted_obj : DataFrame or None
147+
DataFrame with sorted values if inplace=False, None otherwise.
130148
131149
Examples
132150
--------
133151
>>> import mars.dataframe as md
134-
>>> raw = pd.DataFrame({
152+
>>> df = md.DataFrame({
135153
... 'col1': ['A', 'A', 'B', np.nan, 'D', 'C'],
136154
... 'col2': [2, 1, 9, 8, 7, 4],
137155
... 'col3': [0, 1, 9, 4, 2, 3],
138156
... })
139-
>>> df = md.DataFrame(raw)
140157
>>> df.execute()
141158
col1 col2 col3
142159
0 A 2 0
@@ -179,7 +196,18 @@ def dataframe_sort_values(df, by, axis=0, ascending=True, inplace=False, kind='q
179196
1 A 1 1
180197
3 NaN 8 4
181198
199+
Putting NAs first
200+
201+
>>> df.sort_values(by='col1', ascending=False, na_position='first').execute()
202+
col1 col2 col3
203+
3 NaN 8 4
204+
4 D 7 2
205+
5 C 4 3
206+
2 B 9 9
207+
0 A 2 0
208+
1 A 1 1
182209
"""
210+
183211
if na_position not in ['last', 'first']: # pragma: no cover
184212
raise TypeError(f'invalid na_position: {na_position}')
185213
axis = validate_axis(axis, df)

mars/tensor/einsum/core.py

+43-12
Original file line numberDiff line numberDiff line change
@@ -167,18 +167,48 @@ def einsum(subscripts, *operands, dtype=None, order='K', casting='safe', optimiz
167167
168168
See the notes and examples for clarification.
169169
170-
:param subscripts: Specifies the subscripts for summation as comma separated list of subscript labels.
171-
An implicit (classical Einstein summation) calculation is performed unless the explicit indicator ‘->’ is
172-
included as well as subscript labels of the precise output form.
173-
:param operands: These are the arrays for the operation.
174-
:param dtype: If provided, forces the calculation to use the data type specified.
175-
Note that you may have to also give a more liberal casting parameter to allow the conversions.
176-
Default is None.
177-
:param order: Controls the memory layout of the output.
178-
:param casting: Controls what kind of data casting may occur. Setting this to ‘unsafe’ is not recommended,
179-
as it can adversely affect accumulations.
180-
:param optimize: Controls if intermediate optimization should occur.
181-
:return: The calculation based on the Einstein summation convention.
170+
Parameters
171+
----------
172+
subscripts : str
173+
Specifies the subscripts for summation as comma separated list of
174+
subscript labels. An implicit (classical Einstein summation)
175+
calculation is performed unless the explicit indicator '->' is
176+
included as well as subscript labels of the precise output form.
177+
operands : list of array_like
178+
These are the arrays for the operation.
179+
dtype : {data-type, None}, optional
180+
If provided, forces the calculation to use the data type specified.
181+
Note that you may have to also give a more liberal `casting`
182+
parameter to allow the conversions. Default is None.
183+
order : {'C', 'F', 'A', 'K'}, optional
184+
Controls the memory layout of the output. 'C' means it should
185+
be C contiguous. 'F' means it should be Fortran contiguous,
186+
'A' means it should be 'F' if the inputs are all 'F', 'C' otherwise.
187+
'K' means it should be as close to the layout as the inputs as
188+
is possible, including arbitrarily permuted axes.
189+
Default is 'K'.
190+
casting : {'no', 'equiv', 'safe', 'same_kind', 'unsafe'}, optional
191+
Controls what kind of data casting may occur. Setting this to
192+
'unsafe' is not recommended, as it can adversely affect accumulations.
193+
194+
* 'no' means the data types should not be cast at all.
195+
* 'equiv' means only byte-order changes are allowed.
196+
* 'safe' means only casts which can preserve values are allowed.
197+
* 'same_kind' means only safe casts or casts within a kind,
198+
like float64 to float32, are allowed.
199+
* 'unsafe' means any data conversions may be done.
200+
201+
Default is 'safe'.
202+
optimize : {False, True, 'greedy', 'optimal'}, optional
203+
Controls if intermediate optimization should occur. No optimization
204+
will occur if False and True will default to the 'greedy' algorithm.
205+
Also accepts an explicit contraction list from the ``np.einsum_path``
206+
function. See ``np.einsum_path`` for more details. Defaults to False.
207+
208+
Returns
209+
-------
210+
output : Mars.tensor
211+
The calculation based on the Einstein summation convention.
182212
183213
The Einstein summation convention can be used to compute
184214
many multi-dimensional, linear algebraic array operations. `einsum`
@@ -393,6 +423,7 @@ def einsum(subscripts, *operands, dtype=None, order='K', casting='safe', optimiz
393423
... _ = mt.einsum('ijk,ilm,njm,nlk,abc->',a,a,a,a,a, optimize=path)
394424
395425
"""
426+
396427
all_inputs = [subscripts] + list(operands)
397428
inputs, outputs, operands = parse_einsum_input(all_inputs)
398429
subscripts = "->".join((inputs, outputs))

0 commit comments

Comments (0)