Skip to content

Commit a4d8a3e

Browse files
committed
feat(executor/nats): Add ACK option to Jetstream
Signed-off-by: LuBashQ <[email protected]>
1 parent e24c5df commit a4d8a3e

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

executors/nats/nats.go

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type JetstreamOptions struct {
3838
Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one
3939
FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"`
4040
DeliveryPolicy string `json:"delivery_policy,omitempty" yaml:"deliveryPolicy,omitempty"` // Must be last, new or all. Other values will default to jetstream.DeliverLastPolicy
41+
AckPolicy string `json:"ack_policy,omitempty" yaml:"ackPolicy,omitempty"` // Must be all, explicit or none. Other values will default to jetstream.AckNonePolicy
4142
}
4243

4344
func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy {
@@ -53,6 +54,19 @@ func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy {
5354
}
5455
}
5556

57+
func (js JetstreamOptions) ackPolicy() jetstream.AckPolicy {
58+
switch js.DeliveryPolicy {
59+
case "none":
60+
return jetstream.AckNonePolicy
61+
case "all":
62+
return jetstream.AckAllPolicy
63+
case "explicit":
64+
return jetstream.AckExplicitPolicy
65+
default:
66+
return jetstream.AckNonePolicy
67+
}
68+
}
69+
5670
type Executor struct {
5771
Command string `json:"command,omitempty" yaml:"command,omitempty"` // Must be `publish` or `subscribe`
5872
Url string `json:"url,omitempty" yaml:"url,omitempty"`

0 commit comments

Comments
 (0)