diff --git a/.mk/bc.mk b/.mk/bc.mk
index 78a1d5905..4174e23d5 100644
--- a/.mk/bc.mk
+++ b/.mk/bc.mk
@@ -28,7 +28,7 @@ define MAPS
 	"direct_flows":"ringbuf",
 	"aggregated_flows":"hash",
 	"additional_flow_metrics":"per_cpu_hash",
-	"packets_record":"perf_event_array",
+	"packets_record":"ringbuf",
 	"dns_flows":"hash",
 	"global_counters":"per_cpu_array",
 	"filter_map":"lpm_trie",
diff --git a/bpf/maps_definition.h b/bpf/maps_definition.h
index b6da15e3e..dd5d3c168 100644
--- a/bpf/maps_definition.h
+++ b/bpf/maps_definition.h
@@ -30,12 +30,10 @@ struct {
     __uint(pinning, LIBBPF_PIN_BY_NAME);
 } additional_flow_metrics SEC(".maps");
 
-//PerfEvent Array for Packet Payloads
+//Ringbuf for Packet Payloads
 struct {
-    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-    __type(key, u32);
-    __type(value, u32);
-    __uint(max_entries, 256);
+    __uint(type, BPF_MAP_TYPE_RINGBUF);
+    __uint(max_entries, 1 << 21); // 256 bytes * 1000 events/sec * 5 sec "eviction time", rounded up to a power of two
     __uint(pinning, LIBBPF_PIN_BY_NAME);
 } packet_record SEC(".maps");
 
diff --git a/bpf/pca.h b/bpf/pca.h
index 8c77d08f8..fb9973025 100644
--- a/bpf/pca.h
+++ b/bpf/pca.h
@@ -3,37 +3,39 @@
 
 #include "utils.h"
 
-static int attach_packet_payload(void *data, void *data_end, struct __sk_buff *skb) {
-    payload_meta meta;
-    u64 flags = BPF_F_CURRENT_CPU;
-    // Enable the flag to add packet header
-    // Packet payload follows immediately after the meta struct
-    u32 packetSize = (u32)(data_end - data);
-
-    // Record the current time.
-    u64 current_time = bpf_ktime_get_ns();
-
-    // For packets which are allocated non-linearly struct __sk_buff does not necessarily
-    // has all data lined up in memory but instead can be part of scatter gather lists.
-    // This command pulls data from the buffer but incurs data copying penalty.
-    if (packetSize <= skb->len) {
-        packetSize = skb->len;
-        if (bpf_skb_pull_data(skb, skb->len)) {
-            return TC_ACT_UNSPEC;
-        };
+static inline void attach_packet_payload(struct __sk_buff *skb) {
+    payload_meta *event;
+    u32 packetSize = skb->len;
+
+    event = bpf_ringbuf_reserve(&packet_record, sizeof(payload_meta), 0);
+    if (!event) {
+        return;
+    }
+
+    if (!packetSize) {
+        // Release reserved ringbuf location
+        bpf_ringbuf_discard(event, 0);
+        return;
+    }
+
+    if (packetSize > MAX_PAYLOAD_SIZE) {
+        packetSize = MAX_PAYLOAD_SIZE;
     }
-    // Set flag's upper 32 bits with the size of the paylaod and the bpf_perf_event_output will
-    // attach the specified amount of bytes from packet to the perf event
-    // https://github.com/xdp-project/xdp-tutorial/tree/9b25f0a039179aca1f66cba5492744d9f09662c1/tracing04-xdp-tcpdump
-    flags |= (u64)packetSize << 32;
-
-    meta.if_index = skb->ifindex;
-    meta.pkt_len = packetSize;
-    meta.timestamp = current_time;
-    if (bpf_perf_event_output(skb, &packet_record, flags, &meta, sizeof(meta))) {
-        return TC_ACT_OK;
+
+    event->if_index = skb->ifindex;
+    event->pkt_len = packetSize;
+    event->timestamp = bpf_ktime_get_ns();
+    // bpf_skb_load_bytes handles both linearly and non-linearly allocated packets, where
+    // struct __sk_buff does not necessarily have all data lined up in memory but may
+    // instead be part of scatter-gather lists, so there is no need for bpf_skb_pull_data(),
+    // which has performance side effects for the rest of that skb's lifetime.
+ if (bpf_skb_load_bytes(skb, 0, event->payload, packetSize)) { + // Release reserved ringbuf location + bpf_ringbuf_discard(event, 0); + return; } - return TC_ACT_UNSPEC; + bpf_ringbuf_submit(event, 0); + return; } static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) { @@ -62,19 +64,18 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) { return true; } -static inline int export_packet_payload(struct __sk_buff *skb, direction dir) { +static inline void export_packet_payload(struct __sk_buff *skb, direction dir) { + if (!enable_pca) { + return; + } // If sampling is defined, will only parse 1 out of "sampling" flows if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) { - return 0; + return; } - void *data_end = (void *)(long)skb->data_end; - void *data = (void *)(long)skb->data; - if (validate_pca_filter(skb, dir)) { - return attach_packet_payload(data, data_end, skb); + attach_packet_payload(skb); } - return 0; } SEC("classifier/tc_pca_ingress") diff --git a/bpf/types.h b/bpf/types.h index a0ae968d8..756286afc 100644 --- a/bpf/types.h +++ b/bpf/types.h @@ -69,6 +69,8 @@ typedef __u64 u64; #define MAX_OBSERVED_INTERFACES 6 #define OBSERVED_DIRECTION_BOTH 3 +#define MAX_PAYLOAD_SIZE 256 + // according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml typedef enum direction_t { INGRESS, @@ -147,7 +149,6 @@ const static struct additional_metrics_t *unused3 __attribute__((unused)); const static struct dns_record_t *unused4 __attribute__((unused)); const static struct pkt_drops_t *unused5 __attribute__((unused)); const static struct translated_flow_t *unused6 __attribute__((unused)); -const static struct observed_intf_t *unused13 __attribute__((unused)); // Attributes that uniquely identify a flow typedef struct flow_id_t { @@ -196,6 +197,7 @@ typedef struct payload_meta_t { u32 if_index; u32 pkt_len; u64 timestamp; // timestamp when packet received by ebpf + u8 payload[MAX_PAYLOAD_SIZE]; } payload_meta; // DNS Flow record used as key to correlate DNS query and response diff --git a/examples/packetcapture-dump/README.md b/examples/packetcapture-dump/README.md index 0717acda7..330424ed7 100644 --- a/examples/packetcapture-dump/README.md +++ b/examples/packetcapture-dump/README.md @@ -19,7 +19,8 @@ Start the packetcapture-client using: (in a secondary shell) Start the agent using: ```bash -sudo TARGET_HOST=localhost TARGET_PORT=9990 ENABLE_PCA="true" FILTER_IP_CIDR="0.0.0.0/0" FILTER_PROTOCOL="TCP" FILTER_PORT=22 FILTER_ACTION="Accept" ./bin/netobserv-ebpf-agent +sudo TARGET_HOST=localhost TARGET_PORT=9990 ENABLE_PCA="true" FLOW_FILTER_RULES='[{"ip_cidr":"0.0.0.0/0","protocol":"TCP","action":"Accept"}]' ./bin/netobserv-ebpf-agent + ``` You should see output such as: diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go index 21cba9b44..469fec490 100644 --- a/pkg/agent/packets_agent.go +++ b/pkg/agent/packets_agent.go @@ -15,7 +15,7 @@ import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/model" "github.com/netobserv/netobserv-ebpf-agent/pkg/tracer" - "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" "github.com/sirupsen/logrus" ) @@ -47,7 +47,7 @@ type ebpfPacketFetcher interface { AttachTCX(iface ifaces.Interface) error DetachTCX(iface ifaces.Interface) error LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte - ReadPerf() (perf.Record, error) + ReadPerf() (ringbuf.Record, error) } // PacketsAgent instantiates a new agent, given a configuration. 
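With `packet_record` now a ring buffer, every sample handed to user space is one fixed-size `payload_meta` struct from `bpf/types.h`: `if_index` (u32), `pkt_len` (u32), `timestamp` (u64), then `MAX_PAYLOAD_SIZE` payload bytes, of which only the first `pkt_len` are valid. The following is a minimal decoding sketch in native byte order; the Go mirror struct, the `pcadump` package, and the `decodeRecord` helper are illustrative assumptions, not code from this patch:

```go
// Package pcadump is a hypothetical consumer, used only for illustration.
package pcadump

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// maxPayloadSize mirrors MAX_PAYLOAD_SIZE in bpf/types.h.
const maxPayloadSize = 256

// payloadMeta mirrors payload_meta in bpf/types.h (natural alignment,
// no padding: 4 + 4 + 8 + 256 = 272 bytes).
type payloadMeta struct {
	IfIndex   uint32
	PktLen    uint32
	Timestamp uint64
	Payload   [maxPayloadSize]byte
}

// decodeRecord parses one ringbuf sample (Record.RawSample) in native
// byte order and returns the metadata plus the valid payload bytes.
func decodeRecord(raw []byte) (payloadMeta, []byte, error) {
	var m payloadMeta
	if err := binary.Read(bytes.NewReader(raw), binary.NativeEndian, &m); err != nil {
		return m, nil, fmt.Errorf("decoding payload_meta: %w", err)
	}
	n := m.PktLen
	if n > maxPayloadSize {
		n = maxPayloadSize // defensive; the BPF side already clamps pkt_len
	}
	return m, m.Payload[:n], nil
}
```

Reading the whole struct with `binary.NativeEndian` matches the switch from `binary.LittleEndian` to `binary.NativeEndian` in `pkg/model` below: the bytes are produced and consumed on the same host, so native order is the correct choice on both little- and big-endian targets (e.g. s390x).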
diff --git a/pkg/ebpf/bpf_arm64_bpfel.o b/pkg/ebpf/bpf_arm64_bpfel.o index 73b2984ff..1c7735f09 100644 Binary files a/pkg/ebpf/bpf_arm64_bpfel.o and b/pkg/ebpf/bpf_arm64_bpfel.o differ diff --git a/pkg/ebpf/bpf_powerpc_bpfel.o b/pkg/ebpf/bpf_powerpc_bpfel.o index d6d09b000..5afb69c35 100644 Binary files a/pkg/ebpf/bpf_powerpc_bpfel.o and b/pkg/ebpf/bpf_powerpc_bpfel.o differ diff --git a/pkg/ebpf/bpf_s390_bpfeb.o b/pkg/ebpf/bpf_s390_bpfeb.o index e221ef9d1..8acb3a056 100644 Binary files a/pkg/ebpf/bpf_s390_bpfeb.o and b/pkg/ebpf/bpf_s390_bpfeb.o differ diff --git a/pkg/ebpf/bpf_x86_bpfel.o b/pkg/ebpf/bpf_x86_bpfel.o index b83aa621b..2abf0ea11 100644 Binary files a/pkg/ebpf/bpf_x86_bpfel.o and b/pkg/ebpf/bpf_x86_bpfel.o differ diff --git a/pkg/flow/tracer_perf.go b/pkg/flow/tracer_perf.go index 237cbe946..837277d89 100644 --- a/pkg/flow/tracer_perf.go +++ b/pkg/flow/tracer_perf.go @@ -7,7 +7,7 @@ import ( "fmt" "time" - "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" "github.com/netobserv/gopipes/pkg/node" "github.com/netobserv/netobserv-ebpf-agent/pkg/model" "github.com/sirupsen/logrus" @@ -24,7 +24,7 @@ type PerfTracer struct { } type perfReader interface { - ReadPerf() (perf.Record, error) + ReadPerf() (ringbuf.Record, error) } func NewPerfTracer( @@ -46,7 +46,7 @@ func (m *PerfTracer) TraceLoop(ctx context.Context) node.StartFunc[*model.Packet default: if err := m.listenAndForwardPerf(out); err != nil { - if errors.Is(err, perf.ErrClosed) { + if errors.Is(err, ringbuf.ErrClosed) { pblog.Debug("Received signal, exiting..") return } diff --git a/pkg/model/packet_record.go b/pkg/model/packet_record.go index 85af86136..223ce3980 100644 --- a/pkg/model/packet_record.go +++ b/pkg/model/packet_record.go @@ -28,25 +28,25 @@ func NewPacketRecord( return &pr } -// ReadRawPacket reads a PacketRecord from a binary source, in LittleEndian order +// ReadRawPacket reads a PacketRecord from a binary source, in NativeEndian order func ReadRawPacket(reader io.Reader) (*PacketRecord, error) { var pr PacketRecord currentTime := time.Now() monotonicTimeNow := monotime.Now() getLen := make([]byte, 4) packetTimestamp := make([]byte, 8) - // Read IfIndex and discard it: To be used in other usecases - _ = binary.Read(reader, binary.LittleEndian, make([]byte, 4)) - // Read Length of packet - _ = binary.Read(reader, binary.LittleEndian, getLen) - pr.Stream = make([]byte, binary.LittleEndian.Uint32(getLen)) - // Read TimeStamp of packet - _ = binary.Read(reader, binary.LittleEndian, packetTimestamp) + // Read IfIndex and discard it: To be used in other use cases + _ = binary.Read(reader, binary.NativeEndian, make([]byte, 4)) + // Read Length of a packet + _ = binary.Read(reader, binary.NativeEndian, getLen) + pr.Stream = make([]byte, binary.NativeEndian.Uint32(getLen)) + // Read TimeStamp of a packet + _ = binary.Read(reader, binary.NativeEndian, packetTimestamp) // The assumption is monotonic time should be as close to time recorded by ebpf. // The difference is considered the delta time from current time. 
- tsDelta := time.Duration(uint64(monotonicTimeNow) - binary.LittleEndian.Uint64(packetTimestamp)) + tsDelta := time.Duration(uint64(monotonicTimeNow) - binary.NativeEndian.Uint64(packetTimestamp)) pr.Time = currentTime.Add(-tsDelta) - err := binary.Read(reader, binary.LittleEndian, &pr.Stream) + err := binary.Read(reader, binary.NativeEndian, &pr.Stream) return &pr, err } diff --git a/pkg/model/record.go b/pkg/model/record.go index 255cf21e3..277941015 100644 --- a/pkg/model/record.go +++ b/pkg/model/record.go @@ -192,10 +192,10 @@ func (m *MacAddr) MarshalJSON() ([]byte, error) { return []byte("\"" + m.String() + "\""), nil } -// ReadFrom reads a Record from a binary source, in LittleEndian order +// ReadFrom reads a Record from a binary source, in NativeEndian order func ReadFrom(reader io.Reader) (*RawRecord, error) { var fr RawRecord - err := binary.Read(reader, binary.LittleEndian, &fr) + err := binary.Read(reader, binary.NativeEndian, &fr) return &fr, err } diff --git a/pkg/tracer/tracer.go b/pkg/tracer/tracer.go index 1a00ef429..d4034729b 100644 --- a/pkg/tracer/tracer.go +++ b/pkg/tracer/tracer.go @@ -19,7 +19,6 @@ import ( cilium "github.com/cilium/ebpf" "github.com/cilium/ebpf/btf" "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" "github.com/gavv/monotime" @@ -1266,7 +1265,7 @@ type PacketFetcher struct { qdiscs map[ifaces.Interface]*netlink.GenericQdisc egressFilters map[ifaces.Interface]*netlink.BpfFilter ingressFilters map[ifaces.Interface]*netlink.BpfFilter - perfReader *perf.Reader + perfReader *ringbuf.Reader cacheMaxSize int enableIngress bool enableEgress bool @@ -1379,7 +1378,7 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) { } // read packets from igress+egress perf array - packets, err := perf.NewReader(objects.PacketRecord, os.Getpagesize()) + packets, err := ringbuf.NewReader(objects.BpfMaps.PacketRecord) if err != nil { return nil, fmt.Errorf("accessing to perf: %w", err) } @@ -1739,7 +1738,7 @@ func (p *PacketFetcher) Close() error { return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`) } -func (p *PacketFetcher) ReadPerf() (perf.Record, error) { +func (p *PacketFetcher) ReadPerf() (ringbuf.Record, error) { return p.perfReader.Read() } diff --git a/vendor/github.com/cilium/ebpf/perf/doc.go b/vendor/github.com/cilium/ebpf/perf/doc.go deleted file mode 100644 index b92bc56af..000000000 --- a/vendor/github.com/cilium/ebpf/perf/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Package perf allows reading from BPF perf event arrays. -// -// A perf event array contains multiple perf event ringbuffers which can be used -// to exchange sample like data with user space. -package perf diff --git a/vendor/github.com/cilium/ebpf/perf/reader.go b/vendor/github.com/cilium/ebpf/perf/reader.go deleted file mode 100644 index 113148bb0..000000000 --- a/vendor/github.com/cilium/ebpf/perf/reader.go +++ /dev/null @@ -1,493 +0,0 @@ -//go:build linux - -package perf - -import ( - "encoding/binary" - "errors" - "fmt" - "io" - "os" - "runtime" - "sync" - "time" - - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/internal" - "github.com/cilium/ebpf/internal/epoll" - "github.com/cilium/ebpf/internal/sys" - "github.com/cilium/ebpf/internal/unix" -) - -var ( - ErrClosed = os.ErrClosed - ErrFlushed = epoll.ErrFlushed - errEOR = errors.New("end of ring") -) - -var perfEventHeaderSize = binary.Size(perfEventHeader{}) - -// perfEventHeader must match 'struct perf_event_header` in . 
-type perfEventHeader struct { - Type uint32 - Misc uint16 - Size uint16 -} - -// Record contains either a sample or a counter of the -// number of lost samples. -type Record struct { - // The CPU this record was generated on. - CPU int - - // The data submitted via bpf_perf_event_output. - // Due to a kernel bug, this can contain between 0 and 7 bytes of trailing - // garbage from the ring depending on the input sample's length. - RawSample []byte - - // The number of samples which could not be output, since - // the ring buffer was full. - LostSamples uint64 - - // The minimum number of bytes remaining in the per-CPU buffer after this Record has been read. - // Negative for overwritable buffers. - Remaining int -} - -// Read a record from a reader and tag it as being from the given CPU. -// -// buf must be at least perfEventHeaderSize bytes long. -func readRecord(rd io.Reader, rec *Record, buf []byte, overwritable bool) error { - // Assert that the buffer is large enough. - buf = buf[:perfEventHeaderSize] - _, err := io.ReadFull(rd, buf) - if errors.Is(err, io.EOF) { - return errEOR - } else if err != nil { - return fmt.Errorf("read perf event header: %v", err) - } - - header := perfEventHeader{ - internal.NativeEndian.Uint32(buf[0:4]), - internal.NativeEndian.Uint16(buf[4:6]), - internal.NativeEndian.Uint16(buf[6:8]), - } - - switch header.Type { - case unix.PERF_RECORD_LOST: - rec.RawSample = rec.RawSample[:0] - rec.LostSamples, err = readLostRecords(rd) - return err - - case unix.PERF_RECORD_SAMPLE: - rec.LostSamples = 0 - // We can reuse buf here because perfEventHeaderSize > perfEventSampleSize. - rec.RawSample, err = readRawSample(rd, buf, rec.RawSample) - return err - - default: - return &unknownEventError{header.Type} - } -} - -func readLostRecords(rd io.Reader) (uint64, error) { - // lostHeader must match 'struct perf_event_lost in kernel sources. - var lostHeader struct { - ID uint64 - Lost uint64 - } - - err := binary.Read(rd, internal.NativeEndian, &lostHeader) - if err != nil { - return 0, fmt.Errorf("can't read lost records header: %v", err) - } - - return lostHeader.Lost, nil -} - -var perfEventSampleSize = binary.Size(uint32(0)) - -// This must match 'struct perf_event_sample in kernel sources. -type perfEventSample struct { - Size uint32 -} - -func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) { - buf = buf[:perfEventSampleSize] - if _, err := io.ReadFull(rd, buf); err != nil { - return nil, fmt.Errorf("read sample size: %w", err) - } - - sample := perfEventSample{ - internal.NativeEndian.Uint32(buf), - } - - var data []byte - if size := int(sample.Size); cap(sampleBuf) < size { - data = make([]byte, size) - } else { - data = sampleBuf[:size] - } - - if _, err := io.ReadFull(rd, data); err != nil { - return nil, fmt.Errorf("read sample: %w", err) - } - return data, nil -} - -// Reader allows reading bpf_perf_event_output -// from user space. -type Reader struct { - poller *epoll.Poller - - // mu protects read/write access to the Reader structure with the - // exception fields protected by 'pauseMu'. - // If locking both 'mu' and 'pauseMu', 'mu' must be locked first. - mu sync.Mutex - array *ebpf.Map - rings []*perfEventRing - epollEvents []unix.EpollEvent - epollRings []*perfEventRing - eventHeader []byte - deadline time.Time - overwritable bool - bufferSize int - pendingErr error - - // pauseMu protects eventFds so that Pause / Resume can be invoked while - // Read is blocked. 
- pauseMu sync.Mutex - eventFds []*sys.FD - paused bool -} - -// ReaderOptions control the behaviour of the user -// space reader. -type ReaderOptions struct { - // The number of events required in any per CPU buffer before - // Read will process data. This is mutually exclusive with Watermark. - // The default is zero, which means Watermark will take precedence. - WakeupEvents int - // The number of written bytes required in any per CPU buffer before - // Read will process data. Must be smaller than PerCPUBuffer. - // The default is to start processing as soon as data is available. - Watermark int - // This perf ring buffer is overwritable, once full the oldest event will be - // overwritten by newest. - Overwritable bool -} - -// NewReader creates a new reader with default options. -// -// array must be a PerfEventArray. perCPUBuffer gives the size of the -// per CPU buffer in bytes. It is rounded up to the nearest multiple -// of the current page size. -func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) { - return NewReaderWithOptions(array, perCPUBuffer, ReaderOptions{}) -} - -// NewReaderWithOptions creates a new reader with the given options. -func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) { - closeOnError := func(c io.Closer) { - if err != nil { - c.Close() - } - } - - if perCPUBuffer < 1 { - return nil, errors.New("perCPUBuffer must be larger than 0") - } - if opts.WakeupEvents > 0 && opts.Watermark > 0 { - return nil, errors.New("WakeupEvents and Watermark cannot both be non-zero") - } - - var ( - nCPU = int(array.MaxEntries()) - rings = make([]*perfEventRing, 0, nCPU) - eventFds = make([]*sys.FD, 0, nCPU) - ) - - poller, err := epoll.New() - if err != nil { - return nil, err - } - defer closeOnError(poller) - - // bpf_perf_event_output checks which CPU an event is enabled on, - // but doesn't allow using a wildcard like -1 to specify "all CPUs". - // Hence we have to create a ring for each CPU. - bufferSize := 0 - for i := 0; i < nCPU; i++ { - event, ring, err := newPerfEventRing(i, perCPUBuffer, opts) - if errors.Is(err, unix.ENODEV) { - // The requested CPU is currently offline, skip it. - continue - } - - if err != nil { - return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err) - } - defer closeOnError(event) - defer closeOnError(ring) - - bufferSize = ring.size() - rings = append(rings, ring) - eventFds = append(eventFds, event) - - if err := poller.Add(event.Int(), 0); err != nil { - return nil, err - } - } - - // Closing a PERF_EVENT_ARRAY removes all event fds - // stored in it, so we keep a reference alive. - array, err = array.Clone() - if err != nil { - return nil, err - } - - pr = &Reader{ - array: array, - rings: rings, - poller: poller, - deadline: time.Time{}, - epollEvents: make([]unix.EpollEvent, len(rings)), - epollRings: make([]*perfEventRing, 0, len(rings)), - eventHeader: make([]byte, perfEventHeaderSize), - eventFds: eventFds, - overwritable: opts.Overwritable, - bufferSize: bufferSize, - } - if err = pr.Resume(); err != nil { - return nil, err - } - runtime.SetFinalizer(pr, (*Reader).Close) - return pr, nil -} - -// Close frees resources used by the reader. -// -// It interrupts calls to Read. -// -// Calls to perf_event_output from eBPF programs will return -// ENOENT after calling this method. 
-func (pr *Reader) Close() error { - if err := pr.poller.Close(); err != nil { - if errors.Is(err, os.ErrClosed) { - return nil - } - return fmt.Errorf("close poller: %w", err) - } - - // Trying to poll will now fail, so Read() can't block anymore. Acquire the - // locks so that we can clean up. - pr.mu.Lock() - defer pr.mu.Unlock() - - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - for _, ring := range pr.rings { - ring.Close() - } - for _, event := range pr.eventFds { - event.Close() - } - pr.rings = nil - pr.eventFds = nil - pr.array.Close() - - return nil -} - -// SetDeadline controls how long Read and ReadInto will block waiting for samples. -// -// Passing a zero time.Time will remove the deadline. Passing a deadline in the -// past will prevent the reader from blocking if there are no records to be read. -func (pr *Reader) SetDeadline(t time.Time) { - pr.mu.Lock() - defer pr.mu.Unlock() - - pr.deadline = t -} - -// Read the next record from the perf ring buffer. -// -// The method blocks until there are at least Watermark bytes in one -// of the per CPU buffers. Records from buffers below the Watermark -// are not returned. -// -// Records can contain between 0 and 7 bytes of trailing garbage from the ring -// depending on the input sample's length. -// -// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush] -// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. -// -// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records -// have been read from the ring. -// -// See [Reader.ReadInto] for a more efficient version of this method. -func (pr *Reader) Read() (Record, error) { - var r Record - - return r, pr.ReadInto(&r) -} - -var errMustBePaused = fmt.Errorf("perf ringbuffer: must have been paused before reading overwritable buffer") - -// ReadInto is like [Reader.Read] except that it allows reusing Record and associated buffers. -func (pr *Reader) ReadInto(rec *Record) error { - pr.mu.Lock() - defer pr.mu.Unlock() - - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - if pr.overwritable && !pr.paused { - return errMustBePaused - } - - if pr.rings == nil { - return fmt.Errorf("perf ringbuffer: %w", ErrClosed) - } - - for { - if len(pr.epollRings) == 0 { - if pe := pr.pendingErr; pe != nil { - // All rings have been emptied since the error occurred, return - // appropriate error. - pr.pendingErr = nil - return pe - } - - // NB: The deferred pauseMu.Unlock will panic if Wait panics, which - // might obscure the original panic. - pr.pauseMu.Unlock() - _, err := pr.poller.Wait(pr.epollEvents, pr.deadline) - pr.pauseMu.Lock() - - if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { - // We've hit the deadline, check whether there is any data in - // the rings that we've not been woken up for. - pr.pendingErr = err - } else if err != nil { - return err - } - - // Re-validate pr.paused since we dropped pauseMu. - if pr.overwritable && !pr.paused { - return errMustBePaused - } - - // Waking up userspace is expensive, make the most of it by checking - // all rings. - for _, ring := range pr.rings { - ring.loadHead() - pr.epollRings = append(pr.epollRings, ring) - } - } - - // Start at the last available event. The order in which we - // process them doesn't matter, and starting at the back allows - // resizing epollRings to keep track of processed rings. 
- err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1]) - if err == errEOR { - // We've emptied the current ring buffer, process - // the next one. - pr.epollRings = pr.epollRings[:len(pr.epollRings)-1] - continue - } - - return err - } -} - -// Pause stops all notifications from this Reader. -// -// While the Reader is paused, any attempts to write to the event buffer from -// BPF programs will return -ENOENT. -// -// Subsequent calls to Read will block until a call to Resume. -func (pr *Reader) Pause() error { - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - if pr.eventFds == nil { - return fmt.Errorf("%w", ErrClosed) - } - - for i := range pr.eventFds { - if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) { - return fmt.Errorf("could't delete event fd for CPU %d: %w", i, err) - } - } - - pr.paused = true - - return nil -} - -// Resume allows this perf reader to emit notifications. -// -// Subsequent calls to Read will block until the next event notification. -func (pr *Reader) Resume() error { - pr.pauseMu.Lock() - defer pr.pauseMu.Unlock() - - if pr.eventFds == nil { - return fmt.Errorf("%w", ErrClosed) - } - - for i, fd := range pr.eventFds { - if fd == nil { - continue - } - - if err := pr.array.Put(uint32(i), fd.Uint()); err != nil { - return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err) - } - } - - pr.paused = false - - return nil -} - -// BufferSize is the size in bytes of each per-CPU buffer -func (pr *Reader) BufferSize() int { - return pr.bufferSize -} - -// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, -// until you receive a [ErrFlushed] error. -func (pr *Reader) Flush() error { - return pr.poller.Flush() -} - -// NB: Has to be preceded by a call to ring.loadHead. -func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { - defer ring.writeTail() - - rec.CPU = ring.cpu - err := readRecord(ring, rec, pr.eventHeader, pr.overwritable) - if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) { - return errEOR - } - rec.Remaining = ring.remaining() - return err -} - -type unknownEventError struct { - eventType uint32 -} - -func (uev *unknownEventError) Error() string { - return fmt.Sprintf("unknown event type: %d", uev.eventType) -} - -// IsUnknownEvent returns true if the error occurred -// because an unknown event was submitted to the perf event ring. -func IsUnknownEvent(err error) bool { - var uee *unknownEventError - return errors.As(err, &uee) -} diff --git a/vendor/github.com/cilium/ebpf/perf/ring.go b/vendor/github.com/cilium/ebpf/perf/ring.go deleted file mode 100644 index 426f7e154..000000000 --- a/vendor/github.com/cilium/ebpf/perf/ring.go +++ /dev/null @@ -1,295 +0,0 @@ -//go:build linux - -package perf - -import ( - "errors" - "fmt" - "io" - "math" - "os" - "runtime" - "sync/atomic" - "unsafe" - - "github.com/cilium/ebpf/internal/sys" - "github.com/cilium/ebpf/internal/unix" -) - -// perfEventRing is a page of metadata followed by -// a variable number of pages which form a ring buffer. 
-type perfEventRing struct { - cpu int - mmap []byte - ringReader -} - -func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (_ *sys.FD, _ *perfEventRing, err error) { - closeOnError := func(c io.Closer) { - if err != nil { - c.Close() - } - } - - if opts.Watermark >= perCPUBuffer { - return nil, nil, errors.New("watermark must be smaller than perCPUBuffer") - } - - fd, err := createPerfEvent(cpu, opts) - if err != nil { - return nil, nil, err - } - defer closeOnError(fd) - - if err := unix.SetNonblock(fd.Int(), true); err != nil { - return nil, nil, err - } - - protections := unix.PROT_READ - if !opts.Overwritable { - protections |= unix.PROT_WRITE - } - - mmap, err := unix.Mmap(fd.Int(), 0, perfBufferSize(perCPUBuffer), protections, unix.MAP_SHARED) - if err != nil { - return nil, nil, fmt.Errorf("can't mmap: %v", err) - } - - // This relies on the fact that we allocate an extra metadata page, - // and that the struct is smaller than an OS page. - // This use of unsafe.Pointer isn't explicitly sanctioned by the - // documentation, since a byte is smaller than sampledPerfEvent. - meta := (*unix.PerfEventMmapPage)(unsafe.Pointer(&mmap[0])) - - var reader ringReader - if opts.Overwritable { - reader = newReverseReader(meta, mmap[meta.Data_offset:meta.Data_offset+meta.Data_size]) - } else { - reader = newForwardReader(meta, mmap[meta.Data_offset:meta.Data_offset+meta.Data_size]) - } - - ring := &perfEventRing{ - cpu: cpu, - mmap: mmap, - ringReader: reader, - } - runtime.SetFinalizer(ring, (*perfEventRing).Close) - - return fd, ring, nil -} - -// perfBufferSize returns a valid mmap buffer size for use with perf_event_open (1+2^n pages) -func perfBufferSize(perCPUBuffer int) int { - pageSize := os.Getpagesize() - - // Smallest whole number of pages - nPages := (perCPUBuffer + pageSize - 1) / pageSize - - // Round up to nearest power of two number of pages - nPages = int(math.Pow(2, math.Ceil(math.Log2(float64(nPages))))) - - // Add one for metadata - nPages += 1 - - return nPages * pageSize -} - -func (ring *perfEventRing) Close() error { - runtime.SetFinalizer(ring, nil) - mmap := ring.mmap - ring.mmap = nil - return unix.Munmap(mmap) -} - -func createPerfEvent(cpu int, opts ReaderOptions) (*sys.FD, error) { - wakeup := 0 - bits := 0 - if opts.WakeupEvents > 0 { - wakeup = opts.WakeupEvents - } else { - wakeup = opts.Watermark - if wakeup == 0 { - wakeup = 1 - } - bits |= unix.PerfBitWatermark - } - - if opts.Overwritable { - bits |= unix.PerfBitWriteBackward - } - - attr := unix.PerfEventAttr{ - Type: unix.PERF_TYPE_SOFTWARE, - Config: unix.PERF_COUNT_SW_BPF_OUTPUT, - Bits: uint64(bits), - Sample_type: unix.PERF_SAMPLE_RAW, - Wakeup: uint32(wakeup), - } - - attr.Size = uint32(unsafe.Sizeof(attr)) - fd, err := unix.PerfEventOpen(&attr, -1, cpu, -1, unix.PERF_FLAG_FD_CLOEXEC) - if err != nil { - return nil, fmt.Errorf("can't create perf event: %w", err) - } - return sys.NewFD(fd) -} - -type ringReader interface { - loadHead() - size() int - remaining() int - writeTail() - Read(p []byte) (int, error) -} - -type forwardReader struct { - meta *unix.PerfEventMmapPage - head, tail uint64 - mask uint64 - ring []byte -} - -func newForwardReader(meta *unix.PerfEventMmapPage, ring []byte) *forwardReader { - return &forwardReader{ - meta: meta, - head: atomic.LoadUint64(&meta.Data_head), - tail: atomic.LoadUint64(&meta.Data_tail), - // cap is always a power of two - mask: uint64(cap(ring) - 1), - ring: ring, - } -} - -func (rr *forwardReader) loadHead() { - rr.head = 
atomic.LoadUint64(&rr.meta.Data_head) -} - -func (rr *forwardReader) size() int { - return len(rr.ring) -} - -func (rr *forwardReader) remaining() int { - return int((rr.head - rr.tail) & rr.mask) -} - -func (rr *forwardReader) writeTail() { - // Commit the new tail. This lets the kernel know that - // the ring buffer has been consumed. - atomic.StoreUint64(&rr.meta.Data_tail, rr.tail) -} - -func (rr *forwardReader) Read(p []byte) (int, error) { - start := int(rr.tail & rr.mask) - - n := len(p) - // Truncate if the read wraps in the ring buffer - if remainder := cap(rr.ring) - start; n > remainder { - n = remainder - } - - // Truncate if there isn't enough data - if remainder := int(rr.head - rr.tail); n > remainder { - n = remainder - } - - copy(p, rr.ring[start:start+n]) - rr.tail += uint64(n) - - if rr.tail == rr.head { - return n, io.EOF - } - - return n, nil -} - -type reverseReader struct { - meta *unix.PerfEventMmapPage - // head is the position where the kernel last wrote data. - head uint64 - // read is the position we read the next data from. Updated as reads are made. - read uint64 - // tail is the end of the ring buffer. No reads must be made past it. - tail uint64 - mask uint64 - ring []byte -} - -func newReverseReader(meta *unix.PerfEventMmapPage, ring []byte) *reverseReader { - rr := &reverseReader{ - meta: meta, - mask: uint64(cap(ring) - 1), - ring: ring, - } - rr.loadHead() - return rr -} - -func (rr *reverseReader) loadHead() { - // The diagram below represents an overwritable perf ring buffer: - // - // head read tail - // | | | - // V V V - // +---+--------+------------+---------+--------+ - // | |H-D....D|H-C........C|H-B.....B|H-A....A| - // +---+--------+------------+---------+--------+ - // <--Write from right to left - // Read from left to right--> - // (H means header) - // - // The buffer is read left to right beginning from head to tail. - // [head, read) is the read portion of the buffer, [read, tail) the unread one. - // read is adjusted as we progress through the buffer. - - // Avoid reading sample D multiple times by discarding unread samples C, B, A. - rr.tail = rr.head - - // Get the new head and starting reading from it. - rr.head = atomic.LoadUint64(&rr.meta.Data_head) - rr.read = rr.head - - if rr.tail-rr.head > uint64(cap(rr.ring)) { - // ring has been fully written, only permit at most cap(rr.ring) - // bytes to be read. - rr.tail = rr.head + uint64(cap(rr.ring)) - } -} - -func (rr *reverseReader) size() int { - return len(rr.ring) -} - -func (rr *reverseReader) remaining() int { - // remaining data is inaccurate for overwritable buffers - // once an overwrite happens, so return -1 here. - return -1 -} - -func (rr *reverseReader) writeTail() { - // We do not care about tail for over writable perf buffer. - // So, this function is noop. 
-} - -func (rr *reverseReader) Read(p []byte) (int, error) { - start := int(rr.read & rr.mask) - - n := len(p) - // Truncate if the read wraps in the ring buffer - if remainder := cap(rr.ring) - start; n > remainder { - n = remainder - } - - // Truncate if there isn't enough data - if remainder := int(rr.tail - rr.read); n > remainder { - n = remainder - } - - copy(p, rr.ring[start:start+n]) - rr.read += uint64(n) - - if rr.read == rr.tail { - return n, io.EOF - } - - return n, nil -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3df9fb01c..95e5affca 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -58,7 +58,6 @@ github.com/cilium/ebpf/internal/testutils/fdtrace github.com/cilium/ebpf/internal/tracefs github.com/cilium/ebpf/internal/unix github.com/cilium/ebpf/link -github.com/cilium/ebpf/perf github.com/cilium/ebpf/ringbuf github.com/cilium/ebpf/rlimit # github.com/containernetworking/cni v1.1.2
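End to end, the user-space change is mechanical: `perf.NewReader(map, perCPUBuffer)` becomes `ringbuf.NewReader(map)`, and `perf.ErrClosed` becomes `ringbuf.ErrClosed`. Below is a sketch of the resulting read loop against the `packet_record` map, reusing the hypothetical `decodeRecord` helper from earlier; the function name and wiring are assumptions for illustration, the agent's real plumbing lives in `pkg/tracer` and `pkg/flow`:

```go
package pcadump

import (
	"errors"
	"fmt"
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

// readPackets drains the packet_record ring buffer until the reader is
// closed. packetRecord would be the loaded map handle (illustrative).
func readPackets(packetRecord *ebpf.Map) error {
	rd, err := ringbuf.NewReader(packetRecord)
	if err != nil {
		return fmt.Errorf("accessing ring buffer: %w", err)
	}
	defer rd.Close()

	for {
		rec, err := rd.Read()
		if errors.Is(err, ringbuf.ErrClosed) {
			return nil // reader was closed: clean shutdown
		}
		if err != nil {
			log.Printf("reading ringbuf: %v", err)
			continue
		}
		// rec.RawSample holds exactly one payload_meta record.
		if meta, payload, err := decodeRecord(rec.RawSample); err == nil {
			log.Printf("ifindex=%d ts=%d len=%d", meta.IfIndex, meta.Timestamp, len(payload))
		}
	}
}
```

Unlike a perf event array, which allocates one mmap'd ring per CPU and requires a per-CPU buffer size at open time, a BPF ringbuf is a single buffer shared by all CPUs, sized once in the map definition; `bpf_ringbuf_reserve`/`bpf_ringbuf_submit` also write the event in place instead of copying through `bpf_perf_event_output`, which is why the vendored `cilium/ebpf/perf` package can be dropped entirely.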