Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/runmode-dpdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,8 @@ static int DeviceConfigure(DPDKIfaceConfig *iconf)
SCReturnInt(retval);
}

iconf->is_pcap_iface = strcmp(dev_info.driver_name, "net_pcap") == 0;

if (iconf->nb_rx_queues > dev_info.max_rx_queues) {
SCLogError("%s: configured RX queues %u is higher than device maximum (%" PRIu16 ")",
iconf->iface, iconf->nb_rx_queues, dev_info.max_rx_queues);
Expand Down Expand Up @@ -1829,6 +1831,10 @@ static void *ParseDpdkConfigAndConfigureDevice(const char *iface)
}
SC_ATOMIC_RESET(iconf->workers_sync->worker_checked_in);
iconf->workers_sync->worker_cnt = iconf->threads;
if (iconf->is_pcap_iface) {
SC_ATOMIC_INIT(iconf->workers_sync->pcap_workers_left);
SC_ATOMIC_SET(iconf->workers_sync->pcap_workers_left, iconf->threads);
}

// initialize LiveDev DPDK values
LiveDevice *ldev_instance = LiveGetDevice(iface);
Expand Down
96 changes: 72 additions & 24 deletions src/source-dpdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ typedef struct DPDKThreadVars_ {
LiveDevice *livedev;
ChecksumValidationMode checksum_mode;
bool intr_enabled;
bool is_pcap_iface;
/* references to packet and drop counters */
StatsCounterId capture_dpdk_packets;
StatsCounterId capture_dpdk_rx_errs;
Expand Down Expand Up @@ -563,18 +564,29 @@ static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
}
}

/**
* \brief Main DPDK reading Loop function
*/
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
static inline TmEcode ReceiveDPDKPkts(DPDKThreadVars *ptv, uint16_t nb_rx)
{
SCEnter();
DPDKThreadVars *ptv = (DPDKThreadVars *)data;
ptv->slot = ((TmSlot *)slot)->slot_next;
TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
if (ret != TM_ECODE_OK) {
SCReturnInt(ret);
ptv->pkts += (uint64_t)nb_rx;
for (uint16_t i = 0; i < nb_rx; i++) {
Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
if (p == NULL) {
rte_pktmbuf_free(ptv->received_mbufs[i]);
continue;
}
DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
TmqhOutputPacketpool(ptv->tv, p);
DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
return TM_ECODE_FAILED;
}
}
return TM_ECODE_OK;
}

static TmEcode ReceiveDPDKLoopLive(ThreadVars *tv, DPDKThreadVars *ptv)
{
while (true) {
if (unlikely(suricata_ctl_flags != 0)) {
HandleShutdown(ptv);
Expand All @@ -587,21 +599,38 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
continue;
}

ptv->pkts += (uint64_t)nb_rx;
for (uint16_t i = 0; i < nb_rx; i++) {
Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
if (p == NULL) {
rte_pktmbuf_free(ptv->received_mbufs[i]);
continue;
}
DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
TmqhOutputPacketpool(ptv->tv, p);
DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
SCReturnInt(EXIT_FAILURE);
if (ReceiveDPDKPkts(ptv, nb_rx) != TM_ECODE_OK) {
SCReturnInt(EXIT_FAILURE);
}

PeriodicDPDKDumpCounters(ptv);
StatsSyncCountersIfSignalled(&tv->stats);
}

SCReturnInt(TM_ECODE_OK);
}

static TmEcode ReceiveDPDKLoopPcap(ThreadVars *tv, DPDKThreadVars *ptv)
{
while (true) {
if (unlikely(suricata_ctl_flags != 0)) {
HandleShutdown(ptv);
break;
}

uint16_t nb_rx =
rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
if (nb_rx == 0) {
SCLogNotice("%s: PCAP end of file", ptv->livedev->dev);
if (SC_ATOMIC_SUB(ptv->workers_sync->pcap_workers_left, 1) == 1) {
EngineStop();
}
HandleShutdown(ptv);
break;
}

if (ReceiveDPDKPkts(ptv, nb_rx) != TM_ECODE_OK) {
SCReturnInt(EXIT_FAILURE);
}

PeriodicDPDKDumpCounters(ptv);
Expand All @@ -611,6 +640,24 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt(TM_ECODE_OK);
}

/**
* \brief Main DPDK reading Loop function
*/
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
DPDKThreadVars *ptv = (DPDKThreadVars *)data;
ptv->slot = ((TmSlot *)slot)->slot_next;
TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
if (ret != TM_ECODE_OK) {
SCReturnInt(ret);
}

if (ptv->is_pcap_iface)
return ReceiveDPDKLoopPcap(tv, ptv);
return ReceiveDPDKLoopLive(tv, ptv);
}

/**
* \brief Init function for ReceiveDPDK.
*
Expand Down Expand Up @@ -654,6 +701,7 @@ static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void

ptv->threads = dpdk_config->threads;
ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
ptv->is_pcap_iface = dpdk_config->is_pcap_iface;
ptv->port_id = dpdk_config->port_id;
ptv->out_port_id = dpdk_config->out_port_id;
ptv->port_socket_id = dpdk_config->socket_id;
Expand Down
2 changes: 2 additions & 0 deletions src/source-dpdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void DPDKSetTimevalOfMachineStart(void);
typedef struct DPDKWorkerSync_ {
uint16_t worker_cnt;
SC_ATOMIC_DECLARE(uint16_t, worker_checked_in);
SC_ATOMIC_DECLARE(uint16_t, pcap_workers_left);
} DPDKWorkerSync;

typedef struct DPDKIfaceConfig_ {
Expand Down Expand Up @@ -80,6 +81,7 @@ typedef struct DPDKIfaceConfig_ {
SC_ATOMIC_DECLARE(uint16_t, queue_id);
SC_ATOMIC_DECLARE(uint16_t, inconsistent_numa_cnt);
DPDKWorkerSync *workers_sync;
bool is_pcap_iface;
void (*DerefFunc)(void *);

struct rte_flow *flow[100];
Expand Down
Loading