Skip to content

automated clickhouse migrations #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/thirdweb-dev/indexer/db"
"github.com/thirdweb-dev/indexer/internal/orchestrator"
"github.com/thirdweb-dev/indexer/internal/rpc"
)
Expand All @@ -28,6 +29,11 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}

err = db.RunMigrations()
if err != nil {
log.Fatal().Err(err).Msg("Failed to run migrations")
}

orchestrator, err := orchestrator.NewOrchestrator(rpc)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create orchestrator")
Expand Down
14 changes: 14 additions & 0 deletions db/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Insight DB migrations

## Clickhouse

Migrations are managed using [golang-migrate](https://github.com/golang-migrate/migrate) and the migration files are located in the `ch_migrations` directory.

Each storage type (orchestrator, staging, and main) has its own set of migrations.

To add a new migration, run
```
migrate create -ext sql -dir db/ch_migrations/<storage_type> <migration_name>
```

Migrations are applied automatically each time the indexer starts.
4 changes: 4 additions & 0 deletions db/ch_migrations/main/20250114110057_init.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Revert the main-storage init migration: drop every table created by the
-- matching up migration. IF EXISTS makes the rollback idempotent.
DROP TABLE IF EXISTS blocks;
DROP TABLE IF EXISTS logs;
DROP TABLE IF EXISTS transactions;
DROP TABLE IF EXISTS traces;
142 changes: 142 additions & 0 deletions db/ch_migrations/main/20250114110057_init.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
-- create blocks table
-- One row per block, keyed by (chain_id, number). ReplacingMergeTree
-- deduplicates rows sharing the sort key at merge time, keeping the row with
-- the newest insert_timestamp; is_deleted = 1 flags a row for removal during
-- cleanup merges (enabled by the experimental setting below).
CREATE TABLE IF NOT EXISTS blocks (
`chain_id` UInt256,
`number` UInt256,
-- Delta+ZSTD codec suits monotonically increasing timestamps
`timestamp` UInt64 CODEC(Delta, ZSTD),
-- 0x-prefixed 32-byte hashes are 66 chars; addresses 42; nonce 18
`hash` FixedString(66),
`parent_hash` FixedString(66),
`sha3_uncles` FixedString(66),
`nonce` FixedString(18),
`mix_hash` FixedString(66),
`miner` FixedString(42),
`state_root` FixedString(66),
`transactions_root` FixedString(66),
`receipts_root` FixedString(66),
`logs_bloom` String,
`size` UInt64,
`extra_data` String,
`difficulty` UInt256,
`total_difficulty` UInt256,
`transaction_count` UInt64,
`gas_limit` UInt256,
`gas_used` UInt256,
-- Nullable: only present on post-Shanghai / post-London blocks respectively
`withdrawals_root` Nullable(FixedString(66)),
`base_fee_per_gas` Nullable(UInt64),
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 1,
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, number)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create logs table
-- One row per emitted event log, keyed by
-- (chain_id, block_number, transaction_hash, log_index). Same
-- ReplacingMergeTree dedup/cleanup scheme as the blocks table.
CREATE TABLE IF NOT EXISTS logs (
`chain_id` UInt256,
`block_number` UInt256,
`block_hash` FixedString(66),
`block_timestamp` UInt64 CODEC(Delta, ZSTD),
`transaction_hash` FixedString(66),
`transaction_index` UInt64,
`log_index` UInt64,
`address` FixedString(42),
`data` String,
-- topic_0 is the event signature and always present; topics 1-3 are optional
`topic_0` String,
`topic_1` Nullable(String),
`topic_2` Nullable(String),
`topic_3` Nullable(String),
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_address address TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic0 topic_0 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic1 topic_1 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic2 topic_2 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, transaction_hash, log_index)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create transactions table
-- One row per transaction, keyed by (chain_id, block_number, hash).
-- Same ReplacingMergeTree dedup/cleanup scheme as the blocks table.
CREATE TABLE IF NOT EXISTS transactions (
`chain_id` UInt256,
`hash` FixedString(66),
`nonce` UInt64,
`block_hash` FixedString(66),
`block_number` UInt256,
`block_timestamp` UInt64 CODEC(Delta, ZSTD),
`transaction_index` UInt64,
`from_address` FixedString(42),
`to_address` FixedString(42),
`value` UInt256,
`gas` UInt64,
`gas_price` UInt256,
`data` String,
-- 4-byte selector as 0x-prefixed hex (10 chars)
`function_selector` FixedString(10),
`max_fee_per_gas` UInt128,
`max_priority_fee_per_gas` UInt128,
`transaction_type` UInt8,
`r` UInt256,
`s` UInt256,
`v` UInt256,
`access_list` Nullable(String),
-- NOTE(review): the fields below look receipt-derived and are Nullable,
-- presumably populated only when the receipt is available — verify against
-- the indexer's insert path.
`contract_address` Nullable(FixedString(42)),
`gas_used` Nullable(UInt64),
`cumulative_gas_used` Nullable(UInt64),
`effective_gas_price` Nullable(UInt256),
`blob_gas_used` Nullable(UInt64),
`blob_gas_price` Nullable(UInt256),
`logs_bloom` Nullable(String),
`status` Nullable(UInt64),
`is_deleted` UInt8 DEFAULT 0,
`insert_timestamp` DateTime DEFAULT now(),
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, hash)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create traces table
-- One row per call-trace entry; trace_address (the position path within the
-- call tree) is part of the sort key to distinguish sub-calls of the same
-- transaction. Same ReplacingMergeTree dedup/cleanup scheme as blocks.
CREATE TABLE IF NOT EXISTS traces (
`chain_id` UInt256,
`block_number` UInt256,
`block_hash` FixedString(66),
`block_timestamp` UInt64 CODEC(Delta, ZSTD),
`transaction_hash` FixedString(66),
`transaction_index` UInt64,
`subtraces` Int64,
`trace_address` Array(Int64),
-- LowCardinality: small closed sets of values (e.g. call/create/suicide)
`type` LowCardinality(String),
`call_type` LowCardinality(String),
`error` Nullable(String),
`from_address` FixedString(42),
`to_address` FixedString(42),
`gas` UInt64,
`gas_used` UInt64,
`input` String,
`output` Nullable(String),
`value` UInt256,
-- reward-trace fields; null for ordinary call traces
`author` Nullable(FixedString(42)),
`reward_type` LowCardinality(Nullable(String)),
`refund_address` Nullable(FixedString(42)),
`is_deleted` UInt8 DEFAULT 0,
`insert_timestamp` DateTime DEFAULT now(),
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_type type TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number, transaction_hash, trace_address)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
2 changes: 2 additions & 0 deletions db/ch_migrations/orchestrator/20250114110046_init.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Revert the orchestrator-storage init migration; IF EXISTS keeps it idempotent.
DROP TABLE IF EXISTS block_failures;
DROP TABLE IF EXISTS cursors;
22 changes: 22 additions & 0 deletions db/ch_migrations/orchestrator/20250114110046_init.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- create block failures table
-- Tracks blocks that failed to index, keyed by (chain_id, block_number);
-- ReplacingMergeTree keeps the row with the newest insert_timestamp, and
-- is_deleted = 1 flags rows for cleanup merges.
CREATE TABLE IF NOT EXISTS block_failures (
`chain_id` UInt256,
`block_number` UInt256,
`last_error_timestamp` UInt64 CODEC(Delta, ZSTD),
`count` UInt16,
`reason` String,
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

-- create cursors table
-- One logical cursor per (chain_id, cursor_type); newer rows (by
-- insert_timestamp) replace older ones at merge time. No is_deleted column:
-- cursors are only ever overwritten, never removed.
CREATE TABLE IF NOT EXISTS cursors (
`chain_id` UInt256,
`cursor_type` String,
`cursor_value` String,
`insert_timestamp` DateTime DEFAULT now(),
) ENGINE = ReplacingMergeTree(insert_timestamp)
ORDER BY (chain_id, cursor_type);
1 change: 1 addition & 0 deletions db/ch_migrations/staging/20250114110054_init.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Revert the staging-storage init migration; IF EXISTS keeps it idempotent.
DROP TABLE IF EXISTS block_data;
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
CREATE TABLE block_data (
-- create staging table
CREATE TABLE IF NOT EXISTS block_data (
`chain_id` UInt256,
`block_number` UInt256,
`data` String,
Expand All @@ -8,4 +9,4 @@ CREATE TABLE block_data (
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, block_number)
PARTITION BY chain_id
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
139 changes: 139 additions & 0 deletions db/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package db

import (
	"errors"
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"strings"

	"github.com/golang-migrate/migrate/v4"
	_ "github.com/golang-migrate/migrate/v4/database/clickhouse"
	_ "github.com/golang-migrate/migrate/v4/source/file"
	"github.com/rs/zerolog/log"
	config "github.com/thirdweb-dev/indexer/configs"
)

// RunMigrations applies the Clickhouse migrations for every configured
// storage (orchestrator, staging, main). Storages that point at the same
// Clickhouse instance (host:port/database) are grouped so their migration
// files are copied into one temporary directory and applied in a single
// migrate run.
//
// Returns the first error encountered; previously all failures were silently
// swallowed and the function always returned nil.
func RunMigrations() error {
	storageConfigs := []struct {
		Type   string
		Config *config.ClickhouseConfig
	}{
		{Type: "orchestrator", Config: config.Cfg.Storage.Orchestrator.Clickhouse},
		{Type: "staging", Config: config.Cfg.Storage.Staging.Clickhouse},
		{Type: "main", Config: config.Cfg.Storage.Main.Clickhouse},
	}

	// Group storages by target instance so shared instances migrate once.
	groupedConfigs := make(map[string][]struct {
		Type   string
		Config *config.ClickhouseConfig
	})
	for _, cfg := range storageConfigs {
		if cfg.Config == nil {
			// Storage not backed by Clickhouse; nothing to migrate.
			continue
		}
		key := fmt.Sprintf("%s:%d:%s", cfg.Config.Host, cfg.Config.Port, cfg.Config.Database)
		groupedConfigs[key] = append(groupedConfigs[key], cfg)
	}

	// Clear any leftovers from a previous crashed run, just in case.
	if err := removeTmpMigrations(); err != nil {
		return err
	}
	for _, cfgs := range groupedConfigs {
		var types []string
		for _, cfg := range cfgs {
			if err := copyMigrationsToTmp([]string{"db/ch_migrations/" + cfg.Type}); err != nil {
				return fmt.Errorf("copying %s migrations: %w", cfg.Type, err)
			}
			types = append(types, cfg.Type)
		}
		log.Info().Msgf("Running Clickhouse migrations for %s", strings.Join(types, ", "))
		if err := runClickhouseMigrations(cfgs[0].Config); err != nil {
			return fmt.Errorf("running migrations for %s: %w", strings.Join(types, ", "), err)
		}
		if err := removeTmpMigrations(); err != nil {
			return err
		}
		log.Info().Msgf("Clickhouse migration completed for %s", strings.Join(types, ", "))
	}

	log.Info().Msg("All Clickhouse migrations completed")

	return nil
}

// runClickhouseMigrations applies all pending migrations in
// db/ch_migrations/tmp against the Clickhouse instance described by cfg.
// A nil config or empty host means the storage is not configured, which is
// not an error.
func runClickhouseMigrations(cfg *config.ClickhouseConfig) error {
	if cfg == nil || cfg.Host == "" {
		return nil
	}

	// TLS is on by default; secure=false only when explicitly disabled.
	secureParam := "&secure=true"
	if cfg.DisableTLS {
		secureParam = "&secure=false"
	}

	// Escape credentials so passwords with special characters (&, %, @, …)
	// do not corrupt the DSN. x-multi-statement allows several statements
	// per migration file; the migrations table uses MergeTree so it works
	// without ZooKeeper/Keeper.
	dsn := fmt.Sprintf("clickhouse://%s:%d/%s?username=%s&password=%s%s&x-multi-statement=true&x-migrations-table-engine=MergeTree",
		cfg.Host,
		cfg.Port,
		cfg.Database,
		url.QueryEscape(cfg.Username),
		url.QueryEscape(cfg.Password),
		secureParam,
	)

	m, err := migrate.New("file://db/ch_migrations/tmp", dsn)
	if err != nil {
		return err
	}
	defer m.Close()

	// ErrNoChange simply means the schema is already up to date.
	if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
		return err
	}
	return nil
}

// copyMigrationsToTmp copies every file under each source directory into
// db/ch_migrations/tmp, preserving the path relative to the source root.
// Directories themselves are skipped; only regular files are copied.
func copyMigrationsToTmp(sources []string) error {
	destination := "db/ch_migrations/tmp"
	if err := os.MkdirAll(destination, os.ModePerm); err != nil {
		return err
	}

	for _, source := range sources {
		// Previously the Walk error was discarded, hiding copy failures.
		err := filepath.Walk(source, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			// skip directories
			if info.IsDir() {
				return nil
			}
			// determine destination path relative to the source root
			relPath, err := filepath.Rel(source, path)
			if err != nil {
				return err
			}
			destPath := filepath.Join(destination, relPath)
			if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
				return err
			}

			return copyFile(path, destPath)
		})
		if err != nil {
			return fmt.Errorf("copying migrations from %s: %w", source, err)
		}
	}
	return nil
}

func copyFile(src string, dst string) error {
srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()

dstFile, err := os.Create(dst)
if err != nil {
return err
}
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
return err
}

func removeTmpMigrations() error {
err := os.RemoveAll("db/ch_migrations/tmp")
if err != nil {
return fmt.Errorf("error removing directory: %w", err)
}
return nil
}
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ require (
github.com/ethereum/go-ethereum v1.14.8
github.com/gin-gonic/gin v1.10.0
github.com/go-redis/redis/v8 v8.11.5
github.com/golang-migrate/migrate/v4 v4.18.1
github.com/gorilla/schema v1.4.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.20.4
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
Expand Down Expand Up @@ -57,6 +59,8 @@ require (
github.com/goccy/go-json v0.10.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.3.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down Expand Up @@ -100,8 +104,9 @@ require (
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/urfave/cli/v2 v2.27.4 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.10.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
Expand Down
Loading
Loading