Skip to content

Commit 277f032

Browse files
committed
stackdriver exporter implemented
1 parent 2ec72f4 commit 277f032

File tree

6 files changed

+1133
-0
lines changed

6 files changed

+1133
-0
lines changed

Diff for: monitoring/exporter/data_test.go

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package exporter
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"go.opencensus.io/stats"
10+
"go.opencensus.io/stats/view"
11+
"go.opencensus.io/tag"
12+
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
13+
)
14+
15+
// This file defines various data needed for testing.
16+
17+
func init() {
18+
// For testing convenience, we reduce maximum time series that metric client accepts.
19+
MaxTimeSeriesPerUpload = 3
20+
}
21+
22+
const (
23+
label1name = "key_1"
24+
label2name = "key_2"
25+
label3name = "key_3"
26+
label4name = "key_4"
27+
label5name = "key_5"
28+
29+
value1 = "value_1"
30+
value2 = "value_2"
31+
value3 = "value_3"
32+
value4 = "value_4"
33+
value5 = "value_5"
34+
value6 = "value_6"
35+
36+
metric1name = "metric_1"
37+
metric1desc = "this is metric 1"
38+
metric2name = "metric_2"
39+
metric2desc = "this is metric 2"
40+
41+
project1 = "project-1"
42+
project2 = "project-2"
43+
)
44+
45+
var (
46+
ctx = context.Background()
47+
48+
// This error is used for test to catch some error happpened.
49+
invalidDataError = errors.New("invalid data")
50+
// This error is used for unexpected error.
51+
unrecognizedDataError = errors.New("unrecognized data")
52+
53+
key1 = getKey(label1name)
54+
key2 = getKey(label2name)
55+
key3 = getKey(label3name)
56+
57+
view1 = &view.View{
58+
Name: metric1name,
59+
Description: metric1desc,
60+
TagKeys: nil,
61+
Measure: stats.Int64(metric1name, metric1desc, stats.UnitDimensionless),
62+
Aggregation: view.Sum(),
63+
}
64+
view2 = &view.View{
65+
Name: metric2name,
66+
Description: metric2desc,
67+
TagKeys: []tag.Key{key1, key2, key3},
68+
Measure: stats.Int64(metric2name, metric2desc, stats.UnitDimensionless),
69+
Aggregation: view.Sum(),
70+
}
71+
72+
// To make verification easy, we require all valid rows should int64 values and all of them
73+
// must be distinct.
74+
view1row1 = &view.Row{
75+
Tags: nil,
76+
Data: &view.SumData{Value: 1},
77+
}
78+
view1row2 = &view.Row{
79+
Tags: nil,
80+
Data: &view.SumData{Value: 2},
81+
}
82+
view1row3 = &view.Row{
83+
Tags: nil,
84+
Data: &view.SumData{Value: 3},
85+
}
86+
view2row1 = &view.Row{
87+
Tags: []tag.Tag{{key1, value1}, {key2, value2}, {key3, value3}},
88+
Data: &view.SumData{Value: 4},
89+
}
90+
view2row2 = &view.Row{
91+
Tags: []tag.Tag{{key1, value4}, {key2, value5}, {key3, value6}},
92+
Data: &view.SumData{Value: 5},
93+
}
94+
// This Row does not have valid Data field, so is invalid.
95+
invalidRow = &view.Row{Data: nil}
96+
97+
startTime1 = endTime1.Add(-10 * time.Second)
98+
endTime1 = startTime2.Add(-time.Second)
99+
startTime2 = endTime2.Add(-10 * time.Second)
100+
endTime2 = time.Now()
101+
102+
resource1 = &monitoredrespb.MonitoredResource{
103+
Type: "cloudsql_database",
104+
Labels: map[string]string{
105+
"project_id": project1,
106+
"region": "us-central1",
107+
"database_id": "cloud-SQL-instance-1",
108+
},
109+
}
110+
resource2 = &monitoredrespb.MonitoredResource{
111+
Type: "gce_instance",
112+
Labels: map[string]string{
113+
"project_id": project2,
114+
"zone": "us-east1",
115+
"database_id": "GCE-instance-1",
116+
},
117+
}
118+
)
119+
120+
func getKey(name string) tag.Key {
121+
key, err := tag.NewKey(name)
122+
if err != nil {
123+
panic(fmt.Errorf("key creation failed for key name: %s", name))
124+
}
125+
return key
126+
}

Diff for: monitoring/exporter/exporter.go

