Skip to content

Commit

Permalink
Fix: Syncs shows "0 records" synced when using incremental sync (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
avirajsingh7 authored Jul 8, 2024
1 parent aea02e1 commit 3e1ffb0
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, Optional, cast

import jsonschema
import pendulum
Expand Down Expand Up @@ -222,6 +222,14 @@ def get_available_streams(self) -> list[str]:
"""Get the available streams from the spec."""
return [s.name for s in self.discovered_catalog.streams]

def _get_incremental_stream_names(self) -> list[str]:
"""Get the name of streams that support incremental sync."""
return [
stream.name
for stream in self.discovered_catalog.streams
if SyncMode.incremental in stream.supported_sync_modes
]

def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
"""Call spec on the connector.
Expand Down Expand Up @@ -624,6 +632,19 @@ def _log_sync_start(
event_type=EventType.SYNC,
)

def _log_incremental_streams(
self,
*,
incremental_streams: Optional[set[str]] = None,
) -> None:
"""Log the streams which are using incremental sync mode."""
log_message = (
"The following streams are currently using incremental sync:\n"
f"{incremental_streams}\n"
"To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
)
print(log_message)

def _log_stream_read_start(self, stream: str) -> None:
print(f"Read started on stream: {stream} at {pendulum.now().format('HH:mm:ss')}...")

Expand Down Expand Up @@ -737,6 +758,17 @@ def read(

self._log_sync_start(cache=cache)

# Log incremental stream if incremental streams are known
if state_provider and state_provider.known_stream_names:
# Retrieve set of the known streams support which support incremental sync
incremental_streams = (
set(self._get_incremental_stream_names())
& state_provider.known_stream_names
& set(self.get_selected_streams())
)
if incremental_streams:
self._log_incremental_streams(incremental_streams=incremental_streams)

cache_processor = cache.get_record_processor(
source_name=self.name,
catalog_provider=CatalogProvider(self.configured_catalog),
Expand Down

0 comments on commit 3e1ffb0

Please sign in to comment.