# [SPARK-43042][SS][CONNECT] Add table() API support for DataStreamReader
### What changes were proposed in this pull request?

Add support for the `table()` method on `DataStreamReader` in Spark Connect.

### Why are the changes needed?

Continuation of building Structured Streaming support for Spark Connect.

### Does this PR introduce _any_ user-facing change?

Yes. The `table()` API is now available on `DataStreamReader` in Spark Connect.
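
For example, reading a table as a stream now works through Connect (the table name below is illustrative):

```python
# Streaming read of a catalog table through Spark Connect.
sdf = spark.readStream.table("events")  # "events" is an illustrative name
assert sdf.isStreaming
```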

### How was this patch tested?

Unit tests. The previously skipped parity tests `test_streaming_read_from_table` and `test_streaming_write_to_table` are re-enabled.

Closes apache#40797 from WweiL/SPARK-43042-dsreader-table-new.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
WweiL authored and HyukjinKwon committed Apr 17, 2023
1 parent 6484118 commit b2a86e3
Showing 4 changed files with 17 additions and 13 deletions.
5 changes: 3 additions & 2 deletions connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala

```diff
@@ -829,12 +829,13 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformReadRel(rel: proto.Read): LogicalPlan = {
 
     rel.getReadTypeCase match {
-      case proto.Read.ReadTypeCase.NAMED_TABLE if !rel.getIsStreaming =>
+      case proto.Read.ReadTypeCase.NAMED_TABLE =>
        val multipartIdentifier =
          CatalystSqlParser.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier)
        UnresolvedRelation(
          multipartIdentifier,
-          new CaseInsensitiveStringMap(rel.getNamedTable.getOptionsMap))
+          new CaseInsensitiveStringMap(rel.getNamedTable.getOptionsMap),
+          isStreaming = rel.getIsStreaming)
 
      case proto.Read.ReadTypeCase.DATA_SOURCE if !rel.getIsStreaming =>
        val localMap = CaseInsensitiveMap[String](rel.getDataSource.getOptionsMap.asScala.toMap)
```
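
The planner change above maps a named-table read to an `UnresolvedRelation` whose `isStreaming` flag mirrors the `is_streaming` field of the incoming proto `Read`. As a rough illustration, this is the shape of the `Relation` message a streaming `table()` call produces on the Python client (mirroring the `plan.py` change below; the table name is made up):

```python
import pyspark.sql.connect.proto as pb2

# Sketch of the Relation a streaming table() call yields; the planner
# reads is_streaming and forwards it into UnresolvedRelation.
rel = pb2.Relation()
rel.read.named_table.unparsed_identifier = "events"  # illustrative name
rel.read.is_streaming = True
```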
10 changes: 9 additions & 1 deletion python/pyspark/sql/connect/plan.py
```diff
@@ -307,14 +307,22 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
 
 
 class Read(LogicalPlan):
-    def __init__(self, table_name: str, options: Optional[Dict[str, str]] = None) -> None:
+    def __init__(
+        self,
+        table_name: str,
+        options: Optional[Dict[str, str]] = None,
+        is_streaming: Optional[bool] = None,
+    ) -> None:
         super().__init__(None)
         self.table_name = table_name
         self.options = options or {}
+        self._is_streaming = is_streaming
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         plan = self._create_proto_relation()
         plan.read.named_table.unparsed_identifier = self.table_name
+        if self._is_streaming is not None:
+            plan.read.is_streaming = self._is_streaming
         for k, v in self.options.items():
             plan.read.named_table.options[k] = v
         return plan
```
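
A minimal sketch of how the updated `Read` plan serializes, assuming a `SparkConnectClient` instance named `client` is in scope (the table name and option are illustrative):

```python
from pyspark.sql.connect.plan import Read

# Read.plan() embeds the table name, options, and the streaming flag
# into the proto Relation shown in the diff above.
read_plan = Read("events", {"mergeSchema": "true"}, is_streaming=True)
rel = read_plan.plan(client)
assert rel.read.is_streaming
assert rel.read.named_table.options["mergeSchema"] == "true"
```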
7 changes: 5 additions & 2 deletions python/pyspark/sql/connect/streaming/readwriter.py
```diff
@@ -21,7 +21,7 @@
 
 from typing import cast, overload, Callable, Dict, List, Optional, TYPE_CHECKING, Union
 
-from pyspark.sql.connect.plan import DataSource, LogicalPlan, WriteStreamOperation
+from pyspark.sql.connect.plan import DataSource, LogicalPlan, Read, WriteStreamOperation
 import pyspark.sql.connect.proto as pb2
 from pyspark.sql.connect.readwriter import OptionUtils, to_str
 from pyspark.sql.connect.streaming.query import StreamingQuery
@@ -311,7 +311,10 @@ def csv(
 
     csv.__doc__ = PySparkDataStreamReader.csv.__doc__
 
-    # def table() TODO(SPARK-43042). Use Read(table_name) relation.
+    def table(self, tableName: str) -> "DataFrame":
+        return self._df(Read(tableName, self._options, is_streaming=True))
+
+    table.__doc__ = PySparkDataStreamReader.table.__doc__
 
 
 DataStreamReader.__doc__ = PySparkDataStreamReader.__doc__
```
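
Note that `table()` forwards the reader's accumulated `self._options` into the `Read` relation, so options set before the call travel with the streaming table scan. A hedged usage sketch (option and table names are illustrative):

```python
# Options set on the reader are carried into the streaming table read.
sdf = (
    spark.readStream
    .option("maxFilesPerTrigger", 1)  # illustrative option
    .table("events")                  # illustrative table name
)
```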
8 changes: 0 additions & 8 deletions python/pyspark/sql/tests/connect/streaming/test_parity_streaming.py

```diff
@@ -38,14 +38,6 @@ def test_stream_status_and_progress(self):
     def test_query_manager_await_termination(self):
         super().test_query_manager_await_termination()
 
-    @unittest.skip("table API will be supported later with SPARK-43042.")
-    def test_streaming_read_from_table(self):
-        super().test_streaming_read_from_table()
-
-    @unittest.skip("table API will be supported later with SPARK-43042.")
-    def test_streaming_write_to_table(self):
-        super().test_streaming_write_to_table()
-
     @unittest.skip("Query manager API will be supported later with SPARK-43032.")
     def test_stream_save_options(self):
         super().test_stream_save_options()
```
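
With these skips removed, the streaming parity suite exercises `table()` end to end. A hedged sketch of the kind of round trip such tests cover (not the actual test bodies; names and paths are illustrative):

```python
# Stream into a table, then read that table back as a stream.
src = spark.readStream.format("rate").load()
query = src.writeStream.toTable(
    "sink_tbl", format="parquet", checkpointLocation="/tmp/ckpt"
)
query.stop()

back = spark.readStream.table("sink_tbl")
assert back.isStreaming
```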
