1
1
package tenantidprocessor
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
6
+ "fmt"
7
+ "io"
8
+ "io/ioutil"
9
+ "net/http"
5
10
"testing"
6
11
"time"
7
12
13
+ "github.com/apache/thrift/lib/go/thrift"
14
+ "github.com/jaegertracing/jaeger/model"
15
+ jaegerconvert "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
16
+ jaegerthrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
8
17
"github.com/stretchr/testify/assert"
9
18
"github.com/stretchr/testify/require"
10
19
"go.opentelemetry.io/collector/component"
11
20
"go.opentelemetry.io/collector/component/componenttest"
12
21
"go.opentelemetry.io/collector/config/configgrpc"
22
+ "go.opentelemetry.io/collector/config/confighttp"
13
23
"go.opentelemetry.io/collector/config/configtls"
14
24
"go.opentelemetry.io/collector/consumer"
15
25
"go.opentelemetry.io/collector/consumer/consumertest"
16
26
"go.opentelemetry.io/collector/consumer/pdata"
17
27
"go.opentelemetry.io/collector/exporter/otlpexporter"
28
+ "go.opentelemetry.io/collector/receiver/jaegerreceiver"
18
29
"go.opentelemetry.io/collector/receiver/otlpreceiver"
19
30
"go.opentelemetry.io/collector/testutil"
31
+ "go.opentelemetry.io/collector/translator/trace/jaeger"
20
32
"go.uber.org/zap"
21
33
"google.golang.org/grpc"
22
34
"google.golang.org/grpc/metadata"
@@ -145,6 +157,54 @@ func TestReceiveOTLPGRPC(t *testing.T) {
145
157
assert .Equal (t , reqTraces .SpanCount (), tenantAttrsFound )
146
158
}
147
159
160
+ func TestReceiveJaegerThriftHTTP (t * testing.T ) {
161
+ sink := new (consumertest.TracesSink )
162
+ tenantProcessor := & processor {
163
+ logger : zap .NewNop (),
164
+ tenantIDHeaderName : defaultTenantIdHeaderName ,
165
+ tenantIDAttributeKey : defaultTenantIdAttributeKey ,
166
+ }
167
+
168
+ addr := testutil .GetAvailableLocalAddress (t )
169
+ cfg := & jaegerreceiver.Config {
170
+ Protocols : jaegerreceiver.Protocols {
171
+ ThriftHTTP : & confighttp.HTTPServerSettings {
172
+ Endpoint : addr ,
173
+ },
174
+ },
175
+ }
176
+ params := component.ReceiverCreateParams {Logger : zap .NewNop ()}
177
+ jrf := jaegerreceiver .NewFactory ()
178
+ rec , err := jrf .CreateTracesReceiver (context .Background (), params , cfg , multiConsumer {
179
+ sink : sink ,
180
+ tenantIDprocessor : tenantProcessor ,
181
+ })
182
+ require .NoError (t , err )
183
+
184
+ err = rec .Start (context .Background (), componenttest .NewNopHost ())
185
+ require .NoError (t , err )
186
+ defer rec .Shutdown (context .Background ())
187
+
188
+ td := GenerateTraceDataOneSpan ()
189
+ batches , err := jaeger .InternalTracesToJaegerProto (td )
190
+ require .NoError (t , err )
191
+ collectorAddr := fmt .Sprintf ("http://%s/api/traces" , addr )
192
+ for _ , batch := range batches {
193
+ err := sendToJaegerHTTPThrift (collectorAddr , map [string ]string {tenantProcessor .tenantIDHeaderName : testTenantID },jaegerModelToThrift (batch ))
194
+ require .NoError (t , err )
195
+ }
196
+
197
+ traces := sink .AllTraces ()
198
+ assert .Equal (t , 1 , len (traces ))
199
+ tenantAttrsFound := assertTenantAttributeExists (
200
+ t ,
201
+ traces [0 ],
202
+ tenantProcessor .tenantIDAttributeKey ,
203
+ testTenantID ,
204
+ )
205
+ assert .Equal (t , td .SpanCount (), tenantAttrsFound )
206
+ }
207
+
148
208
func assertTenantAttributeExists (t * testing.T , trace pdata.Traces , tenantAttrKey string , tenantID string ) int {
149
209
numOfTenantAttrs := 0
150
210
rss := trace .ResourceSpans ()
@@ -242,6 +302,8 @@ func fillSpanOne(span pdata.Span) {
242
302
span .SetStartTime (TestSpanStartTimestamp )
243
303
span .SetEndTime (TestSpanEndTimestamp )
244
304
span .SetDroppedAttributesCount (1 )
305
+ span .SetTraceID (pdata .NewTraceID ([16 ]byte {0 , 1 , 2 }))
306
+ span .SetSpanID (pdata .NewSpanID ([8 ]byte {0 , 1 }))
245
307
evs := span .Events ()
246
308
evs .Resize (2 )
247
309
ev0 := evs .At (0 )
@@ -262,3 +324,47 @@ func fillSpanOne(span pdata.Span) {
262
324
func initSpanEventAttributes (dest pdata.AttributeMap ) {
263
325
dest .InitFromMap (spanEventAttributes )
264
326
}
327
+
328
+ func jaegerModelToThrift (batch * model.Batch ) * jaegerthrift.Batch {
329
+ return & jaegerthrift.Batch {
330
+ Process : jaegerProcessModelToThrift (batch .Process ),
331
+ Spans : jaegerconvert .FromDomain (batch .Spans ),
332
+ }
333
+ }
334
+
335
+ func jaegerProcessModelToThrift (process * model.Process ) * jaegerthrift.Process {
336
+ if process == nil {
337
+ return nil
338
+ }
339
+ return & jaegerthrift.Process {
340
+ ServiceName : process .ServiceName ,
341
+ }
342
+ }
343
+
344
+ func sendToJaegerHTTPThrift (endpoint string , headers map [string ]string , batch * jaegerthrift.Batch ) error {
345
+ buf , err := thrift .NewTSerializer ().Write (context .Background (), batch )
346
+ if err != nil {
347
+ return err
348
+ }
349
+ req , err := http .NewRequest ("POST" , endpoint , bytes .NewBuffer (buf ))
350
+ if err != nil {
351
+ return err
352
+ }
353
+ req .Header .Set ("Content-Type" , "application/x-thrift" )
354
+ for k , v := range headers {
355
+ req .Header .Add (k , v )
356
+ }
357
+
358
+ resp , err := http .DefaultClient .Do (req )
359
+ if err != nil {
360
+ return err
361
+ }
362
+
363
+ io .Copy (ioutil .Discard , resp .Body )
364
+ resp .Body .Close ()
365
+
366
+ if resp .StatusCode < 200 || resp .StatusCode >= 300 {
367
+ return fmt .Errorf ("failed to upload traces; HTTP status code: %d" , resp .StatusCode )
368
+ }
369
+ return nil
370
+ }
0 commit comments