自动巡检框架的设计和实现

0x00 起因

读者应该已经熟悉各种各样的 uptime bot。这些 bot 定期执行任务(例如 ping 一下服务器、看一眼首页的 HTTP 状态码是否为 200),生成日志,于是我们便可以知道在过去几天内,服务器 down 机的时间有多长。这类应用的典型代表是 louislam/uptime-kuma

▲ 我们所熟悉的 uptime-kuma 界面

而笔者面临的任务,比单纯的 uptime 稍稍复杂一些。简而言之,笔者手上有若干台服务器,上面运行着各种业务;笔者需要定期检查这些业务是否正常,指标包含 web 服务是否可以正常使用、内存及磁盘占用等。本文将这样的日常检查称为“巡检”。一场巡检可能包含:

  • 用浏览器打开网站,登录账号,进入各个页面,观察是否正常
  • ssh 连接服务器,运行 htop 等指令
  • 编写巡检报告

一位工作人员可能需要巡检多台服务器,如果人工执行所有巡检步骤,对工作者而言无异于体力劳动。另一方面,巡检员直接用 ssh 登录服务器,存在误操作的风险;巡检报告中的一些字段(例如日期)也容易误填。于是,我们有必要实现一个自动巡检系统。

0x01 基础工具:利用 pexpect 实现 ssh 自动化

显然,欲实现自动巡检,则必须先能自动化控制 ssh 和浏览器。我们先来讨论 ssh。第一种方案是建立 ssh 信道向服务端传输指令,有现成的 paramiko 库可以使用;第二种方案是在本机执行 ssh user@ip 指令,然后我们与 ssh 进程进行本地通讯。此方案可以使用 pexpect 库实现。

考虑到方案二的可扩展性优于方案一(不仅能支持 ssh,也能支持其他命令行程序),我们选择了 pexpect。使用方法是:启动一个进程,等待它发来某个字符串,然后发送我们的 payload,如此循环,直到完成任务。举个例子,假设我们想要调用 passwd 工具修改密码,则需要先输入旧密码,再输入两次新密码:

使用 pexpect,我们的工作流程就是:启动 passwd 进程,等待出现 Current password: 字符串,然后发送旧密码;等待出现 New password: 字符串,发送新密码;等待出现 Retype new password:,再次发送新密码。代码如下:

import pexpect

io = pexpect.spawn('passwd')
io.expect('Current password:')
io.sendline('-----------')
io.expect('New password:')
io.sendline('***********')
io.expect('Retype new password:')
io.sendline('***********')
print(io.read())
# b' \r\npasswd: password updated successfully\r\n'
💡
事实上,也可以通过 pwntools 实现,相关代码见 pwnlib/term/term.py。由于 pexpect 是专为此用途设计的,相关文档容易找到,所以本文采用 pexpect。

回顾 ssh 密码登录流程。服务器会发来 xxx's password: 字符串,在此之后我们输入密码,进入服务器的 shell。

因此,假设我们想要在服务器上执行 df -h 查看磁盘占用,我们只需写出以下代码:

import pexpect
import os

io = pexpect.spawn(
    'ssh neko@192.168.8.200',
    logfile=open('log', 'wb')
)

io.expect_exact('password:')
io.sendline(os.environ['PASSWD'])

io.expect_exact('$')
io.sendline('df -h')
io.expect_exact('$')

print(io.before, io.after)
# b' df -h\r\n\x1b[?2004l\rFilesystem      Size  Used Avail Use% Mounted on\r\nudev            7.8G     0  7.8G   0% /dev\r\ntmpfs           1.6G  744K  1.6G   1% /run\r\n/dev/sda1       294G  1.5G  277G   1% /\r\ntmpfs           7.9G     0  7.9G   0% /dev/shm\r\ntmpfs           5.0M     0  5.0M   0% /run/lock\r\ntmpfs           1.6G     0  1.6G   0% /run/user/1000\r\n\x1b[?2004h\x1b]0;neko@center: ~\x07\x1b[01;32mneko@center\x1b[00m:\x1b[01;34m~\x1b[00m' b'$'