+232
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// Package exporter provides a way to export data from opencensus to multiple GCP projects.
2+
//
3+
// General assumptions or requirements when using this exporter.
4+
// 1. The basic unit of data is a view.Data with only a single view.Row. We define it as a separate
5+
// type called RowData.
6+
// 2. We can inspect each RowData to tell whether this RowData is applicable for this exporter.
7+
// 3. For RowData that is applicable to this exporter, we require that
8+
// 3.1. Any view associated to RowData corresponds to a stackdriver metric, and it is already
9+
// defined for all GCP projects.
10+
// 3.2. RowData has correcponding GCP projects, and we can determine its project ID.
11+
// 3.3. After trimming labels and tags, configuration of all view data matches that of corresponding
12+
// stackdriver metric
13+
package exporter
14+
15+
import (
16+
"context"
17+
"errors"
18+
"fmt"
19+
"sync"
20+
"time"
21+
22+
monitoring "cloud.google.com/go/monitoring/apiv3"
23+
gax "github.com/googleapis/gax-go"
24+
"go.opencensus.io/stats/view"
25+
"google.golang.org/api/option"
26+
"google.golang.org/api/support/bundler"
27+
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
28+
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
29+
)
30+
31+
// StatsExporter is the exporter that can be registered to opencensus. A StatsExporter object must
32+
// be created by NewStatsExporter().
33+
type StatsExporter struct {
34+
ctx context.Context
35+
client metricClient
36+
opts *Options
37+
38+
// copy of some option values which may be modified by exporter.
39+
getProjectID func(*RowData) (string, error)
40+
onError func(error, ...*RowData)
41+
makeResource func(*RowData) (*monitoredrespb.MonitoredResource, error)
42+
43+
// mu protects access to projDataMap
44+
mu sync.Mutex
45+
// per-project data of exporter
46+
projDataMap map[string]*projectData
47+
}
48+
49+
// Options designates various parameters used by stats exporter. Default value of fields in Options
50+
// are valid for use.
51+
type Options struct {
52+
// ClientOptions designates options for creating metric client, especially credentials for
53+
// RPC calls.
54+
ClientOptions []option.ClientOption
55+
56+
// options for bundles amortizing export requests. Note that a bundle is created for each
57+
// project. When not provided, default values in bundle package are used.
58+
BundleDelayThreshold time.Duration
59+
BundleCountThreshold int
60+
61+
// callback functions provided by user.
62+
63+
// GetProjectID is used to filter whether given row data can be applicable to this exporter
64+
// and if so, it also determines the projectID of given row data. If
65+
// RowDataNotApplicableError is returned, then the row data is not applicable to this
66+
// exporter, and it will be silently ignored. Though not recommended, other errors can be
67+
// returned, and in that case the error is reported to callers via OnError and the row data
68+
// will not be uploaded to stackdriver. When GetProjectID is not set, all row data will be
69+
// considered not applicable to this exporter.
70+
GetProjectID func(*RowData) (projectID string, err error)
71+
// OnError is used to report any error happened while exporting view data fails. Whenever
72+
// this function is called, it's guaranteed that at least one row data is also passed to
73+
// OnError. Row data passed to OnError must not be modified. When OnError is not set, all
74+
// errors happened on exporting are ignored.
75+
OnError func(error, ...*RowData)
76+
// MakeResource creates monitored resource from RowData. It is guaranteed that only RowData
77+
// that passes GetProjectID will be given to this function. Though not recommended, error
78+
// can be returned, and in that case the error is reported to callers via OnError and the
79+
// row data will not be uploaded to stackdriver. When MakeResource is not set, global
80+
// resource is used for all RowData objects.
81+
MakeResource func(rd *RowData) (*monitoredrespb.MonitoredResource, error)
82+
83+
// options concerning labels.
84+
85+
// DefaultLabels store default value of some labels. Labels in DefaultLabels need not be
86+
// specified in tags of view data. Default labels and tags of view may have overlapping
87+
// label keys. In this case, values in tag are used. Default labels are used for labels
88+
// those are constant throughout export operation, like version number of the calling
89+
// program.
90+
DefaultLabels map[string]string
91+
// UnexportedLabels contains key of labels that will not be exported stackdriver. Typical
92+
// uses of unexported labels will be either that marks project ID, or that's used only for
93+
// constructing resource.
94+
UnexportedLabels []string
95+
}
96+
97+
// default values for options
98+
func defaultGetProjectID(rd *RowData) (string, error) {
99+
return "", RowDataNotApplicableError
100+
}
101+
102+
func defaultOnError(err error, rds ...*RowData) {}
103+
104+
func defaultMakeResource(rd *RowData) (*monitoredrespb.MonitoredResource, error) {
105+
return &monitoredrespb.MonitoredResource{Type: "global"}, nil
106+
}
107+
108+
// NewStatsExporter creates a StatsExporter object. Once a call to NewStatsExporter is made, any
109+
// fields in opts must not be modified at all. ctx will also be used throughout entire exporter
110+
// operation when making RPC call.
111+
func NewStatsExporter(ctx context.Context, opts *Options) (*StatsExporter, error) {
112+
client, err := newMetricClient(ctx, opts.ClientOptions...)
113+
if err != nil {
114+
return nil, fmt.Errorf("failed to create a metric client: %v", err)
115+
}
116+
117+
e := &StatsExporter{
118+
ctx: ctx,
119+
client: client,
120+
opts: opts,
121+
projDataMap: make(map[string]*projectData),
122+
}
123+
124+
// We don't want to modify user-supplied options, so save default options directly in
125+
// exporter.
126+
if opts.GetProjectID != nil {
127+
e.getProjectID = opts.GetProjectID
128+
} else {
129+
e.getProjectID = defaultGetProjectID
130+
}
131+
if opts.OnError != nil {
132+
e.onError = opts.OnError
133+
} else {
134+
e.onError = defaultOnError
135+
}
136+
if opts.MakeResource != nil {
137+
e.makeResource = opts.MakeResource
138+
} else {
139+
e.makeResource = defaultMakeResource
140+
}
141+
142+
return e, nil
143+
}
144+
145+
// We wrap monitoring.MetricClient and it's maker for testing.
146+
type metricClient interface {
147+
CreateTimeSeries(context.Context, *monitoringpb.CreateTimeSeriesRequest, ...gax.CallOption) error
148+
Close() error
149+
}
150+
151+
var newMetricClient = defaultNewMetricClient
152+
153+
func defaultNewMetricClient(ctx context.Context, opts ...option.ClientOption) (metricClient, error) {
154+
return monitoring.NewMetricClient(ctx, opts...)
155+
}
156+
157+
// RowData represents a single row in view data. This is our unit of computation. We use a single
158+
// row instead of view data because a view data consists of multiple rows, and each row may belong
159+
// to different projects.
160+
type RowData struct {
161+
View *view.View
162+
Start, End time.Time
163+
Row *view.Row
164+
}
165+
166+
// ExportView is the method called by opencensus to export view data. It constructs RowData out of
167+
// view.Data objects.
168+
func (e *StatsExporter) ExportView(vd *view.Data) {
169+
for _, row := range vd.Rows {
170+
rd := &RowData{
171+
View: vd.View,
172+
Start: vd.Start,
173+
End: vd.End,
174+
Row: row,
175+
}
176+
e.exportRowData(rd)
177+
}
178+
}
179+
180+
// RowDataNotApplicableError is used to tell that given row data is not applicable to the exporter.
181+
// See GetProjectID of Options for more detail.
182+
var RowDataNotApplicableError = errors.New("row data is not applicable to the exporter, so it will be ignored")
183+
184+
// exportRowData exports a single row data.
185+
func (e *StatsExporter) exportRowData(rd *RowData) {
186+
projID, err := e.getProjectID(rd)
187+
if err != nil {
188+
// We ignore non-applicable RowData.
189+
if err != RowDataNotApplicableError {
190+
newErr := fmt.Errorf("failed to get project ID on row data with view %s: %v", rd.View.Name, err)
191+
e.onError(newErr, rd)
192+
}
193+
return
194+
}
195+
pd := e.getProjectData(projID)
196+
switch err := pd.bndler.Add(rd, 1); err {
197+
case nil:
198+
case bundler.ErrOversizedItem:
199+
go pd.uploadRowData(rd)
200+
default:
201+
newErr := fmt.Errorf("failed to add row data with view %s to bundle for project %s: %v", rd.View.Name, projID, err)
202+
e.onError(newErr, rd)
203+
}
204+
}
205+
206+
func (e *StatsExporter) getProjectData(projectID string) *projectData {
207+
e.mu.Lock()
208+
defer e.mu.Unlock()
209+
if pd, ok := e.projDataMap[projectID]; ok {
210+
return pd
211+
}
212+
213+
pd := e.newProjectData(projectID)
214+
e.projDataMap[projectID] = pd
215+
return pd
216+
}
217+
218+
// Close flushes and closes the exporter. Close must be called after the exporter is unregistered
219+
// and no further calls to ExportView() are made. Once Close() is returned no further access to the
220+
// exporter is allowed in any way.
221+
func (e *StatsExporter) Close() error {
222+
e.mu.Lock()
223+
for _, pd := range e.projDataMap {
224+
pd.bndler.Flush()
225+
}
226+
e.mu.Unlock()
227+
228+
if err := e.client.Close(); err != nil {
229+
return fmt.Errorf("failed to close the metric client: %v", err)
230+
}
231+
return nil
232+
}

0 commit comments

Comments
 (0)