Bringing distributed tracing support to Watermill with OpenTelemetry.
go get github.com/nkonev/watermill-opentelemetry
package example
import (
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/garsue/watermillzap"
wotel "github.com/nkonev/watermill-opentelemetry"
"go.uber.org/zap"
)
type PublisherConfig struct {
Name string
GCPProjectID string
}
// NewPublisher instantiates a GCP Pub/Sub Publisher with tracing capabilities.
func NewPublisher(logger *zap.Logger, config PublisherConfig) (message.Publisher, error) {
publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{ProjectID: config.GCPProjectID},
watermillzap.NewLogger(logger),
)
if err != nil {
return nil, err
}
if config.Name == "" {
return wotel.NewPublisherDecorator(publisher), nil
}
return wotel.NewNamedPublisherDecorator(config.Name, publisher), nil
}
A tracing middleware can be defined at the router level:
package example
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
wotel "github.com/nkonev/watermill-opentelemetry"
)
func InitTracedRouter() (*message.Router, error) {
router, err := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})
if err != nil {
return nil, err
}
router.AddMiddleware(wotel.Trace())
return router, nil
}
Alternatively, individual handlers can be traced:
package example
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
wotel "github.com/nkonev/watermill-opentelemetry"
)
func InitRouter() (*message.Router, error) {
router, err := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})
if err != nil {
return nil, err
}
// subscriber definition omitted for clarity
subscriber := (message.Subscriber)(nil)
router.AddNoPublisherHandler(
"handler_name",
"subscribeTopic",
subscriber,
wotel.TraceNoPublishHandler(func(msg *message.Message) error {
return nil
}),
)
return router, nil
}
Apache 2.0, see LICENSE.md.