Suricata源码阅读(五):线程模型
0x00 线程模块注册
在先前的几篇文章中,我们分析了 Suricata 如何从 rules 文件导入规则、如何构建 siggroup、如何执行检测。现在,我们来分析这些逻辑是在怎样的线程模型之上运行的。
此前提到,FlowWorker 在线程中运行。 src/flow-worker.c
暴露了这样一个接口:
void TmModuleFlowWorkerRegister (void)
{
tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
}
它在 src/suricata.c
中被调用:
/**
 * \brief Register every thread module into the global tmm_modules table.
 *
 * Covers management modules, each capture method's receive/verdict/decode
 * modules, the FlowWorker, and the logging modules. Each *Register() call
 * only fills a tmm_modules[] entry; no threads are started here.
 */
void RegisterAllModules(void)
{
/* commanders */
TmModuleUnixManagerRegister();
/* managers */
TmModuleFlowManagerRegister();
TmModuleFlowRecyclerRegister();
TmModuleBypassedFlowManagerRegister();
/* nfq */
TmModuleReceiveNFQRegister();
TmModuleVerdictNFQRegister();
TmModuleDecodeNFQRegister();
/* ipfw */
TmModuleReceiveIPFWRegister();
TmModuleVerdictIPFWRegister();
TmModuleDecodeIPFWRegister();
/* pcap live */
TmModuleReceivePcapRegister();
TmModuleDecodePcapRegister();
/* pcap file */
TmModuleReceivePcapFileRegister();
TmModuleDecodePcapFileRegister();
/* af-packet */
TmModuleReceiveAFPRegister();
TmModuleDecodeAFPRegister();
/* af-xdp */
TmModuleReceiveAFXDPRegister();
TmModuleDecodeAFXDPRegister();
/* netmap */
TmModuleReceiveNetmapRegister();
TmModuleDecodeNetmapRegister();
/* pfring */
TmModuleReceivePfringRegister();
TmModuleDecodePfringRegister();
/* dag file */
TmModuleReceiveErfFileRegister();
TmModuleDecodeErfFileRegister();
/* dag live */
TmModuleReceiveErfDagRegister();
TmModuleDecodeErfDagRegister();
/* napatech */
TmModuleNapatechStreamRegister();
TmModuleNapatechDecodeRegister();
/* flow worker */
TmModuleFlowWorkerRegister();
/* respond-reject */
TmModuleRespondRejectRegister();
/* log api */
TmModuleLoggerRegister();
TmModuleStatsLoggerRegister();
TmModuleDebugList();
/* nflog */
TmModuleReceiveNFLOGRegister();
TmModuleDecodeNFLOGRegister();
/* windivert */
TmModuleReceiveWinDivertRegister();
TmModuleVerdictWinDivertRegister();
TmModuleDecodeWinDivertRegister();
/* Dpdk */
TmModuleReceiveDPDKRegister();
TmModuleDecodeDPDKRegister();
}
观察其他模块的注册函数,均为填写 tmm_modules
数组中的对应位置。 TmModule
的相关定义是:
/** \brief A thread module: a named bundle of callbacks. The *Register()
 *         functions fill one tmm_modules[] entry each; different modules
 *         populate different callbacks (e.g. FlowWorker sets Func,
 *         ReceivePcapFile sets PktAcqLoop). */
typedef struct TmModule_ {
const char *name;
/** thread handling */
TmEcode (*ThreadInit)(ThreadVars *, const void *, void **);
void (*ThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*ThreadDeinit)(ThreadVars *, void *);
/** the packet processing function */
TmEcode (*Func)(ThreadVars *, Packet *, void *);
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
/** terminates the capture loop in PktAcqLoop */
TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);
/** does a thread still have tasks to complete before it can be killed?
* \retval bool
* \param tv threadvars
* \param thread_data thread module thread data (e.g. FlowWorkerThreadData for FlowWorker) */
bool (*ThreadBusy)(ThreadVars *tv, void *thread_data);
TmEcode (*Management)(ThreadVars *, void *);
/** global Init/DeInit */
TmEcode (*Init)(void);
TmEcode (*DeInit)(void);
#ifdef UNITTESTS
void (*RegisterTests)(void);
#endif
uint8_t cap_flags; /**< Flags to indicate the capability requirement of
the given TmModule */
/* Other flags used by the module */
uint8_t flags;
} TmModule;
/** global module registry, indexed by the TMM_* module id */
TmModule tmm_modules[TMM_SIZE]; // 42
这里指定了各种回调函数,不同的模块所填写的字段亦有区别。例如,FlowWorker 模块填写了 Func
字段,而 ReceivePcapFile 模块填写了 PktAcqLoop
字段。
RegisterAllModules()
是被 PostConfLoadedSetup()
调用的。我们在第二篇文章中见过它,它是在“配置文件读入之后、规则文件读入之前”执行的。跟进:
/**
 * \brief Post-configuration setup.
 *
 * Runs after the config file has been loaded and before the rule files are
 * read: sets up pattern matchers, registers all thread modules via
 * RegisterAllModules(), and performs assorted initialization.
 *
 * \param suri the Suricata instance being configured
 * \retval TM_ECODE_OK on success, TM_ECODE_FAILED on error
 */
