-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathmigrations.go
139 lines (119 loc) · 3.08 KB
/
migrations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package db
import (
	"errors"
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"strings"

	"github.com/golang-migrate/migrate/v4"
	_ "github.com/golang-migrate/migrate/v4/database/clickhouse"
	_ "github.com/golang-migrate/migrate/v4/source/file"
	"github.com/rs/zerolog/log"
	config "github.com/thirdweb-dev/indexer/configs"
)
// RunMigrations applies the ClickHouse migrations for the orchestrator,
// staging and main storages.
//
// Storages whose host, port and database are identical share a single
// migration run: their migration directories (db/ch_migrations/<type>) are
// merged into db/ch_migrations/tmp and applied together against that database.
// Storages that have no ClickHouse config, or whose host is empty, are skipped.
//
// The first failure aborts the run and is returned. The temporary directory is
// removed after every group, whether or not that group succeeded.
func RunMigrations() error {
	type storageConfig struct {
		Type   string
		Config *config.ClickhouseConfig
	}
	type migrationGroup struct {
		config *config.ClickhouseConfig
		types  []string
	}

	storageConfigs := []storageConfig{
		{Type: "orchestrator", Config: config.Cfg.Storage.Orchestrator.Clickhouse},
		{Type: "staging", Config: config.Cfg.Storage.Staging.Clickhouse},
		{Type: "main", Config: config.Cfg.Storage.Main.Clickhouse},
	}

	// Group storages by target database. The first-seen order of the keys is
	// recorded so groups are processed deterministically instead of in Go's
	// randomized map iteration order.
	groups := make(map[string]*migrationGroup, len(storageConfigs))
	order := make([]string, 0, len(storageConfigs))
	for _, sc := range storageConfigs {
		// Config is a pointer: a storage that is not backed by ClickHouse
		// must be skipped rather than dereferenced.
		if sc.Config == nil || sc.Config.Host == "" {
			continue
		}
		key := fmt.Sprintf("%s:%d:%s", sc.Config.Host, sc.Config.Port, sc.Config.Database)
		group, seen := groups[key]
		if !seen {
			group = &migrationGroup{config: sc.Config}
			groups[key] = group
			order = append(order, key)
		}
		group.types = append(group.types, sc.Type)
	}

	// Leftovers from an interrupted run must not be applied to the first
	// group, so failing to clear them is fatal.
	if err := removeTmpMigrations(); err != nil {
		return err
	}

	for _, key := range order {
		group := groups[key]
		label := strings.Join(group.types, ", ")

		err := runGroupedClickhouseMigrations(group.config, group.types)
		// Clean up even after a failure so one group's files never leak
		// into the next group or into a later invocation.
		if rmErr := removeTmpMigrations(); err == nil {
			err = rmErr
		}
		if err != nil {
			return err
		}
		log.Info().Msgf("Clickhouse migration completed for %s", label)
	}

	log.Info().Msg("All Clickhouse migrations completed")
	return nil
}

