SeaDatabricksClient: Add Metadata Commands #593
base: sea-migration
Conversation
Signed-off-by: varun-edachali-dbx <[email protected]>
Added some comments
""" | ||
|
||
@staticmethod | ||
def _filter_sea_result_set( |
This is specific to the SEA result set and can't be used for a generic result set class? Let's try to make it generic for any result set.
I think we need some service-specific methods at some point during the filtering process to know what kind of result set to return, since our concrete instances are service-specific. I tried to keep the root methods invoked (e.g. filter_tables_by_type) general; these then invoke the service-specific builders based on the type of the instance passed to them, as sketched below.
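A rough sketch of this dispatch pattern inside the ResultSetFilter class; the filter_by_predicate name and the module path are hypothetical, while _filter_sea_result_set and SeaResultSet come from this PR:

from typing import Any, Callable, List

def filter_by_predicate(result_set, predicate: Callable[[List[Any]], bool]):
    """Generic entry point: dispatch on the concrete result set type (sketch only)."""
    # Import here to avoid circular imports, mirroring the pattern used in this PR
    from databricks.sql.result_set import SeaResultSet
    from databricks.sql.backend.filters import ResultSetFilter  # module path assumed

    if isinstance(result_set, SeaResultSet):
        # Delegate to the SEA-specific builder that rebuilds a SeaResultSet
        return ResultSetFilter._filter_sea_result_set(result_set, predicate)

    raise NotImplementedError(
        f"Client-side filtering is not implemented for {type(result_set).__name__}"
    )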
    sea_client=cast(SeaDatabricksClient, result_set.backend),
    buffer_size_bytes=result_set.buffer_size_bytes,
    arraysize=result_set.arraysize,
    result_data=result_data,
Could you remind me what the significance of this result_data param in the result set is? Is it present in the base class? Is it an optional param used to create a result set with hard-coded rows?
It is not present in the base class; it is an instance of a ResultData model which represents the results returned during SEA execution. In our case, we set the filtered rows in the data array of this ResultData to effectively create a filtered SeaResultSet.
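For reference, a minimal sketch of what the ResultData model described here might look like; only the data and external_links fields are visible in this PR, so everything else is an assumption:

from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class ResultData:
    """Results for a SEA statement: either inline rows or external links (sketch)."""
    data: Optional[List[List[Any]]] = None        # inline rows, as used by the filter
    external_links: Optional[List[Any]] = None    # links to externally stored chunks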
all_rows = result_set.results.remaining_rows()

# Filter rows
filtered_rows = [row for row in all_rows if filter_func(row)]

# Import SeaResultSet here to avoid circular imports
from databricks.sql.result_set import SeaResultSet

# Reuse the command_id from the original result set
command_id = result_set.command_id

# Create an ExecuteResponse with the filtered data
execute_response = ExecuteResponse(
    command_id=command_id,
    status=result_set.status,
    description=result_set.description,
    has_been_closed_server_side=result_set.has_been_closed_server_side,
    lz4_compressed=result_set.lz4_compressed,
    arrow_schema_bytes=result_set._arrow_schema_bytes,
    is_staging_operation=False,
)

# Create a new ResultData object with filtered data
from databricks.sql.backend.sea.models.base import ResultData

result_data = ResultData(data=filtered_rows, external_links=None)

# Create a new SeaResultSet with the filtered data
filtered_result_set = SeaResultSet(
    connection=result_set.connection,
    execute_response=execute_response,
    sea_client=cast(SeaDatabricksClient, result_set.backend),
    buffer_size_bytes=result_set.buffer_size_bytes,
    arraysize=result_set.arraysize,
    result_data=result_data,
)
I think the whole implementation can be improved. You are essentially first downloading the complete result set and then initialising a new one; a filter method should ideally just take the object to be filtered and return true/false on it.
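A minimal sketch of the predicate-style alternative being suggested here (hypothetical, not part of this PR):

from typing import Any, List

def table_type_predicate(row: List[Any], allowed_types: List[str]) -> bool:
    """Return True if the row should be kept, False otherwise."""
    TABLE_TYPE_INDEX = 5  # TABLE_TYPE is the 6th column of the tables result
    return row[TABLE_TYPE_INDEX] in allowed_types

Such a predicate would then have to be applied lazily as rows are fetched, which is what the response below pushes back on.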
Introducing a filter method that is utilised in the fetch phase would lead to a lot of specialised code for the table queries during the fetch phase.
Currently, all that the execution-relevant methods (execute_command and the metadata methods like get_tables, get_schemas, etc.) do is return a ResultSet that is set as the active result set of the Cursor.
The fetch phase from this step on is completely invariant of the kind of query that took place. If we want to use a separate filter method, we would have to add custom logic during the fetch (if table metadata then filter result; return result;). Maintaining the generality of the fetch phase is likely worth the tradeoff involved in creating a new copy.
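To illustrate the tradeoff being described, a hypothetical sketch of what fetch-time filtering would require; the _active_filter attribute is invented purely for illustration:

class Cursor:
    def fetchall(self):
        rows = self.active_result_set.fetchall()
        # Hypothetical special-case branch that would only apply to table
        # metadata queries, breaking the otherwise query-agnostic fetch path
        if getattr(self, "_active_filter", None) is not None:
            rows = [row for row in rows if self._active_filter(row)]
        return rows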
def filter_by_column_values(
    result_set: "ResultSet",
    column_index: int,
    allowed_values: List[str],
    case_sensitive: bool = False,
) -> "ResultSet":
    """
    Filter a result set by values in a specific column.

    Args:
        result_set: The result set to filter
        column_index: The index of the column to filter on
        allowed_values: List of allowed values for the column
        case_sensitive: Whether to perform case-sensitive comparison

    Returns:
        A filtered result set
    """
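The body is cut off in this view; a plausible sketch of how it could proceed given the _filter_sea_result_set helper above (an assumption, not necessarily the PR's exact implementation):

    # Normalize the allowed values once if the comparison is case-insensitive
    if not case_sensitive:
        allowed_values = [value.upper() for value in allowed_values]

    return ResultSetFilter._filter_sea_result_set(
        result_set,
        lambda row: (
            row[column_index] if case_sensitive else row[column_index].upper()
        )
        in allowed_values,
    )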
same as above
def filter_tables_by_type(
    result_set: "ResultSet", table_types: Optional[List[str]] = None
) -> "ResultSet":
    """
    Filter a result set of tables by the specified table types.

    This is a client-side filter that processes the result set after it has been
    retrieved from the server. It filters out tables whose type does not match
    any of the types in the table_types list.

    Args:
        result_set: The original result set containing tables
        table_types: List of table types to include (e.g., ["TABLE", "VIEW"])

    Returns:
        A filtered result set containing only tables of the specified types
    """

    # Default table types if none specified
    DEFAULT_TABLE_TYPES = ["TABLE", "VIEW", "SYSTEM TABLE"]
    valid_types = (
        table_types if table_types and len(table_types) > 0 else DEFAULT_TABLE_TYPES
    )

    # Table type is the 6th column (index 5)
    return ResultSetFilter.filter_by_column_values(
        result_set, 5, valid_types, case_sensitive=True
    )
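For context, a sketch of how the backend's get_tables might apply this filter once the underlying metadata command has produced a result set; the helper name and module path here are assumptions:

from databricks.sql.backend.filters import ResultSetFilter  # module path assumed

def _apply_table_type_filter(result_set, table_types):
    """Sketch: what SeaDatabricksClient.get_tables would do with its result set."""
    # Client-side filtering keeps the metadata command itself simple and
    # leaves the fetch phase untouched.
    return ResultSetFilter.filter_tables_by_type(result_set, table_types)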
same as above
Co-authored-by: jayant <[email protected]>
Thanks for your contribution! To satisfy the DCO policy in our contributing guide every commit message must include a sign-off message. One or more of your commits is missing this message. You can reword previous commit messages with an interactive rebase.
What type of PR is this?
Description
Add metadata command implementations for the SeaDatabricksClient (execution phase): get_catalogs, get_schemas, get_tables and get_columns.
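The general shape of one of these metadata methods, as a sketch only: the SQL text and the exact execute_command parameters are assumptions, not the PR's code.

def get_catalogs(self, session_id, max_rows, max_bytes, cursor):
    """Sketch: metadata commands reuse the normal SEA execution path, so the
    fetch phase stays identical to that of an ordinary query."""
    return self.execute_command(
        operation="SHOW CATALOGS",  # assumed SQL; the parameters below are assumed too
        session_id=session_id,
        max_rows=max_rows,
        max_bytes=max_bytes,
        cursor=cursor,
    )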
How is this tested?
The coverage of the functionality added (by test_filters.py and the new tests in test_sea_backend.py) is as below:
- filters.py: from databricks.sql.result_set import ResultSet, SeaResultSet (TYPE_CHECKING import)
- sea/backend.py (metadata methods)
Related Tickets & Documents
https://docs.google.com/document/d/1Y-eXLhNqqhrMVGnOlG8sdFrCxBTN1GdQvuKG4IfHmo0/edit?usp=sharing