int PostConfLoadedSetup(SCInstance *suri)
{
/* do this as early as possible #1577 #1955 */
#ifdef HAVE_LUAJIT
if (LuajitSetupStatesPool() != 0) {
SCReturnInt(TM_ECODE_FAILED);
}
#endif
/* load the pattern matchers */
// register AC, ACBS, ACTile into mpm_table[]
MpmTableSetup();
// register BM into spm_table[]
SpmTableSetup();
// apply offloading-related settings from the config file
int disable_offloading;
if (ConfGetBool("capture.disable-offloading", &disable_offloading) == 0)
disable_offloading = 1;
if (disable_offloading) {
LiveSetOffloadDisable();
} else {
LiveSetOffloadWarn();
}
// decide whether checksum validation is enabled
if (suri->checksum_validation == -1) {
const char *cv = NULL;
if (ConfGet("capture.checksum-validation", &cv) == 1) {
if (strcmp(cv, "none") == 0) {
suri->checksum_validation = 0;
} else if (strcmp(cv, "all") == 0) {
suri->checksum_validation = 1;
}
}
}
switch (suri->checksum_validation) {
case 0:
ConfSet("stream.checksum-validation", "0");
break;
case 1:
ConfSet("stream.checksum-validation", "1");
break;
}
// if a custom runmode is in use, write it back into the conf tree
if (suri->runmode_custom_mode) {
ConfSet("runmode", suri->runmode_custom_mode);
}
// misc housekeeping below
StorageInit();
#ifdef HAVE_PACKET_EBPF
if (suri->run_mode == RUNMODE_AFP_DEV) {
EBPFRegisterExtension();
LiveDevRegisterExtension();
}
#endif
RegisterFlowBypassInfo();
MacSetRegisterFlowStorage();
#ifdef HAVE_PLUGINS
SCPluginsLoad(suri->capture_plugin_name, suri->capture_plugin_args);
#endif
LiveDeviceFinalize(); // must be after EBPF extension registration
// NOTE(review): reads the global 'suricata' instance rather than the
// 'suri' parameter used elsewhere in this function — confirm intended
RunModeEngineIsIPS(
suricata.run_mode, suricata.runmode_custom_mode, suricata.capture_plugin_name);
if (EngineModeIsUnknown()) { // if still uninitialized, set the default
SCLogInfo("Setting engine mode to IDS mode by default");
EngineModeSetIDS();
}
SetMasterExceptionPolicy();
/* Must occur prior to output mod registration
and app layer setup. */
FeatureTrackingRegister();
AppLayerSetup();
/* Suricata will use this umask if provided. By default it will use the
umask passed on from the shell. */
const char *custom_umask;
if (ConfGet("umask", &custom_umask) == 1) {
uint16_t mask;
if (StringParseUint16(&mask, 8, (uint16_t)strlen(custom_umask), custom_umask) > 0) {
umask((mode_t)mask);
}
}
if (ConfigGetCaptureValue(suri) != TM_ECODE_OK) {
SCReturnInt(TM_ECODE_FAILED);
}
#ifdef NFQ
if (suri->run_mode == RUNMODE_NFQ)
NFQInitConfig(false);
#endif
/* Load the Host-OS lookup. */
SCHInfoLoadFromConfig();
if (suri->run_mode == RUNMODE_ENGINE_ANALYSIS) {
SCLogInfo("== Carrying out Engine Analysis ==");
const char *temp = NULL;
if (ConfGet("engine-analysis", &temp) == 0) {
SCLogInfo("no engine-analysis parameter(s) defined in conf file. "
"Please define/enable them in the conf to use this "
"feature.");
SCReturnInt(TM_ECODE_FAILED);
}
}
/* hardcoded initialization code */
SigTableSetup(); /* load the rule keywords */
SigTableApplyStrictCommandLineOption(suri->strict_rule_parsing_string);
TmqhSetup();
TagInitCtx();
PacketAlertTagInit();
ThresholdInit();
HostBitInitCtx();
IPPairBitInitCtx();
if (DetectAddressTestConfVars() < 0) {
SCLogError(
"basic address vars test failed. Please check %s for errors", suri->conf_filename);
SCReturnInt(TM_ECODE_FAILED);
}
if (DetectPortTestConfVars() < 0) {
SCLogError("basic port vars test failed. Please check %s for errors", suri->conf_filename);
SCReturnInt(TM_ECODE_FAILED);
}
// register all thread modules here
RegisterAllModules();
AppLayerHtpNeedFileInspection();
StorageFinalize();
TmModuleRunInit();
if (MayDaemonize(suri) != TM_ECODE_OK)
SCReturnInt(TM_ECODE_FAILED);
if (InitSignalHandler(suri) != TM_ECODE_OK)
SCReturnInt(TM_ECODE_FAILED);
/* Check for the existence of the default logging directory which we pick
* from suricata.yaml. If not found, shut the engine down */
suri->log_dir = ConfigGetLogDirectory();
if (ConfigCheckLogDirectoryExists(suri->log_dir) != TM_ECODE_OK) {
SCLogError("The logging directory \"%s\" "
"supplied by %s (default-log-dir) doesn't exist. "
"Shutting down the engine",
suri->log_dir, suri->conf_filename);
SCReturnInt(TM_ECODE_FAILED);
}
if (!IsLogDirectoryWritable(suri->log_dir)) {
SCLogError("The logging directory \"%s\" "
"supplied by %s (default-log-dir) is not writable. "
"Shutting down the engine",
suri->log_dir, suri->conf_filename);
SCReturnInt(TM_ECODE_FAILED);
}
if (suri->disabled_detect) {
SCLogConfig("detection engine disabled");
/* disable raw reassembly */
(void)ConfSetFinal("stream.reassembly.raw", "false");
}
HostInitConfig(HOST_VERBOSE);
CoredumpLoadConfig();
DecodeGlobalConfig();
/* hostmode depends on engine mode being set */
PostConfLoadedSetupHostMode();
PreRunInit(suri->run_mode);
SCReturnInt(TM_ECODE_OK);
}
至此,我们知道了各个模块的注册过程。然而,所谓“注册”只是把模块的相关属性(模块名、回调函数地址等)写进了表,我们需要进一步追踪这些线程是如何启动的。
0x01 线程启动
修改 launch.json
,在 pthread_create()
下断点:
"initCommands": [
"breakpoint set -n pthread_create"
]
定位到 SuricataMain()
:
观察代码:
/**
 * \brief Main entry point (abridged excerpt).
 *
 * Loads the detection setup, dispatches the runmode (which spawns the
 * worker threads), waits for thread init, runs the main loop, and finally
 * tears everything down.
 */
