Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
fail_on_extra_columns: false
empty_streams: ["credits_ledger_entries"]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this change for?

incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-orb/bootstrap.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ Orb is a REST API. Connector has the following streams, and all of them support
* [Plans](https://docs.withorb.com/reference/list-plans)
* [Customers](https://docs.withorb.com/reference/list-customers)
* [Credits Ledger Entries](https://docs.withorb.com/reference/view-credits-ledger)
* [Invoices](https://docs.withorb.com/docs/orb-docs/api-reference/schemas/invoice)

Note that the Credits Ledger Entries must read all Customers for an incremental sync, but will only incrementally return new ledger entries for each customer.

Since the Orb API does not allow querying objects based on `updated_at`, these incremental syncs will capture updates to newly created objects but not resources updated after object creation.
Since the Orb API does not allow querying objects based on `updated_at`, these incremental syncs will capture updates to newly created objects but not resources updated after object creation. Use a full resync in order to capture newly updated entries.

## Pagination

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@
"FDWRvxuBUiFfZech": {
"timeframe_start": "2122-01-01T00:00:00Z"
}
},
"invoices": {
"invoice_date": "2122-01-01T00:00:00Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,24 @@
},
{
"stream": {
"name": "credits_ledger_entries",
"name": "invoices",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created_at"],
"default_cursor_field": ["invoice_date"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "subscription_usage",
"name": "credits_ledger_entries",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["timeframe_start"],
"source_defined_primary_key": [
["subscription_id"],
["billable_metric_id"],
["timeframe_start"]
]
"default_cursor_field": ["created_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"api_key": "<sample_api_key>",
"start_date": "2023-01-25T00:00:00Z"
"start_date": "2022-01-25T00:00:00Z"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@
"someId": {
"timeframe_start": "2022-01-01T00:00:00Z"
}
},
"invoices": {
"invoice_date": "2022-01-01T00:00:00Z"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"properties": {
"id": {
"type": "string"
},
"created_at": {
"type": ["null", "string"],
"format": "date-time"
},
"invoice_date": {
"type": ["string"],
"format": "date-time"
},
"due_date": {
"type": ["string"],
"format": "date-time"
},
"invoice_pdf": {
"type": ["null", "string"]
},
"subtotal": {
"type": ["string"]
},
"total": {
"type": ["string"]
},
"amount_due": {
"type": ["string"]
},
"status": {
"type": ["string"]
},
"memo": {
"type": ["null", "string"]
},
"issue_failed_at": {
"type": ["null", "string"],
"format": "date-time"
},
"sync_failed_at": {
"type": ["null", "string"],
"format": "date-time"
},
"payment_failed_at": {
"type": ["null", "string"],
"format": "date-time"
},
"payment_started_at": {
"type": ["null", "string"],
"format": "date-time"
},
"voided_at": {
"type": ["null", "string"],
"format": "date-time"
},
"paid_at": {
"type": ["null", "string"],
"format": "date-time"
},
"issued_at": {
"type": ["null", "string"],
"format": "date-time"
},
"hosted_invoice_url": {
"type": ["null", "string"]
},
"line_items": {
"type": ["array"],
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"quantity": {
"type": "number"
},
"amount": {
"type": "string"
},
"name": {
"type": "string"
},
"start_date": {
"type": ["null", "string"],
"format": "date-time"
},
"end_date": {
"type": ["null", "string"],
"format": "date-time"
}
}
}
},
"subscription": {
"type": ["object", "null"],
"properties": {
"id": {
"type": "string"
}
}
}
},
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add more to the schema here.

"required": ["id", "created_at", "invoice_date", "due_date", "subtotal", "total", "amount_due", "status"]
}

34 changes: 33 additions & 1 deletion airbyte-integrations/connectors/source-orb/source_orb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa
if state_based_start_timestamp:
# This may (reasonably) override the existing `created_at[gte]` set based on the start_date
# of the stream, as configured.
params["created_at[gte]"] = state_based_start_timestamp
params[f"{self.cursor_field}[gte]"] = state_based_start_timestamp
return params


Expand Down Expand Up @@ -448,6 +448,37 @@ def path(self, **kwargs) -> str:
return "plans"


class Invoices(IncrementalOrbStream):
"""
Fetches non-draft invoices, including those that are paid, issued, void, or synced.
API Docs: https://docs.withorb.com/docs/orb-docs/api-reference/operations/list-invoices
"""

@property
def cursor_field(self) -> str:
"""
Invoices created in the past may be newly issued, so we store state on
`invoice_date` instead.
"""
return "invoice_date"

def path(self, **kwargs) -> str:
return "invoices"

def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
request_params = super().request_params(stream_state, **kwargs)
# This doesn't make sense for invoices, since the cursor field is `invoice_date`, and the
# created_at for an invoice isn't meaningful, so any attempt to use it should be prevented.
if "created_at[gte]" in request_params:
del request_params["created_at[gte]"]

# Filter to all statuses. Note that if you're currently expecting the status of the invoice
# to update at the sink, you should periodically still expect to re-sync this connector to
# fetch updates.
request_params["status[]"] = ["void", "paid", "issued", "synced"]
return request_params


class CreditsLedgerEntries(IncrementalOrbStream):
page_size = 500
"""
Expand Down Expand Up @@ -705,6 +736,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Customers(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date),
Subscriptions(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date),
Plans(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date),
Invoices(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date),
CreditsLedgerEntries(
authenticator=authenticator,
lookback_window_days=lookback_window,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@
import responses
from airbyte_cdk.models import SyncMode
from pytest import fixture
from source_orb.source import CreditsLedgerEntries, Customers, IncrementalOrbStream, OrbStream, Plans, Subscriptions, SubscriptionUsage
from source_orb.source import (
CreditsLedgerEntries,
Customers,
IncrementalOrbStream,
Invoices,
OrbStream,
Plans,
Subscriptions,
SubscriptionUsage,
)


@fixture
Expand Down Expand Up @@ -89,6 +98,34 @@ def test_request_params(patch_incremental_base_class, mocker, config, current_st
assert stream.request_params(**inputs) == expected_params


@pytest.mark.parametrize(
("config", "current_stream_state", "next_page_token", "expected_params"),
[
(
{},
dict(invoice_date="2022-01-25T12:00:00+00:00"),
{"cursor": "f96594d0-8220-11ec-a8a3-0242ac120002"},
{"invoice_date[gte]": "2022-01-25T12:00:00+00:00", "cursor": "f96594d0-8220-11ec-a8a3-0242ac120002"},
),
({}, dict(invoice_date="2022-01-25T12:00:00+00:00"), None, {"invoice_date[gte]": "2022-01-25T12:00:00+00:00"}),
# Honors lookback_window_days
(
dict(lookback_window_days=3),
dict(invoice_date="2022-01-25T12:00:00+00:00"),
None,
{"invoice_date[gte]": "2022-01-22T12:00:00+00:00"},
),
({}, {}, None, None),
],
)
def test_invoices_request_params(patch_incremental_base_class, mocker, config, current_stream_state, next_page_token, expected_params):
stream = Invoices(**config)
inputs = {"stream_state": current_stream_state, "next_page_token": next_page_token}
expected_params = expected_params or {}
expected_params["limit"] = OrbStream.page_size
expected_params["status[]"] = ['void', 'paid', 'issued', 'synced']
assert stream.request_params(**inputs) == expected_params

# We have specific unit tests for CreditsLedgerEntries incremental stream
# because that employs slicing logic

Expand Down Expand Up @@ -125,6 +162,29 @@ def test_credits_ledger_entries_get_updated_state(mocker, current_stream_state,
assert stream.get_updated_state(**inputs) == expected_state


@pytest.mark.parametrize(
("current_stream_state", "latest_record", "expected_state"),
[
# No state
(
{},
dict(invoice_date="2022-01-26T12:00:00+00:00"),
dict(invoice_date="2022-01-26T12:00:00+00:00"),
),
# Existing state
(
dict(invoice_date="2022-01-26T12:00:00+00:00"),
dict(invoice_date="2023-01-26T12:00:00+00:00"),
dict(invoice_date="2023-01-26T12:00:00+00:00"),
),
],
)
def test_invoices_get_updated_state(mocker, current_stream_state, latest_record, expected_state):
stream = Invoices()
inputs = {"current_stream_state": current_stream_state, "latest_record": latest_record}
assert stream.get_updated_state(**inputs) == expected_state


def test_credits_ledger_entries_stream_slices(mocker):
mocker.patch.object(
Customers, "read_records", return_value=iter([{"id": "1", "name": "Customer Foo"}, {"id": "18", "name": "Customer Bar"}])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ def test_streams(mocker):
sample_config = {"api_key": "test-token", "start_date": "2023-01-25T00:00:00Z"}
config_mock.get.side_effect = sample_config.get
streams = source.streams(config_mock)
expected_streams_number = 5
expected_streams_number = 6
assert len(streams) == expected_streams_number
1 change: 1 addition & 0 deletions docs/integrations/sources/orb.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ an Orb Account and API Key.

| Version | Date | Pull Request | Subject |
| --- | --- | --- | --- |
| 1.1.0 | 2023-03-26 | | Add Invoices incremental stream
| 1.0.0 | 2023-02-02 | [21951](https://github.com/airbytehq/airbyte/pull/21951) | Add SubscriptionUsage stream, and made `start_date` a required field
| 0.1.4 | 2022-10-07 | [17761](https://github.com/airbytehq/airbyte/pull/17761) | Fix bug with enriching ledger entries with multiple credit blocks
| 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries
Expand Down