Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -653,34 +654,46 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) {
* @return true if the request was sent, else false
*/
public boolean sendRequest(@NonNull final PublishStreamRequest request) {
return sendRequest(-1, -1, request);
}

private boolean sendRequest(
final long blockNumber, final int requestNumber, @NonNull final PublishStreamRequest request) {
requireNonNull(request, "request must not be null");

final Pipeline<? super PublishStreamRequest> pipeline = requestPipelineRef.get();

if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) {
try {
if (logger.isDebugEnabled()) {
if (blockNumber == -1 && requestNumber == -1) {
logger.debug(
"{} Sending request to block node (type={}).",
"{} Sending ad hoc request to block node (type={})",
this,
request.request().kind());
}
if (logger.isTraceEnabled()) {
/*
PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur
a performance penality. Therefore, we only want to log the byte size at trace level.
*/
logger.trace(
"{} Sending request to block node (type={}, bytes={})",
} else {
logger.debug(
"{} [block={}, request={}] Sending request to block node (type={})",
this,
request.request().kind(),
request.protobufSize());
blockNumber,
requestNumber,
request.request().kind());
}

final long startMs = System.currentTimeMillis();
pipeline.onNext(request);
final long durationMs = System.currentTimeMillis() - startMs;
blockStreamMetrics.recordRequestLatency(durationMs);
logger.trace("{} Request took {}ms to send", this, durationMs);

if (blockNumber == -1 && requestNumber == -1) {
logger.trace("{} Ad hoc request took {}ms to send", this, durationMs);
} else {
logger.trace(
"{} [block={}, request={}] Request took {}ms to send",
this,
blockNumber,
requestNumber,
durationMs);
}

if (request.hasEndStream()) {
blockStreamMetrics.recordRequestEndStreamSent(
Expand Down Expand Up @@ -944,6 +957,7 @@ private class ConnectionWorkerLoopTask implements Runnable {
private int itemIndex = 0;
private BlockState block;
private long lastSendTimeMillis = -1;
private final AtomicInteger requestCtr = new AtomicInteger(1);

@Override
public void run() {
Expand Down Expand Up @@ -991,7 +1005,7 @@ private void doWork() {
"{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber());
}

final int itemSize = item.protobufSize();
final int itemSize = item.protobufSize() + 2; // each item has 2 bytes of overhead
final long newRequestBytes = pendingRequestBytes + itemSize;

if (newRequestBytes > MAX_BYTES_PER_REQUEST) {
Expand Down Expand Up @@ -1073,14 +1087,26 @@ private boolean sendPendingRequest() {
final PublishStreamRequest req =
PublishStreamRequest.newBuilder().blockItems(itemSet).build();

if (logger.isTraceEnabled()) {
logger.trace(
"{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})",
BlockNodeConnection.this,
block.blockNumber(),
requestCtr.get(),
pendingRequestItems.size(),
pendingRequestBytes,
req.protobufSize());
}

try {
if (sendRequest(req)) {
if (sendRequest(block.blockNumber(), requestCtr.get(), req)) {
// record that we've sent the request
lastSendTimeMillis = System.currentTimeMillis();

// clear the pending request data
pendingRequestBytes = BYTES_PADDING;
pendingRequestItems.clear();
requestCtr.incrementAndGet();
return true;
}
} catch (final UncheckedIOException e) {
Expand Down Expand Up @@ -1135,6 +1161,7 @@ private void switchBlockIfNeeded() {
pendingRequestBytes = BYTES_PADDING;
itemIndex = 0;
pendingRequestItems.clear();
requestCtr.set(1);
}

/**
Expand Down
Loading