int SuricataMain(int argc, char **argv)
{
/* omitted */
// the call below loads the rule files (the subject of the second article)
PostConfLoadedDetectSetup(&suricata);
if (suricata.run_mode == RUNMODE_ENGINE_ANALYSIS) {
goto out;
} else if (suricata.run_mode == RUNMODE_CONF_TEST){
SCLogNotice("Configuration provided was successfully loaded. Exiting.");
goto out;
} else if (suricata.run_mode == RUNMODE_DUMP_FEATURES) {
FeatureDump();
goto out;
}
SystemHugepageSnapshot *prerun_snap = NULL;
// NOTE(review): bare 'run_mode' here vs 'suricata.run_mode' elsewhere —
// presumably a global; confirm against the full source
if (run_mode == RUNMODE_DPDK)
prerun_snap = SystemHugepageSnapshotCreate();
SCSetStartTime(&suricata);
// spawn the worker threads
RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode,
suricata.capture_plugin_name, suricata.capture_plugin_args);
if (suricata.run_mode != RUNMODE_UNIX_SOCKET) {
UnixManagerThreadSpawnNonRunmode(suricata.unix_socket_enabled);
}
/* Wait till all the threads have been initialized */
if (TmThreadWaitOnThreadInit() == TM_ECODE_FAILED) {
FatalError("Engine initialization failed, "
"aborting...");
}
/* nproc handling, omitted */
SC_ATOMIC_SET(engine_stage, SURICATA_RUNTIME);
PacketPoolPostRunmodes();
/* Un-pause all the paused threads */
TmThreadContinueThreads();
/* Must ensure all threads are fully operational before continuing with init process */
if (TmThreadWaitOnThreadRunning() != TM_ECODE_OK) {
exit(EXIT_FAILURE);
}
/* Print notice and send OS specific notification of threads in running state */
OnNotifyRunning();
PostRunStartedDetectSetup(&suricata);
/* dpdk handling, omitted */
SCPledge();
// main loop, iterates every 10 ms
SuricataMainLoop(&suricata);
/* Update the engine stage/status flag */
SC_ATOMIC_SET(engine_stage, SURICATA_DEINIT);
UnixSocketKillSocketThread();
PostRunDeinit(suricata.run_mode, &suricata.start_time);
/* kill remaining threads */
TmThreadKillThreads();
out:
GlobalsDestroy(&suricata);
exit(EXIT_SUCCESS);
}
现在跟进创建线程的函数 RunModeDispatch()
:
/**
 * \brief Resolve the RunMode for (runmode, custom_mode), start its worker
 *        threads via RunModeFunc(), then spawn the management threads.
 */
void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_plugin_name,
const char *capture_plugin_args)
{
char *local_custom_mode = NULL;
// resolve the custom_mode string; when none is given use the default
// e.g. for runmode=RUNMODE_PCAP_FILE the default mode is autofp
if (custom_mode == NULL) {
custom_mode = RunModeGetConfOrDefault(runmode, capture_plugin_name);
if (custom_mode == NULL)
FatalError("Unknown runtime mode. Aborting");
}
// look up the RunMode object in the runmodes table
RunMode *mode = RunModeGetCustomMode(runmode, custom_mode);
if (mode == NULL) {
SCLogError("The custom type \"%s\" doesn't exist "
"for this runmode type \"%s\". Please use --list-runmodes to "
"see available custom types for this runmode",
custom_mode, RunModeTranslateModeToName(runmode));
exit(EXIT_FAILURE);
}
/* Export the custom mode */
if (active_runmode) {
SCFree(active_runmode);
}
active_runmode = SCStrdup(custom_mode);
if (unlikely(active_runmode == NULL)) {
FatalError("Unable to dup active mode");
}
if (strcasecmp(active_runmode, "autofp") == 0) {
TmqhFlowPrintAutofpHandler();
}
// the worker threads are created here
mode->RunModeFunc();
// NOTE(review): local_custom_mode is never assigned in this excerpt, so
// this SCFree is a no-op here; check the full source
if (local_custom_mode != NULL)
SCFree(local_custom_mode);
/* Check if the alloted queues have at least 1 reader and writer */
TmValidateQueueState();
// spawn FlowManager (FM#01), FlowRecycler (FR#01) and StatsWakeup (CW) threads
if (runmode != RUNMODE_UNIX_SOCKET) {
/* spawn management threads */
FlowManagerThreadSpawn();
FlowRecyclerThreadSpawn();
if (RunModeNeedsBypassManager()) {
BypassedFlowManagerThreadSpawn();
}
StatsSpawnThreads();
}
}
可以看到,创建工作线程的代码是 mode->RunModeFunc()
,它是由各个 runmode 自己定义的;另外,在 runmode 的线程创建完成之后,还会创建 FlowManager、FlowRecycler、StatsWakeup 线程。
我们来看 RunMode 对象的定义:
/** \brief One custom mode of a runmode, e.g. "single" or "autofp" for
 *         RUNMODE_PCAP_FILE. RunModeFunc starts the mode's threads. */
typedef struct RunMode_ {
/* the runmode type */
enum RunModes runmode;
const char *name;
const char *description;
/* runmode function */
int (*RunModeFunc)(void);
void (*RunModeIsIPSEnabled)(void);
} RunMode;
/** \brief All custom modes registered for one runmode type. */
typedef struct RunModes_ {
int cnt;
RunMode *runmodes;
} RunModes;
/** global table, one entry per standard runmode (RUNMODE_PCAP_DEV, ...) */
static RunModes runmodes[RUNMODE_USER_MAX]; // 18
可见 runmodes
表中有 18 个标准 runmode,如 RUNMODE_PCAP_DEV, RUNMODE_PCAP_FILE
等。而一个 runmode 可以支持多个 custom mode,例如 RUNMODE_PCAP_FILE
这个 runmode 支持 single
、autofp
两个 custom mode。
p runmodes[2].runmodes[0]
(RunMode) {
runmode = RUNMODE_PCAP_FILE
name = 0x00005555560c9750 "single"
description = 0x00005555560c9770 "Single threaded pcap file mode"
RunModeFunc = 0x000055555591f9f0 (suricata`RunModeFilePcapSingle at runmode-pcap-file.c:56)
RunModeIsIPSEnabled = 0x0000000000000000
}
p runmodes[2].runmodes[1]
(RunMode) {
runmode = RUNMODE_PCAP_FILE
name = 0x00005555560c9800 "autofp"
description = 0x00005555560c9820 "Multi-threaded pcap file mode. Packets from each flow are assigned to a consistent detection thread"
RunModeFunc = 0x000055555591fc90 (suricata`RunModeFilePcapAutoFp at runmode-pcap-file.c:121)
RunModeIsIPSEnabled = 0x0000000000000000
}
这张表的注册过程我们暂且不论。总之,目前 RunModeFunc
指向了 RunModeFilePcapSingle()
函数,跟进:
/**
 * \brief Single thread version of the Pcap file processing.
 *
 * Builds one ThreadVars with the ReceivePcapFile, DecodePcapFile and
 * FlowWorker slots and spawns it as worker thread "W#01".
 *
 * \retval 0 on success (fatal errors abort the process)
 */
