From 69ba9f27c85d9bf84ac638be7d6b7e716dfd2062 Mon Sep 17 00:00:00 2001 From: Naveen Modi Date: Thu, 7 Sep 2023 16:49:07 +0530 Subject: [PATCH 1/4] updated the flatten logic --- blockchainetl/jobs/exporters/bitcoin_flatten.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/blockchainetl/jobs/exporters/bitcoin_flatten.py b/blockchainetl/jobs/exporters/bitcoin_flatten.py index 389331a..d685701 100644 --- a/blockchainetl/jobs/exporters/bitcoin_flatten.py +++ b/blockchainetl/jobs/exporters/bitcoin_flatten.py @@ -15,11 +15,11 @@ def flatten_transformation(payload_dict): for input in payload_dict["inputs"]: if not payload_dict["is_coinbase"]: if output["value"] > 0: - token_outgoing_value = Decimal((1e-8 * input["value"]) * (1e-8 * output["value"]) / (1e-8 * payload_dict["output_value"])) + token_outgoing_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["output_value"])) else: - token_outgoing_value = Decimal((1e-8 * input["value"]) / payload_dict["output_count"]) + token_outgoing_value = Decimal((input["value"]) / payload_dict["output_count"]) if input["value"] > 0: - token_incoming_value = Decimal((1e-8 * input["value"]) * (1e-8 * output["value"]) / (1e-8 * payload_dict["input_value"])) + token_incoming_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["input_value"])) else: token_incoming_value = 0 @@ -45,8 +45,8 @@ def flatten_transformation(payload_dict): "transaction_type": TYPE_BLOCK_REWARD, "sender_address": f"{NULL_ADDRESS_MINT}_{datetime.datetime.fromtimestamp(payload_dict['block_timestamp']).month}", "receiver_address": "|".join(output["addresses"]), - "token_outgoing_value": str(1e-8 * output["value"]), - "token_incoming_value": str(1e-8 * output["value"]), + "token_outgoing_value": str(output["value"]), + "token_incoming_value": str(output["value"]), "token_address": default_token_address, "token_outgoing_fee": str(0) }) From 68c8acf6db3160294d84eaf2387493b2feca4832 Mon Sep 17 00:00:00 2001 From: Naveen Modi Date: Sat, 9 Sep 2023 11:25:00 +0530 Subject: [PATCH 2/4] bitcoin flatten logic updated --- bitcoinetl/streaming/streaming_utils.py | 7 ++- .../jobs/exporters/bitcoin_flatten.py | 59 ++++++++++--------- last_synced_block.txt | 2 +- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/bitcoinetl/streaming/streaming_utils.py b/bitcoinetl/streaming/streaming_utils.py index 01d73dd..b46933e 100644 --- a/bitcoinetl/streaming/streaming_utils.py +++ b/bitcoinetl/streaming/streaming_utils.py @@ -13,7 +13,7 @@ def get_item_exporter(output,topic_mapping,chain): 'transaction': output + '.transactions' }, message_attributes=('item_id',)) - else: + elif item_exporter_type == ItemExporterType.CONSOLE: item_exporter = ConsoleItemExporter() elif item_exporter_type == ItemExporterType.KAFKA: @@ -36,6 +36,8 @@ def determine_item_exporter_type(output): return ItemExporterType.PUBSUB if output is not None and output.startswith('kafka'): return ItemExporterType.KAFKA + if output is not None and output.startswith('console'): + return ItemExporterType.CONSOLE else: return ItemExporterType.UNKNOWN @@ -43,4 +45,5 @@ def determine_item_exporter_type(output): class ItemExporterType: PUBSUB = 'pubsub' KAFKA = 'kafka' - UNKNOWN = 'unknown' \ No newline at end of file + UNKNOWN = 'unknown' + CONSOLE = 'console' \ No newline at end of file diff --git a/blockchainetl/jobs/exporters/bitcoin_flatten.py b/blockchainetl/jobs/exporters/bitcoin_flatten.py index d685701..1bb5c05 100644 --- a/blockchainetl/jobs/exporters/bitcoin_flatten.py +++ b/blockchainetl/jobs/exporters/bitcoin_flatten.py @@ -3,6 +3,8 @@ from decimal import Decimal + + def flatten_transformation(payload_dict): TYPE_EXTERNAL = 1 @@ -11,33 +13,34 @@ def flatten_transformation(payload_dict): TYPE_BLOCK_REWARD = 3 transformed_transactions = [] - for output in payload_dict["outputs"]: - for input in payload_dict["inputs"]: - if not payload_dict["is_coinbase"]: - if output["value"] > 0: - token_outgoing_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["output_value"])) - else: - token_outgoing_value = Decimal((input["value"]) / payload_dict["output_count"]) - if input["value"] > 0: - token_incoming_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["input_value"])) - else: - token_incoming_value = 0 - - token_outgoing_fee = token_outgoing_value - token_incoming_value + if not payload_dict["is_coinbase"]: + for output in payload_dict["outputs"]: + for input in payload_dict["inputs"]: + if output["value"] > 0: + token_outgoing_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["output_value"])) + else: + token_outgoing_value = Decimal((input["value"]) / payload_dict["output_count"]) + if input["value"] > 0: + token_incoming_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["input_value"])) + else: + token_incoming_value = 0 + + token_outgoing_fee = token_outgoing_value - token_incoming_value - transformed_transactions.append({ - "block": payload_dict["block_number"], - "transaction_id": payload_dict["hash"], - "transaction_ts": payload_dict["block_timestamp"], - "transaction_type": TYPE_EXTERNAL, - "sender_address": "|".join(input["addresses"]), - "receiver_address": "|".join(output["addresses"]), - "token_outgoing_value": str(float(token_outgoing_value)), - "token_address": default_token_address, - "token_incoming_value": str(float(token_incoming_value)), - "token_outgoing_fee": str(float(token_outgoing_fee)) - }) - else: + transformed_transactions.append({ + "block": payload_dict["block_number"], + "transaction_id": payload_dict["hash"], + "transaction_ts": payload_dict["block_timestamp"], + "transaction_type": TYPE_EXTERNAL, + "sender_address": "|".join(input["addresses"]), + "receiver_address": "|".join(output["addresses"]), + "token_outgoing_value": str(float(token_outgoing_value)), + "token_address": default_token_address, + "token_incoming_value": str(float(token_incoming_value)), + "token_outgoing_fee": str(float(token_outgoing_fee)) + }) + else: + for output in payload_dict["outputs"]: transformed_transactions.append({ "block": payload_dict["block_number"], "transaction_id": payload_dict["hash"], @@ -51,6 +54,4 @@ def flatten_transformation(payload_dict): "token_outgoing_fee": str(0) }) - - return transformed_transactions - \ No newline at end of file + return transformed_transactions \ No newline at end of file diff --git a/last_synced_block.txt b/last_synced_block.txt index c6bfcb7..27400d6 100644 --- a/last_synced_block.txt +++ b/last_synced_block.txt @@ -1 +1 @@ -804002 +806550 From a908753db783f9f3762b20bff5db0d49a5703752 Mon Sep 17 00:00:00 2001 From: "naveen.modi" Date: Mon, 29 Jul 2024 16:12:37 +0530 Subject: [PATCH 3/4] added compression type --- .idea/.gitignore | 3 + .idea/bitcoin-etl.iml | 15 +++++ .idea/inspectionProfiles/Project_Default.xml | 63 +++++++++++++++++++ .../inspectionProfiles/profiles_settings.xml | 6 ++ .idea/modules.xml | 8 +++ .idea/vcs.xml | 6 ++ .../jobs/exporters/kafka_exporter.py | 1 + 7 files changed, 102 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/bitcoin-etl.iml create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/bitcoin-etl.iml b/.idea/bitcoin-etl.iml new file mode 100644 index 0000000..5fdd65b --- /dev/null +++ b/.idea/bitcoin-etl.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..acba126 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,63 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..0b204cf --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py index 5ffdf70..57396de 100644 --- a/blockchainetl/jobs/exporters/kafka_exporter.py +++ b/blockchainetl/jobs/exporters/kafka_exporter.py @@ -25,6 +25,7 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()): "sasl.username": os.getenv("KAFKA_PRODUCER_KEY"), "sasl.password": os.getenv("KAFKA_PRODUCER_PASSWORD"), "queue.buffering.max.messages": 10000000, + "compression.type": "gzip" } self.producer = Producer(conf) From 2c80d9025ba7e01d910ade68ba9b49bbf009f172 Mon Sep 17 00:00:00 2001 From: "naveen.modi" Date: Tue, 18 Mar 2025 00:29:40 +0530 Subject: [PATCH 4/4] added transaction raw topic --- bitcoinetl/streaming/streaming_utils.py | 1 + blockchainetl/jobs/exporters/kafka_exporter.py | 1 + 2 files changed, 2 insertions(+) diff --git a/bitcoinetl/streaming/streaming_utils.py b/bitcoinetl/streaming/streaming_utils.py index b46933e..45a3f0e 100644 --- a/bitcoinetl/streaming/streaming_utils.py +++ b/bitcoinetl/streaming/streaming_utils.py @@ -22,6 +22,7 @@ def get_item_exporter(output,topic_mapping,chain): item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={ 'block': f"producer-{chain}-blocks-hot", 'transaction': f"producer-{chain}-transactions-hot", + 'transaction_raw': f"producer-{chain}-transactions-raw-hot", }) else: item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping=topic_mapping) diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py index 57396de..bceef7f 100644 --- a/blockchainetl/jobs/exporters/kafka_exporter.py +++ b/blockchainetl/jobs/exporters/kafka_exporter.py @@ -48,6 +48,7 @@ def export_items(self, items): transformed_data = flatten_transformation(item) for data in transformed_data: self.export_item(data,item_type) + self.export_item(item,"transaction_raw") else: self.export_item(item,item_type) else: