Skip to content
58 changes: 43 additions & 15 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
pahoMqtt "github.com/eclipse/paho.mqtt.golang"
nethttp "net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -219,21 +220,24 @@ func (svc *Service) Run() error {

svc.webserver.StartWebServer(httpErrors)

// determine input type and create trigger for it
t := svc.setupTrigger(svc.config)
if t == nil {
return errors.New("failed to create Trigger")
}

// Initialize the trigger (i.e. start a web server, or connect to message bus)
deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel)
if err != nil {
svc.lc.Error(err.Error())
return errors.New("failed to initialize Trigger")
}

// deferred is a function that needs to be called when services exits.
svc.addDeferred(deferred)
//// determine input type and create trigger for it
//t := svc.setupTrigger(svc.config)
//if t == nil {
// return nil, errors.New("failed to create Trigger")
//}
//
//// Initialize the trigger (i.e. start a web server, or connect to message bus)
//mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel)
//if err != nil {
// svc.lc.Error(err.Error())
// return nil, errors.New("failed to initialize Trigger")
//}
//
//x := mqttTrigger.MqttClient
////x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}")
//
//// deferred is a function that needs to be called when services exits.
//svc.addDeferred(deferred)

if svc.config.Writable.StoreAndForward.Enabled {
svc.startStoreForward()
Expand Down Expand Up @@ -636,6 +640,30 @@ func (svc *Service) RegistryClient() registry.Client {
return bootstrapContainer.RegistryFrom(svc.dic.Get)
}

// TriggerMqttClient returns the pahoMqtt.Client.
func (svc *Service) TriggerMqttClient() (*pahoMqtt.Client, error) {
// determine input type and create trigger for it
t := svc.setupTrigger(svc.config)
if t == nil {
return nil, errors.New("failed to create Trigger")
}

// Initialize the trigger (i.e. start a web server, or connect to message bus)
mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel)
if err != nil {
svc.lc.Error(err.Error())
return nil, errors.New("failed to initialize Trigger")
}

x := mqttTrigger.MqttClient
//x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}")

// deferred is a function that needs to be called when services exits.
svc.addDeferred(deferred)

return &x, nil
}

// EventClient returns the Event client, which may be nil, from the dependency injection container
func (svc *Service) EventClient() clientInterfaces.EventClient {
return bootstrapContainer.EventClientFrom(svc.dic.Get)
Expand Down
64 changes: 33 additions & 31 deletions internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"strings"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/trigger/mqtt"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
)
Expand All @@ -36,39 +34,43 @@ const (
TriggerTypeHTTP = "HTTP"
)

func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) interfaces.Trigger {
var t interfaces.Trigger
func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) *mqtt.Trigger {
var t *mqtt.Trigger
//var t interfaces.Trigger

serviceBinding := NewTriggerServiceBinding(svc)
messageProcessor := NewTriggerMessageProcessor(serviceBinding, svc.MetricsManager())

switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType {
case TriggerTypeHTTP:
svc.LoggingClient().Info("HTTP trigger selected")
t = http.NewTrigger(serviceBinding, messageProcessor, svc.webserver)

case TriggerTypeMessageBus:
svc.LoggingClient().Info("EdgeX MessageBus trigger selected")
t = messagebus.NewTrigger(serviceBinding, messageProcessor, svc.dic)

case TriggerTypeMQTT:
svc.LoggingClient().Info("External MQTT trigger selected")
t = mqtt.NewTrigger(serviceBinding, messageProcessor)

default:
if factory, found := svc.customTriggerFactories[triggerType]; found {
var err error
t, err = factory(svc)
if err != nil {
svc.LoggingClient().Errorf("failed to initialize custom trigger [%s]: %s", triggerType, err.Error())
return nil
}
} else if len(configuration.Trigger.Type) == 0 {
svc.LoggingClient().Error("Trigger type not found, missing common config? Use -cp or -cc flags for common config")
} else {
svc.LoggingClient().Errorf("Invalid Trigger type of '%s' specified", configuration.Trigger.Type)
}
}
//switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType {
//case TriggerTypeHTTP:
// svc.LoggingClient().Info("HTTP trigger selected")
// t = http.NewTrigger(serviceBinding, messageProcessor, svc.webserver)
//
//case TriggerTypeMessageBus:
// svc.LoggingClient().Info("EdgeX MessageBus trigger selected")
// t = messagebus.NewTrigger(serviceBinding, messageProcessor, svc.dic)
//
//case TriggerTypeMQTT:
// svc.LoggingClient().Info("External MQTT trigger selected")
// t = mqtt.NewTrigger(serviceBinding, messageProcessor)
//
//default:
// if factory, found := svc.customTriggerFactories[triggerType]; found {
// var err error
// t, err = factory(svc)
// if err != nil {
// svc.LoggingClient().Errorf("failed to initialize custom trigger [%s]: %s", triggerType, err.Error())
// return nil
// }
// } else if len(configuration.Trigger.Type) == 0 {
// svc.LoggingClient().Error("Trigger type not found, missing common config? Use -cp or -cc flags for common config")
// } else {
// svc.LoggingClient().Errorf("Invalid Trigger type of '%s' specified", configuration.Trigger.Type)
// }
//}

svc.LoggingClient().Info("External MQTT trigger selected")
t = mqtt.NewTrigger(serviceBinding, messageProcessor)

return t
}
Expand Down
24 changes: 12 additions & 12 deletions internal/trigger/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
type Trigger struct {
messageProcessor trigger.MessageProcessor
serviceBinding trigger.ServiceBinding
mqttClient pahoMqtt.Client
MqttClient pahoMqtt.Client
qos byte
retain bool
publishTopic string
Expand All @@ -68,7 +68,7 @@ func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigge
}

