Skip to content

Commit ff4045d

Browse files
author
Donald Tregonning
authored
Issue 175 warning log for retry (#180)
* update 3rd party libs * Added warning for resend of batch and added UUID to batch * Update SplunkSinkTaskTest.java removed extra line
1 parent b523ef9 commit ff4045d

File tree

4 files changed

+16
-6
lines changed

4 files changed

+16
-6
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ Use the below schema to configure Splunk Connect for Kafka
149149
| `splunk.hec.ssl.trust.store.path` | Location of Java KeyStore. |`""`|
150150
| `splunk.hec.ssl.trust.store.password` | Password for Java KeyStore. |`""`|
151151
| `splunk.hec.json.event.formatted` | Set to `true` for events that are already in HEC format. Valid settings are `true` or `false`. |`false`|
152+
| `splunk.hec.max.outstanding.events` | Maximum number of unacknowledged events kept in memory by the connector. If this limit is reached, a back-pressure event is triggered to slow down collection. | `1000000` |
153+
| `splunk.hec.max.retries` | Number of times a failed batch will be resent before its events are dropped completely. Warning: dropping events results in data loss. The default, `-1`, retries indefinitely. | `-1` |
152154
### Acknowledgement Parameters
153155
#### Use Ack
154156
| Name | Description | Default Value |

dependency-reduced-pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,4 @@
210210
<maven.compiler.target>1.8</maven.compiler.target>
211211
<junit.version>4.12</junit.version>
212212
</properties>
213-
</project>
213+
</project>

src/main/java/com/splunk/hecclient/EventBatch.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,18 @@
2424
import java.io.InputStream;
2525
import java.io.OutputStream;
2626
import java.io.SequenceInputStream;
27+
2728
import java.util.ArrayList;
2829
import java.util.Enumeration;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.UUID;
3133

3234
public abstract class EventBatch {
3335
private static Logger log = LoggerFactory.getLogger(EventBatch.class);
3436

37+
private UUID batchUUID = UUID.randomUUID();
38+
3539
private static final int INIT = 0;
3640
private static final int COMMITTED = 1;
3741
private static final int FAILED = 2;
@@ -103,6 +107,8 @@ public final List<Event> getEvents() {
103107
return events;
104108
}
105109

110+
/**
 * Returns the identifier of this batch in textual form.
 *
 * @return the {@code toString()} representation of the {@code batchUUID} field
 */
public final String getUUID() {
    final String uuidText = batchUUID.toString();
    return uuidText;
}
111+
106112
// Total length of data for all events
107113
public final int length() {
108114
return len;

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -126,22 +126,24 @@ private void handleFailedBatches() {
126126
return;
127127
}
128128

129-
log.debug("going to handle {} failed batches", failed.size());
129+
log.debug("handling {} failed batches", failed.size());
130130
long failedEvents = 0;
131131
// if there are failed ones, first deal with them
132132
for (final EventBatch batch: failed) {
133133
failedEvents += batch.size();
134134
if (connectorConfig.maxRetries > 0 && batch.getFailureCount() > connectorConfig.maxRetries) {
135-
log.error("dropping EventBatch with {} events in it since it reaches max retries {}",
136-
batch.size(), connectorConfig.maxRetries);
135+
log.error("dropping EventBatch {} with {} events after reaching maximum retries {}",
136+
batch.getUUID(), batch.size(), connectorConfig.maxRetries);
137137
continue;
138138
}
139+
log.warn("attempting to resend batch {} with {} events, this is attempt {} out of {} for this batch ",
140+
batch.getUUID(), batch.size(), batch.getFailureCount(), connectorConfig.maxRetries);
139141
send(batch);
140142
}
141143

142144
log.info("handled {} failed batches with {} events", failed.size(), failedEvents);
143145
if (failedEvents * 10 > connectorConfig.maxOutstandingEvents) {
144-
String msg = String.format("failed events reach 10 %% of max outstanding events %d, pause the pull for a while", connectorConfig.maxOutstandingEvents);
146+
String msg = String.format("failed events have reached 10 %% of max outstanding events %d, pausing the pull of events for a while", connectorConfig.maxOutstandingEvents);
145147
throw new RetriableException(new HecException(msg));
146148
}
147149
}
@@ -284,7 +286,7 @@ private void send(final EventBatch batch) {
284286
} catch (Exception ex) {
285287
batch.fail();
286288
onEventFailure(Arrays.asList(batch), ex);
287-
log.error("failed to send batch", ex);
289+
log.error("failed to send batch {}" ,batch.getUUID(), ex);
288290
}
289291
}
290292

0 commit comments

Comments
 (0)