diff --git a/internal/app/configupdates.go b/internal/app/configupdates.go index f30370b03..b0548ac84 100644 --- a/internal/app/configupdates.go +++ b/internal/app/configupdates.go @@ -189,3 +189,13 @@ func (svc *Service) findMatchingFunction(configurable reflect.Value, functionNam functionType := functionValue.Type() return functionValue, functionType, nil } + +func (svc *Service) findMatchingFunctionWrapper(configurables []reflect.Value, functionName string) (functionValue reflect.Value, functionType reflect.Type, err error) { + for _, conconfigurable := range configurables { + functionValue, functionType, err = svc.findMatchingFunction(conconfigurable, functionName) + if err == nil { + return + } + } + return +} diff --git a/internal/app/service.go b/internal/app/service.go index 363a96eae..2b71ed93f 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -80,6 +80,7 @@ func NewService(serviceKey string, targetType interface{}, profileSuffixPlacehol serviceKey: serviceKey, targetType: targetType, profileSuffixPlaceholder: profileSuffixPlaceholder, + configurableFactorMap: make(map[string]interfaces.ConfigurableFactory), } } @@ -104,6 +105,7 @@ type Service struct { flags *flags.Default configProcessor *config.Processor requestTimeout time.Duration + configurableFactorMap map[string]interfaces.ConfigurableFactory } type commandLineFlags struct { @@ -281,7 +283,11 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F return nil, fmt.Errorf("pipline TargetType of '%s' is not supported", svc.config.GetWritableInfo().Pipeline.TargetType) } - configurable := reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider())) + configurables := []reflect.Value{reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))} + for _, factory := range svc.configurableFactorMap { + instance := factory(svc.lc, svc.SecretProvider()) + configurables = append(configurables, reflect.ValueOf(instance)) + } pipelineConfig := svc.config.GetWritableInfo().Pipeline defaultExecutionOrder := strings.TrimSpace(pipelineConfig.ExecutionOrder) @@ -294,7 +300,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F svc.lc.Debugf("Default Function Pipeline Execution Order: [%s]", pipelineConfig.ExecutionOrder) functionNames := util.DeleteEmptyAndTrim(strings.FieldsFunc(defaultExecutionOrder, util.SplitComma)) - transforms, err := svc.loadConfigurablePipelineTransforms(interfaces.DefaultPipelineId, functionNames, pipelineConfig.Functions, configurable) + transforms, err := svc.loadConfigurablePipelineTransforms(interfaces.DefaultPipelineId, functionNames, pipelineConfig.Functions, configurables) if err != nil { return nil, err } @@ -312,7 +318,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F functionNames := util.DeleteEmptyAndTrim(strings.FieldsFunc(perTopicPipeline.ExecutionOrder, util.SplitComma)) - transforms, err := svc.loadConfigurablePipelineTransforms(perTopicPipeline.Id, functionNames, pipelineConfig.Functions, configurable) + transforms, err := svc.loadConfigurablePipelineTransforms(perTopicPipeline.Id, functionNames, pipelineConfig.Functions, configurables) if err != nil { return nil, err } @@ -334,7 +340,7 @@ func (svc *Service) loadConfigurablePipelineTransforms( pipelineId string, executionOrder []string, functions map[string]common.PipelineFunction, - configurable reflect.Value) ([]interfaces.AppFunction, error) { + configurables []reflect.Value) ([]interfaces.AppFunction, error) { var transforms []interfaces.AppFunction // set pipeline function parameter names to lowercase to avoid casing issues from what is in source configuration @@ -347,7 +353,7 @@ func (svc *Service) loadConfigurablePipelineTransforms( return nil, fmt.Errorf("function '%s' configuration not found in Pipeline.Functions section for pipeline '%s'", functionName, pipelineId) } - functionValue, functionType, err := svc.findMatchingFunction(configurable, functionName) + functionValue, functionType, err := svc.findMatchingFunctionWrapper(configurables, functionName) if err != nil { return nil, fmt.Errorf("%s for pipeline '%s'", err.Error(), pipelineId) } @@ -766,3 +772,11 @@ func (svc *Service) PublishWithTopic(topic string, data any, contentType string) return nil } + +func (svc *Service) RegisterExternalConfigurable(name string, f interfaces.ConfigurableFactory) { + svc.configurableFactorMap[name] = f +} + +func (svc *Service) UnregisterExternalConfigurable(name string) { + delete(svc.configurableFactorMap, name) +} diff --git a/pkg/interfaces/mocks/ApplicationService.go b/pkg/interfaces/mocks/ApplicationService.go index 1cb8f2f09..71095f2c8 100644 --- a/pkg/interfaces/mocks/ApplicationService.go +++ b/pkg/interfaces/mocks/ApplicationService.go @@ -704,6 +704,13 @@ func (_m *ApplicationService) SubscriptionClient() clientsinterfaces.Subscriptio return r0 } +func (_m *ApplicationService) RegisterExternalConfigurable(name string, f interfaces.ConfigurableFactory) { + _m.Called() +} +func (_m *ApplicationService) UnregisterExternalConfigurable(name string) { + _m.Called() +} + // NewApplicationService creates a new instance of ApplicationService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewApplicationService(t interface { diff --git a/pkg/interfaces/service.go b/pkg/interfaces/service.go index 292540a86..0d03eb025 100644 --- a/pkg/interfaces/service.go +++ b/pkg/interfaces/service.go @@ -193,4 +193,27 @@ type ApplicationService interface { Publish(data any, contentType string) error // PublishWithTopic pushes data to the MessageBus using given topic PublishWithTopic(topic string, data any, contentType string) error + // RegisterExternalConfigurable registers a named ConfigurableFactory. + // + // This allows runtime users to plug in their own "Configurable"-style provider, + // i.e. a struct exposing methods such as: + // + // func (c *MyConfigurable) FilterByDeviceName(params map[string]string) interfaces.AppFunction + // + // Registration must happen before LoadConfigurableFunctionPipelines() is called, + // typically in init() or early in main(). Once registered, the functions on the + // provided configurable can be referenced in Writable.Pipeline configuration. + RegisterExternalConfigurable(name string, f ConfigurableFactory) + // UnregisterExternalConfigurable removes a previously registered factory by name. + // + // This can be used in tests or dynamic scenarios to clear or replace an external + // configurable provider. Usually registration is done once at startup and does + // not need to be removed in normal application flows. + UnregisterExternalConfigurable(name string) } + +// ConfigurableFactory creates a configurable instance given SDK logging/secret provider. +// Returned value should be a pointer to a struct whose methods have signature +// +// FuncName(parameters map[string]string) interfaces.AppFunction +type ConfigurableFactory func(lc logger.LoggingClient, sp bootstrapInterfaces.SecretProvider) interface{}