提供 logfile 参数之后,pexpect 会将通讯字节流写入日志文件,很方便调试。代码中采用了 io.expect_exact() 方法而非 io.expect(),因为后者匹配的是正则表达式,前者是直接匹配字符串。当匹配完成之后,我们可以读取 io.before 获得匹配前的字符串,读取 io.after 获得匹配之后的字符串。

不过,我们发现,读出的字符串里面包含有一些特殊字符。这些是 ANSI escape code,可以用于实现文本颜色等功能,详情可见 Burke Libbey 的一篇文章。我们之后使用 htop 等指令时,服务器还会发来更多的控制字符。我们需要模拟出一个终端,吞进这些字节流,然后渲染出界面——每个位置是哪个字符、是什么颜色。selectel/pyte 库可以用于此项工作。

💡
我们自己电脑上使用的终端,例如 Windows Terminal,都是支持 VT-xxx 控制字符的。从而,若我们直接 print 出这些控制字符,屏幕上也能显示正确的渲染结果。这可以帮助判断截取的字节流是否合适。
💡
可以调用 io.interact() 使 pexpect 进入交互状态,这也是调试利器。行为类似于 pwntools 的 io.interactive()

0x02 基础工具:利用 pyte 解析 ANSI 字节流

pyte 的使用方法是:建立一个指定尺寸的虚拟终端;喂进字节流;读取虚拟终端的状态。基础代码:

import pyte

term = pyte.Screen(120, 80)
stream = pyte.Stream(term)

raw_bytes = b' df -h\r\n\x1b[?2004l\rFilesystem      Size  Used Avail Use% Mounted on\r\nudev            7.8G     0  7.8G   0% /dev\r\ntmpfs           1.6G  744K  1.6G   1% /run\r\n/dev/sda1       294G  1.5G  277G   1% /\r\ntmpfs           7.9G     0  7.9G   0% /dev/shm\r\ntmpfs           5.0M     0  5.0M   0% /run/lock\r\ntmpfs           1.6G     0  1.6G   0% /run/user/1000\r\n\x1b[?2004h\x1b]0;neko@center: ~\x07\x1b[01;32mneko@center\x1b[00m:\x1b[01;34m~\x1b[00m'
stream.feed(raw_bytes.decode())

print('\n'.join(term.display))

运行结果:

我们这里使用 term.display 直接读出字符串,丢失了颜色信息。想获取前景色、背景色、是否加粗等信息,可以使用 term.buffer

print(term.buffer[0][0])
# Char(data=' ', fg='default', bg='default', bold=False, italics=False, underscore=False, strikethrough=False, reverse=False)

我们可以从 term.buffer 生成带颜色标签的 html,代码如下:

def ansi_raw_to_html(s):
    screen = pyte.Screen(n_cols, n_rows)
    stream = pyte.Stream(screen)

    if type(s) == bytes:
        s = s.decode("utf8")
    stream.feed(s)

    buf = ""

    def get_classes(c):
        classes = []
        for key in [
            "fg",
            "bg",
            "bold",
            "italics",
            "underscore",
            "strikethrough",
            "reverse",
        ]:
            val = getattr(c, key)

            match val:
                case str():
                    if val != "default":
                        classes.append(f"{key}-{val}")
                case bool():
                    if val:
                        classes.append(key)

        return " ".join(classes)

    for x in range(n_rows):
        t = []

        for y in range(n_cols):
            a = screen.buffer[x][y]

            cs = get_classes(a)
            t.append((cs, html.escape(a.data)))

        buf += '<div class="term-line">'

        for cs, chars in itertools.groupby(t, key=lambda z: z[0]):
            buf += '<code class="{}">{}</code>'.format(
                cs, "".join(map(lambda z: z[1], chars))
            )

        buf += "</div>"

    empty_line = f'<div class="term-line"><code class="">{" " * n_cols}</code></div>'
    while buf.startswith(empty_line):
        buf = buf[len(empty_line) :]
    while buf.endswith(empty_line):
        buf = buf[: -len(empty_line)]

    buf = '<pre class="term-block">' + buf + "</pre>"

    r = jinja2.Template(
        open(os.path.join("share", "base.html"), encoding="utf8").read()
    ).render(body=buf)

    with open(os.path.join("tmp", "term.html"), "w", encoding="utf8") as f:
        f.write(r)
    return buf