int RunModeFilePcapSingle(void)
{
const char *file = NULL;
char tname[TM_THREAD_NAME_MAX];
if (ConfGet("pcap-file.file", &file) == 0) {
FatalError("Failed retrieving pcap-file from Conf");
}
// sets live_time_tracking = false
TimeModeSetOffline();
// zeroes a few globals
PcapFileGlobalInit();
snprintf(tname, sizeof(tname), "%s#01", thread_name_single); // "W#01"
// parameters: name, inq_name, inqh_name, outq_name, outqh_name, slots
// returns a ThreadVars
ThreadVars *tv = TmThreadCreatePacketHandler(tname,
"packetpool", "packetpool",
"packetpool", "packetpool",
"pktacqloop");
if (tv == NULL) {
FatalError("threading setup failed");
}
// below: look up each TmModule in tmm_modules by name and append a slot
TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
FatalError("TmModuleGetByName failed for ReceivePcap");
}
TmSlotSetFuncAppend(tv, tm_module, file);
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
FatalError("TmModuleGetByName DecodePcap failed");
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
FatalError("TmModuleGetByName for FlowWorker failed");
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
// set cpu_affinity
TmThreadSetCPU(tv, WORKER_CPU_SET);
// spawn the thread
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
FatalError("TmThreadSpawn failed");
}
return 0;
}
上面的函数中,首先创建了一个 ThreadVars
结构,然后给几个模块添加 slot,最后调用 TmThreadSpawn(tv)
创建线程。我们细细观察 ThreadVars
:
/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
/** pthread handle, filled in by TmThreadSpawn() via pthread_create() */
pthread_t t;
/** function pointer to the function that runs the packet pipeline for
* this thread. It is passed directly to pthread_create(), hence the
* void pointers in and out. */
void *(*tm_func)(void *);
/** thread name, also installed as the OS thread name */
char name[16];
char *printable_name;
char *thread_group_name;
uint8_t thread_setup_flags;
/** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
uint8_t type;
uint16_t cpu_affinity; /** cpu or core number to set affinity to */
int thread_priority; /** priority (real time) for this thread. Look at threads.h */
/** TmModule::flags for each module part of this thread */
uint8_t tmm_flags;
uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
TmModules registered under this thread */
uint8_t inq_id;
uint8_t outq_id;
/** local id */
int id;
/** incoming queue and handler */
Tmq *inq;
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
SC_ATOMIC_DECLARE(uint32_t, flags);
/** list of of TmSlot objects together forming the packet pipeline. */
struct TmSlot_ *tm_slots;
/** pointer to the flowworker in the pipeline. Used as starting point
* for injected packets. Can be NULL if the flowworker is not part
* of this thread. */
struct TmSlot_ *tm_flowworker;
/** outgoing queue and handler */
Tmq *outq;
void *outctx;
void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);
/** Queue for decoders to temporarily store extra packets they
* generate. These packets are generated as part of the tunnel
* handling, and are processed directly after the "real" packet
* from the current position in the pipeline. */
PacketQueueNoLock decode_pq;
/** Stream packet queue for flow time out injection. Either a pointer to the
* workers input queue or to stream_pq_local */
struct PacketQueue_ *stream_pq;
struct PacketQueue_ *stream_pq_local;
/* counters */
/** private counter store: counter updates modify this */
StatsPrivateThreadContext perf_private_ctx;
/** pointer to the next thread */
struct ThreadVars_ *next;
/** public counter store: counter syncs update this */
StatsPublicThreadContext perf_public_ctx;
/* mutex and condition used by management threads */
SCCtrlMutex *ctrl_mutex;
SCCtrlCondT *ctrl_cond;
struct FlowQueue_ *flow_queue;
bool break_loop;
} ThreadVars;
可见 ThreadVars
里面就是本线程的相关变量。之前的代码中,针对 ReceivePcapFile, DecodePcapFile, FlowWorker
这三个 tm_module
创建了 slot,而 slot 的定义如下:
/** \brief One stage of a thread's packet pipeline, built from a TmModule. */
typedef struct TmSlot_ {
// the core of a slot: interpreted as SlotFunc, PktAcqLoop or Management,
// depending on which callback the source TmModule provided
union {
TmSlotFunc SlotFunc;
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
TmEcode (*Management)(ThreadVars *, void *);
};
// singly linked list of slots forming the pipeline
struct TmSlot_ *slot_next;
SC_ATOMIC_DECLARE(void *, slot_data);
/** copy of the TmModule::flags */
uint8_t tm_flags;
/* store the thread module id */
int tm_id;
TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
/* data storage */
const void *slot_initdata;
} TmSlot;
slot 会形成一个链表结构。在三次 append 之后,tv 里面的三个 slot 的函数指针分别指向: ReceivePcapFileLoop, DecodePcapFile, FlowWorker
。向 tv 中添加 slot 的逻辑如下:
/**
 * \brief Create a TmSlot from module tm and append it to tv's slot list.
 *
 * \param tv   thread the slot is added to
 * \param tm   module whose callbacks fill the slot
 * \param data per-slot init data, later passed to SlotThreadInit
 *
 * NOTE(review): if SCMalloc fails the function returns silently, so the
 * caller cannot detect that no slot was appended.
 */
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
{
TmSlot *slot = SCMalloc(sizeof(TmSlot));
if (unlikely(slot == NULL))
return;
memset(slot, 0, sizeof(TmSlot));
SC_ATOMIC_INITPTR(slot->slot_data);
slot->SlotThreadInit = tm->ThreadInit;
slot->slot_initdata = data;
// copy the module's function pointer into the slot
if (tm->Func) {
slot->SlotFunc = tm->Func;
} else if (tm->PktAcqLoop) {
slot->PktAcqLoop = tm->PktAcqLoop;
// the module can break out of its capture loop
if (tm->PktAcqBreakLoop) {
tv->break_loop = true;
}
} else if (tm->Management) {
slot->Management = tm->Management;
}
slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
slot->SlotThreadDeinit = tm->ThreadDeinit;
slot->tm_id = TmModuleGetIDForTM(tm);
slot->tm_flags |= tm->flags;
tv->tmm_flags |= tm->flags;
tv->cap_flags |= tm->cap_flags;
// append to the end of the singly linked slot list
if (tv->tm_slots == NULL) {
tv->tm_slots = slot;
} else {
TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
/* get the last slot */
for ( ; a != NULL; a = a->slot_next) {
b = a;
}
/* append the new slot */
if (b != NULL) {
b->slot_next = slot;
}
}
return;
}
slot 链表组装完成之后,调用 TmThreadSpawn()
创建线程:
/**
 * \brief Spawns a thread associated with the ThreadVars instance tv
 *
 * The thread's entry point tv->tm_func must have been set beforehand
 * (see TmThreadSetSlots()). Blocks until the new thread signals that its
 * initialization is done (or failed), then links tv into the global
 * thread list.
 *
 * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
 */
