Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,33 +404,36 @@ func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunc
return nil
}

// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error {
if len(transforms) == 0 {
return errors.New("no transforms provided to pipeline")
}
// getFullTopics adds the base topic prefix to all the topics in a list
func getFullTopics(topics []string, baseTopicPrefix string) ([]string, error) {
var fullTopics []string

if len(topics) == 0 {
return errors.New("topics for pipeline can not be empty")
return nil, errors.New("topics for pipeline can not be empty")
}

for _, t := range topics {
if strings.TrimSpace(t) == "" {
return errors.New("blank topic not allowed")
for _, topic := range topics {
if strings.TrimSpace(topic) == "" {
return nil, errors.New("blank topic not allowed")
}
fullTopics = append(fullTopics, coreCommon.BuildTopic(baseTopicPrefix, topic))
}
return fullTopics, nil
}

// Must add the base topic to all the input topics
var fullTopics []string
for _, topic := range topics {
fullTopics = append(fullTopics, coreCommon.BuildTopic(svc.config.MessageBus.GetBaseTopicPrefix(), topic))
// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error {
if len(transforms) == 0 {
return errors.New("no transforms provided to pipeline")
}

err := svc.runtime.AddFunctionsPipeline(id, fullTopics, transforms)
fullTopics, err := getFullTopics(topics, svc.config.MessageBus.GetBaseTopicPrefix())
if err != nil {
return err
}
err = svc.runtime.AddFunctionsPipeline(id, fullTopics, transforms)
if err != nil {
return err
}

svc.lc.Debugf("Pipeline '%s' added for topics '%v' with %d transform(s)", id, fullTopics, len(transforms))
return nil
}
Expand All @@ -440,6 +443,16 @@ func (svc *Service) RemoveAllFunctionPipelines() {
svc.runtime.RemoveAllFunctionPipelines()
}

// SetFunctionsPipelineTopics updates the list of topics for the specified functions pipeline
func (svc *Service) SetFunctionsPipelineTopics(id string, topics []string) error {
fullTopics, err := getFullTopics(topics, svc.config.MessageBus.GetBaseTopicPrefix())
if err != nil {
return err
}
svc.runtime.SetFunctionsPipelineTopics(id, fullTopics)
return nil
}

// RequestTimeout returns the Request Timeout duration that was parsed from the Service.RequestTimeout configuration
func (svc *Service) RequestTimeout() time.Duration {
return svc.requestTimeout
Expand Down
38 changes: 38 additions & 0 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,44 @@ func TestService_AddFunctionsPipelineForTopics(t *testing.T) {
}
}

func TestService_SetFunctionsPipelineTopics(t *testing.T) {
service := Service{
lc: lc,
dic: dic,
runtime: runtime.NewFunctionPipelineRuntime("", nil, dic),
config: &common.ConfigurationStruct{
Trigger: common.TriggerInfo{
Type: TriggerTypeMessageBus,
},
MessageBus: config.MessageBusInfo{
BaseTopicPrefix: "base/two",
},
},
}

tags := builtin.NewTags(nil)

transforms := []interfaces.AppFunction{tags.AddTags}
topics := []string{"a/b", "a/c"}
bustopics := []string{"base/two/a/b", "base/two/a/c"}
err := service.AddFunctionsPipelineForTopics("pl", topics, transforms...)
require.NoError(t, err)
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)

topics = append(topics, "c/d/e")
bustopics = append(bustopics, "base/two/c/d/e")
err = service.SetFunctionsPipelineTopics("pl", topics)
require.NoError(t, err)
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
err = service.SetFunctionsPipelineTopics("pl", nil)
require.Error(t, err)
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
topics = append(topics, "")
err = service.SetFunctionsPipelineTopics("pl", topics)
require.Error(t, err)
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
}

func TestService_RemoveAllFunctionPipelines(t *testing.T) {
service := Service{
lc: lc,
Expand Down
58 changes: 38 additions & 20 deletions pkg/interfaces/mocks/ApplicationService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/interfaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type ApplicationService interface {
AddFunctionsPipelineForTopics(id string, topic []string, transforms ...AppFunction) error
// RemoveAllFunctionPipelines removes all existing function pipelines
RemoveAllFunctionPipelines()
// SetFunctionsPipelineTopics updates the list of topics for the specified functions pipeline
SetFunctionsPipelineTopics(id string, topics []string) error
// Run starts the configured trigger to allow the functions pipeline to execute when the trigger
// receives data and starts the internal webserver. This is a long-running function which does not return until
// the service is stopped or Stop() is called.
Expand Down