Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"client_id": None,
"client_secret": None,
"start_date": None,
'filters': None
}

FORCED_FULL_TABLE = {
Expand Down Expand Up @@ -518,6 +519,7 @@ def main_impl():
select_fields_by_default=CONFIG.get("select_fields_by_default"),
default_start_date=CONFIG.get("start_date"),
api_type=CONFIG.get("api_type"),
filters=CONFIG.get("filters")
)
sf.login()

Expand Down
33 changes: 22 additions & 11 deletions tap_salesforce/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def __init__(
select_fields_by_default=None,
default_start_date=None,
api_type=None,
filters=None
):
self.api_type = api_type.upper() if api_type else None
self.session = requests.Session()
Expand All @@ -241,6 +242,8 @@ def __init__(
self.select_fields_by_default = select_fields_by_default is True or (
isinstance(select_fields_by_default, str) and select_fields_by_default.lower() == "true"
)
self.filters = filters if filters else []

self.rest_requests_attempted = 0
self.jobs_completed = 0
self.data_url = "{}/services/data/v60.0/{}"
Expand Down Expand Up @@ -393,22 +396,30 @@ def get_start_date(self, state, catalog_entry):
def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by_clause=True):
selected_properties = self._get_selected_properties(catalog_entry)

query = "SELECT {} FROM {}".format(",".join(selected_properties), catalog_entry["stream"])
query = "SELECT {} FROM {}".format(",".join(selected_properties), catalog_entry['stream'])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @jrsperry. Can we revert these styling changes? If just to make reviewing the actual changes easier.

Thanks!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sure. Oops


catalog_metadata = metadata.to_map(catalog_entry["metadata"])
replication_key = catalog_metadata.get((), {}).get("replication-key")
catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_key = catalog_metadata.get((), {}).get('replication-key')

# initialize where clauses with filters (if any)
where_clauses = self.filters.copy()
LOGGER.info(f"where clauses: {where_clauses}")

# Add replication key conditions if they exist
if replication_key:
where_clause = f" WHERE {replication_key} >= {start_date} "
end_date_clause = f" AND {replication_key} < {end_date}" if end_date else ""
where_clauses.append("{} >= {}".format(replication_key, start_date))
if end_date:
where_clauses.append("{} < {}".format(replication_key, end_date))

order_by = f" ORDER BY {replication_key} ASC"
if order_by_clause:
return query + where_clause + end_date_clause + order_by
# Combine all WHERE clauses
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)

return query + where_clause + end_date_clause
else:
return query
# Add ORDER BY clause
if replication_key and order_by_clause:
query += " ORDER BY {} ASC".format(replication_key)

return query

def query(self, catalog_entry, state):
if self.api_type == BULK_API_TYPE:
Expand Down
Loading