Suricata Source Code Reading (5): The Threading Model

0x00 Thread Module Registration

In the previous articles we analyzed how Suricata imports rules from rule files, how it builds signature groups, and how detection is carried out. In this article we turn to how Suricata organizes its threads.

As mentioned earlier, FlowWorker runs inside a thread. src/flow-worker.c exposes the following interface:

void TmModuleFlowWorkerRegister (void)
{
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
    tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
    tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
}

It is called from src/suricata.c:

void RegisterAllModules(void)
{
    /* commanders */
    TmModuleUnixManagerRegister();
    /* managers */
    TmModuleFlowManagerRegister();
    TmModuleFlowRecyclerRegister();
    TmModuleBypassedFlowManagerRegister();
    /* nfq */
    TmModuleReceiveNFQRegister();
    TmModuleVerdictNFQRegister();
    TmModuleDecodeNFQRegister();
    /* ipfw */
    TmModuleReceiveIPFWRegister();
    TmModuleVerdictIPFWRegister();
    TmModuleDecodeIPFWRegister();
    /* pcap live */
    TmModuleReceivePcapRegister();
    TmModuleDecodePcapRegister();
    /* pcap file */
    TmModuleReceivePcapFileRegister();
    TmModuleDecodePcapFileRegister();
    /* af-packet */
    TmModuleReceiveAFPRegister();
    TmModuleDecodeAFPRegister();
    /* af-xdp */
    TmModuleReceiveAFXDPRegister();
    TmModuleDecodeAFXDPRegister();
    /* netmap */
    TmModuleReceiveNetmapRegister();
    TmModuleDecodeNetmapRegister();
    /* pfring */
    TmModuleReceivePfringRegister();
    TmModuleDecodePfringRegister();
    /* dag file */
    TmModuleReceiveErfFileRegister();
    TmModuleDecodeErfFileRegister();
    /* dag live */
    TmModuleReceiveErfDagRegister();
    TmModuleDecodeErfDagRegister();
    /* napatech */
    TmModuleNapatechStreamRegister();
    TmModuleNapatechDecodeRegister();

    /* flow worker */
    TmModuleFlowWorkerRegister();
    /* respond-reject */
    TmModuleRespondRejectRegister();

    /* log api */
    TmModuleLoggerRegister();
    TmModuleStatsLoggerRegister();

    TmModuleDebugList();
    /* nflog */
    TmModuleReceiveNFLOGRegister();
    TmModuleDecodeNFLOGRegister();

    /* windivert */
    TmModuleReceiveWinDivertRegister();
    TmModuleVerdictWinDivertRegister();
    TmModuleDecodeWinDivertRegister();

    /* Dpdk */
    TmModuleReceiveDPDKRegister();
    TmModuleDecodeDPDKRegister();
}

Looking at the registration functions of the other modules, they all do the same thing: fill in the corresponding entry of the tmm_modules array. The relevant definitions of TmModule are:

typedef struct TmModule_ {
    const char *name;

    /** thread handling */
    TmEcode (*ThreadInit)(ThreadVars *, const void *, void **);
    void (*ThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*ThreadDeinit)(ThreadVars *, void *);

    /** the packet processing function */
    TmEcode (*Func)(ThreadVars *, Packet *, void *);

    TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);

    /** terminates the capture loop in PktAcqLoop */
    TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);

    /** does a thread still have tasks to complete before it can be killed?
     *  \retval bool
     *  \param tv threadvars
     *  \param thread_data thread module thread data (e.g. FlowWorkerThreadData for FlowWorker) */
    bool (*ThreadBusy)(ThreadVars *tv, void *thread_data);

    TmEcode (*Management)(ThreadVars *, void *);

    /** global Init/DeInit */
    TmEcode (*Init)(void);
    TmEcode (*DeInit)(void);
#ifdef UNITTESTS
    void (*RegisterTests)(void);
#endif
    uint8_t cap_flags; /**< Flags to indicate the capability requirement of
                           the given TmModule */
    /* Other flags used by the module */
    uint8_t flags;
} TmModule;

TmModule tmm_modules[TMM_SIZE];   // 42

This structure holds the various callbacks, and different modules fill in different fields. For example, the FlowWorker module fills in the Func field, whereas the ReceivePcapFile module fills in the PktAcqLoop field.
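
For comparison, the registration of ReceivePcapFile in src/source-pcap-file.c looks roughly like the sketch below. It is reconstructed from memory rather than quoted verbatim, so individual fields may differ slightly; the point to notice is that it fills PktAcqLoop rather than Func:

/* sketch, not a verbatim quote: the pcap-file capture module registers an
 * acquisition loop (PktAcqLoop) instead of a per-packet Func */
void TmModuleReceivePcapFileRegister (void)
{
    tmm_modules[TMM_RECEIVEPCAPFILE].name = "ReceivePcapFile";
    tmm_modules[TMM_RECEIVEPCAPFILE].ThreadInit = ReceivePcapFileThreadInit;
    tmm_modules[TMM_RECEIVEPCAPFILE].Func = NULL;
    tmm_modules[TMM_RECEIVEPCAPFILE].PktAcqLoop = ReceivePcapFileLoop;
    tmm_modules[TMM_RECEIVEPCAPFILE].ThreadDeinit = ReceivePcapFileThreadDeinit;
    tmm_modules[TMM_RECEIVEPCAPFILE].cap_flags = 0;
    tmm_modules[TMM_RECEIVEPCAPFILE].flags = TM_FLAG_RECEIVE_TM;
}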

RegisterAllModules() is called from PostConfLoadedSetup(), which we saw in the second article: it runs after the configuration file has been read and before the rule files are loaded. Stepping into it:

int PostConfLoadedSetup(SCInstance *suri)
{
    /* do this as early as possible #1577 #1955 */
#ifdef HAVE_LUAJIT
    if (LuajitSetupStatesPool() != 0) {
        SCReturnInt(TM_ECODE_FAILED);
    }
#endif

    /* load the pattern matchers */
    // register AC, ACBS and ACTile into mpm_table[]
    MpmTableSetup();
    // register BM into spm_table[]
    SpmTableSetup();

    // handle the offloading-related settings from the configuration file
    int disable_offloading;
    if (ConfGetBool("capture.disable-offloading", &disable_offloading) == 0)
        disable_offloading = 1;
    if (disable_offloading) {
        LiveSetOffloadDisable();
    } else {
        LiveSetOffloadWarn();
    }

    // whether checksum validation is enabled
    if (suri->checksum_validation == -1) {
        const char *cv = NULL;
        if (ConfGet("capture.checksum-validation", &cv) == 1) {
            if (strcmp(cv, "none") == 0) {
                suri->checksum_validation = 0;
            } else if (strcmp(cv, "all") == 0) {
                suri->checksum_validation = 1;
            }
        }
    }
    switch (suri->checksum_validation) {
        case 0:
            ConfSet("stream.checksum-validation", "0");
            break;
        case 1:
            ConfSet("stream.checksum-validation", "1");
            break;
    }

    // if a custom mode is in effect, write the current runmode back into the conf tree
    if (suri->runmode_custom_mode) {
        ConfSet("runmode", suri->runmode_custom_mode);
    }

    // miscellaneous housekeeping below
    StorageInit();
#ifdef HAVE_PACKET_EBPF
    if (suri->run_mode == RUNMODE_AFP_DEV) {
        EBPFRegisterExtension();
        LiveDevRegisterExtension();
    }
#endif
    RegisterFlowBypassInfo();

    MacSetRegisterFlowStorage();

#ifdef HAVE_PLUGINS
    SCPluginsLoad(suri->capture_plugin_name, suri->capture_plugin_args);
#endif

    LiveDeviceFinalize(); // must be after EBPF extension registration

    RunModeEngineIsIPS(
            suricata.run_mode, suricata.runmode_custom_mode, suricata.capture_plugin_name);

    if (EngineModeIsUnknown()) { // if still uninitialized, set the default
        SCLogInfo("Setting engine mode to IDS mode by default");
        EngineModeSetIDS();
    }

    SetMasterExceptionPolicy();

    /* Must occur prior to output mod registration
       and app layer setup. */
    FeatureTrackingRegister();

    AppLayerSetup();

    /* Suricata will use this umask if provided. By default it will use the
       umask passed on from the shell. */
    const char *custom_umask;
    if (ConfGet("umask", &custom_umask) == 1) {
        uint16_t mask;
        if (StringParseUint16(&mask, 8, (uint16_t)strlen(custom_umask), custom_umask) > 0) {
            umask((mode_t)mask);
        }
    }


    if (ConfigGetCaptureValue(suri) != TM_ECODE_OK) {
        SCReturnInt(TM_ECODE_FAILED);
    }

#ifdef NFQ
    if (suri->run_mode == RUNMODE_NFQ)
        NFQInitConfig(false);
#endif

    /* Load the Host-OS lookup. */
    SCHInfoLoadFromConfig();

    if (suri->run_mode == RUNMODE_ENGINE_ANALYSIS) {
        SCLogInfo("== Carrying out Engine Analysis ==");
        const char *temp = NULL;
        if (ConfGet("engine-analysis", &temp) == 0) {
            SCLogInfo("no engine-analysis parameter(s) defined in conf file.  "
                      "Please define/enable them in the conf to use this "
                      "feature.");
            SCReturnInt(TM_ECODE_FAILED);
        }
    }

    /* hardcoded initialization code */
    SigTableSetup(); /* load the rule keywords */
    SigTableApplyStrictCommandLineOption(suri->strict_rule_parsing_string);
    TmqhSetup();

    TagInitCtx();
    PacketAlertTagInit();
    ThresholdInit();
    HostBitInitCtx();
    IPPairBitInitCtx();

    if (DetectAddressTestConfVars() < 0) {
        SCLogError(
                "basic address vars test failed. Please check %s for errors", suri->conf_filename);
        SCReturnInt(TM_ECODE_FAILED);
    }
    if (DetectPortTestConfVars() < 0) {
        SCLogError("basic port vars test failed. Please check %s for errors", suri->conf_filename);
        SCReturnInt(TM_ECODE_FAILED);
    }

    // all thread modules are registered here
    RegisterAllModules();
    
    
    
    AppLayerHtpNeedFileInspection();

    StorageFinalize();

    TmModuleRunInit();

    if (MayDaemonize(suri) != TM_ECODE_OK)
        SCReturnInt(TM_ECODE_FAILED);

    if (InitSignalHandler(suri) != TM_ECODE_OK)
        SCReturnInt(TM_ECODE_FAILED);

    /* Check for the existence of the default logging directory which we pick
     * from suricata.yaml.  If not found, shut the engine down */
    suri->log_dir = ConfigGetLogDirectory();

    if (ConfigCheckLogDirectoryExists(suri->log_dir) != TM_ECODE_OK) {
        SCLogError("The logging directory \"%s\" "
                   "supplied by %s (default-log-dir) doesn't exist. "
                   "Shutting down the engine",
                suri->log_dir, suri->conf_filename);
        SCReturnInt(TM_ECODE_FAILED);
    }
    if (!IsLogDirectoryWritable(suri->log_dir)) {
        SCLogError("The logging directory \"%s\" "
                   "supplied by %s (default-log-dir) is not writable. "
                   "Shutting down the engine",
                suri->log_dir, suri->conf_filename);
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (suri->disabled_detect) {
        SCLogConfig("detection engine disabled");
        /* disable raw reassembly */
        (void)ConfSetFinal("stream.reassembly.raw", "false");
    }

    HostInitConfig(HOST_VERBOSE);

    CoredumpLoadConfig();

    DecodeGlobalConfig();

    /* hostmode depends on engine mode being set */
    PostConfLoadedSetupHostMode();

    PreRunInit(suri->run_mode);

    SCReturnInt(TM_ECODE_OK);
}

At this point we know how each module gets registered. However, "registration" merely writes the module's attributes (its name, callback addresses, and so on) into a table; we still need to trace how the threads themselves are started.

0x01 Thread Startup

Modify launch.json to set a breakpoint on pthread_create():

            "initCommands": [
                "breakpoint set -n pthread_create"
            ]

The breakpoint lands us in SuricataMain(). Let's look at the code:

int SuricataMain(int argc, char **argv)
{
    /* omitted */

    // the function below loads the rule files, i.e. what the second article analyzed
    PostConfLoadedDetectSetup(&suricata);
    if (suricata.run_mode == RUNMODE_ENGINE_ANALYSIS) {
        goto out;
    } else if (suricata.run_mode == RUNMODE_CONF_TEST){
        SCLogNotice("Configuration provided was successfully loaded. Exiting.");
        goto out;
    } else if (suricata.run_mode == RUNMODE_DUMP_FEATURES) {
        FeatureDump();
        goto out;
    }

    SystemHugepageSnapshot *prerun_snap = NULL;
    if (run_mode == RUNMODE_DPDK)
        prerun_snap = SystemHugepageSnapshotCreate();

    SCSetStartTime(&suricata);
    
    // create the threads
    RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode,
            suricata.capture_plugin_name, suricata.capture_plugin_args);
    if (suricata.run_mode != RUNMODE_UNIX_SOCKET) {
        UnixManagerThreadSpawnNonRunmode(suricata.unix_socket_enabled);
    }

    /* Wait till all the threads have been initialized */
    if (TmThreadWaitOnThreadInit() == TM_ECODE_FAILED) {
        FatalError("Engine initialization failed, "
                   "aborting...");
    }

    /* nproc handling, omitted */
    
    SC_ATOMIC_SET(engine_stage, SURICATA_RUNTIME);
    PacketPoolPostRunmodes();

    /* Un-pause all the paused threads */
    TmThreadContinueThreads();

    /* Must ensure all threads are fully operational before continuing with init process */
    if (TmThreadWaitOnThreadRunning() != TM_ECODE_OK) {
        exit(EXIT_FAILURE);
    }

    /* Print notice and send OS specific notification of threads in running state */
    OnNotifyRunning();

    PostRunStartedDetectSetup(&suricata);
    
    /* dpdk handling, omitted */

    SCPledge();
    
    // main loop, iterates every 10 ms
    SuricataMainLoop(&suricata);

    /* Update the engine stage/status flag */
    SC_ATOMIC_SET(engine_stage, SURICATA_DEINIT);

    UnixSocketKillSocketThread();
    PostRunDeinit(suricata.run_mode, &suricata.start_time);
    /* kill remaining threads */
    TmThreadKillThreads();

out:
    GlobalsDestroy(&suricata);

    exit(EXIT_SUCCESS);
}

Now step into RunModeDispatch(), the function that creates the threads:

void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_plugin_name,
        const char *capture_plugin_args)
{
    char *local_custom_mode = NULL;

    // determine the custom_mode string; fall back to the default if none was given
    // e.g. for runmode=RUNMODE_PCAP_FILE the default mode is autofp
    if (custom_mode == NULL) {
        custom_mode = RunModeGetConfOrDefault(runmode, capture_plugin_name);
        if (custom_mode == NULL)
            FatalError("Unknown runtime mode. Aborting");
    }
    
    // look up the RunMode object in the runmodes table
    RunMode *mode = RunModeGetCustomMode(runmode, custom_mode);
    if (mode == NULL) {
        SCLogError("The custom type \"%s\" doesn't exist "
                   "for this runmode type \"%s\".  Please use --list-runmodes to "
                   "see available custom types for this runmode",
                custom_mode, RunModeTranslateModeToName(runmode));
        exit(EXIT_FAILURE);
    }

    /* Export the custom mode */
    if (active_runmode) {
        SCFree(active_runmode);
    }
    active_runmode = SCStrdup(custom_mode);
    if (unlikely(active_runmode == NULL)) {
        FatalError("Unable to dup active mode");
    }

    if (strcasecmp(active_runmode, "autofp") == 0) {
        TmqhFlowPrintAutofpHandler();
    }

    // the worker threads are created here
    mode->RunModeFunc();

    if (local_custom_mode != NULL)
        SCFree(local_custom_mode);

    /* Check if the alloted queues have at least 1 reader and writer */
    TmValidateQueueState();

    // spawn the FlowManager (FM#01), FlowRecycler (FR#01) and stats wakeup (CW) threads
    if (runmode != RUNMODE_UNIX_SOCKET) {
        /* spawn management threads */
        FlowManagerThreadSpawn();
        FlowRecyclerThreadSpawn();
        if (RunModeNeedsBypassManager()) {
            BypassedFlowManagerThreadSpawn();
        }
        StatsSpawnThreads();
    }
}

As we can see, the worker threads are created by mode->RunModeFunc(), which each runmode defines for itself. In addition, once the runmode's own threads have been created, the FlowManager, FlowRecycler and stats wakeup threads are spawned as well.

Let's look at the definition of the RunMode object:

typedef struct RunMode_ {
    /* the runmode type */
    enum RunModes runmode;
    const char *name;
    const char *description;
    /* runmode function */
    int (*RunModeFunc)(void);
    void (*RunModeIsIPSEnabled)(void);
} RunMode;

typedef struct RunModes_ {
    int cnt;
    RunMode *runmodes;
} RunModes;

static RunModes runmodes[RUNMODE_USER_MAX];  // 18

The runmodes table holds 18 standard runmodes, such as RUNMODE_PCAP_DEV and RUNMODE_PCAP_FILE. A single runmode can support multiple custom modes; for example, the RUNMODE_PCAP_FILE runmode supports the two custom modes single and autofp.

p runmodes[2].runmodes[0]
(RunMode) {
  runmode = RUNMODE_PCAP_FILE
  name = 0x00005555560c9750 "single"
  description = 0x00005555560c9770 "Single threaded pcap file mode"
  RunModeFunc = 0x000055555591f9f0 (suricata`RunModeFilePcapSingle at runmode-pcap-file.c:56)
  RunModeIsIPSEnabled = 0x0000000000000000
}

p runmodes[2].runmodes[1]
(RunMode) {
  runmode = RUNMODE_PCAP_FILE
  name = 0x00005555560c9800 "autofp"
  description = 0x00005555560c9820 "Multi-threaded pcap file mode. Packets from each flow are assigned to a consistent detection thread"
  RunModeFunc = 0x000055555591fc90 (suricata`RunModeFilePcapAutoFp at runmode-pcap-file.c:121)
  RunModeIsIPSEnabled = 0x0000000000000000
}

We will not go into how this table is populated for now. In any case, RunModeFunc currently points to RunModeFilePcapSingle(); stepping in:

/**
 * \brief Single thread version of the Pcap file processing.
 */
int RunModeFilePcapSingle(void)
{
    const char *file = NULL;
    char tname[TM_THREAD_NAME_MAX];

    if (ConfGet("pcap-file.file", &file) == 0) {
        FatalError("Failed retrieving pcap-file from Conf");
    }

    // set live_time_tracking = false
    TimeModeSetOffline();

    // zero out a few globals
    PcapFileGlobalInit();

    snprintf(tname, sizeof(tname), "%s#01", thread_name_single); // "W#01"

    // arguments: name, inq_name, inqh_name, outq_name, outqh_name, slots
    // it returns a ThreadVars
    ThreadVars *tv = TmThreadCreatePacketHandler(tname,
                                                 "packetpool", "packetpool",
                                                 "packetpool", "packetpool",
                                                 "pktacqloop");
    if (tv == NULL) {
        FatalError("threading setup failed");
    }

    // below: look up the TmModule objects in tmm_modules by name and create slots for them
    
    TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
    if (tm_module == NULL) {
        FatalError("TmModuleGetByName failed for ReceivePcap");
    }
    TmSlotSetFuncAppend(tv, tm_module, file);

    tm_module = TmModuleGetByName("DecodePcapFile");
    if (tm_module == NULL) {
        FatalError("TmModuleGetByName DecodePcap failed");
    }
    TmSlotSetFuncAppend(tv, tm_module, NULL);

    tm_module = TmModuleGetByName("FlowWorker");
    if (tm_module == NULL) {
        FatalError("TmModuleGetByName for FlowWorker failed");
    }
    TmSlotSetFuncAppend(tv, tm_module, NULL);

    // set the cpu affinity
    TmThreadSetCPU(tv, WORKER_CPU_SET);

    // spawn the thread
    if (TmThreadSpawn(tv) != TM_ECODE_OK) {
        FatalError("TmThreadSpawn failed");
    }
    return 0;
}

The function above first creates a ThreadVars structure, then appends a slot for each of several modules, and finally calls TmThreadSpawn(tv) to spawn the thread. Let's take a closer look at ThreadVars:

/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
    pthread_t t;
    /** function pointer to the function that runs the packet pipeline for
     *  this thread. It is passed directly to pthread_create(), hence the
     *  void pointers in and out. */
    void *(*tm_func)(void *);

    char name[16];
    char *printable_name;
    char *thread_group_name;

    uint8_t thread_setup_flags;

    /** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
    uint8_t type;

    uint16_t cpu_affinity; /** cpu or core number to set affinity to */
    int thread_priority; /** priority (real time) for this thread. Look at threads.h */


    /** TmModule::flags for each module part of this thread */
    uint8_t tmm_flags;

    uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
                            TmModules registered under this thread */
    uint8_t inq_id;
    uint8_t outq_id;

    /** local id */
    int id;

    /** incoming queue and handler */
    Tmq *inq;
    struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);

    SC_ATOMIC_DECLARE(uint32_t, flags);

    /** list of of TmSlot objects together forming the packet pipeline. */
    struct TmSlot_ *tm_slots;

    /** pointer to the flowworker in the pipeline. Used as starting point
     *  for injected packets. Can be NULL if the flowworker is not part
     *  of this thread. */
    struct TmSlot_ *tm_flowworker;

    /** outgoing queue and handler */
    Tmq *outq;
    void *outctx;
    void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);

    /** Queue for decoders to temporarily store extra packets they
     *  generate. These packets are generated as part of the tunnel
     *  handling, and are processed directly after the "real" packet
     *  from the current position in the pipeline. */
    PacketQueueNoLock decode_pq;

    /** Stream packet queue for flow time out injection. Either a pointer to the
     *  workers input queue or to stream_pq_local */
    struct PacketQueue_ *stream_pq;
    struct PacketQueue_ *stream_pq_local;

    /* counters */

    /** private counter store: counter updates modify this */
    StatsPrivateThreadContext perf_private_ctx;

    /** pointer to the next thread */
    struct ThreadVars_ *next;

    /** public counter store: counter syncs update this */
    StatsPublicThreadContext perf_public_ctx;

    /* mutex and condition used by management threads */

    SCCtrlMutex *ctrl_mutex;
    SCCtrlCondT *ctrl_cond;

    struct FlowQueue_ *flow_queue;
    bool break_loop;

} ThreadVars;

ThreadVars simply holds this thread's state. In the code above, slots were created for the three thread modules ReceivePcapFile, DecodePcapFile and FlowWorker. A slot is defined as follows:

typedef struct TmSlot_ {
    
    // this function pointer is the core of the slot; it is interpreted as SlotFunc, PktAcqLoop or Management
    union {
        TmSlotFunc SlotFunc;
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
        TmEcode (*Management)(ThreadVars *, void *);
    };
    
    // linked-list pointer
    struct TmSlot_ *slot_next;

    SC_ATOMIC_DECLARE(void *, slot_data);

    /** copy of the TmModule::flags */
    uint8_t tm_flags;

    /* store the thread module id */
    int tm_id;

    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);

    /* data storage */
    const void *slot_initdata;

} TmSlot;

The slots form a linked list. After the three appends, the function pointers of the three slots in tv point to ReceivePcapFileLoop, DecodePcapFile and FlowWorker respectively. The logic for appending a slot to tv is as follows:

void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
{
    TmSlot *slot = SCMalloc(sizeof(TmSlot));
    if (unlikely(slot == NULL))
        return;
    
    memset(slot, 0, sizeof(TmSlot));
    SC_ATOMIC_INITPTR(slot->slot_data);
    slot->SlotThreadInit = tm->ThreadInit;
    slot->slot_initdata = data;
    
    // copy the module's function pointer into the slot
    if (tm->Func) {
        slot->SlotFunc = tm->Func;
    } else if (tm->PktAcqLoop) {
        slot->PktAcqLoop = tm->PktAcqLoop;
        if (tm->PktAcqBreakLoop) {
            tv->break_loop = true;
        }
    } else if (tm->Management) {
        slot->Management = tm->Management;
    }
    
    
    slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
    slot->SlotThreadDeinit = tm->ThreadDeinit;
    slot->tm_id = TmModuleGetIDForTM(tm);
    slot->tm_flags |= tm->flags;

    tv->tmm_flags |= tm->flags;
    tv->cap_flags |= tm->cap_flags;

    // append to the linked list
    if (tv->tm_slots == NULL) {
        tv->tm_slots = slot;
    } else {
        TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;

        /* get the last slot */
        for ( ; a != NULL; a = a->slot_next) {
             b = a;
        }
        /* append the new slot */
        if (b != NULL) {
            b->slot_next = slot;
        }
    }
    return;
}

Once the slot list has been assembled, TmThreadSpawn() is called to spawn the thread:

/**
 * \brief Spawns a thread associated with the ThreadVars instance tv
 *
 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
 */
TmEcode TmThreadSpawn(ThreadVars *tv)
{
    pthread_attr_t attr;
    if (tv->tm_func == NULL) {
        FatalError("No thread function set");
    }

    /* Initialize and set thread detached attribute */
    pthread_attr_init(&attr);

    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* Adjust thread stack size if configured */
    if (threading_set_stack_size) {
        SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
        if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
            FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
                    threading_set_stack_size);
        }
    }

    // create the thread
    int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
    if (rc) {
        FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc,
                strerror(errno));
    }

    TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);

    TmThreadAppend(tv, tv->type);
    return TM_ECODE_OK;
}
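
Note the pattern at the heart of TmThreadSpawn(): the entry function comes from tv->tm_func, and the tv itself is passed as the thread argument, so each thread starts up with its own pipeline description in hand. The standalone sketch below (not Suricata code; all names are invented for illustration) reproduces that pattern in miniature:

#include <pthread.h>
#include <stdio.h>

/* a miniature stand-in for ThreadVars: the entry point is stored as a
 * function pointer and the whole struct is passed to pthread_create() */
typedef struct MiniTv_ {
    pthread_t t;
    void *(*tm_func)(void *);
    const char *name;
} MiniTv;

static void *MiniLoop(void *td)
{
    MiniTv *tv = td;                 /* the thread recovers its own context */
    printf("thread %s running\n", tv->name);
    return NULL;
}

static int MiniSpawn(MiniTv *tv)
{
    if (tv->tm_func == NULL)
        return -1;
    /* same shape as TmThreadSpawn(): entry point and argument both come from tv */
    return pthread_create(&tv->t, NULL, tv->tm_func, tv);
}

int main(void)
{
    MiniTv tv = { .tm_func = MiniLoop, .name = "W#01" };
    if (MiniSpawn(&tv) != 0)
        return 1;
    pthread_join(tv.t, NULL);
    return 0;
}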

We should pay particular attention to the slot structure inside tv. Different runmode/custom-mode combinations produce quite different slot lists. For example, when the runmode is pcap file and the custom mode is autofp, the RunModeFunc looks like this:

int RunModeFilePcapAutoFp(void)
{
    /* initialization, omitted */

    uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
    if (ncpus > 0)
        cpu = 1;

    int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
    if (thread_max == 0)
        thread_max = ncpus * threading_detect_ratio;
    if (thread_max < 1)
        thread_max = 1;
    if (thread_max > 1024)
        thread_max = 1024;

    queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
    if (queues == NULL) {
        FatalError("RunmodeAutoFpCreatePickupQueuesString failed");
    }

    snprintf(tname, sizeof(tname), "%s#01", thread_name_autofp);

    // build the tv (this thread parses the pcap file)
    ThreadVars *tv_receivepcap =
        TmThreadCreatePacketHandler(tname,
                                    "packetpool", "packetpool",
                                    queues, "flow",
                                    "pktacqloop");
    SCFree(queues);

    if (tv_receivepcap == NULL) {
        FatalError("threading setup failed");
    }
    
    // append slot: ReceivePcapFile
    TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
    if (tm_module == NULL) {
        FatalError("TmModuleGetByName failed for ReceivePcap");
    }
    TmSlotSetFuncAppend(tv_receivepcap, tm_module, file);

    // append slot: DecodePcapFile
    tm_module = TmModuleGetByName("DecodePcapFile");
    if (tm_module == NULL) {
        FatalError("TmModuleGetByName DecodePcap failed");
    }
    TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL);

    TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET);

    // spawn the pcap reader thread
    if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) {
        FatalError("TmThreadSpawn failed");
    }

    for (thread = 0; thread < (uint16_t)thread_max; thread++) {
        snprintf(tname, sizeof(tname), "%s#%02d", thread_name_workers, thread + 1);
        snprintf(qname, sizeof(qname), "pickup%d", thread + 1);

        SCLogDebug("tname %s, qname %s", tname, qname);
        SCLogDebug("Assigning %s affinity to cpu %u", tname, cpu);

        // build the tv (this thread does detection)
        ThreadVars *tv_detect_ncpu =
            TmThreadCreatePacketHandler(tname,
                                        qname, "flow",
                                        "packetpool", "packetpool",
                                        "varslot");
        if (tv_detect_ncpu == NULL) {
            FatalError("TmThreadsCreate failed");
        }

        // append slot: FlowWorker
        tm_module = TmModuleGetByName("FlowWorker");
        if (tm_module == NULL) {
            FatalError("TmModuleGetByName for FlowWorker failed");
        }
        TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);

        TmThreadSetGroupName(tv_detect_ncpu, "Detect");

        TmThreadSetCPU(tv_detect_ncpu, WORKER_CPU_SET);

        // spawn the detection thread
        if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
            FatalError("TmThreadSpawn failed");
        }

        if ((cpu + 1) == ncpus)
            cpu = 0;
        else
            cpu++;
    }

    return 0;
}

So in autofp mode there is one pcap file reader thread (with the ReceivePcapFile and DecodePcapFile slots), labelled RX#01, plus a number of worker threads (each with a single FlowWorker slot), labelled W#0x.

Now, for any combination of runmode and custom mode, we can locate the logic responsible for starting all the threads: look through the runmodes array, find the specific RunMode, and examine the startup function that its RunModeFunc points to.

The next question is how data flows from the pcap file to the flow worker.

0x02 Packet Delivery in single Mode

We already analyzed the flow worker itself in the previous article. Now let's run the program in single mode, break on FlowWorker, and look at the call stack:

[thread W#01]
FlowWorker (/home/neko/suricata-7.0.8/src/flow-worker.c:567)
TmThreadsSlotVarRun (/home/neko/suricata-7.0.8/src/tm-threads.c:135)
TmThreadsSlotProcessPkt (/home/neko/suricata-7.0.8/src/tm-threads.h:200)
PcapFileCallbackLoop (/home/neko/suricata-7.0.8/src/source-pcap-file-helper.c:108)
PcapFileDispatch (/home/neko/suricata-7.0.8/src/source-pcap-file-helper.c:133)
ReceivePcapFileLoop (/home/neko/suricata-7.0.8/src/source-pcap-file.c:180)
TmThreadsSlotPktAcqLoop (/home/neko/suricata-7.0.8/src/tm-threads.c:318)

Running in autofp mode:

[thread W#04]
FlowWorker (/home/neko/suricata-7.0.8/src/flow-worker.c:567)
TmThreadsSlotVarRun (/home/neko/suricata-7.0.8/src/tm-threads.c:135)
TmThreadsSlotVar (/home/neko/suricata-7.0.8/src/tm-threads.c:471)

[thread RX#01]
usleep (@usleep:19)
TmThreadWaitForFlag (/home/neko/suricata-7.0.8/src/tm-threads.c:1785)
TmThreadsSlotPktAcqLoop (/home/neko/suricata-7.0.8/src/tm-threads.c:339)

Evidently, in single mode the pcap handling logic calls FlowWorker() directly to process each packet, whereas in autofp mode the RX thread must be producing packets and handing them to the worker threads through some kind of pipe mechanism.

Let's first look closely at packet flow in single mode. At the bottom of the call stack is TmThreadsSlotPktAcqLoop(), which is also the worker thread's entry point.

It is worth noting that a thread's entry point is decided when the tv is constructed. After TmThreadCreate() has filled in the main members of the tv, it calls TmThreadSetSlots(), shown below:

static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
{
    if (name == NULL) {
        if (fn_p == NULL) {
            printf("Both slot name and function pointer can't be NULL inside "
                   "TmThreadSetSlots\n");
            goto error;
        } else {
            name = "custom";
        }
    }

    if (strcmp(name, "varslot") == 0) {
        tv->tm_func = TmThreadsSlotVar;
    } else if (strcmp(name, "pktacqloop") == 0) {
        tv->tm_func = TmThreadsSlotPktAcqLoop;
    } else if (strcmp(name, "management") == 0) {
        tv->tm_func = TmThreadsManagement;
    } else if (strcmp(name, "command") == 0) {
        tv->tm_func = TmThreadsManagement;
    } else if (strcmp(name, "custom") == 0) {
        if (fn_p == NULL)
            goto error;
        tv->tm_func = fn_p;
    } else {
        printf("Error: Slot \"%s\" not supported\n", name);
        goto error;
    }

    return TM_ECODE_OK;

error:
    return TM_ECODE_FAILED;
}

In other words, a thread's entry point is determined by the slot name argument passed when the tv is constructed. In our debugging session this argument is pktacqloop, so the entry point is TmThreadsSlotPktAcqLoop. The code is as follows:

static void *TmThreadsSlotPktAcqLoop(void *td)
{
    ThreadVars *tv = (ThreadVars *)td;
    TmSlot *s = tv->tm_slots;
    char run = 1;
    TmEcode r = TM_ECODE_OK;
    TmSlot *slot = NULL;

    // calls prctl(PR_SET_NAME, tname, 0, 0, 0) to set the thread name
    SCSetThreadName(tv->name);

    if (tv->thread_setup_flags != 0)
        TmThreadSetupOptions(tv);

    // drop capabilities; set up some counters; initialize the packet pool
    SCDropCaps(tv);
    CaptureStatsSetup(tv);
    PacketPoolInit();

    // walk the slot list
    for (slot = s; slot != NULL; slot = slot->slot_next) {
        // call each slot's init callback
        if (slot->SlotThreadInit != NULL) {
            void *slot_data = NULL;
            r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
            if (r != TM_ECODE_OK) {
                if (r == TM_ECODE_DONE) {
                    EngineDone();
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
                    goto error;
                } else {
                    TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                    goto error;
                }
            }
            (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
        }

        
        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
            // if the flowworker is the first slot, use tv->inq->pq as tv->stream_pq
            tv->stream_pq = tv->inq->pq;
            tv->tm_flowworker = slot;
            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
            tv->flow_queue = FlowQueueNew();
            if (tv->flow_queue == NULL) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                pthread_exit((void *) -1);
                return NULL;
            }
        } else if (slot->tm_id == TMM_FLOWWORKER) {
            // if the flowworker sits at any other slot, allocate a new queue as tv->stream_pq
            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
            if (tv->stream_pq_local == NULL)
                FatalError("failed to alloc PacketQueue");
            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
            tv->stream_pq = tv->stream_pq_local;
            tv->tm_flowworker = slot;
            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
            tv->flow_queue = FlowQueueNew();
            if (tv->flow_queue == NULL) {
                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
                pthread_exit((void *) -1);
                return NULL;
            }
        }
    }

    StatsSetupPrivate(tv);

    TmThreadsSetFlag(tv, THV_INIT_DONE);

    // main loop
    while(run) {
        if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
            TmThreadsSetFlag(tv, THV_PAUSED);
            TmThreadTestThreadUnPaused(tv);
            TmThreadsUnsetFlag(tv, THV_PAUSED);
        }

        // in our debugging session s->PktAcqLoop is ReceivePcapFileLoop
        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);

        // exit the loop if the result is FAILED or DONE, or in a few other cases
        if (r == TM_ECODE_FAILED) {
            TmThreadsSetFlag(tv, THV_FAILED);
            run = 0;
        }
        if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
            run = 0;
        }
        if (r == TM_ECODE_DONE) {
            run = 0;
        }
    }
    StatsSyncCounters(tv);

    TmThreadsSetFlag(tv, THV_FLOW_LOOP);

    /* process all pseudo packets the flow timeout may throw at us */
    TmThreadTimeoutLoop(tv, s);

    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
    TmThreadWaitForFlag(tv, THV_DEINIT);

    PacketPoolDestroy();

    // deinit all slots
    for (slot = s; slot != NULL; slot = slot->slot_next) {
        if (slot->SlotThreadExitPrintStats != NULL) {
            slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
        }

        if (slot->SlotThreadDeinit != NULL) {
            r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
            if (r != TM_ECODE_OK) {
                TmThreadsSetFlag(tv, THV_CLOSED);
                goto error;
            }
        }
    }

    tv->stream_pq = NULL;
    SCLogDebug("%s ending", tv->name);
    TmThreadsSetFlag(tv, THV_CLOSED);
    pthread_exit((void *) 0);
    return NULL;

error:
    tv->stream_pq = NULL;
    pthread_exit((void *) -1);
    return NULL;
}

So the worker thread runs a main loop that calls s->PktAcqLoop(), i.e. ReceivePcapFileLoop, on each iteration. In this run, however, ReceivePcapFileLoop returns TM_ECODE_DONE, so the loop exits after a single iteration. Stepping into ReceivePcapFileLoop:

TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
{
    SCEnter();

    if(unlikely(data == NULL)) {
        SCLogError("pcap file reader thread failed to initialize");

        PcapFileExit(TM_ECODE_FAILED, NULL);

        SCReturnInt(TM_ECODE_DONE);
    }

    TmEcode status = TM_ECODE_OK;
    PcapFileThreadVars *ptv = (PcapFileThreadVars *) data;
    TmSlot *s = (TmSlot *)slot;

    // point ptv->shared.slot at the next slot
    ptv->shared.slot = s->slot_next;
    ptv->shared.cb_result = TM_ECODE_OK;

    // Indicate that the thread is actually running its application level code (i.e., it can poll
    // packets)
    TmThreadsSetFlag(tv, THV_RUNNING);

    if(ptv->is_directory == 0) {
        // a single pcap file
        SCLogInfo("Starting file run for %s", ptv->behavior.file->filename);

        status = PcapFileDispatch(ptv->behavior.file); // breakpoint location
        CleanupPcapFileFromThreadVars(ptv, ptv->behavior.file);
    } else {
        // a directory of pcaps
        SCLogInfo("Starting directory run for %s", ptv->behavior.directory->filename);
        PcapDirectoryDispatch(ptv->behavior.directory);
        CleanupPcapDirectoryFromThreadVars(ptv, ptv->behavior.directory);
    }

    SCLogDebug("Pcap file loop complete with status %u", status);

    status = PcapFileExit(status, &ptv->shared.last_processed);
    SCReturnInt(status);
}

In this run we supplied only a single pcap file. Stepping into PcapFileDispatch():

TmEcode PcapFileDispatch(PcapFileFileVars *ptv)
{
    SCEnter();

    // initialization
    
    /* initialize all the thread's initial timestamp */
    if (likely(ptv->first_pkt_hdr != NULL)) {
        TmThreadsInitThreadsTimestamp(SCTIME_FROM_TIMEVAL(&ptv->first_pkt_ts));
        PcapFileCallbackLoop((char *)ptv, ptv->first_pkt_hdr,
                (u_char *)ptv->first_pkt_data);
        ptv->first_pkt_hdr = NULL;
        ptv->first_pkt_data = NULL;
    }

    int packet_q_len = 64;
    TmEcode loop_result = TM_ECODE_OK;
    strlcpy(pcap_filename, ptv->filename, sizeof(pcap_filename));

    // read the pcap file
    
    while (loop_result == TM_ECODE_OK) {
        if (suricata_ctl_flags & SURICATA_STOP) {
            SCReturnInt(TM_ECODE_OK);
        }

        /* make sure we have at least one packet in the packet pool, to prevent
         * us from alloc'ing packets at line rate */
        PacketPoolWait();

        /* Right now we just support reading packets one at a time. */
        // breakpoint location
        int r = pcap_dispatch(ptv->pcap_handle, packet_q_len,
                          (pcap_handler)PcapFileCallbackLoop, (u_char *)ptv);

        if (unlikely(r == -1)) {
            /* error handling, omitted */
        }
        StatsSyncCountersIfSignalled(ptv->shared->tv);
    }

    SCReturnInt(loop_result);
}

The pcap_dispatch function is provided by libpcap. Its signature is:

int pcap_dispatch(pcap_t *p, int cnt, pcap_handler callback, u_char *user);

A pcap_handler has the following form:

void user_routine(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
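
As a minimal, standalone illustration of this libpcap API (independent of Suricata; the file name test.pcap is just an example), the following program opens a capture file offline and lets pcap_dispatch() drive a callback for each record:

#include <pcap.h>
#include <stdio.h>

/* callback matching the pcap_handler typedef */
static void on_packet(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{
    unsigned long *count = (unsigned long *)user;
    (*count)++;
    printf("packet #%lu: caplen=%u len=%u\n", *count, h->caplen, h->len);
}

int main(void)
{
    char errbuf[PCAP_ERRBUF_SIZE];
    unsigned long count = 0;

    /* open the capture file for offline reading */
    pcap_t *handle = pcap_open_offline("test.pcap", errbuf);
    if (handle == NULL) {
        fprintf(stderr, "pcap_open_offline failed: %s\n", errbuf);
        return 1;
    }

    /* process up to 64 packets per call, like packet_q_len above;
     * pcap_dispatch returns 0 once the savefile is exhausted */
    while (pcap_dispatch(handle, 64, on_packet, (u_char *)&count) > 0)
        ;

    pcap_close(handle);
    return 0;
}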

In this run the callback is PcapFileCallbackLoop. It receives pdata, which holds the raw packet bytes, and a struct pcap_pkthdr containing ts, caplen and len. Let's step into PcapFileCallbackLoop:

void PcapFileCallbackLoop(char *user, struct pcap_pkthdr *h, u_char *pkt)
{
    SCEnter();

    PcapFileFileVars *ptv = (PcapFileFileVars *)user;
    Packet *p = PacketGetFromQueueOrAlloc();

    if (unlikely(p == NULL)) {
        SCReturn;
    }
    PACKET_PROFILING_TMM_START(p, TMM_RECEIVEPCAPFILE);

    // populate the Packet object
    PKT_SET_SRC(p, PKT_SRC_WIRE);    // p->pkt_src = PKT_SRC_WIRE
    p->ts = SCTIME_FROM_TIMEVAL_UNTRUSTED(&h->ts);
    SCLogDebug("p->ts.tv_sec %" PRIuMAX "", (uintmax_t)SCTIME_SECS(p->ts));
    p->datalink = ptv->datalink;
    p->pcap_cnt = ++pcap_g.cnt;
    p->pcap_v.tenant_id = ptv->shared->tenant_id;
    
    ptv->shared->pkts++;
    ptv->shared->bytes += h->caplen;

    // copy the raw packet data into the Packet object
    if (unlikely(PacketCopyData(p, pkt, h->caplen))) {
        TmqhOutputPacketpool(ptv->shared->tv, p);
        PACKET_PROFILING_TMM_END(p, TMM_RECEIVEPCAPFILE);
        SCReturn;
    }

    // set the PKT_IGNORE_CHECKSUM flag if required
    if (pcap_g.checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
        p->flags |= PKT_IGNORE_CHECKSUM;
    } else if (pcap_g.checksum_mode == CHECKSUM_VALIDATION_AUTO) {
        if (ChecksumAutoModeCheck(ptv->shared->pkts, p->pcap_cnt,
                                  SC_ATOMIC_GET(pcap_g.invalid_checksums))) {
            pcap_g.checksum_mode = CHECKSUM_VALIDATION_DISABLE;
            p->flags |= PKT_IGNORE_CHECKSUM;
        }
    }

    PACKET_PROFILING_TMM_END(p, TMM_RECEIVEPCAPFILE);

    // breakpoint location
    if (TmThreadsSlotProcessPkt(ptv->shared->tv, ptv->shared->slot, p) != TM_ECODE_OK) {
        pcap_breakloop(ptv->pcap_handle);
        ptv->shared->cb_result = TM_ECODE_FAILED;
    }

    SCReturn;
}

The logic above is: take a Packet object from the pool, fill in the relevant fields, copy the raw packet data into it, and then hand it to TmThreadsSlotProcessPkt. Here ptv->shared->tv is the worker thread's tv, and ptv->shared->slot is the DecodePcapFile slot. Stepping in:

static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
    if (s == NULL) {
        tv->tmqh_out(tv, p);
        return TM_ECODE_OK;
    }

    // breakpoint location
    TmEcode r = TmThreadsSlotVarRun(tv, p, s);
    if (unlikely(r == TM_ECODE_FAILED)) {
        TmThreadsSlotProcessPktFail(tv, s, p);
        return TM_ECODE_FAILED;
    }

    // here tv->tmqh_out is TmqhOutputPacketpool
    tv->tmqh_out(tv, p);

    TmThreadsHandleInjectedPackets(tv);

    return TM_ECODE_OK;
}


/**
 * \brief Separate run function so we can call it recursively.
 */
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
{
    // invoke each SlotFunc in turn: DecodePcapFile, FlowWorker
    for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
        PACKET_PROFILING_TMM_START(p, s->tm_id);
        TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
        PACKET_PROFILING_TMM_END(p, s->tm_id);
        DEBUG_VALIDATE_BUG_ON(p->flow != NULL);

        /* handle error */
        if (unlikely(r == TM_ECODE_FAILED)) {
            /* Encountered error.  Return packets to packetpool and return */
            TmThreadsSlotProcessPktFail(tv, s, NULL);
            return TM_ECODE_FAILED;
        }
        if (s->tm_flags & TM_FLAG_DECODE_TM) {
            if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
                    TM_ECODE_OK) {
                return TM_ECODE_FAILED;
            }
        }
    }

    return TM_ECODE_OK;
}

In short, these two functions simply execute each slot in turn. In our debugging session the functions executed are DecodePcapFile and FlowWorker.

To summarize, in single mode PcapFileDispatch() uses the libpcap API to read the pcap file; its callback PcapFileCallbackLoop() builds a Packet object from the raw data and calls TmThreadsSlotProcessPkt() to process it, which then invokes the two slot functions DecodePcapFile and FlowWorker one after the other.

0x03 Packet Delivery in autofp Mode

In autofp mode, the RX thread follows the same call path: PcapFileDispatch -> PcapFileCallbackLoop -> TmThreadsSlotProcessPkt. This time, however, only the DecodePcapFile slot remains in the pipeline; once decoding is done, control returns to TmThreadsSlotProcessPkt:

static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
    if (s == NULL) {
        tv->tmqh_out(tv, p);
        return TM_ECODE_OK;
    }

    TmEcode r = TmThreadsSlotVarRun(tv, p, s);
    if (unlikely(r == TM_ECODE_FAILED)) {
        TmThreadsSlotProcessPktFail(tv, s, p);
        return TM_ECODE_FAILED;
    }

    // note that here tv->tmqh_out points to TmqhOutputFlowHash
    tv->tmqh_out(tv, p);

    TmThreadsHandleInjectedPackets(tv);

    return TM_ECODE_OK;
}

In single mode tv->tmqh_out points to TmqhOutputPacketpool, whereas in autofp mode it is TmqhOutputFlowHash. The code:

void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
{
    uint32_t qid;
    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;

    if (p->flags & PKT_WANTS_FLOW) {
        // if the packet carries the PKT_WANTS_FLOW flag, pick a queue by flow hash
        uint32_t hash = p->flow_hash;
        qid = hash % ctx->size;
    } else {
        // otherwise round-robin
        qid = ctx->last++;

        if (ctx->last == ctx->size)
            ctx->last = 0;
    }

    // enqueue the packet into the chosen queue
    PacketQueue *q = ctx->queues[qid].q;
    SCMutexLock(&q->mutex_q);
    PacketEnqueue(q, p);
    SCCondSignal(&q->cond_q);
    SCMutexUnlock(&q->mutex_q);

    return;
}

So the RX thread's tv holds an outctx member that records the PacketQueues. Packets parsed from the pcap file are distributed across these queues according to the policy above, and packets belonging to the same flow always end up in the same queue.

The producer of a PacketQueue is the RX thread and the consumer is a worker thread. This time the worker thread's entry point is not TmThreadsSlotPktAcqLoop but TmThreadsSlotVar. The code:

static void *TmThreadsSlotVar(void *td)
{
    /* initialization; drop capabilities. Similar to TmThreadsSlotPktAcqLoop, omitted */

    // walk the slot list
    for (; s != NULL; s = s->slot_next) {
        if (s->SlotThreadInit != NULL) {
            /* slot initialization, similar to TmThreadsSlotPktAcqLoop, omitted */
        }

        if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
            // if the flowworker is the first slot, use tv->inq->pq as tv->stream_pq
            /* similar to TmThreadsSlotPktAcqLoop, omitted */
        } else if (s->tm_id == TMM_FLOWWORKER) {
            // if the flowworker sits at any other slot, allocate a new queue as tv->stream_pq
            /* similar to TmThreadsSlotPktAcqLoop, omitted */
        }
    }

    StatsSetupPrivate(tv);
    TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);

    s = (TmSlot *)tv->tm_slots;

    while (run) {
        if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
            TmThreadsSetFlag(tv, THV_PAUSED);
            TmThreadTestThreadUnPaused(tv);
            TmThreadsUnsetFlag(tv, THV_PAUSED);
        }

        // input
        p = tv->tmqh_in(tv);   // points to TmqhInputFlow: take a packet off the queue

        if (unlikely(p == NULL)) {
            /* error handling, omitted */
        }

        if (p != NULL) {
            /* run the thread module(s) */
            r = TmThreadsSlotVarRun(tv, p, s);  // breakpoint location
            
            if (r == TM_ECODE_FAILED) {
                /* error handling, omitted */
            }

            // output
            tv->tmqh_out(tv, p);  // points to TmqhOutputPacketpool, same as single mode

            /* now handle the stream pq packets */
            TmThreadsHandleInjectedPackets(tv);
        }

        if (TmThreadsCheckFlag(tv, THV_KILL)) {
            run = 0;
        }
    } /* while (run) */
    StatsSyncCounters(tv);

    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
    TmThreadWaitForFlag(tv, THV_DEINIT);

    PacketPoolDestroy();

    /* cleanup, omitted */
}

The code above first calls tv->tmqh_in() to take a packet off the queue, then calls TmThreadsSlotVarRun() to process it, and finally calls tv->tmqh_out() to output it. We have already analyzed TmThreadsSlotVarRun: it simply walks the slot list and calls each SlotFunc. This time the slot list contains only FlowWorker.
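
To see the whole autofp handoff in one place, the standalone sketch below models it with one producer and one consumer sharing a mutex/condvar-protected queue; this mirrors the PacketEnqueue + SCCondSignal pattern of TmqhOutputFlowHash on the RX side and the blocking dequeue of TmqhInputFlow on the worker side. It is an illustrative model with invented names, not Suricata code:

#include <pthread.h>
#include <stdio.h>

#define QSIZE 8

/* a tiny stand-in for PacketQueue: ring buffer + mutex + condition variable */
typedef struct MiniQueue_ {
    int buf[QSIZE];
    int head, tail, len;
    int done;                          /* producer signals "no more packets" */
    pthread_mutex_t mutex_q;
    pthread_cond_t cond_q;
} MiniQueue;

/* producer side: what TmqhOutputFlowHash does after picking a queue */
static void enqueue(MiniQueue *q, int pkt)
{
    pthread_mutex_lock(&q->mutex_q);
    while (q->len == QSIZE)                       /* simplistic backpressure */
        pthread_cond_wait(&q->cond_q, &q->mutex_q);
    q->buf[q->tail] = pkt;
    q->tail = (q->tail + 1) % QSIZE;
    q->len++;
    pthread_cond_signal(&q->cond_q);
    pthread_mutex_unlock(&q->mutex_q);
}

/* consumer side: what the worker's input handler does inside its loop */
static int dequeue(MiniQueue *q, int *pkt)
{
    pthread_mutex_lock(&q->mutex_q);
    while (q->len == 0 && !q->done)
        pthread_cond_wait(&q->cond_q, &q->mutex_q);
    if (q->len == 0) {                            /* drained and finished */
        pthread_mutex_unlock(&q->mutex_q);
        return 0;
    }
    *pkt = q->buf[q->head];
    q->head = (q->head + 1) % QSIZE;
    q->len--;
    pthread_cond_signal(&q->cond_q);
    pthread_mutex_unlock(&q->mutex_q);
    return 1;
}

static void *worker(void *arg)
{
    MiniQueue *q = arg;
    int pkt;
    while (dequeue(q, &pkt))
        printf("worker got packet %d\n", pkt);    /* FlowWorker would run here */
    return NULL;
}

int main(void)
{
    MiniQueue q = { .mutex_q = PTHREAD_MUTEX_INITIALIZER,
                    .cond_q = PTHREAD_COND_INITIALIZER };
    pthread_t w;
    pthread_create(&w, NULL, worker, &q);

    for (int i = 1; i <= 20; i++)                 /* the "RX thread" producing packets */
        enqueue(&q, i);

    pthread_mutex_lock(&q.mutex_q);
    q.done = 1;
    pthread_cond_signal(&q.cond_q);
    pthread_mutex_unlock(&q.mutex_q);

    pthread_join(w, NULL);
    return 0;
}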

💡
It is easy to see that TmThreadsSlotVar duplicates a great deal of code from TmThreadsSlotPktAcqLoop. In fact, the Suricata code base contains many such implementations that go against good design practice, interspersed with various hacks, and the comments frequently contain grammatical mistakes. To be fair, Suricata's code quality is not particularly high, so extra care is needed when doing secondary development on top of it.

0x04 Summary

The main takeaways of this article:

  • Suricata creates the threads it needs after the rule files have been loaded and before packet processing begins.
  • Every valid (runmode, custom mode) pair has its own startup function; for example, single mode starts worker threads, while autofp mode starts an RX thread plus worker threads.
  • Besides these worker threads, the FlowManager (FM#01), FlowRecycler (FR#01) and stats wakeup (CW) threads are also started.
  • A thread's state lives in a ThreadVars (tv for short), which holds the thread name, the input/output queues, the tm_slots list, and so on.
  • The slot list can be seen as the packet-processing pipeline. Each TmSlot corresponds to one thread module, e.g. DecodePcapFile or FlowWorker.
    - In single mode, the W thread's slot list contains pcap acquisition, pcap decoding and the flow worker.
    - In autofp mode, the RX thread's slot list contains pcap acquisition and pcap decoding, while each W thread's slot list contains only the flow worker.
  • In single mode, packets reach the flow worker through nested function calls; in autofp mode, the RX thread produces packets and places each one into a queue chosen by flow hash, and each W thread consumes packets from its own queue.