From f85891af85ce546f0ac84d9a495f023c60cc55ea Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Mon, 27 Oct 2025 20:08:12 +0530 Subject: [PATCH 01/15] Used the new 'ubiquitous' from in sqs/Publish.java --- .../io/kestra/plugin/aws/sqs/Publish.java | 54 +++++-------------- 1 file changed, 12 insertions(+), 42 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index 5e610279..e9f295e2 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -6,6 +6,8 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Data; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; @@ -92,8 +94,7 @@ ) } ) -public class Publish extends AbstractSqs implements RunnableTask { - @PluginProperty(dynamic = true) +public class Publish extends AbstractSqs implements RunnableTask,Data.From { @NotNull @Schema( title = "The source of the published data.", @@ -107,39 +108,15 @@ public class Publish extends AbstractSqs implements RunnableTask public Output run(RunContext runContext) throws Exception { var queueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); try (var sqsClient = this.client(runContext)) { - Integer count; - Flux flowable; - Flux resultFlowable; - - if (this.from instanceof String) { - URI from = new URI(runContext.render((String) this.from)); - if (!from.getScheme().equals("kestra")) { - throw new Exception("Invalid from parameter, must be a Kestra internal storage URI"); - } - - - try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) { - flowable = FileSerde.readAll(inputStream, Message.class); - resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext); - - count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0); - } - - } else if (this.from instanceof List) { - flowable = Flux - .fromIterable((List) this.from) - .map(map -> JacksonMapper.toMap(map, Message.class)); - - resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext); - - count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0); - } else { - var msg = JacksonMapper.toMap(this.from, Message.class); - sqsClient.sendMessage(msg.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); - - count = 1; - } - + Integer count = Data.from(from).read(runContext) + .map(throwFunction(row -> { + sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); + return 1; + })) + .reduce(Integer::sum) + .blockOptional() + .orElse(0); + // metrics runContext.metric(Counter.of("sqs.publish.messages", count, "queue", queueUrl)); @@ -149,13 +126,6 @@ public Output run(RunContext runContext) throws Exception { } } - private Flux buildFlowable(Flux flowable, SqsClient sqsClient, String queueUrl, RunContext runContext) throws IllegalVariableEvaluationException { - return flowable - .map(throwFunction(message -> { - sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); - return 1; - })); - } @Builder @Getter From 315f05956182925bb9b0a3a1d5e7abbe4e71cd66 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Mon, 27 Oct 2025 20:18:57 +0530 Subject: [PATCH 02/15] Used the new 'ubiquitous' from in sns/Publish.java --- .../io/kestra/plugin/aws/sns/Publish.java | 52 ++++--------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index 1100c9fe..ca53e94c 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -6,6 +6,8 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Data; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; @@ -95,8 +97,7 @@ ) } ) -public class Publish extends AbstractSns implements RunnableTask { - @PluginProperty(dynamic = true) +public class Publish extends AbstractSns implements RunnableTask,Data.From { @NotNull @Schema( title = "The source of the published data.", @@ -109,37 +110,14 @@ public class Publish extends AbstractSns implements RunnableTask public Publish.Output run(RunContext runContext) throws Exception { var topicArn = runContext.render(getTopicArn()).as(String.class).orElseThrow(); try (var snsClient = this.client(runContext)) { - Integer count; - Flux flowable; - Flux resultFlowable; - - if (this.from instanceof String) { - URI from = new URI(runContext.render((String) this.from)); - if (!from.getScheme().equals("kestra")) { - throw new Exception("Invalid 'from' parameter, must be a Kestra internal storage URI"); - } - - try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) { - flowable = FileSerde.readAll(inputStream, Message.class); - resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext); - - count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0); - } - - } else if (this.from instanceof List) { - flowable = Flux - .fromIterable((List) this.from) - .map(map -> JacksonMapper.toMap(map, Message.class)); - - resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext); - - count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0); - } else { - var msg = JacksonMapper.toMap(this.from, Message.class); - snsClient.publish(msg.to(PublishRequest.builder().topicArn(topicArn), runContext)); - - count = 1; - } + Integer count = Data.from(from).read(runContext, Message.class) + .map(throwFunction(message -> { + snsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); + return 1; + })) + .reduce(Integer::sum) + .blockOptional() + .orElse(0); // metrics runContext.metric(Counter.of("sns.publish.messages", count, "topic", topicArn)); @@ -150,14 +128,6 @@ public Publish.Output run(RunContext runContext) throws Exception { } } - private Flux buildFlowable(Flux flowable, SnsClient snsClient, String topicArn, RunContext runContext) throws IllegalVariableEvaluationException { - return flowable - .map(throwFunction(message -> { - snsClient.publish(message.to(PublishRequest.builder().topicArn(topicArn), runContext)); - return 1; - })); - } - @Builder @Getter public static class Output implements io.kestra.core.models.tasks.Output { From 6e68478af61e9e891ec314741a59034b7d2cbec0 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 13:11:59 +0530 Subject: [PATCH 03/15] Refactor SNS message sending to use publish method --- src/main/java/io/kestra/plugin/aws/sns/Publish.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index ca53e94c..50a0b776 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -110,9 +110,9 @@ public class Publish extends AbstractSns implements RunnableTask public Publish.Output run(RunContext runContext) throws Exception { var topicArn = runContext.render(getTopicArn()).as(String.class).orElseThrow(); try (var snsClient = this.client(runContext)) { - Integer count = Data.from(from).read(runContext, Message.class) + Integer count = Data.from(from).read(runContext) .map(throwFunction(message -> { - snsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); + snsClient.publish(PublishRequest.builder().topicArn(topicArn).message(message.getData()).build()); return 1; })) .reduce(Integer::sum) From a62daa875fdc8d546d9050bd47a0d5c38671c8f6 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 14:56:26 +0530 Subject: [PATCH 04/15] Refactor message mapping in sqs/Publish class --- src/main/java/io/kestra/plugin/aws/sqs/Publish.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index e9f295e2..595bd6c5 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -109,7 +109,7 @@ public Output run(RunContext runContext) throws Exception { var queueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); try (var sqsClient = this.client(runContext)) { Integer count = Data.from(from).read(runContext) - .map(throwFunction(row -> { + .map(throwFunction(message -> { sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); return 1; })) From 8c807abf8c8310fc7fa2ff10a97ea2837007640f Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 15:33:58 +0530 Subject: [PATCH 05/15] Refactor sendMessage call for sqs/Publish.java --- src/main/java/io/kestra/plugin/aws/sqs/Publish.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index 595bd6c5..f8ce5064 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -110,7 +110,10 @@ public Output run(RunContext runContext) throws Exception { try (var sqsClient = this.client(runContext)) { Integer count = Data.from(from).read(runContext) .map(throwFunction(message -> { - sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(queueUrl), runContext)); + sqsClient.sendMessage(SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(message.get("data").toString()) + .build()); return 1; })) .reduce(Integer::sum) From 4db2b5fc5e5de979a0f5e50b0b58f5b757c0591b Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 15:35:58 +0530 Subject: [PATCH 06/15] Updated sns/Publish.java --- src/main/java/io/kestra/plugin/aws/sns/Publish.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index 50a0b776..00618c69 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -112,7 +112,10 @@ public Publish.Output run(RunContext runContext) throws Exception { try (var snsClient = this.client(runContext)) { Integer count = Data.from(from).read(runContext) .map(throwFunction(message -> { - snsClient.publish(PublishRequest.builder().topicArn(topicArn).message(message.getData()).build()); + snsClient.publish(PublishRequest.builder() + .topicArn(topicArn) + .message(message.getData()) + .build()); return 1; })) .reduce(Integer::sum) From 62e06c23ac08d18b669cfe3ee6bfa345b69558f2 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 15:59:19 +0530 Subject: [PATCH 07/15] Update message retrieval in SNS publish method --- src/main/java/io/kestra/plugin/aws/sns/Publish.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index 00618c69..f7494f41 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -114,7 +114,7 @@ public Publish.Output run(RunContext runContext) throws Exception { .map(throwFunction(message -> { snsClient.publish(PublishRequest.builder() .topicArn(topicArn) - .message(message.getData()) + .message((String) message.get("data")) .build()); return 1; })) From 1fcaf6c84a0773cac1fc5cf71e78848152c08950 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 17:21:01 +0530 Subject: [PATCH 08/15] Improved message parsing in sns/Publish.java --- .../io/kestra/plugin/aws/sns/Publish.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index f7494f41..5efe7cbe 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -26,6 +26,7 @@ import java.net.URI; import java.util.List; import jakarta.validation.constraints.NotNull; +import java.util.Map; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -111,12 +112,27 @@ public Publish.Output run(RunContext runContext) throws Exception { var topicArn = runContext.render(getTopicArn()).as(String.class).orElseThrow(); try (var snsClient = this.client(runContext)) { Integer count = Data.from(from).read(runContext) - .map(throwFunction(message -> { - snsClient.publish(PublishRequest.builder() - .topicArn(topicArn) - .message((String) message.get("data")) - .build()); - return 1; + .map(throwFunction(raw -> { + Message message; + + if (raw instanceof Message m) { + message = m; + } else if (raw instanceof Map map) { + message = JacksonMapper.ofJson().convertValue(map, Message.class); + } else if (raw instanceof String str) { + message = JacksonMapper.ofJson().readValue(str, Message.class); + } else { + throw new IllegalArgumentException("Unsupported message type: " + raw.getClass()); + } + + snsClient.publish(PublishRequest.builder() + .topicArn(topicArn) + .message(message.getData()) + .subject(message.getSubject()) + .build() + ); + + return 1; })) .reduce(Integer::sum) .blockOptional() From ca4eebb99d493cbfd223c9d050c1b4a630e1d032 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 17:25:14 +0530 Subject: [PATCH 09/15] Refactor message handling in sqs/Publish class --- .../io/kestra/plugin/aws/sqs/Publish.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index f8ce5064..b4198bbe 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -109,11 +109,28 @@ public Output run(RunContext runContext) throws Exception { var queueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); try (var sqsClient = this.client(runContext)) { Integer count = Data.from(from).read(runContext) - .map(throwFunction(message -> { - sqsClient.sendMessage(SendMessageRequest.builder() + .map(throwFunction(raw -> { + Message message; + + if (raw instanceof Message m) { + message = m; + } else if (raw instanceof Map map) { + message = JacksonMapper.ofJson().convertValue(map, Message.class); + } else if (raw instanceof String str) { + message = JacksonMapper.ofJson().readValue(str, Message.class); + } else { + throw new IllegalArgumentException("Unsupported message type: " + raw.getClass()); + } + + var builder = SendMessageRequest.builder() .queueUrl(queueUrl) - .messageBody(message.get("data").toString()) - .build()); + .messageBody(message.getData()); + + if (message.getDelaySeconds() != null) { + builder.delaySeconds(message.getDelaySeconds()); + } + + sqsClient.sendMessage(builder.build()); return 1; })) .reduce(Integer::sum) From 52be47fdb64cf83deaf30fc77db7901490eee3b4 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 17:33:06 +0530 Subject: [PATCH 10/15] Implement error handling for message parsing in sqs/Publish.java Add error handling for JSON parsing in Publish class. --- src/main/java/io/kestra/plugin/aws/sqs/Publish.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index b4198bbe..a5b49021 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -117,7 +117,13 @@ public Output run(RunContext runContext) throws Exception { } else if (raw instanceof Map map) { message = JacksonMapper.ofJson().convertValue(map, Message.class); } else if (raw instanceof String str) { - message = JacksonMapper.ofJson().readValue(str, Message.class); + try { + message = JacksonMapper.ofJson().readValue(str, Message.class); + } catch (Exception e) { + message = Message.builder() + .data(str) + .build(); + } } else { throw new IllegalArgumentException("Unsupported message type: " + raw.getClass()); } From a1394ea9fcee247f338f3edcde22256526c246e8 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 17:34:23 +0530 Subject: [PATCH 11/15] Implement fallback for message parsing errors in sns/Publish.java Add error handling for JSON parsing in Publish.java --- src/main/java/io/kestra/plugin/aws/sns/Publish.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index 5efe7cbe..d810b43b 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -120,7 +120,13 @@ public Publish.Output run(RunContext runContext) throws Exception { } else if (raw instanceof Map map) { message = JacksonMapper.ofJson().convertValue(map, Message.class); } else if (raw instanceof String str) { - message = JacksonMapper.ofJson().readValue(str, Message.class); + try { + message = JacksonMapper.ofJson().readValue(str, Message.class); + } catch (Exception e) { + message = Message.builder() + .data(str) + .build(); + } } else { throw new IllegalArgumentException("Unsupported message type: " + raw.getClass()); } From 9f41d2bfcef0d63050a96999568d56302534b3de Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 17:40:47 +0530 Subject: [PATCH 12/15] Refactor message handling in sqs/Publish.java --- .../io/kestra/plugin/aws/sqs/Publish.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index a5b49021..605a18d5 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -112,18 +112,19 @@ public Output run(RunContext runContext) throws Exception { .map(throwFunction(raw -> { Message message; - if (raw instanceof Message m) { - message = m; - } else if (raw instanceof Map map) { - message = JacksonMapper.ofJson().convertValue(map, Message.class); - } else if (raw instanceof String str) { - try { - message = JacksonMapper.ofJson().readValue(str, Message.class); - } catch (Exception e) { - message = Message.builder() - .data(str) - .build(); - } + if (raw instanceof Message) { + message = (Message) raw; + } else if (raw instanceof Map) { + message = JacksonMapper.ofJson().convertValue(raw, Message.class); + } else if (raw instanceof String) { + String str = (String) raw; + try { + message = JacksonMapper.ofJson().readValue(str, Message.class); + } catch (Exception e) { + message = Message.builder() + .data(str) + .build(); + } } else { throw new IllegalArgumentException("Unsupported message type: " + raw.getClass()); } From 70ad5f8308990601695f96666a35a32b9a00907a Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Tue, 28 Oct 2025 17:43:51 +0530 Subject: [PATCH 13/15] Refactor message handling in sns/Publish.java --- .../io/kestra/plugin/aws/sns/Publish.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index d810b43b..8aaafa73 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -115,22 +115,22 @@ public Publish.Output run(RunContext runContext) throws Exception { .map(throwFunction(raw -> { Message message; - if (raw instanceof Message m) { - message = m; - } else if (raw instanceof Map map) { - message = JacksonMapper.ofJson().convertValue(map, Message.class); - } else if (raw instanceof String str) { + if (raw instanceof Message) { + message = (Message) raw; + } else if (raw instanceof Map) { + message = JacksonMapper.ofJson().convertValue(raw, Message.class); + } else if (raw instanceof String) { + String str = (String) raw; try { - message = JacksonMapper.ofJson().readValue(str, Message.class); - } catch (Exception e) { - message = Message.builder() - .data(str) - .build(); - } + message = JacksonMapper.ofJson().readValue(str, Message.class); + } catch (Exception e) { + message = Message.builder() + .data(str) + .build(); + } } else { throw new IllegalArgumentException("Unsupported message type: " + raw.getClass()); } - snsClient.publish(PublishRequest.builder() .topicArn(topicArn) .message(message.getData()) From 3fcc13d11fc19e278273364aaeffd38d3930bbec Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Sat, 1 Nov 2025 23:08:14 +0530 Subject: [PATCH 14/15] Refactor message handling for raw input types --- src/main/java/io/kestra/plugin/aws/sns/Publish.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index 8aaafa73..8de68292 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -119,8 +119,8 @@ public Publish.Output run(RunContext runContext) throws Exception { message = (Message) raw; } else if (raw instanceof Map) { message = JacksonMapper.ofJson().convertValue(raw, Message.class); - } else if (raw instanceof String) { - String str = (String) raw; + } else if (raw instanceof String || raw instanceof Map) { + String str = raw.toString(); try { message = JacksonMapper.ofJson().readValue(str, Message.class); } catch (Exception e) { From 0c798eb44301753efae8f46dc6cf55d8e2ffcd50 Mon Sep 17 00:00:00 2001 From: Pavalasri Date: Sat, 1 Nov 2025 23:09:20 +0530 Subject: [PATCH 15/15] Refactor message handling for String and Map types --- src/main/java/io/kestra/plugin/aws/sqs/Publish.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index 605a18d5..0cfdde49 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -116,8 +116,8 @@ public Output run(RunContext runContext) throws Exception { message = (Message) raw; } else if (raw instanceof Map) { message = JacksonMapper.ofJson().convertValue(raw, Message.class); - } else if (raw instanceof String) { - String str = (String) raw; + } else if (raw instanceof String || raw instanceof Map) { + String str = raw.toString(); try { message = JacksonMapper.ofJson().readValue(str, Message.class); } catch (Exception e) {