Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client]: add --streaming option to topics partitioned-stats-internal #23380

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin;

import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -1206,6 +1207,26 @@ default CompletableFuture<TopicStats> getStatsAsync(String topic) {
*/
PersistentTopicInternalStats getInternalStats(String topic, boolean metadata) throws PulsarAdminException;


/**
* Get the internal stats for the topic in a streaming fashion.
* <p/>
*
* @param topic
* topic name
* @param metadata
* flag to include ledger metadata
* @return the topic statistics
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Topic does not exist
* @throws PulsarAdminException
* Unexpected error
*/
InputStream streamInternalStats(String topic, boolean metadata) throws PulsarAdminException;

/**
* Get the internal stats for the topic.
* <p/>
Expand Down Expand Up @@ -1235,6 +1256,17 @@ default CompletableFuture<TopicStats> getStatsAsync(String topic) {
*/
CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic, boolean metadata);

/**
* Get the internal stats for the topic asynchronously in a streaming fashion.
*
* @param topic
* topic Name
* @param metadata
* flag to include ledger metadata
* @return a future that can be used to track when the internal topic statistics are returned
*/
CompletableFuture<InputStream> streamInternalStatsAsync(String topic, boolean metadata);

/**
* Get the internal stats for the topic asynchronously.
*
Expand Down Expand Up @@ -1405,6 +1437,27 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic)
*/
CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic);


/**
* Get the stats for the partitioned topic in a streaming fashion.
*
* @param topic
* topic name
* @return
* @throws PulsarAdminException
*/
InputStream streamPartitionedInternalStats(String topic)
throws PulsarAdminException;

/**
* Get the stats-internal for the partitioned topic asynchronously in a streaming fashion.
*
* @param topic
* topic Name
* @return a future that can be used to track when the partitioned topic statistics are returned
*/
CompletableFuture<InputStream> streamPartitionedInternalStatsAsync(String topic);

/**
* Delete a subscription.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,19 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(Str
return asyncGetRequest(path, new FutureCallback<PersistentTopicInternalStats>(){});
}

@Override
public InputStream streamInternalStats(String topic, boolean metadata) throws PulsarAdminException {
return sync(() -> streamInternalStatsAsync(topic, metadata));
}

@Override
public CompletableFuture<InputStream> streamInternalStatsAsync(String topic, boolean metadata) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "internalStats");
path = path.queryParam("metadata", metadata);
return asyncGetRequest(path, new FutureCallback<InputStream>() {});
}

@Override
public String getInternalInfo(String topic) throws PulsarAdminException {
return sync(() -> getInternalInfoAsync(topic));
Expand Down Expand Up @@ -776,6 +789,18 @@ public CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalSt
return asyncGetRequest(path, new FutureCallback<PartitionedTopicInternalStats>(){});
}

@Override
public InputStream streamPartitionedInternalStats(String topic) throws PulsarAdminException {
return sync(() -> streamPartitionedInternalStatsAsync(topic));
}

@Override
public CompletableFuture<InputStream> streamPartitionedInternalStatsAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-internalStats");
return asyncGetRequest(path, new FutureCallback<InputStream>(){});
}

@Override
public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
sync(() -> deleteSubscriptionAsync(topic, subName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URL;
Expand Down Expand Up @@ -1560,6 +1563,11 @@ public void topics() throws Exception {
Lookup mockLookup = mock(Lookup.class);
when(admin.lookups()).thenReturn(mockLookup);

String statsString = "{}";
InputStream stream = new ByteArrayInputStream(statsString.getBytes(StandardCharsets.UTF_8));
when(mockTopics.streamInternalStats("persistent://myprop/clust/ns1/ds1", false)).thenReturn(stream);
when(mockTopics.streamPartitionedInternalStats("persistent://myprop/clust/ns1/ds1")).thenReturn(stream);

CmdTopics cmdTopics = new CmdTopics(() -> admin);

cmdTopics.run(split("truncate persistent://myprop/clust/ns1/ds1"));
Expand Down Expand Up @@ -1607,6 +1615,9 @@ public void topics() throws Exception {
cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false);

cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1 --streaming"));
verify(mockTopics).streamInternalStats("persistent://myprop/clust/ns1/ds1", false);

cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold"));
Expand Down Expand Up @@ -1655,6 +1666,9 @@ public void topics() throws Exception {
cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1 --streaming"));
verify(mockTopics).streamPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -747,11 +748,24 @@ private class GetInternalStats extends CliCommand {
@Option(names = { "-m",
"--metadata" }, description = "Flag to include ledger metadata")
private boolean metadata = false;
@Option(names = "--streaming", description = "Streaming the output directly to the standard output without "
+ "parsing the response in memory.")
private boolean streaming = false;

@Override
void run() throws PulsarAdminException {
void run() throws PulsarAdminException, IOException {
String topic = validateTopicName(topicName);
print(getTopics().getInternalStats(topic, metadata));
if (streaming) {
try (InputStream in = getTopics().streamInternalStats(topic, metadata);) {
int size;
byte[] buffer = new byte[2048];
while ((size = in.read(buffer)) != -1) {
System.out.write(buffer, 0, size);
}
}
} else {
print(getTopics().getInternalStats(topic, metadata));
}
}
}

Expand Down Expand Up @@ -811,11 +825,25 @@ void run() throws Exception {
private class GetPartitionedStatsInternal extends CliCommand {
@Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
private String topicName;
@Option(names = "--streaming", description = "Streaming the output directly to the standard output without "
+ "parsing the response in memory.")
private boolean streaming = false;

@Override
void run() throws Exception {
String topic = validateTopicName(topicName);
print(getTopics().getPartitionedInternalStats(topic));
if (streaming) {
try (InputStream in = getTopics().streamPartitionedInternalStats(topic)) {
int size;
byte[] buffer = new byte[2048];
while ((size = in.read(buffer)) != -1) {
System.out.write(buffer, 0, size);
}
}
} else {
print(getTopics().getPartitionedInternalStats(topic));
}

}
}

Expand Down
Loading