Skip to content

Commit

Permalink
Adds lock to RelpClientImpl (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrongestNumber9 authored Sep 25, 2024
1 parent db1fae0 commit a55fedf
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions src/main/java/com/teragrep/rlp_03/client/RelpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Simple client with asynchronous transmit and {@link java.util.concurrent.Future} based receive.
Expand All @@ -63,12 +65,14 @@ public final class RelpClientImpl implements RelpClient {
private final TransactionService transactionService;
private final AtomicInteger txnCounter;
private final FragmentFactory fragmentFactory;
private final Lock lock;

RelpClientImpl(EstablishedContext establishedContext, TransactionService transactionService) {
this.establishedContext = establishedContext;
this.transactionService = transactionService;
this.txnCounter = new AtomicInteger();
this.fragmentFactory = new FragmentFactory();
this.lock = new ReentrantLock();
}

/**
Expand All @@ -79,19 +83,25 @@ public final class RelpClientImpl implements RelpClient {
*/
@Override
public CompletableFuture<RelpFrame> transmit(RelpFrame relpFrame) {
int txnInt = txnCounter.incrementAndGet();
Fragment txn = fragmentFactory.create(txnInt);
lock.lock();
CompletableFuture<RelpFrame> future;
try {
int txnInt = txnCounter.incrementAndGet();
Fragment txn = fragmentFactory.create(txnInt);
RelpFrame relpFrameToXmit = new RelpFrameImpl(
txn,
relpFrame.command(),
relpFrame.payloadLength(),
relpFrame.payload(),
relpFrame.endOfTransfer()
);
future = transactionService.create(relpFrameToXmit);

RelpFrame relpFrameToXmit = new RelpFrameImpl(
txn,
relpFrame.command(),
relpFrame.payloadLength(),
relpFrame.payload(),
relpFrame.endOfTransfer()
);
CompletableFuture<RelpFrame> future = transactionService.create(relpFrameToXmit);

establishedContext.egress().accept(relpFrameToXmit.toWriteable());
establishedContext.egress().accept(relpFrameToXmit.toWriteable());
}
finally {
lock.unlock();
}
return future;
}

Expand All @@ -100,8 +110,14 @@ public CompletableFuture<RelpFrame> transmit(RelpFrame relpFrame) {
*/
@Override
public void close() {
transactionService.close();
establishedContext.close();
lock.lock();
try {
transactionService.close();
establishedContext.close();
}
finally {
lock.unlock();
}
}

@Override
Expand Down

0 comments on commit a55fedf

Please sign in to comment.