Skip to content

Commit e50a666

Browse files
committed
refactor
1 parent 1443421 commit e50a666

File tree

6 files changed

+17
-16
lines changed

6 files changed

+17
-16
lines changed

sources/asana_dlt/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
"""
88

99
import typing as t
10-
from typing import Any, Iterable
10+
from typing import Any, Iterable, Sequence
1111

1212
import dlt
1313
from dlt.common.typing import TDataItem
14+
from dlt.extract.resource import DltResource
1415

1516
from .helpers import get_client
1617
from .settings import (
@@ -28,7 +29,7 @@
2829

2930

3031
@dlt.source
31-
def asana_source() -> Any: # should be Sequence[DltResource]:
32+
def asana_source(access_token: str = dlt.secrets.value) -> Sequence[DltResource]:
3233
"""
3334
The main function that runs all the other functions to fetch data from Asana.
3435
Returns:

sources/bing_webmaster/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,27 @@
66
"""
77

88
import time
9-
from typing import Iterable, Iterator, List, Sequence
9+
from typing import Dict, Iterator, List, Sequence
1010

1111
import dlt
1212
from dlt.common import logger
13-
from dlt.common.typing import DictStrAny, DictStrStr
13+
from dlt.common.typing import DictStrAny
1414
from dlt.sources import DltResource
1515

1616
from .helpers import get_stats_with_retry, parse_response
1717

1818

1919
@dlt.source(name="bing_webmaster")
2020
def source(
21-
site_urls: List[str] = None, site_url_pages: Iterable[DictStrStr] = None
21+
site_urls: List[str] = dlt.config.value,
22+
site_url_pages: List[Dict[str, str]] = dlt.config.value,
2223
) -> Sequence[DltResource]:
2324
"""
2425
A dlt source for the Bing Webmaster api.
2526
It groups resources for the APIs which return organic search traffic statistics
2627
Args:
2728
site_urls: List[str]: A list of site_urls, e.g, ["dlthub.com", "dlthub.de"]. Use this if you need the weekly traffic per site_url and page
28-
site_url_pages: Iterable[DictStrStr]: A list of pairs of site_url and page. Use this if you need the weekly traffic per site_url, page, and query
29+
site_url_pages: List[Dict[str, str]]: A list of pairs of site_url and page. Use this if you need the weekly traffic per site_url, page, and query
2930
Returns:
3031
Sequence[DltResource]: A sequence of resources that can be selected from including page_stats and page_query_stats.
3132
"""
@@ -70,7 +71,7 @@ def page_stats(
7071
table_name="bing_page_query_stats",
7172
)
7273
def page_query_stats(
73-
site_url_pages: Iterable[DictStrStr],
74+
site_url_pages: List[Dict[str, str]],
7475
api_key: str = dlt.secrets.value,
7576
) -> Iterator[Iterator[DictStrAny]]:
7677
"""
@@ -80,7 +81,7 @@ def page_query_stats(
8081
https://learn.microsoft.com/en-us/dotnet/api/microsoft.bing.webmaster.api.interfaces.iwebmasterapi.getpagequerystats
8182
8283
Args:
83-
site_url_page (Iterable[DictStrStr]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc.
84+
site_url_pages (List[Dict[str, str]]): List of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc.
8485
Yields:
8586
Iterator[Dict[str, Any]]: An iterator over list of organic traffic statistics.
8687
"""

sources/chess/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
@dlt.source(name="chess")
1616
def source(
17-
players: List[str], start_month: str = None, end_month: str = None
17+
players: List[str] = dlt.config.value,
18+
start_month: str = None,
19+
end_month: str = None,
1820
) -> Sequence[DltResource]:
1921
"""
2022
A dlt source for the chess.com api. It groups several resources (in this case chess.com API endpoints) containing

sources/kafka/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
standalone=True,
2828
)
2929
def kafka_consumer(
30-
topics: Union[str, List[str]],
30+
topics: List[str] = dlt.config.value,
3131
credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
3232
msg_processor: Optional[
3333
Callable[[Message], Dict[str, Any]]
@@ -60,9 +60,6 @@ def kafka_consumer(
6060
Yields:
6161
Iterable[TDataItem]: Kafka messages.
6262
"""
63-
if not isinstance(topics, list):
64-
topics = [topics]
65-
6663
if isinstance(credentials, Consumer):
6764
consumer = credentials
6865
elif isinstance(credentials, KafkaCredentials):

sources/kafka_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def custom_msg_processor(msg: confluent_kafka.Message) -> Dict[str, Any]:
4747
"data": msg.value().decode("utf-8"),
4848
}
4949

50-
data = kafka_consumer("books", msg_processor=custom_msg_processor)
50+
data = kafka_consumer(["books"], msg_processor=custom_msg_processor)
5151

5252
info = pipeline.run(data)
5353
print(info)
@@ -63,7 +63,7 @@ def load_starting_from_date() -> None:
6363
)
6464

6565
from_date = datetime(2023, 12, 15)
66-
data = kafka_consumer("books", start_from=from_date)
66+
data = kafka_consumer(["books"], start_from=from_date)
6767

6868
info = pipeline.run(data)
6969
print(info)

sources/pipedrive/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def parsed_mapping(
174174

175175

176176
@dlt.resource(primary_key="id", write_disposition="merge")
177-
def leads(
177+
def leads(
178178
pipedrive_api_key: str = dlt.secrets.value,
179179
update_time: dlt.sources.incremental[str] = dlt.sources.incremental(
180180
"update_time", "1970-01-01 00:00:00"

0 commit comments

Comments
 (0)