61 commits
51ce6e4
kafka streaming support
May 20, 2022
72affb9
kafka item exporter
May 21, 2022
fdd3ada
initial working version of kafka item exporter
May 21, 2022
ed7e8d6
formatted code
May 21, 2022
2fea40d
updated gitignore
May 21, 2022
2b20177
removed verbose logging
May 21, 2022
72ef5cf
formatted code and fixed start warm script
May 21, 2022
5b1975d
added support to publish to kafka
Jun 7, 2022
aa53133
added token_address, fixed params
saurabhdaga-merkle Jun 21, 2022
373f1b7
added token_address, fixed params
saurabhdaga-merkle Jun 22, 2022
95b79f8
unnecessary file
saurabhdaga-merkle Jun 22, 2022
268e573
unnecessary file
saurabhdaga-merkle Jun 22, 2022
3f75768
comment
saurabhdaga-merkle Jun 26, 2022
94fb3de
add token_address key
saurabhdaga-merkle Jun 27, 2022
e9b5efe
add token_address key
saurabhdaga-merkle Jun 27, 2022
197bc80
add token_address key
saurabhdaga-merkle Jun 27, 2022
a5c6082
add token_address key
saurabhdaga-merkle Jul 4, 2022
7d5427b
warm
saurabhdaga-merkle Jul 4, 2022
5bd3864
warm
saurabhdaga-merkle Jul 4, 2022
912515d
warm
saurabhdaga-merkle Jul 4, 2022
9e4679f
warm
saurabhdaga-merkle Jul 4, 2022
4d70aa1
doppler
saurabhdaga-merkle Jul 6, 2022
d8b91e1
doppler testing
saurabhdaga-merkle Jul 7, 2022
cab01bf
start warm
saurabhdaga-merkle Jul 7, 2022
44f8785
start warm
saurabhdaga-merkle Jul 7, 2022
09485de
start warm
saurabhdaga-merkle Jul 7, 2022
829741b
start warm
saurabhdaga-merkle Jul 7, 2022
82893b8
start warm
saurabhdaga-merkle Jul 7, 2022
1a47680
start warm
saurabhdaga-merkle Jul 7, 2022
cf453c3
start warm
saurabhdaga-merkle Jul 7, 2022
81d17c5
start warm
saurabhdaga-merkle Jul 7, 2022
3024795
start warm
saurabhdaga-merkle Jul 7, 2022
93f6180
start warm
saurabhdaga-merkle Jul 7, 2022
70b69ec
start warm
saurabhdaga-merkle Jul 7, 2022
583b42a
start warm
saurabhdaga-merkle Jul 7, 2022
0d4e9c7
start warm
saurabhdaga-merkle Jul 7, 2022
1568706
start hot
saurabhdaga-merkle Jul 7, 2022
4f69a33
start hot
saurabhdaga-merkle Jul 7, 2022
b6d32eb
start hot
saurabhdaga-merkle Jul 7, 2022
fff8631
start hot
saurabhdaga-merkle Jul 7, 2022
f2a10f4
v2 kafka writes
akshay-ghy Aug 29, 2023
81d3804
start-hot.sh update
akshay-ghy Aug 29, 2023
f63ee56
python3.9 support
akshay-ghy Aug 29, 2023
5d52f06
python version
Aug 29, 2023
87b93a1
add kafka to setup
akshay-ghy Aug 29, 2023
dc23aa1
all changes done
akshay-ghy Aug 29, 2023
f691a5f
changed API for receipt in stream
Jan 30, 2024
a76db63
Delete .idea directory
akshay-ghy Jan 30, 2024
81e8c6f
bump version
akshay-ghy Jan 30, 2024
4eb71a3
Merge pull request #15 from merklescience/temp
akshay-ghy Jan 30, 2024
82b5046
fix streaming code
akshay-ghy Jan 31, 2024
d5d5065
ignore last sync block
akshay-ghy Feb 1, 2024
580ebf2
make lag 0 for hot
akshay-ghy Feb 1, 2024
9f5b6e5
gzip compression and input data
akshay-ghy Oct 8, 2024
5212a1b
modify start-hot.sh
akshay-ghy Oct 8, 2024
dde6e75
make key none for round robin partitioning
akshay-ghy May 19, 2025
32b356b
change compression to lz4
akshay-ghy May 19, 2025
d67bef6
compression back to gzip
akshay-ghy May 19, 2025
03891db
similar configs to polygon & bsc
akshay-ghy May 20, 2025
3b448e3
tune batch size
akshay-ghy May 20, 2025
92190d5
increase batch size
akshay-ghy May 20, 2025
4 changes: 4 additions & 0 deletions .github/FUNDING.yml
@@ -0,0 +1,4 @@
# These are supported funding model platforms

github: [medvedev1088]

4 changes: 4 additions & 0 deletions .gitignore
@@ -47,3 +47,7 @@ coverage.xml
 .venv
 venv/
 ENV/
+
+last_synced_block.txt
+pyrightconfig.json
+pyrightconfig.json
7 changes: 6 additions & 1 deletion Dockerfile
@@ -7,5 +7,10 @@ WORKDIR /$PROJECT_DIR
 COPY . .
 RUN apk add --no-cache gcc musl-dev  #for C libraries: <limits.h> <stdio.h>
 RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/
+RUN apt-get update && apt-get install -y apt-transport-https ca-certificates curl gnupg && \
+    curl -sLf --retry 3 --tlsv1.2 --proto "=https" 'https://packages.doppler.com/public/cli/gpg.DE2A7741A397C129.key' | apt-key add - && \
+    echo "deb https://packages.doppler.com/public/cli/deb/debian any-version main" | tee /etc/apt/sources.list.d/doppler-cli.list && \
+    apt-get update && \
+    apt-get -y install doppler

-ENTRYPOINT ["python", "ethereumetl"]
+CMD ["doppler", "run", "--", "python", "ethereumetl"]
13 changes: 12 additions & 1 deletion Dockerfile_with_streaming
@@ -6,10 +6,21 @@ RUN mkdir /$PROJECT_DIR
 WORKDIR /$PROJECT_DIR
 COPY . .
 RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]
+RUN apt-get update && apt-get install -y apt-transport-https ca-certificates curl gnupg && \
+    curl -sLf --retry 3 --tlsv1.2 --proto "=https" 'https://packages.doppler.com/public/cli/gpg.DE2A7741A397C129.key' | apt-key add - && \
+    echo "deb https://packages.doppler.com/public/cli/deb/debian any-version main" | tee /etc/apt/sources.list.d/doppler-cli.list && \
+    apt-get update && \
+    apt-get -y install doppler
+
+RUN (curl -Ls https://cli.doppler.com/install.sh || wget -qO- https://cli.doppler.com/install.sh) | sh
+
+ARG DOPPLER_TOKEN
+
+ENV DOPPLER_TOKEN ${DOPPLER_TOKEN}

 # Add Tini
 ENV TINI_VERSION v0.18.0
 ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
 RUN chmod +x /tini

-ENTRYPOINT ["/tini", "--", "python", "ethereumetl"]
+ENTRYPOINT ["doppler", "run", "--", "/tini", "--", "python", "ethereumetl"]
5 changes: 5 additions & 0 deletions README_DOCKER_BUILD
@@ -0,0 +1,5 @@
ETHEREUMETL_STREAMING_VERSION=1.0-test
docker build --platform linux/x86_64 -t merklescience/ethereum-etl-doppler:${ETHEREUMETL_STREAMING_VERSION} -f Dockerfile_with_streaming .
docker tag merklescience/ethereum-etl-doppler:${ETHEREUMETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/ethereum-etl-doppler:${ETHEREUMETL_STREAMING_VERSION}
docker push us.gcr.io/staging-btc-etl/merklescience/ethereum-etl-doppler:${ETHEREUMETL_STREAMING_VERSION}

110 changes: 110 additions & 0 deletions blockchainetl/jobs/exporters/kafka_item_exporter.py
@@ -0,0 +1,110 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
from confluent_kafka import Producer
from timeout_decorator import timeout_decorator

import logging

import socket
import json


class KafkaItemExporter:
    def __init__(
        self, item_type_to_topic_mapping, message_attributes=("item_id",)
    ) -> None:
        logging.basicConfig(
            level=logging.INFO,
            filename="message-publish.log",
            format='{"time" : "%(asctime)s", "level" : "%(levelname)s" , "message" : "%(message)s"}',
        )

        conf = {
            "bootstrap.servers": os.getenv("CONFLUENT_BROKER"),
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "client.id": socket.gethostname(),
            "message.max.bytes": 5242880,
            "sasl.username": os.getenv("KAFKA_PRODUCER_KEY"),
            "sasl.password": os.getenv("KAFKA_PRODUCER_PASSWORD")
        }

        producer = Producer(conf)
        self.item_type_to_topic_mapping = item_type_to_topic_mapping
        self.producer = producer
        self.logging = logging.getLogger(__name__)
        self.message_attributes = message_attributes

    def open(self):
        pass

    def export_items(self, items):
        try:
            self._export_items_with_timeout(items)
        except timeout_decorator.TimeoutError as e:
            logging.info("Timed out while exporting items to Kafka.")
            raise e

    @timeout_decorator.timeout(300)
    def _export_items_with_timeout(self, items):
        for item in items:
            self.export_item(item)

    def export_item(self, item):
        item_type = item.get("type")
        # logging.info("publishing " + item_type)
        has_item_type = item_type is not None
        if has_item_type and item_type in self.item_type_to_topic_mapping:
            data = json.dumps(item)
            topic = self.item_type_to_topic_mapping[item_type]
            message_future = self.write_txns(key=item.get("token_address"),
                                             value=data,
                                             topic=topic)
            return message_future
        else:
            logging.error(f'Topic for item type "{item_type}" is not configured.')

    def get_message_attributes(self, item):
        attributes = {}

        for attr_name in self.message_attributes:
            if item.get(attr_name) is not None:
                attributes[attr_name] = item.get(attr_name)

        return attributes

    def close(self):
        # Block until every queued message has been delivered or has failed.
        self.producer.flush()

    def write_txns(self, key: str, value: str, topic: str):
        def acked(err, msg):
            if err is not None:
                self.logging.error('%% Message failed delivery: %s\n' % err)

        try:
            self.producer.produce(topic, key=key, value=value, callback=acked)
        except BufferError:
            self.logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                               len(self.producer))
        self.producer.poll(0)
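
The routing done in `export_item` can be exercised without a broker. The following is a minimal sketch, assuming a hypothetical helper `build_kafka_message` (not part of this PR) that returns the topic, key, and JSON payload the exporter would hand to `Producer.produce`; the topic name and item values are illustrative only.

```python
import json
from typing import Optional, Tuple


def build_kafka_message(
    item: dict, item_type_to_topic_mapping: dict
) -> Optional[Tuple[str, Optional[str], str]]:
    """Hypothetical helper: return (topic, key, value) for an item, or None."""
    item_type = item.get("type")
    topic = item_type_to_topic_mapping.get(item_type)
    if topic is None:
        # Mirrors the exporter's "topic is not configured" branch.
        return None
    # token_address is missing on most item types, so the key is often None.
    return topic, item.get("token_address"), json.dumps(item)


# Illustrative mapping and items; these names are assumptions, not PR values.
mapping = {"token_transfer": "eth-token_transfers-hot"}
transfer = {"type": "token_transfer", "token_address": "0xabc", "value": 1}

message = build_kafka_message(transfer, mapping)
skipped = build_kafka_message({"type": "receipt"}, mapping)
```

When the key is `None`, the choice of partition is left to the producer's configured partitioner.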
29 changes: 18 additions & 11 deletions blockchainetl/streaming/streaming_utils.py
100644 → 100755
@@ -6,18 +6,25 @@
 from blockchainetl.logging_utils import logging_basic_config


-def get_item_exporter(output):
-    if output is not None:
+def get_item_exporter(output, topic_prefix, topic_suffix):
+    item_type_to_topic_mapping = {
+        "block": topic_prefix + "-blocks-" + topic_suffix,
+        "transaction": topic_prefix + "-transactions-" + topic_suffix,
+        "log": topic_prefix + "-logs-" + topic_suffix,
+        "token_transfer": topic_prefix + "-token_transfers-" + topic_suffix,
+        "trace": topic_prefix + "-traces-" + topic_suffix,
+        "contract": topic_prefix + "-contracts-" + topic_suffix,
+        "token": topic_prefix + "-tokens-" + topic_suffix,
+    }
+
+    if output == "gcp":
         from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
-        item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
-            'block': output + '.blocks',
-            'transaction': output + '.transactions',
-            'log': output + '.logs',
-            'token_transfer': output + '.token_transfers',
-            'trace': output + '.traces',
-            'contract': output + '.contracts',
-            'token': output + '.tokens',
-        })
+        item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping)
+
+    elif output == "kafka":
+        from blockchainetl.jobs.exporters.kafka_item_exporter import KafkaItemExporter
+        item_exporter = KafkaItemExporter(item_type_to_topic_mapping)
+
     else:
         item_exporter = ConsoleItemExporter()
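
The topic names built in the new `get_item_exporter` follow the pattern `<prefix>-<plural item type>-<suffix>`. A minimal sketch of that naming scheme, assuming a hypothetical helper `build_topic_mapping` and illustrative prefix and suffix values (`eth`, `hot`) that do not appear in this PR:

```python
def build_topic_mapping(topic_prefix: str, topic_suffix: str) -> dict:
    """Hypothetical helper reproducing the mapping built in get_item_exporter."""
    plural_by_item_type = {
        "block": "blocks",
        "transaction": "transactions",
        "log": "logs",
        "token_transfer": "token_transfers",
        "trace": "traces",
        "contract": "contracts",
        "token": "tokens",
    }
    return {
        item_type: f"{topic_prefix}-{plural}-{topic_suffix}"
        for item_type, plural in plural_by_item_type.items()
    }


# "eth" and "hot" are assumed example values, not taken from the PR.
topic_mapping = build_topic_mapping("eth", "hot")
print(topic_mapping["token_transfer"])
```

In the PR's version the mapping is built with `+` before the `output` check, so `topic_prefix` and `topic_suffix` must be strings even when the console exporter ends up being selected.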
