Merge branch 'main' into main
frederiksteiner authored Nov 28, 2024
2 parents 8ef4c68 + ce1d22b commit 5da8304
Showing 21 changed files with 353 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
@@ -24,7 +24,7 @@ Please answer these questions before creating your pull request. Thanks!
- [ ] I am adding new credentials
- [ ] I am adding a new dependency
- [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.
- [ ] I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: [Thread-safe Developer Guidelines](https://docs.google.com/document/d/162d_i4zZ2AfcGRXojj0jByt8EUq-DrSHPPnTa4QvwbA/edit#bookmark=id.e82u4nekq80k)
- [ ] I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: [Thread-safe Developer Guidelines](https://github.com/snowflakedb/snowpark-python/blob/main/CONTRIBUTING.md#thread-safe-development)

3. Please describe how your code solves the related issue.

4 changes: 3 additions & 1 deletion .github/workflows/daily_jupyter_nb_test.yml
@@ -104,7 +104,9 @@ jobs:
run: ls -lh dist
shell: bash
- name: Upgrade setuptools, pip and wheel
run: python -m pip install -U setuptools pip wheel
run: python -m pip install -U setuptools pip wheel tox
- name: Build protobuf Python files
run: python -m tox -e protoc
- name: Install project
run: python -m pip install -e ".[modin-development]"
- name: Install test requirements
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -18,7 +18,13 @@
- To `MapType`:
- `keyType`: keys of the map
- `valueType`: values of the map
- Added support for method `appName` in `SessionBuilder`.
- Added support for `include_nulls` argument in `DataFrame.unpivot`.
- Added support for the following functions in `functions.py`:
- `size` to get the size of array, object, or map columns.
- `collect_list`, an alias of `array_agg`.
- `concat_ws_ignore_nulls` to concatenate strings with a separator, ignoring null values.
- `substring` makes the `len` argument optional.
- Added parameter `ast_enabled` to session for internal usage (default: `False`).

#### Improvements
@@ -58,6 +64,7 @@
- Added support for `DataFrame.align` and `Series.align` for `axis=1` and `axis=None`.
- Added support for `pd.json_normalize`.
- Added support for `GroupBy.pct_change` with `axis=0`, `freq=None`, and `limit=None`.
- Added support for `DataFrameGroupBy.__iter__` and `SeriesGroupBy.__iter__`.

#### Dependency Updates

29 changes: 25 additions & 4 deletions CONTRIBUTING.md
@@ -34,13 +34,13 @@ cd snowpark-python
#### Install the library in edit mode and install its dependencies

- Create a new Python virtual environment with any Python version that we support.
- The Snowpark Python API supports **Python 3.8, Python 3.9, Python 3.10, and Python 3.11**.
- The Snowpark Python API supports **Python 3.8, Python 3.9, Python 3.10, and Python 3.11**.
- The Snowpark pandas API supports **Python 3.9, Python 3.10, and Python 3.11**. Additionally, Snowpark pandas requires **Modin 0.30.1** and **pandas 2.2.x**.

```bash
conda create --name snowpark-dev python=3.9
```

- Activate the new Python virtual environment. For example,

```bash
@@ -83,6 +83,27 @@ and "Mark Directory as" -> "Source Root". **NOTE**: VS Code doesn't have "Source
[Configure PyCharm interpreter][config pycharm interpreter] or [Configure VS Code interpreter][config vscode interpreter] to use the previously created Python virtual environment.
### Thread-safe development
This section covers guidelines for developers who wish to contribute code to `Session`, `ServerConnection`, `MockServerConnection`, and other related objects that are critical to the correct functionality of `snowpark-python`.
#### Add Config Parameter to `Session`
1. If the config parameter is set once during initialization and never changed, it is safe to add the parameter to the `Session` object.
2. If the config parameter can be updated by the user, and the update has side effects during compilation (e.g., `analyzer.analyze()`, `analyzer.resolve()`), add a warning at config update using `warn_session_config_update_in_multithreaded_mode`, as sketched below.
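A minimal sketch of pattern 2, assuming the warning helper takes the config name as its argument; the setter shape is illustrative rather than the exact `Session` code:

```python
# Hypothetical setter for a user-updatable config that affects compilation.
def set_cte_optimization_enabled(self, value: bool) -> None:
    # Updating this config can race with in-flight compilation in other
    # threads, so emit the multithreading warning first ...
    warn_session_config_update_in_multithreaded_mode("cte_optimization_enabled")
    # ... then apply the update under the session lock.
    with self._lock:
        self._cte_optimization_enabled = value
```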
#### Adding a new localized or shared component
Once you have decided that the new component being added requires protection during concurrent access, the following can be used:
- `Session._thread_store` and `ServerConnection._thread_store` are `threading.local()` objects which can be used to store a per-thread instance of the component. The Python connector cursor object is an example of this.
- `Session._lock` and `ServerConnection._lock` are `RLock` objects which can be used to serialize access to shared resources. `Session.query_tag` is an example of this.
- `Session._package_lock` is an `RLock` object which can be used to protect `packages` and `imports` for stored procedures and user-defined functions.
- `Session._plan_lock` is an `RLock` object which can be used to serialize `SnowflakePlan` and `Selectable` method calls. `SnowflakePlan.plan_state` is an example.
- `QueryHistory(session, include_thread_id=True)` can be used to log the query history with thread ids.
An example PR that makes the auto temp table cleaner thread-safe can be found [here](https://github.com/snowflakedb/snowpark-python/pull/2309).
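A minimal, self-contained sketch of the two storage patterns above. The class and the stand-in cursor are illustrative, not the actual `ServerConnection` implementation:

```python
import threading


class ConnectionSketch:
    """Illustrative only -- the real attributes live on Session/ServerConnection."""

    def __init__(self) -> None:
        self._lock = threading.RLock()          # serializes access to shared state
        self._thread_store = threading.local()  # holds per-thread component instances
        self._query_tag = None

    @property
    def cursor(self):
        # Localized pattern: each thread lazily creates and reuses its own
        # cursor, mirroring how the Python connector cursor is handled.
        if not hasattr(self._thread_store, "cursor"):
            self._thread_store.cursor = object()  # stand-in for a real cursor
        return self._thread_store.cursor

    def set_query_tag(self, tag: str) -> None:
        # Shared pattern: mutate shared state only while holding the RLock,
        # as Session.query_tag does.
        with self._lock:
            self._query_tag = tag
```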
## Tests
The [README under tests folder](tests/README.md) tells you how to set up to run tests.
@@ -105,11 +126,11 @@ snowflake
└── snowpark
└── modin
└── pandas ← pandas API frontend layer
└── core
└── core
├── dataframe ← folder containing abstraction
for Modin frontend to DF-algebra
├── execution ← additional patching for I/O
└── plugin
└── plugin
├── _internal ← Snowflake-specific internals
├── io ← Snowpark pandas IO functions
├── compiler ← query compiler, Modin -> Snowpark pandas DF
2 changes: 1 addition & 1 deletion docs/source/modin/supported/groupby_supported.rst
@@ -22,7 +22,7 @@ Indexing, iteration
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``indices`` | Y | |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``__iter__`` | N | |
| ``__iter__`` | Y | |
+-----------------------------+---------------------------------+----------------------------------------------------+

Function application
3 changes: 3 additions & 0 deletions docs/source/snowpark/functions.rst
@@ -92,10 +92,12 @@ Functions
col
collate
collation
collect_list
collect_set
column
concat
concat_ws
concat_ws_ignore_nulls
contains
convert_timezone
corr
@@ -257,6 +259,7 @@
sha2
sin
sinh
size
skew
snowflake_cortex_summarize
sort_array
4 changes: 2 additions & 2 deletions recipe/meta.yaml
@@ -32,10 +32,10 @@ requirements:
- mypy-protobuf
run:
- python
- cloudpickle >=1.6.0,<=2.0.0 # [py<=310]
- cloudpickle >=1.6.0,<=2.2.1,!=2.1.0,!=2.2.0 # [py<=310]
- cloudpickle==2.2.1 # [py==311]
- snowflake-connector-python >=3.10.0,<4.0.0
- typing-extensions >=4.1.0
- typing-extensions >=4.1.0,<5.0.0
# need to pin libffi because of problems in cryptography.
# This might no longer hold true, but keep it just to avoid it biting us again
- libffi <=3.4.4
7 changes: 4 additions & 3 deletions src/snowflake/snowpark/_internal/analyzer/select_statement.py
@@ -997,14 +997,15 @@ def projection_complexities(self) -> List[Dict[PlanNodeCategory, int]]:
# - {PlanNodeCategory.COLUMN: 1} + col2_complexity
# - {PlanNodeCategory.COLUMN: 1} + col1_complexity
dependent_columns = proj.dependent_column_names_with_duplication()
projection_complexity = proj.cumulative_node_complexity
projection_complexity = proj.cumulative_node_complexity.copy()
for dependent_column in dependent_columns:
dependent_column_complexity = (
subquery_projection_name_complexity_map[dependent_column]
)
projection_complexity[PlanNodeCategory.COLUMN] -= 1
projection_complexity = sum_node_complexities(
projection_complexity, dependent_column_complexity
projection_complexity,
dependent_column_complexity,
{PlanNodeCategory.COLUMN: -1},
)

self._projection_complexities.append(projection_complexity)
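The `.copy()` added above matters because `cumulative_node_complexity` is a cached dictionary shared with the plan node. A minimal sketch of the aliasing hazard it avoids, with hypothetical values:

```python
cached = {"COLUMN": 3}        # stands in for a cached complexity dict

aliased = cached              # without .copy(): both names share one dict
aliased["COLUMN"] -= 1
assert cached["COLUMN"] == 2  # the shared cache was silently mutated

cached = {"COLUMN": 3}
local = cached.copy()         # with .copy(): mutations stay local
local["COLUMN"] -= 1
assert cached["COLUMN"] == 3  # cache intact
```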
97 changes: 93 additions & 4 deletions src/snowflake/snowpark/functions.py
@@ -3017,7 +3017,7 @@ def split(str: ColumnOrName, pattern: ColumnOrName, _emit_ast: bool = True) -> C
def substring(
str: ColumnOrName,
pos: Union[Column, int],
len: Union[Column, int],
len: Optional[Union[Column, int]] = None,
_emit_ast: bool = True,
) -> Column:
"""Returns the portion of the string or binary value str, starting from the
@@ -3030,16 +3030,26 @@
:func:`substr` is an alias of :func:`substring`.
Example::
Example 1::
>>> df = session.create_dataframe(
... ["abc", "def"],
... schema=["S"],
... ).select(substring(col("S"), 1, 1))
>>> df.collect()
... )
>>> df.select(substring(col("S"), 1, 1)).collect()
[Row(SUBSTRING("S", 1, 1)='a'), Row(SUBSTRING("S", 1, 1)='d')]
Example 2::
>>> df = session.create_dataframe(
... ["abc", "def"],
... schema=["S"],
... )
>>> df.select(substring(col("S"), 2)).collect()
[Row(SUBSTRING("S", 2)='bc'), Row(SUBSTRING("S", 2)='ef')]
"""
s = _to_col_if_str(str, "substring")
p = pos if isinstance(pos, Column) else lit(pos, _emit_ast=_emit_ast)
if len is None:
return builtin("substring", _emit_ast=_emit_ast)(s, p)
length = len if isinstance(len, Column) else lit(len, _emit_ast=_emit_ast)
return builtin("substring", _emit_ast=_emit_ast)(s, p, length)

@@ -3392,6 +3402,46 @@ def concat_ws(*cols: ColumnOrName, _emit_ast: bool = True) -> Column:
return builtin("concat_ws", _emit_ast=_emit_ast)(*columns)


@publicapi
def concat_ws_ignore_nulls(
sep: str, *cols: ColumnOrName, _emit_ast: bool = True
) -> Column:
"""Concatenates two or more strings, or concatenates two or more binary values. Null values are ignored.
Args:
sep: The separator to use between the strings.
Examples::
>>> df = session.create_dataframe([
... ['Hello', 'World', None],
... [None, None, None],
... ['Hello', None, None],
... ], schema=['a', 'b', 'c'])
>>> df.select(concat_ws_ignore_nulls(',', df.a, df.b, df.c)).show()
----------------------------------------------------
|"CONCAT_WS_IGNORE_NULLS(',', ""A"",""B"",""C"")" |
----------------------------------------------------
|Hello,World |
| |
|Hello |
----------------------------------------------------
<BLANKLINE>
"""
# TODO: SNOW-1831917 create ast
columns = [_to_col_if_str(c, "concat_ws_ignore_nulls") for c in cols]
names = ",".join([c.get_name() for c in columns])

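# Implementation note: ARRAY_CONSTRUCT_COMPACT drops NULL inputs, REDUCE
# (seeded with '') joins the surviving values with the separator, and the
# final SUBSTRING(..., 2) strips the leading separator character that the
# empty-string seed leaves behind.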
input_column_array = array_construct_compact(*columns, _emit_ast=False)
reduced_result = builtin("reduce", _emit_ast=False)(
input_column_array,
lit("", _emit_ast=False),
sql_expr(f"(l, r) -> l || '{sep}' || r", _emit_ast=False),
)
return substring(reduced_result, 2, _emit_ast=False).alias(
f"CONCAT_WS_IGNORE_NULLS('{sep}', {names})", _emit_ast=False
)


@publicapi
def translate(
src: ColumnOrName,
@@ -6722,6 +6772,44 @@ def array_unique_agg(col: ColumnOrName, _emit_ast: bool = True) -> Column:
return _call_function("array_unique_agg", True, c, _emit_ast=_emit_ast)


@publicapi
def size(col: ColumnOrName, _emit_ast: bool = True) -> Column:
"""Returns the size of the input ARRAY, OBJECT or MAP. Returns NULL if the
input column does not match any of these types.
Args:
col: A :class:`Column` object or column name that determines the values.
Example::
>>> df = session.create_dataframe([([1,2,3], {'a': 1, 'b': 2}, 3)], ['col1', 'col2', 'col3'])
>>> df.select(size(df.col1), size(df.col2), size(df.col3)).show()
----------------------------------------------------------
|"SIZE(""COL1"")" |"SIZE(""COL2"")" |"SIZE(""COL3"")" |
----------------------------------------------------------
|3 |2 |NULL |
----------------------------------------------------------
<BLANKLINE>
"""
c = _to_col_if_str(col, "size")
v = to_variant(c)
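# Implementation note: casting to VARIANT lets IS_ARRAY / IS_OBJECT inspect
# the runtime type; for objects (and maps), ARRAY_SIZE(OBJECT_KEYS(v)) counts
# the number of keys.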

# TODO: SNOW-1831923 build AST
return (
when(
is_array(v, _emit_ast=False),
array_size(v, _emit_ast=False),
_emit_ast=False,
)
.when(
is_object(v, _emit_ast=False),
array_size(object_keys(v, _emit_ast=False), _emit_ast=False),
_emit_ast=False,
)
.otherwise(lit(None), _emit_ast=False)
.alias(f"SIZE({c.get_name()})", _emit_ast=False)
)


@publicapi
def object_agg(
key: ColumnOrName, value: ColumnOrName, _emit_ast: bool = True
@@ -9865,6 +9953,7 @@ def sproc(
# Add these alias for user code migration
call_builtin = call_function
collect_set = array_unique_agg
collect_list = array_agg
builtin = function
countDistinct = count_distinct
substr = substring
45 changes: 45 additions & 0 deletions src/snowflake/snowpark/modin/plugin/docstrings/groupby.py
@@ -2005,6 +2005,51 @@ def __iter__():
-------
Generator
A generator yielding a sequence of (name, subsetted object) for each group.
Examples
--------
For SeriesGroupBy:
>>> lst = ['a', 'a', 'b']
>>> ser = pd.Series([1, 2, 3], index=lst)
>>> ser
a 1
a 2
b 3
dtype: int64
>>> for x, y in ser.groupby(level=0):
... print(f'{x}\\n{y}\\n')
a
a 1
a 2
dtype: int64
<BLANKLINE>
b
b 3
dtype: int64
<BLANKLINE>
For DataFrameGroupBy:
>>> data = [[1, 2, 3], [1, 5, 6], [7, 8, 9]]
>>> df = pd.DataFrame(data, columns=["a", "b", "c"])
>>> df
a b c
0 1 2 3
1 1 5 6
2 7 8 9
>>> for x, y in df.groupby(by=["a"]):
... print(f'{x}\\n{y}\\n')
(1,)
a b c
0 1 2 3
1 1 5 6
<BLANKLINE>
(7,)
a b c
2 7 8 9
<BLANKLINE>
"""

def cov():
Expand Down
Expand Up @@ -991,6 +991,7 @@ def groupby(

idx_name = None

return_tuple_when_iterating = False
if (
not isinstance(by, Series)
and is_list_like(by)
@@ -1000,6 +1001,7 @@
# `None`, and by=None would mean that there is no `by` param.
and by[0] is not None
):
return_tuple_when_iterating = True
by = by[0]

if hashable(by) and (
@@ -1050,6 +1052,7 @@
idx_name,
observed=observed,
dropna=dropna,
return_tuple_when_iterating=return_tuple_when_iterating,
)


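A hedged usage sketch of the pandas 2.x behavior this flag preserves, assuming an active Snowpark session: a one-element list-like `by` yields 1-tuple group keys during iteration, while a scalar `by` yields scalar keys.

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401 -- registers the Snowflake backend

df = pd.DataFrame({"a": [1, 1, 7], "b": [2, 5, 8]})

for key, _ in df.groupby(by=["a"]):  # list-like `by` -> keys are tuples
    print(key)                       # (1,) then (7,)

for key, _ in df.groupby(by="a"):    # scalar `by` -> keys are scalars
    print(key)                       # 1 then 7
```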
(Diffs for the remaining changed files are not shown.)
