Skip to content

Commit e24c5df

Browse files
committed
feat(executor/nats): Add delivery option to Jetstream
Signed-off-by: LuBashQ <[email protected]>
1 parent 56afb19 commit e24c5df

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

executors/nats/nats.go

+15
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ type JetstreamOptions struct {
3737
Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` // Stream must exist before the command execution
3838
Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one
3939
FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"`
40+
DeliveryPolicy string `json:"delivery_policy,omitempty" yaml:"deliveryPolicy,omitempty"` // Must be last, new or all. Other values will default to jetstream.DeliverLastPolicy
41+
}
42+
43+
func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy {
44+
switch js.DeliveryPolicy {
45+
case "last":
46+
return jetstream.DeliverLastPolicy
47+
case "new":
48+
return jetstream.DeliverNewPolicy
49+
case "all":
50+
return jetstream.DeliverAllPolicy
51+
default:
52+
return jetstream.DeliverAllPolicy
53+
}
4054
}
4155

4256
type Executor struct {
@@ -365,6 +379,7 @@ func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstrea
365379
consumer, consErr = stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
366380
FilterSubjects: e.Jetstream.FilterSubjects,
367381
AckPolicy: jetstream.AckAllPolicy,
382+
DeliverPolicy: e.Jetstream.deliveryPolicy(),
368383
})
369384
if consErr != nil {
370385
return nil, err

0 commit comments

Comments
 (0)