File tree Expand file tree Collapse file tree 11 files changed +118
-4
lines changed
docs/changelog/next_release
syncmaster/worker/handlers Expand file tree Collapse file tree 11 files changed +118
-4
lines changed Original file line number Diff line number Diff line change 1+ Implement SQL transformations in worker
Original file line number Diff line number Diff line change @@ -58,7 +58,14 @@ def read(self) -> DataFrame:
5858 columns = self ._get_columns_filter_expressions (),
5959 ** reader_params ,
6060 )
61- return reader .run ()
61+ df = reader .run ()
62+
63+ sql_query = self ._get_sql_query ()
64+ if sql_query :
65+ df .createOrReplaceTempView ("source" )
66+ df = self .connection .spark .sql (sql_query )
67+
68+ return df
6269
6370 def write (self , df : DataFrame ) -> None :
6471 if self .transfer_dto .strategy .type == "incremental" and self .hwm and self .hwm .value :
@@ -110,6 +117,12 @@ def _get_columns_filter_expressions(self) -> list[str] | None:
110117
111118 return self ._make_columns_filter_expressions (expressions )
112119
120+ def _get_sql_query (self ) -> str | None :
121+ for transformation in self .transfer_dto .transformations :
122+ if transformation ["type" ] == "sql" :
123+ return transformation ["query" ]
124+ return None
125+
113126 def _get_reading_options (self ) -> dict :
114127 options : dict [str , Any ] = {}
115128
Original file line number Diff line number Diff line change @@ -135,3 +135,9 @@ def _get_columns_filter_expressions(self) -> list[str] | None:
135135 expressions .extend (transformation ["filters" ])
136136
137137 return self ._make_columns_filter_expressions (expressions )
138+
139+ def _get_sql_query (self ) -> str | None :
140+ for transformation in self .transfer_dto .transformations :
141+ if transformation ["type" ] == "sql" :
142+ return transformation ["query" ]
143+ return None
Original file line number Diff line number Diff line change @@ -58,6 +58,11 @@ def read(self) -> DataFrame:
5858 if columns_filter_expressions :
5959 df = df .selectExpr (* columns_filter_expressions )
6060
61+ sql_query = self ._get_sql_query ()
62+ if sql_query :
63+ df .createOrReplaceTempView ("source" )
64+ df = self .df_connection .spark .sql (sql_query )
65+
6166 return df
6267
6368 def write (self , df : DataFrame ) -> None :
Original file line number Diff line number Diff line change @@ -35,6 +35,11 @@ def read(self) -> DataFrame:
3535 if columns_filter_expressions :
3636 df = df .selectExpr (* columns_filter_expressions )
3737
38+ sql_query = self ._get_sql_query ()
39+ if sql_query :
40+ df .createOrReplaceTempView ("source" )
41+ df = self .df_connection .spark .sql (sql_query )
42+
3843 return df
3944
4045 def write (self , df : DataFrame ) -> None :
Original file line number Diff line number Diff line change @@ -71,6 +71,11 @@ def read(self) -> DataFrame:
7171 if columns_filter_expressions :
7272 df = df .selectExpr (* columns_filter_expressions )
7373
74+ sql_query = self ._get_sql_query ()
75+ if sql_query :
76+ df .createOrReplaceTempView ("source" )
77+ df = self .df_connection .spark .sql (sql_query )
78+
7479 return df
7580
7681 @slot
Original file line number Diff line number Diff line change 11pytest_plugins = [
22 "tests.test_unit.test_scheduler.scheduler_fixtures" ,
33 "tests.test_integration.test_scheduler.scheduler_fixtures" ,
4+ "tests.test_integration.test_run_transfer.connection_fixtures" ,
45]
Load Diff This file was deleted.
Original file line number Diff line number Diff line change 2424 dataframe_rows_filter_transformations ,
2525 expected_dataframe_columns_filter ,
2626 expected_dataframe_rows_filter ,
27+ expected_sql_transformation ,
2728 file_metadata_filter_transformations ,
29+ sql_transformation ,
2830)
2931from tests .test_integration .test_run_transfer .connection_fixtures .ftp_fixtures import (
3032 ftp_connection ,
151153 "dataframe_rows_filter_transformations" ,
152154 "expected_dataframe_columns_filter" ,
153155 "expected_dataframe_rows_filter" ,
156+ "expected_sql_transformation" ,
154157 "file_format_flavor" ,
155158 "file_metadata_filter_transformations" ,
156159 "ftp_connection" ,
239242 "sftp_for_worker" ,
240243 "source_file_format" ,
241244 "spark" ,
245+ "sql_transformation" ,
242246 "target_file_format" ,
243247 "update_transfer_strategy" ,
244248 "webdav_connection" ,
Original file line number Diff line number Diff line change @@ -133,3 +133,19 @@ def file_metadata_filter_transformations():
133133 ],
134134 },
135135 ]
136+
137+
138+ @pytest .fixture
139+ def sql_transformation ():
140+ return [
141+ {
142+ "type" : "sql" ,
143+ "query" : "SELECT * FROM source WHERE NUMBER <= 20" ,
144+ "dialect" : "spark" ,
145+ },
146+ ]
147+
148+
149+ @pytest .fixture
150+ def expected_sql_transformation ():
151+ return lambda df , source_type : df .filter (df .NUMBER <= 20 )
You can’t perform that action at this time.
0 commit comments