Skip to content

Commit 34a38fc

Browse files
committed
implemented support for SQS long polling with a thread per SQS Queue
1 parent 2844a6b commit 34a38fc

File tree

2 files changed

+31
-8
lines changed

2 files changed

+31
-8
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
<dependency>
6363
<groupId>com.amazonaws</groupId>
6464
<artifactId>aws-java-sdk</artifactId>
65-
<version>1.3.3</version>
65+
<version>1.3.26</version>
6666
</dependency>
6767
</dependencies>
6868

src/main/java/com/base2services/jenkins/SqsQueueHandler.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
import com.cloudbees.jenkins.GitHubWebHook;
99
import hudson.Extension;
1010
import hudson.model.PeriodicWork;
11+
import hudson.util.SequentialExecutionQueue;
1112
import hudson.util.TimeUnit2;
1213

1314
import java.util.List;
15+
import java.util.concurrent.Executors;
1416
import java.util.logging.Logger;
1517

1618
/**
@@ -23,20 +25,45 @@ public class SqsQueueHandler extends PeriodicWork {
2325

2426
private static final Logger LOGGER = Logger.getLogger(SqsQueueHandler.class.getName());
2527

28+
private transient final SequentialExecutionQueue queue = new SequentialExecutionQueue(Executors.newFixedThreadPool(2));
29+
2630
@Override
2731
public long getRecurrencePeriod() {
28-
return TimeUnit2.SECONDS.toMillis(30);
32+
return TimeUnit2.SECONDS.toMillis(2);
2933
}
3034

3135
@Override
3236
protected void doRun() throws Exception {
33-
List<SqsProfile> profiles = SqsBuildTrigger.DescriptorImpl.get().getSqsProfiles();
34-
for(SqsProfile profile : profiles) {
37+
if(queue.getInProgress().size() == 0) {
38+
List<SqsProfile> profiles = SqsBuildTrigger.DescriptorImpl.get().getSqsProfiles();
39+
queue.setExecutors(Executors.newFixedThreadPool(profiles.size()));
40+
for(final SqsProfile profile : profiles) {
41+
queue.execute(new SQSQueueReceiver(profile));
42+
}
43+
} else {
44+
logger.fine("Currently Waiting for Messages from Queues");
45+
}
46+
}
47+
48+
public static SqsQueueHandler get() {
49+
return PeriodicWork.all().get(SqsQueueHandler.class);
50+
}
51+
52+
private class SQSQueueReceiver implements Runnable {
53+
54+
private SqsProfile profile;
55+
56+
private SQSQueueReceiver(SqsProfile profile) {
57+
this.profile = profile;
58+
}
59+
60+
public void run() {
3561
LOGGER.fine("looking for build triggers on queue:" + profile.sqsQueue);
3662
AmazonSQS sqs = profile.getSQSClient();
3763
String queueUrl = profile.getQueueUrl();
3864
TriggerProcessor processor = profile.getTriggerProcessor();
3965
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
66+
receiveMessageRequest.setWaitTimeSeconds(20);
4067
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
4168
for(Message message : messages) {
4269
//Process the message payload, it needs to conform to the GitHub Web-Hook JSON format
@@ -52,8 +79,4 @@ protected void doRun() throws Exception {
5279
}
5380
}
5481
}
55-
56-
public static SqsQueueHandler get() {
57-
return PeriodicWork.all().get(SqsQueueHandler.class);
58-
}
5982
}

0 commit comments

Comments
 (0)