[SPARK-43111][PS][CONNECT][PYTHON] Merge nested if statements into single `if` statements

### What changes were proposed in this pull request?
This PR aims to simplify the code by merging nested `if` statements into single `if` statements using the `and` operator.

There are 7 of these according to [Sonarcloud](https://sonarcloud.io/project/issues?languages=py&resolved=false&rules=python%3AS1066&id=spark-python&open=AYQdnXXBRrJbVxW9ZDpw), and this PR fixes them all.
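To illustrate the pattern (a minimal, hypothetical example; `describe` and its parameters are not taken from the patch), rule S1066 flags an `if` whose body is nothing but another `if`; the two conditions can be merged with `and`:

```python
from typing import Optional


# Before: a nested `if` of the kind flagged by Sonarcloud rule S1066
# (hypothetical example, not code from this PR).
def describe(value: Optional[int], limit: int) -> str:
    if value is not None:
        if value > limit:
            return "above limit"
    return "within limit or missing"


# After: the same logic with the two conditions merged via `and`.
def describe_merged(value: Optional[int], limit: int) -> str:
    if value is not None and value > limit:
        return "above limit"
    return "within limit or missing"
```

Because `and` short-circuits, the second condition is only evaluated when the first one holds, so the merged form keeps the same behavior as the nested version.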

### Why are the changes needed?
The changes do not affect the functionality of the code, but they improve readability and maintainability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA (existing GitHub Actions CI).

Closes apache#40759 from bjornjorgensen/Merge-if-with-the-enclosing-one.

Lead-authored-by: Bjørn Jørgensen <[email protected]>
Co-authored-by: bjornjorgensen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
bjornjorgensen authored and HyukjinKwon committed Apr 18, 2023
1 parent 119ec5b commit 462d456
Showing 6 changed files with 31 additions and 33 deletions.
5 changes: 2 additions & 3 deletions python/pyspark/accumulators.py
@@ -249,9 +249,8 @@ def poll(func: Callable[[], bool]) -> None:
             while not self.server.server_shutdown:  # type: ignore[attr-defined]
                 # Poll every 1 second for new data -- don't block in case of shutdown.
                 r, _, _ = select.select([self.rfile], [], [], 1)
-                if self.rfile in r:
-                    if func():
-                        break
+                if self.rfile in r and func():
+                    break

         def accum_updates() -> bool:
             num_updates = read_int(self.rfile)
17 changes: 10 additions & 7 deletions python/pyspark/pandas/frame.py
@@ -8915,15 +8915,19 @@ def append(
         if len(index_scols) != other._internal.index_level:
             raise ValueError("Both DataFrames have to have the same number of index levels")

-        if verify_integrity and len(index_scols) > 0:
-            if (
+        if (
+            verify_integrity
+            and len(index_scols) > 0
+            and (
                 self._internal.spark_frame.select(index_scols)
                 .intersect(
                     other._internal.spark_frame.select(other._internal.index_spark_columns)
                 )
                 .count()
-            ) > 0:
-                raise ValueError("Indices have overlapping values")
+            )
+            > 0
+        ):
+            raise ValueError("Indices have overlapping values")

         # Lazy import to avoid circular dependency issues
         from pyspark.pandas.namespace import concat
@@ -11581,9 +11585,8 @@ def mapper_fn(x: Any) -> Any:

             index_columns = psdf._internal.index_spark_column_names
             num_indices = len(index_columns)
-            if level:
-                if level < 0 or level >= num_indices:
-                    raise ValueError("level should be an integer between [0, %s)" % num_indices)
+            if level is not None and (level < 0 or level >= num_indices):
+                raise ValueError("level should be an integer between [0, %s)" % num_indices)

             @pandas_udf(returnType=index_mapper_ret_stype)  # type: ignore[call-overload]
             def index_mapper_udf(s: pd.Series) -> pd.Series:
17 changes: 8 additions & 9 deletions python/pyspark/pandas/groupby.py
@@ -3550,15 +3550,14 @@ def _validate_agg_columns(self, numeric_only: Optional[bool], function_name: str
             if isinstance(self, SeriesGroupBy):
                 raise TypeError("Only numeric aggregation column is accepted.")

-        if not numeric_only:
-            if has_non_numeric:
-                warnings.warn(
-                    "Dropping invalid columns in DataFrameGroupBy.%s is deprecated. "
-                    "In a future version, a TypeError will be raised. "
-                    "Before calling .%s, select only columns which should be "
-                    "valid for the function." % (function_name, function_name),
-                    FutureWarning,
-                )
+        if not numeric_only and has_non_numeric:
+            warnings.warn(
+                "Dropping invalid columns in DataFrameGroupBy.%s is deprecated. "
+                "In a future version, a TypeError will be raised. "
+                "Before calling .%s, select only columns which should be "
+                "valid for the function." % (function_name, function_name),
+                FutureWarning,
+            )

     def _reduce_for_stat_function(
         self,
9 changes: 4 additions & 5 deletions python/pyspark/pandas/indexes/base.py
@@ -2095,11 +2095,10 @@ def set_names(
         """
         from pyspark.pandas.indexes.multi import MultiIndex

-        if isinstance(self, MultiIndex):
-            if level is not None:
-                self_names = self.names
-                self_names[level] = names  # type: ignore[index]
-                names = self_names
+        if isinstance(self, MultiIndex) and level is not None:
+            self_names = self.names
+            self_names[level] = names  # type: ignore[index]
+            names = self_names
         return self.rename(name=names, inplace=inplace)

     def difference(self, other: "Index", sort: Optional[bool] = None) -> "Index":
5 changes: 2 additions & 3 deletions python/pyspark/pandas/namespace.py
@@ -2190,9 +2190,8 @@ def get_dummies(
     if sparse is not False:
         raise NotImplementedError("get_dummies currently does not support sparse")

-    if columns is not None:
-        if not is_list_like(columns):
-            raise TypeError("Input must be a list-like for parameter `columns`")
+    if columns is not None and not is_list_like(columns):
+        raise TypeError("Input must be a list-like for parameter `columns`")

     if dtype is None:
         dtype = "byte"
11 changes: 5 additions & 6 deletions python/pyspark/sql/connect/streaming/readwriter.py
@@ -94,12 +94,11 @@ def load(
         if schema is not None:
             self.schema(schema)
         self.options(**options)
-        if path is not None:
-            if type(path) != str or len(path.strip()) == 0:
-                raise ValueError(
-                    "If the path is provided for stream, it needs to be a "
-                    + "non-empty string. List of paths are not supported."
-                )
+        if path is not None and (type(path) != str or len(path.strip()) == 0):
+            raise ValueError(
+                "If the path is provided for stream, it needs to be a "
+                + "non-empty string. List of paths are not supported."
+            )

         plan = DataSource(
             format=self._format,
