Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/app/configupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,13 @@ func (svc *Service) findMatchingFunction(configurable reflect.Value, functionNam
functionType := functionValue.Type()
return functionValue, functionType, nil
}

func (svc *Service) findMatchingFunctionWrapper(configurables []reflect.Value, functionName string) (functionValue reflect.Value, functionType reflect.Type, err error) {
for _, conconfigurable := range configurables {
functionValue, functionType, err = svc.findMatchingFunction(conconfigurable, functionName)
if err == nil {
return
}
}
return
}
24 changes: 19 additions & 5 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func NewService(serviceKey string, targetType interface{}, profileSuffixPlacehol
serviceKey: serviceKey,
targetType: targetType,
profileSuffixPlaceholder: profileSuffixPlaceholder,
configurableFactorMap: make(map[string]interfaces.ConfigurableFactory),
}
}

Expand All @@ -104,6 +105,7 @@ type Service struct {
flags *flags.Default
configProcessor *config.Processor
requestTimeout time.Duration
configurableFactorMap map[string]interfaces.ConfigurableFactory
}

type commandLineFlags struct {
Expand Down Expand Up @@ -281,7 +283,11 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
return nil, fmt.Errorf("pipline TargetType of '%s' is not supported", svc.config.GetWritableInfo().Pipeline.TargetType)
}

configurable := reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))
configurables := []reflect.Value{reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))}
for _, factory := range svc.configurableFactorMap {
instance := factory(svc.lc, svc.SecretProvider())
configurables = append(configurables, reflect.ValueOf(instance))
}
pipelineConfig := svc.config.GetWritableInfo().Pipeline

defaultExecutionOrder := strings.TrimSpace(pipelineConfig.ExecutionOrder)
Expand All @@ -294,7 +300,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
svc.lc.Debugf("Default Function Pipeline Execution Order: [%s]", pipelineConfig.ExecutionOrder)
functionNames := util.DeleteEmptyAndTrim(strings.FieldsFunc(defaultExecutionOrder, util.SplitComma))

transforms, err := svc.loadConfigurablePipelineTransforms(interfaces.DefaultPipelineId, functionNames, pipelineConfig.Functions, configurable)
transforms, err := svc.loadConfigurablePipelineTransforms(interfaces.DefaultPipelineId, functionNames, pipelineConfig.Functions, configurables)
if err != nil {
return nil, err
}
Expand All @@ -312,7 +318,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F

functionNames := util.DeleteEmptyAndTrim(strings.FieldsFunc(perTopicPipeline.ExecutionOrder, util.SplitComma))

transforms, err := svc.loadConfigurablePipelineTransforms(perTopicPipeline.Id, functionNames, pipelineConfig.Functions, configurable)
transforms, err := svc.loadConfigurablePipelineTransforms(perTopicPipeline.Id, functionNames, pipelineConfig.Functions, configurables)
if err != nil {
return nil, err
}
Expand All @@ -334,7 +340,7 @@ func (svc *Service) loadConfigurablePipelineTransforms(
pipelineId string,
executionOrder []string,
functions map[string]common.PipelineFunction,
configurable reflect.Value) ([]interfaces.AppFunction, error) {
configurables []reflect.Value) ([]interfaces.AppFunction, error) {
var transforms []interfaces.AppFunction

// set pipeline function parameter names to lowercase to avoid casing issues from what is in source configuration
Expand All @@ -347,7 +353,7 @@ func (svc *Service) loadConfigurablePipelineTransforms(
return nil, fmt.Errorf("function '%s' configuration not found in Pipeline.Functions section for pipeline '%s'", functionName, pipelineId)
}

functionValue, functionType, err := svc.findMatchingFunction(configurable, functionName)
functionValue, functionType, err := svc.findMatchingFunctionWrapper(configurables, functionName)
if err != nil {
return nil, fmt.Errorf("%s for pipeline '%s'", err.Error(), pipelineId)
}
Expand Down Expand Up @@ -766,3 +772,11 @@ func (svc *Service) PublishWithTopic(topic string, data any, contentType string)

return nil
}

func (svc *Service) RegisterExternalConfigurable(name string, f interfaces.ConfigurableFactory) {
svc.configurableFactorMap[name] = f
}

func (svc *Service) UnregisterExternalConfigurable(name string) {
delete(svc.configurableFactorMap, name)
}
7 changes: 7 additions & 0 deletions pkg/interfaces/mocks/ApplicationService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions pkg/interfaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,27 @@ type ApplicationService interface {
Publish(data any, contentType string) error
// PublishWithTopic pushes data to the MessageBus using given topic
PublishWithTopic(topic string, data any, contentType string) error
// RegisterExternalConfigurable registers a named ConfigurableFactory.
//
// This allows runtime users to plug in their own "Configurable"-style provider,
// i.e. a struct exposing methods such as:
//
// func (c *MyConfigurable) FilterByDeviceName(params map[string]string) interfaces.AppFunction
//
// Registration must happen before LoadConfigurableFunctionPipelines() is called,
// typically in init() or early in main(). Once registered, the functions on the
// provided configurable can be referenced in Writable.Pipeline configuration.
RegisterExternalConfigurable(name string, f ConfigurableFactory)
// UnregisterExternalConfigurable removes a previously registered factory by name.
//
// This can be used in tests or dynamic scenarios to clear or replace an external
// configurable provider. Usually registration is done once at startup and does
// not need to be removed in normal application flows.
UnregisterExternalConfigurable(name string)
}

// ConfigurableFactory creates a configurable instance given SDK logging/secret provider.
// Returned value should be a pointer to a struct whose methods have signature
//
// FuncName(parameters map[string]string) interfaces.AppFunction
type ConfigurableFactory func(lc logger.LoggingClient, sp bootstrapInterfaces.SecretProvider) interface{}