0x00 写在开头

本站的上一篇文章介绍了 MCP 协议的一些基本概念,如通讯模型、Resource Tool 等。然而,有些细节问题尚未厘清:

  • 我们不知道握手过程究竟是怎样进行的。inspector 界面中看不到握手包。
  • stdio 通讯很简单,双方互发 json rpc 报文即可;但 sse 与 stdio 的区别很大,sse 并非双工协议,客户端不能在 sse 信道上给服务器回传数据。我们需要细看代码,观察 sse 模式下客户端如何与服务端交互。
  • 在该文中,我们使用的是 Python sdk,“工具描述”是写在注释中的,sdk 借助 Pydantic 库将其取出;langchain 获取工具描述的方法与之类似。在其他语言中,我们大概需要自行提供工具描述,因此有必要看一眼相关逻辑。

本文中,我们将阅读 Kotlin sdk。选择 Kotlin 是因为个人原因:笔者参加工作之后,有很多代码要阅读,它们出自不同开发者之手,语言各异,以 Java 为多。笔者自己常写的语言是 C、Python、Golang,均非日常工作中频繁接触的语言。读本科时,笔者在《软件构造》课程中学习过一点 Java,现在已经忘了大半,故现在借阅读 MCP sdk 的机会,一边复习 Java,一边学习 Kotlin,可谓一举三得。

MCP Kotlin sdk 最早是由 Jetbrains 开发的,发布于 2024 年 12 月,当时的仓库在 JetBrains/mcp-kotlin-sdk,它立即成为了 MCP 的官方 Kotlin SDK,后续的开发都在 modelcontextprotocol/kotlin-sdk 仓库进行。本文阅读的版本是三天前发布的 v0.4.0。

sdk 的代码量比较大,在阅读它本身之前,我们不妨先看看示例代码。开发者提供了三个示例:

  • samples/kotlin-mcp-client,一个典型的 LLM 聊天软件,通过 stdio 与 MCP server 通讯
  • samples/kotlin-mcp-server,一个全功能服务端,提供了 prompt、resource、tool 服务,支持 stdio 和 sse 通讯
  • samples/weather-stdio-server,一个 stdio 服务端,提供天气查询 tool

接下来,我们从 client demo 切入。

💡
笔者会一点 Java,但此前完全不会 Kotlin。阅读代码的过程中,如果看到 Kotlin 特性,会以 callout 块(UI 类似于本段话)注明。

0x01 客户端示例

客户端示例的文件结构如下:

.
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradle.properties
├── gradlew
├── gradlew.bat
├── README.md
├── settings.gradle.kts
└── src
    └── main
        └── kotlin
            └── io
                └── modelcontextprotocol
                    └── sample
                        └── client
                            ├── main.kt
                            └── MCPClient.kt

此项目使用 gradle 构建,它比 maven 更现代,是 Kotlin 官方推荐的构建方式。各种构建参数都在 build.gradle.kts 内指定。

💡
gradle 配置文件中,通过 mainClass.set("io.modelcontextprotocol.sample.client.MainKt") 指定了程序入口点。

直奔 main.kt,它的代码如下:

fun main(args: Array<String>) = runBlocking {
    if (args.isEmpty()) throw IllegalArgumentException("Usage: java -jar <your_path>/build/libs/kotlin-mcp-client-0.1.0-all.jar <path_to_server_script>")
    val serverPath = args.first()
    val client = MCPClient()
    client.use {
        client.connectToServer(serverPath)
        client.chatLoop()
    }
}
💡
这份代码使用了 runBlocking,即启动一个协程,阻塞当前线程直到协程结束。应用程序的 main 函数中经常使用这个方法来运行 suspend 函数。
💡
Kotlin 有“trailing lambda”语法糖,如果函数的最后一个参数是 lambda 函数,则可以把 lambda 表达式放到参数列表之外,例如 f(a, b, {x -> println(x)}) 可以简写成 f(a, b) {x -> println(x)}。上面的代码中有两处 trailing lambda:一处是 runBlocking,一处是 client.use
💡
这段代码还使用了“单表达式函数”语法糖。如果函数体只有一条表达式,则可以省略花括号,改用等号。
例:fun sum(a: Int, b: Int) = a + b

代码中,.use 的作用是自动关闭资源,有点类似于 Python 中的 with。我们的 MCPClient 类实现了 AutoCloseable,因此支持 use

class MCPClient : AutoCloseable {
    // ...
    
    override fun close() {
        runBlocking {
            mcp.close()
            anthropic.close()
        }
    }
}

我们继续跟进 client.connectToServer(serverPath)

    suspend fun connectToServer(serverScriptPath: String) {
        try {
            // Build the command based on the file extension of the server script
            val command = buildList {
                when (serverScriptPath.substringAfterLast(".")) {
                    "js" -> add("node")
                    "py" -> add(if (System.getProperty("os.name").lowercase().contains("win")) "python" else "python3")
                    "jar" -> addAll(listOf("java", "-jar"))
                    else -> throw IllegalArgumentException("Server script must be a .js, .py or .jar file")
                }
                add(serverScriptPath)
            }

            // Start the server process
            val process = ProcessBuilder(command).start()

            // Setup I/O transport using the process streams
            val transport = StdioClientTransport(
                input = process.inputStream.asSource().buffered(),
                output = process.outputStream.asSink().buffered()
            )

            // Connect the MCP client to the server using the transport
            mcp.connect(transport)

            // Request the list of available tools from the server
            val toolsResult = mcp.listTools()
            tools = toolsResult?.tools?.map { tool ->
                ToolUnion.ofTool(
                    Tool.builder()
                        .name(tool.name)
                        .description(tool.description ?: "")
                        .inputSchema(
                            Tool.InputSchema.builder()
                                .type(JsonValue.from(tool.inputSchema.type))
                                .properties(tool.inputSchema.properties.toJsonValue())
                                .putAdditionalProperty("required", JsonValue.from(tool.inputSchema.required))
                                .build()
                        )
                        .build()
                )
            } ?: emptyList()
            println("Connected to server with tools: ${tools.joinToString(", ") { it.tool().get().name() }}")
        } catch (e: Exception) {
            println("Failed to connect to MCP server: $e")
            throw e
        }
    }

上述代码的作用是拼凑出一行指令(例如 python server.py),启动进程,设置 stdio transport,然后用 MCP 客户端连接之,获取 tool 列表,并保存到 MCPClient.tools 中。

💡
代码中用到了 buildList 列表构建器,比传统的 mutableListOf().apply { ... }.toList() 更简洁。在传入 buildList 的 lambda 函数中,可以使用 add()addAll() 来给 list 添加元素。

从代码中,我们可以看到,transport 与 MCP client 是解耦的,mcp.connect 的参数是 transport: Transport,不关心具体是 StdioClientTransport 还是 SseClientTransport

💡
我们注意到,代码中调用了 process.inputStream.asSource(),但 inputStream 是 java 原生类,它本来并没有 asSource() 这个方法。
这是 Kotlin 特性“扩展函数”import kotlinx.io.asSource 之后,InputStream 被扩展出了 asSource 方法。
扩展函数的例子:fun String.isValidEmail(): Boolean {...} ,然后就可以对 String 对象调用 isValidEmail()
💡
上述代码片段的最后,出现了一些问号,它是“空安全操作符”。例如,我们使用 toolsResult?.tools 时,仅当 toolsResult 不为空的情况下,才会去访问 tools,否则表达式直接返回 null。
?: 是 Elvis 操作符(空合并操作符),表达式 a ?: ba 非空的情况下返回 a,否则返回 b。 

我们跟进一下 mcp.listTools() 方法:

    public suspend fun listTools(
        request: ListToolsRequest = ListToolsRequest(),
        options: RequestOptions? = null,
    ): ListToolsResult? {
        return request<ListToolsResult>(request, options)
    }
💡
上面的代码中,类型标注 RequestOptions?末尾的 ? 表示该对象可为 null。Kotlin 要求显式地处理 null,这与 Guava 的实践类似(见本站早年文章《软件构造:Guava 的使用》)。

Kotlin 支持默认参数,这里 request 的默认参数是 ListToolsRequest(),它会在函数调用时(而不是定义时)求值。跟进:

/**
 * Sent from the client to request a list of tools the server has.
 */
@Serializable
public data class ListToolsRequest(
    override val cursor: Cursor? = null,
    override val _meta: JsonObject = EmptyJsonObject
) : ClientRequest, PaginatedRequest {
    override val method: Method = Method.Defined.ToolsList
}
💡
@Serializable 注解表示这个类可以被序列化,支持 JSON、CBOR、ProtoBuf 等格式。
💡
data class 是数据类,会自动生成 hashCode() 等方法,且 == 逻辑会变成判断各个字段是否相等,而不是判断是否引用相同对象。

ListToolsRequest 实现了 ClientRequestPaginatedRequest 两个接口,并重写 method 属性为 Method.Defined.ToolsList。它的定义如下:

@Serializable(with = RequestMethodSerializer::class)
public sealed interface Method {
    public val value: String

    /**
     * Enum of predefined methods supported by the protocol.
     */
    @Serializable
    public enum class Defined(override val value: String) : Method {
        Initialize("initialize"),
        Ping("ping"),
        ResourcesList("resources/list"),
        ResourcesTemplatesList("resources/templates/list"),
        ResourcesRead("resources/read"),
        ResourcesSubscribe("resources/subscribe"),
        ResourcesUnsubscribe("resources/unsubscribe"),
        PromptsList("prompts/list"),
        PromptsGet("prompts/get"),
        NotificationsCancelled("notifications/cancelled"),
        NotificationsInitialized("notifications/initialized"),
        NotificationsProgress("notifications/progress"),
        NotificationsMessage("notifications/message"),
        NotificationsResourcesUpdated("notifications/resources/updated"),
        NotificationsResourcesListChanged("notifications/resources/list_changed"),
        NotificationsToolsListChanged("notifications/tools/list_changed"),
        NotificationsRootsListChanged("notifications/roots/list_changed"),
        NotificationsPromptsListChanged("notifications/prompts/list_changed"),
        ToolsList("tools/list"),
        ToolsCall("tools/call"),
        LoggingSetLevel("logging/setLevel"),
        SamplingCreateMessage("sampling/createMessage"),
        CompletionComplete("completion/complete"),
        RootsList("roots/list")
    }

    /**
     * Represents a custom method defined by the user.
     */
    @Serializable
    public data class Custom(override val value: String) : Method
}
💡
这个类是一个“密封接口”,它所有可能的实现必须在编译期就能确定。上述代码中提供了 DefinedCustom 两种实现。
💡
代码中这些 enum 的类型是 Defined,它们以各自的字符串调用构造函数。

