Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,13 @@

`region` - The AWS region your Kinesis stream is located - i.e. eu-west-2

`batch_size` - Optional. The maximum size of the records in a batch is 5MiB. If your records are larger in size setting this batch size paramter can guarantee you don't have failed delivery due to too large a batch. Default size if unset is 100.
`batch_size` - The maximum size of the records in a batch is 5MiB. If your records are larger in size setting this batch size paramter can guarantee you don't have failed delivery due to too large a batch. Default size if unset is 100 (optional)

`access_key_id` - AWS Access Key ID for authentication. If not provided, will use default credential chain (environment variables, shared credentials file, IAM roles, etc.) (optional)

`secret_access_key` - AWS Secret Access Key for authentication. If not provided, will use default credential chain (optional)

`session_token` - AWS Session Token for temporary credentials (optional)

Check notice on line 1367 in README.md

View check run for this annotation

probelabs / Visor: security

documentation Issue

The documentation introduces new configuration fields for static AWS credentials but does not include a security warning about the risks of storing sensitive keys in plaintext configuration files.
Raw output
Add a security note to the documentation recommending the use of more secure authentication methods like IAM roles or environment variables where possible, and warning users about the risks of committing configuration files containing plaintext credentials to source control.

Check warning on line 1367 in README.md

View check run for this annotation

probelabs / Visor: quality

documentation Issue

The documentation for the new static credential fields does not include a security warning regarding the risks of storing sensitive keys in plaintext configuration files.
Raw output
Add a security note to the documentation. It should recommend using more secure authentication methods like IAM roles or environment variables where possible and warn users about the risks of committing configuration files containing plaintext credentials to source control.

###### JSON / Conf File

Expand All @@ -1368,7 +1374,10 @@
"meta": {
"stream_name": "my-stream",
"region": "eu-west-2",
"batch_size": 100
"batch_size": 100,
"access_key_id": "your-key-id",
"secret_access_key": "your-secret-key",
"session_token": "your-session-token"
}
},
```
Expand All @@ -1376,11 +1385,14 @@
###### Env Variables

```
#Kinesis Pump Configuration

Check notice on line 1388 in README.md

View check run for this annotation

probelabs / Visor: quality

documentation Issue

