diff --git a/pkg/cli/internal/commands/run/run.go b/pkg/cli/internal/commands/run/run.go index 632076a..dd74e0b 100644 --- a/pkg/cli/internal/commands/run/run.go +++ b/pkg/cli/internal/commands/run/run.go @@ -12,16 +12,18 @@ import ( "github.com/cilium/ebpf/rlimit" "github.com/pkg/errors" "github.com/pterm/pterm" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "go.uber.org/zap" + "github.com/solo-io/bumblebee/pkg/cli/internal/options" "github.com/solo-io/bumblebee/pkg/decoder" "github.com/solo-io/bumblebee/pkg/loader" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" "github.com/solo-io/bumblebee/pkg/spec" "github.com/solo-io/bumblebee/pkg/stats" "github.com/solo-io/bumblebee/pkg/tui" "github.com/solo-io/go-utils/contextutils" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "go.uber.org/zap" ) type runOptions struct { @@ -109,22 +111,18 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { return err } - progLoader := loader.NewLoader( - decoder.NewDecoderFactory(), - promProvider, - ) - parsedELF, err := progLoader.Parse(ctx, progReader) + + parsedELF, err := loader.Parse(ctx, progReader) if err != nil { return fmt.Errorf("could not parse BPF program: %w", err) } - tuiApp, err := buildTuiApp(&progLoader, progLocation, opts.filter, parsedELF) + tuiApp, err := buildTuiApp(progLocation, opts.filter, parsedELF) if err != nil { return err } - loaderOpts := loader.LoadOptions{ + loaderOpts := &loader.LoadOptions{ ParsedELF: parsedELF, - Watcher: tuiApp, PinMaps: opts.pinMaps, PinProgs: opts.pinProgs, } @@ -134,20 +132,38 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { contextutils.LoggerFrom(ctx).Info("before calling tui.Run() context is done") return ctx.Err() } - if opts.notty { - fmt.Println("Calling Load...") - loaderOpts.Watcher = loader.NewNoopWatcher() - err = progLoader.Load(ctx, &loaderOpts) + + contextutils.LoggerFrom(ctx).Info("calling Load()") + loadedMaps, closeLifecycle, err := loader.Load(ctx, loaderOpts) + contextutils.LoggerFrom(ctx).Info("returned from Load()") + if err != nil { return err + } + + // Close our loaded program only if there's nothing set to explicitly extend our program's lifetime. + if opts.pinMaps == "" && opts.pinProgs == "" { + defer closeLifecycle() + } + + if opts.notty { + fmt.Println("Running in non-interactive mode. Hit Ctrl-C to exit.") + <-ctx.Done() } else { + mapWatcher := mapwatcher.New(parsedELF.WatchedMaps, loadedMaps, decoder.NewDecoderFactory(), promProvider) contextutils.LoggerFrom(ctx).Info("calling tui run()") - err = tuiApp.Run(ctx, progLoader, &loaderOpts) + err = tuiApp.Run(ctx, mapWatcher) contextutils.LoggerFrom(ctx).Info("after tui run()") return err } + + return nil } -func buildTuiApp(loader *loader.Loader, progLocation string, filterString []string, parsedELF *loader.ParsedELF) (*tui.App, error) { +func buildTuiApp( + progLocation string, + filterString []string, + parsedELF *loader.ParsedELF, +) (*tui.App, error) { // TODO: add filter to UI filter, err := tui.BuildFilter(filterString, parsedELF.WatchedMaps) if err != nil { diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 6413283..fa514b4 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -5,90 +5,36 @@ import ( "errors" "fmt" "io" - "log" "os" "path/filepath" "strings" - "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/btf" "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/ringbuf" - "golang.org/x/sync/errgroup" - "github.com/solo-io/bumblebee/pkg/decoder" - "github.com/solo-io/bumblebee/pkg/stats" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" + "github.com/solo-io/bumblebee/pkg/loader/util" "github.com/solo-io/go-utils/contextutils" ) type ParsedELF struct { Spec *ebpf.CollectionSpec - WatchedMaps map[string]WatchedMap + WatchedMaps map[string]mapwatcher.WatchedMap } type LoadOptions struct { ParsedELF *ParsedELF - Watcher MapWatcher PinMaps string PinProgs string } type Loader interface { Parse(ctx context.Context, reader io.ReaderAt) (*ParsedELF, error) - Load(ctx context.Context, opts *LoadOptions) error - WatchMaps(ctx context.Context, watchedMaps map[string]WatchedMap, coll map[string]*ebpf.Map, watcher MapWatcher) error + Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), error) } -type WatchedMap struct { - Name string - Labels []string - - btf *btf.Map - mapType ebpf.MapType - mapSpec *ebpf.MapSpec - - valueStruct *btf.Struct -} - -type loader struct { - decoderFactory decoder.DecoderFactory - metricsProvider stats.MetricsProvider -} - -func NewLoader( - decoderFactory decoder.DecoderFactory, - metricsProvider stats.MetricsProvider, -) Loader { - return &loader{ - decoderFactory: decoderFactory, - metricsProvider: metricsProvider, - } -} - -const ( - counterMapType = "counter" - gaugeMapType = "gauge" - printMapType = "print" -) - -func isPrintMap(spec *ebpf.MapSpec) bool { - return strings.Contains(spec.SectionName, printMapType) -} - -func isGaugeMap(spec *ebpf.MapSpec) bool { - return strings.Contains(spec.SectionName, gaugeMapType) -} - -func isCounterMap(spec *ebpf.MapSpec) bool { - return strings.Contains(spec.SectionName, counterMapType) -} - -func isTrackedMap(spec *ebpf.MapSpec) bool { - return isCounterMap(spec) || isGaugeMap(spec) || isPrintMap(spec) -} - -func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { +func Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { spec, err := ebpf.LoadCollectionSpecFromReader(progReader) if err != nil { return nil, err @@ -100,21 +46,21 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, } } - watchedMaps := make(map[string]WatchedMap) + watchedMaps := make(map[string]mapwatcher.WatchedMap) for name, mapSpec := range spec.Maps { - if !isTrackedMap(mapSpec) { + if !util.IsTrackedMap(mapSpec) { continue } - watchedMap := WatchedMap{ + watchedMap := mapwatcher.WatchedMap{ Name: name, - btf: mapSpec.BTF, - mapType: mapSpec.Type, - mapSpec: mapSpec, + BTF: mapSpec.BTF, + MapType: mapSpec.Type, + MapSpec: mapSpec, } // TODO: Delete Hack if possible - if watchedMap.mapType == ebpf.RingBuf || watchedMap.mapType == ebpf.PerfEventArray { + if watchedMap.MapType == ebpf.RingBuf || watchedMap.MapType == ebpf.PerfEventArray { if _, ok := mapSpec.BTF.Value.(*btf.Struct); !ok { return nil, fmt.Errorf("the `value` member for map '%v' must be set to struct you will be submitting to the ringbuf/eventarray", name) } @@ -124,13 +70,13 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, switch mapSpec.Type { case ebpf.RingBuf: - structType := watchedMap.btf.Value.(*btf.Struct) - watchedMap.valueStruct = structType - labelKeys := getLabelsForBtfStruct(structType) + structType := watchedMap.BTF.Value.(*btf.Struct) + watchedMap.ValueStruct = structType + labelKeys := util.GetLabelsForBtfStruct(structType) watchedMap.Labels = labelKeys case ebpf.Hash: - labelKeys, err := getLabelsForHashMapKey(mapSpec) + labelKeys, err := util.GetLabelsForHashMapKey(mapSpec) if err != nil { return nil, err } @@ -150,16 +96,14 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, return &loadOptions, nil } -func (l *loader) Load(ctx context.Context, opts *LoadOptions) error { +func Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), error) { // TODO: add invariant checks on opts contextutils.LoggerFrom(ctx).Info("enter Load()") - // on shutdown notify watcher we have no more entries to send - defer opts.Watcher.Close() // bail out before loading stuff into kernel if context canceled if ctx.Err() != nil { contextutils.LoggerFrom(ctx).Info("load entrypoint context is done") - return ctx.Err() + return nil, nil, ctx.Err() } if opts.PinMaps != "" { @@ -183,274 +127,72 @@ func (l *loader) Load(ctx context.Context, opts *LoadOptions) error { }, }) if err != nil { - return err + return nil, nil, err } - defer coll.Close() + var progLinks []link.Link // For each program, add kprope/tracepoint for name, prog := range spec.Programs { + var progLink link.Link select { case <-ctx.Done(): contextutils.LoggerFrom(ctx).Info("while loading progs context is done") - return ctx.Err() + return nil, nil, ctx.Err() default: switch prog.Type { case ebpf.Kprobe: - var kp link.Link var err error if strings.HasPrefix(prog.SectionName, "kretprobe/") { - kp, err = link.Kretprobe(prog.AttachTo, coll.Programs[name]) + progLink, err = link.Kretprobe(prog.AttachTo, coll.Programs[name]) if err != nil { - return fmt.Errorf("error attaching kretprobe '%v': %w", prog.Name, err) + return nil, nil, fmt.Errorf("error attaching kretprobe '%v': %w", prog.Name, err) } } else { - kp, err = link.Kprobe(prog.AttachTo, coll.Programs[name]) + progLink, err = link.Kprobe(prog.AttachTo, coll.Programs[name]) if err != nil { - return fmt.Errorf("error attaching kprobe '%v': %w", prog.Name, err) + return nil, nil, fmt.Errorf("error attaching kprobe '%v': %w", prog.Name, err) } } - defer kp.Close() case ebpf.TracePoint: - var tp link.Link var err error if strings.HasPrefix(prog.SectionName, "tracepoint/") { tokens := strings.Split(prog.AttachTo, "/") if len(tokens) != 2 { - return fmt.Errorf("unexpected tracepoint section '%v'", prog.AttachTo) + return nil, nil, fmt.Errorf("unexpected tracepoint section '%v'", prog.AttachTo) } - tp, err = link.Tracepoint(tokens[0], tokens[1], coll.Programs[name]) + progLink, err = link.Tracepoint(tokens[0], tokens[1], coll.Programs[name]) if err != nil { - return fmt.Errorf("error attaching to tracepoint '%v': %w", prog.Name, err) + return nil, nil, fmt.Errorf("error attaching to tracepoint '%v': %w", prog.Name, err) } } - defer tp.Close() default: - return errors.New("only kprobe programs supported") + return nil, nil, errors.New("only kprobe programs supported") } + progLinks = append(progLinks, progLink) + if opts.PinProgs != "" { if err := createDir(ctx, opts.PinProgs, 0700); err != nil { - return err + return nil, nil, err } pinFile := filepath.Join(opts.PinProgs, prog.Name) if err := coll.Programs[name].Pin(pinFile); err != nil { - return fmt.Errorf("could not pin program '%s': %v", prog.Name, err) + progLink.Close() + return nil, nil, fmt.Errorf("could not pin program '%s': %v", prog.Name, err) } fmt.Printf("Successfully pinned program '%v'\n", pinFile) } } } - return l.WatchMaps(ctx, opts.ParsedELF.WatchedMaps, coll.Maps, opts.Watcher) -} - -func (l *loader) WatchMaps( - ctx context.Context, - watchedMaps map[string]WatchedMap, - maps map[string]*ebpf.Map, - watcher MapWatcher, -) error { - contextutils.LoggerFrom(ctx).Info("enter watchMaps()") - eg, ctx := errgroup.WithContext(ctx) - for name, bpfMap := range watchedMaps { - name := name - bpfMap := bpfMap - - switch bpfMap.mapType { - case ebpf.RingBuf: - var increment stats.IncrementInstrument - if isCounterMap(bpfMap.mapSpec) { - increment = l.metricsProvider.NewIncrementCounter(name, bpfMap.Labels) - } else if isPrintMap(bpfMap.mapSpec) { - increment = &noop{} - } - eg.Go(func() error { - watcher.NewRingBuf(name, bpfMap.Labels) - return l.startRingBuf(ctx, bpfMap.valueStruct, maps[name], increment, name, watcher) - }) - case ebpf.Array: - fallthrough - case ebpf.Hash: - labelKeys := bpfMap.Labels - var instrument stats.SetInstrument - if isCounterMap(bpfMap.mapSpec) { - instrument = l.metricsProvider.NewSetCounter(bpfMap.Name, labelKeys) - } else if isGaugeMap(bpfMap.mapSpec) { - instrument = l.metricsProvider.NewGauge(bpfMap.Name, labelKeys) - } else { - instrument = &noop{} - } - eg.Go(func() error { - // TODO: output type of instrument in UI? - watcher.NewHashMap(name, labelKeys) - return l.startHashMap(ctx, bpfMap.mapSpec, maps[name], instrument, name, watcher) - }) - default: - // TODO: Support more map types - return errors.New("unsupported map type") + closeLifecycle := func() { + coll.Close() + for _, link := range progLinks { + link.Close() } } - err := eg.Wait() - contextutils.LoggerFrom(ctx).Info("after waitgroup") - return err -} - -func (l *loader) startRingBuf( - ctx context.Context, - valueStruct *btf.Struct, - liveMap *ebpf.Map, - incrementInstrument stats.IncrementInstrument, - name string, - watcher MapWatcher, -) error { - // Initialize decoder - d := l.decoderFactory() - logger := contextutils.LoggerFrom(ctx) - - // Open a ringbuf reader from userspace RINGBUF map described in the - // eBPF C program. - rd, err := ringbuf.NewReader(liveMap) - if err != nil { - return fmt.Errorf("opening ringbuf reader: %v", err) - } - defer rd.Close() - // Close the reader when the process receives a signal, which will exit - // the read loop. - go func() { - <-ctx.Done() - logger.Info("in ringbuf watcher, got done...") - if err := rd.Close(); err != nil { - logger.Infof("error while closing ringbuf '%s' reader: %s", name, err) - } - logger.Info("after reader.Close()") - }() - - for { - record, err := rd.Read() - if err != nil { - if errors.Is(err, ringbuf.ErrClosed) { - logger.Info("ringbuf closed...") - return nil - } - logger.Infof("error while reading from ringbuf '%s' reader: %s", name, err) - continue - } - result, err := d.DecodeBtfBinary(ctx, valueStruct, record.RawSample) - if err != nil { - return err - } - - stringLabels := stringify(result) - incrementInstrument.Increment(ctx, stringLabels) - watcher.SendEntry(MapEntry{ - Name: name, - Entry: KvPair{ - Key: stringLabels, - }, - }) - } -} - -func (l *loader) startHashMap( - ctx context.Context, - mapSpec *ebpf.MapSpec, - liveMap *ebpf.Map, - instrument stats.SetInstrument, - name string, - watcher MapWatcher, -) error { - d := l.decoderFactory() - - ticker := time.NewTicker(1 * time.Second) - for { - select { - case <-ticker.C: - mapIter := liveMap.Iterate() - for { - // Use generic key,value so we can decode ourselves - var ( - key, value []byte - ) - if !mapIter.Next(&key, &value) { - break - } - if err := mapIter.Err(); err != nil { - return err - } - decodedKey, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Key, key) - if err != nil { - return fmt.Errorf("error decoding key: %w", err) - } - - decodedValue, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Value, value) - if err != nil { - return fmt.Errorf("error decoding value: %w", err) - } - - // TODO: Check this information at load time - if len(decodedValue) > 1 { - log.Fatal("only 1 value allowed") - } - intVal, ok := decodedValue[""].(uint64) - if !ok { - log.Fatal("only uint64 allowed") - } - stringLabels := stringify(decodedKey) - instrument.Set(ctx, int64(intVal), stringLabels) - thisKvPair := KvPair{Key: stringLabels, Value: fmt.Sprint(intVal)} - watcher.SendEntry(MapEntry{ - Name: name, - Entry: thisKvPair, - }) - } - - case <-ctx.Done(): - // fmt.Println("got done in hashmap loop, returning") - return nil - } - } -} - -func stringify(decodedBinary map[string]interface{}) map[string]string { - keyMap := map[string]string{} - for k, v := range decodedBinary { - valAsStr := fmt.Sprint(v) - keyMap[k] = valAsStr - } - return keyMap -} - -func getLabelsForHashMapKey(mapSpec *ebpf.MapSpec) ([]string, error) { - structKey, ok := mapSpec.BTF.Key.(*btf.Struct) - if !ok { - return nil, fmt.Errorf("hash map keys can only be a struct, found %s", mapSpec.BTF.Value.String()) - } - - return getLabelsForBtfStruct(structKey), nil -} - -func getLabelsForBtfStruct(structKey *btf.Struct) []string { - keys := make([]string, 0, len(structKey.Members)) - for _, v := range structKey.Members { - keys = append(keys, v.Name) - } - return keys -} - -type noop struct{} - -func (n *noop) Increment( - ctx context.Context, - decodedKey map[string]string, -) { -} - -func (n *noop) Set( - ctx context.Context, - val int64, - labels map[string]string, -) { + return coll.Maps, closeLifecycle, nil } func createDir(ctx context.Context, path string, perm os.FileMode) error { diff --git a/pkg/loader/mapwatcher/receiver.go b/pkg/loader/mapwatcher/receiver.go new file mode 100644 index 0000000..2672b2c --- /dev/null +++ b/pkg/loader/mapwatcher/receiver.go @@ -0,0 +1,20 @@ +package mapwatcher + +type KvPair struct { + Key map[string]string + Value string + Hash uint64 +} + +type MapEntry struct { + Name string + Entry KvPair +} + +// MapEventReceiver provides a receiver that handles various map events. +type MapEventReceiver interface { + NewRingBuf(name string, keys []string) + NewHashMap(name string, keys []string) + SendEntry(entry MapEntry) + Close() +} diff --git a/pkg/loader/mapwatcher/watcher.go b/pkg/loader/mapwatcher/watcher.go new file mode 100644 index 0000000..8d3ed3b --- /dev/null +++ b/pkg/loader/mapwatcher/watcher.go @@ -0,0 +1,246 @@ +package mapwatcher + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/btf" + "github.com/cilium/ebpf/ringbuf" + "golang.org/x/sync/errgroup" + + "github.com/solo-io/bumblebee/pkg/decoder" + "github.com/solo-io/bumblebee/pkg/loader/util" + "github.com/solo-io/bumblebee/pkg/stats" + "github.com/solo-io/go-utils/contextutils" +) + +type WatchedMap struct { + Name string + Labels []string + + BTF *btf.Map + MapType ebpf.MapType + MapSpec *ebpf.MapSpec + + ValueStruct *btf.Struct +} + +type Watcher interface { + // WatchMaps watches the loaded maps and notifies the given receiver of events. + WatchMaps(ctx context.Context, receiver MapEventReceiver) error + // Maps returns the list of WatchedMaps + Maps() map[string]WatchedMap +} + +func New( + watchedMaps map[string]WatchedMap, + loadedMaps map[string]*ebpf.Map, + decoderFactory decoder.DecoderFactory, + provider stats.MetricsProvider, +) Watcher { + return &watcher{ + watchedMaps: watchedMaps, + loadedMaps: loadedMaps, + decoderFactory: decoderFactory, + metricsProvider: provider, + } +} + +type watcher struct { + // watchedMaps represent the maps we care to watch. + watchedMaps map[string]WatchedMap + // loadedMaps represent the maps currently loaded into the kernel. + loadedMaps map[string]*ebpf.Map + // decoderFactory provides a mechanism to decode various BTF types. + decoderFactory decoder.DecoderFactory + // metricsProvider provides Prometheus metrics. + metricsProvider stats.MetricsProvider +} + +func (w *watcher) WatchMaps(ctx context.Context, receiver MapEventReceiver) error { + // on shutdown notify receiver we have no more entries to send. + defer receiver.Close() + contextutils.LoggerFrom(ctx).Info("enter watchMaps()") + eg, ctx := errgroup.WithContext(ctx) + for name, bpfMap := range w.watchedMaps { + name := name + bpfMap := bpfMap + + switch bpfMap.MapType { + case ebpf.RingBuf: + var increment stats.IncrementInstrument + if util.IsCounterMap(bpfMap.MapSpec) { + increment = w.metricsProvider.NewIncrementCounter(name, bpfMap.Labels) + } else if util.IsPrintMap(bpfMap.MapSpec) { + increment = &noop{} + } + eg.Go(func() error { + receiver.NewRingBuf(name, bpfMap.Labels) + return w.startRingBuf(ctx, bpfMap.ValueStruct, w.loadedMaps[name], increment, name, receiver) + }) + case ebpf.Array: + fallthrough + case ebpf.Hash, ebpf.LRUHash: + labelKeys := bpfMap.Labels + var instrument stats.SetInstrument + if util.IsCounterMap(bpfMap.MapSpec) { + instrument = w.metricsProvider.NewSetCounter(bpfMap.Name, labelKeys) + } else if util.IsGaugeMap(bpfMap.MapSpec) { + instrument = w.metricsProvider.NewGauge(bpfMap.Name, labelKeys) + } else { + instrument = &noop{} + } + eg.Go(func() error { + // TODO: output type of instrument in UI? + receiver.NewHashMap(name, labelKeys) + return w.startHashMap(ctx, bpfMap.MapSpec, w.loadedMaps[name], instrument, name, receiver) + }) + default: + // TODO: Support more map types + return errors.New("unsupported map type") + } + } + + err := eg.Wait() + contextutils.LoggerFrom(ctx).Info("after waitgroup") + return err +} + +func (w *watcher) Maps() map[string]WatchedMap { + return w.watchedMaps +} + +func (w *watcher) startRingBuf( + ctx context.Context, + valueStruct *btf.Struct, + liveMap *ebpf.Map, + incrementInstrument stats.IncrementInstrument, + name string, + watcher MapEventReceiver, +) error { + // Initialize decoder + d := w.decoderFactory() + logger := contextutils.LoggerFrom(ctx) + + // Open a ringbuf reader from userspace RINGBUF map described in the + // eBPF C program. + rd, err := ringbuf.NewReader(liveMap) + if err != nil { + return fmt.Errorf("opening ringbuf reader: %v", err) + } + defer rd.Close() + // Close the reader when the process receives a signal, which will exit + // the read loop. + go func() { + <-ctx.Done() + logger.Info("in ringbuf watcher, got done...") + if err := rd.Close(); err != nil { + logger.Infof("error while closing ringbuf '%s' reader: %s", name, err) + } + logger.Info("after reader.Close()") + }() + + for { + record, err := rd.Read() + if err != nil { + if errors.Is(err, ringbuf.ErrClosed) { + logger.Info("ringbuf closed...") + return nil + } + logger.Infof("error while reading from ringbuf '%s' reader: %s", name, err) + continue + } + result, err := d.DecodeBtfBinary(ctx, valueStruct, record.RawSample) + if err != nil { + return err + } + + stringLabels := stringify(result) + incrementInstrument.Increment(ctx, stringLabels) + watcher.SendEntry(MapEntry{ + Name: name, + Entry: KvPair{ + Key: stringLabels, + }, + }) + } +} + +func (w *watcher) startHashMap( + ctx context.Context, + mapSpec *ebpf.MapSpec, + liveMap *ebpf.Map, + instrument stats.SetInstrument, + name string, + watcher MapEventReceiver, +) error { + d := w.decoderFactory() + + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + mapIter := liveMap.Iterate() + for { + // Use generic key,value so we can decode ourselves + var ( + key, value []byte + ) + if !mapIter.Next(&key, &value) { + break + } + if err := mapIter.Err(); err != nil { + return err + } + decodedKey, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Key, key) + if err != nil { + return fmt.Errorf("error decoding key: %w", err) + } + + decodedValue, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Value, value) + if err != nil { + return fmt.Errorf("error decoding value: %w", err) + } + + // TODO: Check this information at load time + if len(decodedValue) > 1 { + log.Fatal("only 1 value allowed") + } + intVal, ok := decodedValue[""].(uint64) + if !ok { + log.Fatal("only uint64 allowed") + } + stringLabels := stringify(decodedKey) + instrument.Set(ctx, int64(intVal), stringLabels) + thisKvPair := KvPair{Key: stringLabels, Value: fmt.Sprint(intVal)} + watcher.SendEntry(MapEntry{ + Name: name, + Entry: thisKvPair, + }) + } + + case <-ctx.Done(): + // fmt.Println("got done in hashmap loop, returning") + return nil + } + } +} + +func stringify(decodedBinary map[string]interface{}) map[string]string { + keyMap := map[string]string{} + for k, v := range decodedBinary { + valAsStr := fmt.Sprint(v) + keyMap[k] = valAsStr + } + return keyMap +} + +type noop struct{} + +func (n *noop) Increment(_ context.Context, _ map[string]string) {} + +func (n *noop) Set(_ context.Context, _ int64, _ map[string]string) {} diff --git a/pkg/loader/util/util.go b/pkg/loader/util/util.go new file mode 100644 index 0000000..ef2ead0 --- /dev/null +++ b/pkg/loader/util/util.go @@ -0,0 +1,48 @@ +package util + +import ( + "fmt" + "strings" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/btf" +) + +const ( + counterMapType = "counter" + gaugeMapType = "gauge" + printMapType = "print" +) + +func IsPrintMap(spec *ebpf.MapSpec) bool { + return strings.Contains(spec.SectionName, printMapType) +} + +func IsGaugeMap(spec *ebpf.MapSpec) bool { + return strings.Contains(spec.SectionName, gaugeMapType) +} + +func IsCounterMap(spec *ebpf.MapSpec) bool { + return strings.Contains(spec.SectionName, counterMapType) +} + +func IsTrackedMap(spec *ebpf.MapSpec) bool { + return IsCounterMap(spec) || IsGaugeMap(spec) || IsPrintMap(spec) +} + +func GetLabelsForBtfStruct(structKey *btf.Struct) []string { + keys := make([]string, 0, len(structKey.Members)) + for _, v := range structKey.Members { + keys = append(keys, v.Name) + } + return keys +} + +func GetLabelsForHashMapKey(mapSpec *ebpf.MapSpec) ([]string, error) { + structKey, ok := mapSpec.BTF.Key.(*btf.Struct) + if !ok { + return nil, fmt.Errorf("hash map keys can only be a struct, found %s", mapSpec.BTF.Value.String()) + } + + return GetLabelsForBtfStruct(structKey), nil +} diff --git a/pkg/loader/watcher.go b/pkg/loader/watcher.go deleted file mode 100644 index 13efad5..0000000 --- a/pkg/loader/watcher.go +++ /dev/null @@ -1,38 +0,0 @@ -package loader - -type KvPair struct { - Key map[string]string - Value string - Hash uint64 -} - -type MapEntry struct { - Name string - Entry KvPair -} - -type MapWatcher interface { - NewRingBuf(name string, keys []string) - NewHashMap(name string, keys []string) - SendEntry(entry MapEntry) - Close() -} - -type noopWatcher struct{} - -func (w *noopWatcher) NewRingBuf(name string, keys []string) { - // noop -} -func (w *noopWatcher) NewHashMap(name string, keys []string) { - // noop -} -func (w *noopWatcher) SendEntry(entry MapEntry) { - // noop -} -func (w *noopWatcher) Close() { - // noop -} - -func NewNoopWatcher() *noopWatcher { - return &noopWatcher{} -} diff --git a/pkg/tui/filter.go b/pkg/tui/filter.go index d33b4b0..6edd7ef 100644 --- a/pkg/tui/filter.go +++ b/pkg/tui/filter.go @@ -4,10 +4,10 @@ import ( "fmt" "regexp" - "github.com/solo-io/bumblebee/pkg/loader" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" ) -func (a *App) filterMatch(entry loader.MapEntry) bool { +func (a *App) filterMatch(entry mapwatcher.MapEntry) bool { if a.filter == nil { // no filters defined, allow entry return true @@ -27,7 +27,7 @@ func (a *App) filterMatch(entry loader.MapEntry) bool { return false } -func BuildFilter(filterString []string, watchedMaps map[string]loader.WatchedMap) (map[string]Filter, error) { +func BuildFilter(filterString []string, watchedMaps map[string]mapwatcher.WatchedMap) (map[string]Filter, error) { if len(filterString) == 0 { return nil, nil } diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go index 2d62693..5b33182 100644 --- a/pkg/tui/tui.go +++ b/pkg/tui/tui.go @@ -11,10 +11,12 @@ import ( "github.com/gdamore/tcell/v2" "github.com/mitchellh/hashstructure/v2" "github.com/rivo/tview" - "github.com/solo-io/bumblebee/pkg/loader" - "github.com/solo-io/go-utils/contextutils" "go.uber.org/zap" "golang.org/x/sync/errgroup" + + "github.com/solo-io/bumblebee/pkg/loader" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" + "github.com/solo-io/go-utils/contextutils" ) const titleText = `[aqua] __ @@ -39,7 +41,7 @@ type Filter struct { type MapValue struct { Hash uint64 - Entries []loader.KvPair + Entries []mapwatcher.KvPair Table *tview.Table Index int Type ebpf.MapType @@ -53,7 +55,7 @@ type AppOpts struct { } type App struct { - Entries chan loader.MapEntry + Entries chan mapwatcher.MapEntry tviewApp *tview.Application flex *tview.Flex @@ -73,7 +75,10 @@ var mapOfMaps = make(map[string]MapValue) var mapMutex = sync.RWMutex{} var currentIndex int -func buildTView(logger *zap.SugaredLogger, cancel context.CancelFunc, progLocation string) (*tview.Application, *tview.Flex) { +func buildTView(logger *zap.SugaredLogger, cancel context.CancelFunc, progLocation string) ( + *tview.Application, + *tview.Flex, +) { app := tview.NewApplication() app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey { if event.Key() == tcell.KeyCtrlC || (event.Key() == tcell.KeyRune && event.Rune() == 'q') { @@ -127,14 +132,14 @@ func (a *App) Close() { close(a.Entries) } -func (a *App) Run(ctx context.Context, progLoader loader.Loader, loaderOpts *loader.LoadOptions) error { +func (a *App) Run(ctx context.Context, mapWatcher mapwatcher.Watcher) error { logger := contextutils.LoggerFrom(ctx) ctx, cancel := context.WithCancel(ctx) app, flex := buildTView(logger, cancel, a.progLocation) a.tviewApp = app a.flex = flex - a.Entries = make(chan loader.MapEntry, 20) + a.Entries = make(chan mapwatcher.MapEntry, 20) eg := errgroup.Group{} eg.Go(func() error { @@ -152,9 +157,9 @@ func (a *App) Run(ctx context.Context, progLoader loader.Loader, loaderOpts *loa }) eg.Go(func() error { - logger.Info("calling Load()") - err := progLoader.Load(ctx, loaderOpts) - logger.Info("returned from Load()") + logger.Info("calling WatchMaps()") + err := mapWatcher.WatchMaps(ctx, a) + logger.Info("returned from WatchMaps()") if err != nil { logger.Error("error loading program") a.renderLoadError(ctx, err) @@ -185,7 +190,7 @@ func (a *App) watch(ctx context.Context) { logger.Info("no more entries, returning from Watch()") } -func (a *App) renderRingBuf(ctx context.Context, incoming loader.MapEntry) { +func (a *App) renderRingBuf(ctx context.Context, incoming mapwatcher.MapEntry) { current := mapOfMaps[incoming.Name] current.Entries = append(current.Entries, incoming.Entry) @@ -214,7 +219,7 @@ func (a *App) renderRingBuf(ctx context.Context, incoming loader.MapEntry) { } } -func (a *App) renderHash(ctx context.Context, incoming loader.MapEntry) { +func (a *App) renderHash(ctx context.Context, incoming mapwatcher.MapEntry) { logger := contextutils.LoggerFrom(ctx) current := mapOfMaps[incoming.Name] if len(current.Entries) == 0 { @@ -297,7 +302,7 @@ func (a *App) NewHashMap(name string, keys []string) { a.makeMapValue(name, keys, ebpf.Hash) } -func (a *App) SendEntry(entry loader.MapEntry) { +func (a *App) SendEntry(entry mapwatcher.MapEntry) { if a.filterMatch(entry) { a.Entries <- entry } @@ -310,7 +315,7 @@ func (a *App) makeMapValue(name string, keys []string, mapType ebpf.MapType) { sort.Strings(keysCopy) // create the array for containing the entries - entries := make([]loader.KvPair, 0, 10) + entries := make([]mapwatcher.KvPair, 0, 10) table := tview.NewTable().SetFixed(1, 0) table.SetBorder(true).SetTitle(name)