于是,透过 Method.Defined.ToolsList,我们可以直接获取一个 value"tools/list"Method 对象。所以, ListToolsRequest() 获取的是一条即将发送的信息 {"method": "tools/list"}。而它的发送过程是:


    public suspend fun <T : RequestResult> request(
        request: Request,
        options: RequestOptions? = null,
    ): T {
        LOGGER.trace { "Sending request: ${request.method}" }
        val result = CompletableDeferred<T>()
        val transport = this@Protocol.transport ?: throw Error("Not connected")

        // 严格模式下,在调用 method 之前先检查是否明确支持这个 method
        if (this@Protocol.options?.enforceStrictCapabilities == true) {
            assertCapabilityForMethod(request.method)
        }

        val message = request.toJSON()
        val messageId = message.id

        // 如果提供了 onProgress 回调,则将其注册到 progressHandlers[messageId]
        if (options?.onProgress != null) {
            LOGGER.trace { "Registering progress handler for request id: $messageId" }
            progressHandlers[messageId] = options.onProgress
        }

        // 注册 response handler
        responseHandlers[messageId] = set@{ response, error ->
            // 如果请求有 error,则调用 result.completeExceptionally
            if (error != null) {
                result.completeExceptionally(error)
                return@set
            }

            if (response?.error != null) {
                result.completeExceptionally(IllegalStateException(response.error.toString()))
                return@set
            }

            // 请求正常返回
            try {
                // 完成 result
                @Suppress("UNCHECKED_CAST")
                result.complete(response!!.result as T)
            } catch (error: Throwable) {
                result.completeExceptionally(error)
            }
        }

        // 定义一个 cancel 闭包
        val cancel: suspend (Throwable) -> Unit = { reason: Throwable ->
            // 解除注册 handler
            responseHandlers.remove(messageId)
            progressHandlers.remove(messageId)

            // 向对端发送 cancel 通知
            val notification = CancelledNotification(requestId = messageId, reason = reason.message ?: "Unknown")

            val serialized = JSONRPCNotification(
                notification.method.value,
                params = McpJson.encodeToJsonElement(notification)
            )
            transport.send(serialized)

            result.completeExceptionally(reason)
            Unit   // 这个 Unit 是为了匹配返回类型的,因为 lambda 表达式的最后一个语句会被视为返回值
        }

        val timeout = options?.timeout ?: DEFAULT_REQUEST_TIMEOUT
        try {
            // 尝试发送消息。注意这个 withTimeout 只对 send 限时,不管 result.await()
            withTimeout(timeout) {
                LOGGER.trace { "Sending request message with id: $messageId" }
                this@Protocol.transport?.send(message)
            }
            return result.await()
        } catch (cause: TimeoutCancellationException) {
            // 若超时,则取消
            LOGGER.error { "Request timed out after ${timeout.inWholeMilliseconds}ms: ${request.method}" }
            cancel(
                McpError(
                    ErrorCode.Defined.RequestTimeout.code,
                    "Request timed out",
                    JsonObject(mutableMapOf("timeout" to JsonPrimitive(timeout.inWholeMilliseconds)))
                ),
            )
            result.cancel(cause)
            throw cause
        }
    }
