Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
chore: more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach committed Feb 21, 2022
1 parent 2311752 commit 1169574
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 39 deletions.
13 changes: 8 additions & 5 deletions plugin/s3spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,11 @@ func (r *Reader) getServicesAndOperations(ctx context.Context) ([]types.Row, err
fmt.Sprintf(`datehour BETWEEN '%s' AND '%s'`, r.DefaultMinTime().Format(PARTION_FORMAT), r.DefaultMaxTime().Format(PARTION_FORMAT)),
}

result, err := r.queryAthenaCached(ctx, fmt.Sprintf(`SELECT service_name, operation_name, span_kind FROM "%s" WHERE %s GROUP BY 1, 2, 3 ORDER BY 1, 2, 3`, r.cfg.TableName, strings.Join(conditions, " AND ")), r.servicesQueryTTL)
result, err := r.queryAthenaCached(
ctx,
fmt.Sprintf(`SELECT service_name, operation_name, span_kind FROM "%s" WHERE %s GROUP BY 1, 2, 3 ORDER BY 1, 2, 3`, r.cfg.TableName, strings.Join(conditions, " AND ")),
fmt.Sprintf(`SELECT service_name, operation_name, span_kind FROM "%s" WHERE`, r.cfg.TableName),
r.servicesQueryTTL)
if err != nil {
return nil, fmt.Errorf("failed to query athena: %w", err)
}
Expand Down Expand Up @@ -301,7 +305,7 @@ func (r *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback
JOIN %s as jaeger ON spans_with_references.ref_trace_id = jaeger.trace_id AND spans_with_references.ref_span_id = jaeger.span_id
WHERE %s
GROUP BY 1, 2
`, r.cfg.TableName, r.cfg.TableName, strings.Join(conditions, " AND ")), r.dependenciesQueryTTL)
`, r.cfg.TableName, r.cfg.TableName, strings.Join(conditions, " AND ")), "WITH spans_with_reference", r.dependenciesQueryTTL)
if err != nil {
return nil, fmt.Errorf("failed to query athena: %w", err)
}
Expand All @@ -323,7 +327,7 @@ func (r *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback
return dependencyLinks, nil
}