There is a typo in the environment variable for the Kinesis stream name. `TYK_PMP_PUMPS_KINESIs_META_STREAMNAME` should be `TYK_PMP_PUMPS_KINESIS_META_STREAMNAME`.
Raw output
Correct the typo to ensure consistency with other environment variables and prevent configuration errors for users.
TYK_PMP_PUMPS_KINESIS_TYPE=kinesis
TYK_PMP_PUMPS_KINESIs_META_STREAMNAME=my-stream
TYK_PMP_PUMPS_KINESIS_META_STREAMNAME=my-stream
TYK_PMP_PUMPS_KINESIS_META_REGION=eu-west-2
TYK_PMP_PUMPS_KINESIS_META_BATCHSIZE=100
TYK_PMP_PUMPS_KINESIS_META_ACCESSKEYID=your-key-id
TYK_PMP_PUMPS_KINESIS_META_SECRETACCESSKEY=your-secret-key
TYK_PMP_PUMPS_KINESIS_META_SESSIONTOKEN=your-session-token
```

# Base Pump Configurations
Expand Down
15 changes: 14 additions & 1 deletion pumps/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)
Expand All @@ -37,6 +38,12 @@
StreamName string `mapstructure:"stream_name"`
// AWS Region the Kinesis stream targets
Region string `mapstructure:"region"`
// AWS Access Key ID for authentication. If not provided, will use default credential chain (environment variables, shared credentials file, IAM roles, etc.) -- Optional
AccessKeyID string `mapstructure:"access_key_id"`
// AWS Secret Access Key for authentication. If not provided, will use default credential chain -- Optional

Check notice on line 43 in pumps/kinesis.go

View check run for this annotation

probelabs / Visor: quality

style Issue

The comment for `SecretAccessKey` is less descriptive than the one for `AccessKeyID`. While the `AccessKeyID` comment explains the fallback mechanism in detail (environment variables, shared credentials file, IAM roles, etc.), the `SecretAccessKey` comment is more brief.
Raw output
For improved consistency and clarity, expand the comment for `SecretAccessKey` to match the level of detail provided for `AccessKeyID`.

Check notice on line 43 in pumps/kinesis.go

View check run for this annotation

probelabs / Visor: style

documentation Issue

The comment for `SecretAccessKey` is less descriptive than the one for `AccessKeyID`. While the `AccessKeyID` comment explains the fallback mechanism in detail (environment variables, shared credentials file, IAM roles, etc.), the `SecretAccessKey` comment is more brief.
Raw output
For improved consistency and clarity, expand the comment for `SecretAccessKey` to match the level of detail provided for `AccessKeyID`.
SecretAccessKey string `mapstructure:"secret_access_key"`
// AWS Session Token for temporary credentials -- Optional
SessionToken string `mapstructure:"session_token"`
// Each PutRecords (the function used in this pump)request can support up to 500 records.
// Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys.
// Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second.
Expand Down Expand Up @@ -68,8 +75,14 @@

// Load AWS configuration
// Credentials are loaded as specified in
// https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials

Check warning on line 78 in pumps/kinesis.go

View check run for this annotation

probelabs / Visor: security

security Issue

The code checks for `AccessKeyID` and `SecretAccessKey` being non-empty, but it doesn't validate if only one is provided. If a user provides an `AccessKeyID` but not a `SecretAccessKey` (or vice-versa), the code will fall back to the default credential chain, which might be confusing and lead to unexpected behavior.
Raw output
Improve the conditional logic to handle cases where only one of the two required credential fields is provided. For example, log a warning or return an error if the credential configuration is incomplete, making the behavior more explicit and preventing silent fallbacks.
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(p.kinesisConf.Region))
var cfg aws.Config
if p.kinesisConf.AccessKeyID != "" && p.kinesisConf.SecretAccessKey != "" {
creds := credentials.NewStaticCredentialsProvider(p.kinesisConf.AccessKeyID, p.kinesisConf.SecretAccessKey, p.kinesisConf.SessionToken)
cfg, err = awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithCredentialsProvider(creds), awsconfig.WithRegion(p.kinesisConf.Region))

Check warning on line 82 in pumps/kinesis.go

View check run for this annotation

probelabs / Visor: security

security Issue

The introduction of static AWS credentials (`access_key_id`, `secret_access_key`) directly in the configuration file increases the risk of sensitive data exposure. If this configuration file is accidentally committed to version control, shared, or otherwise exposed, it would lead to a credential leak.
Raw output
While providing static credentials offers flexibility, it is a less secure practice compared to using the default AWS credential chain (like IAM roles for EC2/ECS or environment variables). The documentation in README.md should be updated to include a prominent warning about the risks of storing plaintext credentials and strongly recommend using more secure, dynamically provisioned credentials in production environments.

Check warning on line 82 in pumps/kinesis.go

View check run for this annotation

probelabs / Visor: quality

logic Issue

The configuration logic silently falls back to the default AWS credential chain if only one of the two required static credential keys (`AccessKeyID` or `SecretAccessKey`) is provided. This can lead to confusing behavior for users who might expect an error for an incomplete configuration.
Raw output
Improve the conditional logic to be more explicit. If one key is provided without the other, log a warning or return an error to inform the user that the static credential configuration is incomplete and that the system is falling back to the default chain, or that the configuration is invalid.
} else {
cfg, err = awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(p.kinesisConf.Region))
}

Check notice on line 85 in pumps/kinesis.go

View check run for this annotation

probelabs / Visor: quality

maintainability Issue

The `Init` method's use of `log.Fatalf` on configuration errors makes the function difficult to test reliably, as it terminates the entire process. The current tests verify the parsed configuration but cannot easily test the error-handling path within `Init` itself.
Raw output
Refactor the `Init` method to return an `error` instead of calling `log.Fatalf`. This is a more idiomatic Go practice that allows callers (and tests) to handle errors gracefully. Tests could then assert that specific invalid configurations correctly produce an error.
if err != nil {
p.log.Fatalf("unable to load Kinesis SDK config, %v", err)
}
Expand Down
113 changes: 113 additions & 0 deletions pumps/kinesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package pumps

import (
"testing"

"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/assert"
)

func TestKinesisPump_StaticCredentials_ConfigurationParsing(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
expected KinesisConf
}{
{
name: "Complete static credentials",
input: map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"access_key_id": "AKIATEST123",
"secret_access_key": "secretkey123",
"session_token": "sessiontoken123",
"batch_size": 100,
},
expected: KinesisConf{
StreamName: "test-stream",
Region: "us-east-1",
AccessKeyID: "AKIATEST123",
SecretAccessKey: "secretkey123",
SessionToken: "sessiontoken123",
BatchSize: 100,
},
},
{
name: "Static credentials without session token",
input: map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"access_key_id": "AKIATEST123",
"secret_access_key": "secretkey123",
},
expected: KinesisConf{
StreamName: "test-stream",
Region: "us-east-1",
AccessKeyID: "AKIATEST123",
SecretAccessKey: "secretkey123",
SessionToken: "",
},
},
{
name: "No static credentials (default chain)",
input: map[string]interface{}{
"stream_name": "test-stream",
"region": "us-west-2",
},
expected: KinesisConf{
StreamName: "test-stream",
Region: "us-west-2",
AccessKeyID: "",
SecretAccessKey: "",
SessionToken: "",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var conf KinesisConf
err := mapstructure.Decode(tt.input, &conf)
assert.NoError(t, err)

assert.Equal(t, tt.expected.AccessKeyID, conf.AccessKeyID)
assert.Equal(t, tt.expected.SecretAccessKey, conf.SecretAccessKey)
assert.Equal(t, tt.expected.SessionToken, conf.SessionToken)
})
}
}

func TestKinesisPump_StaticCredentials_Logic(t *testing.T) {
t.Run("Should use static credentials when both keys provided", func(t *testing.T) {
config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"access_key_id": "AKIATEST123",
"secret_access_key": "secretkey123",
}

pump := &KinesisPump{}
pump.Init(config)

Check failure on line 90 in pumps/kinesis_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `pump.Init` is not checked (errcheck)

if pump.kinesisConf != nil {
hasStaticCreds := pump.kinesisConf.AccessKeyID != "" && pump.kinesisConf.SecretAccessKey != ""
assert.True(t, hasStaticCreds, "Should have static credentials")
}
})

t.Run("Should fall back to default chain when incomplete", func(t *testing.T) {
config := map[string]interface{}{
"stream_name": "test-stream",
"region": "us-east-1",
"access_key_id": "AKIATEST123",
}

pump := &KinesisPump{}
pump.Init(config)

Check failure on line 106 in pumps/kinesis_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `pump.Init` is not checked (errcheck)

if pump.kinesisConf != nil {
hasStaticCreds := pump.kinesisConf.AccessKeyID != "" && pump.kinesisConf.SecretAccessKey != ""
assert.False(t, hasStaticCreds, "Should use default credential chain")
}
})

Check warning on line 112 in pumps/kinesis_test.go

View check run for this annotation

probelabs / Visor: quality

logic Issue

The test function `TestKinesisPump_StaticCredentials_Logic` is redundant and its name is misleading. It re-tests the configuration parsing which is already covered in `TestKinesisPump_StaticCredentials_ConfigurationParsing`. However, it does not actually verify that the AWS client initialization logic correctly uses static credentials or falls back to the default chain, which is what the name 'Logic' implies.
Raw output
Consider removing this test function to avoid redundancy. Alternatively, refactor it into a more meaningful unit test that verifies the initialization behavior, possibly by mocking the AWS config loading process to assert that the correct credential providers are selected based on the input configuration. A more descriptive name like `TestKinesisPump_Init_CredentialSelection` would also improve clarity.

Check warning on line 112 in pumps/kinesis_test.go

View check run for this annotation

probelabs / Visor: style

style Issue

The test function `TestKinesisPump_StaticCredentials_Logic` is redundant and its name is misleading. It re-tests the configuration parsing which is already covered in `TestKinesisPump_StaticCredentials_ConfigurationParsing`. However, it does not actually verify that the AWS client initialization logic correctly uses static credentials or falls back to the default chain, which is what the name 'Logic' implies.
Raw output
Consider removing this test function to avoid redundancy. Alternatively, refactor it into a more meaningful unit test that verifies the initialization behavior, possibly by mocking the AWS config loading process to assert that the correct credential providers are selected based on the input configuration. A more descriptive name like `TestKinesisPump_Init_CredentialSelection` would also improve clarity.
}
Loading