Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion airbyte-integrations/connectors/source-orb/source_orb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,13 @@ def enrich_ledger_entries_with_event_data(self, ledger_entries):
# Build up a list of the subset of ledger entries we are expected
# to enrich with event metadata.
event_id_to_ledger_entries = {}
min_created_at_timestamp = pendulum.now()
max_created_at_timestamp = pendulum.from_timestamp(0)

for entry in ledger_entries:
maybe_event_id: Optional[str] = entry.get("event_id")
min_created_at_timestamp = min(min_created_at_timestamp, pendulum.parse(entry["created_at"]))
max_created_at_timestamp = max(max_created_at_timestamp, pendulum.parse(entry["created_at"]))
if maybe_event_id:
# There can be multiple entries with the same event ID
event_id_to_ledger_entries[maybe_event_id] = event_id_to_ledger_entries.get(maybe_event_id, []) + [entry]
Expand Down Expand Up @@ -621,7 +626,11 @@ def modify_ledger_entry_schema(ledger_entry):

# The events endpoint is a `POST` endpoint which expects a list of
# event_ids to filter on
request_filter_json = {"event_ids": list(event_id_to_ledger_entries)}
request_filter_json = {
"event_ids": list(event_id_to_ledger_entries),
"timeframe_start": min_created_at_timestamp.to_iso8601_string(),
"timeframe_end": max_created_at_timestamp.add(minutes=1).to_iso8601_string()
}

# Prepare request with self._session, which should
# automatically deal with the authentication header.
Expand Down