func (r *Reader) queryAthenaCached(ctx context.Context, queryString string, ttl time.Duration) ([]types.Row, error) {
func (r *Reader) queryAthenaCached(ctx context.Context, queryString string, lookupString string, ttl time.Duration) ([]types.Row, error) {
paginator := athena.NewListQueryExecutionsPaginator(r.svc, &athena.ListQueryExecutionsInput{
WorkGroup: &r.cfg.WorkGroup,
})
Expand All @@ -345,7 +349,6 @@ func (r *Reader) queryAthenaCached(ctx context.Context, queryString string, ttl

latestCompletionDateTime := time.Now()
latestQueryExecutionId := ""
trimmedQueryString := strings.TrimSpace(queryString)

for _, value := range queryExecutionIdChunks {
value := value
Expand All @@ -359,7 +362,7 @@ func (r *Reader) queryAthenaCached(ctx context.Context, queryString string, ttl

for _, v := range result.QueryExecutions {
// Different query
if *v.Query != trimmedQueryString {
if !strings.Contains(*v.Query, lookupString) {
continue
}

Expand Down
197 changes: 181 additions & 16 deletions plugin/s3spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/golang/mock/gomock"
"github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/johanneswuerbach/jaeger-s3/plugin/config"
"github.com/johanneswuerbach/jaeger-s3/plugin/s3spanstore/mocks"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -44,17 +46,28 @@ func NewTestReader(ctx context.Context, assert *assert.Assertions, mockSvc *mock
return reader
}

func TestGetServices(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
func toAthenaResultSet(results [][]string) *types.ResultSet {
resultsRows := make([]types.Row, len(results)+1)
resultsRows[0] = types.Row{} // Header row, usually ignored by the reader
for i, row := range results {
row := row

rowValues := make([]types.Datum, len(row))
for i, columnValue := range row {
columnValue := columnValue
rowValues[i] = types.Datum{VarCharValue: &columnValue}
}

resultsRows[i+1] = types.Row{Data: rowValues}
}

return &types.ResultSet{Rows: resultsRows}
}

func mockQueryRunAndResult(mockSvc *mocks.MockAthenaAPI, result [][]string) {
queryID := "queryId"
now := time.Now()
serviceName := "test"

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{}, nil)
mockSvc.EXPECT().StartQueryExecution(gomock.Any(), gomock.Any()).
Return(&athena.StartQueryExecutionOutput{
QueryExecutionId: &queryID,
Expand All @@ -67,18 +80,23 @@ func TestGetServices(t *testing.T) {
},
},
}, nil)

mockSvc.EXPECT().GetQueryResults(gomock.Any(), gomock.Any()).
Return(&athena.GetQueryResultsOutput{
ResultSet: &types.ResultSet{
Rows: []types.Row{
{},
{Data: []types.Datum{
{VarCharValue: &serviceName},
}},
},
},
ResultSet: toAthenaResultSet(result),
}, nil)
}

func TestGetServices(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

serviceName := "test"

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{}, nil)

mockQueryRunAndResult(mockSvc, [][]string{{serviceName}})

assert := assert.New(t)
ctx := context.TODO()
Expand All @@ -90,3 +108,150 @@ func TestGetServices(t *testing.T) {
assert.NoError(err)
assert.Equal([]string{serviceName}, services)
}

func TestGetServicesCached(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

serviceName := "test"
assert := assert.New(t)
ctx := context.TODO()

validQueryID := "get-services"
invalidQueryID := "different"

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{
QueryExecutionIds: []string{invalidQueryID, validQueryID},
}, nil)

mockSvc.EXPECT().BatchGetQueryExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, input *athena.BatchGetQueryExecutionInput, _ ...func(*athena.Options)) (*athena.BatchGetQueryExecutionOutput, error) {
assert.Equal([]string{invalidQueryID, validQueryID}, input.QueryExecutionIds)

return &athena.BatchGetQueryExecutionOutput{
QueryExecutions: []types.QueryExecution{
{
Query: aws.String("asdas"),
QueryExecutionId: aws.String("different"),
},
{
Query: aws.String(`SELECT service_name, operation_name, span_kind FROM "jaeger" WHERE`),
QueryExecutionId: aws.String("get-services"),
Status: &types.QueryExecutionStatus{
CompletionDateTime: aws.Time(time.Now().UTC()),
},
},
},
}, nil
})

mockSvc.EXPECT().GetQueryResults(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, input *athena.GetQueryResultsInput, _ ...func(*athena.Options)) (*athena.GetQueryResultsOutput, error) {
assert.Equal("get-services", *input.QueryExecutionId)

return &athena.GetQueryResultsOutput{
ResultSet: toAthenaResultSet([][]string{{serviceName}}),
}, nil
})

reader := NewTestReader(ctx, assert, mockSvc)

services, err := reader.GetServices(ctx)

assert.NoError(err)
assert.Equal([]string{serviceName}, services)
}

func TestGetOperations(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

results := [][]string{
{
"test",
"server-op",
"server",
},
{
"test",
"client-op",
"client",
},
{
"different",
"server-op",
"server",
},
}

assert := assert.New(t)
ctx := context.TODO()

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{}, nil)

mockQueryRunAndResult(mockSvc, results)

reader := NewTestReader(ctx, assert, mockSvc)

operations, err := reader.GetOperations(ctx, spanstore.OperationQueryParameters{ServiceName: "test", SpanKind: ""})

assert.NoError(err)
assert.Equal([]spanstore.Operation{
{
Name: "server-op",
SpanKind: "server",
},
{
Name: "client-op",
SpanKind: "client",
},
}, operations)
}

func TestGetOperationsWithSpanKind(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

results := [][]string{
{
"test",
"server-op",
"server",
},
{
"test",
"client-op",
"client",
},
{
"different",
"server-op",
"server",
},
}

assert := assert.New(t)
ctx := context.TODO()

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{}, nil)

mockQueryRunAndResult(mockSvc, results)

reader := NewTestReader(ctx, assert, mockSvc)

operations, err := reader.GetOperations(ctx, spanstore.OperationQueryParameters{ServiceName: "test", SpanKind: "server"})

assert.NoError(err)
assert.Equal([]spanstore.Operation{
{
Name: "server-op",
SpanKind: "server",
},
}, operations)
}
Loading

0 comments on commit 1169574

Please sign in to comment.