TmEcode TmThreadSpawn(ThreadVars *tv)
{
    pthread_attr_t attr;
    if (tv->tm_func == NULL) {
        FatalError("No thread function set");
    }
    /* Initialize and set thread detached attribute */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    /* Adjust thread stack size if configured */
    if (threading_set_stack_size) {
        SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
        if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
            FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
                    threading_set_stack_size);
        }
    }
    /* create the thread */
    int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
    if (rc) {
        /* pthread_create() returns the error number and does not set errno
         * (POSIX), so report strerror(rc) rather than strerror(errno) */
        FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc,
                strerror(rc));
    }
    /* the attr object is no longer needed once the thread exists */
    pthread_attr_destroy(&attr);
    /* wait until the new thread finished (or failed) its initialization */
    TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
    /* link tv into the global thread list for its type */
    TmThreadAppend(tv, tv->type);
    return TM_ECODE_OK;
}
我们应该特别关注 tv 中的 slot 结构。不同的 runmode 和 custom mode,产生的 slot 也很不一样。例如,当 runmode 为 pcap、custom mode 为 autofp 时,其 RunModeFunc
如下:
/**
 * \brief autofp version of the Pcap file processing (abridged excerpt).
 *
 * Spawns one pcap reader thread (slots: ReceivePcapFile, DecodePcapFile)
 * plus thread_max worker threads (slot: FlowWorker), connected via the
 * "pickupN" flow queues.
 */
int RunModeFilePcapAutoFp(void)
{
/* initialization, omitted */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
if (ncpus > 0)
cpu = 1;
int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
if (thread_max == 0)
thread_max = ncpus * threading_detect_ratio;
if (thread_max < 1)
thread_max = 1;
if (thread_max > 1024)
thread_max = 1024;
queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
if (queues == NULL) {
FatalError("RunmodeAutoFpCreatePickupQueuesString failed");
}
snprintf(tname, sizeof(tname), "%s#01", thread_name_autofp);
// build tv for the pcap reader thread
ThreadVars *tv_receivepcap =
TmThreadCreatePacketHandler(tname,
"packetpool", "packetpool",
queues, "flow",
"pktacqloop");
SCFree(queues);
if (tv_receivepcap == NULL) {
FatalError("threading setup failed");
}
// append slot: ReceivePcapFile
TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
FatalError("TmModuleGetByName failed for ReceivePcap");
}
TmSlotSetFuncAppend(tv_receivepcap, tm_module, file);
// append slot: DecodePcapFile
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
FatalError("TmModuleGetByName DecodePcap failed");
}
TmSlotSetFuncAppend(tv_receivepcap, tm_module, NULL);
TmThreadSetCPU(tv_receivepcap, RECEIVE_CPU_SET);
// spawn the pcap reader thread
if (TmThreadSpawn(tv_receivepcap) != TM_ECODE_OK) {
FatalError("TmThreadSpawn failed");
}
for (thread = 0; thread < (uint16_t)thread_max; thread++) {
snprintf(tname, sizeof(tname), "%s#%02d", thread_name_workers, thread + 1);
snprintf(qname, sizeof(qname), "pickup%d", thread + 1);
SCLogDebug("tname %s, qname %s", tname, qname);
SCLogDebug("Assigning %s affinity to cpu %u", tname, cpu);
// build tv for a detect/worker thread
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(tname,
qname, "flow",
"packetpool", "packetpool",
"varslot");
if (tv_detect_ncpu == NULL) {
FatalError("TmThreadsCreate failed");
}
// append slot: FlowWorker
tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
FatalError("TmModuleGetByName for FlowWorker failed");
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
TmThreadSetGroupName(tv_detect_ncpu, "Detect");
TmThreadSetCPU(tv_detect_ncpu, WORKER_CPU_SET);
// spawn the detect thread
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
FatalError("TmThreadSpawn failed");
}
if ((cpu + 1) == ncpus)
cpu = 0;
else
cpu++;
}
return 0;
}
在 autofp 模式下,有一个 pcap 文件解析线程(slot 为 ReceivePcapFile
和 DecodePcapFile
),标记为 RX#01
;另有若干个 worker 线程(slot 为 FlowWorker
),标记为 W#0x
。如下图所示。
现在,对于任何 runmode 与 custom mode 的组合,我们都能找到负责“启动所有线程”的逻辑——翻查 runmodes
数组,找到特定 RunMode
,观察 RunModeFunc
指向的启动函数。
下一个问题是,数据是如何从 pcap 文件流转到 flow worker 的。
0x02 single 模式下的 packet 传递
我们已经在前一篇文章中分析了 flow worker 本身。现在我们在 single 模式下运行程序,在 FlowWorker
下断,看一眼调用栈:
[线程 W#01]
FlowWorker (\home\neko\suricata-7.0.8\src\flow-worker.c:567)
TmThreadsSlotVarRun (\home\neko\suricata-7.0.8\src\tm-threads.c:135)
TmThreadsSlotProcessPkt (\home\neko\suricata-7.0.8\src\tm-threads.h:200)
PcapFileCallbackLoop (\home\neko\suricata-7.0.8\src\source-pcap-file-helper.c:108)
PcapFileDispatch (\home\neko\suricata-7.0.8\src\source-pcap-file-helper.c:133)
ReceivePcapFileLoop (\home\neko\suricata-7.0.8\src\source-pcap-file.c:180)
TmThreadsSlotPktAcqLoop (\home\neko\suricata-7.0.8\src\tm-threads.c:318)
在 autofp 模式下运行:
[线程 W#04]
FlowWorker (\home\neko\suricata-7.0.8\src\flow-worker.c:567)
TmThreadsSlotVarRun (\home\neko\suricata-7.0.8\src\tm-threads.c:135)
TmThreadsSlotVar (\home\neko\suricata-7.0.8\src\tm-threads.c:471)
[线程 RX#01]
usleep (@usleep:19)
TmThreadWaitForFlag (\home\neko\suricata-7.0.8\src\tm-threads.c:1785)
TmThreadsSlotPktAcqLoop (\home\neko\suricata-7.0.8\src\tm-threads.c:339)
可见,在 single 模式下,是由 pcap 处理逻辑直接调用 FlowWorker()
函数处理 packet;在 autofp 模式下,应该是由 RX 线程产生 packet,然后通过某种管道机制交付给 worker 线程。
我们先细看 single 模式下的 packet 流转。位于调用栈底部的是 TmThreadsSlotPktAcqLoop()
函数,它也是工作线程的入口点:
值得一提的是,线程的入口点是在构造 tv 时确定的。TmThreadCreate()
在构造完 tv 的主要成员之后,会调用 TmThreadSetSlots()
,代码如下:
/**
 * \brief Select the thread entry point tv->tm_func from a slot name.
 *
 * Known names map to the built-in loop functions; "custom" (or a NULL
 * name with a non-NULL fn_p) installs the caller-supplied function.
 *
 * \retval TM_ECODE_OK on success, TM_ECODE_FAILED on bad input
 */
static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
{
    /* with no name given, fall back to a caller-supplied entry function */
    if (name == NULL) {
        if (fn_p == NULL) {
            printf("Both slot name and function pointer can't be NULL inside "
                   "TmThreadSetSlots\n");
            goto error;
        }
        name = "custom";
    }

    if (strcmp(name, "varslot") == 0) {
        tv->tm_func = TmThreadsSlotVar;
    } else if (strcmp(name, "pktacqloop") == 0) {
        tv->tm_func = TmThreadsSlotPktAcqLoop;
    } else if (strcmp(name, "management") == 0 || strcmp(name, "command") == 0) {
        /* both names share the management loop */
        tv->tm_func = TmThreadsManagement;
    } else if (strcmp(name, "custom") == 0) {
        if (fn_p == NULL)
            goto error;
        tv->tm_func = fn_p;
    } else {
        printf("Error: Slot \"%s\" not supported\n", name);
        goto error;
    }
    return TM_ECODE_OK;

error:
    return TM_ECODE_FAILED;
}
也就是说,一个线程的入口点,由构造 tv 时的 slot name 参数决定。动态调试时,该参数是 pktacqloop
,所以入口点是 TmThreadsSlotPktAcqLoop
。代码如下:
/**
 * \brief Entry point for packet-acquisition threads ("pktacqloop").
 *
 * Initializes all slots, then repeatedly calls the first slot's
 * PktAcqLoop callback until it reports DONE/FAILED or the thread is
 * killed; finally deinits the slots.
 *
 * \param td the thread's ThreadVars, passed through pthread_create()
 */
static void *TmThreadsSlotPktAcqLoop(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
char run = 1;
TmEcode r = TM_ECODE_OK;
TmSlot *slot = NULL;
// calls prctl(PR_SET_NAME, tname, 0, 0, 0) to rename the thread
SCSetThreadName(tv->name);
if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
// drop capabilities; set up capture counters; init the packet pool
SCDropCaps(tv);
CaptureStatsSetup(tv);
PacketPoolInit();
// walk the slot list
for (slot = s; slot != NULL; slot = slot->slot_next) {
// call each slot's init callback
if (slot->SlotThreadInit != NULL) {
void *slot_data = NULL;
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
if (r == TM_ECODE_DONE) {
EngineDone();
TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
goto error;
} else {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
goto error;
}
}
(void)SC_ATOMIC_SET(slot->slot_data, slot_data);
}
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
// if the flowworker is the first slot, use tv->inq->pq as tv->stream_pq
tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit((void *) -1);
return NULL;
}
} else if (slot->tm_id == TMM_FLOWWORKER) {
// if the flowworker sits at a later slot, allocate a fresh queue as tv->stream_pq
tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
if (tv->stream_pq_local == NULL)
FatalError("failed to alloc PacketQueue");
SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
tv->stream_pq = tv->stream_pq_local;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit((void *) -1);
return NULL;
}
}
}
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE);
// main loop
while(run) {
if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
// in the debug session described here, s->PktAcqLoop is ReceivePcapFileLoop
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
// leave the loop on FAILED, DONE, or when a kill/ctl flag is set
if (r == TM_ECODE_FAILED) {
TmThreadsSetFlag(tv, THV_FAILED);
run = 0;
}
if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
run = 0;
}
if (r == TM_ECODE_DONE) {
run = 0;
}
}
StatsSyncCounters(tv);
TmThreadsSetFlag(tv, THV_FLOW_LOOP);
/* process all pseudo packets the flow timeout may throw at us */
TmThreadTimeoutLoop(tv, s);
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv, THV_DEINIT);
PacketPoolDestroy();
// deinit all slots
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
}
if (slot->SlotThreadDeinit != NULL) {
r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
goto error;
}
}
}
tv->stream_pq = NULL;
SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
error:
tv->stream_pq = NULL;
pthread_exit((void *) -1);
return NULL;
}
可见,worker 线程在执行一个主循环,每次调用 s->PktAcqLoop()
,即 ReceivePcapFileLoop
函数。不过,在这次运行中,由于 ReceivePcapFileLoop
返回的是 TM_ECODE_DONE
,故循环只执行一次就退出了。跟进 ReceivePcapFileLoop
:
/**
 * \brief PktAcqLoop callback of the ReceivePcapFile module.
 *
 * Dispatches either a single pcap file or a directory of pcaps, then
 * reports the final status via PcapFileExit().
 */
TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
if(unlikely(data == NULL)) {
SCLogError("pcap file reader thread failed to initialize");
PcapFileExit(TM_ECODE_FAILED, NULL);
SCReturnInt(TM_ECODE_DONE);
}
TmEcode status = TM_ECODE_OK;
PcapFileThreadVars *ptv = (PcapFileThreadVars *) data;
TmSlot *s = (TmSlot *)slot;
// point ptv->shared.slot at the slot following this one
ptv->shared.slot = s->slot_next;
ptv->shared.cb_result = TM_ECODE_OK;
// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
TmThreadsSetFlag(tv, THV_RUNNING);
if(ptv->is_directory == 0) {
// a single pcap file
SCLogInfo("Starting file run for %s", ptv->behavior.file->filename);
status = PcapFileDispatch(ptv->behavior.file); // breakpoint location
CleanupPcapFileFromThreadVars(ptv, ptv->behavior.file);
} else {
// a directory of pcap files
SCLogInfo("Starting directory run for %s", ptv->behavior.directory->filename);
PcapDirectoryDispatch(ptv->behavior.directory);
CleanupPcapDirectoryFromThreadVars(ptv, ptv->behavior.directory);
}
SCLogDebug("Pcap file loop complete with status %u", status);
status = PcapFileExit(status, &ptv->shared.last_processed);
SCReturnInt(status);
}
我们这次运行,只提供了一个 pcap 文件。跟进 PcapFileDispatch()
:
/**
 * \brief Read one pcap file via libpcap, invoking PcapFileCallbackLoop
 *        for every captured packet.
 */