模板文件 base.html 如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <style>
        body{
            font-family: 'Sarasa Fixed SC';
        }

        pre, code{
            font-family: 'Sarasa Fixed SC';
            font-size: 14px;
        }

        .term-block{
            background-color: #eeeeee;
            margin: 0 0 4px 0;
            padding: 2px;
            border: 1px solid #dddddd;
        }

        .fg-black{color: black;}
        .fg-red{color: rgba(255, 0, 0, 70%);}
        .fg-green{color: green;}
        .fg-brown{color: brown;}
        .fg-blue{color: blue;}
        .fg-magenta{color: magenta;}
        .fg-cyan{color: hsl(198, 99%, 37%);}
        .fg-white{color: gray;}
        .fg-default{color: black;}

        .bg-black{background-color: black;}
        .bg-red{background-color: red;}
        .bg-green{background-color: hsl(0.3turn 60% 45% / .7);}
        .bg-brown{background-color: brown;}
        .bg-blue{background-color: blue;}
        .bg-magenta{background-color: magenta;}
        .bg-cyan{background-color: hsl(198, 99%, 90%);}
        .bg-white{background-color: gray;}
        .bg-default{background-color: white;}

    </style>
</head>
<body>
{% block body %}
{{body}}
{% endblock %}
</body>
</html>

效果:

使用 wkhtmltopdf 生成 PDF 文件:

wkhtmltopdf --encoding utf8 --page-width 260mm --page-height 4500mm report.html out.pdf

至此,我们已经实现“通过 ssh 自动发送指令,并渲染出带颜色的 html”。ssh 巡检的自动化已经基本完成,接下来考虑浏览器的自动化。

0x03 基础工具:利用 selenium 实现浏览器自动化

本站四年前的文章已经使用过 selenium,今次继续用它实现浏览器巡检。写代码的过程没有多少好说,不过有几个坑点需要注意:

不宜使用 sleep(10) 等待页面加载。影响页面加载速度的因素非常多,例如电脑的性能、网速、有否静态文件缓存等。强行指定等待时间,很容易在页面未加载完成的情况下便进行下一步操作;若等待时间设置过长,则造成浪费。合理的方案是轮询某个条件,例如:

def wait_for_web(b):
    time.sleep(2)
    while "Loading" in b.page_source:
        time.sleep(1)
    time.sleep(2)

若页面上有 iframe,则 b.find_element 无法直接定位到 iframe 内部的 DOM。需要先切换到特定的 iframe:

iframe = b.find_element(
    By.CSS_SELECTOR,
    "iframe[src='/page/aloha/oe']",
)
b.switch_to.frame(iframe)

新版 selenium 比起当年的版本,引入了 Selenium Manager 工具,以管理各种 driver。这个工具对局域网环境极不友好,我们有必要禁用之,转而使用自己提供的 driver。写法如下:

os.environ["SE_OFFLINE"] = "true"

gecko_path = os.environ['GECKODRIVER_PATH']

arg = {}

if gecko_path:
    arg['executable_path'] = gecko_path
         
service = webdriver.FirefoxService(**arg)
self.browser = webdriver.Firefox(service=service)
💡
pexpect 仅支持 Linux,但笔者是在 Windows 下使用 selenium 的。要解决这个问题,大致有三种方案:
- 在 wsl 里面跑 ssh 巡检,在 Windows 跑浏览器巡检,需要代码做好隔离,不要产生 import 错误
- 使用带桌面的 Linux 系统
- 高版本的 wsl 2 可以提供图形界面,见微软的文档 (这是笔者最后采取的方案)

0x04 敏捷开发

笔者一边研究 pexpect 等工具的使用,一边编写了原型系统。项目组织上,将普遍可复用的代码(例如从 ANSI 字节流渲染 html)放进了 util 目录,然后每个系统各自实现一遍巡检业务逻辑。目录结构大致是:

.
├── 1-clean.cmd
├── 2-go-A.cmd
├── 3-go-B.cmd
├── 4-gen-report.cmd
├── autoins_A1-1.py
├── autoins_A1-2.py
├── autoins_A2-1.py
├── autoins_A2-2.py
├── autoins_B1-1.py
├── autoins_B1-2.py
├── autoins_B2-1.py
├── autoins_B2-2.py
├── clean.py
├── gen_A_report.py
├── gen_B_report.py
├── template
│   └── base.html
├── util
│   ├── db.py
│   └── term.py
├── wkhtmltopdf.exe
└── worklog.db

