feat: Adding pg_legacy_replication verified source using decoderbufs #589

Open · wants to merge 88 commits into base branch `master`

Commits (88)

59e7557
fix: finally got pg_replication tests working as is
Sep 17, 2024
79220b7
feat: got decoderbufs to run and compile in docker
Sep 17, 2024
9de0835
chore: updated protobuf to latest compatible version
Sep 17, 2024
75a0f7f
chore: copying all files from pg_replication; format-lint is reformat…
Sep 18, 2024
73704af
wip: saving work
Sep 18, 2024
7d1b8e7
wip: saving work
Oct 1, 2024
ecbf98d
wip: saving work
Oct 2, 2024
3ed14da
wip: removed all references to publications
Oct 3, 2024
9fe0301
fix: applied suggested changes mentioned here https://github.com/dlt-…
Oct 8, 2024
197ba82
wip: saving work
Oct 8, 2024
c897ee0
wip: finally got snapshot to work
Oct 9, 2024
d303c04
chore: simply cleaning up
Oct 9, 2024
6566fe4
chore: need to find a better way to clean up the underlying engine
Oct 9, 2024
70d40a0
wip: handling begin/commit
Oct 11, 2024
f703431
wip: saving work
Oct 11, 2024
f001633
wip: saving work
Oct 11, 2024
c0df7c9
wip: saving work
Oct 14, 2024
aa464d5
wip: saving work
Oct 17, 2024
db09568
wip: making progress
Oct 17, 2024
c3c0518
wip: saving work
Oct 19, 2024
a5b1a87
refactor: some test refactoring
Oct 19, 2024
7fad621
wip: saving work
Oct 20, 2024
fbc65bc
wip: saving work
Oct 20, 2024
1299b60
wip: cleaning up + refactor
Oct 21, 2024
f44853b
wip: cleaning up + refactor
Oct 21, 2024
46200ca
wip: cleaning up + refactor
Oct 21, 2024
f0f0146
wip: slowly progressing
Oct 22, 2024
cd8d906
wip: all tests pass now to update docs and cleanup
Oct 22, 2024
02851f4
wip: still trying to get it work with all versions of dlt
Oct 22, 2024
beef6ea
wip
Oct 22, 2024
77242e8
wip: changing signature
Oct 23, 2024
f9cdf78
wip: finally got rid of those errors
Oct 23, 2024
327b44c
wip: correcting failing tests
Oct 23, 2024
a9a9bb7
wip: fixed working examples
Oct 23, 2024
37acc35
wip: more refactoring now docs... -_-
Oct 23, 2024
a90acee
wip: cleaning up
Oct 23, 2024
f927f13
wip: cleaning up
Oct 23, 2024
cc7ad61
wip: attempting to refactor to use dlt resources
Oct 24, 2024
f9c7694
wip: second test passing
Oct 24, 2024
927ae03
wip: all tests pass again now for refactoring
Oct 25, 2024
3f31752
wip: init_replication is now a dlt source
Oct 25, 2024
637a6e9
wip: more refactoring
Oct 25, 2024
8a8134b
wip: saving work until I can get hinting to work
Oct 25, 2024
ee3cb9c
wip: finally got something somewhat working
Oct 26, 2024
1727456
wip: done with coding now docs
Oct 27, 2024
81fdce8
fix: various performance improvements
Oct 28, 2024
8fbfc62
fix: minor corrections to handle old versions of postgres
Oct 28, 2024
fd4638b
fix: small type corrections for pg9.6
Oct 29, 2024
526eff3
fix: exposing table options for later arrow support
Oct 29, 2024
2f5ad15
wip: saving work for arrow
Oct 30, 2024
32063e2
wip: first test with arrow passing
Oct 30, 2024
28f463d
wip: almost done passing all tests
Oct 30, 2024
385e8a6
wip: some arrow tests are still not passing
Oct 30, 2024
a291b69
fix: done with pyarrow; too many issues with duckdb atm
Oct 31, 2024
ba23505
wip: some bug fixes
Oct 31, 2024
5993fb4
wip: small refactoring
Nov 3, 2024
6db693a
wip: duckdb needs patching, trying out new max_lsn
Nov 3, 2024
c53c9f9
wip: some refactoring of options to make certain features togglable
Nov 14, 2024
ba1c3fc
wip: lsn and deleted ts are optional
Nov 15, 2024
6b960df
feat: added optional transaction id
Nov 16, 2024
9fa9d98
feat: added optional commit timestamp
Nov 17, 2024
1947029
fix: never handled missing type and added text oid mapping
Nov 18, 2024
7a7ba30
fix: added some logging and bug fixes
Nov 20, 2024
a752581
chore: basic refactoring
Nov 27, 2024
4184ca9
fix: minor corrections
Dec 16, 2024
3c7232f
chore: reverting back to prev state
Dec 16, 2024
c8f1ad2
chore: rebasing 1.x branch onto my own
Dec 16, 2024
7024ce7
fix: corrected bug regarding column names
Dec 19, 2024
63b1de0
chore: minor fixes
Dec 19, 2024
e8b2a0c
chore: small perf fixes and aligning with more adt
Dec 20, 2024
4c33129
chore: refactoring and cleaning
Dec 20, 2024
0b7c151
chore: finished docstrings
Dec 22, 2024
ec72e36
bugfix: misuse of defaultdict
Dec 22, 2024
ecc6089
Finally done with docs
Dec 23, 2024
dd5a63b
fix: wasn't able to execute local tests without these settings
Dec 30, 2024
d377423
feat: added basic support for scalar array types
Jan 14, 2025
acdf446
chore: slight perf improvments for pg_arrays
Jan 15, 2025
a3dc99d
fix: it turns out pg_arrays are annoying found temp workaround
Jan 16, 2025
c9c5bcb
refactor: all sqlalchemy event code is done at engine configuration
Jan 22, 2025
d695afb
chore: bumped python to 3.9; small refactorings
Jan 22, 2025
8f45283
refactor: init_replication is now in pkg ns
Jan 22, 2025
41f8ded
fix: corrected bugs regarding inferring nullability wrong; refactored…
Jan 28, 2025
129b18a
fix: rolling back on managing conn lifecycle using context mgrs: it d…
Jan 29, 2025
9083611
fix: corrected regression with occasional datum_missinng values
Jan 30, 2025
864b746
fix: add support for ordinary json pg_type
Feb 7, 2025
a591618
fix: various fixes of bugs encountered during production
Feb 19, 2025
5d6790e
fix: various fixes related to pyarrow backends
Mar 3, 2025
f276b4a
chore: updating poetry + dlt
Jul 1, 2025
697 changes: 510 additions & 187 deletions poetry.lock

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions pyproject.toml
@@ -11,12 +11,11 @@ readme = "README.md"
packages = [{include = "sources"}]

[tool.poetry.dependencies]
python = ">=3.8.1,<3.13"
dlt = {version = "1.3.0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb"]}
graphlib-backport = {version = "*", python = "<3.9"}
python = ">=3.9,<3.13"
dlt = {version = "1.8.1", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb"]}

[tool.poetry.group.dltpure.dependencies]
dlt = {version = "1.3.0", allow-prereleases = true}
dlt = {version = "1.8.1", allow-prereleases = true}

[tool.poetry.group.pytest.dependencies]
pytest = "^7.2.0"
@@ -45,6 +44,9 @@ pytest-mock = "^3.12.0"
twisted = "22.10.0"
pytest-forked = "^1.6.0"
pendulum = "^3.0.0"
+types-protobuf = "^5.27.0.20240907"
+pytest-cov = "^5.0.0"
+mypy-protobuf = "^3.6.0"

[tool.poetry.group.sql_database.dependencies]
sqlalchemy = ">=1.4"
@@ -54,6 +56,11 @@ connectorx = ">=0.3.1"
[tool.poetry.group.pg_replication.dependencies]
psycopg2-binary = ">=2.9.9"

+[tool.poetry.group.pg_legacy_replication.dependencies]
+protobuf = ">=4.25"
+psycopg2-binary = ">=2.9.9"
+sqlalchemy = ">=1.4"

[tool.poetry.group.google_sheets.dependencies]
google-api-python-client = "^2.78.0"

@@ -116,4 +123,4 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.black]
-include = '.*py$'
+include = '.*py$'
6 changes: 5 additions & 1 deletion sources/.dlt/example.secrets.toml
@@ -16,7 +16,11 @@ location = "US"
### Sources
[sources]

+# local postgres
+helpers.credentials="postgresql://loader:loader@localhost:5432/dlt_data"
+pg_legacy_replication.credentials="postgresql://loader:loader@localhost:5432/dlt_data"

## chess pipeline
# the section below defines secrets for "chess_dlt_config_example" source in chess/__init__.py
[sources.chess]
secret_str="secret string" # a string secret
secret_str="secret string" # a string secret
130 changes: 130 additions & 0 deletions sources/pg_legacy_replication/README.md
@@ -0,0 +1,130 @@
# Postgres legacy replication
[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes
in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the optional `decoderbufs`
[output plugin](https://github.com/debezium/postgres-decoderbufs), which is a shared library that must be built or enabled.

> **Review comment (Contributor):** What does it mean that decoderbufs is optional? If it is not present, do we decode on the client?
>
> **Reply (Author):** The heavy lifting is done by the decoderbufs extension, which must be added when using a managed Postgres such as Cloud SQL, or compiled and installed on a self-hosted Postgres installation. Detailed instructions can be found here: https://debezium.io/documentation/reference/stable/postgres-plugins.html#logical-decoding-output-plugin-installation. FYI, decoderbufs is the default logical replication plugin used by Debezium.

| Source | Description |
|---------------------|-------------------------------------------------|
| replication_source | Load published messages from a replication slot |

## Install decoderbufs

Instructions can be found [here](https://github.com/debezium/postgres-decoderbufs?tab=readme-ov-file#building).

Below is an example installation in a Docker image:
```Dockerfile
FROM postgres:14

# Install dependencies required to build decoderbufs
RUN apt-get update
RUN apt-get install -f -y \
    software-properties-common \
    build-essential \
    pkg-config \
    git

RUN apt-get install -f -y \
    postgresql-server-dev-14 \
    libprotobuf-c-dev && \
    rm -rf /var/lib/apt/lists/*

ARG decoderbufs_version=v1.7.0.Final
RUN git clone https://github.com/debezium/postgres-decoderbufs -b $decoderbufs_version --single-branch && \
    cd postgres-decoderbufs && \
    make && make install && \
    cd .. && \
    rm -rf postgres-decoderbufs
```
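
Building the image is not enough on its own: the server also needs logical decoding enabled and the plugin preloaded. Below is a minimal sketch of running the image built from the Dockerfile above; the image name, password, and limits are illustrative, so check the Debezium instructions linked above for the authoritative settings.

```bash
# Build the image from the Dockerfile above (tag name is illustrative)
docker build -t postgres-decoderbufs .

# Run it with logical decoding enabled and decoderbufs preloaded;
# the official postgres image forwards trailing -c flags to the server
docker run -d --name pg_source -p 5432:5432 \
  -e POSTGRES_PASSWORD=loader \
  postgres-decoderbufs \
  -c shared_preload_libraries=decoderbufs \
  -c wal_level=logical \
  -c max_replication_slots=4 \
  -c max_wal_senders=4
```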

## Initialize the pipeline

```bash
$ dlt init pg_legacy_replication duckdb
```

This uses `duckdb` as the destination, but you can choose any of the supported [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Set up user

The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:

```sql
CREATE ROLE replication_user WITH LOGIN REPLICATION;
```

It also needs read-only privileges on the database, granted after connecting to it:

```sql
\connect dlt_data
GRANT USAGE ON SCHEMA schema_name TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
```

## Add credentials
1. Open `.dlt/secrets.toml`.
2. Enter your Postgres credentials:

```toml
[sources.pg_legacy_replication]
credentials="postgresql://replication_user:<<password>>@localhost:5432/dlt_data"
```
3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Run the pipeline

1. Install the necessary dependencies by running the following command:

```bash
pip install -r requirements.txt
```

1. Now the pipeline can be run using the command:

```bash
python pg_legacy_replication_pipeline.py
```

1. To make sure that everything is loaded as expected, use the command:

```bash
dlt pipeline pg_replication_pipeline show
```

# Differences between `pg_legacy_replication` and `pg_replication`

## Overview

`pg_legacy_replication` is a fork of the verified `pg_replication` source. The primary goal of this fork is to provide logical replication capabilities for Postgres instances running versions
earlier than 10, when the `pgoutput` plugin was not yet available. This fork draws inspiration from the original `pg_replication` source and the `decoderbufs` library,
which is actively maintained by Debezium.

## Key Differences from `pg_replication`

### Replication User Ownership Requirements
One of the limitations of native Postgres replication is that the replication user must **own** the tables in order to add them to a **publication**.
Additionally, once a table is added to a publication, it cannot be removed; doing so requires creating a new replication slot, which results in the loss of any state tracking.
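
By contrast, `pg_legacy_replication` relies only on a logical replication slot created with the `decoderbufs` output plugin, so no publication (and therefore no table ownership) is required. A minimal sketch in plain SQL, with an illustrative slot name:

```sql
-- Create a logical replication slot that emits decoderbufs (protobuf) messages
SELECT * FROM pg_create_logical_replication_slot('dlt_slot', 'decoderbufs');

-- Peek at pending changes without consuming them (decoderbufs output is binary)
SELECT * FROM pg_logical_slot_peek_binary_changes('dlt_slot', NULL, NULL);
```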

### Limitations in `pg_replication`
The current `pg_replication` implementation has several limitations:
- It supports only a single initial snapshot of the data.
- It requires `CREATE` access to the source database in order to perform the initial snapshot.
- **Superuser** access is required to replicate entire Postgres schemas.
  While the `pg_legacy_replication` source theoretically reads the entire WAL across all schemas, the current implementation using dlt transformers restricts this functionality.
  In practice, this has not been a common use case.
- The implementation is opinionated in its approach to data transfer. Specifically, when updates or deletes are required, it defaults to a `merge` write disposition,
  which replicates live data without tracking changes over time.

### Features of `pg_legacy_replication`

This fork of `pg_replication` addresses the aforementioned limitations and introduces the following improvements:
- Adheres to the dlt philosophy by treating the WAL as an upstream resource. This replication stream is then transformed into various dlt resources, with customizable options for write disposition,
  file formats, type hints, etc., specified at the resource level rather than at the source level (see the usage sketch after this list).
- Supports an initial snapshot of all tables using the transaction slot isolation level. Additionally, ad-hoc snapshots can be performed using the serializable deferred isolation level,
  similar to `pg_dump`.
- Emphasizes the use of `pyarrow` and parquet formats for efficient data storage and transfer. A dedicated backend has been implemented to support these formats.
- Replication messages are decoded using Protocol Buffers (protobufs) in C, rather than relying on native Python byte buffer parsing. This ensures greater efficiency and performance.
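
Below is a rough sketch of the intended usage flow. The import path, function signatures, and argument names are illustrative assumptions rather than the exact API; refer to the source code and `pg_legacy_replication_pipeline.py` in this PR for the actual examples.

```python
import dlt

# Illustrative imports: the actual entry points live in sources/pg_legacy_replication.
from pg_legacy_replication import init_replication, replication_source

# One-time setup (hypothetical signature): create the replication slot and,
# optionally, take an initial snapshot of the selected tables.
init_replication(
    slot_name="my_slot",          # illustrative slot name
    schema="public",
    table_names=["my_table"],
)

# Ongoing CDC (hypothetical signature): read decoderbufs messages from the slot
# and expose them as per-table dlt resources, with write disposition, backend,
# and type hints configured at the resource level.
source = replication_source(
    slot_name="my_slot",
    schema="public",
    table_names=["my_table"],
)

pipeline = dlt.pipeline(
    pipeline_name="pg_legacy_replication_pipeline",
    destination="duckdb",
    dataset_name="replicated_data",
)
pipeline.run(source)
```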

## Next steps
- Add support for the [wal2json](https://github.com/eulerto/wal2json) replication plugin. This is particularly important for environments such as **Amazon RDS**, which supports `wal2json`,
  as opposed to on-premise or Google Cloud SQL instances that support `decoderbufs`.