Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 11 additions & 41 deletions src/main/java/io/kestra/plugin/aws/sns/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Data;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -95,8 +97,7 @@
)
}
)
public class Publish extends AbstractSns implements RunnableTask<Publish.Output> {
@PluginProperty(dynamic = true)
public class Publish extends AbstractSns implements RunnableTask<Publish.Output>,Data.From {
@NotNull
@Schema(
title = "The source of the published data.",
Expand All @@ -109,37 +110,14 @@ public class Publish extends AbstractSns implements RunnableTask<Publish.Output>
public Publish.Output run(RunContext runContext) throws Exception {
var topicArn = runContext.render(getTopicArn()).as(String.class).orElseThrow();
try (var snsClient = this.client(runContext)) {
Integer count;
Flux<Message> flowable;
Flux<Integer> resultFlowable;

if (this.from instanceof String) {
URI from = new URI(runContext.render((String) this.from));
if (!from.getScheme().equals("kestra")) {
throw new Exception("Invalid 'from' parameter, must be a Kestra internal storage URI");
}

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext);

count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
}

} else if (this.from instanceof List) {
flowable = Flux
.fromIterable((List<?>) this.from)
.map(map -> JacksonMapper.toMap(map, Message.class));

resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext);

count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
snsClient.publish(msg.to(PublishRequest.builder().topicArn(topicArn), runContext));

count = 1;
}
Integer count = Data.from(from).read(runContext, Message.class)
.map(throwFunction(message -> {
snsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext));
return 1;
}))
.reduce(Integer::sum)
.blockOptional()
.orElse(0);

// metrics
runContext.metric(Counter.of("sns.publish.messages", count, "topic", topicArn));
Expand All @@ -150,14 +128,6 @@ public Publish.Output run(RunContext runContext) throws Exception {
}
}

private Flux<Integer> buildFlowable(Flux<Message> flowable, SnsClient snsClient, String topicArn, RunContext runContext) throws IllegalVariableEvaluationException {
return flowable
.map(throwFunction(message -> {
snsClient.publish(message.to(PublishRequest.builder().topicArn(topicArn), runContext));
return 1;
}));
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
Expand Down
54 changes: 12 additions & 42 deletions src/main/java/io/kestra/plugin/aws/sqs/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Data;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -92,8 +94,7 @@
)
}
)
public class Publish extends AbstractSqs implements RunnableTask<Publish.Output> {
@PluginProperty(dynamic = true)
public class Publish extends AbstractSqs implements RunnableTask<Publish.Output>,Data.From {
@NotNull
@Schema(
title = "The source of the published data.",
Expand All @@ -107,39 +108,15 @@ public class Publish extends AbstractSqs implements RunnableTask<Publish.Output>
public Output run(RunContext runContext) throws Exception {
var queueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow();
try (var sqsClient = this.client(runContext)) {
Integer count;
Flux<Message> flowable;
Flux<Integer> resultFlowable;

if (this.from instanceof String) {
URI from = new URI(runContext.render((String) this.from));
if (!from.getScheme().equals("kestra")) {
throw new Exception("Invalid from parameter, must be a Kestra internal storage URI");
}


try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext);

count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
}

} else if (this.from instanceof List) {
flowable = Flux
.fromIterable((List<?>) this.from)
.map(map -> JacksonMapper.toMap(map, Message.class));

resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext);

count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
sqsClient.sendMessage(msg.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext));

count = 1;
}

Integer count = Data.from(from).read(runContext)
.map(throwFunction(row -> {
sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext));
return 1;
}))
.reduce(Integer::sum)
.blockOptional()
.orElse(0);

// metrics
runContext.metric(Counter.of("sqs.publish.messages", count, "queue", queueUrl));

Expand All @@ -149,13 +126,6 @@ public Output run(RunContext runContext) throws Exception {
}
}

private Flux<Integer> buildFlowable(Flux<Message> flowable, SqsClient sqsClient, String queueUrl, RunContext runContext) throws IllegalVariableEvaluationException {
return flowable
.map(throwFunction(message -> {
sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext));
return 1;
}));
}

@Builder
@Getter
Expand Down
Loading