forked from Adamant-im/ETH-transactions-storage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathethsync.py
130 lines (110 loc) · 4.75 KB
/
ethsync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Indexer for Ethereum to get transaction list by ETH address
# https://github.com/Adamant-im/ETH-transactions-storage
# By Artem Brunov, Aleksei Lebedev
# 2020-2021 ADAMANT Foundation
# 2017-2020 ADAMANT TECH LABS LP
# v1.2
from web3 import Web3
import psycopg2
import time
import sys
import logging
#from systemd.journal import JournalHandler
# Get postgre database name
if len(sys.argv) < 2:
print('Add postgre database name as an argument')
exit()
dbname = sys.argv[1]
# Connect to geth node
#web3 = Web3(Web3.IPCProvider("/home/geth/.ethereum/geth.ipc"))
# Or connect to openethereum node
web3 = Web3(Web3.IPCProvider("/home/parity/.local/share/openethereum/jsonrpc.ipc"))
# Start logger
logger = logging.getLogger("EthIndexerLog")
logger.setLevel(logging.INFO)
# File logger
lfh = logging.FileHandler("/var/log/ethindexer.log")
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
lfh.setFormatter(formatter)
logger.addHandler(lfh)
# Systemd logger, if we want to user journalctl logs
# Install systemd-python and
# decomment "#from systemd.journal import JournalHandler" up
#ljc = JournalHandler()
#formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
#ljc.setFormatter(formatter)
#logger.addHandler(ljc)
try:
conn = psycopg2.connect("dbname=" + dbname)
conn.autocommit = True
logger.info("Connected to the database")
except:
logger.error("Unable to connect to database")
# Delete last block as it may be not imparted in full
cur = conn.cursor()
cur.execute('DELETE FROM public.ethtxs WHERE block = (SELECT Max(block) from public.ethtxs)')
cur.close()
conn.close()
# Wait for the node to be in sync before indexing
logger.info("Waiting Ethereum node to be in sync...")
while web3.eth.syncing != False:
# Change with the time, in second, do you want to wait
# before cheking again, default is 5 minutes
time.sleep(300)
logger.info("Ethereum node is synced!")
# Adds all transactions from Ethereum block
def insertion(blockid, tr):
time = web3.eth.getBlock(blockid)['timestamp']
for x in range(0, tr):
trans = web3.eth.getTransactionByBlock(blockid, x)
# Save also transaction status, should be null if pre byzantium blocks
status = bool(web3.eth.get_transaction_receipt(trans['hash']).status)
txhash = trans['hash']
value = trans['value']
inputinfo = trans['input']
# Check if transaction is a contract transfer
if (value == 0 and not inputinfo.startswith('0xa9059cbb')):
continue
fr = trans['from']
to = trans['to']
gasprice = trans['gasPrice']
gas = web3.eth.getTransactionReceipt(trans['hash'])['gasUsed']
contract_to = ''
contract_value = ''
# Check if transaction is a contract transfer
if inputinfo.startswith('0xa9059cbb'):
contract_to = inputinfo[10:-64]
contract_value = inputinfo[74:]
# Correct contract transfer transaction represents '0x' + 4 bytes 'a9059cbb' + 32 bytes (64 chars) for contract address and 32 bytes for its value
# Some buggy txs can break up Indexer, so we'll filter it
if len(contract_to) > 128:
logger.info('Skipping ' + str(txhash) + ' tx. Incorrect contract_to length: ' + str(len(contract_to)))
contract_to = ''
contract_value = ''
cur.execute(
'INSERT INTO public.ethtxs(time, txfrom, txto, value, gas, gasprice, block, txhash, contract_to, contract_value, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
(time, fr, to, value, gas, gasprice, blockid, txhash, contract_to, contract_value, status))
# Fetch all of new (not in index) Ethereum blocks and add transactions to index
while True:
try:
conn = psycopg2.connect("dbname=" + dbname)
conn.autocommit = True
except:
logger.error("Unable to connect to database")
cur = conn.cursor()
cur.execute('SELECT Max(block) from public.ethtxs')
maxblockindb = cur.fetchone()[0]
# On first start, we index transactions from a block number you indicate. 10000000 is a sample.
if maxblockindb is None:
maxblockindb = 10000000
endblock = int(web3.eth.blockNumber)
logger.info('Current best block in index: ' + str(maxblockindb) + '; in Ethereum chain: ' + str(endblock))
for block in range(maxblockindb + 1, endblock):
transactions = web3.eth.getBlockTransactionCount(block)
if transactions > 0:
insertion(block, transactions)
else:
logger.info('Block ' + str(block) + ' does not contain transactions')
cur.close()
conn.close()
time.sleep(20)