diff --git a/cmd/sse/main.go b/cmd/sse/main.go index 02762cf..b0618d1 100644 --- a/cmd/sse/main.go +++ b/cmd/sse/main.go @@ -85,8 +85,14 @@ func main() { var streamer stream.Streamer if cfg.RabbitMQURI() != "" { - log.Info("Starting up a RabbitMQStreamer") - streamer, err = stream.NewRabbitMQStreamer(*rabbitMQStream.NewEnvironmentOptions().SetUri(cfg.RabbitMQURI()), log) + if cfg.RabbitMQStreamName() == "" { + log.Fatal("ETOS_RABBITMQ_STREAM_NAME must be set for the SSE server to work.") + } + log.Infof("Starting up a RabbitMQStreamer with stream name: %s", cfg.RabbitMQStreamName()) + streamer, err = stream.NewRabbitMQStreamer(*rabbitMQStream.NewEnvironmentOptions().SetUri(cfg.RabbitMQURI()), log, cfg.RabbitMQStreamName()) + if err := streamer.CreateStream(ctx, log, cfg.RabbitMQStreamName()); err != nil { + log.Fatal(err.Error()) + } } else { log.Warning("RabbitMQURI is not set, defaulting to FileStreamer") streamer, err = stream.NewFileStreamer(100*time.Millisecond, log) diff --git a/go.mod b/go.mod index 07f49dc..036dc89 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/maxcnunes/httpfake v1.2.4 github.com/package-url/packageurl-go v0.1.3 github.com/rabbitmq/amqp091-go v1.10.0 - github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10 + github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 github.com/sethvargo/go-retry v0.3.0 github.com/sirupsen/logrus v1.9.3 github.com/snowzach/rotatefilehook v0.0.0-20220211133110-53752135082d @@ -102,12 +102,12 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.26.0 // indirect - golang.org/x/net v0.28.0 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sys v0.23.0 // indirect - golang.org/x/term v0.23.0 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/term v0.27.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect diff --git a/go.sum b/go.sum index c2a4572..a41e3ac 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= -github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -193,10 +193,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= -github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= -github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= -github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= +github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= +github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/package-url/packageurl-go v0.1.3 h1:4juMED3hHiz0set3Vq3KeQ75KD1avthoXLtmE3I0PLs= github.com/package-url/packageurl-go v0.1.3/go.mod h1:nKAWB8E6uk1MHqiS/lQb9pYBGH2+mdJ2PJc2s50dQY0= @@ -230,8 +230,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= -github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10 h1:1kDn/orisEbfMtxdZwWKpxX9+FahnzoRCuGCLZ66fAc= -github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10/go.mod h1:SdWsW0K5FVo8lIx0lCH17wh7RItXEQb8bfpxVlTVqS8= +github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 h1:2UWryxhtQmYA3Bx2iajQCre3yQbARiSikpC/8iWbu3k= +github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0/go.mod h1:KDXSNVSqj4QNg6TNMBnQQ/oWHaxLjUI1520j68SyEcY= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= @@ -328,8 +328,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -350,8 +350,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -364,8 +364,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -381,15 +381,15 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= -golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -400,8 +400,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/config/sse.go b/internal/config/sse.go index 41534ed..84afd6c 100644 --- a/internal/config/sse.go +++ b/internal/config/sse.go @@ -23,18 +23,21 @@ import ( type SSEConfig interface { Config RabbitMQURI() string + RabbitMQStreamName() string } type sseCfg struct { Config - rabbitmqURI string + rabbitmqURI string + rabbitmqStreamName string } // NewSSEConfig creates a sse config interface based on input parameters or environment variables. func NewSSEConfig() SSEConfig { var conf sseCfg - flag.StringVar(&conf.rabbitmqURI, "rabbitmquri", os.Getenv("ETOS_RABBITMQ_URI"), "URI to the RabbitMQ ") + flag.StringVar(&conf.rabbitmqURI, "rabbitmquri", os.Getenv("ETOS_RABBITMQ_URI"), "URI to the RabbitMQ") + flag.StringVar(&conf.rabbitmqStreamName, "rabbitmqstreamname", os.Getenv("ETOS_RABBITMQ_STREAM_NAME"), "Stream name to use with RabbitMQ.") base := load() conf.Config = base flag.Parse() @@ -45,3 +48,8 @@ func NewSSEConfig() SSEConfig { func (c *sseCfg) RabbitMQURI() string { return c.rabbitmqURI } + +// RabbitMQStreamName returns the stream name to use with RabbitMQ. +func (c *sseCfg) RabbitMQStreamName() string { + return c.rabbitmqStreamName +} diff --git a/internal/stream/rabbitmq.go b/internal/stream/rabbitmq.go index 2d625c7..84675a3 100644 --- a/internal/stream/rabbitmq.go +++ b/internal/stream/rabbitmq.go @@ -35,15 +35,16 @@ const IgnoreUnfiltered = false type RabbitMQStreamer struct { environment *stream.Environment logger *logrus.Entry + streamName string } // NewRabbitMQStreamer creates a new RabbitMQ streamer. Only a single connection should be created. -func NewRabbitMQStreamer(options stream.EnvironmentOptions, logger *logrus.Entry) (Streamer, error) { +func NewRabbitMQStreamer(options stream.EnvironmentOptions, logger *logrus.Entry, streamName string) (Streamer, error) { env, err := stream.NewEnvironment(&options) if err != nil { log.Fatal(err) } - return &RabbitMQStreamer{environment: env, logger: logger}, err + return &RabbitMQStreamer{environment: env, logger: logger, streamName: streamName}, err } // CreateStream creates a new RabbitMQ stream. @@ -52,16 +53,15 @@ func (s *RabbitMQStreamer) CreateStream(ctx context.Context, logger *logrus.Entr // This will create the stream if not already created. return s.environment.DeclareStream(name, &stream.StreamOptions{ - // TODO: More sane numbers MaxLengthBytes: stream.ByteCapacity{}.GB(2), - MaxAge: time.Second * 10, + MaxAge: time.Hour * 48, }, ) } // NewStream creates a new stream struct to consume from. func (s *RabbitMQStreamer) NewStream(ctx context.Context, logger *logrus.Entry, name string) (Stream, error) { - exists, err := s.environment.StreamExists(name) + exists, err := s.environment.StreamExists(s.streamName) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func (s *RabbitMQStreamer) NewStream(ctx context.Context, logger *logrus.Entry, SetConsumerName(name). SetCRCCheck(false). SetOffset(stream.OffsetSpecification{}.First()) - return &RabbitMQStream{ctx: ctx, logger: logger, streamName: name, environment: s.environment, options: options}, nil + return &RabbitMQStream{ctx: ctx, logger: logger, streamName: s.streamName, environment: s.environment, options: options}, nil } // Close the RabbitMQ connection. diff --git a/pkg/events/events.go b/pkg/events/events.go index 6f123f2..6ddfe11 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -41,7 +41,6 @@ func New(data []byte) (Event, error) { } e.Data = string(data) return e, nil - } // Write an event to a writer object diff --git a/pkg/sse/v2alpha/sse.go b/pkg/sse/v2alpha/sse.go index 4c2184f..f1c37b8 100644 --- a/pkg/sse/v2alpha/sse.go +++ b/pkg/sse/v2alpha/sse.go @@ -75,9 +75,9 @@ func New(ctx context.Context, cfg config.SSEConfig, log *logrus.Entry, streamer // LoadRoutes loads all the v2alpha routes. func (a Application) LoadRoutes(router *httprouter.Router) { handler := &Handler{a.logger, a.cfg, a.ctx, a.streamer} - router.GET("/v2alpha/selftest/ping", handler.Selftest) - router.GET("/v2alpha/events/:identifier", a.authorizer.Middleware(scope.StreamSSE, handler.GetEvents)) - router.POST("/v2alpha/stream/:identifier", a.authorizer.Middleware(scope.DefineSSE, handler.CreateStream)) + router.GET("/sse/v2alpha/selftest/ping", handler.Selftest) + router.GET("/sse/v2alpha/events/:identifier", a.authorizer.Middleware(scope.StreamSSE, handler.GetEvents)) + router.POST("/sse/v2alpha/stream/:identifier", a.authorizer.Middleware(scope.DefineSSE, handler.CreateStream)) } // Selftest is a handler to just return 204. @@ -102,16 +102,14 @@ type ErrorEvent struct { } // subscribe subscribes to stream and gets logs and events from it and writes them to a channel. -func (h Handler) subscribe(ctx context.Context, logger *logrus.Entry, streamer stream.Stream, ch chan<- events.Event, counter int, filter []string) { +func (h Handler) subscribe(ctx context.Context, logger *logrus.Entry, streamer stream.Stream, ch chan<- events.Event, lastID int, filter []string) { defer close(ch) var err error consumeCh := make(chan []byte, 0) offset := -1 - if counter > 1 { - offset = counter - } + counter := 1 // lastID will default to 1 and the first event will be 1 closed, err := streamer.WithChannel(consumeCh).WithOffset(offset).WithFilter(filter).Consume(ctx) if err != nil { @@ -138,6 +136,13 @@ func (h Handler) subscribe(ctx context.Context, logger *logrus.Entry, streamer s ch <- events.Event{Event: "error", Data: string(b)} return case msg := <-consumeCh: + // We have no reliable way of getting a specific offset on the SSE stream so + // we will need to iterate all events until we reach the last known ID. + if counter < lastID { + counter++ + continue + } + event, err = events.New(msg) if err != nil { logger.WithError(err).Error("failed to parse SSE event") diff --git a/pkg/sse/v2alpha/sse_test.go b/pkg/sse/v2alpha/sse_test.go index a600dfb..6c0c921 100644 --- a/pkg/sse/v2alpha/sse_test.go +++ b/pkg/sse/v2alpha/sse_test.go @@ -40,6 +40,10 @@ func (c cfg) RabbitMQURI() string { return "" } +func (c cfg) RabbitMQStreamName() string { + return "test" +} + // TestSSECreateStream tests that the CreateStream endpoint works. // Does not test the authorization middleware! func TestSSECreateStream(t *testing.T) { diff --git a/python/src/etos_api/__init__.py b/python/src/etos_api/__init__.py index 6997852..4613e35 100644 --- a/python/src/etos_api/__init__.py +++ b/python/src/etos_api/__init__.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS API module.""" + import os from importlib.metadata import PackageNotFoundError, version @@ -38,10 +39,6 @@ from .library.providers.register import RegisterProviders from .main import APP -# The API shall not send logs to RabbitMQ as it -# is too early in the ETOS test run. -os.environ["ETOS_ENABLE_SENDING_LOGS"] = "false" - try: VERSION = version("etos_api") except PackageNotFoundError: diff --git a/python/src/etos_api/library/context_logging.py b/python/src/etos_api/library/context_logging.py index 8620b9f..5f40200 100644 --- a/python/src/etos_api/library/context_logging.py +++ b/python/src/etos_api/library/context_logging.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS API context based logging.""" + import logging from contextvars import ContextVar from etos_lib.logging.logger import FORMAT_CONFIG @@ -32,6 +33,7 @@ class ContextLogging(logging.Logger): get for each logging method called. """ + # Default identifier is 'Unknown' which is ignored by the ETOS internal messagebus. identifier = ContextVar("identifier") def critical(self, msg, *args, **kwargs): @@ -39,7 +41,7 @@ def critical(self, msg, *args, **kwargs): For documentation read :obj:`logging.Logger.critical` """ - FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main + FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown return super().critical(msg, *args, **kwargs) def error(self, msg, *args, **kwargs): @@ -47,7 +49,7 @@ def error(self, msg, *args, **kwargs): For documentation read :obj:`logging.Logger.error` """ - FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main + FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown return super().error(msg, *args, **kwargs) def warning(self, msg, *args, **kwargs): @@ -55,7 +57,7 @@ def warning(self, msg, *args, **kwargs): For documentation read :obj:`logging.Logger.warning` """ - FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main + FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown return super().warning(msg, *args, **kwargs) def info(self, msg, *args, **kwargs): @@ -63,7 +65,7 @@ def info(self, msg, *args, **kwargs): For documentation read :obj:`logging.Logger.info` """ - FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main + FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown return super().info(msg, *args, **kwargs) def debug(self, msg, *args, **kwargs): @@ -71,7 +73,7 @@ def debug(self, msg, *args, **kwargs): For documentation read :obj:`logging.Logger.debug` """ - FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main + FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown return super().debug(msg, *args, **kwargs) diff --git a/python/tox.ini b/python/tox.ini index 2bc873b..8e99f69 100644 --- a/python/tox.ini +++ b/python/tox.ini @@ -11,6 +11,7 @@ deps = -r{toxinidir}/test-requirements.txt setenv = ETOS_DISABLE_SENDING_EVENTS=1 + ETOS_ENABLE_SENDING_LOGS=0 ETOS_DISABLE_RECEIVING_EVENTS=1 ETOS_GRAPHQL_SERVER=http://localhost:8005/graphql ETOS_ENVIRONMENT_PROVIDER=http://localhost:8005/environment_provider