Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -653,34 +654,46 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
* @return true if the request was sent, else false
*/
public boolean sendRequest(@NonNull final PublishStreamRequest request) {
return sendRequest(-1, -1, request);
}

private boolean sendRequest(
final long blockNumber, final int requestNumber, @NonNull final PublishStreamRequest request) {
requireNonNull(request, "request must not be null");

final Pipeline<? super PublishStreamRequest> pipeline = requestPipelineRef.get();

if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) {
try {
if (logger.isDebugEnabled()) {
if (blockNumber == -1 && requestNumber == -1) {
logger.debug(
"{} Sending request to block node (type={}).",
"{} Sending ad hoc request to block node (type={})",
this,
request.request().kind());
}
if (logger.isTraceEnabled()) {
/*
PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur
a performance penality. Therefore, we only want to log the byte size at trace level.
*/
logger.trace(
"{} Sending request to block node (type={}, bytes={})",
} else {
logger.debug(
"{} [block={}, request={}] Sending request to block node (type={})",
this,
request.request().kind(),
request.protobufSize());
blockNumber,
requestNumber,
request.request().kind());
}

final long startMs = System.currentTimeMillis();
pipeline.onNext(request);
final long durationMs = System.currentTimeMillis() - startMs;
blockStreamMetrics.recordRequestLatency(durationMs);
logger.trace("{} Request took {}ms to send", this, durationMs);

if (blockNumber == -1 && requestNumber == -1) {
logger.trace("{} Ad hoc request took {}ms to send", this, durationMs);
} else {
logger.trace(
"{} [block={}, request={}] Request took {}ms to send",
this,
blockNumber,
requestNumber,
durationMs);
}

if (request.hasEndStream()) {
blockStreamMetrics.recordRequestEndStreamSent(
Expand Down Expand Up @@ -944,6 +957,7 @@ private class ConnectionWorkerLoopTask implements Runnable {
private int itemIndex = 0;
private BlockState block;
private long lastSendTimeMillis = -1;
private final AtomicInteger requestCtr = new AtomicInteger(1);

@Override
public void run() {
Expand Down Expand Up @@ -991,7 +1005,7 @@ private void doWork() {
"{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber());
}

final int itemSize = item.protobufSize();
final int itemSize = item.protobufSize() + 2; // each item has 2 bytes of overhead
final long newRequestBytes = pendingRequestBytes + itemSize;

if (newRequestBytes > MAX_BYTES_PER_REQUEST) {
Expand Down Expand Up @@ -1073,14 +1087,26 @@ private boolean sendPendingRequest() {
final PublishStreamRequest req =
PublishStreamRequest.newBuilder().blockItems(itemSet).build();

if (logger.isTraceEnabled()) {
logger.trace(
"{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})",
BlockNodeConnection.this,
block.blockNumber(),
requestCtr.get(),
pendingRequestItems.size(),
pendingRequestBytes,
req.protobufSize());
}

try {
if (sendRequest(req)) {
if (sendRequest(block.blockNumber(), requestCtr.get(), req)) {
// record that we've sent the request
lastSendTimeMillis = System.currentTimeMillis();

// clear the pending request data
pendingRequestBytes = BYTES_PADDING;
pendingRequestItems.clear();
requestCtr.incrementAndGet();
return true;
}
} catch (final UncheckedIOException e) {
Expand Down Expand Up @@ -1135,6 +1161,7 @@ private void switchBlockIfNeeded() {
pendingRequestBytes = BYTES_PADDING;
itemIndex = 0;
pendingRequestItems.clear();
requestCtr.set(1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ final Stream<DynamicTest> updateAppPropertiesWriterMode() {
timeRef::get,
Duration.ofMinutes(2),
Duration.ofMinutes(2),
String.format("/localhost:%s/ACTIVE] Sending request to block node", portNumbers.getFirst()),
String.format(
"/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE.",
portNumbers.getFirst()))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,8 @@ private Stream<DynamicTest> validateHappyPath(final int blocksToWait) {
assertBlockNodeCommsLogDoesNotContain(
byNodeId(0), "Block node has exceeded high latency threshold", Duration.ofSeconds(0)),
assertBlockNodeCommsLogContains(
byNodeId(0), "Sending request to block node (type=END_OF_BLOCK)", Duration.ofSeconds(0)));
byNodeId(0),
"Sending ad hoc request to block node (type=END_OF_BLOCK)",
Duration.ofSeconds(0)));
}
}
Loading