diff --git a/airbyte-integrations/connectors/source-orb/source_orb/source.py b/airbyte-integrations/connectors/source-orb/source_orb/source.py index 0005a9b70407..45e7a9fc4538 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/source.py +++ b/airbyte-integrations/connectors/source-orb/source_orb/source.py @@ -590,8 +590,13 @@ def enrich_ledger_entries_with_event_data(self, ledger_entries): # Build up a list of the subset of ledger entries we are expected # to enrich with event metadata. event_id_to_ledger_entries = {} + min_created_at_timestamp = pendulum.now() + max_created_at_timestamp = pendulum.from_timestamp(0) + for entry in ledger_entries: maybe_event_id: Optional[str] = entry.get("event_id") + min_created_at_timestamp = min(min_created_at_timestamp, pendulum.parse(entry["created_at"])) + max_created_at_timestamp = max(max_created_at_timestamp, pendulum.parse(entry["created_at"])) if maybe_event_id: # There can be multiple entries with the same event ID event_id_to_ledger_entries[maybe_event_id] = event_id_to_ledger_entries.get(maybe_event_id, []) + [entry] @@ -621,7 +626,11 @@ def modify_ledger_entry_schema(ledger_entry): # The events endpoint is a `POST` endpoint which expects a list of # event_ids to filter on - request_filter_json = {"event_ids": list(event_id_to_ledger_entries)} + request_filter_json = { + "event_ids": list(event_id_to_ledger_entries), + "timeframe_start": min_created_at_timestamp.to_iso8601_string(), + "timeframe_end": max_created_at_timestamp.add(minutes=1).to_iso8601_string() + } # Prepare request with self._session, which should # automatically deal with the authentication header.