TmEcode PcapFileDispatch(PcapFileFileVars *ptv)
{
SCEnter();
// initialization
/* initialize all the thread's initial timestamp */
if (likely(ptv->first_pkt_hdr != NULL)) {
TmThreadsInitThreadsTimestamp(SCTIME_FROM_TIMEVAL(&ptv->first_pkt_ts));
PcapFileCallbackLoop((char *)ptv, ptv->first_pkt_hdr,
(u_char *)ptv->first_pkt_data);
ptv->first_pkt_hdr = NULL;
ptv->first_pkt_data = NULL;
}
int packet_q_len = 64;
TmEcode loop_result = TM_ECODE_OK;
strlcpy(pcap_filename, ptv->filename, sizeof(pcap_filename));
// read the pcap file
while (loop_result == TM_ECODE_OK) {
if (suricata_ctl_flags & SURICATA_STOP) {
SCReturnInt(TM_ECODE_OK);
}
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
PacketPoolWait();
/* Right now we just support reading packets one at a time. */
// breakpoint location
int r = pcap_dispatch(ptv->pcap_handle, packet_q_len,
(pcap_handler)PcapFileCallbackLoop, (u_char *)ptv);
if (unlikely(r == -1)) {
/* error handling, omitted */
}
StatsSyncCountersIfSignalled(ptv->shared->tv);
}
SCReturnInt(loop_result);
}
其中 pcap_dispatch
函数是 libpcap 提供的。函数签名是:
int pcap_dispatch(pcap_t *p, int cnt, pcap_handler callback, u_char *user);
pcap_handler
的格式如下:
void user_routine(u_char *user, struct pcap_pkthdr *phdr, u_char *pdata)
本次运行时,callback 为 PcapFileCallbackLoop
,它会收到 pdata
,内有报文;结构体 pcap_pkthdr
包含 ts, caplen, len
。我们跟进 PcapFileCallbackLoop
:
/**
 * \brief libpcap pcap_handler callback: turn one raw pcap record into a
 *        Packet object and push it into the slot pipeline.
 *
 * \param user the PcapFileFileVars passed to pcap_dispatch()
 * \param h    pcap record header (ts, caplen, len)
 * \param pkt  raw packet bytes
 */