// Initialize initializes the Trigger for an external MQTT broker
func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (*Trigger, bootstrap.Deferred, error) {
// Convenience shortcuts
lc := trigger.serviceBinding.LoggingClient()
config := trigger.serviceBinding.Config()
Expand All @@ -83,16 +83,16 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg
lc.Info("Initializing MQTT Trigger")

if background != nil {
return nil, errors.New("background publishing not supported for services using MQTT trigger")
return trigger, nil, errors.New("background publishing not supported for services using MQTT trigger")
}

if len(strings.TrimSpace(topics)) == 0 {
return nil, fmt.Errorf("missing SubscribeTopics for MQTT Trigger. Must be present in [Trigger.ExternalMqtt] section")
return trigger, nil, fmt.Errorf("missing SubscribeTopics for MQTT Trigger. Must be present in [Trigger.ExternalMqtt] section")
}

brokerUrl, err := url.Parse(brokerConfig.Url)
if err != nil {
return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", config.Trigger.ExternalMqtt.Url, err.Error())
return trigger, nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", config.Trigger.ExternalMqtt.Url, err.Error())
}

opts := pahoMqtt.NewClientOptions()
Expand All @@ -102,7 +102,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg
if len(brokerConfig.ConnectTimeout) > 0 {
duration, err := time.ParseDuration(brokerConfig.ConnectTimeout)
if err != nil {
return nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error())
return trigger, nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error())
}
opts.ConnectTimeout = duration
}
Expand Down Expand Up @@ -132,25 +132,25 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg
}
select {
case <-ctx.Done():
return nil, errors.New("aborted MQTT Trigger initialization")
return trigger, nil, errors.New("aborted MQTT Trigger initialization")
default:
lc.Warnf("%s. Attempt to create MQTT client again after %d seconds...", err.Error(), brokerConfig.RetryInterval)
timer.SleepForInterval()
}
}

if err != nil {
return nil, fmt.Errorf("unable to create MQTT Client: %s", err.Error())
return trigger, nil, fmt.Errorf("unable to create MQTT Client: %s", err.Error())
}

deferred := func() {
lc.Info("Disconnecting from broker for MQTT trigger")
trigger.mqttClient.Disconnect(0)
trigger.MqttClient.Disconnect(0)
}

trigger.mqttClient = mqttClient
trigger.MqttClient = mqttClient

return deferred, nil
return trigger, deferred, nil
}

func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) {
Expand Down Expand Up @@ -221,7 +221,7 @@ func (trigger *Trigger) responseHandler(appContext interfaces.AppFunctionContext
return err
}

if token := trigger.mqttClient.Publish(formattedTopic, trigger.qos, trigger.retain, appContext.ResponseData()); token.Wait() && token.Error() != nil {
if token := trigger.MqttClient.Publish(formattedTopic, trigger.qos, trigger.retain, appContext.ResponseData()); token.Wait() && token.Error() != nil {
lc.Errorf("MQTT trigger: Could not publish to topic '%s' for pipeline '%s': %s",
formattedTopic,
pipeline.Id,
Expand Down
3 changes: 3 additions & 0 deletions pkg/interfaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package interfaces

import (
"context"
pahoMqtt "github.com/eclipse/paho.mqtt.golang"
"net/http"
"time"

Expand Down Expand Up @@ -176,6 +177,8 @@ type ApplicationService interface {
// RegistryClient returns the Registry client. Note the registry must been enable, otherwise this will return nil.
// Useful if service needs to add additional health checks or needs to get endpoint of another registered service
RegistryClient() registry.Client
//TriggerMqttClient returns the pahoMqtt.Client.
TriggerMqttClient() (*pahoMqtt.Client, error)
// MetricsManager returns the Metrics Manager used to register counter, gauge, gaugeFloat64 or timer metric types from
// github.com/rcrowley/go-metrics
MetricsManager() bootstrapInterfaces.MetricsManager
Expand Down