Skip to content

NETOBSERV-2148: Switch PCA feature from using perf events to ringbuf #594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .mk/bc.mk
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ define MAPS
"direct_flows":"ringbuf",
"aggregated_flows":"hash",
"additional_flow_metrics":"per_cpu_hash",
"packets_record":"perf_event_array",
"packets_record":"ringbuf",
"dns_flows":"hash",
"global_counters":"per_cpu_array",
"filter_map":"lpm_trie",
Expand Down
8 changes: 3 additions & 5 deletions bpf/maps_definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ struct {
__uint(pinning, LIBBPF_PIN_BY_NAME);
} additional_flow_metrics SEC(".maps");

//PerfEvent Array for Packet Payloads
//Ringbuf for Packet Payloads
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__type(key, u32);
__type(value, u32);
__uint(max_entries, 256);
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 21); // 256 bytes * 1000 events/sec * 5sec "eviction time"
__uint(pinning, LIBBPF_PIN_BY_NAME);
} packet_record SEC(".maps");

Expand Down
73 changes: 37 additions & 36 deletions bpf/pca.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,39 @@

#include "utils.h"

static int attach_packet_payload(void *data, void *data_end, struct __sk_buff *skb) {
payload_meta meta;
u64 flags = BPF_F_CURRENT_CPU;
// Enable the flag to add packet header
// Packet payload follows immediately after the meta struct
u32 packetSize = (u32)(data_end - data);

// Record the current time.
u64 current_time = bpf_ktime_get_ns();

// For packets that are allocated non-linearly, struct __sk_buff does not necessarily
// have all data lined up in memory; instead, the data can be part of scatter-gather lists.
// This call pulls data from the buffer but incurs a data-copying penalty.
if (packetSize <= skb->len) {
packetSize = skb->len;
if (bpf_skb_pull_data(skb, skb->len)) {
return TC_ACT_UNSPEC;
};
static inline void attach_packet_payload(struct __sk_buff *skb) {
payload_meta *event;
u32 packetSize = skb->len;

event = bpf_ringbuf_reserve(&packet_record, sizeof(payload_meta), 0);
if (!event) {
return;
}

if (!packetSize) {
// Release reserved ringbuf location
bpf_ringbuf_discard(event, 0);
return;
}

if (packetSize > MAX_PAYLOAD_SIZE) {
packetSize = MAX_PAYLOAD_SIZE;
}
// Set the upper 32 bits of flags to the size of the payload, and bpf_perf_event_output will
// attach the specified number of bytes from the packet to the perf event
// https://github.com/xdp-project/xdp-tutorial/tree/9b25f0a039179aca1f66cba5492744d9f09662c1/tracing04-xdp-tcpdump
flags |= (u64)packetSize << 32;

meta.if_index = skb->ifindex;
meta.pkt_len = packetSize;
meta.timestamp = current_time;
if (bpf_perf_event_output(skb, &packet_record, flags, &meta, sizeof(meta))) {
return TC_ACT_OK;

event->if_index = skb->ifindex;
event->pkt_len = packetSize;
event->timestamp = bpf_ktime_get_ns();
// bpf_skb_load_bytes handles packets that are allocated either linearly or non-linearly, where
// struct __sk_buff does not necessarily have all data lined up in memory but instead
// can be part of scatter-gather lists,
// so there is no need to use bpf_skb_pull_data(), which has performance side effects for the rest of that skb's lifetime.
if (bpf_skb_load_bytes(skb, 0, event->payload, packetSize)) {
// Release reserved ringbuf location
bpf_ringbuf_discard(event, 0);
return;
}
return TC_ACT_UNSPEC;
bpf_ringbuf_submit(event, 0);
return;
}

static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
Expand Down Expand Up @@ -62,19 +64,18 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
return true;
}

static inline int export_packet_payload(struct __sk_buff *skb, direction dir) {
static inline void export_packet_payload(struct __sk_buff *skb, direction dir) {
if (!enable_pca) {
return;
}
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
return 0;
return;
}

void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;

if (validate_pca_filter(skb, dir)) {
return attach_packet_payload(data, data_end, skb);
attach_packet_payload(skb);
}
return 0;
}

SEC("classifier/tc_pca_ingress")
Expand Down
4 changes: 3 additions & 1 deletion bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ typedef __u64 u64;
#define MAX_OBSERVED_INTERFACES 6
#define OBSERVED_DIRECTION_BOTH 3

#define MAX_PAYLOAD_SIZE 256

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
INGRESS,
Expand Down Expand Up @@ -147,7 +149,6 @@ const static struct additional_metrics_t *unused3 __attribute__((unused));
const static struct dns_record_t *unused4 __attribute__((unused));
const static struct pkt_drops_t *unused5 __attribute__((unused));
const static struct translated_flow_t *unused6 __attribute__((unused));
const static struct observed_intf_t *unused13 __attribute__((unused));

// Attributes that uniquely identify a flow
typedef struct flow_id_t {
Expand Down Expand Up @@ -196,6 +197,7 @@ typedef struct payload_meta_t {
u32 if_index;
u32 pkt_len;
u64 timestamp; // timestamp when packet received by ebpf
u8 payload[MAX_PAYLOAD_SIZE];
} payload_meta;

// DNS Flow record used as key to correlate DNS query and response
Expand Down
3 changes: 2 additions & 1 deletion examples/packetcapture-dump/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ Start the packetcapture-client using: (in a secondary shell)

Start the agent using:
```bash
sudo TARGET_HOST=localhost TARGET_PORT=9990 ENABLE_PCA="true" FILTER_IP_CIDR="0.0.0.0/0" FILTER_PROTOCOL="TCP" FILTER_PORT=22 FILTER_ACTION="Accept" ./bin/netobserv-ebpf-agent
sudo TARGET_HOST=localhost TARGET_PORT=9990 ENABLE_PCA="true" FLOW_FILTER_RULES='[{"ip_cidr":"0.0.0.0/0","protocol":"TCP","action":"Accept"}]' ./bin/netobserv-ebpf-agent

```

You should see output such as:
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
"github.com/netobserv/netobserv-ebpf-agent/pkg/tracer"

"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -47,7 +47,7 @@ type ebpfPacketFetcher interface {
AttachTCX(iface ifaces.Interface) error
DetachTCX(iface ifaces.Interface) error
LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte
ReadPerf() (perf.Record, error)
ReadPerf() (ringbuf.Record, error)
}

// PacketsAgent instantiates a new agent, given a configuration.
Expand Down
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
6 changes: 3 additions & 3 deletions pkg/flow/tracer_perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"time"

"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/netobserv/gopipes/pkg/node"
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
"github.com/sirupsen/logrus"
Expand All @@ -24,7 +24,7 @@ type PerfTracer struct {
}

type perfReader interface {
ReadPerf() (perf.Record, error)
ReadPerf() (ringbuf.Record, error)
}

func NewPerfTracer(
Expand All @@ -46,7 +46,7 @@ func (m *PerfTracer) TraceLoop(ctx context.Context) node.StartFunc[*model.Packet
default:
if err := m.listenAndForwardPerf(out); err != nil {

if errors.Is(err, perf.ErrClosed) {
if errors.Is(err, ringbuf.ErrClosed) {
pblog.Debug("Received signal, exiting..")
return
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/model/packet_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@ func NewPacketRecord(
return &pr
}

// ReadRawPacket reads a PacketRecord from a binary source, in LittleEndian order
// ReadRawPacket reads a PacketRecord from a binary source, in NativeEndian order
func ReadRawPacket(reader io.Reader) (*PacketRecord, error) {
var pr PacketRecord
currentTime := time.Now()
monotonicTimeNow := monotime.Now()
getLen := make([]byte, 4)
packetTimestamp := make([]byte, 8)
// Read IfIndex and discard it: To be used in other usecases
_ = binary.Read(reader, binary.LittleEndian, make([]byte, 4))
// Read Length of packet
_ = binary.Read(reader, binary.LittleEndian, getLen)
pr.Stream = make([]byte, binary.LittleEndian.Uint32(getLen))
// Read TimeStamp of packet
_ = binary.Read(reader, binary.LittleEndian, packetTimestamp)
// Read IfIndex and discard it: To be used in other use cases
_ = binary.Read(reader, binary.NativeEndian, make([]byte, 4))
// Read Length of a packet
_ = binary.Read(reader, binary.NativeEndian, getLen)
pr.Stream = make([]byte, binary.NativeEndian.Uint32(getLen))
// Read TimeStamp of a packet
_ = binary.Read(reader, binary.NativeEndian, packetTimestamp)
// The assumption is monotonic time should be as close to time recorded by ebpf.
// The difference is considered the delta time from current time.
tsDelta := time.Duration(uint64(monotonicTimeNow) - binary.LittleEndian.Uint64(packetTimestamp))
tsDelta := time.Duration(uint64(monotonicTimeNow) - binary.NativeEndian.Uint64(packetTimestamp))
pr.Time = currentTime.Add(-tsDelta)

err := binary.Read(reader, binary.LittleEndian, &pr.Stream)
err := binary.Read(reader, binary.NativeEndian, &pr.Stream)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that the reads of other fields should be NativeEndian...

But if you are reading data from skb->data, shouldn't you be reading in binary.BigEndian format? NetworkEndian == BigEndian

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an array of bytes, so it shouldn't matter; but when it is copied to userspace, don't we need to be in host-endian format?

return &pr, err
}
4 changes: 2 additions & 2 deletions pkg/model/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ func (m *MacAddr) MarshalJSON() ([]byte, error) {
return []byte("\"" + m.String() + "\""), nil
}

// ReadFrom reads a Record from a binary source, in LittleEndian order
// ReadFrom reads a Record from a binary source, in NativeEndian order
func ReadFrom(reader io.Reader) (*RawRecord, error) {
var fr RawRecord
err := binary.Read(reader, binary.LittleEndian, &fr)
err := binary.Read(reader, binary.NativeEndian, &fr)
return &fr, err
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
cilium "github.com/cilium/ebpf"
"github.com/cilium/ebpf/btf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/gavv/monotime"
Expand Down Expand Up @@ -1266,7 +1265,7 @@ type PacketFetcher struct {
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
perfReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
Expand Down Expand Up @@ -1379,7 +1378,7 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) {
}

// read packets from ingress+egress perf array
packets, err := perf.NewReader(objects.PacketRecord, os.Getpagesize())
packets, err := ringbuf.NewReader(objects.BpfMaps.PacketRecord)
if err != nil {
return nil, fmt.Errorf("accessing to perf: %w", err)
}
Expand Down Expand Up @@ -1739,7 +1738,7 @@ func (p *PacketFetcher) Close() error {
return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`)
}

func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
func (p *PacketFetcher) ReadPerf() (ringbuf.Record, error) {
return p.perfReader.Read()
}

Expand Down
5 changes: 0 additions & 5 deletions vendor/github.com/cilium/ebpf/perf/doc.go

This file was deleted.

Loading