diff --git a/go.mod b/go.mod index 774000a..29771c1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( cloud.google.com/go/bigquery v1.45.0 github.com/googleapis/google-cloud-go-testing v0.0.0-20191008195207-8e1d251e947d - github.com/m-lab/go v0.1.66 + github.com/m-lab/go v0.1.67 github.com/prometheus/client_golang v1.11.1 github.com/spf13/afero v1.2.2 golang.org/x/net v0.0.0-20221014081412-f15817d10f9b diff --git a/go.sum b/go.sum index 83b6c38..550595e 100644 --- a/go.sum +++ b/go.sum @@ -120,6 +120,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/m-lab/go v0.1.66 h1:adDJILqKBCkd5YeVhCrrjWkjoNRtDzlDr6uizWu5/pE= github.com/m-lab/go v0.1.66/go.mod h1:O1D/EoVarJ8lZt9foANcqcKtwxHatBzUxXFFyC87aQQ= +github.com/m-lab/go v0.1.67 h1:jT9tLviED+w2GP6tp0qlB3r8c/yXGtjQW885OaSXn6I= +github.com/m-lab/go v0.1.67/go.mod h1:BirARfHWjjXHaCGNyWCm/CKW1OarjuEj8Yn6Z2rc0M4= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/internal/setup/setup_test.go b/internal/setup/setup_test.go index a6ba6ec..fe94617 100644 --- a/internal/setup/setup_test.go +++ b/internal/setup/setup_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "cloud.google.com/go/bigquery" "github.com/m-lab/go/rtx" "github.com/m-lab/prometheus-bigquery-exporter/sql" "github.com/prometheus/client_golang/prometheus" @@ -69,8 +70,8 @@ func TestFile_IsModified(t *testing.T) { type fakeRunner struct{} -func (f *fakeRunner) Query(query string) ([]sql.Metric, error) { - return nil, fmt.Errorf("Fake failure") +func (f *fakeRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { + return nil, nil, fmt.Errorf("Fake failure") } func TestFile_Update(t *testing.T) { @@ -106,8 +107,8 @@ type fakeRegister struct { metric sql.Metric } -func (f *fakeRegister) Query(query string) ([]sql.Metric, error) { - return []sql.Metric{f.metric}, nil +func (f *fakeRegister) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { + return []sql.Metric{f.metric}, nil, nil } func TestFile_Register(t *testing.T) { diff --git a/main.go b/main.go index 1897305..0c896c7 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,8 @@ package main import ( "flag" "fmt" - "io/ioutil" "log" + "os" "path/filepath" "strings" "sync" @@ -76,7 +76,7 @@ func fileToMetric(filename string) string { // fileToQuery reads the content of the given file and returns the query with template values repalced with those in vars. func fileToQuery(filename string, vars map[string]string) string { - queryBytes, err := ioutil.ReadFile(filename) + queryBytes, err := os.ReadFile(filename) rtx.Must(err, "Failed to open %q", filename) q := string(queryBytes) diff --git a/main_test.go b/main_test.go index 05606d0..fc21bd2 100644 --- a/main_test.go +++ b/main_test.go @@ -27,7 +27,7 @@ type fakeRunner struct { updated int } -func (f *fakeRunner) Query(query string) ([]sql.Metric, error) { +func (f *fakeRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { r := []sql.Metric{ { LabelKeys: []string{"key"}, @@ -40,9 +40,9 @@ func (f *fakeRunner) Query(query string) ([]sql.Metric, error) { f.updated++ if f.updated > 1 { // Simulate an error after one successful query. - return nil, fmt.Errorf("Fake failure for testing") + return nil, nil, fmt.Errorf("Fake failure for testing") } - return r, nil + return r, nil, nil } func Test_main(t *testing.T) { diff --git a/query/bigquery_runner.go b/query/bigquery_runner.go index e53f53f..3b052c6 100644 --- a/query/bigquery_runner.go +++ b/query/bigquery_runner.go @@ -19,23 +19,53 @@ type bigQueryImpl struct { bqiface.Client } -func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) error { +func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) { + // Create a query job. q := b.Client.Query(query) - it, err := q.Read(context.Background()) + job, err := q.Run(context.Background()) + if err != nil { - return err + return nil, err } + + if job == nil { + return nil, nil + } + + // Wait for the job to complete. + status, err := job.Wait(context.Background()) + if err != nil { + return nil, status.Err() + } + // Now, you can proceed with reading and processing the query results. var row map[string]bigquery.Value + + it, err := job.Read(context.Background()) + + if err != nil { + return nil, err + } + // Get the query statistics to extract the cost. + queryStatistics := new(bigquery.QueryStatistics) + // Assuming that the LastStatus method returns a string + if job.LastStatus() != nil { + queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics) + if queryStats != nil { + queryStatistics = queryStats + } + } + for err = it.Next(&row); err == nil; err = it.Next(&row) { err2 := visit(row) if err2 != nil { - return err2 + return queryStatistics, err2 } } + if err != iterator.Done { - return err + return queryStatistics, err } - return nil + return queryStatistics, nil } // BQRunner is a concerete implementation of QueryRunner for BigQuery. @@ -45,7 +75,7 @@ type BQRunner struct { // runner interface allows unit testing of the Query function. type runner interface { - Query(q string, visit func(row map[string]bigquery.Value) error) error + Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) } // NewBQRunner creates a new QueryRunner instance. @@ -60,16 +90,16 @@ func NewBQRunner(client *bigquery.Client) *BQRunner { // Query executes the given query. Query only supports standard SQL. The // query must define a column named "value" for the value, and may define // additional columns, all of which are used as metric labels. -func (qr *BQRunner) Query(query string) ([]sql.Metric, error) { +func (qr *BQRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { metrics := []sql.Metric{} - err := qr.runner.Query(query, func(row map[string]bigquery.Value) error { + queryStatistics, err := qr.runner.Query(query, func(row map[string]bigquery.Value) error { metrics = append(metrics, rowToMetric(row)) return nil }) if err != nil { - return nil, err + return nil, queryStatistics, err } - return metrics, nil + return metrics, queryStatistics, nil } // valToFloat extracts a float from the bigquery.Value irrespective of the diff --git a/query/bigquery_runner_test.go b/query/bigquery_runner_test.go index 9200cb5..70bbb1e 100644 --- a/query/bigquery_runner_test.go +++ b/query/bigquery_runner_test.go @@ -9,7 +9,7 @@ import ( "cloud.google.com/go/bigquery" "github.com/m-lab/prometheus-bigquery-exporter/sql" - "github.com/m-lab/go/cloud/bqfake" + "github.com/m-lab/go/cloudtest/bqfake" ) func TestRowToMetric(t *testing.T) { @@ -101,21 +101,21 @@ func TestRowToMetric(t *testing.T) { } type fakeQuery struct { - err error rows []map[string]bigquery.Value + err error } -func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) error { +func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) { if f.err != nil { - return f.err + return nil, f.err } for i := range f.rows { err := visit(f.rows[i]) if err != nil { - return err + return nil, err } } - return nil + return nil, nil } func TestBQRunner_Query(t *testing.T) { @@ -151,7 +151,7 @@ func TestBQRunner_Query(t *testing.T) { qr := &BQRunner{ runner: tt.runner, } - got, err := qr.Query("select * from `fake-table`") + got, _, err := qr.Query("select * from `fake-table`") if (err != nil) != tt.wantErr { t.Errorf("BQRunner.Query() error = %v, wantErr %v", err, tt.wantErr) return @@ -170,15 +170,15 @@ func TestNewBQRunner(t *testing.T) { func TestBigQueryImpl_Query(t *testing.T) { tests := []struct { name string - config bqfake.QueryConfig[map[string]bigquery.Value] + config bqfake.QueryConfig query string visit func(row map[string]bigquery.Value) error wantErr bool }{ { name: "success-iteration", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ - RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ + RowIteratorConfig: bqfake.RowIteratorConfig{ Rows: []map[string]bigquery.Value{{"value": 1.234}}, }, }, @@ -188,8 +188,8 @@ func TestBigQueryImpl_Query(t *testing.T) { }, { name: "visit-error", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ - RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ + RowIteratorConfig: bqfake.RowIteratorConfig{ Rows: []map[string]bigquery.Value{{"value": 1.234}}, }, }, @@ -200,15 +200,15 @@ func TestBigQueryImpl_Query(t *testing.T) { }, { name: "read-error", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ ReadErr: fmt.Errorf("This is a fake read error"), }, wantErr: true, }, { name: "iterator-error", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ - RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ + RowIteratorConfig: bqfake.RowIteratorConfig{ IterErr: fmt.Errorf("This is a fake iterator error"), }, }, @@ -221,9 +221,10 @@ func TestBigQueryImpl_Query(t *testing.T) { b := &bigQueryImpl{ Client: client, } - if err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { + if _, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { t.Errorf("bigQueryImpl.Query() error = %v, wantErr %v", err, tt.wantErr) } + }) } } diff --git a/sql/collector.go b/sql/collector.go index 3f01188..a164e34 100644 --- a/sql/collector.go +++ b/sql/collector.go @@ -6,8 +6,8 @@ import ( "sync" "time" + "cloud.google.com/go/bigquery" "github.com/m-lab/go/logx" - "github.com/prometheus/client_golang/prometheus" ) @@ -29,7 +29,7 @@ func NewMetric(labelKeys []string, labelValues []string, values map[string]float // QueryRunner defines the interface used to run a query and return an array of metrics. type QueryRunner interface { - Query(q string) ([]Metric, error) + Query(q string) ([]Metric, *bigquery.QueryStatistics, error) } // Collector manages a prometheus.Collector for queries performed by a QueryRunner. @@ -51,7 +51,9 @@ type Collector struct { metrics []Metric // mux locks access to types above. mux sync.Mutex - + // Total billed bytes by BigQuery + totalBytesBilled int64 + slotMillis int64 // RegisterErr contains any error during registration. This should be considered fatal. RegisterErr error } @@ -59,13 +61,15 @@ type Collector struct { // NewCollector creates a new BigQuery Collector instance. func NewCollector(runner QueryRunner, valType prometheus.ValueType, metricName, query string) *Collector { return &Collector{ - runner: runner, - metricName: metricName, - query: query, - valType: valType, - descs: nil, - metrics: nil, - mux: sync.Mutex{}, + runner: runner, + metricName: metricName, + query: query, + valType: valType, + descs: nil, + metrics: nil, + totalBytesBilled: 0, + slotMillis: 0, + mux: sync.Mutex{}, } } @@ -106,6 +110,8 @@ func (col *Collector) Collect(ch chan<- prometheus.Metric) { desc, col.valType, metrics[i].Values[k], metrics[i].LabelValues...) } } + setSlotMillis(ch, col.slotMillis, col.metricName) + setTotalBytesBilled(ch, col.totalBytesBilled, col.metricName) } // String satisfies the Stringer interface. String returns the metric name. @@ -117,7 +123,7 @@ func (col *Collector) String() string { // Update is called automaticlly after the collector is registered. func (col *Collector) Update() error { logx.Debug.Println("Update:", col.metricName) - metrics, err := col.runner.Query(col.query) + metrics, queryStatistics, err := col.runner.Query(col.query) if err != nil { logx.Debug.Println("Failed to run query:", err) return err @@ -128,6 +134,10 @@ func (col *Collector) Update() error { // Replace slice reference with new value returned from Query. References // to the previous value of col.metrics are not affected. col.metrics = metrics + if queryStatistics != nil { + col.totalBytesBilled = queryStatistics.TotalBytesBilled + col.slotMillis = queryStatistics.SlotMillis + } return nil } @@ -140,3 +150,13 @@ func (col *Collector) setDesc() { } } } + +func setSlotMillis(ch chan<- prometheus.Metric, slotMillis int64, metricName string) { + desc := prometheus.NewDesc("bqx_slot_seconds_utilized", "slot milliseconds utilized", []string{"filename"}, nil) + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(slotMillis)/1000, metricName) +} + +func setTotalBytesBilled(ch chan<- prometheus.Metric, totalBytesBilled int64, metricName string) { + desc := prometheus.NewDesc("bqx_total_bytes_billed", "Total billed bytes", []string{"filename"}, nil) + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(totalBytesBilled), metricName) +} diff --git a/sql/collector_test.go b/sql/collector_test.go index 34c61d7..0742344 100644 --- a/sql/collector_test.go +++ b/sql/collector_test.go @@ -2,13 +2,14 @@ package sql import ( "fmt" - "io/ioutil" + "io" "log" "net/http" "reflect" "strings" "testing" + "cloud.google.com/go/bigquery" "github.com/m-lab/go/prometheusx" "github.com/m-lab/go/prometheusx/promtest" "github.com/prometheus/client_golang/prometheus" @@ -18,17 +19,17 @@ type fakeQueryRunner struct { metrics []Metric } -func (qr *fakeQueryRunner) Query(query string) ([]Metric, error) { - return qr.metrics, nil +func (qr *fakeQueryRunner) Query(query string) ([]Metric, *bigquery.QueryStatistics, error) { + return qr.metrics, nil, nil } type errorQueryRunner struct { count int } -func (qr *errorQueryRunner) Query(query string) ([]Metric, error) { +func (qr *errorQueryRunner) Query(query string) ([]Metric, *bigquery.QueryStatistics, error) { qr.count++ - return nil, fmt.Errorf("Fake query error") + return nil, nil, fmt.Errorf("Fake query error") } func TestCollector(t *testing.T) { @@ -47,6 +48,8 @@ func TestCollector(t *testing.T) { expectedMetrics := []string{ `fake_metric{key="thing"} 1.1`, `fake_metric{key="thing2"} 2.1`, + `bqx_slot_seconds_utilized{filename="fake_metric"} 0`, + `bqx_total_bytes_billed{filename="fake_metric"} 0`, } c := NewCollector( &fakeQueryRunner{metrics}, prometheus.GaugeValue, "fake_metric", "-- not used") @@ -54,8 +57,8 @@ func TestCollector(t *testing.T) { // NOTE: prometheus.Desc and prometheus.Metric are opaque interfaces that do // not allow introspection. But, we know how many to expect, so check the // counts added to the channels. - chDesc := make(chan *prometheus.Desc, 2) - chCol := make(chan prometheus.Metric, 2) + chDesc := make(chan *prometheus.Desc, 4) + chCol := make(chan prometheus.Metric, 4) c.Describe(chDesc) c.Collect(chCol) @@ -66,8 +69,8 @@ func TestCollector(t *testing.T) { if len(chDesc) != 1 { t.Fatalf("want 1 prometheus.Desc, got %d\n", len(chDesc)) } - if len(chCol) != 2 { - t.Fatalf("want 2 prometheus.Metric, got %d\n", len(chCol)) + if len(chCol) != 4 { + t.Fatalf("want 4 prometheus.Metric, got %d\n", len(chCol)) } // Normally, we use the default registry via prometheus.Register. Using a @@ -87,7 +90,7 @@ func TestCollector(t *testing.T) { if err != nil { log.Fatal(err) } - rawMetrics, err := ioutil.ReadAll(res.Body) + rawMetrics, err := io.ReadAll(res.Body) res.Body.Close() if err != nil { log.Fatal(err) diff --git a/sql/live_test.go b/sql/live_test.go index 19061be..57479ce 100644 --- a/sql/live_test.go +++ b/sql/live_test.go @@ -67,10 +67,13 @@ func TestLiveQuery(t *testing.T) { } for _, test := range tests { t.Logf("Live query test: %s", test.name) - metrics, err := qr.Query(test.query) + metrics, queryStatistics, err := qr.Query(test.query) if err != nil { t.Fatal(err) } + if queryStatistics == nil { + t.Error("QueryStatistics is nil") + } if !reflect.DeepEqual(metrics, test.metrics) { t.Errorf("Metrics do not match:\nwant %#v;\n got %#v", test.metrics, metrics) }