@@ -3,16 +3,30 @@ package main
3
3
import (
4
4
"database/sql"
5
5
"fmt"
6
+ "time"
6
7
8
+ "github.com/hashicorp/go-plugin"
7
9
commonconfig "github.com/opencost/opencost-plugins/common/config"
8
10
commonrequest "github.com/opencost/opencost-plugins/common/request"
9
11
snowflakeconfig "github.com/opencost/opencost-plugins/pkg/plugins/snowflake/config"
10
12
snowflakeplugin "github.com/opencost/opencost-plugins/pkg/plugins/snowflake/plugin"
11
13
"github.com/opencost/opencost/core/pkg/log"
12
14
"github.com/opencost/opencost/core/pkg/model/pb"
13
15
"github.com/opencost/opencost/core/pkg/opencost"
16
+ ocplugin "github.com/opencost/opencost/core/pkg/plugin"
17
+ "google.golang.org/protobuf/types/known/timestamppb"
14
18
)
15
19
20
+ // handshakeConfigs are used to just do a basic handshake between
21
+ // a plugin and host. If the handshake fails, a user friendly error is shown.
22
+ // This prevents users from executing bad plugins or executing a plugin
23
+ // directory. It is a UX feature, not a security feature.
24
+ var handshakeConfig = plugin.HandshakeConfig {
25
+ ProtocolVersion : 1 ,
26
+ MagicCookieKey : "PLUGIN_NAME" ,
27
+ MagicCookieValue : "snowflake" ,
28
+ }
29
+
16
30
// SnowflakeClient defines the interface for interacting with Snowflake
17
31
type SnowflakeClient interface {
18
32
ExecuteQuery (query string ) (* sql.Rows , error )
@@ -24,6 +38,17 @@ type snowflakeClient struct {
24
38
}
25
39
26
40
// NewSnowflakeClient creates and returns a new SnowflakeClient instance
41
+ // NewSnowflakeClient creates a new Snowflake client using the provided Snowflake configuration.
42
+ // It constructs a DSN (Data Source Name) string from the configuration and attempts to open a connection to Snowflake.
43
+ // If the connection is successful and the database is reachable, it returns a SnowflakeClient instance.
44
+ // If there is an error during the connection or ping process, it returns an error.
45
+ //
46
+ // Parameters:
47
+ // - snowflakeConfig: A pointer to a SnowflakeConfig struct containing the necessary configuration details.
48
+ //
49
+ // Returns:
50
+ // - SnowflakeClient: An instance of the SnowflakeClient interface if the connection is successful.
51
+ // - error: An error if there is an issue with the connection or ping process.
27
52
func NewSnowflakeClient (snowflakeConfig * snowflakeconfig.SnowflakeConfig ) (SnowflakeClient , error ) {
28
53
dsn := fmt .Sprintf ("user=%s password=%s account=%s db=%s schema=%s warehouse=%s" ,
29
54
snowflakeConfig .Username ,
@@ -58,32 +83,48 @@ type SnowflakeCostSource struct {
58
83
59
84
// GetInvoices fetches invoices from Snowflake
60
85
func GetInvoices (snowflakeClient SnowflakeClient ) ([]snowflakeplugin.LineItem , error ) {
61
- // Example query
62
- //TODO make sure that query maps to snowflakeplugin.LineItem
63
- query := `
64
- SELECT
65
- date_trunc('day', start_time) AS usage_date,
66
- SUM(credits_used) AS total_credits
67
- FROM snowflake.account_usage.warehouse_metering_history
68
- WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
69
- GROUP BY usage_date
70
- ORDER BY usage_date DESC
71
- `
86
+
87
+ query := snowflakeplugin .CreditsByWarehouse ()
72
88
73
89
// Execute the query using the Snowflake client
74
90
rows , err := snowflakeClient .ExecuteQuery (query )
75
91
if err != nil {
76
- log .Fatal ("Query execution failed:" , err )
92
+
93
+ log .Fatalf ("Query execution failed:" , err )
77
94
return nil , err
78
95
}
79
96
defer rows .Close ()
80
- // Implement the logic to fetch pending invoices from Snowflake
81
- // This is a placeholder implementation
82
- return rows , nil
97
+
98
+ var lineItems []snowflakeplugin.LineItem
99
+
100
+ for rows .Next () {
101
+ var warehouse string
102
+ var credits float32
103
+ var date string
104
+
105
+ if err := rows .Scan (& date , & warehouse , & credits ); err != nil {
106
+ log .Fatalf ("" , err )
107
+ }
108
+
109
+ lineItem := snowflakeplugin.LineItem {
110
+ WarehouseName : warehouse ,
111
+ CreditUsed : credits ,
112
+ Date : date ,
113
+ }
114
+
115
+ lineItems = append (lineItems , lineItem )
116
+ }
117
+
118
+ if err = rows .Err (); err != nil {
119
+ log .Fatalf ("" , err )
120
+ }
121
+
122
+ return lineItems , nil
123
+
83
124
}
84
125
85
126
func main () {
86
-
127
+
87
128
log .Debug ("Initializing Snowflake plugin" )
88
129
89
130
configFile , err := commonconfig .GetConfigFilePath ()
@@ -102,12 +143,12 @@ func main() {
102
143
}
103
144
104
145
client , err := NewSnowflakeClient (snowflakeConfig )
105
-
146
+
106
147
if err != nil {
107
- log .Fatal ("Failed to create Snowflake client:" , err )
148
+ log .Fatalf ("Failed to create Snowflake client:" , err )
108
149
}
109
150
snowflakeCostSource := SnowflakeCostSource {
110
- snowflakeClient : client
151
+ snowflakeClient : client ,
111
152
}
112
153
defer client .(* snowflakeClient ).db .Close ()
113
154
@@ -122,11 +163,9 @@ func main() {
122
163
GRPCServer : plugin .DefaultGRPCServer ,
123
164
})
124
165
125
-
126
-
127
-
128
166
}
129
167
func (s * SnowflakeCostSource ) GetCustomCosts (req * pb.CustomCostRequest ) []* pb.CustomCostResponse {
168
+
130
169
results := []* pb.CustomCostResponse {}
131
170
132
171
requestErrors := commonrequest .ValidateRequest (req )
@@ -164,31 +203,42 @@ func (s *SnowflakeCostSource) GetCustomCosts(req *pb.CustomCostRequest) []*pb.Cu
164
203
}
165
204
166
205
log .Debugf ("fetching atlas costs for window %v" , target )
167
-
168
- // Print the results
169
- fmt .Println ("Date\t \t Total Credits" )
170
- for rows .Next () {
171
- var date string
172
- var credits float64
173
- var warehouse string
174
- if err := rows .Scan (& date , & credits , & warehouse ); err != nil {
175
- log .Fatal (err )
176
- }
177
- //TODO extract lineItem into CustomCostResponse
178
- //result := a.getAtlasCostsForWindow(&target, lineItems)
206
+ result := s .GetSnowflakeCostsForWindow (& target , lineItems )
179
207
180
208
results = append (results , result )
181
- //fmt.Printf("%s\t%.2f\n", date, credits)
182
209
183
210
}
184
211
185
- // Check for errors from iterating over rows
186
- if err = rows .Err (); err != nil {
187
- log .Fatal (err )
188
- }
212
+ return results
189
213
214
+ }
215
+ func filterLineItemsByWindow (win * opencost.Window , lineItems []snowflakeplugin.LineItem ) []* pb.CustomCost {
216
+ var filteredItems []* pb.CustomCost
217
+ for _ , li := range lineItems {
218
+ if li .Date == win .Start ().Format ("2006-01-02 15:04:05" ) {
219
+ cost := & pb.CustomCost {
220
+ UsageQuantity : li .CreditUsed ,
221
+ ResourceName : li .WarehouseName ,
222
+ }
223
+ filteredItems = append (filteredItems , cost )
224
+ }
190
225
}
226
+ return filteredItems
227
+ }
191
228
192
- return results
193
-
229
+ func (s * SnowflakeCostSource ) GetSnowflakeCostsForWindow (win * opencost.Window , lineItems []snowflakeplugin.LineItem ) * pb.CustomCostResponse {
230
+
231
+ costsInWindow := filterLineItemsByWindow (win , lineItems )
232
+ resp := pb.CustomCostResponse {
233
+ Metadata : map [string ]string {"api_client_version" : "v1" },
234
+ CostSource : "data_storage" ,
235
+ Domain : "snowflake" ,
236
+ Version : "v1" ,
237
+ Currency : "USD" ,
238
+ Start : timestamppb .New (* win .Start ()),
239
+ End : timestamppb .New (* win .End ()),
240
+ Errors : []string {},
241
+ Costs : costsInWindow ,
242
+ }
243
+ return & resp
194
244
}
0 commit comments