0x00 写在开头
本站的上一篇文章介绍了 MCP 协议的一些基本概念,如通讯模型、Resource Tool 等。然而,有些细节问题尚未厘清:
- 我们不知道握手过程究竟是怎样进行的。inspector 界面中看不到握手包。
- stdio 通讯很简单,双方互发 json rpc 报文即可;但 sse 与 stdio 的区别很大,sse 并非双工协议,客户端不能在 sse 信道上给服务器回传数据。我们需要细看代码,观察 sse 模式下客户端如何与服务端交互。
- 在该文中,我们使用的是 Python sdk,“工具描述”是写在注释中的,sdk 借助 Pydantic 库将其取出;langchain 获取工具描述的方法与之类似。在其他语言中,我们大概需要自行提供工具描述,因此有必要看一眼相关逻辑。
本文中,我们将阅读 Kotlin sdk。选择 Kotlin 是因为个人原因:笔者参加工作之后,有很多代码要阅读,它们出自不同开发者之手,语言各异,以 Java 为多。笔者自己常写的语言是 C、Python、Golang,均非日常工作中频繁接触的语言。读本科时,笔者在《软件构造》课程中学习过一点 Java,现在已经忘了大半,故现在借阅读 MCP sdk 的机会,一边复习 Java,一边学习 Kotlin,可谓一举三得。
MCP Kotlin sdk 最早是由 Jetbrains 开发的,发布于 2024 年 12 月,当时的仓库在 JetBrains/mcp-kotlin-sdk,它立即成为了 MCP 的官方 Kotlin SDK,后续的开发都在 modelcontextprotocol/kotlin-sdk 仓库进行。本文阅读的版本是三天前发布的 v0.4.0。
sdk 的代码量比较大,在阅读它本身之前,我们不妨先看看示例代码。开发者提供了三个示例:
samples/kotlin-mcp-client
,一个典型的 LLM 聊天软件,通过 stdio 与 MCP server 通讯samples/kotlin-mcp-server
,一个全功能服务端,提供了 prompt、resource、tool 服务,支持 stdio 和 sse 通讯samples/weather-stdio-server
,一个 stdio 服务端,提供天气查询 tool
接下来,我们从 client demo 切入。
0x01 客户端示例
客户端示例的文件结构如下:
.
├── build.gradle.kts
├── gradle
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradle.properties
├── gradlew
├── gradlew.bat
├── README.md
├── settings.gradle.kts
└── src
└── main
└── kotlin
└── io
└── modelcontextprotocol
└── sample
└── client
├── main.kt
└── MCPClient.kt
此项目使用 gradle 构建,它比 maven 更现代,是 Kotlin 官方推荐的构建方式。各种构建参数都在 build.gradle.kts
内指定。
mainClass.set("io.modelcontextprotocol.sample.client.MainKt")
指定了程序入口点。直奔 main.kt
,它的代码如下:
fun main(args: Array<String>) = runBlocking {
if (args.isEmpty()) throw IllegalArgumentException("Usage: java -jar <your_path>/build/libs/kotlin-mcp-client-0.1.0-all.jar <path_to_server_script>")
val serverPath = args.first()
val client = MCPClient()
client.use {
client.connectToServer(serverPath)
client.chatLoop()
}
}
runBlocking
,即启动一个协程,阻塞当前线程直到协程结束。应用程序的 main
函数中经常使用这个方法来运行 suspend 函数。f(a, b, {x -> println(x)})
可以简写成 f(a, b) {x -> println(x)}
。上面的代码中有两处 trailing lambda:一处是 runBlocking
,一处是 client.use
。例:
fun sum(a: Int, b: Int) = a + b
代码中,.use
的作用是自动关闭资源,有点类似于 Python 中的 with
。我们的 MCPClient
类实现了 AutoCloseable
,因此支持 use
:
class MCPClient : AutoCloseable {
// ...
override fun close() {
runBlocking {
mcp.close()
anthropic.close()
}
}
}
我们继续跟进 client.connectToServer(serverPath)
:
suspend fun connectToServer(serverScriptPath: String) {
try {
// Build the command based on the file extension of the server script
val command = buildList {
when (serverScriptPath.substringAfterLast(".")) {
"js" -> add("node")
"py" -> add(if (System.getProperty("os.name").lowercase().contains("win")) "python" else "python3")
"jar" -> addAll(listOf("java", "-jar"))
else -> throw IllegalArgumentException("Server script must be a .js, .py or .jar file")
}
add(serverScriptPath)
}
// Start the server process
val process = ProcessBuilder(command).start()
// Setup I/O transport using the process streams
val transport = StdioClientTransport(
input = process.inputStream.asSource().buffered(),
output = process.outputStream.asSink().buffered()
)
// Connect the MCP client to the server using the transport
mcp.connect(transport)
// Request the list of available tools from the server
val toolsResult = mcp.listTools()
tools = toolsResult?.tools?.map { tool ->
ToolUnion.ofTool(
Tool.builder()
.name(tool.name)
.description(tool.description ?: "")
.inputSchema(
Tool.InputSchema.builder()
.type(JsonValue.from(tool.inputSchema.type))
.properties(tool.inputSchema.properties.toJsonValue())
.putAdditionalProperty("required", JsonValue.from(tool.inputSchema.required))
.build()
)
.build()
)
} ?: emptyList()
println("Connected to server with tools: ${tools.joinToString(", ") { it.tool().get().name() }}")
} catch (e: Exception) {
println("Failed to connect to MCP server: $e")
throw e
}
}
上述代码的作用是拼凑出一行指令(例如 python server.py
),启动进程,设置 stdio transport,然后用 MCP 客户端连接之,获取 tool 列表,并保存到 MCPClient.tools
中。
buildList
列表构建器,比传统的 mutableListOf().apply { ... }.toList()
更简洁。在传入 buildList
的 lambda 函数中,可以使用 add()
和 addAll()
来给 list 添加元素。从代码中,我们可以看到,transport 与 MCP client 是解耦的,mcp.connect
的参数是 transport: Transport
,不关心具体是 StdioClientTransport
还是 SseClientTransport
。
process.inputStream.asSource()
,但 inputStream
是 java 原生类,它本来并没有 asSource()
这个方法。这是 Kotlin 特性“扩展函数”:
import kotlinx.io.asSource
之后,InputStream
被扩展出了 asSource
方法。扩展函数的例子:
fun String.isValidEmail(): Boolean {...}
,然后就可以对 String
对象调用 isValidEmail()
。toolsResult?.tools
时,仅当 toolsResult
不为空的情况下,才会去访问 tools
,否则表达式直接返回 null。?:
是 Elvis 操作符(空合并操作符),表达式 a ?: b
在 a
非空的情况下返回 a
,否则返回 b
。 我们跟进一下 mcp.listTools()
方法:
public suspend fun listTools(
request: ListToolsRequest = ListToolsRequest(),
options: RequestOptions? = null,
): ListToolsResult? {
return request<ListToolsResult>(request, options)
}
RequestOptions?
末尾的 ?
表示该对象可为 null。Kotlin 要求显式地处理 null,这与 Guava 的实践类似(见本站早年文章《软件构造:Guava 的使用》)。Kotlin 支持默认参数,这里 request
的默认参数是 ListToolsRequest()
,它会在函数调用时(而不是定义时)求值。跟进:
/**
* Sent from the client to request a list of tools the server has.
*/
@Serializable
public data class ListToolsRequest(
override val cursor: Cursor? = null,
override val _meta: JsonObject = EmptyJsonObject
) : ClientRequest, PaginatedRequest {
override val method: Method = Method.Defined.ToolsList
}
@Serializable
注解表示这个类可以被序列化,支持 JSON、CBOR、ProtoBuf 等格式。data class
是数据类,会自动生成 hashCode()
等方法,且 ==
逻辑会变成判断各个字段是否相等,而不是判断是否引用相同对象。ListToolsRequest
实现了 ClientRequest
和 PaginatedRequest
两个接口,并重写 method
属性为 Method.Defined.ToolsList
。它的定义如下:
@Serializable(with = RequestMethodSerializer::class)
public sealed interface Method {
public val value: String
/**
* Enum of predefined methods supported by the protocol.
*/
@Serializable
public enum class Defined(override val value: String) : Method {
Initialize("initialize"),
Ping("ping"),
ResourcesList("resources/list"),
ResourcesTemplatesList("resources/templates/list"),
ResourcesRead("resources/read"),
ResourcesSubscribe("resources/subscribe"),
ResourcesUnsubscribe("resources/unsubscribe"),
PromptsList("prompts/list"),
PromptsGet("prompts/get"),
NotificationsCancelled("notifications/cancelled"),
NotificationsInitialized("notifications/initialized"),
NotificationsProgress("notifications/progress"),
NotificationsMessage("notifications/message"),
NotificationsResourcesUpdated("notifications/resources/updated"),
NotificationsResourcesListChanged("notifications/resources/list_changed"),
NotificationsToolsListChanged("notifications/tools/list_changed"),
NotificationsRootsListChanged("notifications/roots/list_changed"),
NotificationsPromptsListChanged("notifications/prompts/list_changed"),
ToolsList("tools/list"),
ToolsCall("tools/call"),
LoggingSetLevel("logging/setLevel"),
SamplingCreateMessage("sampling/createMessage"),
CompletionComplete("completion/complete"),
RootsList("roots/list")
}
/**
* Represents a custom method defined by the user.
*/
@Serializable
public data class Custom(override val value: String) : Method
}
Defined
和 Custom
两种实现。Defined
,它们以各自的字符串调用构造函数。于是,透过 Method.Defined.ToolsList
,我们可以直接获取一个 value
为 "tools/list"
的 Method
对象。所以, ListToolsRequest()
获取的是一条即将发送的信息 {"method": "tools/list"}
。而它的发送过程是:
public suspend fun <T : RequestResult> request(
request: Request,
options: RequestOptions? = null,
): T {
LOGGER.trace { "Sending request: ${request.method}" }
val result = CompletableDeferred<T>()
val transport = this@Protocol.transport ?: throw Error("Not connected")
// 严格模式下,在调用 method 之前先检查是否明确支持这个 method
if (this@Protocol.options?.enforceStrictCapabilities == true) {
assertCapabilityForMethod(request.method)
}
val message = request.toJSON()
val messageId = message.id
// 如果提供了 onProgress 回调,则将其注册到 progressHandlers[messageId]
if (options?.onProgress != null) {
LOGGER.trace { "Registering progress handler for request id: $messageId" }
progressHandlers[messageId] = options.onProgress
}
// 注册 response handler
responseHandlers[messageId] = set@{ response, error ->
// 如果请求有 error,则调用 result.completeExceptionally
if (error != null) {
result.completeExceptionally(error)
return@set
}
if (response?.error != null) {
result.completeExceptionally(IllegalStateException(response.error.toString()))
return@set
}
// 请求正常返回
try {
// 完成 result
@Suppress("UNCHECKED_CAST")
result.complete(response!!.result as T)
} catch (error: Throwable) {
result.completeExceptionally(error)
}
}
// 定义一个 cancel 闭包
val cancel: suspend (Throwable) -> Unit = { reason: Throwable ->
// 解除注册 handler
responseHandlers.remove(messageId)
progressHandlers.remove(messageId)
// 向对端发送 cancel 通知
val notification = CancelledNotification(requestId = messageId, reason = reason.message ?: "Unknown")
val serialized = JSONRPCNotification(
notification.method.value,
params = McpJson.encodeToJsonElement(notification)
)
transport.send(serialized)
result.completeExceptionally(reason)
Unit // 这个 Unit 是为了匹配返回类型的,因为 lambda 表达式的最后一个语句会被视为返回值
}
val timeout = options?.timeout ?: DEFAULT_REQUEST_TIMEOUT
try {
// 尝试发送消息。注意这个 withTimeout 只对 send 限时,不管 result.await()
withTimeout(timeout) {
LOGGER.trace { "Sending request message with id: $messageId" }
this@Protocol.transport?.send(message)
}
return result.await()
} catch (cause: TimeoutCancellationException) {
// 若超时,则取消
LOGGER.error { "Request timed out after ${timeout.inWholeMilliseconds}ms: ${request.method}" }
cancel(
McpError(
ErrorCode.Defined.RequestTimeout.code,
"Request timed out",
JsonObject(mutableMapOf("timeout" to JsonPrimitive(timeout.inWholeMilliseconds)))
),
)
result.cancel(cause)
throw cause
}
}
val result = CompletableDeferred<T>()
,这个对象是用于协程之间同步数据的。用法是:一个协程可以通过 result.await()
等待它“完成”,而调用 result.complete(data)
或 result.completeExceptionally(e)
可以让它“完成”。this@Protocol.transport
的意思是“本对象的外部类 Protocol
实例中的 transport
属性”,与当前函数作用域下的 transport
相区分。set@{ response, error ->
是给这个 lambda 函数起名为 set
,以便在其中使用 return@set
返回。response!!.result as T
中, !!
是非空断言操作符,如果 response
为空则抛出 KotlinNullPointerException
异常。 as T
是类型转换,如果不兼容,则抛出 ClassCastException
异常。上述代码中以 message id 作为 message 的主键,但 message id 是在哪里定义的呢?我们来看:
internal fun Request.toJSON(): JSONRPCRequest {
return JSONRPCRequest(
method = method.value,
params = McpJson.encodeToJsonElement(this),
jsonrpc = JSONRPC_VERSION,
)
}
private val REQUEST_MESSAGE_ID: AtomicLong = atomic(0L)
@Serializable
public data class JSONRPCRequest(
val id: RequestId = RequestId.NumberId(REQUEST_MESSAGE_ID.incrementAndGet()),
val method: String,
val params: JsonElement = EmptyJsonObject,
val jsonrpc: String = JSONRPC_VERSION,
) : JSONRPCMessage
所以,当调用 val message = request.toJSON()
时,会自动创建一个 JSONRPCRequest
,其 id
是自增的数字。以上,我们终于分析完了 mcp.listTools()
背后的数据发送逻辑。
然而,我们只看到了注册 response handler
,没有看到这个 handler 是在哪里被调用的。寻找调用,我们发现:
// Client 是 Protocol 的子类
public abstract class Protocol(
@PublishedApi internal val options: ProtocolOptions?,
) {
// 在 transport 初始化之后,连接到 transport
public open suspend fun connect(transport: Transport) {
this.transport = transport
transport.onClose {
doClose()
}
transport.onError {
onError(it)
}
// onMessage 回调,按消息类型路由到各个 handler
transport.onMessage { message ->
when (message) {
is JSONRPCResponse -> onResponse(message, null)
is JSONRPCRequest -> onRequest(message)
is JSONRPCNotification -> onNotification(message)
is JSONRPCError -> onResponse(null, message)
}
}
return transport.start()
}
// 这是 JSONRPCResponse 类消息的 handler
private fun onResponse(response: JSONRPCResponse?, error: JSONRPCError?) {
val messageId = response?.id
// 通过 id 找到对应 handler
val handler = responseHandlers[messageId]
if (handler == null) {
onError(Error("Received a response for an unknown message ID: ${McpJson.encodeToString(response)}"))
return
}
// 调用并删除 handler
responseHandlers.remove(messageId)
progressHandlers.remove(messageId)
if (response != null) {
handler(response, null)
} else {
check(error != null)
val error = McpError(
error.code.code,
error.message,
error.data,
)
handler(null, error)
}
}
// ...
}
因此,在客户端调用 connect()
时,一个消息路由器被注册到了 transport
中。我们来看这个注册函数:
@Suppress("PropertyName")
public abstract class AbstractTransport : Transport {
protected var _onClose: (() -> Unit) = {}
private set
protected var _onError: ((Throwable) -> Unit) = {}
private set
// to not skip messages
private val _onMessageInitialized = CompletableDeferred<Unit>()
protected var _onMessage: (suspend ((JSONRPCMessage) -> Unit)) = {
_onMessageInitialized.await()
_onMessage.invoke(it)
}
private set
// 把新来的 onClose 函数添加到 _onClose 清单
override fun onClose(block: () -> Unit) {
val old = _onClose
_onClose = {
old()
block()
}
}
// 添加新的 onError
override fun onError(block: (Throwable) -> Unit) {
val old = _onError
_onError = { e ->
old(e)
block(e)
}
}
// 注册 onMessage 回调
override fun onMessage(block: suspend (JSONRPCMessage) -> Unit) {
val old: suspend (JSONRPCMessage) -> Unit = when (_onMessageInitialized.isCompleted) {
true -> _onMessage
false -> { _ -> }
}
_onMessage = { message ->
old(message)
block(message)
}
_onMessageInitialized.complete(Unit)
}
}
private set
是 Kotlin 特性,它把属性的 setter 变成 private 的,即只能在当前类内部修改该属性。然而,属性的 getter 仍然是 protected 的,因此该属性可以被子类读取。上面的代码非常有意思。我们重点关注这几行:
private val _onMessageInitialized = CompletableDeferred<Unit>()
protected var _onMessage: (suspend ((JSONRPCMessage) -> Unit)) = {
_onMessageInitialized.await()
_onMessage.invoke(it)
}
private set
这里默认的 _onMessage
是先等待 _onMessageInitialized
完成,然后调用 _onMessage.invoke(it)
,其中 it
是一个 JSONRPCMessage
。当 _onMessageInitialized
完成之后, _onMessage
会变成一个正式的处理函数,故此时再调用 _onMessage
不会递归到自己(默认处理函数)身上。这份代码以相当巧妙的方式,保证了不丢消息。
另一边,函数 onMessage
负责注册正式的处理函数:
override fun onMessage(block: suspend (JSONRPCMessage) -> Unit) {
val old: suspend (JSONRPCMessage) -> Unit = when (_onMessageInitialized.isCompleted) {
true -> _onMessage
false -> { _ -> } // 空函数
}
_onMessage = { message ->
old(message)
block(message)
}
_onMessageInitialized.complete(Unit)
}
顺便说一句, _onClose
、 _onError
、 _onMessage
都是层层嵌套的闭包,先执行下层的 handler (即 old()
),再执行本层的 handler (即 block()
)。这个写法确实很有函数式特色,若换作是 Python,可能多数开发者都会把 handler 放进一个 list,比如 Flask 的 app.before_request_funcs
。
现在,我们那个消息路由器已经被存到了 transport
的 _onMessage
中。现在来看 transport.start()
:
public class StdioClientTransport(
private val input: Source,
private val output: Sink
) : AbstractTransport() {
// ...
override suspend fun start() {
if (!initialized.compareAndSet(false, true)) {
error("StdioClientTransport already started!")
}
logger.debug { "Starting StdioClientTransport..." }
val outputStream = output.buffered()
// 在协程作用域中启动新的协程
job = scope.launch(CoroutineName("StdioClientTransport.IO#${hashCode()}")) {
// 启动 read 协程
val readJob = launch {
logger.debug { "Read coroutine started." }
try {
input.use {
while (isActive) {
// 轮询读取
val buffer = Buffer()
val bytesRead = input.readAtMostTo(buffer, 8192)
if (bytesRead == -1L) break
if (bytesRead > 0L) {
readBuffer.append(buffer.readByteArray())
// 处理消息
processReadBuffer()
}
}
}
} catch (e: Exception) {
_onError.invoke(e)
logger.error(e) { "Error reading from input stream" }
}
}
// 启动 write 协程
val writeJob = launch {
logger.debug { "Write coroutine started." }
try {
// 把 sendChannel 传来的每条消息序列化,并写入信道
sendChannel.consumeEach { message ->
val json = serializeMessage(message)
outputStream.writeString(json)
outputStream.flush()
}
} catch (e: Throwable) {
if (isActive) {
_onError.invoke(e)
logger.error(e) { "Error writing to output stream" }
}
} finally {
output.close()
}
}
// 等待 read 工作结束,然后关闭 write 工作
readJob.join()
writeJob.cancelAndJoin()
_onClose.invoke()
}
}
}
这里的 writeJob
很像 golang 的普遍做法:持续读取 channel,把读到的数据送到该去的地方。 readJob
就是轮询从 stdio 读取,把读到的内容加入 buffer
,并调用 processReadBuffer()
。跟进:
private suspend fun processReadBuffer() {
while (true) {
// readBuffer.readMessage() 是尝试读取第一行 JSON RPC 消息
val msg = readBuffer.readMessage() ?: break
try {
// _onMessage 现在是那个消息处理路由器
_onMessage.invoke(msg)
} catch (e: Throwable) {
_onError.invoke(e)
logger.error(e) { "Error processing message." }
}
}
}
于是,我们可以确定, readJob
源源不断地读取消息,送进 buffer;每当 buffer 里面有了完整的 JSON RPC 消息,就会立即调用 _onMessage
处理。
继续看 readBuffer.readMessage()
的实现:
public fun readMessage(): JSONRPCMessage? {
if (buffer.exhausted()) return null
var lfIndex = buffer.indexOf('\n'.code.toByte())
val line = when (lfIndex) {
-1L -> return null
0L -> {
buffer.skip(1)
""
}
else -> {
var skipBytes = 1
if (buffer[lfIndex - 1] == '\r'.code.toByte()) {
lfIndex -= 1
skipBytes += 1
}
val string = buffer.readString(lfIndex)
buffer.skip(skipBytes.toLong())
string
}
}
return deserializeMessage(line)
}
上述代码是在 buffer 中读取一行,并反序列化成 JSONRPCMessage
。由于 JSON 中使用 \n
表示换行,故上述代码逐行读取是可行的。至此,我们理清了 stdio 消息读写的逻辑。
本文开头提了一个小问题:“握手过程究竟是怎样进行的”。这个答案在 Client
类中:
public open class Client(
private val clientInfo: Implementation,
options: ClientOptions = ClientOptions(),
) : Protocol(options) {
// ...
override suspend fun connect(transport: Transport) {
// 调用 Protocol 的 connect 函数,即我们上文分析过的注册了消息路由器的那个 connect()
super.connect(transport)
try {
// 发送 InitializeRequest
val message = InitializeRequest(
protocolVersion = LATEST_PROTOCOL_VERSION,
capabilities = capabilities,
clientInfo = clientInfo
)
val result = request<InitializeResult>(message)
// 观察版本是否兼容
if (!SUPPORTED_PROTOCOL_VERSIONS.contains(result.protocolVersion)) {
throw IllegalStateException(
"Server's protocol version is not supported: ${result.protocolVersion}"
)
}
// 获取服务端的 capabilities 和 info
serverCapabilities = result.capabilities
serverVersion = result.serverInfo // 这个命名似有问题,serverInfo 不止有 version,还有 name
// 发出 "notifications/initialized" 通知
notification(InitializedNotification())
} catch (error: Throwable) {
close()
throw error
}
}
所以,事实上握手过程也是走的 JSON RPC 协议,由一个 request、一个 response、一个 notification 组成。知道了这一点之后,我们马上可以测试一下手头的 MCP server:
{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo":{"name":"mcp-test", "version": "0.1"}}, "id": 1}
服务端果然返回了 response:

现在,我们已经看完了 client.connectToServer(serverPath)
,接着继续看 client.chatLoop()
:
// Main chat loop for interacting with the user
suspend fun chatLoop() {
println("\nMCP Client Started!")
println("Type your queries or 'quit' to exit.")
while (true) {
print("\nQuery: ")
val message = readLine() ?: break
if (message.lowercase() == "quit") break
val response = processQuery(message)
println("\n$response")
}
}
跟进 processQuery
:
// Process a user query and return a string response
suspend fun processQuery(query: String): String {
// Create an initial message with a user's query
val messages = mutableListOf(
MessageParam.builder() // 这个 builder 是通过 companion object 实现的单例
.role(MessageParam.Role.USER)
.content(query)
.build()
)
// Send the query to the Anthropic model and get the response
val response = anthropic.messages().create(
messageParamsBuilder
.messages(messages)
.tools(tools) // 把 tools 的定义也发送给 LLM provider
.build()
)
val finalText = mutableListOf<String>()
response.content().forEach { content ->
when {
// Append text outputs from the response
content.isText() -> finalText.add(content.text().getOrNull()?.text() ?: "")
// If the response indicates a tool use, process it further
content.isToolUse() -> {
// LLM 返回了一个 tool use
val toolName = content.toolUse().get().name()
val toolArgs =
content.toolUse().get()._input().convert(object : TypeReference<Map<String, JsonValue>>() {})
// 执行 tool
val result = mcp.callTool(
name = toolName,
arguments = toolArgs ?: emptyMap()
)
finalText.add("[Calling tool $toolName with args $toolArgs]")
// 把 tool use 结果加入 message
messages.add(
MessageParam.builder()
.role(MessageParam.Role.USER)
.content(
"""
"type": "tool_result",
"tool_name": $toolName,
"result": ${result?.content?.joinToString("\n") { (it as TextContent).text ?: "" }}
""".trimIndent()
)
.build()
)
// 再次调用 LLM,注意这里没有提供 tool,所以 LLM 只能返回文本
val aiResponse = anthropic.messages().create(
messageParamsBuilder
.messages(messages)
.build()
)
finalText.add(aiResponse.content().first().text().getOrNull()?.text() ?: "")
}
}
}
return finalText.joinToString("\n", prefix = "", postfix = "")
}
可以看到,在这一轮对话中,至多只会执行 LLM 的一个 tool use 请求。另外,这份代码是依赖于 tool use 特性的,不具备函数调用功能的 LLM 无法使用。
0x02 天气服务器示例
程序入口点是 main.kt
:
package io.modelcontextprotocol.sample.server
fun main() = `run mcp server`()
main
是单表达式函数。跟进 run mcp server
函数:
fun `run mcp server`() {
val baseUrl = "https://api.weather.gov"
val httpClient = HttpClient {
// 连接 api.weather.gov 的 http client,略
}
// 创建 MCP server
val server = Server(
Implementation(
name = "weather",
version = "1.0.0"
),
ServerOptions(
// 支持 listChanged 功能
capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(listChanged = true))
)
)
// 添加工具 get_alerts
server.addTool(
name = "get_alerts",
description = """
Get weather alerts for a US state. Input is Two-letter US state code (e.g. CA, NY)
""".trimIndent(),
inputSchema = Tool.Input(
// 构建 json {"state": {"type":"string", "description": "..."}},看起来有些繁琐
properties = JsonObject(
mapOf(
"state" to JsonObject(
mapOf(
"type" to JsonPrimitive("string"),
"description" to JsonPrimitive("Two-letter US state code (e.g. CA, NY)")
)
),
)
),
required = listOf("state")
)
) { request ->
val state = request.arguments["state"]?.jsonPrimitive?.content
if (state == null) {
// return@addTool 是从 lambda 表达式返回
return@addTool CallToolResult(
content = listOf(TextContent("The 'state' parameter is required."))
)
}
val alerts = httpClient.getAlerts(state)
// 返回值
CallToolResult(content = alerts.map { TextContent(it) })
}
// 添加工具 get_forecast
server.addTool(
// ...
) { request ->
// ...
val forecast = httpClient.getForecast(latitude, longitude)
CallToolResult(content = forecast.map { TextContent(it) })
}
// 创建 transport
val transport = StdioServerTransport(
System.`in`.asInput(),
System.out.asSink().buffered()
)
runBlocking {
server.connect(transport)
val done = Job()
server.onClose {
done.complete()
}
// 等待 server.onClose 把 done 完成
done.join()
}
}
可见,服务端对象的构建流程也与客户端类似:transport 与 MCP server 解耦,两者独立初始化,然后把 MCP server 连接到 transport 上。
0x03 sse server(普通模式)
现在来看 kotlin-mcp-server 这个全功能服务端。入口点:
fun main(args: Array<String>) {
val command = args.firstOrNull() ?: "--sse-server-ktor"
val port = args.getOrNull(1)?.toIntOrNull() ?: 3001
when (command) {
"--stdio" -> runMcpServerUsingStdio()
"--sse-server-ktor" -> runSseMcpServerUsingKtorPlugin(port)
"--sse-server" -> runSseMcpServerWithPlainConfiguration(port)
else -> {
System.err.println("Unknown command: $command")
}
}
}
sdk 提供了两种 sse server 实现。我们先不管 sse,去看一眼熟悉的 stdio 通讯:
fun runMcpServerUsingStdio() {
val server = configureServer()
val transport = StdioServerTransport(
inputStream = System.`in`.asSource().buffered(),
outputStream = System.out.asSink().buffered()
)
runBlocking {
server.connect(transport)
val done = Job()
server.onClose {
done.complete()
}
done.join()
println("Server closed")
}
}
由于 in
是 Kotlin 关键字,故上面的代码使用反引号将其包裹。负责初始化 MCP server 的 configureServer()
实现如下:
fun configureServer(): Server {
// 基本信息,在握手时提供
val server = Server(
Implementation(
name = "mcp-kotlin test server",
version = "0.1.0"
),
ServerOptions(
capabilities = ServerCapabilities(
prompts = ServerCapabilities.Prompts(listChanged = true),
resources = ServerCapabilities.Resources(subscribe = true, listChanged = true),
tools = ServerCapabilities.Tools(listChanged = true),
)
)
)
// prompt 功能
server.addPrompt(
name = "Kotlin Developer",
description = "Develop small kotlin applications",
arguments = listOf(
PromptArgument(
name = "Project Name",
description = "Project name for the new project",
required = true
)
)
) { request ->
GetPromptResult(
"Description for ${request.name}",
messages = listOf(
PromptMessage(
role = Role.user,
content = TextContent("Develop a kotlin project named <name>${request.arguments?.get("Project Name")}</name>")
)
)
)
}
// tool 功能
server.addTool(
name = "kotlin-sdk-tool",
description = "A test tool",
inputSchema = Tool.Input()
) { request ->
CallToolResult(
content = listOf(TextContent("Hello, world!"))
)
}
// resource 功能
server.addResource(
uri = "https://search.com/",
name = "Web Search",
description = "Web search engine",
mimeType = "text/html"
) { request ->
ReadResourceResult(
contents = listOf(
TextResourceContents("Placeholder content for ${request.uri}", request.uri, "text/html")
)
)
}
return server
}
上面的 server 实现了 tool、resource、prompt 三大功能,写法与天气服务器类似。我们接下来关注基础的 sse server 实现:
fun runSseMcpServerWithPlainConfiguration(port: Int): Unit = runBlocking {
val servers = ConcurrentMap<String, Server>()
println("Starting sse server on port $port. ")
println("Use inspector to connect to the http://localhost:$port/sse")
// embeddedServer 是 ktor 提供的 http 服务器
embeddedServer(CIO, host = "0.0.0.0", port = port) {
install(SSE) // ktor 的 sse 插件
routing {
sse("/sse") {
val transport = SseServerTransport("/message", this)
// 创建新的 mcp server 实例,关联到这个 session id
val server = configureServer()
servers[transport.sessionId] = server
// 在 server close 时,清理 servers 表
server.onClose {
println("Server closed")
servers.remove(transport.sessionId)
}
// 将 mcp server 连接到这个 sse transport
server.connect(transport)
}
post("/message") {
println("Received Message")
// 根据 sessionId 参数,找到 server 和 transport
val sessionId: String = call.request.queryParameters["sessionId"]!!
val transport = servers[sessionId]?.transport as? SseServerTransport
if (transport == null) {
call.respond(HttpStatusCode.NotFound, "Session not found")
return@post
}
// 交由 transport 中指定的 handlePostMessage 回调来处理
transport.handlePostMessage(call)
}
}
}.start(wait = true)
}
代码中一共有两个接口,一是 /sse
用于建立 transport,二是 /message
用于接收 post 信息。可以发现,前者完全只起到了“建立信道”的作用,然后客户端往 /message
发送请求,驱动 mcp server 干活。
跟进 sse transport:
public class SseServerTransport(
private val endpoint: String,
private val session: ServerSSESession,
) : AbstractTransport() {
private val initialized: AtomicBoolean = atomic(false)
@OptIn(ExperimentalUuidApi::class)
public val sessionId: String = Uuid.random().toString() // session id 是随机 uuid
// Protocol.connect() 会调用这个 start()
override suspend fun start() {
if (!initialized.compareAndSet(false, true)) {
throw error("SSEServerTransport already started! If using Server class, note that connect() calls start() automatically.")
}
// Send the endpoint event
session.send(
event = "endpoint",
data = "${endpoint.encodeURLPath()}?$SESSION_ID_PARAM=${sessionId}",
)
try {
session.coroutineContext.job.join()
} finally {
_onClose.invoke()
}
}
/**
* Handles incoming POST messages.
*
* This should be called when a POST request is made to send a message to the server.
*/
public suspend fun handlePostMessage(call: ApplicationCall) {
if (!initialized.value) {
val message = "SSE connection not established"
call.respondText(message, status = HttpStatusCode.InternalServerError)
_onError.invoke(IllegalStateException(message))
}
// 接收 post body
val body = try {
val ct = call.request.contentType()
if (ct != ContentType.Application.Json) {
error("Unsupported content-type: $ct")
}
call.receiveText()
} catch (e: Exception) {
call.respondText("Invalid message: ${e.message}", status = HttpStatusCode.BadRequest)
_onError.invoke(e)
return
}
try {
// 调用 handleMessage 处理信息(见下面那个函数)
handleMessage(body)
} catch (e: Exception) {
call.respondText("Error handling message $body: ${e.message}", status = HttpStatusCode.BadRequest)
return
}
// 响应本条 post 请求
call.respondText("Accepted", status = HttpStatusCode.Accepted)
}
/**
* Handle a client message, regardless of how it arrived.
* This can be used to inform the server of messages that arrive via a means different from HTTP POST.
*/
public suspend fun handleMessage(message: String) {
try {
// json 解码消息并调用 _onMessage 执行。这个 _onMessage 会被设为那个消息路由器(上文已经分析)
val parsedMessage = McpJson.decodeFromString<JSONRPCMessage>(message)
_onMessage.invoke(parsedMessage)
} catch (e: Exception) {
_onError.invoke(e)
throw e
}
}
override suspend fun close() {
session.close()
_onClose.invoke()
}
override suspend fun send(message: JSONRPCMessage) {
if (!initialized.value) {
throw error("Not connected")
}
// 发送 sse 消息
session.send(
event = "message",
data = McpJson.encodeToString(message),
)
}
}
我们注意到,上面的代码中有两类 sse 消息。连接成功后,通过 endpoint
类消息发送那个 post 端点的 url( /message?sessionId=xxx
);以及工作期间,通过 message
类消息发送 mcp 数据。至此,我们已经能解决开头那个“sse 模式下客户端如何往服务端发消息”的问题:客户端请求是通过 post /message
发往服务端的,这个 post 请求会收到一句 "Accepted"
;而服务端的具体响应则通过 sse 发送到客户端。据此,我们立即可以写一段脚本,来测试 sse 模式服务器:
import asyncio
import httpx
import json
from httpx_sse import aconnect_sse
def get_httpx_client():
return httpx.AsyncClient(base_url="http://127.0.0.1:3001", timeout=60)
async def sse_listener(queue):
async with get_httpx_client() as client:
async with aconnect_sse(client, "GET", "/sse") as event_source:
async for event in event_source.aiter_sse():
print(f"Event: {event.event}")
print(f"Data: {event.data}\n\n")
if "sessionId" in event.data:
post_path = event.data
await queue.put(post_path)
async def poster(queue):
async with get_httpx_client() as client:
post_path = await queue.get()
print(f"post path: {post_path}")
await client.post(
post_path,
json={
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "mcp-test", "version": "0.1"},
},
"id": 1,
},
)
await client.post(
post_path,
json={
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": 2,
},
)
async def main():
queue = asyncio.Queue()
listener = asyncio.create_task(sse_listener(queue))
asyncio.create_task(poster(queue))
await listener
asyncio.run(main())
运行结果:
Event: endpoint
Data: /message?sessionId=cf29d233-d3fd-435a-93d5-9ada6978ff3f
post path: /message?sessionId=cf29d233-d3fd-435a-93d5-9ada6978ff3f
Event: message
Data: {"id":1,"jsonrpc":"2.0","result":{"protocolVersion":"2024-11-05","capabilities":{"experimental":{},"sampling":{},"logging":{},"prompts":{"listChanged":true},"resources":{"subscribe":true,"listChanged":true},"tools":{"listChanged":true}},"serverInfo":{"name":"mcp-kotlin test server","version":"0.1.0"},"_meta":{}}}
Event: message
Data: {"id":2,"jsonrpc":"2.0","result":{"tools":[{"name":"kotlin-sdk-tool","description":"A test tool","inputSchema":{"properties":{},"type":"object"}}],"_meta":{}}}
0x04 sse server(ktor 插件)
最后,我们来观察 ktor 插件模式下的 sse server。
fun runSseMcpServerUsingKtorPlugin(port: Int): Unit = runBlocking {
println("Starting sse server on port $port")
println("Use inspector to connect to the http://localhost:$port/sse")
embeddedServer(CIO, host = "0.0.0.0", port = port) {
mcp {
return@mcp configureServer()
}
}.start(wait = true)
}
跟进 Application.mcp
:
@KtorDsl
public fun Application.mcp(block: () -> Server) {
val transports = ConcurrentMap<String, SseServerTransport>()
install(SSE)
routing {
sse("/sse") {
mcpSseEndpoint("/message", transports, block)
}
post("/message") {
mcpPostEndpoint(transports)
}
}
}
它定义了两个路由,先看 /sse
:
private suspend fun ServerSSESession.mcpSseEndpoint(
postEndpoint: String,
transports: ConcurrentMap<String, SseServerTransport>,
block: () -> Server,
) {
val transport = mcpSseTransport(postEndpoint, transports)
val server = block()
server.onClose {
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
transports.remove(transport.sessionId)
}
server.connect(transport)
logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
}
internal fun ServerSSESession.mcpSseTransport(
postEndpoint: String,
transports: ConcurrentMap<String, SseServerTransport>,
): SseServerTransport {
val transport = SseServerTransport(postEndpoint, this) // 这个函数上文分析过
transports[transport.sessionId] = transport
logger.info { "New SSE connection established and stored with sessionId: ${transport.sessionId}" }
return transport
}
即初始化 SseServerTransport
并连接到它。注意 server.connect(transport)
会调用 transport.start()
,而 SseServerTransport
的 start()
方法中会发送 endpoint 信息,把 post url 告知客户端。这个流程与普通 sse 服务器是一致的。
来看另一个路由 /message
:
internal suspend fun RoutingContext.mcpPostEndpoint(
transports: ConcurrentMap<String, SseServerTransport>,
) {
val sessionId: String = call.request.queryParameters["sessionId"]
?: run {
call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided")
return
}
logger.debug { "Received message for sessionId: $sessionId" }
val transport = transports[sessionId]
if (transport == null) {
logger.warn { "Session not found for sessionId: $sessionId" }
call.respond(HttpStatusCode.NotFound, "Session not found")
return
}
transport.handlePostMessage(call)
logger.trace { "Message handled for sessionId: $sessionId" }
}
这份代码也与普通 sse server 相似。所以可以得到结论:ktor 插件模式就是把我们常用的“每个 session id 对应一个 mcp server 实例”这套方案给包装了起来,方便用户使用。
至此,我们已经阅读完了 MCP Kotlin SDK 的全部核心代码,文章开头的三个问题亦已解决。
0xff 总结
握手过程是怎样进行的——客户端发送 "initialize"
请求,从服务端获取 protocolVersion, capabilities, serverInfo
信息,再发送 "notifications/initialized"
通知,于是信道初始化完成。上述通讯全部走 JSON RPC,与普通请求无异。
工具描述是如何提供的——对于服务端,在 server.addTool
时手动提供 name
、 description
、 inputSchema
;对于客户端,通过 mcp.listTools()
获取工具信息,然后发送给支持 tool use 的 LLM。如果 LLM 不支持 tool use,则我们需要自行组装工具描述。
SSE 模式下如何实现双工通讯——客户端先请求 /sse
获得 session id(是一个随机生成的 uuid),后续往服务端发消息都通过 post /message
实现。服务端会按照 session id 把请求路由到特定的 server,而 server 通过早前建立的 sse 信道返回数据。