-
Notifications
You must be signed in to change notification settings - Fork 72
[GEN-1667] Delete Table Rows Using Filtered DataFrame #1254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ae5a591
6f9c651
01c87d3
5e4e779
24f8485
e439a3a
2e77a10
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4049,31 +4049,38 @@ class TableDeleteRowMixin: | |
|
||
async def delete_rows_async( | ||
self, | ||
query: str, | ||
query: Optional[str] = None, | ||
df: Optional[DATA_FRAME_TYPE] = None, | ||
*, | ||
job_timeout: int = 600, | ||
synapse_client: Optional[Synapse] = None, | ||
) -> DATA_FRAME_TYPE: | ||
""" | ||
Delete rows from a table given a query to select rows. The query at a | ||
minimum must select the `ROW_ID` and `ROW_VERSION` columns. If you want to | ||
Delete rows from a table given a query or a pandas dataframe to select rows. | ||
danlu1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
The query at a minimum must select the `ROW_ID` and `ROW_VERSION` columns. If you want to | ||
inspect the data that will be deleted ahead of time you may use the | ||
`.query` method to get the data. | ||
|
||
The dataframe must at least contain the `ROW_ID` and `ROW_VERSION` columns. And `ROW_ETAG` column is also required | ||
if the entity is one of the following: `EntityView`, `Dataset`, `DatasetCollection`, or `SubmissionView`. | ||
If both query and df are provided, the query will be used. | ||
|
||
Arguments: | ||
query: The query to select the rows to delete. The query at a minimum | ||
must select the `ROW_ID` and `ROW_VERSION` columns. See this document | ||
that describes the expected syntax of the query: | ||
<https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/web/controller/TableExamples.html> | ||
df: A pandas dataframe that contains the rows to delete. The dataframe must at least contain the `ROW_ID` and `ROW_VERSION` columns. | ||
If the entity is one of the following: `EntityView`, `Dataset`, `DatasetCollection`, or `SubmissionView` then the dataframe must also contain the `ROW_ETAG` column. | ||
See this document that describes the expected columns of the dataframe: | ||
<https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/Row.html> | ||
job_timeout: The amount of time to wait for table updates to complete | ||
before a `SynapseTimeoutError` is thrown. The default is 600 seconds. | ||
synapse_client: If not passed in and caching was not disabled by | ||
`Synapse.allow_client_caching(False)` this will use the last created | ||
instance from the Synapse class constructor. | ||
|
||
Returns: | ||
The results of your query for the rows that were deleted from the table. | ||
The results of your query or dataframe for the rows that were deleted from the table. | ||
|
||
Example: Selecting a row to delete | ||
This example shows how you may select a row to delete from a table. | ||
|
@@ -4109,17 +4116,73 @@ async def main(): | |
|
||
asyncio.run(main()) | ||
``` | ||
|
||
Example: Selecting rows to delete using a dataframe | ||
This example shows how you may select a row to delete from a table based on a dataframe. | ||
|
||
```python | ||
import asyncio | ||
import pandas as pd | ||
from synapseclient import Synapse | ||
from synapseclient.models import Table # Also works with `Dataset` | ||
|
||
syn = Synapse() | ||
syn.login() | ||
|
||
# Creating a pandas dataframe that contains the rows to delete. | ||
# In this example, we create a dataframe that specifies the first two rows of the table for deletion. | ||
# Assuming no changes have been made to the table so the ROW_VERSION is 1. | ||
|
||
df = pd.DataFrame({"ROW_ID": [1, 2], "ROW_VERSION": [1, 1]}) | ||
async def main(): | ||
await Table(id="syn1234").delete_rows_async(df=df) | ||
|
||
asyncio.run(main()) | ||
``` | ||
""" | ||
client = Synapse.get_client(synapse_client=synapse_client) | ||
results_from_query = await self.query_async(query=query, synapse_client=client) | ||
client.logger.info( | ||
f"Found {len(results_from_query)} rows to delete for given query: {query}" | ||
) | ||
|
||
# check if both query and df are None | ||
if query is None and df is None: | ||
raise ValueError("Either query or df must be provided.") | ||
if query is not None: | ||
rows_to_delete = await self.query_async(query=query, synapse_client=client) | ||
client.logger.info( | ||
f"Found {len(rows_to_delete)} rows to delete for given query: {query}" | ||
) | ||
elif df is not None: | ||
danlu1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
rows_to_delete = df | ||
if ( | ||
"ROW_ID" not in rows_to_delete.columns | ||
or "ROW_VERSION" not in rows_to_delete.columns | ||
): | ||
raise ValueError( | ||
"The dataframe must contain the 'ROW_ID' and 'ROW_VERSION' columns." | ||
) | ||
danlu1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# check the validity of the ROW_ID and ROW_VERSION columns | ||
existing_rows = await self.query_async( | ||
query=f"SELECT ROW_ID, ROW_VERSION FROM {self.id}", | ||
synapse_client=client, | ||
) | ||
# check if all ROW_ID and ROW_VERSION pair in the dataframe exist in the table | ||
merged = df.merge( | ||
existing_rows, on=["ROW_ID", "ROW_VERSION"], how="left", indicator=True | ||
) | ||
if not all(merged["_merge"] == "both"): | ||
discrepant_idx = merged.loc[merged["_merge"] != "both"].index | ||
raise ValueError( | ||
f"Rows with the following ROW_ID and ROW_VERSION pairs were not found in table {self.id}: {', '.join(map(str, discrepant_idx))}." | ||
) | ||
client.logger.info( | ||
f"Received {len(rows_to_delete)} rows to delete for given dataframe." | ||
) | ||
if self.__class__.__name__ in CLASSES_THAT_CONTAIN_ROW_ETAG: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @BryanFauble When I add test case for the below ValueError, I got the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So these other items are special @danlu1 . You can't actually delete rows of those Entity types, the reason is because there is a separate https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/Dataset.html If you notice that:
Doesn't contain the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for confirming! |
||
filtered_columns = results_from_query[["ROW_ID", "ROW_VERSION", "ROW_ETAG"]] | ||
if "ROW_ETAG" not in rows_to_delete.columns: | ||
raise ValueError( | ||
f"The dataframe must contain the 'ROW_ETAG' column when deleting rows from a {self.__class__.__name__}." | ||
) | ||
filtered_columns = rows_to_delete[["ROW_ID", "ROW_VERSION", "ROW_ETAG"]] | ||
else: | ||
filtered_columns = results_from_query[["ROW_ID", "ROW_VERSION"]] | ||
filtered_columns = rows_to_delete[["ROW_ID", "ROW_VERSION"]] | ||
|
||
filepath = f"{tempfile.mkdtemp()}/{self.id}_upload_{uuid.uuid4()}.csv" | ||
try: | ||
|
@@ -4138,7 +4201,7 @@ async def main(): | |
entity_id=self.id, changes=[upload_request] | ||
).send_job_and_wait_async(synapse_client=client, timeout=job_timeout) | ||
|
||
return results_from_query | ||
return rows_to_delete | ||
|
||
|
||
def infer_column_type_from_data(values: DATA_FRAME_TYPE) -> List[Column]: | ||
|
Uh oh!
There was an error while loading. Please reload this page.