void PcapFileCallbackLoop(char *user, struct pcap_pkthdr *h, u_char *pkt)
{
SCEnter();
PcapFileFileVars *ptv = (PcapFileFileVars *)user;
Packet *p = PacketGetFromQueueOrAlloc();
if (unlikely(p == NULL)) {
SCReturn;
}
PACKET_PROFILING_TMM_START(p, TMM_RECEIVEPCAPFILE);
// build the Packet object
PKT_SET_SRC(p, PKT_SRC_WIRE); // p->pkt_src = PKT_SRC_WIRE
p->ts = SCTIME_FROM_TIMEVAL_UNTRUSTED(&h->ts);
SCLogDebug("p->ts.tv_sec %" PRIuMAX "", (uintmax_t)SCTIME_SECS(p->ts));
p->datalink = ptv->datalink;
p->pcap_cnt = ++pcap_g.cnt;
p->pcap_v.tenant_id = ptv->shared->tenant_id;
ptv->shared->pkts++;
ptv->shared->bytes += h->caplen;
// copy the raw packet bytes into the Packet
if (unlikely(PacketCopyData(p, pkt, h->caplen))) {
TmqhOutputPacketpool(ptv->shared->tv, p);
PACKET_PROFILING_TMM_END(p, TMM_RECEIVEPCAPFILE);
SCReturn;
}
// set the PKT_IGNORE_CHECKSUM flag when configured/detected
if (pcap_g.checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
p->flags |= PKT_IGNORE_CHECKSUM;
} else if (pcap_g.checksum_mode == CHECKSUM_VALIDATION_AUTO) {
if (ChecksumAutoModeCheck(ptv->shared->pkts, p->pcap_cnt,
SC_ATOMIC_GET(pcap_g.invalid_checksums))) {
pcap_g.checksum_mode = CHECKSUM_VALIDATION_DISABLE;
p->flags |= PKT_IGNORE_CHECKSUM;
}
}
PACKET_PROFILING_TMM_END(p, TMM_RECEIVEPCAPFILE);
// breakpoint location
if (TmThreadsSlotProcessPkt(ptv->shared->tv, ptv->shared->slot, p) != TM_ECODE_OK) {
pcap_breakloop(ptv->pcap_handle);
ptv->shared->cb_result = TM_ECODE_FAILED;
}
SCReturn;
}
上述代码执行的逻辑是:从存储池中取一个 Packet 对象,填写相关字段,把报文原文复制进 Packet,然后执行 TmThreadsSlotProcessPkt
处理之。目前,我们的 ptv->shared->tv
即为 worker 线程的 tv,而 ptv->shared->slot
是 DecodePcapFile
这个 slot。跟进:
/**
 * \brief Feed one Packet through the thread's slot pipeline, then emit it
 *        via the thread's output handler (tv->tmqh_out).
 */
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
if (s == NULL) {
/* no slots: hand the packet straight to the output handler */
tv->tmqh_out(tv, p);
return TM_ECODE_OK;
}
// breakpoint location (in the article's debugging session)
TmEcode r = TmThreadsSlotVarRun(tv, p, s);
if (unlikely(r == TM_ECODE_FAILED)) {
TmThreadsSlotProcessPktFail(tv, s, p);
return TM_ECODE_FAILED;
}
// here tv->tmqh_out is the TmqhOutputPacketpool function (single mode)
tv->tmqh_out(tv, p);
TmThreadsHandleInjectedPackets(tv);
return TM_ECODE_OK;
}
/**
 * \brief Separate run function so we can call it recursively.
 *
 * Walks the slot list and calls every SlotFunc in order on the same Packet.
 */
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
{
// call each SlotFunc in turn; in this trace: DecodePcapFile, FlowWorker
for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
PACKET_PROFILING_TMM_START(p, s->tm_id);
TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
PACKET_PROFILING_TMM_END(p, s->tm_id);
/* debug invariant: a slot must not leave the packet attached to a flow */
DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
/* handle error */
if (unlikely(r == TM_ECODE_FAILED)) {
/* Encountered error. Return packets to packetpool and return */
TmThreadsSlotProcessPktFail(tv, s, NULL);
return TM_ECODE_FAILED;
}
if (s->tm_flags & TM_FLAG_DECODE_TM) {
/* a decode module may have queued pseudo packets in tv->decode_pq;
 * run them through the remaining slots */
if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
TM_ECODE_OK) {
return TM_ECODE_FAILED;
}
}
}
return TM_ECODE_OK;
}
上面两个函数,简而言之,就是依次执行各个 slot。动态调试时,被执行的函数是 DecodePcapFile
和 FlowWorker
。
综上,在 single 模式中,由 PcapFileDispatch()
调用 libpcap api 读取 pcap 文件,其 callback 函数 PcapFileCallbackLoop()
利用报文产生 Packet 对象,然后调用 TmThreadsSlotProcessPkt()
处理 Packet;此后则依次调用 DecodePcapFile
和 FlowWorker
这两个 slot 函数。
0x03 autofp 模式下的 packet 传递
在 autofp 模式下,RX 线程的调用过程也是 PcapFileDispatch -> PcapFileCallbackLoop -> TmThreadsSlotProcessPkt
。不过,slot 中只剩下 DecodePcapFile
,解码完成之后返回 TmThreadsSlotProcessPkt
:
/*
 * Same TmThreadsSlotProcessPkt as quoted earlier; quoted again because in
 * autofp mode the RX thread's tv->tmqh_out points to a different handler.
 */
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
if (s == NULL) {
tv->tmqh_out(tv, p);
return TM_ECODE_OK;
}
TmEcode r = TmThreadsSlotVarRun(tv, p, s);
if (unlikely(r == TM_ECODE_FAILED)) {
TmThreadsSlotProcessPktFail(tv, s, p);
return TM_ECODE_FAILED;
}
// note: here tv->tmqh_out points to the TmqhOutputFlowHash function
tv->tmqh_out(tv, p);
TmThreadsHandleInjectedPackets(tv);
return TM_ECODE_OK;
}
在 single 模式下, tv->tmqh_out
指向 TmqhOutputPacketpool
函数,而在 autofp 模式下,则是 TmqhOutputFlowHash
函数。代码如下:
/**
 * \brief autofp output handler for the RX thread: selects a queue for the
 *        packet (flow hash or round robin) and enqueues it for a worker.
 */
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
{
uint32_t qid;
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
if (p->flags & PKT_WANTS_FLOW) {
// PKT_WANTS_FLOW set: distribute by flow hash, so packets of the same
// flow always land in the same queue
uint32_t hash = p->flow_hash;
qid = hash % ctx->size;
} else {
// otherwise fall back to round robin over the queues
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
// push the packet into the selected queue and wake up its consumer
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
return;
}
可见,RX 线程的 tv 中存放了 outctx
成员,其中记录了各个 PacketQueue
。从 pcap 文件中解析出的 packet 会按照策略分流到各个 queue 中,且属于同一个 flow 的 packet 一定会分到相同的队列。
PacketQueue
队列的生产者是 RX 线程,消费者则是 worker 线程。这一次,worker 线程的入口点不是 TmThreadsSlotPktAcqLoop
,而是 TmThreadsSlotVar
。代码如下:
/**
 * \brief Thread entry point for autofp worker threads: pull packets from the
 *        input queue, run them through the slot pipeline, then output.
 *        (Article excerpt: elided parts are marked "omitted".)
 */
static void *TmThreadsSlotVar(void *td)
{
/* init; drop privileges. Similar to TmThreadsSlotPktAcqLoop, omitted */
// walk the slot list
for (; s != NULL; s = s->slot_next) {
if (s->SlotThreadInit != NULL) {
/* slot init, similar to TmThreadsSlotPktAcqLoop, omitted */
}
if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
// if FlowWorker is the first slot, use tv->inq->pq as tv->stream_pq
/* similar to TmThreadsSlotPktAcqLoop, omitted */
} else if (s->tm_id == TMM_FLOWWORKER) {
// if FlowWorker sits in a later slot, create a new queue as tv->stream_pq
/* similar to TmThreadsSlotPktAcqLoop, omitted */
}
}
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);
s = (TmSlot *)tv->tm_slots;
while (run) {
if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
// input
p = tv->tmqh_in(tv); // points to TmqhInputFlow: dequeues one packet
if (unlikely(p == NULL)) {
/* error handling, omitted */
}
if (p != NULL) {
/* run the thread module(s) */
r = TmThreadsSlotVarRun(tv, p, s); // breakpoint location
if (r == TM_ECODE_FAILED) {
/* error handling, omitted */
}
// output
tv->tmqh_out(tv, p); // points to TmqhOutputPacketpool, as in single mode
/* now handle the stream pq packets */
TmThreadsHandleInjectedPackets(tv);
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0;
}
} /* while (run) */
StatsSyncCounters(tv);
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv, THV_DEINIT);
PacketPoolDestroy();
/* cleanup, omitted */
}
上面的代码先调用 tv->tmqh_in()
,从队列中取出一个 packet,然后调用 TmThreadsSlotVarRun()
处理 packet,最后调用 tv->tmqh_out()
输出。而上文已经分析过 TmThreadsSlotVarRun
函数,它就是沿着 slots 链表调用各个 SlotFunc
。这一次,slot 链表中只有 FlowWorker
。
TmThreadsSlotVar
代码与 TmThreadsSlotPktAcqLoop
有大量重复。事实上,Suricata 代码中充斥着类似的违背设计模式的实现,且夹杂着各种 hack,注释中也时常见到英语语法错误。平心而论,Suricata 的代码质量并非很高。二次开发时,也需要十分小心。0x04 总结
本文的主要结论:
- 在规则文件导入之后、报文处理开始之前,Suricata 会创建所需的线程。
- 每个合法的 (runmode, custom mode) 二元组都会有特定的启动函数,例如 single 模式下会启动 worker 线程,autofp 模式下会启动 RX 和 worker 线程。
- 除了上述工作线程以外,还会启动 FlowManager(FM#01)、FlowRecycler(FR#01)、StatsWakeup(CW) 线程。
- 一个线程的信息存放在 ThreadVars(简称 tv)中。tv 内有线程名、输入输出队列、tm_slots 链表等信息。
- slots 链表可以视为数据处理的流水线。一个 TmSlot 对应一个线程模块,例如 DecodePcapFile、FlowWorker。
- 在 single 模式下,W 线程的 slots 链表里包含了 pcap 收取、pcap 解码、flow worker
- 在 autofp 模式下,RX 线程的 slots 链表是 pcap 收取、pcap 解码;W 线程的 slots 链表是 flow worker。
- single 模式下,packet 是通过函数调用,层层传递到 flow worker 的;而在 autofp 模式下,RX 线程产生 packet 之后,会按照哈希策略放进对应的队列;W 线程则从自己的队列中获取 packet。详见下图。