然而,我们存在这样的情况:两台服务器上部署了相同的应用,它们 IP 不一致,密码不一致,而且由于版本不同,可能流程上有细微差别。在原型系统中,笔者先开发了 X 系统的巡检逻辑,然后将其复制出来,修改出 Y 系统的巡检代码。最终,两份代码的大部分都是相同的。这样的做法违反了设计模式(Don't repeat yourself),很不适合维护。我们需要重新设计自动巡检系统。

0x05 新的设计

让我们回想一次 ssh 巡检流程:

  1. 登录 ssh(在不同的业务系统中,IP、端口、密码等参数不同)
  2. 执行 df -h
  3. 执行 htop
  4. 执行其他指令(每个业务系统有各自的特殊指令要执行)

而一次典型的浏览器巡检流程如下:

  1. 登录(各个系统登录逻辑不同),截图
  2. 进入 A 页面,截图
  3. 进入 B 页面,执行某操作,截图

我们可以发现,巡检可以拆分成不同的步骤(Step),例如“执行 df -h”就是一个步骤。步骤的序列构成了巡检过程,我们只需要拼接各种步骤,形成步骤链(WorkChain),就可以完全描述一场巡检。而步骤的可复用性很高,以“执行 df -h”这个步骤为例,我们只需要向 ssh 进程发送一句 df -h,然后等待出现 $ 符号或 # 符号(这代表着 df 已经执行完毕,shell 发回了形如 neko@lab:~$ 的字符串,等待下一条指令)。我们只需要实现一次 df -h 步骤,就能在所有的 ssh 巡检中使用之。更进一步地,我们可以实现通用的“执行终端指令”步骤,从这个通用步骤派生出 df -huname -a 等各种详细步骤。

于是,我们应该会有这样的代码:

class StepCommand(Step):
    def __init__(self, cmd, shell_prompt):
        # ...

    def exec(self):
        # ...

class StepLogin(Step):
    # ...

class StepDf(StepCommand):
    def __init__(self, shell_prompt):
        super().__init__('df -h', shell_prompt)

class StepUname(StepCommand):
    def __init__(self, shell_prompt):
        super().__init__('uname -a', shell_prompt)


c = WorkChain("测试链")
c.step(StepLogin('$'))
c.step(StepDf('$'))
c.step(StepUname('$'))
c.work()

这份代码有两个问题。首先,StepCommandexec 方法是需要有 pexpect 对象的,而这个对象由 StepLogin 产生。我们不宜将 pexpect 对象放进全局变量,所以应当在 WorkChain 对象中维护一个 context,它是一个 map,专门用于存放与此 WorkChain 相关的各种对象。显然,shell_prompt 也可以放进 context,于是我们就只需在 StepLogin 步骤提供 shell_prompt ,而无需在每次实例化 StepDf 等步骤时都提供一遍。

另一个问题在于,有些步骤打开了资源,我们需要在合适的时机予以关闭。例如,StartBrowser 步骤会打开浏览器,按照目前的设计,我们需要再实现一个 CloseBrowser 步骤,在巡检的最后时刻执行,以关掉浏览器。我们分别实现开启、关闭两个步骤是不雅观的,如果我们手上的资源很多(例如在某次巡检中同时打开 3 个浏览器和 2 个 ssh),会造成不便。因此,我们提出 Resource 类,它在巡检之初执行 init() 方法,在巡检结束时执行 close() 方法。

最终,我们一套巡检逻辑的代码如下:

c = WorkChain("ssh测试链")
c.resource(
    "ssh",
    RemoteTerminal(["user@host"], os.environ["PWD_TEST"],shell_prompt="$")
)
# 此后可以通过 c.res['ssh'] 访问到 RemoteTerminal 对象

c.step(StepCommand("cat /etc/issue"))
c.step(StepCommand("ps aux"))
c.step(StepCommand("screenfetch"))
c.step(StepHtop())

c.work()
💡
调试时,可以使用 IPython.embed() 进入 IPython 交互环境,这对于 selenium 非常有用。