diff --git a/.gitignore b/.gitignore index 85eec240..f3d816df 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ tests/pki_variables.yml *.xml test_results*.html !venom_output.html +tests/nats/pki \ No newline at end of file diff --git a/executors/nats/README.md b/executors/nats/README.md new file mode 100644 index 00000000..45105cf1 --- /dev/null +++ b/executors/nats/README.md @@ -0,0 +1,122 @@ +# Venom - Executor NATS + +Step to publish and subscribe to NATS subjects. + +Currently two commands are supported: + +- `publish` +- `subscribe` + +## Input + +### Defaults + +This step includes some default values: + +- `url`: defaults to `nats://localhost:4222` +- `messageLimit`: defaults to 1 +- `deadline`: defaults to 1 second + +### Authentication + +This step allows for connection with and without TLS. Without TLS, the step does not require additional options. + +To connect to a NATS server with TLS, declare: + +```yaml +tls: + selfSigned: true + serverVerify: true + certificatePath: "/path/to/client_certificate" + keyPath: "/path/to/client_key" + caPath: ""/path/to/ca_certificate"" +``` + +Enable `selfSigned` only if the NATS server uses self-signed certificates. If enabled, `caPath` is mandatory. + +Enable `serverVerify` only if the NATS server verifies the client certificates. If enabled `certificatePath` and `keyPath` are mandatory. + +### publish command + +The `publish` command allows to publish a payload to a specific NATS subject. Optionally it can wait for a reply. + +Full configuration example: + +```yaml +- type: nats + url: "{{.url}}" # defaults to nats://localhost:4222 if not set + command: publish + subject: "{{.subject}}" # mandatory + payload: '{{.message}}' + headers: + customHeader: + - "some-value" + assertions: + - result.error ShouldBeEmpty +``` + +Full configuration with reply example: + +```yaml +- type: nats + url: "{{.url}}" # defaults to nats://localhost:4222 if not set + command: publish + request: true + subject: "{{.subject}}" # mandatory + replySubject: "{{.subject}}.reply" # mandatory if `request = true` + payload: '{{.message}}' + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 +``` + +It is possible to publish to a Jetstream stream by declaring `jetstream: true` in the step. + + For example: + +```yaml +- type: nats + command: publish + subject: "{{.subject}}.hello" # mandatory + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldNotBeEmpty +``` + +### subscribe command + +The `subscribe` command allows to receive messages from a subject or a stream. + +Full configuration example: + +```yaml +- type: nats + command: subscribe + subject: "{{.subject}}.>" # mandatory + messageLimit: 2 # defaults to 1 + deadline: 10 # in seconds, defaults to 1 + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 2 +``` + +Full configuration example with Jetstream: + +```yaml +- type: nats + command: subscribe + subject: "{{.subject}}.>" # mandatory + messageLimit: 2 # defaults to 1 + deadline: 10 # in seconds, defaults to 1 + jetstream: + enabled: true + stream: TEST # mandatory, stream must exist + filterSubjects: + - "{{.subject}}.js.hello" + - "{{.subject}}.js.world" + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 2 +``` diff --git a/executors/nats/nats.go b/executors/nats/nats.go new file mode 100644 index 00000000..2fd566ac --- /dev/null +++ b/executors/nats/nats.go @@ -0,0 +1,457 @@ +package nats + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/ovh/venom" +) + +const Name = "nats" + +const ( + defaultUrl = "nats://localhost:4222" + defaultConnectTimeout = 5 * time.Second + defaultReconnectTime = 1 * time.Second + defaultClientName = "Venom" + defaultMessageLimit = 1 + defaultDeadline = 5 +) + +// TlsOptions describes TLS authentication options to the NATS server. +type TlsOptions struct { + SelfSigned bool `json:"self_signed,omitempty" yaml:"selfSigned"` // Set to true if the NATS server uses self-signed certificates. Ca certificate is required if enabled. + ServerVerify bool `json:"server_verify,omitempty" yaml:"serverVerify"` // Set to true if the NATS server verifies the client identity. Certificate and Key are required if enabled. + CertificatePath string `json:"certificate_path,omitempty" yaml:"certificatePath"` + KeyPath string `json:"key_path,omitempty" yaml:"keyPath"` + CaPath string `json:"ca_certificate_path,omitempty" yaml:"caPath"` +} + +type JetstreamOptions struct { + Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` // Stream must exist before the command execution + Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one + FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"` + DeliveryPolicy string `json:"delivery_policy,omitempty" yaml:"deliveryPolicy,omitempty"` // Must be last, new or all. Other values will default to jetstream.DeliverLastPolicy + AckPolicy string `json:"ack_policy,omitempty" yaml:"ackPolicy,omitempty"` // Must be all, explicit or none. Other values will default to jetstream.AckNonePolicy +} + +func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy { + switch js.DeliveryPolicy { + case "last": + return jetstream.DeliverLastPolicy + case "new": + return jetstream.DeliverNewPolicy + case "all": + return jetstream.DeliverAllPolicy + default: + return jetstream.DeliverAllPolicy + } +} + +func (js JetstreamOptions) ackPolicy() jetstream.AckPolicy { + switch js.DeliveryPolicy { + case "none": + return jetstream.AckNonePolicy + case "all": + return jetstream.AckAllPolicy + case "explicit": + return jetstream.AckExplicitPolicy + default: + return jetstream.AckNonePolicy + } +} + +type Executor struct { + Command string `json:"command,omitempty" yaml:"command,omitempty"` // Must be `publish` or `subscribe` + Url string `json:"url,omitempty" yaml:"url,omitempty"` + Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` + Payload string `json:"payload,omitempty" yaml:"payload,omitempty"` + Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` + MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"` + Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` // Describes the deadline in seconds from the start of the command + ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` + Request bool `json:"request,omitempty" yaml:"request,omitempty"` // Describe that the publish command expects a reply from the NATS server + Jetstream *JetstreamOptions `json:"jetstream,omitempty" yaml:"jetstream,omitempty"` + Tls *TlsOptions `json:"tls,omitempty" yaml:"tls"` +} + +// Message describes a NATS message received from a consumer or a request publisher +type Message struct { + Data string `json:"data,omitempty" yaml:"data,omitempty"` + DataJson map[string]interface{} `json:"datajson,omitempty" yaml:"dataJson,omitempty"` + Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` + Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` + ReplySubject string `json:"replysubject,omitempty" yaml:"replySubject,omitempty"` +} + +// Result describes a command result +type Result struct { + Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"` + Error string `json:"error,omitempty" yaml:"error,omitempty"` +} + +func tryDecodeToJSON(data []byte) (map[string]interface{}, error) { + if len(data) == 0 { + return nil, fmt.Errorf("empty data") + } + var result map[string]interface{} + err := json.Unmarshal(data, &result) + if err != nil { + return nil, err + } + return result, nil +} + +func (Executor) ZeroValueMessage() Message { + return Message{} +} + +func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, error) { + var e Executor + if err := mapstructure.Decode(step, &e); err != nil { + return nil, err + } + + session, err := e.session(ctx) + if err != nil { + return nil, err + } + defer session.Close() + + result := Result{} + + switch e.Command { + case "publish": + var cmdErr error + var reply Message + + if e.Jetstream != nil { + cmdErr = e.publishJetstream(ctx, session) + } else { + reply, cmdErr = e.publish(ctx, session) + } + + if cmdErr != nil { + result.Error = cmdErr.Error() + } else { + result.Messages = []Message{reply} + } + case "subscribe": + var msgs []Message + var cmdErr error + + if e.Jetstream != nil { + msgs, cmdErr = e.subscribeJetstream(ctx, session) + } else { + msgs, cmdErr = e.subscribe(ctx, session) + } + + if cmdErr != nil { + result.Error = cmdErr.Error() + } else { + result.Messages = msgs + } + } + + return result, nil +} + +func New() venom.Executor { + return &Executor{ + MessageLimit: defaultMessageLimit, + Deadline: defaultDeadline, + Url: defaultUrl, + } +} + +func (tls TlsOptions) getTlsOptions() ([]nats.Option, error) { + opts := make([]nats.Option, 1, 2) + + if tls.SelfSigned { + if len(tls.CaPath) == 0 { + return nil, fmt.Errorf("TLS CA certificate is required if NATS server uses self signed CA") + } + opts = append(opts, nats.RootCAs(tls.CaPath)) + } + + if tls.ServerVerify { + if len(tls.CertificatePath) == 0 { + return nil, fmt.Errorf("TLS certificate is required if NATS server verifies clients") + } + if len(tls.KeyPath) == 0 { + return nil, fmt.Errorf("TLS key is required if NATS server vertifies clients") + } + opts = append(opts, nats.ClientCert(tls.CertificatePath, tls.KeyPath)) + } + + return opts, nil +} + +func (e Executor) session(ctx context.Context) (*nats.Conn, error) { + opts := []nats.Option{ + nats.Timeout(defaultConnectTimeout), + nats.Name(defaultClientName), + nats.MaxReconnects(-1), + nats.ReconnectWait(defaultReconnectTime), + } + + if e.Tls != nil { + tlsOpts, err := e.Tls.getTlsOptions() + if err != nil { + return nil, err + } + opts = append(opts, tlsOpts...) + } + + venom.Debug(ctx, "Connecting to NATS server %q", e.Url) + + nc, err := nats.Connect(e.Url, opts...) + if err != nil { + return nil, err + } + + venom.Debug(ctx, "Connected to NATS server %q", nc.ConnectedAddr()) + + return nc, nil +} + +func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, error) { + if e.Subject == "" { + return e.ZeroValueMessage(), fmt.Errorf("subject is required") + } + + msg := nats.Msg{ + Subject: e.Subject, + Data: []byte(e.Payload), + Header: e.Header, + } + + var result Message + if e.Request { + if e.ReplySubject == "" { + return e.ZeroValueMessage(), fmt.Errorf("reply subject is required for request command") + } + msg.Reply = e.ReplySubject + + replyMsg, err := session.RequestMsg(&msg, time.Duration(5)*time.Second) + if err != nil { + return e.ZeroValueMessage(), err + } + + dataJson, err := tryDecodeToJSON(replyMsg.Data) + if err != nil { + venom.Debug(ctx, "data %s is not valid JSON ", replyMsg.Data) + } + + result = Message{ + Data: string(replyMsg.Data), + DataJson: dataJson, + Header: replyMsg.Header, + Subject: msg.Subject, + ReplySubject: replyMsg.Subject, + } + + venom.Debug(ctx, "Received reply message %+v", result) + } else { + err := session.PublishMsg(&msg) + if err != nil { + return e.ZeroValueMessage(), err + } + } + + venom.Debug(ctx, "Published message to subject %q with payload %q", e.Subject, e.Payload) + + return result, nil +} + +func (e Executor) publishJetstream(ctx context.Context, session *nats.Conn) error { + if e.Subject == "" { + return fmt.Errorf("subject is required") + } + + js, err := e.jetstreamSession(ctx, session) + if err != nil { + return err + } + + msg := nats.Msg{ + Subject: e.Subject, + Data: []byte(e.Payload), + Header: e.Header, + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + _, err = js.PublishMsg(ctxWithTimeout, &msg) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("timeout reached while waiting for ACK from NATS server") + } + return err + } + + venom.Debug(ctx, "Published message to subject %q with payload %q", e.Subject, e.Payload) + + return nil +} + +func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, error) { + if e.Subject == "" { + return nil, fmt.Errorf("subject is required") + } + + venom.Debug(ctx, "Subscribing to subject %q", e.Subject) + + results := make([]Message, e.MessageLimit) + + ch := make(chan *nats.Msg) + msgCount := 0 + sub, err := session.ChanSubscribe(e.Subject, ch) + if err != nil { + return nil, err + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + venom.Debug(ctx, "Subscribed to subject %q with timeout %v and max messages %d", e.Subject, e.Deadline, e.MessageLimit) + + for { + select { + case msg := <-ch: + msgData := msg.Data + venom.Debug(ctx, "Received message #%d from subject %q with data %q", msgCount, e.Subject, string(msgData)) + + dataJson, err := tryDecodeToJSON(msgData) + if err != nil { + venom.Debug(ctx, "data %s is not valid JSON ", msgData) + } + + results[msgCount] = Message{ + Data: string(msgData), + DataJson: dataJson, + Header: msg.Header, + Subject: msg.Subject, + } + + msgCount++ + + if msgCount >= e.MessageLimit { + err = sub.Unsubscribe() + if err != nil { + return nil, err + } + return results, nil + } + case <-ctxWithTimeout.Done(): + _ = sub.Unsubscribe() // even it if fails, we are done anyway + return nil, fmt.Errorf("timeout reached while waiting for message #%d from subject %q", msgCount, e.Subject) + } + } +} + +func (e Executor) jetstreamSession(ctx context.Context, session *nats.Conn) (jetstream.JetStream, error) { + js, err := jetstream.New(session) + if err != nil { + return nil, err + } + venom.Debug(ctx, "Jetstream session created") + return js, err +} + +func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstream.Consumer, error) { + js, err := e.jetstreamSession(ctx, session) + if err != nil { + return nil, err + } + + stream, err := js.Stream(ctx, e.Jetstream.Stream) + if err != nil { + return nil, err + } + + streamName := stream.CachedInfo().Config.Name + + venom.Debug(ctx, "Found stream %q", streamName) + + var consumer jetstream.Consumer + var consErr error + if e.Jetstream.Consumer != "" { + consumer, consErr = stream.Consumer(ctx, e.Jetstream.Consumer) + if consErr != nil { + return nil, err + } + venom.Debug(ctx, "Found existing consumer %s[%s]", streamName, e.Jetstream.Consumer) + } else { + consumer, consErr = stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ + FilterSubjects: e.Jetstream.FilterSubjects, + AckPolicy: jetstream.AckAllPolicy, + DeliverPolicy: e.Jetstream.deliveryPolicy(), + }) + if consErr != nil { + return nil, err + } + venom.Warn(ctx, "Consumer %s[%s] not found. Created ephemeral consumer", streamName, e.Jetstream.Consumer) + } + + return consumer, nil +} + +func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([]Message, error) { + if e.Jetstream.Stream == "" { + return nil, fmt.Errorf("jetstream stream name is required") + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + consumer, err := e.getConsumer(ctx, session) + if err != nil { + return nil, err + } + + results := make([]Message, e.MessageLimit) + msgCount := 0 + done := make(chan struct{}) + + cc, err := consumer.Consume(func(msg jetstream.Msg) { + msgData := msg.Data() + venom.Debug(ctx, "received message from %s[%s]: %+v", consumer.CachedInfo().Stream, msg.Subject(), string(msgData)) + + dataJson, err := tryDecodeToJSON(msgData) + if err != nil { + venom.Debug(ctx, "data %s is not valid JSON ", msgData) + } + + results[msgCount] = Message{ + Data: string(msgData), + DataJson: dataJson, + Header: msg.Headers(), + Subject: msg.Subject(), + ReplySubject: msg.Reply(), + } + msgCount++ + if msgCount == e.MessageLimit { + done <- struct{}{} + } + }, jetstream.PullMaxMessages(e.MessageLimit)) + + defer cc.Drain() + defer cc.Stop() + + for { + select { + case <-ctxWithTimeout.Done(): + return nil, fmt.Errorf("timeout reached while waiting for message #%d from subjects %v", msgCount, e.Jetstream.FilterSubjects) + case <-done: + return results, nil + } + } +} diff --git a/executors/registry.go b/executors/registry.go index 851b40b0..b799ff26 100644 --- a/executors/registry.go +++ b/executors/registry.go @@ -11,6 +11,7 @@ import ( "github.com/ovh/venom/executors/kafka" "github.com/ovh/venom/executors/mongo" "github.com/ovh/venom/executors/mqtt" + "github.com/ovh/venom/executors/nats" "github.com/ovh/venom/executors/ovhapi" "github.com/ovh/venom/executors/rabbitmq" "github.com/ovh/venom/executors/readfile" @@ -42,4 +43,5 @@ var Registry map[string]Constructor = map[string]Constructor{ ssh.Name: ssh.New, mongo.Name: mongo.New, web.Name: web.New, + nats.Name: nats.New, } diff --git a/go.mod b/go.mod index 4e37505a..846948a8 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,11 @@ require ( modernc.org/sqlite v1.26.0 ) +require ( + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect +) + require ( github.com/ClickHouse/ch-go v0.55.0 // indirect github.com/ClickHouse/clickhouse-go/v2 v2.9.1 // indirect @@ -84,11 +89,12 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinramage/venomWeb v0.0.0-20230530195848-87f9f752bcfb - github.com/klauspost/compress v1.17.1 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/mxk/go-imap v0.0.0-20150429134902-531c36c3f12d // indirect + github.com/nats-io/nats.go v1.37.0 github.com/paulmach/orb v0.9.0 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index dbe9a992..d87c6c5f 100644 --- a/go.sum +++ b/go.sum @@ -941,6 +941,8 @@ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -999,6 +1001,12 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8 github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/mxk/go-imap v0.0.0-20150429134902-531c36c3f12d h1:+DgqA2tuWi/8VU+gVgBAa7+WZrnFbPKhQWbKBB54cVs= github.com/mxk/go-imap v0.0.0-20150429134902-531c36c3f12d/go.mod h1:xacC5qXZnL/ooiitVoe3BtI1OotFTqi5zICBs9J5Fyk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ovh/cds/sdk/interpolate v0.0.0-20231019155847-e738a974db8f h1:EVG6OeiGCWtvDOD9ALqz6T0T5UcNWZ0vU2qGqLSbHT8= github.com/ovh/cds/sdk/interpolate v0.0.0-20231019155847-e738a974db8f/go.mod h1:sHqEJq74yP3ylfc/aX5f7S0HX7NNsHnl8LSgiNX3sc8= github.com/ovh/go-ovh v1.4.3 h1:Gs3V823zwTFpzgGLZNI6ILS4rmxZgJwJCz54Er9LwD0= diff --git a/tests/Makefile b/tests/Makefile index ee45b9c5..8d11853a 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,13 +1,17 @@ PKI_DIR := ./grpc/pki PKI_VAR_FILE = ./pki_variables.yml +NATS_PKI_DIR := ./nats/pki + include ./pki.mk +include ./nats/pki.mk define docker_run - docker run --name venom-$(2) -d $(3) $(1) > venom-$2.cid + docker run --name venom-$(2) -d $(3) $(1) $(4) > venom-$2.cid endef docker-network := venom-test-net +nats-network := venom-nats-network COVER_FILES = $(shell find . -name "*.coverprofile") PKGS_COMMA_SEP = go list -f '{{ join .Deps "\n" }}{{"\n"}}{{.ImportPath}}' . | grep github.com/ovh/venom | grep -v vendor | tr '\n' ',' | sed 's/,$$//' @@ -54,6 +58,30 @@ venom-grpc.cid: docker build -t venom-grpc-greeter ./grpc $(call docker_run,venom-grpc-greeter,grpc,-p 50051:50051) +nats-server-name := nats-server +nats-server-addr := -s nats://venom-$(nats-server-name):4222 --tlscert=/pki/client.crt --tlskey=/pki/client.key --tlsca=/pki/ca.crt +nats-msg-body := {"message": "hello world"} +venom-nats.cid: + docker network create --driver bridge $(nats-network) + $(call docker_run,nats:2.10,$(nats-server-name),\ + --network $(nats-network)\ + -p 4222:4222\ + -v $(shell realpath nats/nats.conf):/nats-server.conf:ro\ + -v $(shell realpath nats/pki):/pki:ro\ + ) + $(call docker_run,natsio/nats-box,nats-replier,\ + --network $(nats-network)\ + -v $(shell realpath nats/pki):/pki:ro,\ + nats $(nats-server-addr) reply nats.test.request --echo\ + ) + $(call docker_run,natsio/nats-box,nats-publisher,\ + --network $(nats-network)\ + -v $(shell realpath nats/pki):/pki:ro,\ + nats $(nats-server-addr) pub --count=1_000_000 --sleep=1s nats.test.message '$(nats-msg-body)'\ + ) + +start-test-stack: $(NATS_PKI_DIR) +start-test-stack: venom-nats.cid start-test-stack: venom-postgres.cid start-test-stack: venom-mysql.cid start-test-stack: venom-mongo.cid @@ -68,7 +96,7 @@ start-test-stack: $(PKI_DIR) start-test-stack: venom-grpc.cid stop-test-stack: - @for f in `ls -1 *.cid`; do docker stop `cat $${f}`; docker rm `cat $${f}`; done; rm -f *.cid; docker network rm $(docker-network) + @for f in `ls -1 *.cid`; do docker stop `cat $${f}`; docker rm `cat $${f}`; done; rm -f *.cid; docker network rm $(docker-network); docker network rm $(nats-network) build-test-binary: cd ../cmd/venom; \ @@ -89,7 +117,7 @@ wait-for-kafka: until curl --retry 30 --retry-connrefused --retry-max-time 100 --connect-timeout 10 -s http://localhost:8081/config/ -o /dev/null; do sleep 2; done ; \ echo "\n\033[0;32mdone\033[0m" -clean: +clean: clean-nats-pki @rm -f *.prof *.html *.xml *.log *.dump.json *.args.file *.error.out *.out *.test.out *.coverprofile merge-coverage: diff --git a/tests/nats.yml b/tests/nats.yml new file mode 100644 index 00000000..a795c493 --- /dev/null +++ b/tests/nats.yml @@ -0,0 +1,256 @@ +name: NATS testsuite +vars: + url: 'nats://localhost:4222' + baseSubject: "nats.test" + message: '{"message": "hello world"}' + certificatePath: "./nats/pki/client.crt" + keyPath: "./nats/pki/client.key" + caPath: "./nats/pki/ca.crt" + natsTlsConfig: "--tlscert {{.certificatePath}} --tlskey {{.keyPath}} --tlsca {{.caPath}}" + +testcases: + + - name: NATS publish testcase + steps: + - type: nats + url: "{{.url}}" + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + command: publish + subject: "{{.baseSubject}}.publish" + payload: '{{.message}}' + headers: + timestamp: + - "{{.venom.timestamp}}" + assertions: + - result.error ShouldBeEmpty + + - name: NATS publish empty subject testcase + steps: + - type: nats + url: "{{.url}}" + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + command: publish + subject: "" + payload: '{{.message}}' + headers: + timestamp: + - "{{.venom.timestamp}}" + assertions: + - result.error ShouldNotBeEmpty + + - name: NATS publish Jetstream testcase + steps: + - type: exec + script: | + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: nats + command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "{{.baseSubject}}.js.hello" + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldBeEmpty + + - type: exec + script: | + nats {{.natsTlsConfig}} stream rm TEST -f + + - name: NATS publish Jetstream non-stream subject testcase + steps: + - type: exec + script: | + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: nats + command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "some.other.subject" + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldNotBeEmpty + + - type: exec + script: | + nats {{.natsTlsConfig}} stream rm TEST -f + + - name: NATS publish Jetstream empty subject testcase + steps: + - type: exec + script: | + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: nats + command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "" + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldNotBeEmpty + + - type: exec + script: | + nats {{.natsTlsConfig}} stream rm TEST -f + + - name: NATS subscribe Jetstream consumer testcase + steps: + - type: exec + script: | + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: exec + script: | + nats {{.natsTlsConfig}} pub "{{.baseSubject}}.js.hello" '{{.message}}' + nats {{.natsTlsConfig}} pub --count 3 "{{.baseSubject}}.js.world" '{{.message}}' + + - type: nats + command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "{{.baseSubject}}.>" + messageLimit: 2 + deadline: 10 + jetstream: + enabled: true + stream: TEST + filterSubjects: + - "{{.baseSubject}}.js.hello" + - "{{.baseSubject}}.js.world" + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 2 + - result.messages.messages0.datajson ShouldContainKey message + - result.messages.messages0.datajson.message ShouldEqual "hello world" + + - type: exec + script: | + nats {{.natsTlsConfig}} stream rm TEST -f + + - name: NATS subscribe Jetstream consumer testcase with deadline + steps: + - type: exec + script: | + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: exec + script: | + nats {{.natsTlsConfig}} pub "{{.baseSubject}}.js.hello" '{{.message}}' + nats {{.natsTlsConfig}} pub --count 1 "{{.baseSubject}}.js.world" '{{.message}}' + + - type: nats + command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "{{.baseSubject}}.>" + messageLimit: 10 + deadline: 1 + jetstream: + enabled: true + stream: TEST + filterSubjects: + - "{{.baseSubject}}.js.hello" + - "{{.baseSubject}}.js.world" + assertions: + - result.error ShouldNotBeEmpty + + - type: exec + script: | + nats {{.natsTlsConfig}} stream rm TEST -f + + - name: NATS subscribe testcase + steps: + - type: nats + url: "{{.url}}" + command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "{{.baseSubject}}.>" + messageLimit: 1 + deadline: 1 + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 + - result.messages.messages0.datajson ShouldContainKey message + - result.messages.messages0.datajson.message ShouldEqual "hello world" + + - name: NATS subscribe testcase with deadline + steps: + - type: nats + url: "{{.url}}" + command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + subject: "{{.baseSubject}}.>" + messageLimit: 1000 + deadline: 1 + assertions: + - result.error ShouldNotBeEmpty + - result.error ShouldContainSubstring "timeout reached" + + - name: NATS request/reply testcase + steps: + - type: nats + url: "{{.url}}" + command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" + request: true + subject: "{{.baseSubject}}.request" + replySubject: "{{.baseSubject}}.reply" + payload: '{{.message}}' + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 + - result.messages.messages0.datajson ShouldContainKey message + - result.messages.messages0.datajson.message ShouldEqual "hello world" \ No newline at end of file diff --git a/tests/nats/nats.conf b/tests/nats/nats.conf new file mode 100644 index 00000000..47957e19 --- /dev/null +++ b/tests/nats/nats.conf @@ -0,0 +1,11 @@ +host: 0.0.0.0 +port: 4222 + +jetstream {} + +tls { + cert_file: "/pki/server.crt" + key_file: "/pki/server.key" + ca_file: "/pki/ca.crt" + verify: true +} \ No newline at end of file diff --git a/tests/nats/pki.mk b/tests/nats/pki.mk new file mode 100644 index 00000000..d2ec2130 --- /dev/null +++ b/tests/nats/pki.mk @@ -0,0 +1,61 @@ +$(NATS_PKI_DIR): + mkdir -p $(NATS_PKI_DIR) + + # create a certificate authority (CA) that both the client and server trust. + # The CA is just a public and private key with the public key wrapped up in a self-signed X.509 certificate. + openssl req \ + -new \ + -x509 \ + -nodes \ + -days 365 \ + -subj '/C=GB/O=Example/OU=TeamA/CN=ca.example.com' \ + -keyout $(NATS_PKI_DIR)/ca.key \ + -out $(NATS_PKI_DIR)/ca.crt + + # create the server’s key + openssl genrsa \ + -out $(NATS_PKI_DIR)/server.key 2048 + + # create a server Certificate Signing Request + openssl req \ + -new \ + -key $(NATS_PKI_DIR)/server.key \ + -subj '/C=GB/O=Example/OU=TeamA/CN=example.com' \ + -addext 'subjectAltName = DNS:venom-$(nats-server-name), DNS:localhost, IP:127.0.0.1, IP:::1' \ + -out $(NATS_PKI_DIR)/server.csr + + # creates the server signed certificate + openssl x509 \ + -req \ + -in $(NATS_PKI_DIR)/server.csr \ + -CA $(NATS_PKI_DIR)/ca.crt \ + -CAkey $(NATS_PKI_DIR)/ca.key \ + -CAcreateserial \ + -days 365 \ + -copy_extensions copy \ + -out $(NATS_PKI_DIR)/server.crt + + # create the client's key + openssl genrsa \ + -out $(NATS_PKI_DIR)/client.key 2048 + + # create a client Certificate Signing Request + openssl req \ + -new \ + -key $(NATS_PKI_DIR)/client.key \ + -subj '/CN=user1.example.com' \ + -out $(NATS_PKI_DIR)/client.csr + + # creates the client signed certificate + openssl x509 \ + -req \ + -in $(NATS_PKI_DIR)/client.csr \ + -CA $(NATS_PKI_DIR)/ca.crt \ + -CAkey $(NATS_PKI_DIR)/ca.key \ + -CAcreateserial \ + -days 365 \ + -out $(NATS_PKI_DIR)/client.crt + +.PHONY: clean-nats-pki +clean-nats-pki: + rm -fr $(NATS_PKI_DIR) \ No newline at end of file