Skip to content

Commit 39afb02

Browse files
committed
automated clickhouse migrations
1 parent 7e42585 commit 39afb02

17 files changed

+359
-164
lines changed

cmd/orchestrator.go

+6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/prometheus/client_golang/prometheus/promhttp"
77
"github.com/rs/zerolog/log"
88
"github.com/spf13/cobra"
9+
"github.com/thirdweb-dev/indexer/db"
910
"github.com/thirdweb-dev/indexer/internal/orchestrator"
1011
"github.com/thirdweb-dev/indexer/internal/rpc"
1112
)
@@ -28,6 +29,11 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
2829
log.Fatal().Err(err).Msg("Failed to initialize RPC")
2930
}
3031

32+
err = db.RunMigrations()
33+
if err != nil {
34+
log.Fatal().Err(err).Msg("Failed to run migrations")
35+
}
36+
3137
orchestrator, err := orchestrator.NewOrchestrator(rpc)
3238
if err != nil {
3339
log.Fatal().Err(err).Msg("Failed to create orchestrator")

db/README.md

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Insight DB migrations
2+
3+
## Clickhouse
4+
5+
Migrations are managed using [golang-migrate](https://github.com/golang-migrate/migrate) and the migration files are located in the `ch_migrations` directory.
6+
7+
Each storage type (orchestrator, staging, and main) has its own set of migrations.
8+
9+
To add a new migration, run
10+
```
11+
migrate create -ext sql -dir db/ch_migrations/<storage_type> <migration_name>
12+
```
13+
14+
Migrations are run when the indexer is started.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Down migration for the main storage: removes all primary chain-data tables.
-- Drop order is irrelevant here — ClickHouse has no foreign-key constraints.
DROP TABLE IF EXISTS blocks;
DROP TABLE IF EXISTS logs;
DROP TABLE IF EXISTS transactions;
DROP TABLE IF EXISTS traces;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
-- create blocks table
-- One row per block per chain. ReplacingMergeTree(insert_timestamp, is_deleted)
-- deduplicates rows sharing the ORDER BY key, keeping the latest
-- insert_timestamp; is_deleted acts as the deletion marker column.
CREATE TABLE IF NOT EXISTS blocks (
    `chain_id` UInt256,
    `number` UInt256,
    -- Delta codec suits monotonically increasing timestamps.
    `timestamp` UInt64 CODEC(Delta, ZSTD),
    `hash` FixedString(66),
    `parent_hash` FixedString(66),
    `sha3_uncles` FixedString(66),
    `nonce` FixedString(18),
    `mix_hash` FixedString(66),
    `miner` FixedString(42),
    `state_root` FixedString(66),
    `transactions_root` FixedString(66),
    `receipts_root` FixedString(66),
    `logs_bloom` String,
    `size` UInt64,
    `extra_data` String,
    `difficulty` UInt256,
    `total_difficulty` UInt256,
    `transaction_count` UInt64,
    `gas_limit` UInt256,
    `gas_used` UInt256,
    -- Nullable: only present on post-withdrawal / post-EIP-1559 style blocks.
    `withdrawals_root` Nullable(FixedString(66)),
    `base_fee_per_gas` Nullable(UInt64),
    `insert_timestamp` DateTime DEFAULT now(),
    `is_deleted` UInt8 DEFAULT 0,
    INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 1,
    INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, number)
PARTITION BY chain_id
-- Required for "FINAL ... CLEANUP"-style purging of is_deleted rows.
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
33+
34+
-- create logs table
-- One row per emitted event log; keyed by (chain, block, tx, log index) so
-- re-inserts of the same log replace rather than duplicate.
CREATE TABLE IF NOT EXISTS logs (
    `chain_id` UInt256,
    `block_number` UInt256,
    `block_hash` FixedString(66),
    `block_timestamp` UInt64 CODEC(Delta, ZSTD),
    `transaction_hash` FixedString(66),
    `transaction_index` UInt64,
    `log_index` UInt64,
    `address` FixedString(42),
    `data` String,
    -- topic_0 is the event signature; topics 1-3 are optional indexed args.
    `topic_0` String,
    `topic_1` Nullable(String),
    `topic_2` Nullable(String),
    `topic_3` Nullable(String),
    `insert_timestamp` DateTime DEFAULT now(),
    `is_deleted` UInt8 DEFAULT 0,
    INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
    INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
    INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
    INDEX idx_address address TYPE bloom_filter GRANULARITY 1,
    INDEX idx_topic0 topic_0 TYPE bloom_filter GRANULARITY 1,
    INDEX idx_topic1 topic_1 TYPE bloom_filter GRANULARITY 1,
    INDEX idx_topic2 topic_2 TYPE bloom_filter GRANULARITY 1,
    INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, transaction_hash, log_index)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
63+
64+
-- create transactions table
-- One row per transaction. Receipt-derived fields (gas_used, status, ...)
-- are Nullable because they may be backfilled separately from the tx body.
CREATE TABLE IF NOT EXISTS transactions (
    `chain_id` UInt256,
    `hash` FixedString(66),
    `nonce` UInt64,
    `block_hash` FixedString(66),
    `block_number` UInt256,
    `block_timestamp` UInt64 CODEC(Delta, ZSTD),
    `transaction_index` UInt64,
    `from_address` FixedString(42),
    `to_address` FixedString(42),
    `value` UInt256,
    `gas` UInt64,
    `gas_price` UInt256,
    `data` String,
    -- First 4 bytes of calldata, 0x-prefixed hex.
    `function_selector` FixedString(10),
    `max_fee_per_gas` UInt128,
    `max_priority_fee_per_gas` UInt128,
    `transaction_type` UInt8,
    -- ECDSA signature components.
    `r` UInt256,
    `s` UInt256,
    `v` UInt256,
    `access_list` Nullable(String),
    `contract_address` Nullable(FixedString(42)),
    `gas_used` Nullable(UInt64),
    `cumulative_gas_used` Nullable(UInt64),
    `effective_gas_price` Nullable(UInt256),
    `blob_gas_used` Nullable(UInt64),
    `blob_gas_price` Nullable(UInt256),
    `logs_bloom` Nullable(String),
    `status` Nullable(UInt64),
    `is_deleted` UInt8 DEFAULT 0,
    `insert_timestamp` DateTime DEFAULT now(),
    INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
    INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
    INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
    INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
    INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
    INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, hash)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
107+
108+
-- create traces table
-- One row per trace entry; trace_address (the position path within the call
-- tree) is part of the key so sibling sub-calls of one tx stay distinct.
CREATE TABLE IF NOT EXISTS traces (
    `chain_id` UInt256,
    `block_number` UInt256,
    `block_hash` FixedString(66),
    `block_timestamp` UInt64 CODEC(Delta, ZSTD),
    `transaction_hash` FixedString(66),
    `transaction_index` UInt64,
    `subtraces` Int64,
    `trace_address` Array(Int64),
    -- Small fixed vocabularies (call/create/reward/...), hence LowCardinality.
    `type` LowCardinality(String),
    `call_type` LowCardinality(String),
    `error` Nullable(String),
    `from_address` FixedString(42),
    `to_address` FixedString(42),
    `gas` UInt64,
    `gas_used` UInt64,
    `input` String,
    `output` Nullable(String),
    `value` UInt256,
    -- Reward-trace-only fields; null for ordinary call traces.
    `author` Nullable(FixedString(42)),
    `reward_type` LowCardinality(Nullable(String)),
    `refund_address` Nullable(FixedString(42)),
    `is_deleted` UInt8 DEFAULT 0,
    `insert_timestamp` DateTime DEFAULT now(),
    INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
    INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
    INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
    INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
    INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
    INDEX idx_type type TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, transaction_hash, trace_address)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Down migration for the orchestrator storage: removes bookkeeping tables.
DROP TABLE IF EXISTS block_failures;
DROP TABLE IF EXISTS cursors;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- create block failures table
-- Tracks blocks whose ingestion failed, with a retry count and the last
-- error reason; deduplicated per (chain_id, block_number).
CREATE TABLE IF NOT EXISTS block_failures (
    `chain_id` UInt256,
    `block_number` UInt256,
    `last_error_timestamp` UInt64 CODEC(Delta, ZSTD),
    `count` UInt16,
    `reason` String,
    `insert_timestamp` DateTime DEFAULT now(),
    `is_deleted` UInt8 DEFAULT 0,
    INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
14+
15+
-- create cursors table
-- Key/value progress markers per chain (e.g. last processed position).
-- No is_deleted column here: cursors are only ever upserted, so the
-- ReplacingMergeTree uses insert_timestamp alone as the version.
CREATE TABLE IF NOT EXISTS cursors (
    `chain_id` UInt256,
    `cursor_type` String,
    `cursor_value` String,
    `insert_timestamp` DateTime DEFAULT now(),
) ENGINE = ReplacingMergeTree(insert_timestamp)
ORDER BY (chain_id, cursor_type);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Down migration for the staging storage: removes the raw block-data table.
DROP TABLE IF EXISTS block_data;
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
CREATE TABLE block_data (
1+
-- create staging table
2+
CREATE TABLE IF NOT EXISTS block_data (
23
`chain_id` UInt256,
34
`block_number` UInt256,
45
`data` String,
@@ -8,4 +9,4 @@ CREATE TABLE block_data (
89
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
910
ORDER BY (chain_id, block_number)
1011
PARTITION BY chain_id
11-
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
12+
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

db/migrations.go

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package db
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"os"
7+
"path/filepath"
8+
"strings"
9+
10+
"github.com/golang-migrate/migrate/v4"
11+
_ "github.com/golang-migrate/migrate/v4/database/clickhouse"
12+
_ "github.com/golang-migrate/migrate/v4/source/file"
13+
"github.com/rs/zerolog/log"
14+
config "github.com/thirdweb-dev/indexer/configs"
15+
)
16+
17+
func RunMigrations() error {
18+
storageConfigs := []struct {
19+
Type string
20+
Config *config.ClickhouseConfig
21+
}{
22+
{Type: "orchestrator", Config: config.Cfg.Storage.Orchestrator.Clickhouse},
23+
{Type: "staging", Config: config.Cfg.Storage.Staging.Clickhouse},
24+
{Type: "main", Config: config.Cfg.Storage.Main.Clickhouse},
25+
}
26+
27+
groupedConfigs := make(map[string][]struct {
28+
Type string
29+
Config *config.ClickhouseConfig
30+
})
31+
for _, cfg := range storageConfigs {
32+
key := fmt.Sprintf("%s:%d:%s", cfg.Config.Host, cfg.Config.Port, cfg.Config.Database)
33+
groupedConfigs[key] = append(groupedConfigs[key], cfg)
34+
}
35+
36+
removeTmpMigrations() // just in case
37+
for _, cfgs := range groupedConfigs {
38+
var types []string
39+
for _, cfg := range cfgs {
40+
copyMigrationsToTmp([]string{"db/ch_migrations/" + cfg.Type})
41+
types = append(types, cfg.Type)
42+
}
43+
log.Info().Msgf("Running Clickhouse migrations for %s", strings.Join(types, ", "))
44+
runClickhouseMigrations(cfgs[0].Config)
45+
removeTmpMigrations()
46+
log.Info().Msgf("Clickhouse migration completed for %s", strings.Join(types, ", "))
47+
}
48+
49+
log.Info().Msg("All Clickhouse migrations completed")
50+
51+
return nil
52+
}
53+
54+
func runClickhouseMigrations(cfg *config.ClickhouseConfig) error {
55+
if cfg.Host == "" {
56+
return nil
57+
}
58+
59+
secureParam := "&secure=true"
60+
if cfg.DisableTLS {
61+
secureParam = "&secure=false"
62+
}
63+
64+
url := fmt.Sprintf("clickhouse://%s:%d/%s?username=%s&password=%s%s&x-multi-statement=true&x-migrations-table-engine=MergeTree",
65+
cfg.Host,
66+
cfg.Port,
67+
cfg.Database,
68+
cfg.Username,
69+
cfg.Password,
70+
secureParam,
71+
)
72+
73+
m, err := migrate.New("file://db/ch_migrations/tmp", url)
74+
if err != nil {
75+
return err
76+
}
77+
m.Up()
78+
79+
m.Close()
80+
81+
return nil
82+
}
83+
84+
func copyMigrationsToTmp(sources []string) error {
85+
destination := "db/ch_migrations/tmp"
86+
err := os.MkdirAll(destination, os.ModePerm)
87+
if err != nil {
88+
return err
89+
}
90+
91+
for _, source := range sources {
92+
filepath.Walk(source, func(path string, info os.FileInfo, err error) error {
93+
if err != nil {
94+
return err
95+
}
96+
// skip directories
97+
if info.IsDir() {
98+
return nil
99+
}
100+
// determine destination path
101+
relPath, err := filepath.Rel(source, path)
102+
if err != nil {
103+
return err
104+
}
105+
destPath := filepath.Join(destination, relPath)
106+
if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
107+
return err
108+
}
109+
110+
return copyFile(path, destPath)
111+
})
112+
}
113+
return nil
114+
}
115+
116+
// copyFile copies the contents of the file at src into a newly created
// (or truncated) file at dst.
//
// Fix over the original: the destination file's Close error is checked
// instead of being discarded by defer — on many filesystems a failed
// flush of buffered writes only surfaces at Close, so ignoring it could
// report success for a truncated copy.
func copyFile(src string, dst string) error {
	srcFile, err := os.Open(src)
	if err != nil {
		return err
	}
	defer srcFile.Close() // read-only; Close error carries no data-loss risk

	dstFile, err := os.Create(dst)
	if err != nil {
		return err
	}

	_, copyErr := io.Copy(dstFile, srcFile)
	closeErr := dstFile.Close()
	if copyErr != nil {
		return copyErr
	}
	return closeErr
}
132+
133+
// removeTmpMigrations deletes the temporary migration staging directory.
// A directory that does not exist is not an error (os.RemoveAll semantics).
func removeTmpMigrations() error {
	if err := os.RemoveAll("db/ch_migrations/tmp"); err != nil {
		return fmt.Errorf("error removing directory: %w", err)
	}
	return nil
}

go.mod

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ require (
77
github.com/ethereum/go-ethereum v1.14.8
88
github.com/gin-gonic/gin v1.10.0
99
github.com/go-redis/redis/v8 v8.11.5
10+
github.com/golang-migrate/migrate/v4 v4.18.1
1011
github.com/gorilla/schema v1.4.1
1112
github.com/hashicorp/golang-lru/v2 v2.0.7
13+
github.com/lib/pq v1.10.9
1214
github.com/prometheus/client_golang v1.20.4
1315
github.com/rs/zerolog v1.33.0
1416
github.com/spf13/cobra v1.8.1
@@ -57,6 +59,8 @@ require (
5759
github.com/goccy/go-json v0.10.3 // indirect
5860
github.com/google/uuid v1.6.0 // indirect
5961
github.com/gorilla/websocket v1.4.2 // indirect
62+
github.com/hashicorp/errwrap v1.1.0 // indirect
63+
github.com/hashicorp/go-multierror v1.1.1 // indirect
6064
github.com/hashicorp/hcl v1.0.0 // indirect
6165
github.com/holiman/uint256 v1.3.1 // indirect
6266
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -100,8 +104,9 @@ require (
100104
github.com/ugorji/go/codec v1.2.12 // indirect
101105
github.com/urfave/cli/v2 v2.27.4 // indirect
102106
github.com/yusufpapurcu/wmi v1.2.3 // indirect
103-
go.opentelemetry.io/otel v1.26.0 // indirect
104-
go.opentelemetry.io/otel/trace v1.26.0 // indirect
107+
go.opentelemetry.io/otel v1.29.0 // indirect
108+
go.opentelemetry.io/otel/trace v1.29.0 // indirect
109+
go.uber.org/atomic v1.11.0 // indirect
105110
go.uber.org/multierr v1.11.0 // indirect
106111
golang.org/x/arch v0.10.0 // indirect
107112
golang.org/x/crypto v0.27.0 // indirect

0 commit comments

Comments
 (0)