💡
代码开头有一句 val result = CompletableDeferred<T>(),这个对象是用于协程之间同步数据的。用法是:一个协程可以通过 result.await() 等待它“完成”,而调用 result.complete(data)result.completeExceptionally(e) 可以让它“完成”。
💡
this@Protocol.transport 的意思是“本对象的外部类 Protocol 实例中的 transport 属性”,与当前函数作用域下的 transport 相区分。
💡
set@{ response, error -> 是给这个 lambda 函数起名为 set,以便在其中使用 return@set 返回。
💡
response!!.result as T 中, !! 是非空断言操作符,如果 response 为空则抛出 KotlinNullPointerException 异常。 as T 是类型转换,如果不兼容,则抛出 ClassCastException 异常。

上述代码中以 message id 作为 message 的主键,但 message id 是在哪里定义的呢?我们来看:

internal fun Request.toJSON(): JSONRPCRequest {
    return JSONRPCRequest(
        method = method.value,
        params = McpJson.encodeToJsonElement(this),
        jsonrpc = JSONRPC_VERSION,
    )
}

private val REQUEST_MESSAGE_ID: AtomicLong = atomic(0L)

@Serializable
public data class JSONRPCRequest(
    val id: RequestId = RequestId.NumberId(REQUEST_MESSAGE_ID.incrementAndGet()),
    val method: String,
    val params: JsonElement = EmptyJsonObject,
    val jsonrpc: String = JSONRPC_VERSION,
) : JSONRPCMessage

所以,当调用 val message = request.toJSON() 时,会自动创建一个 JSONRPCRequest,其 id 是自增的数字。以上,我们终于分析完了 mcp.listTools() 背后的数据发送逻辑。

然而,我们只看到了注册 response handler,没有看到这个 handler 是在哪里被调用的。寻找调用,我们发现:

// Client 是 Protocol 的子类
public abstract class Protocol(
    @PublishedApi internal val options: ProtocolOptions?,
) {
    // 在 transport 初始化之后,连接到 transport
    public open suspend fun connect(transport: Transport) {
        this.transport = transport
        transport.onClose {
            doClose()
        }

        transport.onError {
            onError(it)
        }

        // onMessage 回调,按消息类型路由到各个 handler
        transport.onMessage { message ->
            when (message) {
                is JSONRPCResponse -> onResponse(message, null)
                is JSONRPCRequest -> onRequest(message)
                is JSONRPCNotification -> onNotification(message)
                is JSONRPCError -> onResponse(null, message)
            }
        }

        return transport.start()
    }
    
    // 这是 JSONRPCResponse 类消息的 handler
    private fun onResponse(response: JSONRPCResponse?, error: JSONRPCError?) {
        val messageId = response?.id
        
        // 通过 id 找到对应 handler
        val handler = responseHandlers[messageId]
        if (handler == null) {
            onError(Error("Received a response for an unknown message ID: ${McpJson.encodeToString(response)}"))
            return
        }

        // 调用并删除 handler
        responseHandlers.remove(messageId)
        progressHandlers.remove(messageId)
        if (response != null) {
            handler(response, null)
        } else {
            check(error != null)
            val error = McpError(
                error.code.code,
                error.message,
                error.data,
            )

            handler(null, error)
        }
    }
    
    // ...
}

因此,在客户端调用 connect() 时,一个消息路由器被注册到了 transport 中。我们来看这个注册函数:

@Suppress("PropertyName")
public abstract class AbstractTransport : Transport {
    protected var _onClose: (() -> Unit) = {}
        private set
    protected var _onError: ((Throwable) -> Unit) = {}
        private set

    // to not skip messages
    private val _onMessageInitialized = CompletableDeferred<Unit>()
    protected var _onMessage: (suspend ((JSONRPCMessage) -> Unit)) = {
        _onMessageInitialized.await()
        _onMessage.invoke(it)
    }
        private set

    // 把新来的 onClose 函数添加到 _onClose 清单
    override fun onClose(block: () -> Unit) {
        val old = _onClose
        _onClose = {
            old()
            block()
        }
    }

    // 添加新的 onError
    override fun onError(block: (Throwable) -> Unit) {
        val old = _onError
        _onError = { e ->
            old(e)
            block(e)
        }
    }

    // 注册 onMessage 回调
    override fun onMessage(block: suspend (JSONRPCMessage) -> Unit) {
        val old: suspend (JSONRPCMessage) -> Unit = when (_onMessageInitialized.isCompleted) {
            true -> _onMessage
            false -> { _ -> }
        }

        _onMessage = { message ->
            old(message)
            block(message)
        }

        _onMessageInitialized.complete(Unit)
    }
}
💡
private set 是 Kotlin 特性,它把属性的 setter 变成 private 的,即只能在当前类内部修改该属性。然而,属性的 getter 仍然是 protected 的,因此该属性可以被子类读取。

上面的代码非常有意思。我们重点关注这几行:

    private val _onMessageInitialized = CompletableDeferred<Unit>()
    protected var _onMessage: (suspend ((JSONRPCMessage) -> Unit)) = {
        _onMessageInitialized.await()
        _onMessage.invoke(it)
    }
        private set

这里默认的 _onMessage 是先等待 _onMessageInitialized 完成,然后调用 _onMessage.invoke(it),其中 it 是一个 JSONRPCMessage。当 _onMessageInitialized 完成之后, _onMessage 会变成一个正式的处理函数,故此时再调用 _onMessage 不会递归到自己(默认处理函数)身上。这份代码以相当巧妙的方式,保证了不丢消息。

另一边,函数 onMessage 负责注册正式的处理函数:

    override fun onMessage(block: suspend (JSONRPCMessage) -> Unit) {
        val old: suspend (JSONRPCMessage) -> Unit = when (_onMessageInitialized.isCompleted) {
            true -> _onMessage
            false -> { _ -> }    // 空函数
        }

        _onMessage = { message ->
            old(message)
            block(message)
        }

        _onMessageInitialized.complete(Unit)
    }

顺便说一句, _onClose_onError_onMessage 都是层层嵌套的闭包,先执行下层的 handler (即 old() ),再执行本层的 handler (即 block() )。这个写法确实很有函数式特色,若换作是 Python,可能多数开发者都会把 handler 放进一个 list,比如 Flask 的 app.before_request_funcs

现在,我们那个消息路由器已经被存到了 transport_onMessage 中。现在来看 transport.start()

public class StdioClientTransport(
    private val input: Source,
    private val output: Sink
) : AbstractTransport() {
    // ...

    override suspend fun start() {
        if (!initialized.compareAndSet(false, true)) {
            error("StdioClientTransport already started!")
        }

        logger.debug { "Starting StdioClientTransport..." }

        val outputStream = output.buffered()

        // 在协程作用域中启动新的协程
        job = scope.launch(CoroutineName("StdioClientTransport.IO#${hashCode()}")) {
            // 启动 read 协程
            val readJob = launch {
                logger.debug { "Read coroutine started." }
                try {
                    input.use {
                        while (isActive) {
                            // 轮询读取
                            val buffer = Buffer()
                            val bytesRead = input.readAtMostTo(buffer, 8192)
                            if (bytesRead == -1L) break
                            if (bytesRead > 0L) {
                                readBuffer.append(buffer.readByteArray())
                                
                                // 处理消息
                                processReadBuffer()
                            }
                        }
                    }
                } catch (e: Exception) {
                    _onError.invoke(e)
                    logger.error(e) { "Error reading from input stream" }
                }
            }

            // 启动 write 协程
            val writeJob = launch {
                logger.debug { "Write coroutine started." }
                try {
                    // 把 sendChannel 传来的每条消息序列化,并写入信道
                    sendChannel.consumeEach { message ->
                        val json = serializeMessage(message)
                        outputStream.writeString(json)
                        outputStream.flush()
                    }
                } catch (e: Throwable) {
                    if (isActive) {
                        _onError.invoke(e)
                        logger.error(e) { "Error writing to output stream" }
                    }
                } finally {
                    output.close()
                }
            }

            // 等待 read 工作结束,然后关闭 write 工作
            readJob.join()
            writeJob.cancelAndJoin()
            _onClose.invoke()
        }
    }
}

这里的 writeJob 很像 golang 的普遍做法:持续读取 channel,把读到的数据送到该去的地方。 readJob 就是轮询从 stdio 读取,把读到的内容加入 buffer,并调用 processReadBuffer() 。跟进:

    private suspend fun processReadBuffer() {
        while (true) {
            // readBuffer.readMessage() 是尝试读取第一行 JSON RPC 消息
            val msg = readBuffer.readMessage() ?: break
            try {
                // _onMessage 现在是那个消息处理路由器
                _onMessage.invoke(msg)
            } catch (e: Throwable) {
                _onError.invoke(e)
                logger.error(e) { "Error processing message." }
            }
        }
    }

于是,我们可以确定, readJob 源源不断地读取消息,送进 buffer;每当 buffer 里面有了完整的 JSON RPC 消息,就会立即调用 _onMessage 处理。

继续看 readBuffer.readMessage() 的实现:

    public fun readMessage(): JSONRPCMessage? {
        if (buffer.exhausted()) return null
        var lfIndex = buffer.indexOf('\n'.code.toByte())
        val line = when (lfIndex) {
            -1L -> return null
            0L -> {
                buffer.skip(1)
                ""
            }

            else -> {
                var skipBytes = 1
                if (buffer[lfIndex - 1] == '\r'.code.toByte()) {
                    lfIndex -= 1
                    skipBytes += 1
                }
                val string = buffer.readString(lfIndex)
                buffer.skip(skipBytes.toLong())
                string
            }
        }
        return deserializeMessage(line)
    }

上述代码是在 buffer 中读取一行,并反序列化成 JSONRPCMessage。由于 JSON 中使用 \n 表示换行,故上述代码逐行读取是可行的。至此,我们理清了 stdio 消息读写的逻辑。

本文开头提了一个小问题:“握手过程究竟是怎样进行的”。这个答案在 Client 类中:

public open class Client(
    private val clientInfo: Implementation,
    options: ClientOptions = ClientOptions(),
) : Protocol(options) {
    // ...

    override suspend fun connect(transport: Transport) {
        // 调用 Protocol 的 connect 函数,即我们上文分析过的注册了消息路由器的那个 connect()
        super.connect(transport)

        try {
            // 发送 InitializeRequest
            val message = InitializeRequest(
                protocolVersion = LATEST_PROTOCOL_VERSION,
                capabilities = capabilities,
                clientInfo = clientInfo
            )
            val result = request<InitializeResult>(message)

            // 观察版本是否兼容
            if (!SUPPORTED_PROTOCOL_VERSIONS.contains(result.protocolVersion)) {
                throw IllegalStateException(
                    "Server's protocol version is not supported: ${result.protocolVersion}"
                )
            }

            // 获取服务端的 capabilities 和 info
            serverCapabilities = result.capabilities
            serverVersion = result.serverInfo  // 这个命名似有问题,serverInfo 不止有 version,还有 name

            // 发出 "notifications/initialized" 通知
            notification(InitializedNotification())
        } catch (error: Throwable) {
            close()
            throw error
        }
    }

所以,事实上握手过程也是走的 JSON RPC 协议,由一个 request、一个 response、一个 notification 组成。知道了这一点之后,我们马上可以测试一下手头的 MCP server:

{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo":{"name":"mcp-test", "version": "0.1"}}, "id": 1}

服务端果然返回了 response:

现在,我们已经看完了 client.connectToServer(serverPath),接着继续看 client.chatLoop()

    // Main chat loop for interacting with the user
    suspend fun chatLoop() {
        println("\nMCP Client Started!")
        println("Type your queries or 'quit' to exit.")

        while (true) {
            print("\nQuery: ")
            val message = readLine() ?: break
            if (message.lowercase() == "quit") break
            val response = processQuery(message)
            println("\n$response")
        }
    }

跟进 processQuery

    // Process a user query and return a string response
    suspend fun processQuery(query: String): String {
        // Create an initial message with a user's query
        val messages = mutableListOf(
            MessageParam.builder()    // 这个 builder 是通过 companion object 实现的单例
                .role(MessageParam.Role.USER)
                .content(query)
                .build()
        )

        // Send the query to the Anthropic model and get the response
        val response = anthropic.messages().create(
            messageParamsBuilder
                .messages(messages)
                .tools(tools)    // 把 tools 的定义也发送给 LLM provider
                .build()
        )

        val finalText = mutableListOf<String>()
        response.content().forEach { content ->
            when {
                // Append text outputs from the response
                content.isText() -> finalText.add(content.text().getOrNull()?.text() ?: "")

                // If the response indicates a tool use, process it further
                content.isToolUse() -> {
                    // LLM 返回了一个 tool use
                    val toolName = content.toolUse().get().name()
                    val toolArgs =
                        content.toolUse().get()._input().convert(object : TypeReference<Map<String, JsonValue>>() {})

                    // 执行 tool
                    val result = mcp.callTool(
                        name = toolName,
                        arguments = toolArgs ?: emptyMap()
                    )
                    finalText.add("[Calling tool $toolName with args $toolArgs]")

                    // 把 tool use 结果加入 message
                    messages.add(
                        MessageParam.builder()
                            .role(MessageParam.Role.USER)
                            .content(
                                """
                                        "type": "tool_result",
                                        "tool_name": $toolName,
                                        "result": ${result?.content?.joinToString("\n") { (it as TextContent).text ?: "" }}
                                    """.trimIndent()
                            )
                            .build()
                    )

                    // 再次调用 LLM,注意这里没有提供 tool,所以 LLM 只能返回文本
                    val aiResponse = anthropic.messages().create(
                        messageParamsBuilder
                            .messages(messages)
                            .build()
                    )

                    finalText.add(aiResponse.content().first().text().getOrNull()?.text() ?: "")
                }
            }
        }

        return finalText.joinToString("\n", prefix = "", postfix = "")
    }

可以看到,在这一轮对话中,至多只会执行 LLM 的一个 tool use 请求。另外,这份代码是依赖于 tool use 特性的,不具备函数调用功能的 LLM 无法使用。

0x02 天气服务器示例

程序入口点是 main.kt

package io.modelcontextprotocol.sample.server

fun main() = `run mcp server`()
💡
这个反引号是 Kotlin 特性,允许函数名包含空格等特殊字符。另外, main 是单表达式函数。

跟进 run mcp server 函数:

fun `run mcp server`() {
    val baseUrl = "https://api.weather.gov"

    val httpClient = HttpClient {
        // 连接 api.weather.gov 的 http client,略
    }

    // 创建 MCP server
    val server = Server(
        Implementation(
            name = "weather",
            version = "1.0.0"
        ),
        ServerOptions(
            // 支持 listChanged 功能
            capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(listChanged = true))
        )
    )

    // 添加工具 get_alerts
    server.addTool(
        name = "get_alerts",
        description = """
            Get weather alerts for a US state. Input is Two-letter US state code (e.g. CA, NY)
        """.trimIndent(),
        
        inputSchema = Tool.Input(
            // 构建 json {"state": {"type":"string", "description": "..."}},看起来有些繁琐
            properties = JsonObject(
                mapOf(
                    "state" to JsonObject(
                        mapOf(
                            "type" to JsonPrimitive("string"),
                            "description" to JsonPrimitive("Two-letter US state code (e.g. CA, NY)")
                        )
                    ),
                )
            ),
            required = listOf("state")
        )
    ) { request ->
        val state = request.arguments["state"]?.jsonPrimitive?.content
        if (state == null) {
            // return@addTool 是从 lambda 表达式返回
            return@addTool CallToolResult(
                content = listOf(TextContent("The 'state' parameter is required."))
            )
        }

        val alerts = httpClient.getAlerts(state)

        // 返回值
        CallToolResult(content = alerts.map { TextContent(it) })
    }

    // 添加工具 get_forecast
    server.addTool(
        // ...
    ) { request ->
        // ...
        val forecast = httpClient.getForecast(latitude, longitude)
        CallToolResult(content = forecast.map { TextContent(it) })
    }

    // 创建 transport
    val transport = StdioServerTransport(
        System.`in`.asInput(),
        System.out.asSink().buffered()
    )

    runBlocking {
        server.connect(transport)
        val done = Job()
        
        server.onClose {
            done.complete()
        }
        
        // 等待 server.onClose 把 done 完成
        done.join()
    }
}

可见,服务端对象的构建流程也与客户端类似:transport 与 MCP server 解耦,两者独立初始化,然后把 MCP server 连接到 transport 上。

0x03 sse server(普通模式)

现在来看 kotlin-mcp-server 这个全功能服务端。入口点:

fun main(args: Array<String>) {
    val command = args.firstOrNull() ?: "--sse-server-ktor"
    val port = args.getOrNull(1)?.toIntOrNull() ?: 3001
    when (command) {
        "--stdio" -> runMcpServerUsingStdio()
        "--sse-server-ktor" -> runSseMcpServerUsingKtorPlugin(port)
        "--sse-server" -> runSseMcpServerWithPlainConfiguration(port)
        else -> {
            System.err.println("Unknown command: $command")
        }
    }
}

sdk 提供了两种 sse server 实现。我们先不管 sse,去看一眼熟悉的 stdio 通讯:

fun runMcpServerUsingStdio() {
    val server = configureServer()
    val transport = StdioServerTransport(
        inputStream = System.`in`.asSource().buffered(),
        outputStream = System.out.asSink().buffered()
    )

    runBlocking {
        server.connect(transport)
        val done = Job()
        server.onClose {
            done.complete()
        }
        done.join()
        println("Server closed")
    }
}

由于 in 是 Kotlin 关键字,故上面的代码使用反引号将其包裹。负责初始化 MCP server 的 configureServer() 实现如下:

fun configureServer(): Server {
    // 基本信息,在握手时提供
    val server = Server(
        Implementation(
            name = "mcp-kotlin test server",
            version = "0.1.0"
        ),
        ServerOptions(
            capabilities = ServerCapabilities(
                prompts = ServerCapabilities.Prompts(listChanged = true),
                resources = ServerCapabilities.Resources(subscribe = true, listChanged = true),
                tools = ServerCapabilities.Tools(listChanged = true),
            )
        )
    )

    // prompt 功能
    server.addPrompt(
        name = "Kotlin Developer",
        description = "Develop small kotlin applications",
        arguments = listOf(
            PromptArgument(
                name = "Project Name",
                description = "Project name for the new project",
                required = true
            )
        )
    ) { request ->
        GetPromptResult(
            "Description for ${request.name}",
            messages = listOf(
                PromptMessage(
                    role = Role.user,
                    content = TextContent("Develop a kotlin project named <name>${request.arguments?.get("Project Name")}</name>")
                )
            )
        )
    }

    // tool 功能
    server.addTool(
        name = "kotlin-sdk-tool",
        description = "A test tool",
        inputSchema = Tool.Input()
    ) { request ->
        CallToolResult(
            content = listOf(TextContent("Hello, world!"))
        )
    }

    // resource 功能
    server.addResource(
        uri = "https://search.com/",
        name = "Web Search",
        description = "Web search engine",
        mimeType = "text/html"
    ) { request ->
        ReadResourceResult(
            contents = listOf(
                TextResourceContents("Placeholder content for ${request.uri}", request.uri, "text/html")
            )
        )
    }

    return server
}

上面的 server 实现了 tool、resource、prompt 三大功能,写法与天气服务器类似。我们接下来关注基础的 sse server 实现:

fun runSseMcpServerWithPlainConfiguration(port: Int): Unit = runBlocking {
    val servers = ConcurrentMap<String, Server>()
    println("Starting sse server on port $port. ")
    println("Use inspector to connect to the http://localhost:$port/sse")

    // embeddedServer 是 ktor 提供的 http 服务器
    embeddedServer(CIO, host = "0.0.0.0", port = port) {
        install(SSE)  // ktor 的 sse 插件
        routing {
            sse("/sse") {
                val transport = SseServerTransport("/message", this)
                
                // 创建新的 mcp server 实例,关联到这个 session id
                val server = configureServer()
                servers[transport.sessionId] = server

                // 在 server close 时,清理 servers 表
                server.onClose {
                    println("Server closed")
                    servers.remove(transport.sessionId)
                }

                // 将 mcp server 连接到这个 sse transport
                server.connect(transport)
            }
            post("/message") {
                println("Received Message")
                
                // 根据 sessionId 参数,找到 server 和 transport
                val sessionId: String = call.request.queryParameters["sessionId"]!!
                val transport = servers[sessionId]?.transport as? SseServerTransport
                
                if (transport == null) {
                    call.respond(HttpStatusCode.NotFound, "Session not found")
                    return@post
                }

                // 交由 transport 中指定的 handlePostMessage 回调来处理
                transport.handlePostMessage(call)
            }
        }
    }.start(wait = true)
}

代码中一共有两个接口,一是 /sse 用于建立 transport,二是 /message 用于接收 post 信息。可以发现,前者完全只起到了“建立信道”的作用,然后客户端往 /message 发送请求,驱动 mcp server 干活。

跟进 sse transport:

public class SseServerTransport(
    private val endpoint: String,
    private val session: ServerSSESession,
) : AbstractTransport() {
    private val initialized: AtomicBoolean = atomic(false)

    @OptIn(ExperimentalUuidApi::class)
    public val sessionId: String = Uuid.random().toString()  // session id 是随机 uuid

    // Protocol.connect() 会调用这个 start()
    override suspend fun start() {
        if (!initialized.compareAndSet(false, true)) {
            throw error("SSEServerTransport already started! If using Server class, note that connect() calls start() automatically.")
        }

        // Send the endpoint event
        session.send(
            event = "endpoint",
            data = "${endpoint.encodeURLPath()}?$SESSION_ID_PARAM=${sessionId}",
        )

        try {
            session.coroutineContext.job.join()
        } finally {
            _onClose.invoke()
        }
    }

    /**
     * Handles incoming POST messages.
     *
     * This should be called when a POST request is made to send a message to the server.
     */
    public suspend fun handlePostMessage(call: ApplicationCall) {
        if (!initialized.value) {
            val message = "SSE connection not established"
            call.respondText(message, status = HttpStatusCode.InternalServerError)
            _onError.invoke(IllegalStateException(message))
        }

        // 接收 post body
        val body = try {
            val ct = call.request.contentType()
            if (ct != ContentType.Application.Json) {
                error("Unsupported content-type: $ct")
            }

            call.receiveText()
        } catch (e: Exception) {
            call.respondText("Invalid message: ${e.message}", status = HttpStatusCode.BadRequest)
            _onError.invoke(e)
            return
        }

        try {
            // 调用 handleMessage 处理信息(见下面那个函数)
            handleMessage(body)
        } catch (e: Exception) {
            call.respondText("Error handling message $body: ${e.message}", status = HttpStatusCode.BadRequest)
            return
        }

        // 响应本条 post 请求
        call.respondText("Accepted", status = HttpStatusCode.Accepted)
    }

    /**
     * Handle a client message, regardless of how it arrived.
     * This can be used to inform the server of messages that arrive via a means different from HTTP POST.
     */
    public suspend fun handleMessage(message: String) {
        try {
            // json 解码消息并调用 _onMessage 执行。这个 _onMessage 会被设为那个消息路由器(上文已经分析)
            val parsedMessage = McpJson.decodeFromString<JSONRPCMessage>(message)
            _onMessage.invoke(parsedMessage)
        } catch (e: Exception) {
            _onError.invoke(e)
            throw e
        }
    }

    override suspend fun close() {
        session.close()
        _onClose.invoke()
    }

    override suspend fun send(message: JSONRPCMessage) {
        if (!initialized.value) {
            throw error("Not connected")
        }

        // 发送 sse 消息
        session.send(
            event = "message",
            data = McpJson.encodeToString(message),
        )
    }
}

我们注意到,上面的代码中有两类 sse 消息。连接成功后,通过 endpoint 类消息发送那个 post 端点的 url( /message?sessionId=xxx);以及工作期间,通过 message 类消息发送 mcp 数据。至此,我们已经能解决开头那个“sse 模式下客户端如何往服务端发消息”的问题:客户端请求是通过 post /message 发往服务端的,这个 post 请求会收到一句 "Accepted";而服务端的具体响应则通过 sse 发送到客户端。据此,我们立即可以写一段脚本,来测试 sse 模式服务器:

import asyncio
import httpx
import json
from httpx_sse import aconnect_sse


def get_httpx_client():
    return httpx.AsyncClient(base_url="http://127.0.0.1:3001", timeout=60)


async def sse_listener(queue):
    async with get_httpx_client() as client:
        async with aconnect_sse(client, "GET", "/sse") as event_source:
            async for event in event_source.aiter_sse():
                print(f"Event: {event.event}")
                print(f"Data: {event.data}\n\n")

                if "sessionId" in event.data:
                    post_path = event.data
                    await queue.put(post_path)


async def poster(queue):
    async with get_httpx_client() as client:
        post_path = await queue.get()
        print(f"post path: {post_path}")

        await client.post(
            post_path,
            json={
                "jsonrpc": "2.0",
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "mcp-test", "version": "0.1"},
                },
                "id": 1,
            },
        )

        await client.post(
            post_path,
            json={
                "jsonrpc": "2.0",
                "method": "tools/list",
                "params": {},
                "id": 2,
            },
        )


