Skip to content

Add labels as headers in gRPC connections #1155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,29 @@ func (cp *CommandPlugin) Info() *bus.Info {

func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Processing command")
ctxWithMetadata := cp.config.NewContextWithLabels(ctx)

if logger.ServerType(ctx) == "" {
ctx = context.WithValue(
ctx,
if logger.ServerType(ctxWithMetadata) == "" {
ctxWithMetadata = context.WithValue(
ctxWithMetadata,
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
)
}

if logger.ServerType(ctx) == cp.commandServerType.String() {
if logger.ServerType(ctxWithMetadata) == cp.commandServerType.String() {
switch msg.Topic {
case bus.ConnectionResetTopic:
cp.processConnectionReset(ctx, msg)
cp.processConnectionReset(ctxWithMetadata, msg)
case bus.ResourceUpdateTopic:
cp.processResourceUpdate(ctx, msg)
cp.processResourceUpdate(ctxWithMetadata, msg)
case bus.InstanceHealthTopic:
cp.processInstanceHealth(ctx, msg)
cp.processInstanceHealth(ctxWithMetadata, msg)
case bus.DataPlaneHealthResponseTopic:
cp.processDataPlaneHealth(ctx, msg)
cp.processDataPlaneHealth(ctxWithMetadata, msg)
case bus.DataPlaneResponseTopic:
cp.processDataPlaneResponse(ctx, msg)
cp.processDataPlaneResponse(ctxWithMetadata, msg)
default:
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
slog.DebugContext(ctxWithMetadata, "Command plugin received unknown topic", "topic", msg.Topic)
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func ResolveConfig() (*Config, error) {
}

checkCollectorConfiguration(collector, config)
addLabelsAsOTelHeaders(collector, config.Labels)

slog.Debug("Agent config", "config", config)
slog.Info("Excluded files from being watched for file changes", "exclude_files",
Expand Down Expand Up @@ -209,6 +210,22 @@ func defaultCollector(collector *Collector, config *Config) {
}
}

func addLabelsAsOTelHeaders(collector *Collector, labels map[string]any) {
slog.Debug("Adding labels as headers to collector", "labels", labels)
if collector.Extensions.HeadersSetter != nil {
for key, value := range labels {
valueString, ok := value.(string)
if ok {
collector.Extensions.HeadersSetter.Headers = append(collector.Extensions.HeadersSetter.Headers, Header{
Action: "insert",
Key: key,
Value: valueString,
})
}
}
}
}

func setVersion(version, commit string) {
RootCommand.Version = version + "-" + commit
viperInstance.SetDefault(VersionKey, version)
Expand Down
15 changes: 15 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"os"
"path"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -63,6 +64,10 @@ func TestResolveConfig(t *testing.T) {

actual, err := ResolveConfig()
require.NoError(t, err)
sort.Slice(actual.Collector.Extensions.HeadersSetter.Headers, func(i, j int) bool {
headers := actual.Collector.Extensions.HeadersSetter.Headers
return headers[i].Key < headers[j].Key
})
assert.Equal(t, createConfig(), actual)
}

Expand Down Expand Up @@ -1059,6 +1064,16 @@ func createConfig() *Config {
Key: "key",
Value: "value",
},
{
Action: "insert",
Key: "label1",
Value: "label 1",
},
{
Action: "insert",
Key: "label2",
Value: "new-value",
},
},
},
},
Expand Down
15 changes: 15 additions & 0 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
package config

import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"time"

"google.golang.org/grpc/metadata"

"github.com/google/uuid"
)

Expand Down Expand Up @@ -432,6 +435,18 @@ func (c *Config) AreReceiversConfigured() bool {
len(c.Collector.Receivers.TcplogReceivers) > 0
}

func (c *Config) NewContextWithLabels(ctx context.Context) context.Context {
md := metadata.Pairs()
for key, value := range c.Labels {
valueString, ok := value.(string)
if ok {
md.Set(key, valueString)
}
}

return metadata.NewOutgoingContext(ctx, md)
}

func isAllowedDir(dir string, allowedDirs []string) bool {
if !strings.HasSuffix(dir, "/") && filepath.Ext(dir) == "" {
dir += "/"
Expand Down
26 changes: 14 additions & 12 deletions internal/file/file_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,34 +83,36 @@ func (fp *FilePlugin) Info() *bus.Info {

// nolint: cyclop, revive
func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
ctxWithMetadata := fp.config.NewContextWithLabels(ctx)

if logger.ServerType(ctx) == "" {
ctx = context.WithValue(
ctx,
ctxWithMetadata = context.WithValue(
ctxWithMetadata,
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()),
)
}

if logger.ServerType(ctx) == fp.serverType.String() {
if logger.ServerType(ctxWithMetadata) == fp.serverType.String() {
switch msg.Topic {
case bus.ConnectionResetTopic:
fp.handleConnectionReset(ctx, msg)
fp.handleConnectionReset(ctxWithMetadata, msg)
case bus.ConnectionCreatedTopic:
slog.DebugContext(ctx, "File plugin received connection created message")
slog.DebugContext(ctxWithMetadata, "File plugin received connection created message")
fp.fileManagerService.SetIsConnected(true)
case bus.NginxConfigUpdateTopic:
fp.handleNginxConfigUpdate(ctx, msg)
fp.handleNginxConfigUpdate(ctxWithMetadata, msg)
case bus.ConfigUploadRequestTopic:
fp.handleConfigUploadRequest(ctx, msg)
fp.handleConfigUploadRequest(ctxWithMetadata, msg)
case bus.ConfigApplyRequestTopic:
fp.handleConfigApplyRequest(ctx, msg)
fp.handleConfigApplyRequest(ctxWithMetadata, msg)
case bus.ConfigApplyCompleteTopic:
fp.handleConfigApplyComplete(ctx, msg)
fp.handleConfigApplyComplete(ctxWithMetadata, msg)
case bus.ConfigApplySuccessfulTopic:
fp.handleConfigApplySuccess(ctx, msg)
fp.handleConfigApplySuccess(ctxWithMetadata, msg)
case bus.ConfigApplyFailedTopic:
fp.handleConfigApplyFailedRequest(ctx, msg)
fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg)
default:
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
slog.DebugContext(ctxWithMetadata, "File plugin received unknown topic", "topic", msg.Topic)
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions test/mock/collector/nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ allowed_directories:
- /usr/local/etc/nginx
- /usr/share/nginx/modules
- /var/run/nginx

labels:
product-type: mock-product
product-version: v1.0.0

client:
http:
Expand Down
20 changes: 16 additions & 4 deletions test/mock/grpc/mock_management_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ var (
Time: keepAliveTime,
Timeout: keepAliveTimeout,
}

errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
)

type MockManagementServer struct {
Expand Down Expand Up @@ -146,13 +149,15 @@ func serverOptions(agentConfig *config.Config) []grpc.ServerOption {
opts = append(opts, grpc.ChainUnaryInterceptor(
grpcvalidator.UnaryServerInterceptor(),
protovalidateInterceptor.UnaryServerInterceptor(validator),
logHeaders,
),
)
} else {
opts = append(opts, grpc.ChainUnaryInterceptor(
grpcvalidator.UnaryServerInterceptor(),
protovalidateInterceptor.UnaryServerInterceptor(validator),
ensureValidToken,
logHeaders,
),
)
}
Expand Down Expand Up @@ -242,10 +247,6 @@ func reportHealth(healthcheck *health.Server, agentConfig *config.Config) {
}

func ensureValidToken(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
var (
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
Expand All @@ -270,3 +271,14 @@ func valid(authorization []string) bool {
// for a token matching an arbitrary string.
return token == "1234"
}

func logHeaders(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}

slog.InfoContext(ctx, "Request headers", "headers", md)

return handler(ctx, req)
}