@@ -13,22 +13,77 @@ import (
13
13
"github.com/opencost/opencost/core/pkg/opencost"
14
14
)
15
15
16
- type SnowFlakeClient struct {
16
+ // SnowflakeClient defines the interface for interacting with Snowflake
17
+ type SnowflakeClient interface {
18
+ ExecuteQuery (query string ) (* sql.Rows , error )
17
19
}
20
+
21
+ // snowflakeClient is the concrete implementation of the SnowflakeClient interface
22
+ type snowflakeClient struct {
23
+ db * sql.DB
24
+ }
25
+
26
+ // NewSnowflakeClient creates and returns a new SnowflakeClient instance
27
+ func NewSnowflakeClient (snowflakeConfig * snowflakeconfig.SnowflakeConfig ) (SnowflakeClient , error ) {
28
+ dsn := fmt .Sprintf ("user=%s password=%s account=%s db=%s schema=%s warehouse=%s" ,
29
+ snowflakeConfig .Username ,
30
+ snowflakeConfig .Password ,
31
+ snowflakeConfig .Account ,
32
+ snowflakeConfig .Database ,
33
+ snowflakeConfig .Schema ,
34
+ snowflakeConfig .Warehouse )
35
+
36
+ // Open a connection to Snowflake
37
+ db , err := sql .Open ("snowflake" , dsn )
38
+ if err != nil {
39
+ return nil , err
40
+ }
41
+
42
+ // Check if the connection is alive
43
+ if err = db .Ping (); err != nil {
44
+ return nil , err
45
+ }
46
+
47
+ return & snowflakeClient {db : db }, nil
48
+ }
49
+
50
+ // ExecuteQuery executes a SQL query and returns the resulting rows
51
+ func (s * snowflakeClient ) ExecuteQuery (query string ) (* sql.Rows , error ) {
52
+ return s .db .Query (query )
53
+ }
54
+
18
55
type SnowflakeCostSource struct {
19
- snowflakeClient SnowFlakeClient
56
+ snowflakeClient SnowflakeClient
20
57
}
21
58
22
59
// GetInvoices fetches invoices from Snowflake
23
- func GetInvoices (snowflakeClient SnowFlakeClient ) ([]snowflakeplugin.LineItem , error ) {
60
+ func GetInvoices (snowflakeClient SnowflakeClient ) ([]snowflakeplugin.LineItem , error ) {
61
+ // Example query
62
+ //TODO make sure that query maps to snowflakeplugin.LineItem
63
+ query := `
64
+ SELECT
65
+ date_trunc('day', start_time) AS usage_date,
66
+ SUM(credits_used) AS total_credits
67
+ FROM snowflake.account_usage.warehouse_metering_history
68
+ WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
69
+ GROUP BY usage_date
70
+ ORDER BY usage_date DESC
71
+ `
72
+
73
+ // Execute the query using the Snowflake client
74
+ rows , err := snowflakeClient .ExecuteQuery (query )
75
+ if err != nil {
76
+ log .Fatal ("Query execution failed:" , err )
77
+ return nil , err
78
+ }
79
+ defer rows .Close ()
24
80
// Implement the logic to fetch pending invoices from Snowflake
25
81
// This is a placeholder implementation
26
- return [] , nil
82
+ return rows , nil
27
83
}
28
84
29
85
func main () {
30
- //TODO get this information from the config file
31
- // Snowflake connection details
86
+
32
87
log .Debug ("Initializing Snowflake plugin" )
33
88
34
89
configFile , err := commonconfig .GetConfigFilePath ()
@@ -40,60 +95,36 @@ func main() {
40
95
if err != nil {
41
96
log .Fatalf ("error building Atlas config: %v" , err )
42
97
}
43
- log .SetLogLevel (snowflakeConfig .LogLevel )
44
- dsn := fmt .Sprintf ("user=%s password=%s account=%s db=%s schema=%s warehouse=%s" ,
45
- snowflakeConfig .Username , // Replace with your Snowflake username
46
- snowflakeConfig .Password , // Replace with your Snowflake password
47
- snowflakeConfig .Account , // Replace with your Snowflake account name
48
- snowflakeConfig .Database , // Replace with your database name
49
- snowflakeConfig .Schema , // Replace with your schema name
50
- snowflakeConfig .Warehouse ) // Replace with your warehouse name
98
+ log .SetLogLevel ("info" ) //default
99
+ if snowflakeConfig .LogLevel != "" {
100
+ log .SetLogLevel (snowflakeConfig .LogLevel )
51
101
52
- // Open a connection to Snowflake
53
- db , err := sql .Open ("snowflake" , dsn )
54
- if err != nil {
55
- log .Fatal (err )
56
102
}
57
- defer db .Close ()
58
103
59
- // Check if the connection is alive
60
- err = db . Ping ()
104
+ client , err := NewSnowflakeClient ( snowflakeConfig )
105
+
61
106
if err != nil {
62
- log .Fatal (err )
107
+ log .Fatal ("Failed to create Snowflake client:" , err )
63
108
}
64
-
65
- // Example query to fetch costs; adjust this query based on your needs and Snowflake's usage data
66
- query := `
67
- SELECT
68
- date_trunc('day', start_time) AS usage_date,
69
- SUM(credits_used) AS total_credits
70
- FROM snowflake.account_usage.warehouse_metering_history
71
- WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
72
- GROUP BY usage_date
73
- ORDER BY usage_date DESC
74
- `
75
-
76
- rows , err := db .Query (query )
77
- if err != nil {
78
- log .Fatal (err )
109
+ snowflakeCostSource := SnowflakeCostSource {
110
+ snowflakeClient : client
79
111
}
80
- defer rows .Close ()
112
+ defer client .( * snowflakeClient ). db .Close ()
81
113
82
- // Print the results
83
- fmt .Println ("Date\t \t Total Credits" )
84
- for rows .Next () {
85
- var date string
86
- var credits float64
87
- if err := rows .Scan (& date , & credits ); err != nil {
88
- log .Fatal (err )
89
- }
90
- fmt .Printf ("%s\t %.2f\n " , date , credits )
114
+ // pluginMap is the map of plugins we can dispense.
115
+ var pluginMap = map [string ]plugin.Plugin {
116
+ "CustomCostSource" : & ocplugin.CustomCostPlugin {Impl : & snowflakeCostSource },
91
117
}
92
118
93
- // Check for errors from iterating over rows
94
- if err = rows .Err (); err != nil {
95
- log .Fatal (err )
96
- }
119
+ plugin .Serve (& plugin.ServeConfig {
120
+ HandshakeConfig : handshakeConfig ,
121
+ Plugins : pluginMap ,
122
+ GRPCServer : plugin .DefaultGRPCServer ,
123
+ })
124
+
125
+
126
+
127
+
97
128
}
98
129
func (s * SnowflakeCostSource ) GetCustomCosts (req * pb.CustomCostRequest ) []* pb.CustomCostResponse {
99
130
results := []* pb.CustomCostResponse {}
@@ -127,6 +158,34 @@ func (s *SnowflakeCostSource) GetCustomCosts(req *pb.CustomCostRequest) []*pb.Cu
127
158
}
128
159
//TODO convert target to CustomCostResponse
129
160
for _ , target := range targets {
161
+ if target .Start ().After (time .Now ().UTC ()) {
162
+ log .Debugf ("skipping future window %v" , target )
163
+ continue
164
+ }
165
+
166
+ log .Debugf ("fetching atlas costs for window %v" , target )
167
+
168
+ // Print the results
169
+ fmt .Println ("Date\t \t Total Credits" )
170
+ for rows .Next () {
171
+ var date string
172
+ var credits float64
173
+ var warehouse string
174
+ if err := rows .Scan (& date , & credits , & warehouse ); err != nil {
175
+ log .Fatal (err )
176
+ }
177
+ //TODO extract lineItem into CustomCostResponse
178
+ //result := a.getAtlasCostsForWindow(&target, lineItems)
179
+
180
+ results = append (results , result )
181
+ //fmt.Printf("%s\t%.2f\n", date, credits)
182
+
183
+ }
184
+
185
+ // Check for errors from iterating over rows
186
+ if err = rows .Err (); err != nil {
187
+ log .Fatal (err )
188
+ }
130
189
131
190
}
132
191
0 commit comments