diff --git a/internal/app/service.go b/internal/app/service.go index 363a96eae..709664db1 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -404,33 +404,36 @@ func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunc return nil } -// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics -func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error { - if len(transforms) == 0 { - return errors.New("no transforms provided to pipeline") - } +// getFullTopics adds the base topic prefix to all the topics in a list +func getFullTopics(topics []string, baseTopicPrefix string) ([]string, error) { + var fullTopics []string if len(topics) == 0 { - return errors.New("topics for pipeline can not be empty") + return nil, errors.New("topics for pipeline can not be empty") } - for _, t := range topics { - if strings.TrimSpace(t) == "" { - return errors.New("blank topic not allowed") + for _, topic := range topics { + if strings.TrimSpace(topic) == "" { + return nil, errors.New("blank topic not allowed") } + fullTopics = append(fullTopics, coreCommon.BuildTopic(baseTopicPrefix, topic)) } + return fullTopics, nil +} - // Must add the base topic to all the input topics - var fullTopics []string - for _, topic := range topics { - fullTopics = append(fullTopics, coreCommon.BuildTopic(svc.config.MessageBus.GetBaseTopicPrefix(), topic)) +// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics +func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error { + if len(transforms) == 0 { + return errors.New("no transforms provided to pipeline") } - - err := svc.runtime.AddFunctionsPipeline(id, fullTopics, transforms) + fullTopics, err := getFullTopics(topics, svc.config.MessageBus.GetBaseTopicPrefix()) + if err != nil { + return err + } + err = svc.runtime.AddFunctionsPipeline(id, fullTopics, transforms) if err != nil { return err } - svc.lc.Debugf("Pipeline '%s' added for topics '%v' with %d transform(s)", id, fullTopics, len(transforms)) return nil } @@ -440,6 +443,16 @@ func (svc *Service) RemoveAllFunctionPipelines() { svc.runtime.RemoveAllFunctionPipelines() } +// SetFunctionsPipelineTopics updates the list of topics for the specified functions pipeline +func (svc *Service) SetFunctionsPipelineTopics(id string, topics []string) error { + fullTopics, err := getFullTopics(topics, svc.config.MessageBus.GetBaseTopicPrefix()) + if err != nil { + return err + } + svc.runtime.SetFunctionsPipelineTopics(id, fullTopics) + return nil +} + // RequestTimeout returns the Request Timeout duration that was parsed from the Service.RequestTimeout configuration func (svc *Service) RequestTimeout() time.Duration { return svc.requestTimeout diff --git a/internal/app/service_test.go b/internal/app/service_test.go index a2694ad20..2051eff96 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -393,6 +393,44 @@ func TestService_AddFunctionsPipelineForTopics(t *testing.T) { } } +func TestService_SetFunctionsPipelineTopics(t *testing.T) { + service := Service{ + lc: lc, + dic: dic, + runtime: runtime.NewFunctionPipelineRuntime("", nil, dic), + config: &common.ConfigurationStruct{ + Trigger: common.TriggerInfo{ + Type: TriggerTypeMessageBus, + }, + MessageBus: config.MessageBusInfo{ + BaseTopicPrefix: "base/two", + }, + }, + } + + tags := builtin.NewTags(nil) + + transforms := []interfaces.AppFunction{tags.AddTags} + topics := []string{"a/b", "a/c"} + bustopics := []string{"base/two/a/b", "base/two/a/c"} + err := service.AddFunctionsPipelineForTopics("pl", topics, transforms...) + require.NoError(t, err) + assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics) + + topics = append(topics, "c/d/e") + bustopics = append(bustopics, "base/two/c/d/e") + err = service.SetFunctionsPipelineTopics("pl", topics) + require.NoError(t, err) + assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics) + err = service.SetFunctionsPipelineTopics("pl", nil) + require.Error(t, err) + assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics) + topics = append(topics, "") + err = service.SetFunctionsPipelineTopics("pl", topics) + require.Error(t, err) + assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics) +} + func TestService_RemoveAllFunctionPipelines(t *testing.T) { service := Service{ lc: lc, diff --git a/pkg/interfaces/mocks/ApplicationService.go b/pkg/interfaces/mocks/ApplicationService.go index 1cb8f2f09..e8e575202 100644 --- a/pkg/interfaces/mocks/ApplicationService.go +++ b/pkg/interfaces/mocks/ApplicationService.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.49.0. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package mocks @@ -138,7 +138,7 @@ func (_m *ApplicationService) AddFunctionsPipelineForTopics(id string, topic []s return r0 } -// AppContext provides a mock function with given fields: +// AppContext provides a mock function with no fields func (_m *ApplicationService) AppContext() context.Context { ret := _m.Called() @@ -158,7 +158,7 @@ func (_m *ApplicationService) AppContext() context.Context { return r0 } -// ApplicationSettings provides a mock function with given fields: +// ApplicationSettings provides a mock function with no fields func (_m *ApplicationService) ApplicationSettings() map[string]string { ret := _m.Called() @@ -198,7 +198,7 @@ func (_m *ApplicationService) BuildContext(correlationId string, contentType str return r0 } -// CommandClient provides a mock function with given fields: +// CommandClient provides a mock function with no fields func (_m *ApplicationService) CommandClient() clientsinterfaces.CommandClient { ret := _m.Called() @@ -218,7 +218,7 @@ func (_m *ApplicationService) CommandClient() clientsinterfaces.CommandClient { return r0 } -// DeviceClient provides a mock function with given fields: +// DeviceClient provides a mock function with no fields func (_m *ApplicationService) DeviceClient() clientsinterfaces.DeviceClient { ret := _m.Called() @@ -238,7 +238,7 @@ func (_m *ApplicationService) DeviceClient() clientsinterfaces.DeviceClient { return r0 } -// DeviceProfileClient provides a mock function with given fields: +// DeviceProfileClient provides a mock function with no fields func (_m *ApplicationService) DeviceProfileClient() clientsinterfaces.DeviceProfileClient { ret := _m.Called() @@ -258,7 +258,7 @@ func (_m *ApplicationService) DeviceProfileClient() clientsinterfaces.DeviceProf return r0 } -// DeviceServiceClient provides a mock function with given fields: +// DeviceServiceClient provides a mock function with no fields func (_m *ApplicationService) DeviceServiceClient() clientsinterfaces.DeviceServiceClient { ret := _m.Called() @@ -278,7 +278,7 @@ func (_m *ApplicationService) DeviceServiceClient() clientsinterfaces.DeviceServ return r0 } -// EventClient provides a mock function with given fields: +// EventClient provides a mock function with no fields func (_m *ApplicationService) EventClient() clientsinterfaces.EventClient { ret := _m.Called() @@ -374,7 +374,7 @@ func (_m *ApplicationService) ListenForCustomConfigChanges(configToWatch interfa return r0 } -// LoadConfigurableFunctionPipelines provides a mock function with given fields: +// LoadConfigurableFunctionPipelines provides a mock function with no fields func (_m *ApplicationService) LoadConfigurableFunctionPipelines() (map[string]interfaces.FunctionPipeline, error) { ret := _m.Called() @@ -422,7 +422,7 @@ func (_m *ApplicationService) LoadCustomConfig(_a0 interfaces.UpdatableConfig, s return r0 } -// LoggingClient provides a mock function with given fields: +// LoggingClient provides a mock function with no fields func (_m *ApplicationService) LoggingClient() logger.LoggingClient { ret := _m.Called() @@ -442,7 +442,7 @@ func (_m *ApplicationService) LoggingClient() logger.LoggingClient { return r0 } -// MetricsManager provides a mock function with given fields: +// MetricsManager provides a mock function with no fields func (_m *ApplicationService) MetricsManager() bootstrapinterfaces.MetricsManager { ret := _m.Called() @@ -462,7 +462,7 @@ func (_m *ApplicationService) MetricsManager() bootstrapinterfaces.MetricsManage return r0 } -// NotificationClient provides a mock function with given fields: +// NotificationClient provides a mock function with no fields func (_m *ApplicationService) NotificationClient() clientsinterfaces.NotificationClient { ret := _m.Called() @@ -518,7 +518,7 @@ func (_m *ApplicationService) PublishWithTopic(topic string, data interface{}, c return r0 } -// ReadingClient provides a mock function with given fields: +// ReadingClient provides a mock function with no fields func (_m *ApplicationService) ReadingClient() clientsinterfaces.ReadingClient { ret := _m.Called() @@ -574,7 +574,7 @@ func (_m *ApplicationService) RegisterCustomTriggerFactory(name string, factory return r0 } -// RegistryClient provides a mock function with given fields: +// RegistryClient provides a mock function with no fields func (_m *ApplicationService) RegistryClient() registry.Client { ret := _m.Called() @@ -594,12 +594,12 @@ func (_m *ApplicationService) RegistryClient() registry.Client { return r0 } -// RemoveAllFunctionPipelines provides a mock function with given fields: +// RemoveAllFunctionPipelines provides a mock function with no fields func (_m *ApplicationService) RemoveAllFunctionPipelines() { _m.Called() } -// RequestTimeout provides a mock function with given fields: +// RequestTimeout provides a mock function with no fields func (_m *ApplicationService) RequestTimeout() time.Duration { ret := _m.Called() @@ -617,7 +617,7 @@ func (_m *ApplicationService) RequestTimeout() time.Duration { return r0 } -// Run provides a mock function with given fields: +// Run provides a mock function with no fields func (_m *ApplicationService) Run() error { ret := _m.Called() @@ -635,7 +635,7 @@ func (_m *ApplicationService) Run() error { return r0 } -// SecretProvider provides a mock function with given fields: +// SecretProvider provides a mock function with no fields func (_m *ApplicationService) SecretProvider() bootstrapinterfaces.SecretProvider { ret := _m.Called() @@ -679,12 +679,30 @@ func (_m *ApplicationService) SetDefaultFunctionsPipeline(transforms ...func(int return r0 } -// Stop provides a mock function with given fields: +// SetFunctionsPipelineTopics provides a mock function with given fields: id, topics +func (_m *ApplicationService) SetFunctionsPipelineTopics(id string, topics []string) error { + ret := _m.Called(id, topics) + + if len(ret) == 0 { + panic("no return value specified for SetFunctionsPipelineTopics") + } + + var r0 error + if rf, ok := ret.Get(0).(func(string, []string) error); ok { + r0 = rf(id, topics) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stop provides a mock function with no fields func (_m *ApplicationService) Stop() { _m.Called() } -// SubscriptionClient provides a mock function with given fields: +// SubscriptionClient provides a mock function with no fields func (_m *ApplicationService) SubscriptionClient() clientsinterfaces.SubscriptionClient { ret := _m.Called() diff --git a/pkg/interfaces/service.go b/pkg/interfaces/service.go index 292540a86..e21ec160f 100644 --- a/pkg/interfaces/service.go +++ b/pkg/interfaces/service.go @@ -117,6 +117,8 @@ type ApplicationService interface { AddFunctionsPipelineForTopics(id string, topic []string, transforms ...AppFunction) error // RemoveAllFunctionPipelines removes all existing function pipelines RemoveAllFunctionPipelines() + // SetFunctionsPipelineTopics updates the list of topics for the specified functions pipeline + SetFunctionsPipelineTopics(id string, topics []string) error // Run starts the configured trigger to allow the functions pipeline to execute when the trigger // receives data and starts the internal webserver. This is a long-running function which does not return until // the service is stopped or Stop() is called.