Skip to content

Commit f65d4cb

Browse files
authored
Merge pull request #1751 from eaton-coreymutter/issue-1750-dynamic-topics
feat: add ability to update pipeline topic list at runtime.
2 parents b397bfa + 08c43a3 commit f65d4cb

File tree

4 files changed

+107
-36
lines changed

4 files changed

+107
-36
lines changed

internal/app/service.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -404,33 +404,36 @@ func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunc
404404
return nil
405405
}
406406

407-
// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics
408-
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error {
409-
if len(transforms) == 0 {
410-
return errors.New("no transforms provided to pipeline")
411-
}
407+
// getFullTopics adds the base topic prefix to all the topics in a list
408+
func getFullTopics(topics []string, baseTopicPrefix string) ([]string, error) {
409+
var fullTopics []string
412410

413411
if len(topics) == 0 {
414-
return errors.New("topics for pipeline can not be empty")
412+
return nil, errors.New("topics for pipeline can not be empty")
415413
}
416414

417-
for _, t := range topics {
418-
if strings.TrimSpace(t) == "" {
419-
return errors.New("blank topic not allowed")
415+
for _, topic := range topics {
416+
if strings.TrimSpace(topic) == "" {
417+
return nil, errors.New("blank topic not allowed")
420418
}
419+
fullTopics = append(fullTopics, coreCommon.BuildTopic(baseTopicPrefix, topic))
421420
}
421+
return fullTopics, nil
422+
}
422423

423-
// Must add the base topic to all the input topics
424-
var fullTopics []string
425-
for _, topic := range topics {
426-
fullTopics = append(fullTopics, coreCommon.BuildTopic(svc.config.MessageBus.GetBaseTopicPrefix(), topic))
424+
// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics
425+
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error {
426+
if len(transforms) == 0 {
427+
return errors.New("no transforms provided to pipeline")
427428
}
428-
429-
err := svc.runtime.AddFunctionsPipeline(id, fullTopics, transforms)
429+
fullTopics, err := getFullTopics(topics, svc.config.MessageBus.GetBaseTopicPrefix())
430+
if err != nil {
431+
return err
432+
}
433+
err = svc.runtime.AddFunctionsPipeline(id, fullTopics, transforms)
430434
if err != nil {
431435
return err
432436
}
433-
434437
svc.lc.Debugf("Pipeline '%s' added for topics '%v' with %d transform(s)", id, fullTopics, len(transforms))
435438
return nil
436439
}
@@ -440,6 +443,16 @@ func (svc *Service) RemoveAllFunctionPipelines() {
440443
svc.runtime.RemoveAllFunctionPipelines()
441444
}
442445

446+
// SetFunctionsPipelineTopics updates the list of topics for the specified functions pipeline
447+
func (svc *Service) SetFunctionsPipelineTopics(id string, topics []string) error {
448+
fullTopics, err := getFullTopics(topics, svc.config.MessageBus.GetBaseTopicPrefix())
449+
if err != nil {
450+
return err
451+
}
452+
svc.runtime.SetFunctionsPipelineTopics(id, fullTopics)
453+
return nil
454+
}
455+
443456
// RequestTimeout returns the Request Timeout duration that was parsed from the Service.RequestTimeout configuration
444457
func (svc *Service) RequestTimeout() time.Duration {
445458
return svc.requestTimeout

internal/app/service_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,44 @@ func TestService_AddFunctionsPipelineForTopics(t *testing.T) {
393393
}
394394
}
395395

396+
func TestService_SetFunctionsPipelineTopics(t *testing.T) {
397+
service := Service{
398+
lc: lc,
399+
dic: dic,
400+
runtime: runtime.NewFunctionPipelineRuntime("", nil, dic),
401+
config: &common.ConfigurationStruct{
402+
Trigger: common.TriggerInfo{
403+
Type: TriggerTypeMessageBus,
404+
},
405+
MessageBus: config.MessageBusInfo{
406+
BaseTopicPrefix: "base/two",
407+
},
408+
},
409+
}
410+
411+
tags := builtin.NewTags(nil)
412+
413+
transforms := []interfaces.AppFunction{tags.AddTags}
414+
topics := []string{"a/b", "a/c"}
415+
bustopics := []string{"base/two/a/b", "base/two/a/c"}
416+
err := service.AddFunctionsPipelineForTopics("pl", topics, transforms...)
417+
require.NoError(t, err)
418+
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
419+
420+
topics = append(topics, "c/d/e")
421+
bustopics = append(bustopics, "base/two/c/d/e")
422+
err = service.SetFunctionsPipelineTopics("pl", topics)
423+
require.NoError(t, err)
424+
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
425+
err = service.SetFunctionsPipelineTopics("pl", nil)
426+
require.Error(t, err)
427+
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
428+
topics = append(topics, "")
429+
err = service.SetFunctionsPipelineTopics("pl", topics)
430+
require.Error(t, err)
431+
assert.Equal(t, bustopics, service.runtime.GetPipelineById("pl").Topics)
432+
}
433+
396434
func TestService_RemoveAllFunctionPipelines(t *testing.T) {
397435
service := Service{
398436
lc: lc,

pkg/interfaces/mocks/ApplicationService.go

Lines changed: 38 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/interfaces/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ type ApplicationService interface {
117117
AddFunctionsPipelineForTopics(id string, topic []string, transforms ...AppFunction) error
118118
// RemoveAllFunctionPipelines removes all existing function pipelines
119119
RemoveAllFunctionPipelines()
120+
// SetFunctionsPipelineTopics updates the list of topics for the specified functions pipeline
121+
SetFunctionsPipelineTopics(id string, topics []string) error
120122
// Run starts the configured trigger to allow the functions pipeline to execute when the trigger
121123
// receives data and starts the internal webserver. This is a long-running function which does not return until
122124
// the service is stopped or Stop() is called.

0 commit comments

Comments
 (0)