Commit 0b71383

dlt: Hello, World!

1 parent 2f22f42 commit 0b71383

6 files changed, +218 −0 lines changed

framework/dlt/.dlt/config.toml

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
# Put your main configuration values here.
#add_dlt_id = false
#add_dlt_load_id = false

[runtime]

# The system log level of dlt.
log_level = "DEBUG"

# Use the `dlthub_telemetry` setting to enable/disable anonymous
# usage data reporting, see https://dlthub.com/docs/reference/telemetry.
dlthub_telemetry = false
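The same runtime settings can also be supplied through dlt's layered configuration instead of `config.toml`. A minimal sketch, assuming dlt's environment-variable provider with upper-cased keys and double-underscore section separators (the exact variable names are an assumption based on that convention):

```python
import os

# Assumption: environment variables take precedence over .dlt/config.toml
# in dlt's configuration resolution; sections are joined with "__".
os.environ["RUNTIME__LOG_LEVEL"] = "WARNING"
os.environ["RUNTIME__DLTHUB_TELEMETRY"] = "false"
```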

framework/dlt/.dlt/secrets.toml

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
[destination.cratedb.credentials]
host = "localhost"
port = 5432
username = "crate"
password = ""

[destination.sqlalchemy.credentials]
drivername = "crate"
host = "localhost"
port = 4200
username = "crate"
password = ""
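With these credentials in place, a pipeline can refer to the destination by name and let dlt resolve the connection details from `.dlt/secrets.toml`. A minimal sketch (pipeline, table, and sample data are illustrative, not part of the commit):

```python
import dlt

# Assumption: naming the destination makes dlt look up
# [destination.cratedb.credentials] from .dlt/secrets.toml.
pipeline = dlt.pipeline(
    pipeline_name="from_secrets",
    destination="cratedb",
    dataset_name="doc",
)
load_info = pipeline.run(
    data=[{"id": 1, "name": "hello"}, {"id": 2, "name": "world"}],
    table_name="greetings",
)
print(load_info)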

framework/dlt/.gitignore

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
# ignore secrets, virtual environments and typical python compilation artifacts
# remark: secrets.toml is deliberately kept tracked here, in order to provide out-of-the-box settings for localhost
# secrets.toml
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal

framework/dlt/basic.py

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
# mypy: disable-error-code="no-untyped-def,arg-type"
"""The Intro Pipeline Template contains the example from the docs intro page"""

from typing import Optional
import pandas as pd
import sqlalchemy as sa

import dlt
from dlt.sources.helpers import requests


CRATEDB_ADDRESS = "postgresql://crate:@localhost:5432/"


def load_api_data() -> None:
    """Load data from the chess api, for more complex examples use our rest_api source"""

    # Create a dlt pipeline that will load
    # chess player data to the CrateDB destination
    pipeline = dlt.pipeline(
        pipeline_name="from_api",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )

    # Grab some player data from Chess.com API
    data = []
    for player in ["magnuscarlsen", "rpragchess"]:
        response = requests.get(f"https://api.chess.com/pub/player/{player}")
        response.raise_for_status()
        data.append(response.json())

    # Extract, normalize, and load the data
    load_info = pipeline.run(
        data=data,
        table_name="chess_players",
    )
    print(load_info)  # noqa: T201


def load_pandas_data() -> None:
    """Load data from a public csv via pandas"""

    owid_disasters_csv = (
        "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
    )
    df = pd.read_csv(owid_disasters_csv)

    pipeline = dlt.pipeline(
        pipeline_name="from_csv",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )
    load_info = pipeline.run(
        data=df,
        table_name="natural_disasters",
    )

    print(load_info)  # noqa: T201


def load_sql_data() -> None:
    """Load data from a sql database with sqlalchemy, for more complex examples use our sql_database source"""

    # Use any SQL database supported by SQLAlchemy, below we use a public
    # MySQL instance to get data.
    # NOTE: you'll need to install pymysql with `pip install pymysql`
    # NOTE: loading data from public mysql instance may take several seconds
    engine = sa.create_engine(
"mysql+pymysql://[email protected]:4497/Rfam"
    )

    with engine.connect() as conn:
        # Select genome table, stream data in batches of 100 elements
        query = "SELECT * FROM genome LIMIT 1000"
        rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

        pipeline = dlt.pipeline(
            pipeline_name="from_database",
            destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
            dataset_name="doc",
        )

        # Convert the rows into dictionaries on the fly with a map function
        load_info = pipeline.run(
            data=map(lambda row: dict(row._mapping), rows),
            table_name="genome",
        )

    print(load_info)  # noqa: T201


@dlt.resource(write_disposition="replace")
def github_api_resource(api_secret_key: Optional[str] = dlt.secrets.value):
    from dlt.sources.helpers.rest_client import paginate
    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
    from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

    url = "https://api.github.com/repos/dlt-hub/dlt/issues"

    # Github allows both authenticated and non-authenticated requests (with low rate limits)
    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
    for page in paginate(
        url,
        auth=auth,
        paginator=HeaderLinkPaginator(),
        params={"state": "open", "per_page": "100"},
    ):
        yield page


@dlt.source
def github_api_source(api_secret_key: Optional[str] = dlt.secrets.value):
    return github_api_resource(api_secret_key=api_secret_key)


def load_data_from_source():
    pipeline = dlt.pipeline(
        pipeline_name="github_api_pipeline",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )
    load_info = pipeline.run(
        data=github_api_source(),
        table_name="github_api_data",
    )
    print(load_info)  # noqa: T201


if __name__ == "__main__":
    load_api_data()
    load_pandas_data()
    load_sql_data()
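Note that `load_data_from_source()` is defined but not invoked by the `__main__` block above. A minimal sketch for running it separately; because `api_secret_key` is typed `Optional`, an unset token should simply fall back to unauthenticated requests, and the programmatic secrets path shown in the comment is an assumption, not something defined in this commit:

```python
from basic import load_data_from_source

# Assumption: without a configured `api_secret_key` the resource makes
# unauthenticated GitHub requests (subject to low rate limits).
# To authenticate, a token could be supplied before running, e.g. (path assumed):
#   import dlt
#   dlt.secrets["sources.basic.github_api_resource.api_secret_key"] = "<github token>"
load_data_from_source()
```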

framework/dlt/pokemon.py

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
"""data load tool (dlt) — the open-source Python library for data loading

How to create a data loading pipeline with dlt and CrateDB in 3 seconds:

0. Configure the `cratedb` destination in `.dlt/secrets.toml`.
```toml
[destination.cratedb.credentials]
host = "localhost"
port = 5432
username = "crate"
password = ""
```

1. Write a pipeline script
>>> import dlt
>>> from dlt.sources.helpers import requests
>>> dlt.run(
...     data=requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"],
...     destination="cratedb",
...     dataset_name="doc",
...     table_name="pokemon")

2. Run your pipeline script
> $ python pokemon.py

3. See and query your data with the autogenerated Streamlit app
> $ dlt pipeline dlt_pokemon show

Or start with our pipeline template with sample PokeAPI (pokeapi.co) data loaded to BigQuery

> $ dlt init pokemon bigquery

For more detailed info, see https://dlthub.com/docs/intro
"""


import dlt
from dlt.sources.helpers import requests
dlt.run(
    data=requests.get("https://pokeapi.co/api/v2/pokemon/").json()["results"],
    destination="cratedb",
    dataset_name="doc",
    table_name="pokemon")
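After the script has run, the loaded rows can also be inspected directly in CrateDB. A minimal sketch, assuming CrateDB's HTTP endpoint on localhost:4200, the `doc` dataset/schema configured above, and the default PokeAPI column names `name` and `url`:

```python
import sqlalchemy as sa

# Assumption: the CrateDB SQLAlchemy dialect is installed and the loaded
# table landed as doc.pokemon with columns "name" and "url".
engine = sa.create_engine("crate://crate@localhost:4200")
with engine.connect() as conn:
    for row in conn.execute(sa.text("SELECT name, url FROM doc.pokemon LIMIT 5")):
        print(row)
```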

framework/dlt/requirements.txt

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
# Development
dlt[cratedb] @ git+https://github.com/crate-workbench/dlt@cratedb

# Production
# dlt[cratedb]>=1.12.0
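A quick way to check that the development install from the fork actually provides the `cratedb` destination. A sketch; the attribute lookup simply mirrors how `basic.py` above uses it:

```python
import dlt

# Assumption: the crate-workbench fork exposes a `cratedb` destination factory,
# as used by basic.py in this commit.
print(dlt.__version__)
print(dlt.destinations.cratedb)
```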