// runGroupedClickhouseMigrations stages the migration files of every storage
// type in types into the temporary directory and applies them to the database
// described by cfg. It does not remove the temporary directory.
func runGroupedClickhouseMigrations(cfg *config.ClickhouseConfig, types []string) error {
	for _, storageType := range types {
		source := filepath.Join("db", "ch_migrations", storageType)
		if err := copyMigrationsToTmp([]string{source}); err != nil {
			return fmt.Errorf("staging %s migrations: %w", storageType, err)
		}
	}

	label := strings.Join(types, ", ")
	log.Info().Msgf("Running Clickhouse migrations for %s", label)
	if err := runClickhouseMigrations(cfg); err != nil {
		return fmt.Errorf("running clickhouse migrations for %s: %w", label, err)
	}
	return nil
}
// runClickhouseMigrations applies all pending migrations found in
// db/ch_migrations/tmp to the ClickHouse database described by cfg.
//
// A nil cfg or an empty cfg.Host means there is nothing to migrate and the
// call returns nil. TLS is requested unless cfg.DisableTLS is set.
//
// It returns an error when the migrator cannot be created, when applying the
// migrations fails, or when closing the migrator fails. Having no pending
// migration (migrate.ErrNoChange) is not an error.
func runClickhouseMigrations(cfg *config.ClickhouseConfig) error {
	if cfg == nil || cfg.Host == "" {
		return nil
	}

	secureParam := "&secure=true"
	if cfg.DisableTLS {
		secureParam = "&secure=false"
	}

	// Credentials are query-escaped: the URL is parsed again downstream, so a
	// raw '&', '+', '%' or '#' in the password would otherwise be decoded
	// into a different value or split into extra parameters.
	dsn := fmt.Sprintf("clickhouse://%s:%d/%s?username=%s&password=%s%s&x-multi-statement=true&x-migrations-table-engine=MergeTree",
		cfg.Host,
		cfg.Port,
		cfg.Database,
		url.QueryEscape(cfg.Username),
		url.QueryEscape(cfg.Password),
		secureParam,
	)

	m, err := migrate.New("file://db/ch_migrations/tmp", dsn)
	if err != nil {
		return fmt.Errorf("creating clickhouse migrator: %w", err)
	}

	// Always close the migrator, then report the most relevant failure: the
	// migration error first, close errors only if the migration itself was fine.
	upErr := m.Up()
	srcErr, dbErr := m.Close()

	if upErr != nil && !errors.Is(upErr, migrate.ErrNoChange) {
		return fmt.Errorf("applying clickhouse migrations: %w", upErr)
	}
	if srcErr != nil {
		return fmt.Errorf("closing migration source: %w", srcErr)
	}
	if dbErr != nil {
		return fmt.Errorf("closing migration database connection: %w", dbErr)
	}
	return nil
}
// copyMigrationsToTmp copies every regular file found under each directory in
// sources into db/ch_migrations/tmp, preserving the path of each file relative
// to its source directory. The destination directory is created if needed, and
// files from different sources that share a relative path overwrite each other
// in the order the sources are given.
//
// It stops at the first failure (unreadable or missing source, directory
// creation error, copy error) and returns it.
func copyMigrationsToTmp(sources []string) error {
	destination := "db/ch_migrations/tmp"
	if err := os.MkdirAll(destination, os.ModePerm); err != nil {
		return err
	}

	for _, source := range sources {
		walkErr := filepath.Walk(source, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			// Directories are recreated on demand below, when a file inside
			// them is copied.
			if info.IsDir() {
				return nil
			}

			relPath, err := filepath.Rel(source, path)
			if err != nil {
				return err
			}
			destPath := filepath.Join(destination, relPath)
			if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
				return err
			}
			return copyFile(path, destPath)
		})
		// The walk result carries every error returned by the callback;
		// dropping it would report a partial or empty copy as success.
		if walkErr != nil {
			return fmt.Errorf("copying migrations from %q: %w", source, walkErr)
		}
	}
	return nil
}
// copyFile copies the contents of the file at src into dst, creating dst or
// truncating it if it already exists.
//
// It returns the first error hit while opening src, creating dst, copying the
// data, or closing dst.
func copyFile(src string, dst string) (err error) {
	srcFile, err := os.Open(src)
	if err != nil {
		return err
	}
	// The source is only read, so its close error carries no information.
	defer srcFile.Close()

	dstFile, err := os.Create(dst)
	if err != nil {
		return err
	}
	// Closing a written file can surface a deferred write failure, so the
	// close error is reported unless an earlier error already explains it.
	defer func() {
		if closeErr := dstFile.Close(); err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(dstFile, srcFile)
	return err
}
// removeTmpMigrations deletes the db/ch_migrations/tmp directory together with
// everything inside it. A directory that does not exist is not an error.
func removeTmpMigrations() error {
	const tmpDir = "db/ch_migrations/tmp"

	if rmErr := os.RemoveAll(tmpDir); rmErr != nil {
		return fmt.Errorf("error removing directory: %w", rmErr)
	}
	return nil
}