Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
Expand All @@ -47,6 +49,7 @@ public class AwsSubscription extends AbstractSubscription<AwsSubscription> {
private final SqsClient sqsClient;
private final boolean nackLazy;
private final long waitTimeSeconds;
private final String subscriptionUrl;

public AwsSubscription() {
this(new Builder());
Expand All @@ -57,6 +60,7 @@ public AwsSubscription(Builder builder) {
this.nackLazy = builder.nackLazy;
this.waitTimeSeconds = builder.waitTimeSeconds;
this.sqsClient = builder.sqsClient;
this.subscriptionUrl = builder.subscriptionUrl;
}

@Override
Expand All @@ -81,7 +85,7 @@ protected void doSendAcks(List<AckID> ackIDs) {
.build());
}
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
.queueUrl(subscriptionName)
.queueUrl(subscriptionUrl)
.entries(entries)
.build();

Expand Down Expand Up @@ -124,7 +128,7 @@ protected void doSendNacks(List<AckID> ackIDs) {
.build());
}
ChangeMessageVisibilityBatchRequest request = ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(subscriptionName)
.queueUrl(subscriptionUrl)
.entries(entries)
.build();

Expand All @@ -147,7 +151,7 @@ protected void doSendNacks(List<AckID> ackIDs) {
@Override
protected List<Message> doReceiveBatch(int batchSize) {
ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
.queueUrl(subscriptionName)
.queueUrl(subscriptionUrl)
.maxNumberOfMessages(Math.min(batchSize, 10)) // SQS supports max 10 messages
.messageAttributeNames("All")
.attributeNames(QueueAttributeName.ALL);
Expand Down Expand Up @@ -310,10 +314,16 @@ static void validateSubscriptionName(String subscriptionName) {
if (subscriptionName == null || subscriptionName.trim().isEmpty()) {
throw new InvalidArgumentException("Subscription name cannot be null or empty");
}
if (!subscriptionName.startsWith("https://sqs.") || !subscriptionName.contains(".amazonaws.com/")) {
throw new InvalidArgumentException(
"Subscription name must be in format: https://sqs.region.amazonaws.com/account/queue-name, got: " + subscriptionName);
}
}

static String getQueueUrl(String queueName, SqsClient sqsClient)
throws AwsServiceException, SdkClientException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
.build();

GetQueueUrlResponse response = sqsClient.getQueueUrl(request);
return response.queueUrl();
}

@Override
Expand Down Expand Up @@ -349,7 +359,7 @@ public boolean isRetryable(Throwable error) {
public GetAttributeResult getAttributes() {
try {
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(subscriptionName)
.queueUrl(subscriptionUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build();

Expand All @@ -361,7 +371,7 @@ public GetAttributeResult getAttributes() {

// Return subscription name (queue URL) as name, and queue ARN as topic
return new GetAttributeResult.Builder()
.name(subscriptionName)
.name(subscriptionUrl)
.topic(queueArn)
.build();
} catch (AwsServiceException | SdkClientException e) {
Expand Down Expand Up @@ -423,6 +433,7 @@ public static class Builder extends AbstractSubscription.Builder<AwsSubscription
private boolean nackLazy = false;
private long waitTimeSeconds = 0;
private SqsClient sqsClient;
private String subscriptionUrl;

public Builder() {
this.providerId = AwsConstants.PROVIDER_ID;
Expand All @@ -443,6 +454,15 @@ public Builder withSqsClient(SqsClient sqsClient) {
return this;
}

/**
* Directly set the subscription URL to avoid calling GetQueueUrl again.
* Used when the queue URL has already been resolved
*/
Builder withSubscriptionUrl(String subscriptionUrl) {
this.subscriptionUrl = subscriptionUrl;
return this;
}

Comment on lines +461 to +465
Copy link
Contributor Author

@roseyang62 roseyang62 Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this method because in integration tests, both topic and subscription may share the same queue. If each build() call triggers GetQueueUrl, WireMock records multiple mappings, which can cause replay mismatches.

By calling GetQueueUrl once and passing the cached URL via withSubscriptionUrl, we can avoid redundant API calls and ensure only one mapping is generated per queue.

private static SqsClient buildSqsClient(Builder builder) {
return SqsClientUtil.buildSqsClient(
builder.region,
Expand All @@ -457,6 +477,12 @@ public AwsSubscription build() {
if (sqsClient == null) {
sqsClient = buildSqsClient(this);
}

// get the full queue URL from the queue name
if (this.subscriptionUrl == null) {
this.subscriptionUrl = getQueueUrl(subscriptionName, sqsClient);
}

return new AwsSubscription(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;

class MetadataKeys {
public static final String DEDUPLICATION_ID = "DeduplicationId";
Expand All @@ -37,6 +39,7 @@ public class AwsTopic extends AbstractTopic<AwsTopic> {

private static final int MAX_SQS_ATTRIBUTES = 10;
private final SqsClient sqsClient;
private final String topicUrl;

public AwsTopic() {
this(new Builder());
Expand All @@ -45,6 +48,7 @@ public AwsTopic() {
public AwsTopic(Builder builder) {
super(builder);
this.sqsClient = builder.sqsClient;
this.topicUrl = builder.topicUrl;
}

/**
Expand Down Expand Up @@ -105,7 +109,7 @@ private void sendToSqs(List<Message> messages) {
}

SendMessageBatchRequest batchRequest = SendMessageBatchRequest.builder()
.queueUrl(topicName)
.queueUrl(topicUrl)
.entries(entries)
.build();

Expand Down Expand Up @@ -168,23 +172,23 @@ protected void executeAfterSendBatchHooks(List<Message> messages) {
}

/**
* Validates that the topic name is in the correct AWS SQS URL format.
* Validates that the topic name is a queue name
*/
static void validateTopicName(String topicName) {
if (topicName == null) {
throw new InvalidArgumentException("SQS topic name cannot be null");
}
if (topicName.trim().isEmpty()) {
throw new InvalidArgumentException("SQS topic name cannot be empty");
}

// Validate SQS URL format: https://sqs.region.amazonaws.com/account/queue-name
String sqsUrlPattern = "https://sqs\\.[^/]+\\.amazonaws\\.com/[^/]+/.+";
if (!topicName.matches(sqsUrlPattern)) {
throw new InvalidArgumentException(
"SQS topic name must be in format: https://sqs.region.amazonaws.com/account/queue-name, got: " + topicName);
if (topicName == null || topicName.trim().isEmpty()) {
throw new InvalidArgumentException("SQS topic name cannot be null or empty");
}
}

static String getQueueUrl(String queueName, SqsClient sqsClient)
throws AwsServiceException, SdkClientException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
.build();

GetQueueUrlResponse response = sqsClient.getQueueUrl(request);
return response.queueUrl();
}

/**
* Sets SQS-specific attributes on a SendMessageBatchRequestEntry based on message metadata.
Expand Down Expand Up @@ -322,6 +326,7 @@ public Builder builder() {

public static class Builder extends AbstractTopic.Builder<AwsTopic> {
private SqsClient sqsClient;
private String topicUrl;

public Builder() {
this.providerId = AwsConstants.PROVIDER_ID;
Expand All @@ -332,6 +337,15 @@ public Builder withSqsClient(SqsClient sqsClient) {
return this;
}

/**
* Directly set the topic URL to avoid calling GetQueueUrl again.
* Used when the queue URL has already been resolved
*/
Builder withTopicUrl(String topicUrl) {
this.topicUrl = topicUrl;
return this;
}

private static SqsClient buildSqsClient(Builder builder) {
return SqsClientUtil.buildSqsClient(
builder.region,
Expand All @@ -345,6 +359,12 @@ public AwsTopic build() {
if (sqsClient == null) {
sqsClient = buildSqsClient(this);
}

// get the full queue URL from the queue name
if (this.topicUrl == null) {
this.topicUrl = getQueueUrl(this.topicName, sqsClient);
}

return new AwsTopic(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;

import java.net.URI;
import java.util.List;
Expand All @@ -31,7 +31,7 @@ public class AwsPubsubIT extends AbstractPubsubIT {
private static final String BASE_QUEUE_NAME = "test-queue";

private HarnessImpl harnessImpl;
private String currentQueueUrl;
private String queueName;

@Override
protected Harness createHarness() {
Expand All @@ -40,17 +40,16 @@ protected Harness createHarness() {
}

/**
* Generate a unique queue URL for each test.
* Generate a unique queue name for each test.
* Uses test method name so the same test always uses the same queue.
* Topic creates queue, Subscription does not.
*/
@BeforeEach
public void setupTestQueue(TestInfo testInfo) {
String testMethodName = testInfo.getTestMethod().map(m -> m.getName()).orElse("unknown");
String queueName = BASE_QUEUE_NAME + "-" + testMethodName;
currentQueueUrl = String.format("https://sqs.us-west-2.amazonaws.com/%s/%s", ACCOUNT_ID, queueName);
queueName = BASE_QUEUE_NAME + "-" + testMethodName;
if (harnessImpl != null) {
harnessImpl.setQueueUrl(currentQueueUrl);
harnessImpl.setQueueName(queueName);
}
}

Expand All @@ -60,10 +59,12 @@ public static class HarnessImpl implements Harness {
private SqsClient sqsClient;
private SdkHttpClient httpClient;
private int port = ThreadLocalRandom.current().nextInt(1000, 10000);
private String queueUrl = String.format("https://sqs.us-west-2.amazonaws.com/%s/%s", ACCOUNT_ID, BASE_QUEUE_NAME);
private String queueName = BASE_QUEUE_NAME;
private String cachedQueueUrl; // Cache queue URL to avoid calling GetQueueUrl multiple times for the same queue

public void setQueueUrl(String queueUrl) {
this.queueUrl = queueUrl;
public void setQueueName(String queueName) {
this.queueName = queueName;
this.cachedQueueUrl = null; // Reset cache when queue name changes
}

private SqsClient createSqsClient() {
Expand All @@ -83,49 +84,74 @@ private SqsClient createSqsClient() {
return sqsClient;
}

@Override
public AbstractTopic createTopicDriver() {
sqsClient = createSqsClient();

// Topic creates queue if it doesn't exist (idempotent)
// Extract queue name from queue URL: https://sqs.region.amazonaws.com/account/queue-name
String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1);
/**
* Ensures the queue exists before build() is called.
* In record mode, we create the queue if it doesn't exist (without calling GetQueueUrl).
* In replay mode, we don't do anything - only build() will call GetQueueUrl once.
* This ensures only one GetQueueUrl mapping is generated per test.
*/
private void ensureQueueExists() {
if (System.getProperty("record") != null) {
// In record mode, create queue if it doesn't exist
// In record mode, try to create queue if it doesn't exist
// We don't call GetQueueUrl here to avoid generating multiple mappings
// build() will call GetQueueUrl once, which will handle both existing and new queues
try {
try {
sqsClient.getQueueUrl(GetQueueUrlRequest.builder()
.queueName(queueName)
.build());
} catch (QueueDoesNotExistException e) {
sqsClient.createQueue(CreateQueueRequest.builder()
.queueName(queueName)
.build());
}
// Try to create the queue - CreateQueue is idempotent if queue already exists
sqsClient.createQueue(CreateQueueRequest.builder()
.queueName(queueName)
.build());
} catch (Exception e) {
System.err.println("Warning: Failed to create queue in createTopicDriver: " + e.getMessage());
// If creation fails, build() will handle it when calling GetQueueUrl
System.err.println("Warning: Failed to create queue: " + e.getMessage());
}
}
// In replay mode, do nothing - build() will call GetQueueUrl once
}

@Override
public AbstractTopic createTopicDriver() {
sqsClient = createSqsClient();
ensureQueueExists();

// If queue URL is not cached, get it now (this will be the only GetQueueUrl call for this queue)
if (cachedQueueUrl == null) {
GetQueueUrlResponse response = sqsClient.getQueueUrl(GetQueueUrlRequest.builder()
.queueName(queueName)
.build());
cachedQueueUrl = response.queueUrl();
}

AwsTopic.Builder topicBuilder = new AwsTopic.Builder();
System.out.println("createTopicDriver using queueUrl: " + queueUrl);
topicBuilder.withTopicName(queueUrl);
System.out.println("createTopicDriver using queueName: " + queueName);
topicBuilder.withTopicName(queueName);
topicBuilder.withSqsClient(sqsClient);
topic = new AwsTopic(topicBuilder);
topicBuilder.withTopicUrl(cachedQueueUrl); // Use cached URL to avoid calling GetQueueUrl again
topic = topicBuilder.build();

return topic;
}

@Override
public AbstractSubscription createSubscriptionDriver() {
sqsClient = createSqsClient();
ensureQueueExists();

// If queue URL is not cached, get it now (this will be the only GetQueueUrl call for this queue)
if (cachedQueueUrl == null) {
GetQueueUrlResponse response = sqsClient.getQueueUrl(GetQueueUrlRequest.builder()
.queueName(queueName)
.build());
cachedQueueUrl = response.queueUrl();
}

AwsSubscription.Builder subscriptionBuilder = new AwsSubscription.Builder();
System.out.println("createSubscriptionDriver using queueUrl: " + queueUrl);
subscriptionBuilder.withSubscriptionName(queueUrl);
System.out.println("createSubscriptionDriver using queueName: " + queueName);
subscriptionBuilder.withSubscriptionName(queueName);
subscriptionBuilder.withWaitTimeSeconds(1); // Use 1 second wait time for conformance tests
subscriptionBuilder.withSqsClient(sqsClient);
subscriptionBuilder.withSubscriptionUrl(cachedQueueUrl); // Use cached URL to avoid calling GetQueueUrl again

// Disable dynamic batch size adjustment by setting MaxHandlers=1 and MaxBatchSize=1
subscriptionBuilder.build(); // This will use the cached URL, not call GetQueueUrl
subscription = new AwsSubscription(subscriptionBuilder) {
@Override
protected Batcher.Options createReceiveBatcherOptions() {
Expand Down
Loading
Loading