diff --git a/internal/app/service.go b/internal/app/service.go index 7d92bf430..5d7d1752d 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + pahoMqtt "github.com/eclipse/paho.mqtt.golang" nethttp "net/http" "os" "os/signal" @@ -219,21 +220,24 @@ func (svc *Service) Run() error { svc.webserver.StartWebServer(httpErrors) - // determine input type and create trigger for it - t := svc.setupTrigger(svc.config) - if t == nil { - return errors.New("failed to create Trigger") - } - - // Initialize the trigger (i.e. start a web server, or connect to message bus) - deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel) - if err != nil { - svc.lc.Error(err.Error()) - return errors.New("failed to initialize Trigger") - } - - // deferred is a function that needs to be called when services exits. - svc.addDeferred(deferred) + //// determine input type and create trigger for it + //t := svc.setupTrigger(svc.config) + //if t == nil { + // return nil, errors.New("failed to create Trigger") + //} + // + //// Initialize the trigger (i.e. start a web server, or connect to message bus) + //mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel) + //if err != nil { + // svc.lc.Error(err.Error()) + // return nil, errors.New("failed to initialize Trigger") + //} + // + //x := mqttTrigger.MqttClient + ////x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}") + // + //// deferred is a function that needs to be called when services exits. + //svc.addDeferred(deferred) if svc.config.Writable.StoreAndForward.Enabled { svc.startStoreForward() @@ -636,6 +640,30 @@ func (svc *Service) RegistryClient() registry.Client { return bootstrapContainer.RegistryFrom(svc.dic.Get) } +// TriggerMqttClient returns the pahoMqtt.Client. +func (svc *Service) TriggerMqttClient() (*pahoMqtt.Client, error) { + // determine input type and create trigger for it + t := svc.setupTrigger(svc.config) + if t == nil { + return nil, errors.New("failed to create Trigger") + } + + // Initialize the trigger (i.e. start a web server, or connect to message bus) + mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel) + if err != nil { + svc.lc.Error(err.Error()) + return nil, errors.New("failed to initialize Trigger") + } + + x := mqttTrigger.MqttClient + //x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}") + + // deferred is a function that needs to be called when services exits. + svc.addDeferred(deferred) + + return &x, nil +} + // EventClient returns the Event client, which may be nil, from the dependency injection container func (svc *Service) EventClient() clientInterfaces.EventClient { return bootstrapContainer.EventClientFrom(svc.dic.Get) diff --git a/internal/app/triggerfactory.go b/internal/app/triggerfactory.go index c706dcc90..ef93194af 100644 --- a/internal/app/triggerfactory.go +++ b/internal/app/triggerfactory.go @@ -23,8 +23,6 @@ import ( "strings" "github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common" - "github.com/edgexfoundry/app-functions-sdk-go/v3/internal/trigger/http" - "github.com/edgexfoundry/app-functions-sdk-go/v3/internal/trigger/messagebus" "github.com/edgexfoundry/app-functions-sdk-go/v3/internal/trigger/mqtt" "github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces" ) @@ -36,39 +34,43 @@ const ( TriggerTypeHTTP = "HTTP" ) -func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) interfaces.Trigger { - var t interfaces.Trigger +func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) *mqtt.Trigger { + var t *mqtt.Trigger + //var t interfaces.Trigger serviceBinding := NewTriggerServiceBinding(svc) messageProcessor := NewTriggerMessageProcessor(serviceBinding, svc.MetricsManager()) - switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType { - case TriggerTypeHTTP: - svc.LoggingClient().Info("HTTP trigger selected") - t = http.NewTrigger(serviceBinding, messageProcessor, svc.webserver) - - case TriggerTypeMessageBus: - svc.LoggingClient().Info("EdgeX MessageBus trigger selected") - t = messagebus.NewTrigger(serviceBinding, messageProcessor, svc.dic) - - case TriggerTypeMQTT: - svc.LoggingClient().Info("External MQTT trigger selected") - t = mqtt.NewTrigger(serviceBinding, messageProcessor) - - default: - if factory, found := svc.customTriggerFactories[triggerType]; found { - var err error - t, err = factory(svc) - if err != nil { - svc.LoggingClient().Errorf("failed to initialize custom trigger [%s]: %s", triggerType, err.Error()) - return nil - } - } else if len(configuration.Trigger.Type) == 0 { - svc.LoggingClient().Error("Trigger type not found, missing common config? Use -cp or -cc flags for common config") - } else { - svc.LoggingClient().Errorf("Invalid Trigger type of '%s' specified", configuration.Trigger.Type) - } - } + //switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType { + //case TriggerTypeHTTP: + // svc.LoggingClient().Info("HTTP trigger selected") + // t = http.NewTrigger(serviceBinding, messageProcessor, svc.webserver) + // + //case TriggerTypeMessageBus: + // svc.LoggingClient().Info("EdgeX MessageBus trigger selected") + // t = messagebus.NewTrigger(serviceBinding, messageProcessor, svc.dic) + // + //case TriggerTypeMQTT: + // svc.LoggingClient().Info("External MQTT trigger selected") + // t = mqtt.NewTrigger(serviceBinding, messageProcessor) + // + //default: + // if factory, found := svc.customTriggerFactories[triggerType]; found { + // var err error + // t, err = factory(svc) + // if err != nil { + // svc.LoggingClient().Errorf("failed to initialize custom trigger [%s]: %s", triggerType, err.Error()) + // return nil + // } + // } else if len(configuration.Trigger.Type) == 0 { + // svc.LoggingClient().Error("Trigger type not found, missing common config? Use -cp or -cc flags for common config") + // } else { + // svc.LoggingClient().Errorf("Invalid Trigger type of '%s' specified", configuration.Trigger.Type) + // } + //} + + svc.LoggingClient().Info("External MQTT trigger selected") + t = mqtt.NewTrigger(serviceBinding, messageProcessor) return t } diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index cb4ddedb5..59a0b6b2a 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -37,7 +37,6 @@ import ( "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos" "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests" edgexErrors "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" - "github.com/edgexfoundry/go-mod-core-contracts/v3/models" "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types" "github.com/fxamacker/cbor/v2" @@ -250,43 +249,46 @@ func (fpr *FunctionsPipelineRuntime) DecodeMessage(appContext *appfunction.Conte // Must make a copy of the type so that data isn't retained between calls for custom types target := reflect.New(reflect.ValueOf(fpr.TargetType).Elem().Type()).Interface() - switch target.(type) { - case *[]byte: - fpr.lc.Debug("Expecting raw byte data") - target = &envelope.Payload - - case *dtos.Event: - fpr.lc.Debug("Expecting an AddEventRequest or Event DTO") - - // Dynamically process either AddEventRequest or Event DTO - event, err := fpr.processEventPayload(envelope) - if err != nil { - err = fmt.Errorf("unable to process payload %s", err.Error()) - fpr.logError(err, envelope.CorrelationID) - return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true - } - - if fpr.lc.LogLevel() == models.DebugLog { - fpr.debugLogEvent(event) - } - - appContext.AddValue(interfaces.DEVICENAME, event.DeviceName) - appContext.AddValue(interfaces.PROFILENAME, event.ProfileName) - appContext.AddValue(interfaces.SOURCENAME, event.SourceName) - - target = event - - default: - customTypeName := di.TypeInstanceToName(target) - fpr.lc.Debugf("Expecting a custom type of %s", customTypeName) - - // Expecting a custom type so just unmarshal into the target type. - if err := fpr.unmarshalPayload(envelope, target); err != nil { - err = fmt.Errorf("unable to process custom object received of type '%s': %s", customTypeName, err.Error()) - fpr.logError(err, envelope.CorrelationID) - return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true - } - } + //switch target.(type) { + //case *[]byte: + // fpr.lc.Debug("Expecting raw byte data") + // target = &envelope.Payload + // + //case *dtos.Event: + // fpr.lc.Debug("Expecting an AddEventRequest or Event DTO") + // + // // Dynamically process either AddEventRequest or Event DTO + // event, err := fpr.processEventPayload(envelope) + // if err != nil { + // err = fmt.Errorf("unable to process payload %s", err.Error()) + // fpr.logError(err, envelope.CorrelationID) + // return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true + // } + // + // if fpr.lc.LogLevel() == models.DebugLog { + // fpr.debugLogEvent(event) + // } + // + // appContext.AddValue(interfaces.DEVICENAME, event.DeviceName) + // appContext.AddValue(interfaces.PROFILENAME, event.ProfileName) + // appContext.AddValue(interfaces.SOURCENAME, event.SourceName) + // + // target = event + // + //default: + // customTypeName := di.TypeInstanceToName(target) + // fpr.lc.Debugf("Expecting a custom type of %s", customTypeName) + // + // // Expecting a custom type so just unmarshal into the target type. + // if err := fpr.unmarshalPayload(envelope, target); err != nil { + // err = fmt.Errorf("unable to process custom object received of type '%s': %s", customTypeName, err.Error()) + // fpr.logError(err, envelope.CorrelationID) + // return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true + // } + //} + + fpr.lc.Debug("Expecting raw byte data") + target = &envelope.Payload appContext.SetCorrelationID(envelope.CorrelationID) appContext.SetInputContentType(envelope.ContentType) @@ -383,11 +385,13 @@ func (fpr *FunctionsPipelineRuntime) processEventPayload(envelope types.MessageE event := &dtos.Event{} err := fpr.unmarshalPayload(envelope, event) if err == nil { - err = common.Validate(event) - if err == nil { - fpr.lc.Debug("Using Event DTO received") - return event, nil - } + //err = common.Validate(event) + //if err == nil { + // fpr.lc.Debug("Using Event DTO received") + // return event, nil + //} + fpr.lc.Debug("Using Event DTO received") + return event, nil } // Check for validation error diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go index bed7f48ff..4c77c5580 100644 --- a/internal/trigger/mqtt/mqtt.go +++ b/internal/trigger/mqtt/mqtt.go @@ -52,7 +52,7 @@ const ( type Trigger struct { messageProcessor trigger.MessageProcessor serviceBinding trigger.ServiceBinding - mqttClient pahoMqtt.Client + MqttClient pahoMqtt.Client qos byte retain bool publishTopic string @@ -68,7 +68,7 @@ func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigge } // Initialize initializes the Trigger for an external MQTT broker -func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { +func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (*Trigger, bootstrap.Deferred, error) { // Convenience shortcuts lc := trigger.serviceBinding.LoggingClient() config := trigger.serviceBinding.Config() @@ -83,16 +83,16 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg lc.Info("Initializing MQTT Trigger") if background != nil { - return nil, errors.New("background publishing not supported for services using MQTT trigger") + return trigger, nil, errors.New("background publishing not supported for services using MQTT trigger") } if len(strings.TrimSpace(topics)) == 0 { - return nil, fmt.Errorf("missing SubscribeTopics for MQTT Trigger. Must be present in [Trigger.ExternalMqtt] section") + return trigger, nil, fmt.Errorf("missing SubscribeTopics for MQTT Trigger. Must be present in [Trigger.ExternalMqtt] section") } brokerUrl, err := url.Parse(brokerConfig.Url) if err != nil { - return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", config.Trigger.ExternalMqtt.Url, err.Error()) + return trigger, nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", config.Trigger.ExternalMqtt.Url, err.Error()) } opts := pahoMqtt.NewClientOptions() @@ -102,7 +102,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg if len(brokerConfig.ConnectTimeout) > 0 { duration, err := time.ParseDuration(brokerConfig.ConnectTimeout) if err != nil { - return nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error()) + return trigger, nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error()) } opts.ConnectTimeout = duration } @@ -132,7 +132,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg } select { case <-ctx.Done(): - return nil, errors.New("aborted MQTT Trigger initialization") + return trigger, nil, errors.New("aborted MQTT Trigger initialization") default: lc.Warnf("%s. Attempt to create MQTT client again after %d seconds...", err.Error(), brokerConfig.RetryInterval) timer.SleepForInterval() @@ -140,17 +140,17 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg } if err != nil { - return nil, fmt.Errorf("unable to create MQTT Client: %s", err.Error()) + return trigger, nil, fmt.Errorf("unable to create MQTT Client: %s", err.Error()) } deferred := func() { lc.Info("Disconnecting from broker for MQTT trigger") - trigger.mqttClient.Disconnect(0) + trigger.MqttClient.Disconnect(0) } - trigger.mqttClient = mqttClient + trigger.MqttClient = mqttClient - return deferred, nil + return trigger, deferred, nil } func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) { @@ -221,7 +221,7 @@ func (trigger *Trigger) responseHandler(appContext interfaces.AppFunctionContext return err } - if token := trigger.mqttClient.Publish(formattedTopic, trigger.qos, trigger.retain, appContext.ResponseData()); token.Wait() && token.Error() != nil { + if token := trigger.MqttClient.Publish(formattedTopic, trigger.qos, trigger.retain, appContext.ResponseData()); token.Wait() && token.Error() != nil { lc.Errorf("MQTT trigger: Could not publish to topic '%s' for pipeline '%s': %s", formattedTopic, pipeline.Id, diff --git a/pkg/interfaces/service.go b/pkg/interfaces/service.go index cf9fb7f04..8e25f069c 100644 --- a/pkg/interfaces/service.go +++ b/pkg/interfaces/service.go @@ -17,6 +17,7 @@ package interfaces import ( "context" + pahoMqtt "github.com/eclipse/paho.mqtt.golang" "net/http" "time" @@ -176,6 +177,8 @@ type ApplicationService interface { // RegistryClient returns the Registry client. Note the registry must been enable, otherwise this will return nil. // Useful if service needs to add additional health checks or needs to get endpoint of another registered service RegistryClient() registry.Client + //TriggerMqttClient returns the pahoMqtt.Client. + TriggerMqttClient() (*pahoMqtt.Client, error) // MetricsManager returns the Metrics Manager used to register counter, gauge, gaugeFloat64 or timer metric types from // github.com/rcrowley/go-metrics MetricsManager() bootstrapInterfaces.MetricsManager