async def main():
    queue = asyncio.Queue()

    listener = asyncio.create_task(sse_listener(queue))
    asyncio.create_task(poster(queue))

    await listener


asyncio.run(main())

运行结果:

Event: endpoint
Data: /message?sessionId=cf29d233-d3fd-435a-93d5-9ada6978ff3f


post path: /message?sessionId=cf29d233-d3fd-435a-93d5-9ada6978ff3f
Event: message
Data: {"id":1,"jsonrpc":"2.0","result":{"protocolVersion":"2024-11-05","capabilities":{"experimental":{},"sampling":{},"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{"listChanged":true}},"serverInfo":{"name":"mcp-kotlin test server","version":"0.1.0"},"_meta":{}}}


Event: message
Data: {"id":2,"jsonrpc":"2.0","result":{"tools":[{"name":"kotlin-sdk-tool","description":"A test tool","inputSchema":{"properties":{},"type":"object"}}],"_meta":{}}}

0x04 sse server(ktor 插件)

最后,我们来观察 ktor 插件模式下的 sse server。

fun runSseMcpServerUsingKtorPlugin(port: Int): Unit = runBlocking {
    println("Starting sse server on port $port")
    println("Use inspector to connect to the http://localhost:$port/sse")

    embeddedServer(CIO, host = "0.0.0.0", port = port) {
        mcp {
            return@mcp configureServer()
        }
    }.start(wait = true)
}

跟进 Application.mcp

@KtorDsl
public fun Application.mcp(block: () -> Server) {
    val transports = ConcurrentMap<String, SseServerTransport>()

    install(SSE)

    routing {
        sse("/sse") {
            mcpSseEndpoint("/message", transports, block)
        }

        post("/message") {
            mcpPostEndpoint(transports)
        }
    }
}

它定义了两个路由,先看 /sse

private suspend fun ServerSSESession.mcpSseEndpoint(
    postEndpoint: String,
    transports: ConcurrentMap<String, SseServerTransport>,
    block: () -> Server,
) {
    val transport =  mcpSseTransport(postEndpoint, transports)

    val server = block()

    server.onClose {
        logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
        transports.remove(transport.sessionId)
    }

    server.connect(transport)
    logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
}


internal fun ServerSSESession.mcpSseTransport(
    postEndpoint: String,
    transports: ConcurrentMap<String, SseServerTransport>,
): SseServerTransport {
    val transport = SseServerTransport(postEndpoint, this)  // 这个函数上文分析过
    transports[transport.sessionId] = transport

    logger.info { "New SSE connection established and stored with sessionId: ${transport.sessionId}" }

    return transport
}

即初始化 SseServerTransport 并连接到它。注意 server.connect(transport) 会调用 transport.start(),而 SseServerTransportstart() 方法中会发送 endpoint 信息,把 post url 告知客户端。这个流程与普通 sse 服务器是一致的。

来看另一个路由 /message

internal suspend fun RoutingContext.mcpPostEndpoint(
    transports: ConcurrentMap<String, SseServerTransport>,
) {
    val sessionId: String = call.request.queryParameters["sessionId"]
        ?: run {
            call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided")
            return
        }

    logger.debug { "Received message for sessionId: $sessionId" }

    val transport = transports[sessionId]
    if (transport == null) {
        logger.warn { "Session not found for sessionId: $sessionId" }
        call.respond(HttpStatusCode.NotFound, "Session not found")
        return
    }

    transport.handlePostMessage(call)
    logger.trace { "Message handled for sessionId: $sessionId" }
}

这份代码也与普通 sse server 相似。所以可以得到结论:ktor 插件模式就是把我们常用的“每个 session id 对应一个 mcp server 实例”这套方案给包装了起来,方便用户使用。

至此,我们已经阅读完了 MCP Kotlin SDK 的全部核心代码,文章开头的三个问题亦已解决。

0xff 总结

握手过程是怎样进行的——客户端发送 "initialize" 请求,从服务端获取 protocolVersion, capabilities, serverInfo 信息,再发送 "notifications/initialized" 通知,于是信道初始化完成。上述通讯全部走 JSON RPC,与普通请求无异。

工具描述是如何提供的——对于服务端,在 server.addTool 时手动提供 namedescriptioninputSchema;对于客户端,通过 mcp.listTools() 获取工具信息,然后发送给支持 tool use 的 LLM。如果 LLM 不支持 tool use,则我们需要自行组装工具描述。

SSE 模式下如何实现双工通讯——客户端先请求 /sse 获得 session id(是一个随机生成的 uuid),后续往服务端发消息都通过 post /message 实现。服务端会按照 session id 把请求路由到特定的 server,而 server 通过早前建立的 sse 信道返回数据。