0x00 Migrating from ESXi to PVE

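In the previous three articles we discussed how Suricata builds rule groups. Suricata uses these rule groups to match traffic and, in turn, raise alerts. I recently rebuilt my lab environment and moved from ESXi to PVE. Unlike ESXi, PVE does not offer a switch-level "allow promiscuous mode" option, so we have to configure this ourselves.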

First, create a Linux bridge, give each virtual machine a NIC, and attach that NIC to the bridge:

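However, the bridge learns MAC addresses by default, so the IDS VM quickly stops seeing traffic between the other machines. The fix is to set the ageing time to 0 (see this article for details). Edit /etc/network/interfaces: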

auto br_lab
iface br_lab inet static
        address 192.168.25.0/24
        bridge-ports none
        bridge-stp off
        bridge-fd 0
        bridge-ageing 0

The bridge-ageing 0 line is the one we added by hand. After restarting the relevant services, we can finally capture packets:

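▲ With a VirtIO paravirtualized NIC the checksums show up as wrong, because the kernel defers the checksum computation; see this answer for details. Here we use a vmxnet3 virtual NIC, which yields correct checksums.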
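As a reminder of what "correct checksum" means here: the IPv4 header checksum and the TCP/UDP checksums all use the ones'-complement Internet checksum from RFC 1071. TCP/UDP run it over a pseudo-header plus the segment. Below is a minimal sketch, shown on an IPv4 header for brevity. The function name `inet_checksum` is illustrative only. It does not come from Suricata or the kernel.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Internet checksum (RFC 1071): ones'-complement sum of 16-bit big-endian
 * words. Illustrative helper, not a Suricata or kernel API. Returns the value
 * to place in the checksum field; over a buffer whose checksum field is
 * already correct, it returns 0. A 32-bit accumulator is plenty for
 * packet-sized buffers. */
static uint16_t inet_checksum(const uint8_t *buf, size_t len)
{
    uint32_t sum = 0;

    assert(buf != NULL || len == 0);

    while (len > 1) {                 /* add each 16-bit word */
        sum += ((uint32_t)buf[0] << 8) | buf[1];
        buf += 2;
        len -= 2;
    }
    if (len == 1)                     /* odd trailing byte, zero-padded */
        sum += (uint32_t)buf[0] << 8;

    while (sum >> 16)                 /* fold carries into the low 16 bits */
        sum = (sum & 0xffffu) + (sum >> 16);

    return (uint16_t)~sum;
}
```

If you feed it the 20-byte IPv4 header of a captured packet as-is, a correct header yields 0. A non-zero result is what a capture tool reports as a bad checksum.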

This time we also turn on the --enable-debug and --enable-unittests switches. The configure command is:

CC=clang CXX="clang++" ./configure  \
  --enable-shared=no \
  --enable-static=yes \
  CFLAGS="-g -O0" \
  --disable-gccmarch-native \
  --enable-debug \
  --enable-unittests \
  --prefix=/home/neko/workspace/suricata-dbg/lab/app/ \
  --sysconfdir=/home/neko/workspace/suricata-dbg/lab/conf/ \
  --localstatedir=/home/neko/workspace/suricata-dbg/lab/state
  

bear -- make -j16

make install
make install-conf

The current launch.json:

        {
            "type": "lldb",
            "request": "launch",
            "name": "suricata replay",
            "program": "/home/neko/suricata-7.0.8/lab/app/bin/suricata",
            "preLaunchTask": "clean",
            "args": [
                "--runmode",
                "single",
                "-r",
                "/home/neko/suricata-7.0.8/lab/one.pcap"
            ],
            "cwd": "/home/neko/suricata-7.0.8/lab/run/",
            "env": {
                "SC_LOG_LEVEL": "Debug"
            },
            "initCommands": [
                // "breakpoint set -n open --condition '(bool)strstr((char*)$arg1, \"suricata.rules\")'"
            ]
        },

0x01 Tracing the key logic: starting from where alerts are written to disk

We want to see the logic that raises alerts. Alerts end up in eve.json, so we can trace how Suricata accesses that file. First, use the following breakpoint to find where Suricata opens eve.json:

breakpoint set -n open

This shows that the file's descriptor is 5. Next, we trace write syscalls on fd 5. On x86-64, the first argument (the fd) is passed in $rdi:

            "initCommands": [
                "breakpoint set -n write",
                "breakpoint modify -c '$rdi == 5'"
            ]

The call stack:

__write (@__write:3)
_IO_file_write (@_IO_file_write:17)
___lldb_unnamed_symbol3428 (@___lldb_unnamed_symbol3428:29)
_IO_do_write (@_IO_do_write:11)
_IO_file_sync (@_IO_file_sync:48)
_IO_fflush (@_IO_fflush:33)
SCLogFileWrite (\home\neko\suricata-7.0.8\src\util-logopenfile.c:277)
LogFileWrite (\home\neko\suricata-7.0.8\src\util-logopenfile.c:924)
OutputJsonBuilderBuffer (\home\neko\suricata-7.0.8\src\output-json.c:1010)
AlertJson (\home\neko\suricata-7.0.8\src\output-json-alert.c:924)
JsonAlertLogger (\home\neko\suricata-7.0.8\src\output-json-alert.c:992)
OutputPacketLog (\home\neko\suricata-7.0.8\src\output-packet.c:110)
OutputLoggerLog (\home\neko\suricata-7.0.8\src\output.c:883)
FlowWorker (\home\neko\suricata-7.0.8\src\flow-worker.c:643)
TmThreadsSlotVarRun (\home\neko\suricata-7.0.8\src\tm-threads.c:135)
TmThreadsSlotProcessPkt (\home\neko\suricata-7.0.8\src\tm-threads.h:200)
PcapFileCallbackLoop (\home\neko\suricata-7.0.8\src\source-pcap-file-helper.c:108)
___lldb_unnamed_symbol627 (@___lldb_unnamed_symbol627:43)
PcapFileDispatch (\home\neko\suricata-7.0.8\src\source-pcap-file-helper.c:153)
ReceivePcapFileLoop (\home\neko\suricata-7.0.8\src\source-pcap-file.c:180)
TmThreadsSlotPktAcqLoop (\home\neko\suricata-7.0.8\src\tm-threads.c:318)
___lldb_unnamed_symbol3515 (@___lldb_unnamed_symbol3515:152)
___lldb_unnamed_symbol3940 (@___lldb_unnamed_symbol3940:7)

Because we run in single runmode, all of the relevant logic executes in the worker thread W#01. Other threads exist as well:

Let's start with the alert-writing logic. Skipping the functions that are too low-level, we look at AlertJson():

static int AlertJson(ThreadVars *tv, JsonAlertLogThread *aft, const Packet *p)
{
    AlertJsonOutputCtx *json_output_ctx = aft->json_output_ctx;

    if (p->alerts.cnt == 0 && !(p->flags & PKT_HAS_TAG))
        return TM_ECODE_OK;

    // Iterate over the p->alerts.alerts list. In this run there is only one alert (the CVE rule)
    for (int i = 0; i < p->alerts.cnt; i++) {
        const PacketAlert *pa = &p->alerts.alerts[i];
        if (unlikely(pa->s == NULL)) {
            continue;
        }

        // Extract the 5-tuple: src_ip, dst_ip, sp, dp, proto (bitmap)
        JsonAddrInfo addr = json_addr_info_zero;
        JsonAddrInfoInit(p, LOG_DIR_PACKET, &addr);

        /* x-forwarded-for handling, omitted */

        // Initialize the JsonBuilder
        JsonBuilder *jb =
                CreateEveHeader(p, LOG_DIR_PACKET, "alert", &addr, json_output_ctx->eve_ctx);
        if (unlikely(jb == NULL))
            return TM_ECODE_OK;


        // Set the json "alert" object, e.g. action, signature_id, rev, signature, xff
        AlertJsonHeader(json_output_ctx, p, pa, jb, json_output_ctx->flags, &addr, xff_buffer);

        if (IS_TUNNEL_PKT(p)) {
            AlertJsonTunnel(p, jb);
        }

        if (p->flow != NULL) {
            if (pa->flags & PACKET_ALERT_FLAG_TX) {
                if (json_output_ctx->flags & LOG_JSON_APP_LAYER) {
                    // Set the json app-layer fields (e.g. http, smtp, smb) according to this flow's alproto
                    AlertAddAppLayer(p, jb, pa->tx_id, pa->s->flags, json_output_ctx->flags);
                }
                /* including fileinfo data is configured by the metadata setting */
                if (json_output_ctx->flags & LOG_JSON_RULE_METADATA) {
                    // Set the json "files" field
                    AlertAddFiles(p, jb, pa->tx_id);
                }
            }

            // Set the json app_proto, app_proto_expected and related fields
            EveAddAppProto(p->flow, jb);

            // Set the "direction" field
            if (p->flowflags & FLOW_PKT_TOSERVER) {
                jb_set_string(jb, "direction", "to_server");
            } else {
                jb_set_string(jb, "direction", "to_client");
            }

            // Set the "flow" object
            if (json_output_ctx->flags & LOG_JSON_FLOW) {
                jb_open_object(jb, "flow");
                EveAddFlow(p->flow, jb);
                if (p->flowflags & FLOW_PKT_TOCLIENT) {
                    jb_set_string(jb, "src_ip", addr.dst_ip);
                    jb_set_string(jb, "dest_ip", addr.src_ip);
                    if (addr.sp > 0) {
                        jb_set_uint(jb, "src_port", addr.dp);
                        jb_set_uint(jb, "dest_port", addr.sp);
                    }
                } else {
                    jb_set_string(jb, "src_ip", addr.src_ip);
                    jb_set_string(jb, "dest_ip", addr.dst_ip);
                    if (addr.sp > 0) {
                        jb_set_uint(jb, "src_port", addr.sp);
                        jb_set_uint(jb, "dest_port", addr.dp);
                    }
                }
                jb_close(jb);
            }
        }

        /* When LOG_JSON_PAYLOAD or LOG_JSON_PAYLOAD_BASE64 is enabled, set the json "payload" field; omitted */

        // Set the json "frame" field
        if (pa->flags & PACKET_ALERT_FLAG_FRAME) {
            AlertAddFrame(p, pa->frame_id, jb, aft->payload_buffer);
        }

        // If LOG_JSON_PACKET is enabled, set the json "packet" field, recording the whole packet in base64
        if (json_output_ctx->flags & LOG_JSON_PACKET) {
            EvePacket(p, jb, 0);
        }

        // If PcapLogGetFilename() returns a pcap filename, record it in the json "capture_file" field
        char *pcap_filename = PcapLogGetFilename();
        if (pcap_filename != NULL) {
            jb_set_string(jb, "capture_file", pcap_filename);
        }

        // Set the json "verdict" field
        if (json_output_ctx->flags & LOG_JSON_VERDICT) {
            EveAddVerdict(jb, p);
        }

        // Write it out to disk
        OutputJsonBuilderBuffer(jb, aft->ctx);
        jb_free(jb);
    }

    // If LOG_JSON_TAGGED_PACKETS is enabled and the packet matched a tag, log the packet
    if ((p->flags & PKT_HAS_TAG) && (json_output_ctx->flags &
            LOG_JSON_TAGGED_PACKETS)) {
        JsonBuilder *packetjs =
                CreateEveHeader(p, LOG_DIR_PACKET, "packet", NULL, json_output_ctx->eve_ctx);
        if (unlikely(packetjs != NULL)) {
            EvePacket(p, packetjs, 0);
            OutputJsonBuilderBuffer(packetjs, aft->ctx);
            jb_free(packetjs);
        }
    }

    return TM_ECODE_OK;
}

From this we learn that while processing a packet, Suricata builds the p->alerts.alerts array and finally emits all of the alerts in one go. Moving up from AlertJson(), here is its caller:

static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
    JsonAlertLogThread *aft = thread_data;

    if (PKT_IS_IPV4(p) || PKT_IS_IPV6(p)) {
        return AlertJson(tv, aft, p);
    } else if (p->alerts.cnt > 0) {
        return AlertJsonDecoderEvent(tv, aft, p);
    }
    return 0;
}

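If the packet is IPv4 or IPv6, it calls AlertJson(), which we just read. Otherwise, if the packet carries alerts, it logs them as decoder events through AlertJsonDecoderEvent(). For examples of decoder events, see the test cases in the source tree, such as ipv4.frag_pkt_too_large.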
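The path from p->alerts to eve.json can be condensed into a toy model. A packet carries a small array of matched signatures, and the logger emits one JSON line per entry. Everything below is a hypothetical simplification and not Suricata's JsonBuilder API. That includes ToyAlert, ToyPacket, toy_alert_json and the snprintf-based writer. Only the "loop over the alerts, one record each" shape comes from AlertJson().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical toy model of the AlertJson() loop; not Suricata's API. */
#define TOY_ALERT_MAX 8

typedef struct {
    unsigned sid;                   /* plays the role of pa->s->id */
    const char *msg;                /* plays the role of pa->s->msg */
} ToyAlert;

typedef struct {
    int cnt;                        /* like p->alerts.cnt */
    ToyAlert alerts[TOY_ALERT_MAX]; /* like p->alerts.alerts */
} ToyPacket;

/* Append one eve-style JSON line per alert to the NUL-terminated buffer `out`.
 * Returns the number of lines written. A line that does not fit is dropped
 * whole instead of being left half-written. msg is not JSON-escaped here. */
static int toy_alert_json(const ToyPacket *p, char *out, size_t outlen)
{
    assert(p != NULL && out != NULL && outlen > 0);

    size_t used = strlen(out);
    int lines = 0;

    if (p->cnt == 0)                /* mirrors AlertJson()'s early return */
        return 0;

    for (int i = 0; i < p->cnt; i++) {
        int n = snprintf(out + used, outlen - used,
                         "{\"event_type\":\"alert\",\"alert\":"
                         "{\"signature_id\":%u,\"signature\":\"%s\"}}\n",
                         p->alerts[i].sid, p->alerts[i].msg);
        if (n < 0 || (size_t)n >= outlen - used) {
            out[used] = '\0';       /* undo the truncated partial line */
            break;
        }
        used += (size_t)n;
        lines++;
    }
    return lines;
}
```

The real code builds each record with JsonBuilder and hands it to OutputJsonBuilderBuffer(). The sketch only keeps the one-record-per-alert structure.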

Going up another level:

static TmEcode OutputPacketLog(ThreadVars *tv, Packet *p, void *thread_data)
{
    // debug code omitted

    if (list == NULL) {
        /* No child loggers. */
        return TM_ECODE_OK;
    }

    OutputPacketLoggerThreadData *op_thread_data = (OutputPacketLoggerThreadData *)thread_data;
    
    // list is a global variable; here it contains "AlertFastLog", "JsonAlertLog" and "JsonAnomalyLog"
    OutputPacketLogger *logger = list;
    OutputLoggerThreadStore *store = op_thread_data->store;

    while (logger && store) {
        if ((logger->ConditionFunc(tv, store->thread_data, (const Packet *)p)) == TRUE) {
            // At our breakpoint, logger->LogFunc here is JsonAlertLogger
            logger->LogFunc(tv, store->thread_data, (const Packet *)p);
        }

        logger = logger->next;
        store = store->next;
    }

    return TM_ECODE_OK;
}


TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
{
    LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
    RootLogger *logger = TAILQ_FIRST(&active_loggers);
    // active_loggers is a tail queue; here it contains OutputPacketLog and OutputTxLog
    
    LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);
    while (logger && thread_store_node) {
        logger->LogFunc(tv, p, thread_store_node->thread_data);

        logger = TAILQ_NEXT(logger, entries);
        thread_store_node = TAILQ_NEXT(thread_store_node, entries);
    }
    return TM_ECODE_OK;
}

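Both are simple wrappers. In short, they hand the packet to every registered logger so that each one can write its logs. The next caller up is FlowWorker, which we already saw in the gprof results at the end of the first article.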
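Both wrappers follow the same pattern. They walk a list of loggers in lockstep with a parallel list of per-thread data. Each logger's LogFunc runs only when its ConditionFunc returns true. Here is a small model of that loop. The types and functions (ToyLogger, ToyStore, toy_output_packet_log and the example loggers) are hypothetical. Only the loop shape is taken from OutputPacketLog().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of OutputPacketLog()'s loop; names are illustrative. */
typedef struct ToyPacket { int alerts_cnt; } ToyPacket;

typedef bool (*ToyConditionFunc)(void *thread_data, const ToyPacket *p);
typedef void (*ToyLogFunc)(void *thread_data, const ToyPacket *p);

typedef struct ToyLogger {          /* like OutputPacketLogger */
    ToyConditionFunc ConditionFunc;
    ToyLogFunc LogFunc;
    struct ToyLogger *next;
} ToyLogger;

typedef struct ToyStore {           /* like OutputLoggerThreadStore */
    void *thread_data;              /* per-thread state for the logger at the same position */
    struct ToyStore *next;
} ToyStore;

/* Walk loggers and stores in lockstep; log only when the condition holds. */
static void toy_output_packet_log(const ToyLogger *logger, const ToyStore *store,
                                  const ToyPacket *p)
{
    while (logger != NULL && store != NULL) {
        if (logger->ConditionFunc(store->thread_data, p))
            logger->LogFunc(store->thread_data, p);
        logger = logger->next;
        store = store->next;
    }
}

/* Example conditions and a log function that just counts calls. */
static bool toy_has_alerts(void *td, const ToyPacket *p) { (void)td; return p->alerts_cnt > 0; }
static bool toy_always(void *td, const ToyPacket *p) { (void)td; (void)p; return true; }
static void toy_count(void *td, const ToyPacket *p)
{
    (void)p;
    assert(td != NULL);
    (*(int *)td)++;
}
```

Keeping the per-thread data in a separate list means one logger registration can serve every worker thread, with each thread holding its own state.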

Next, let's look at FlowWorker in detail.

0x02 FlowWorker

FlowWorker() is implemented as follows:

static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
{
    FlowWorkerThreadData *fw = data;
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);

    DEBUG_VALIDATE_BUG_ON(p == NULL);
    DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);

    SCLogDebug("packet %"PRIu64, p->pcap_cnt);

    // Update this thread's timestamp fields
    if (!(PKT_IS_PSEUDOPKT(p))) {
        TimeSetByThread(tv->id, p->ts);
        // Sets thread_store.threads[idx]->pktts to p->ts
        // Sets thread_store.threads[idx]->sys_sec_stamp to gettimeofday()
        // thread_store is a global; during dynamic debugging it holds five threads: W#01, FM#01, FR#01, CW, CS
    }

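    // Flow handling starts here

    // This relies on the PKT_WANTS_FLOW flag, which is set by the decoder
    // If the decoder decides the packet should go through the flow engine, it sets this flag and flow_hash
    // See void FlowSetupPacket(Packet *p) in flow-hash.c
    // flow_hash is computed from hash_rand (the seed), the source/destination addresses and ports, and the recursion level, among other fields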
    if (p->flags & PKT_WANTS_FLOW) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);

        // Look up the flow by flow hash and store it in p->flow; create a new flow if none exists
        FlowHandlePacket(tv, &fw->fls, p);
        
        if (likely(p->flow != NULL)) {
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
            
            // Update flow attributes from the packet, e.g. lastts, todstpktcnt, tosrcbytecnt
            // Update the packet's flowflags (e.g. FLOW_PKT_TOSERVER) and flags (e.g. PKT_PROTO_DETECT_TS_DONE)
            // flow->flow_state is an enum: NEW, ESTABLISHED, CLOSED, BYPASSED, CAPTURE_BYPASSED
            if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
                goto housekeeping;
            }
        }
        /* Flow is now LOCKED */

        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);

    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
     * pseudo packet created by the flow manager. */
    } else if (p->flags & PKT_HAS_FLOW) {
        FLOWLOCK_WRLOCK(p->flow);
        DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
    }

    SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");

    /* handle TCP and app layer */
    if (p->flow) {
        if (PKT_IS_TCP(p)) {
            SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", p->pcap_cnt,
                    PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);


            if (detect_thread == NULL &&
                    ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
                            (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST)))) {
                // If detection is disabled, set flow->file_flags on the first packet in each direction
                DisableDetectFlowFileFlags(p->flow);
            }

            // Update the stream engine (responsible for stream tracking and reassembly)
            FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false);
            
            // Update FLOW_TS_APP_UPDATED and FLOW_TC_APP_UPDATED in flow->flags
            PacketAppUpdate2FlowFlags(p);

            /* handle the app layer part of the UDP packet payload */
        } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
            AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
            PacketAppUpdate2FlowFlags(p);
        }
    }

    // Update the counter_engine_events counters in DecodeThreadVars
    PacketUpdateEngineEventCounters(tv, fw->dtv, p);

    /* handle Detect */
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
    if (detect_thread != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        
        // Run detection, which fills in p->alerts
        Detect(tv, p, detect_thread);
        
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }

    // Outputs.
    OutputLoggerLog(tv, p, fw->output_thread);

    /*  Release tcp segments. Done here after alerting can use them. */
    if (p->flow != NULL) {
        /* Some cleanup work, omitted */
    }

housekeeping:

    /* take injected flows and add them to our local queue */
    FlowWorkerProcessInjectedFlows(tv, fw, p);

    /* process local work queue */
    FlowWorkerProcessLocalFlows(tv, fw, p);

    return TM_ECODE_OK;
}

To sum up, FlowWorker:

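  • processes every packet that is received
  • updates the flow's flags from the packet, and the packet's flags from the flow
  • updates the stream engine (responsible for stream tracking and reassembly; see this article)
  • runs detection, which produces alerts
  • outputs the alerts, writing them to disk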
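The flow lookup at the top of FlowWorker uses FlowHandlePacket() to fill in p->flow, creating a flow if none exists. At its core this is a hash table keyed by the 5-tuple. Here is a minimal sketch that assumes a fixed-size chained table. It uses an FNV-1a-style mix in place of Suricata's real hash function. The sketch orders the two endpoints before hashing, so both directions of a connection land on the same flow. That is what lets each packet be tagged as to-server or to-client relative to the flow's first packet. All names (ToyTuple, ToyFlow, toy_flow_handle_packet, ...) are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical toy flow table; not Suricata's flow-hash.c. */
#define TOY_BUCKETS 64

typedef struct {
    uint32_t src, dst;          /* IPv4 addresses, host order */
    uint16_t sp, dp;            /* ports */
    uint8_t proto;
} ToyTuple;

typedef struct ToyFlow {
    ToyTuple key;               /* tuple of the first packet: its src is the client */
    uint64_t pkts_toserver, pkts_toclient;
    struct ToyFlow *next;
} ToyFlow;

typedef struct {
    uint32_t seed;              /* plays the role of hash_rand */
    ToyFlow *buckets[TOY_BUCKETS];
} ToyFlowTable;

static uint32_t fnv_mix(uint32_t h, uint32_t v)
{
    for (int i = 0; i < 4; i++) {
        h ^= (v >> (8 * i)) & 0xffu;
        h *= 16777619u;
    }
    return h;
}

/* Direction-independent hash: order the endpoints before mixing them in. */
static uint32_t toy_flow_hash(const ToyFlowTable *t, const ToyTuple *k)
{
    uint32_t a = k->src, b = k->dst;
    uint16_t pa = k->sp, pb = k->dp;
    if (a > b || (a == b && pa > pb)) {
        uint32_t ta = a; a = b; b = ta;
        uint16_t tp = pa; pa = pb; pb = tp;
    }
    uint32_t h = 2166136261u ^ t->seed;
    h = fnv_mix(h, a);
    h = fnv_mix(h, b);
    h = fnv_mix(h, ((uint32_t)pa << 16) | pb);
    return fnv_mix(h, k->proto);
}

/* 1: same direction as the flow's first packet, -1: reverse, 0: other flow. */
static int toy_match(const ToyFlow *f, const ToyTuple *k)
{
    const ToyTuple *o = &f->key;
    if (o->proto != k->proto) return 0;
    if (o->src == k->src && o->dst == k->dst && o->sp == k->sp && o->dp == k->dp) return 1;
    if (o->src == k->dst && o->dst == k->src && o->sp == k->dp && o->dp == k->sp) return -1;
    return 0;
}

/* Find the packet's flow or create it, and count the packet per direction. */
static ToyFlow *toy_flow_handle_packet(ToyFlowTable *t, const ToyTuple *k)
{
    assert(t != NULL && k != NULL);
    ToyFlow **head = &t->buckets[toy_flow_hash(t, k) % TOY_BUCKETS];
    for (ToyFlow *f = *head; f != NULL; f = f->next) {
        int dir = toy_match(f, k);
        if (dir == 1) { f->pkts_toserver++; return f; }
        if (dir == -1) { f->pkts_toclient++; return f; }
    }
    ToyFlow *f = calloc(1, sizeof(*f));
    if (f == NULL) return NULL;
    f->key = *k;
    f->pkts_toserver = 1;
    f->next = *head;
    *head = f;
    return f;
}
```

A random seed (like hash_rand) makes bucket placement hard for an attacker to predict. Flows are never freed in this sketch. The real engine hands timed-out flows to the flow manager.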

In this article we leave the stream engine aside, since it is too low-level. Next we focus on Detect() to see how alerts are produced.

0x03 Detect

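Back to dynamic debugging. Our pcap contains several packets, and FlowWorker processes each of them once. We only care about the one that triggers the alert. Some initial debugging shows that packet 7 in the pcap is the trigger, so we set a breakpoint condition: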

            "initCommands": [
                "breakpoint set -n Detect --condition 'p->pcap_cnt == 7'"
            ]

Stepping into Detect():

TmEcode Detect(ThreadVars *tv, Packet *p, void *data)
{
    DEBUG_VALIDATE_PACKET(p);

    DetectEngineCtx *de_ctx = NULL;
    DetectEngineThreadCtx *det_ctx = (DetectEngineThreadCtx *)data;
    if (det_ctx == NULL) {
        printf("ERROR: Detect has no thread ctx\n");
        goto error;
    }

    if (unlikely(SC_ATOMIC_GET(det_ctx->so_far_used_by_detect) == 0)) {
        (void)SC_ATOMIC_SET(det_ctx->so_far_used_by_detect, 1);
        SCLogDebug("Detect Engine using new det_ctx - %p",
                  det_ctx);
    }

    /* if in MT mode _and_ we have tenants registered, use
     * MT logic. */
    if (det_ctx->mt_det_ctxs_cnt > 0 && det_ctx->TenantGetId != NULL)
    {
        // multi-tenant mode, omitted
    } else {
        de_ctx = det_ctx->de_ctx;
    }

    if (p->flow) {
        DetectFlow(tv, de_ctx, det_ctx, p);
    } else {
        DetectNoFlow(tv, de_ctx, det_ctx, p);
    }

#ifdef PROFILE_RULES
    // profiling, omitted
#endif

    return TM_ECODE_OK;
error:
    return TM_ECODE_FAILED;
}

So Detect() is just a wrapper around DetectFlow() and DetectNoFlow(). Stepping into them:

static void DetectFlow(ThreadVars *tv,
                       DetectEngineCtx *de_ctx, DetectEngineThreadCtx *det_ctx,
                       Packet *p)
{
    Flow *const f = p->flow;

    if (p->flags & PKT_NOPACKET_INSPECTION) {
        /* hack: if we are in pass the entire flow mode, we need to still
         * update the inspect_id forward. So test for the condition here,
         * and call the update code if necessary. */
        const int pass = ((f->flags & FLOW_NOPACKET_INSPECTION));
        if (pass) {
            uint8_t flags = STREAM_FLAGS_FOR_PACKET(p);
            flags = FlowGetDisruptionFlags(f, flags);
            if (f->alstate) {
                AppLayerParserSetTransactionInspectId(f, f->alparser, f->alstate, flags, true);
            }
        }
        SCLogDebug("p->pcap %"PRIu64": no detection on packet, "
                "PKT_NOPACKET_INSPECTION is set", p->pcap_cnt);
        return;
    }

    /* we check the flow drop here, and not the packet drop. This is
     * to allow stream engine "invalid" drop packets to still be
     * evaluated by the stream event rules. */
    if (f->flags & FLOW_ACTION_DROP) {
        DEBUG_VALIDATE_BUG_ON(!(PKT_IS_PSEUDOPKT(p)) && !PacketCheckAction(p, ACTION_DROP));
        SCReturn;
    }

    /* see if the packet matches one or more of the sigs */
    (void)DetectRun(tv, de_ctx, det_ctx, p);
}


static void DetectNoFlow(ThreadVars *tv,
                         DetectEngineCtx *de_ctx, DetectEngineThreadCtx *det_ctx,
                         Packet *p)
{
    /* No need to perform any detection on this packet, if the given flag is set.*/
    if ((p->flags & PKT_NOPACKET_INSPECTION) || (PacketCheckAction(p, ACTION_DROP))) {
        return;
    }

    /* see if the packet matches one or more of the sigs */
    DetectRun(tv, de_ctx, det_ctx, p);
    return;
}

Both functions call DetectRun(). Stepping in:

static void DetectRun(ThreadVars *th_v,
        DetectEngineCtx *de_ctx, DetectEngineThreadCtx *det_ctx,
        Packet *p)
{
    SCEnter();
    SCLogDebug("p->pcap_cnt %" PRIu64 " direction %s pkt_src %s", p->pcap_cnt,
            p->flow ? (FlowGetPacketDirection(p->flow, p) == TOSERVER ? "toserver" : "toclient")
                    : "noflow",
            PktSrcToString(p->pkt_src));

    // Skip detection entirely if the packet has the PKT_NOPACKET_INSPECTION flag
    if (p->flags & PKT_NOPACKET_INSPECTION) {
        /* nothing to do */
        SCReturn;
    }

    Flow * const pflow = p->flow;

    // Preparation: returns a scratchpad holding the flow's alproto, flow_flags and app_decoder_events
    DetectRunScratchpad scratch = DetectRunSetup(de_ctx, det_ctx, p, pflow);

    // Run the IP-only engine, which matches IP addresses using a radix tree
    // If an alert is raised, AlertQueueAppend() adds it to det_ctx->alert_queue
    DetectRunInspectIPOnly(th_v, de_ctx, det_ctx, pflow, p);

    // Get the rule group and store it in scratch.sgh
    // Internally: prefer pflow->sgh_toserver or pflow->sgh_toclient;
    //             if no such sgh exists, call SigMatchSignaturesGetSgh() to get one and cache it
    DetectRunGetRuleGroup(de_ctx, p, pflow, &scratch);
    if (scratch.sgh == NULL) {
        SCLogDebug("no sgh for this packet, nothing to match against");
        goto end;
    }

    // Run packet inspection
    DetectRunPrefilterPkt(th_v, de_ctx, det_ctx, p, &scratch);
    PACKET_PROFILING_DETECT_START(p, PROF_DETECT_RULES);
    /* inspect the rules against the packet */
    DetectRulePacketRules(th_v, de_ctx, det_ctx, p, pflow, &scratch);
    PACKET_PROFILING_DETECT_END(p, PROF_DETECT_RULES);

    // Run tx and frame inspection
    /* run tx/state inspection. Don't call for ICMP error msgs. */
    if (pflow && pflow->alstate && likely(pflow->proto == p->proto)) {
        if (p->proto == IPPROTO_TCP) {
            const TcpSession *ssn = p->flow->protoctx;
            if (ssn && (ssn->flags & STREAMTCP_FLAG_APP_LAYER_DISABLED) == 0) {
                DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch);
            }
            // no update to transactions
            if (!PKT_IS_PSEUDOPKT(p) && p->app_update_direction == 0 &&
                    ((PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATED) == 0) ||
                            (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATED) == 0))) {
                goto end;
            }
        } else if (p->proto == IPPROTO_UDP) {
            DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch);
        }

        PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX);
        
        // This is where the alert we are interested in is raised
        DetectRunTx(th_v, de_ctx, det_ctx, p, pflow, &scratch);
        
        PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX);
        /* see if we need to increment the inspect_id and reset the de_state */
        PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE);
        AppLayerParserSetTransactionInspectId(
                pflow, pflow->alparser, pflow->alstate, scratch.flow_flags, (scratch.sgh == NULL));
        PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE);
    }

end:
    // Gather all alerts in det_ctx->alert_queue, sort them by priority and process them
    // Copy the valid alerts into the p->alerts array
    DetectRunPostRules(th_v, de_ctx, det_ctx, p, pflow, &scratch);

    DetectRunCleanup(det_ctx, p, pflow);
    SCReturn;
}

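During dynamic debugging we found that the sgh holds four groups of engines: pkt_engines, payload_engines, tx_engines and frame_engines. In our scenario only tx_engines is populated; the others are all nullptr. The alert is raised inside DetectRunTx() and added to the queue. It is finally copied from the queue into the alerts array.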
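Here is a sketch of the queue-then-copy step described above. Matches are appended to a per-thread queue in whatever order the engines produce them. The queue is then sorted and copied into a fixed-size per-packet array, and whatever does not fit is dropped. This is a simplified model with two assumptions made for illustration: a lower internal signature number means higher priority, and the packet array holds at most TOY_PACKET_ALERT_MAX entries. DetectRunPostRules() does more work that is omitted here. All names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical model of the alert queue; not Suricata's PacketAlert API. */
#define TOY_QUEUE_MAX 16
#define TOY_PACKET_ALERT_MAX 2

typedef struct {
    uint32_t num;        /* internal signature number; lower = higher priority here */
    uint64_t tx_id;
} ToyAlert;

typedef struct {
    ToyAlert q[TOY_QUEUE_MAX];
    int n;
} ToyAlertQueue;

typedef struct {
    ToyAlert alerts[TOY_PACKET_ALERT_MAX];
    int cnt;
    int discarded;
} ToyPacketAlerts;

/* Like AlertQueueAppend(): record a match in arrival order. */
static int toy_queue_append(ToyAlertQueue *aq, uint32_t num, uint64_t tx_id)
{
    if (aq->n >= TOY_QUEUE_MAX)
        return -1;
    aq->q[aq->n].num = num;
    aq->q[aq->n].tx_id = tx_id;
    aq->n++;
    return 0;
}

static int toy_cmp(const void *a, const void *b)
{
    const ToyAlert *x = a, *y = b;
    return (x->num > y->num) - (x->num < y->num);
}

/* Sort the queue, copy what fits into the packet, count the rest, reset the queue. */
static void toy_finalize(ToyAlertQueue *aq, ToyPacketAlerts *pa)
{
    assert(aq->n >= 0 && aq->n <= TOY_QUEUE_MAX);
    qsort(aq->q, (size_t)aq->n, sizeof(ToyAlert), toy_cmp);
    pa->cnt = 0;
    pa->discarded = 0;
    for (int i = 0; i < aq->n; i++) {
        if (pa->cnt < TOY_PACKET_ALERT_MAX)
            pa->alerts[pa->cnt++] = aq->q[i];
        else
            pa->discarded++;
    }
    aq->n = 0;
}
```

Sorting before copying means that, when the per-packet array overflows, the entries dropped are the lowest-priority ones rather than the latest arrivals.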

TX is short for transaction. An HTTP request and its response together form one transaction; see the documentation for details.
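A transaction is filled in over several packets, so detection keeps per-direction bookkeeping. It resumes at an inspect id (tx_id_min in DetectRunTx()). It marks a transaction as inspected once its progress reaches the end state (the APP_LAYER_TX_INSPECTED_FLAG branch). The inspect id can then move past completed transactions. The sketch below models only that bookkeeping, with hypothetical types and a made-up end state of 4. It illustrates the idea and does not reproduce AppLayerParserSetTransactionInspectId()'s exact rules.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of per-direction tx bookkeeping; names are illustrative. */
#define TOY_MAX_TXS 8
#define TOY_TX_END_STATE 4   /* made-up "complete" progress value */

typedef struct {
    int progress;            /* how far the parser has got with this tx */
    bool inspected;          /* stands in for APP_LAYER_TX_INSPECTED_FLAG */
    int times_inspected;     /* how often rules ran on this tx (for the demo) */
} ToyTx;

typedef struct {
    ToyTx txs[TOY_MAX_TXS];
    uint64_t cnt;            /* like AppLayerParserGetTxCnt() */
    uint64_t inspect_id;     /* first tx that may still need inspection */
} ToyState;

/* One detection pass: inspect every tx from inspect_id on, flag completed ones,
 * then advance inspect_id past the leading run of inspected txs. */
static void toy_detect_pass(ToyState *st)
{
    assert(st->cnt <= TOY_MAX_TXS);
    for (uint64_t id = st->inspect_id; id < st->cnt; id++) {
        ToyTx *tx = &st->txs[id];
        if (tx->inspected)
            continue;
        tx->times_inspected++;               /* rules would run here */
        if (tx->progress >= TOY_TX_END_STATE)
            tx->inspected = true;
    }
    while (st->inspect_id < st->cnt && st->txs[st->inspect_id].inspected)
        st->inspect_id++;
}
```

An unfinished transaction stays below the inspect id and is looked at again when more data arrives. A completed one is not revisited.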

Stepping into DetectRunTx():

static void DetectRunTx(ThreadVars *tv,
                    DetectEngineCtx *de_ctx,
                    DetectEngineThreadCtx *det_ctx,
                    Packet *p,
                    Flow *f,
                    DetectRunScratchpad *scratch)
{
    // debug and profiling logic omitted
    
    const uint8_t flow_flags = scratch->flow_flags;
    const SigGroupHead * const sgh = scratch->sgh;
    void * const alstate = f->alstate;
    const uint8_t ipproto = f->proto;
    const AppProto alproto = f->alproto;

    const uint64_t total_txs = AppLayerParserGetTxCnt(f, alstate);
    uint64_t tx_id_min = AppLayerParserGetTransactionInspectId(f->alparser, flow_flags);
    const int tx_end_state = AppLayerParserGetStateProgressCompletionStatus(alproto, flow_flags);

    // Get the iterator function; dynamic debugging shows it is AppLayerDefaultGetTxIterator()
    AppLayerGetTxIteratorFunc IterFunc = AppLayerGetTxIterator(ipproto, alproto);
    AppLayerGetTxIterState state;
    memset(&state, 0, sizeof(state));

    while (1) {
        AppLayerGetTxIterTuple ires = IterFunc(ipproto, alproto, alstate, tx_id_min, total_txs, &state);
        if (ires.tx_ptr == NULL)
            break;
        
        // Get the transaction
        DetectTransaction tx = GetDetectTx(ipproto, alproto,
                alstate, ires.tx_id, ires.tx_ptr, tx_end_state, flow_flags);
        if (tx.tx_ptr == NULL) {
            SCLogDebug("%p/%"PRIu64" no transaction to inspect",
                    tx.tx_ptr, tx_id_min);

            tx_id_min++; // next (if any) run look for +1
            goto next;
        }
        tx_id_min = tx.tx_id + 1; // next look for cur + 1

        bool do_sort = false; // do we need to sort the tx candidate list?
        uint32_t array_idx = 0;
        uint32_t total_rules = det_ctx->match_array_cnt;
        total_rules += (tx.de_state ? tx.de_state->cnt : 0);

        /* run prefilter engines and merge results into a candidates array */
        if (sgh->tx_engines) {
            // Run the tx prefilter; results go into det_ctx->pmq, a PrefilterRuleStore
            PACKET_PROFILING_DETECT_START(p, PROF_DETECT_PF_TX);
            DetectRunPrefilterTx(det_ctx, sgh, p, ipproto, flow_flags, alproto,
                    alstate, &tx);
            PACKET_PROFILING_DETECT_END(p, PROF_DETECT_PF_TX);
            SCLogDebug("%p/%"PRIu64" rules added from prefilter: %u candidates",
                    tx.tx_ptr, tx.tx_id, det_ctx->pmq.rule_id_array_cnt);

            // Next, copy the candidate sigs chosen by the prefilter into det_ctx->tx_candidates
            
            total_rules += det_ctx->pmq.rule_id_array_cnt;
            if (!(RuleMatchCandidateTxArrayHasSpace(det_ctx, total_rules))) {
                RuleMatchCandidateTxArrayExpand(det_ctx, total_rules);
            }

            for (uint32_t i = 0; i < det_ctx->pmq.rule_id_array_cnt; i++) {
                const Signature *s = de_ctx->sig_array[det_ctx->pmq.rule_id_array[i]];
                const SigIntId id = s->num;
                det_ctx->tx_candidates[array_idx].s = s;
                det_ctx->tx_candidates[array_idx].id = id;
                det_ctx->tx_candidates[array_idx].flags = NULL;
                det_ctx->tx_candidates[array_idx].stream_reset = 0;
                array_idx++;
            }
            PMQ_RESET(&det_ctx->pmq);
        } else {
            if (!(RuleMatchCandidateTxArrayHasSpace(det_ctx, total_rules))) {
                RuleMatchCandidateTxArrayExpand(det_ctx, total_rules);
            }
        }

        // Merge det_ctx->match_array into det_ctx->tx_candidates
        /* merge 'state' rules from the regular prefilter */
        RuleMatchCandidateMergeStateRules(det_ctx, &array_idx);

        /* merge stored state into results */
        if (tx.de_state != NULL) {
            // Note: dynamic debugging never entered this branch
            const uint32_t old = array_idx;

            /* if tx.de_state->flags has 'new file' set and sig below has
             * 'file inspected' flag, reset the file part of the state */
            const bool have_new_file = (tx.de_state->flags & DETECT_ENGINE_STATE_FLAG_FILE_NEW);
            if (have_new_file) {
                SCLogDebug("%p/%"PRIu64" destate: need to consider new file",
                        tx.tx_ptr, tx.tx_id);
                tx.de_state->flags &= ~DETECT_ENGINE_STATE_FLAG_FILE_NEW;
            }

            SigIntId state_cnt = 0;
            DeStateStore *tx_store = tx.de_state->head;
            for (; tx_store != NULL; tx_store = tx_store->next) {
                SCLogDebug("tx_store %p", tx_store);

                SigIntId store_cnt = 0;
                for (store_cnt = 0;
                        store_cnt < DE_STATE_CHUNK_SIZE && state_cnt < tx.de_state->cnt;
                        store_cnt++, state_cnt++)
                {
                    DeStateStoreItem *item = &tx_store->store[store_cnt];
                    SCLogDebug("rule id %u, inspect_flags %u", item->sid, item->flags);
                    if (have_new_file && (item->flags & DE_STATE_FLAG_FILE_INSPECT)) {
                        /* remove part of the state. File inspect engine will now
                         * be able to run again */
                        item->flags &= ~(DE_STATE_FLAG_SIG_CANT_MATCH|DE_STATE_FLAG_FULL_INSPECT|DE_STATE_FLAG_FILE_INSPECT);
                        SCLogDebug("rule id %u, post file reset inspect_flags %u", item->sid, item->flags);
                    }
                    det_ctx->tx_candidates[array_idx].s = de_ctx->sig_array[item->sid];
                    det_ctx->tx_candidates[array_idx].id = item->sid;
                    det_ctx->tx_candidates[array_idx].flags = &item->flags;
                    det_ctx->tx_candidates[array_idx].stream_reset = 0;
                    array_idx++;
                }
            }
            do_sort |= (old && old != array_idx); // sort if continue list adds sids
            SCLogDebug("%p/%" PRIu64 " rules added from 'continue' list: %u", tx.tx_ptr, tx.tx_id,
                    array_idx - old);
        }
        if (do_sort) {
            qsort(det_ctx->tx_candidates, array_idx, sizeof(RuleMatchCandidateTx),
                    DetectRunTxSortHelper);
        }

        det_ctx->tx_id = tx.tx_id;
        det_ctx->tx_id_set = true;
        det_ctx->p = p;

        // Now consider each candidate sig

        /* run rules: inspect the match candidates */
        for (uint32_t i = 0; i < array_idx; i++) {
            RuleMatchCandidateTx *can = &det_ctx->tx_candidates[i];
            const Signature *s = det_ctx->tx_candidates[i].s;
            uint32_t *inspect_flags = det_ctx->tx_candidates[i].flags;

            // Avoid inspecting the same sig more than once
            while ((i + 1) < array_idx &&
                    det_ctx->tx_candidates[i].s == det_ctx->tx_candidates[i + 1].s) {
                SCLogDebug("%p/%" PRIu64 " inspecting SKIP NEXT: sid %u (%u), flags %08x",
                        tx.tx_ptr, tx.tx_id, s->id, s->num, inspect_flags ? *inspect_flags : 0);
                i++;
            }
            
            // Prepare to inspect the sig

            SCLogDebug("%p/%"PRIu64" inspecting: sid %u (%u), flags %08x",
                    tx.tx_ptr, tx.tx_id, s->id, s->num, inspect_flags ? *inspect_flags : 0);

            if (inspect_flags) {
                if (*inspect_flags & (DE_STATE_FLAG_FULL_INSPECT|DE_STATE_FLAG_SIG_CANT_MATCH)) {
                    SCLogDebug("%p/%"PRIu64" inspecting: sid %u (%u), flags %08x ALREADY COMPLETE",
                            tx.tx_ptr, tx.tx_id, s->id, s->num, *inspect_flags);
                    continue;
                }
            }

            if (inspect_flags) {
                /* continue previous inspection */
                SCLogDebug("%p/%" PRIu64 " Continuing sid %u", tx.tx_ptr, tx.tx_id, s->id);
            } else {
                /* start new inspection */
                SCLogDebug("%p/%"PRIu64" Start sid %u", tx.tx_ptr, tx.tx_id, s->id);
            }

            // Inspect the sig
            /* call individual rule inspection */
            RULE_PROFILING_START(p);
            const int r = DetectRunTxInspectRule(tv, de_ctx, det_ctx, p, f, flow_flags,
                    alstate, &tx, s, inspect_flags, can, scratch);
            if (r == 1) {
                /* match */
                DetectRunPostMatch(tv, det_ctx, p, s);

                const uint8_t alert_flags = (PACKET_ALERT_FLAG_STATE_MATCH | PACKET_ALERT_FLAG_TX);
                SCLogDebug("%p/%"PRIu64" sig %u (%u) matched", tx.tx_ptr, tx.tx_id, s->id, s->num);
                
                // Match: add it to the alert queue
                AlertQueueAppend(det_ctx, s, p, tx.tx_id, alert_flags);
            }
            DetectVarProcessList(det_ctx, p->flow, p);
            RULE_PROFILING_END(det_ctx, s, r, p);
        }

        det_ctx->tx_id = 0;
        det_ctx->tx_id_set = false;
        det_ctx->p = NULL;

        /* see if we have any updated state to store in the tx */

        uint64_t new_detect_flags = 0;
        /* this side of the tx is done */
        if (tx.tx_progress >= tx.tx_end_state) {
            // This tx is fully inspected in this direction; set APP_LAYER_TX_INSPECTED_FLAG
            new_detect_flags |= APP_LAYER_TX_INSPECTED_FLAG;
            SCLogDebug("%p/%"PRIu64" tx is done for direction %s. Flag %016"PRIx64,
                    tx.tx_ptr, tx.tx_id,
                    flow_flags & STREAM_TOSERVER ? "toserver" : "toclient",
                    new_detect_flags);
        }
        if (tx.prefilter_flags != tx.prefilter_flags_orig) {
            // Update prefilter_flags
            new_detect_flags |= tx.prefilter_flags;
            DEBUG_VALIDATE_BUG_ON(new_detect_flags & APP_LAYER_TX_RESERVED_FLAGS);
            SCLogDebug("%p/%"PRIu64" updated prefilter flags %016"PRIx64" "
                    "(was: %016"PRIx64") for direction %s. Flag %016"PRIx64,
                    tx.tx_ptr, tx.tx_id, tx.prefilter_flags, tx.prefilter_flags_orig,
                    flow_flags & STREAM_TOSERVER ? "toserver" : "toclient",
                    new_detect_flags);
        }
        if (new_detect_flags != 0 &&
                (new_detect_flags | tx.detect_flags) != tx.detect_flags)
        {
            new_detect_flags |= tx.detect_flags;
            DEBUG_VALIDATE_BUG_ON(new_detect_flags & APP_LAYER_TX_RESERVED_FLAGS);
            SCLogDebug("%p/%"PRIu64" Storing new flags %016"PRIx64" (was %016"PRIx64")",
                    tx.tx_ptr, tx.tx_id, new_detect_flags, tx.detect_flags);

            StoreDetectFlags(&tx, flow_flags, ipproto, alproto, new_detect_flags);
        }
next:
        InspectionBufferClean(det_ctx);

        if (!ires.has_next)
            break;
    }
}

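The function is long, but roughly it does three things. First it runs the tx prefilter. Then it tries each signature that could possibly match, one at a time. Whenever one matches, it adds that signature to the alert queue. Two functions deserve a closer look: DetectRunPrefilterTx(), which runs the prefilter, and DetectRunTxInspectRule(), which matches a single sig.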
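The candidate-handling core of DetectRunTx() fits in a few lines: collect candidate signatures, sort them, skip adjacent duplicates, inspect each remaining one, and queue the matches. In this sketch, signatures are plain integer ids. `toy_inspect_odd` is a hypothetical stand-in for DetectRunTxInspectRule(). The sort here is unconditional. The real code only sorts when the merged list may be out of order (do_sort).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical model of DetectRunTx()'s candidate loop; names are illustrative. */
typedef int (*ToyInspectFunc)(uint32_t sig_id);   /* returns 1 on match */

static int toy_cmp_u32(const void *a, const void *b)
{
    uint32_t x = *(const uint32_t *)a, y = *(const uint32_t *)b;
    return (x > y) - (x < y);
}

/* Sort candidates, inspect each distinct signature once, and write matching
 * ids to `alerts` (up to alerts_max). Returns the number of matches stored. */
static size_t toy_run_candidates(uint32_t *cand, size_t n, ToyInspectFunc inspect,
                                 uint32_t *alerts, size_t alerts_max)
{
    size_t matched = 0;

    assert(cand != NULL || n == 0);
    qsort(cand, n, sizeof(cand[0]), toy_cmp_u32);
    for (size_t i = 0; i < n; i++) {
        /* Skip adjacent duplicates, like the "SKIP NEXT" loop in DetectRunTx(). */
        while (i + 1 < n && cand[i] == cand[i + 1])
            i++;
        if (inspect(cand[i]) == 1 && matched < alerts_max)
            alerts[matched++] = cand[i];
    }
    return matched;
}

/* Example inspector: pretend that odd signature ids match. */
static int toy_inspect_odd(uint32_t sig_id) { return (sig_id & 1u) ? 1 : 0; }
```

Sorting first is what makes the duplicate check cheap. A signature that shows up twice (for example from both the prefilter and stored state) ends up in adjacent slots and is inspected only once.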

0x04 DetectRunPrefilterTx

Let's start with DetectRunPrefilterTx(). Its implementation:

void DetectRunPrefilterTx(DetectEngineThreadCtx *det_ctx,
        const SigGroupHead *sgh,
        Packet *p,
        const uint8_t ipproto,
        const uint8_t flow_flags,
        const AppProto alproto,
        void *alstate,
        DetectTransaction *tx)
{
    /* reset rule store */
    det_ctx->pmq.rule_id_array_cnt = 0;

    SCLogDebug("packet %" PRIu64 " tx %p progress %d tx->prefilter_flags %" PRIx64, p->pcap_cnt,
            tx->tx_ptr, tx->tx_progress, tx->prefilter_flags);

    PrefilterEngine *engine = sgh->tx_engines;
    
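    // Walk every tx engine
    // tx_engines seen while debugging: PrefilterMpm(alproto=1), PrefilterMpm(alproto=31)
    // alproto=1 is ALPROTO_HTTP1, alproto=31 is ALPROTO_HTTP2
    do {

        // Skip engines whose alproto differs from the flow's alproto,
        // so the HTTP1 engine runs and the HTTP2 engine is skipped
        if (engine->alproto != alproto)
            goto next;

        // The tx has not reached this engine's minimum progress yet: stop
        // (breaking instead of skipping suggests the engines are ordered by progress)
        if (engine->ctx.tx_min_progress > tx->tx_progress)
            break;
        // The tx is already past this progress level and its bit is set in
        // prefilter_flags: this engine already ran for the tx, so skip it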
        if (tx->tx_progress > engine->ctx.tx_min_progress) {
            if (tx->prefilter_flags & BIT_U64(engine->ctx.tx_min_progress)) {
                goto next;
            }
        }

        // Run the prefilter itself; while debugging this was PrefilterMpm()
        PREFILTER_PROFILING_START(det_ctx);
        engine->cb.PrefilterTx(det_ctx, engine->pectx, p, p->flow, tx->tx_ptr, tx->tx_id,
                tx->tx_data_ptr, flow_flags);
        PREFILTER_PROFILING_END(det_ctx, engine->gid);

        if (tx->tx_progress > engine->ctx.tx_min_progress && engine->is_last_for_progress) {
            tx->prefilter_flags |= BIT_U64(engine->ctx.tx_min_progress);
        }
    next:
        if (engine->is_last)
            break;
        engine++;
    } while (1);

    // Sort the candidate sig ids collected in pmq
    /* Sort the rule list to lets look at pmq.
     * NOTE due to merging of 'stream' pmqs we *MAY* have duplicate entries */
    if (likely(det_ctx->pmq.rule_id_array_cnt > 1)) {
        PACKET_PROFILING_DETECT_START(p, PROF_DETECT_PF_SORT1);
        QuickSortSigIntId(det_ctx->pmq.rule_id_array, det_ctx->pmq.rule_id_array_cnt);
        PACKET_PROFILING_DETECT_END(p, PROF_DETECT_PF_SORT1);
    }
}
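The engine-gating logic above can be modeled in isolation. The sketch below is a simplified, hypothetical reimplementation: ToyPrefilterEngine, ToyTx, TOY_BIT_U64 and toy_run_prefilter_tx() are not Suricata APIs, the runs counter replaces the real PrefilterTx callback, and it iterates with an explicit count instead of the is_last marker. It keeps the three rules of DetectRunPrefilterTx(): skip other alprotos, stop when the tx has not reached the engine's minimum progress, and skip an engine whose progress bit is already set in prefilter_flags.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_BIT_U64(n) (UINT64_C(1) << (n))

typedef struct {
    int alproto;                /* protocol this engine serves */
    uint8_t tx_min_progress;    /* the tx must reach this progress first */
    bool is_last_for_progress;  /* last engine registered for this progress level */
    int runs;                   /* counts how often the stand-in callback "ran" */
} ToyPrefilterEngine;

typedef struct {
    uint8_t progress;           /* like DetectTransaction.tx_progress */
    uint64_t prefilter_flags;   /* bit n: progress level n already prefiltered */
} ToyTx;

/* Assumes engines are ordered by tx_min_progress, as the break in the original suggests. */
static void toy_run_prefilter_tx(ToyPrefilterEngine *engines, size_t n, int alproto, ToyTx *tx)
{
    assert(engines != NULL && tx != NULL);
    for (size_t i = 0; i < n; i++) {
        ToyPrefilterEngine *e = &engines[i];
        if (e->alproto != alproto)
            continue;                         /* other protocol, e.g. HTTP2 */
        if (e->tx_min_progress > tx->progress)
            break;                            /* tx not far enough yet */
        if (tx->progress > e->tx_min_progress &&
            (tx->prefilter_flags & TOY_BIT_U64(e->tx_min_progress)))
            continue;                         /* already ran for this progress level */
        e->runs++;                            /* stand-in for engine->cb.PrefilterTx() */
        if (tx->progress > e->tx_min_progress && e->is_last_for_progress)
            tx->prefilter_flags |= TOY_BIT_U64(e->tx_min_progress);
    }
}
```

Running it twice on an HTTP1 tx at progress 5, with the two engines seen while debugging (alproto 1 and 31, both assumed here to have minimum progress 1), calls the HTTP1 engine once: the first pass sets bit 1 in prefilter_flags and the second pass skips the engine. A tx sitting exactly at progress 1 never gets the bit, so its engine runs again on every pass, presumably because more data for that progress level may still arrive.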

During dynamic debugging we found that engine->cb.PrefilterTx is the PrefilterMpm function. Stepping into it:

/** \brief Generic Mpm prefilter callback
 *
 *  \param det_ctx detection engine thread ctx
 *  \param p packet to inspect
 *  \param f flow to inspect
 *  \param txv tx to inspect
 *  \param pectx inspection context
 */
static void PrefilterMpm(DetectEngineThreadCtx *det_ctx, const void *pectx, Packet *p, Flow *f,
        void *txv, const uint64_t idx, const AppLayerTxData *_txd, const uint8_t flags)
{
    SCEnter();

    const PrefilterMpmCtx *ctx = (const PrefilterMpmCtx *)pectx;
    const MpmCtx *mpm_ctx = ctx->mpm_ctx;
    SCLogDebug("running on list %d", ctx->list_id);
    // list_id is 7 here

    // ctx->GetData is GetData() in detect-http-uri.c
    InspectionBuffer *buffer = ctx->GetData(det_ctx, ctx->transforms,
            f, flags, txv, ctx->list_id);
    if (buffer == NULL)
        return;
    // Properties of the returned buffer:
    //   - inspect = "/cgi-bin/luci/;stok=/locale?form=country"
    //   - inspect_offset = 0
    //   - inspect_len = 40

    const uint32_t data_len = buffer->inspect_len;
    const uint8_t *data = buffer->inspect;

    SCLogDebug("mpm'ing buffer:");
    //PrintRawDataFp(stdout, data, data_len);

    if (data != NULL && data_len >= mpm_ctx->minlen) {
        (void)mpm_table[mpm_ctx->mpm_type].Search(mpm_ctx,
                &det_ctx->mtcu, &det_ctx->pmq, data, data_len);
        // Here mpm_ctx->mpm_type = 1, and mpm_table[1] is an Aho-Corasick (AC) automaton;
        // its Search function is SCACSearch(), defined in util-mpm-ac.c
        // So the actual call is: SCACSearch(mpm_ctx, &det_ctx->mtcu, &det_ctx->pmq, <http uri>, <len>)
        // where det_ctx->mtcu is the MpmThreadCtx dedicated to uricontent,
        // and det_ctx->pmq is the PrefilterRuleStore that collects the matches

        PREFILTER_PROFILING_ADD_BYTES(det_ctx, data_len);
    }
}

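SCACSearch() is just the implementation of the AC algorithm, so there is no need to step into it. That completes the prefilter workflow: for each applicable tx engine, call its prefilter callback, then sort the candidate sigs. In our debugging session the prefilter is an AC automaton, and the GetData callback of its prefilter ctx (PrefilterMpmCtx) extracts the request URI from the tx.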
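SCACSearch() is not worth stepping into, but its contract is easy to model. To stay short, the sketch below swaps the Aho-Corasick automaton for a naive per-pattern scan (slower on large pattern sets, same set of hits). ToyPattern, ToyRuleStore, toy_mpm_search() and toy_sort_unique() are hypothetical. It shows the minlen early exit from PrefilterMpm(), the fact that the inspect buffer is length-delimited rather than NUL-terminated (hence memcmp instead of strstr), and the final sort. The deduplication step is an addition for this sketch: DetectRunPrefilterTx() itself only sorts, and its NOTE warns that duplicates may remain.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define TOY_PMQ_CAP 32

typedef struct {
    const char *pattern;   /* fast_pattern literal (hypothetical) */
    uint32_t sig_id;       /* internal id of the rule that owns it */
} ToyPattern;

typedef struct {
    uint32_t rule_ids[TOY_PMQ_CAP];
    uint32_t cnt;
} ToyRuleStore;            /* stand-in for PrefilterRuleStore */

static int cmp_u32(const void *a, const void *b)
{
    uint32_t x = *(const uint32_t *)a, y = *(const uint32_t *)b;
    return (x > y) - (x < y);
}

/* Naive scan, NOT Aho-Corasick: every rule whose pattern occurs in data is appended. */
static void toy_mpm_search(const ToyPattern *pats, size_t npats, uint16_t minlen,
                           const char *data, size_t data_len, ToyRuleStore *pmq)
{
    if (data == NULL || data_len < minlen)
        return;                                 /* shorter than the shortest pattern */
    for (size_t i = 0; i < npats; i++) {
        size_t plen = strlen(pats[i].pattern);
        assert(plen > 0);
        for (size_t off = 0; off + plen <= data_len; off++) {
            if (memcmp(data + off, pats[i].pattern, plen) == 0) {
                assert(pmq->cnt < TOY_PMQ_CAP);
                pmq->rule_ids[pmq->cnt++] = pats[i].sig_id;
                break;                          /* one hit per pattern is enough */
            }
        }
    }
}

/* Sort the ids, then drop duplicates (several patterns may map to one rule). */
static void toy_sort_unique(ToyRuleStore *pmq)
{
    if (pmq->cnt < 2)
        return;
    qsort(pmq->rule_ids, pmq->cnt, sizeof(pmq->rule_ids[0]), cmp_u32);
    uint32_t w = 1;
    for (uint32_t r = 1; r < pmq->cnt; r++)
        if (pmq->rule_ids[r] != pmq->rule_ids[w - 1])
            pmq->rule_ids[w++] = pmq->rule_ids[r];
    pmq->cnt = w;
}
```

With the patterns "/locale" and "form=" both belonging to a hypothetical rule 7, the 40-byte URI from the debugging session produces two hits for rule 7, which collapse into a single candidate after sorting and deduplication.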

0x05 DetectRunTxInspectRule

For each candidate sig produced by the prefilter, the program calls DetectRunTxInspectRule() to perform the actual match. Its implementation:

/** \internal
 *  \brief inspect a rule against a transaction
 *
 *  Inspect a rule. New detection or continued stateful
 *  detection.
 *
 *  \param stored_flags pointer to stored flags or NULL.
 *         If stored_flags is set it means we're continuing
 *         inspection from an earlier run.
 *
 *  \retval bool true sig matched, false didn't match
 */
static bool DetectRunTxInspectRule(ThreadVars *tv,
        DetectEngineCtx *de_ctx,
        DetectEngineThreadCtx *det_ctx,
        Packet *p,
        Flow *f,
        const uint8_t in_flow_flags,   // direction, EOF, etc
        void *alstate,
        DetectTransaction *tx,
        const Signature *s,
        uint32_t *stored_flags,
        RuleMatchCandidateTx *can,
        DetectRunScratchpad *scratch)
{
    uint8_t flow_flags = in_flow_flags;
    const int direction = (flow_flags & STREAM_TOSERVER) ? 0 : 1;
    uint32_t inspect_flags = stored_flags ? *stored_flags : 0;
    int total_matches = 0;
    uint16_t file_no_match = 0;
    bool retval = false;
    bool mpm_before_progress = false;   // is mpm engine before progress?
    bool mpm_in_progress = false;       // is mpm engine in a buffer we will revisit?

    TRACE_SID_TXS(s->id, tx, "starting %s", direction ? "toclient" : "toserver");
    
    
    /* for a new inspection we inspect pkt header and packet matches */
    if (likely(stored_flags == NULL)) {
        // First check packet-level properties (e.g. whether the IPv4/IPv6 version and ports match)
        TRACE_SID_TXS(s->id, tx, "first inspect, run packet matches");
        if (DetectRunInspectRuleHeader(p, f, s, s->flags, s->proto.flags) == 0) {
            TRACE_SID_TXS(s->id, tx, "DetectRunInspectRuleHeader() no match");
            return false;
        }
        if (DetectEnginePktInspectionRun(tv, det_ctx, s, f, p, NULL) == false) {
            TRACE_SID_TXS(s->id, tx, "DetectEnginePktInspectionRun no match");
            return false;
        }
        // Check that the alproto matches
        /* stream mpm and negated mpm sigs can end up here with wrong proto */
        if (!(AppProtoEquals(s->alproto, f->alproto) || s->alproto == ALPROTO_UNKNOWN)) {
            TRACE_SID_TXS(s->id, tx, "alproto mismatch");
            return false;
        }
    }

    const DetectEngineAppInspectionEngine *engine = s->app_inspect;
    // Contents of the s->app_inspect list: the http2 engine and the http1 engine
    while (engine != NULL) {
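        // Look at each engine; only engines for this direction that are not yet
        // marked done in inspect_flags are evaluated
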
        TRACE_SID_TXS(s->id, tx, "engine %p inspect_flags %x", engine, inspect_flags);
        if (!(inspect_flags & BIT_U32(engine->id)) &&
                direction == engine->dir)
        {
            // Skip the engine if its alproto does not match the flow's
            const bool skip_engine = (engine->alproto != 0 && engine->alproto != f->alproto);
            /* special case: file_data on 'alert tcp' will have engines
             * in the list that are not for us. */
            if (unlikely(skip_engine)) {
                engine = engine->next;
                continue;
            }

            // Stop here (break) if the tx has not progressed far enough for this engine
            /* engines are sorted per progress, except that the one with
             * mpm/prefilter enabled is first */
            if (tx->tx_progress < engine->progress) {
                SCLogDebug("tx progress %d < engine progress %d",
                        tx->tx_progress, engine->progress);
                break;
            }
            
            
            if (engine->mpm) {
                if (tx->tx_progress > engine->progress) {
                    // While debugging: tx->tx_progress=5, engine->progress=1
                    TRACE_SID_TXS(s->id, tx,
                            "engine->mpm: t->tx_progress %u > engine->progress %u, so set "
                            "mpm_before_progress",
                            tx->tx_progress, engine->progress);
                    mpm_before_progress = true;
                } else if (tx->tx_progress == engine->progress) {
                    TRACE_SID_TXS(s->id, tx,
                            "engine->mpm: t->tx_progress %u == engine->progress %u, so set "
                            "mpm_in_progress",
                            tx->tx_progress, engine->progress);
                    mpm_in_progress = true;
                }
            }

            /* run callback: but bypass stream callback if we can */
            uint8_t match;
            if (unlikely(engine->stream && can->stream_stored)) {
                match = can->stream_result;
                TRACE_SID_TXS(s->id, tx, "stream skipped, stored result %d used instead", match);
            } else {
                KEYWORD_PROFILING_SET_LIST(det_ctx, engine->sm_list);
                DEBUG_VALIDATE_BUG_ON(engine->v2.Callback == NULL);
                
                // Call the actual matching logic
                // - engine.v2.GetData = (GetData at detect-http-uri.c:223)
                // - engine.v2.Callback = (DetectEngineInspectBufferGeneric at detect-engine.c:2192)
                match = engine->v2.Callback(
                        de_ctx, det_ctx, engine, s, f, flow_flags, alstate, tx->tx_ptr, tx->tx_id);
                TRACE_SID_TXS(s->id, tx, "engine %p match %d", engine, match);
                
                if (engine->stream) {
                    can->stream_stored = true;
                    can->stream_result = match;
                    TRACE_SID_TXS(s->id, tx, "stream ran, store result %d for next tx (if any)", match);
                }
            }
            if (match == DETECT_ENGINE_INSPECT_SIG_MATCH) {
                // Matched
                inspect_flags |= BIT_U32(engine->id);
                engine = engine->next;
                total_matches++;
                continue;
            } else if (match == DETECT_ENGINE_INSPECT_SIG_MATCH_MORE_FILES) {
                // Matched, but more files are still in progress, so inspect_flags is not set
                /* if the file engine matched, but indicated more
                 * files are still in progress, we don't set inspect
                 * flags as these would end inspection for this tx */
                engine = engine->next;
                total_matches++;
                continue;
            } else if (match == DETECT_ENGINE_INSPECT_SIG_CANT_MATCH) {
                // The branches below are match failures
                inspect_flags |= DE_STATE_FLAG_SIG_CANT_MATCH;
                inspect_flags |= BIT_U32(engine->id);
            } else if (match == DETECT_ENGINE_INSPECT_SIG_CANT_MATCH_FILES) {
                inspect_flags |= DE_STATE_FLAG_SIG_CANT_MATCH;
                inspect_flags |= BIT_U32(engine->id);
                file_no_match = 1;
            }
            /* implied DETECT_ENGINE_INSPECT_SIG_NO_MATCH */
            if (engine->mpm && mpm_before_progress) {
                inspect_flags |= DE_STATE_FLAG_SIG_CANT_MATCH;
                inspect_flags |= BIT_U32(engine->id);
            }
            break;
        }
        engine = engine->next;
    }
    TRACE_SID_TXS(s->id, tx, "inspect_flags %x, total_matches %u, engine %p",
            inspect_flags, total_matches, engine);

    if (engine == NULL && total_matches) {
        // Set DE_STATE_FLAG_FULL_INSPECT: all engines have been run, with at least one match
        inspect_flags |= DE_STATE_FLAG_FULL_INSPECT;
        TRACE_SID_TXS(s->id, tx, "MATCH");
        retval = true;
    }

    if (stored_flags) {
        *stored_flags = inspect_flags;
        TRACE_SID_TXS(s->id, tx, "continue inspect flags %08x", inspect_flags);
    } else {
        // store... or? If tx is done we might not want to come back to this tx

        // also... if mpmid tracking is enabled, we won't do a sig again for this tx...
        TRACE_SID_TXS(s->id, tx, "start inspect flags %08x", inspect_flags);
        if (inspect_flags & DE_STATE_FLAG_SIG_CANT_MATCH) {
            if (file_no_match) {
                /* if we have a mismatch on a file sig, we need to keep state.
                 * We may get another file on the same tx (for http and smtp
                 * at least), so for a new file we need to re-eval the sig.
                 * Thoughts / TODO:
                 *  - not for some protos that have 1 file per tx (e.g. nfs)
                 *  - maybe we only need this for file sigs that mix with
                 *    other matches? E.g. 'POST + filename', is different than
                 *    just 'filename'.
                 */
                DetectRunStoreStateTx(scratch->sgh, f, tx->tx_ptr, tx->tx_id, s,
                        inspect_flags, flow_flags, file_no_match);
            }
        } else if ((inspect_flags & DE_STATE_FLAG_FULL_INSPECT) && mpm_before_progress) {
            // This is the branch taken in our debugging run
            TRACE_SID_TXS(s->id, tx, "no need to store match sig, "
                    "mpm won't trigger for it anymore");

            if (inspect_flags & DE_STATE_FLAG_FILE_INSPECT) {
                TRACE_SID_TXS(s->id, tx, "except that for new files, "
                        "we may have to revisit anyway");
                DetectRunStoreStateTx(scratch->sgh, f, tx->tx_ptr, tx->tx_id, s,
                        inspect_flags, flow_flags, file_no_match);
            }
        } else if ((inspect_flags & DE_STATE_FLAG_FULL_INSPECT) == 0 && mpm_in_progress) {
            TRACE_SID_TXS(s->id, tx, "no need to store no-match sig, "
                    "mpm will revisit it");
        } else {
            TRACE_SID_TXS(s->id, tx, "storing state: flags %08x", inspect_flags);
            DetectRunStoreStateTx(scratch->sgh, f, tx->tx_ptr, tx->tx_id, s,
                    inspect_flags, flow_flags, file_no_match);
        }
    }

    // Return whether the sig matched
    return retval;
}
💡
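From the code we can infer that inspect_flags is a bitmap in which bit k (k = engine->id) means "engine k has finished its inspection for this tx", whether it matched or can no longer match. The same word also carries status bits such as DE_STATE_FLAG_SIG_CANT_MATCH and DE_STATE_FLAG_FULL_INSPECT.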
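This bitmap reading can be illustrated with a few helpers. The concrete bit positions below (status bits at 0 and 1, engine ids starting at 3) are assumptions made for this sketch, not values taken from the article; check Suricata's own definitions of the DE_STATE_FLAG_* macros for the real layout. TOY_BIT_U32, toy_engine_done(), toy_mark_done() and toy_all_engines_done() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_BIT_U32(n) (UINT32_C(1) << (n))

/* Assumed layout, for illustration only: status bits at the bottom,
 * per-engine "done" bits above them. */
#define TOY_FLAG_FULL_INSPECT   TOY_BIT_U32(0)
#define TOY_FLAG_SIG_CANT_MATCH TOY_BIT_U32(1)
#define TOY_ENGINE_ID_BASE      3

/* Has engine `engine_id` already finished (matched or can't match) for this tx? */
static bool toy_engine_done(uint32_t inspect_flags, uint8_t engine_id)
{
    return (inspect_flags & TOY_BIT_U32(engine_id)) != 0;
}

/* Mark engine `engine_id` as finished, as `inspect_flags |= BIT_U32(engine->id)` does. */
static uint32_t toy_mark_done(uint32_t inspect_flags, uint8_t engine_id)
{
    assert(engine_id >= TOY_ENGINE_ID_BASE && engine_id < 32);
    return inspect_flags | TOY_BIT_U32(engine_id);
}

/* True when every listed engine already has its done bit set. */
static bool toy_all_engines_done(uint32_t inspect_flags, const uint8_t *ids, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (!toy_engine_done(inspect_flags, ids[i]))
            return false;
    return true;
}
```

Because status bits and engine bits share one word, OR-ing in DE_STATE_FLAG_SIG_CANT_MATCH does not disturb the per-engine bits; in this sketch that is guaranteed by keeping engine ids at or above the assumed base.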

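As with the prefilter, matching a single sig also proceeds engine by engine. The http2 engine is skipped and only the http1 engine runs. The matching function actually called is DetectEngineInspectBufferGeneric(), which matches the content recorded in the sig against the flow's request URI.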
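The engine loop of DetectRunTxInspectRule() can be reduced to the sketch below. It is a simplified, hypothetical model (ToyEngine, toy_inspect_rule() and the canned result field are not Suricata code): it drops the packet/header checks, the stream shortcut, the file results and the mpm progress tracking, and keeps only done-bit skipping, the direction check, alproto skipping, the progress break, and the rule that a sig matches only when the loop runs off the end with at least one match. The engine ids used in the usage note are arbitrary.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_BIT_U32(n) (UINT32_C(1) << (n))

enum { TOY_MATCH, TOY_NO_MATCH, TOY_CANT_MATCH };

typedef struct ToyEngine {
    uint8_t id;                     /* bit index in inspect_flags */
    int alproto;                    /* 0 = any protocol */
    int dir;                        /* 0 = toserver, 1 = toclient */
    uint8_t progress;               /* tx progress this engine needs */
    int result;                     /* canned result standing in for v2.Callback() */
    const struct ToyEngine *next;
} ToyEngine;

/* Returns true when the sig matches; updates *inspect_flags along the way. */
static bool toy_inspect_rule(const ToyEngine *engine, int flow_alproto, int direction,
                             uint8_t tx_progress, uint32_t *inspect_flags)
{
    int total_matches = 0;
    while (engine != NULL) {
        if (!(*inspect_flags & TOY_BIT_U32(engine->id)) && direction == engine->dir) {
            if (engine->alproto != 0 && engine->alproto != flow_alproto) {
                engine = engine->next;      /* e.g. the http2 engine on an HTTP/1 flow */
                continue;
            }
            if (tx_progress < engine->progress)
                break;                      /* revisit once the tx has progressed */
            if (engine->result == TOY_MATCH) {
                *inspect_flags |= TOY_BIT_U32(engine->id);
                total_matches++;
                engine = engine->next;
                continue;
            }
            if (engine->result == TOY_CANT_MATCH)
                *inspect_flags |= TOY_BIT_U32(engine->id);
            break;                          /* no match: stop at this engine */
        }
        engine = engine->next;
    }
    return engine == NULL && total_matches > 0;
}
```

With an http2 engine (alproto 31) chained before an http1 engine (alproto 1) on an HTTP/1 flow at progress 5, the http2 engine is skipped without being marked, the http1 engine matches, and the function returns true with only the http1 bit set, which mirrors the path observed while debugging.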

0x06 Summary

Main takeaways of this article:

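  • Every TCP packet goes through FlowWorker(), which uses the stream engine to track the state of each TCP stream.
  • Detection falls into a few broad categories: iponly, packet, tx, and frame. The http rule we wrote lives in the tx engines, split into an http1 engine and an http2 engine.
  • In the debugging session, tx detection first ran the prefilter (an AC automaton) and then matched the candidate sigs one by one.
  • Suricata avoids re-running inspection work: prefilter_flags records which progress levels of a tx have already been prefiltered, inspect_flags records which of a sig's engines have finished, and DetectRunStoreStateTx() stores per-sig state so that a later run can resume instead of starting over.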
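The storage decision at the end of DetectRunTxInspectRule() (the branch taken when stored_flags is NULL) is a large part of how a finished sig avoids being re-inspected. The sketch below restates that branch chain as a pure function over booleans. ToyInspectOutcome and toy_should_store_state() are hypothetical; each field simply mirrors a flag or local variable from the code in 0x05.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    bool cant_match;          /* DE_STATE_FLAG_SIG_CANT_MATCH set */
    bool file_no_match;       /* a file engine returned CANT_MATCH_FILES */
    bool full_inspect;        /* DE_STATE_FLAG_FULL_INSPECT set */
    bool file_inspect;        /* DE_STATE_FLAG_FILE_INSPECT set */
    bool mpm_before_progress; /* the mpm engine's progress is already behind the tx */
    bool mpm_in_progress;     /* the mpm engine's buffer will be revisited */
} ToyInspectOutcome;

/* Mirrors the first-inspection branch chain: returns true when the sig's state
 * would be stored via DetectRunStoreStateTx() for this tx. */
static bool toy_should_store_state(const ToyInspectOutcome *o)
{
    if (o->cant_match)
        return o->file_no_match;        /* a later file on the same tx may still match */
    if (o->full_inspect && o->mpm_before_progress)
        return o->file_inspect;         /* mpm won't fire again, except for new files */
    if (!o->full_inspect && o->mpm_in_progress)
        return false;                   /* mpm will bring this sig back */
    return true;                        /* otherwise remember how far we got */
}
```

The outcome seen while debugging (a full inspection with mpm_before_progress set) returns false: the sig is not stored, because the mpm will not trigger for it on this tx again.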