Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ build/
*.iml
*.ipr
out/
**/.DS_Store

### NetBeans ###
/nbproject/private/
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation 'com.amazonaws:aws-java-sdk-sns'
implementation 'com.amazonaws:aws-java-sdk-sts'
implementation 'com.fasterxml.jackson.core:jackson-core:2.12.1'
implementation 'org.json:json:20211205'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.apache.jmeter.protocol.aws.kinesis;

import org.apache.jmeter.config.Argument;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.aws.AWSClientSDK2;
import org.apache.jmeter.protocol.aws.AWSSampler;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.json.JSONArray;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.*;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Kinesis Producer Sampler class to connect and publish data records in Kinesis streams.
* @author samar ranjan
* @since 01/18/2021
* @see "https://github.com/JoseLuisSR/awsmeter"
*/
public class KinesisRecordProducerSampler extends AWSSampler implements AWSClientSDK2 {

/**
* Log attribute.
*/
protected static Logger log = LoggerFactory.getLogger(KinesisRecordProducerSampler.class);

/**
* Kinesis Stream name.
*/
private static final String KINESIS_STREAM_NAME = "kinesis_stream_name";

/**
* Kinesis Stream Partition key.
*/
private static final String KINESIS_PARTITION_KEY = "partition_key";

/**
* Kinesis Stream Data Record.
*/
private static final String KINESIS_DATA_RECORD = "data_record";

/**
* Set Kinesis Data Stream.
*/
private static final List<Argument> KINESIS_PARAMETERS = Stream.of(
new Argument(KINESIS_STREAM_NAME, EMPTY),
new Argument(KINESIS_PARTITION_KEY, EMPTY),
new Argument(KINESIS_DATA_RECORD, EMPTY))
.collect(Collectors.toList());

/**
* AWS Kinesis Data Stream Client.
*/
private KinesisClient kinesisClient;

/**
* Create AWS Kinesis Data Stream Client.
* @param credentials
* Represents the input of JMeter Java Request parameters.
* @return KinesisClient extends SdkClient super class.
*/
@Override
public SdkClient createSdkClient(Map<String, String> credentials) {
return KinesisClient.builder()
.region(Region.of(getAWSRegion(credentials)))
.credentialsProvider(getAwsCredentialsProvider(credentials))
.build();
}

/**
* Initial values for test parameter. They are show in Java Request test sampler.
* AWS parameters and Kinesis Data Stream parameters.
* @return Arguments to set as default on Java Request.
*/
@Override
public Arguments getDefaultParameters() {
Arguments defaultParameters = new Arguments();
defaultParameters.setArguments(Stream.of(AWS_PARAMETERS, KINESIS_PARAMETERS)
.flatMap(List::stream)
.collect(Collectors.toList()));
return defaultParameters;
}

/**
* Read test parameters and initialize AWS Kinesis Data Stream client.
* @param context to get the arguments values on Java Sampler.
*/
@Override
public void setupTest(JavaSamplerContext context) {

log.info("Setup Kinesis Producer Sampler.");
Map<String, String> credentials = new HashMap<>();

context.getParameterNamesIterator()
.forEachRemaining( k -> {
credentials.put(k, context.getParameter(k));
log.info("Parameter: " + k + ", value: " + credentials.get(k));
});

log.info("Create Kinesis Producer.");
kinesisClient = (KinesisClient) createSdkClient(credentials);
}

/**
* Main method to execute the test on single thread. Create Data Records and publish it in Kinesis stream.
* @param context
* Arguments values on Java Sampler.
* @return SampleResult, captures data such as whether the test was successful,
* the response code and message, any request or response data, and the test start/end times
*/
@Override
public SampleResult runTest(JavaSamplerContext context) {

SampleResult result = newSampleResult();
sampleResultStart(result, String.format("Stream Name: %s \nPartition Key: %s \nData Record: %s",
context.getParameter(KINESIS_STREAM_NAME),
context.getParameter(KINESIS_PARTITION_KEY),
context.getParameter(KINESIS_DATA_RECORD)));

try {
log.info("Publishing Data Records.");
PutRecordsResponse response = kinesisClient.putRecords(createPutRecordRequest(context));
sampleResultSuccess(result,String.format("Response details: %s \nEncryption Type: %s",
response.toString(),
response.encryptionTypeAsString()));
}catch (KinesisException e){
sampleResultFail(result, e.awsErrorDetails().errorCode(), e.awsErrorDetails().errorMessage());
}

return result;
}

/**
* Close AWS Kinesis Data Stream Client after run single thread.
* @param context
* Arguments values on Java Sampler.
*/
@Override
public void teardownTest(JavaSamplerContext context) {
log.info("Close Kinesis Producer.");
kinesisClient.close();
}

/**
* Create PutRecordsRequest with stream name, partition key and data.
* @param context
* Arguments values on Java Sampler.
* @return PutRecordsRequest
*/
public PutRecordsRequest createPutRecordRequest(JavaSamplerContext context) {
String tempPayload = context.getParameter(KINESIS_DATA_RECORD);
JSONArray jsonArray = new JSONArray(tempPayload);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
PutRecordsRequest putRecordsRequest = null;
for (int i = 0; i < jsonArray.length(); i++) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit of records is 500.
Please add validation to not exceed this limit.

try {
putRecordsRequestEntryList.add(PutRecordsRequestEntry.builder()
.data(SdkBytes.fromByteArray(String.valueOf(jsonArray.get(i)).getBytes(StandardCharsets.UTF_8)))
.partitionKey(context.getParameter(KINESIS_PARTITION_KEY) + "-" + i).build());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each record could specify the Data, ExplicitHashKey, and Partition Key.
Could you please get these fields from the JSON and use them to build PutReecordsRequestEntry.



} catch (JSONException e) {
e.printStackTrace();
}
putRecordsRequest = PutRecordsRequest.builder()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you are building PutRecordRequest for each record in the array.
I guess is better to create PutRecordRequest once with the List<PutRecordsRequestEntry and streamName.

.streamName(context.getParameter(KINESIS_STREAM_NAME))
.records(putRecordsRequestEntryList).build();
}
return putRecordsRequest;
}
}