Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 135 additions & 12 deletions binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import android.os.IBinder;
import android.os.Parcel;
import android.os.Process;

import androidx.annotation.BinderThread;
import androidx.annotation.MainThread;

import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -59,7 +62,9 @@
import io.grpc.internal.StatsTraceContext;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -80,6 +85,12 @@ public final class BinderClientTransport extends BinderTransport
/** Number of ongoing calls which keep this transport "in-use". */
private final AtomicInteger numInUseStreams;

/** Last in-use state that was reported to the listener */
private final AtomicBoolean listenerInUse;

/** Synchronizes transport listener callbacks */
private final Object listenerNotifyLock;

private final long readyTimeoutMillis;
private final PingTracker pingTracker;
private final boolean preAuthorizeServer;
Expand Down Expand Up @@ -121,7 +132,9 @@ public BinderClientTransport(
preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers;
this.handshake =
factory.useLegacyAuthStrategy ? new LegacyClientHandshake() : new V2ClientHandshake();
numInUseStreams = new AtomicInteger();
this.numInUseStreams = new AtomicInteger();
this.listenerInUse = new AtomicBoolean();
this.listenerNotifyLock = new Object();
pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id));
serviceBinding =
new ServiceBinding(
Expand Down Expand Up @@ -265,9 +278,7 @@ public synchronized ClientStream newStream(
return newFailingClientStream(failure, attributes, headers, tracers);
}

if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) {
clientTransportListener.transportInUse(true);
}
updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), 1);
Outbound.ClientOutbound outbound =
new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext);
if (method.getType().clientSendsOneMessage()) {
Expand All @@ -279,9 +290,7 @@ public synchronized ClientStream newStream(

@Override
protected void unregisterInbound(Inbound<?> inbound) {
if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) {
clientTransportListener.transportInUse(false);
}
updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), -1);
super.unregisterInbound(inbound);
}

Expand All @@ -305,21 +314,38 @@ public synchronized void shutdownNow(Status reason) {
@Override
@GuardedBy("this")
void notifyShutdown(Status status) {
clientTransportListener.transportShutdown(status);
// Defer to listener executor with external synchronization
scheduleOnListener(
new Runnable() {
@Override
public void run() {
clientTransportListener.transportShutdown(status);
}
});
}

@Override
@GuardedBy("this")
void notifyTerminated() {
if (numInUseStreams.getAndSet(0) > 0) {
clientTransportListener.transportInUse(false);
if (listenerInUse.compareAndSet(true, false)) {
scheduleTransportInUseNotification(false);
} else {
listenerInUse.set(false);
}
}
if (readyTimeoutFuture != null) {
readyTimeoutFuture.cancel(false);
readyTimeoutFuture = null;
}
serviceBinding.unbind();
clientTransportListener.transportTerminated();
scheduleOnListener(
new Runnable() {
@Override
public void run() {
clientTransportListener.transportTerminated();
}
});
}

@Override
Expand Down Expand Up @@ -439,8 +465,11 @@ public void handleSetupTransport() {
@GuardedBy("this")
private void onHandshakeComplete() {
setState(TransportState.READY);
attributes = clientTransportListener.filterTransport(attributes);
clientTransportListener.transportReady();
final Attributes currentAttrs = attributes;
// Perform filter on listener thread with external synchronization, then update attrs and
// notify ready without holding transport lock to avoid deadlocks.
scheduleFilterTransportAndReady(currentAttrs);

if (readyTimeoutFuture != null) {
readyTimeoutFuture.cancel(false);
readyTimeoutFuture = null;
Expand All @@ -452,6 +481,100 @@ private synchronized void handleAuthResult(Throwable t) {
Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true);
}

/**
* Updates in-use stream count and notifies listener only on transitions between 0 and >0, without
* acquiring the transport lock.
*/
private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) {
Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1");
if (!countsForInUse) {
return;
}

if (delta > 0) {
numInUseStreams.incrementAndGet();
} else {
// Decrement with floor at 0
int prev = numInUseStreams.get();

while (true) {
int current = prev;
int newValue = current > 0 ? current - 1 : 0;
if (numInUseStreams.compareAndSet(current, newValue)) {
break;
}
prev = numInUseStreams.get();
}
}
reconcileInUseState();
}

/** Reconcile listenerInUse with the current stream count to avoid stale toggles under races. */
private void reconcileInUseState() {
boolean nowInUse = numInUseStreams.get() > 0;
boolean prev = listenerInUse.get();

if(prev != nowInUse && listenerInUse.compareAndSet(prev, nowInUse)) {
scheduleTransportInUseNotification(nowInUse);
}
}

private void scheduleTransportInUseNotification(final boolean inUse) {
getScheduledExecutorService()
.execute(
new Runnable() {
@Override
public void run() {
// Provide external synchronization as required by Listener contract,
// without taking the transport lock to avoid potential deadlocks.
synchronized (listenerNotifyLock) {
if (listenerInUse.get() == inUse) {
clientTransportListener.transportInUse(inUse);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your new listenerNotifyLock may ensure two threads don't call transportInUse() at the same time, but why is that sufficient? According to the Listener API contract, don't we actually need mutual exclusion across all Listener methods?

Copy link
Contributor Author

@becomeStar becomeStar Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdcormie

Thanks for the earlier feedback.
I updated the PR to ensure mutual exclusion across all Listener callbacks under a single lock and added reconciliation logic for the in-use counter.
Could you please take another look when you have time?

I also rebased the branch to resolve the recent conflicts.

}
}
}
});
}

private void scheduleFilterTransportAndReady(final Attributes attrsSnapshot) {
getScheduledExecutorService()
.execute(
new Runnable() {
@Override
public void run() {
final Attributes filtered;
synchronized (listenerNotifyLock) {
filtered = clientTransportListener.filterTransport(attrsSnapshot);
}

synchronized (BinderClientTransport.class) {
attributes = filtered;
}

scheduleOnListener(
new Runnable() {
@Override
public void run() {
clientTransportListener.transportReady();
}
});
}
});
}

private void scheduleOnListener(final Runnable task) {
getScheduledExecutorService()
.execute(
new Runnable() {
@Override
public void run() {
synchronized (listenerNotifyLock) {
task.run();
}
}
});
}

@GuardedBy("this")
@Override
protected void handlePingResponse(Parcel parcel) {
Expand Down