diff --git a/src/runmode-dpdk.c b/src/runmode-dpdk.c index ef0c38679817..94f153a353e1 100644 --- a/src/runmode-dpdk.c +++ b/src/runmode-dpdk.c @@ -1672,6 +1672,8 @@ static int DeviceConfigure(DPDKIfaceConfig *iconf) SCReturnInt(retval); } + iconf->is_pcap_iface = strcmp(dev_info.driver_name, "net_pcap") == 0; + if (iconf->nb_rx_queues > dev_info.max_rx_queues) { SCLogError("%s: configured RX queues %u is higher than device maximum (%" PRIu16 ")", iconf->iface, iconf->nb_rx_queues, dev_info.max_rx_queues); @@ -1829,6 +1831,10 @@ static void *ParseDpdkConfigAndConfigureDevice(const char *iface) } SC_ATOMIC_RESET(iconf->workers_sync->worker_checked_in); iconf->workers_sync->worker_cnt = iconf->threads; + if (iconf->is_pcap_iface) { + SC_ATOMIC_INIT(iconf->workers_sync->pcap_workers_left); + SC_ATOMIC_SET(iconf->workers_sync->pcap_workers_left, iconf->threads); + } // initialize LiveDev DPDK values LiveDevice *ldev_instance = LiveGetDevice(iface); diff --git a/src/source-dpdk.c b/src/source-dpdk.c index 5e612337463c..f75e75ee6539 100644 --- a/src/source-dpdk.c +++ b/src/source-dpdk.c @@ -115,6 +115,7 @@ typedef struct DPDKThreadVars_ { LiveDevice *livedev; ChecksumValidationMode checksum_mode; bool intr_enabled; + bool is_pcap_iface; /* references to packet and drop counters */ StatsCounterId capture_dpdk_packets; StatsCounterId capture_dpdk_rx_errs; @@ -563,18 +564,29 @@ static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv) } } -/** - * \brief Main DPDK reading Loop function - */ -static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) +static inline TmEcode ReceiveDPDKPkts(DPDKThreadVars *ptv, uint16_t nb_rx) { - SCEnter(); - DPDKThreadVars *ptv = (DPDKThreadVars *)data; - ptv->slot = ((TmSlot *)slot)->slot_next; - TmEcode ret = ReceiveDPDKLoopInit(tv, ptv); - if (ret != TM_ECODE_OK) { - SCReturnInt(ret); + ptv->pkts += (uint64_t)nb_rx; + for (uint16_t i = 0; i < nb_rx; i++) { + Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]); + if (p == NULL) { + rte_pktmbuf_free(ptv->received_mbufs[i]); + continue; + } + DPDKSegmentedMbufWarning(ptv->received_mbufs[i]); + PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *), + rte_pktmbuf_pkt_len(p->dpdk_v.mbuf)); + if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { + TmqhOutputPacketpool(ptv->tv, p); + DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1); + return TM_ECODE_FAILED; + } } + return TM_ECODE_OK; +} + +static TmEcode ReceiveDPDKLoopLive(ThreadVars *tv, DPDKThreadVars *ptv) +{ while (true) { if (unlikely(suricata_ctl_flags != 0)) { HandleShutdown(ptv); @@ -587,21 +599,38 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) continue; } - ptv->pkts += (uint64_t)nb_rx; - for (uint16_t i = 0; i < nb_rx; i++) { - Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]); - if (p == NULL) { - rte_pktmbuf_free(ptv->received_mbufs[i]); - continue; - } - DPDKSegmentedMbufWarning(ptv->received_mbufs[i]); - PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *), - rte_pktmbuf_pkt_len(p->dpdk_v.mbuf)); - if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { - TmqhOutputPacketpool(ptv->tv, p); - DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1); - SCReturnInt(EXIT_FAILURE); + if (ReceiveDPDKPkts(ptv, nb_rx) != TM_ECODE_OK) { + SCReturnInt(EXIT_FAILURE); + } + + PeriodicDPDKDumpCounters(ptv); + StatsSyncCountersIfSignalled(&tv->stats); + } + + SCReturnInt(TM_ECODE_OK); +} + +static TmEcode ReceiveDPDKLoopPcap(ThreadVars *tv, DPDKThreadVars *ptv) +{ + while (true) { + if (unlikely(suricata_ctl_flags != 0)) { + HandleShutdown(ptv); + break; + } + + uint16_t nb_rx = + rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE); + if (nb_rx == 0) { + SCLogNotice("%s: PCAP end of file", ptv->livedev->dev); + if (SC_ATOMIC_SUB(ptv->workers_sync->pcap_workers_left, 1) == 1) { + EngineStop(); } + HandleShutdown(ptv); + break; + } + + if (ReceiveDPDKPkts(ptv, nb_rx) != TM_ECODE_OK) { + SCReturnInt(EXIT_FAILURE); } PeriodicDPDKDumpCounters(ptv); @@ -611,6 +640,24 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) SCReturnInt(TM_ECODE_OK); } +/** + * \brief Main DPDK reading Loop function + */ +static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot) +{ + SCEnter(); + DPDKThreadVars *ptv = (DPDKThreadVars *)data; + ptv->slot = ((TmSlot *)slot)->slot_next; + TmEcode ret = ReceiveDPDKLoopInit(tv, ptv); + if (ret != TM_ECODE_OK) { + SCReturnInt(ret); + } + + if (ptv->is_pcap_iface) + return ReceiveDPDKLoopPcap(tv, ptv); + return ReceiveDPDKLoopLive(tv, ptv); +} + /** * \brief Init function for ReceiveDPDK. * @@ -654,6 +701,7 @@ static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void ptv->threads = dpdk_config->threads; ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false; + ptv->is_pcap_iface = dpdk_config->is_pcap_iface; ptv->port_id = dpdk_config->port_id; ptv->out_port_id = dpdk_config->out_port_id; ptv->port_socket_id = dpdk_config->socket_id; diff --git a/src/source-dpdk.h b/src/source-dpdk.h index 070d569fe27a..e7838307533a 100644 --- a/src/source-dpdk.h +++ b/src/source-dpdk.h @@ -48,6 +48,7 @@ void DPDKSetTimevalOfMachineStart(void); typedef struct DPDKWorkerSync_ { uint16_t worker_cnt; SC_ATOMIC_DECLARE(uint16_t, worker_checked_in); + SC_ATOMIC_DECLARE(uint16_t, pcap_workers_left); } DPDKWorkerSync; typedef struct DPDKIfaceConfig_ { @@ -80,6 +81,7 @@ typedef struct DPDKIfaceConfig_ { SC_ATOMIC_DECLARE(uint16_t, queue_id); SC_ATOMIC_DECLARE(uint16_t, inconsistent_numa_cnt); DPDKWorkerSync *workers_sync; + bool is_pcap_iface; void (*DerefFunc)(void *); struct rte_flow *flow[100];