目录
本文目标
mcp协议理解 mcp server生产级部署方案
如何支持集群部署 如何和hertz结合。 如何支持header中加k/v Agent如何集成MCP
1 前置聊聊
1.1 JSON-RPC 2.0协议
JSON-RPC 2.0 的核心价值可以概括为:在分布式系统中提供一种极其简单、轻量级、语言中立且可靠的远程过程调用协议。
1、request:一个合法的 JSON-RPC 2.0 请求必须包含以下字段:
1 2 3 4 5 6 |
{ "jsonrpc": "2.0", "method": "methodName", "params": ..., "id": .. } |
2、 response
-
成功的响应结构:
1 2 3 4 5 |
{ "jsonrpc": "2.0", "result": ..., "id": value } |
-
错误响应结构:
1 2 3 4 5 6 7 8 9 |
{ "jsonrpc": "2.0", "error": { "code": ..., "message": ..., "data": ... }, "id": value } |
1.2 泛化调用的业务场景
大家在日常开发工作中,是否遇到过这样的业务场景:团队需要快速搭建一个统一的配置化平台,用以对接和集成公司内各种各样的下游服务?典型的例子包括但不限于:
-
压测平台: 需要便捷地调用无数个业务的 RPC 接口来构造压测流量。
-
调度平台(Job系统): 需要定时、可靠地触发数百个不同的服务方法来完成抓取、计算、清理等任务。
-
低代码平台/工作流引擎: 需要通过“拖拽”方式配置一个节点,就能调用一个具体的服务接口来实现业务逻辑。
-
网关配置后台: 需要动态地接入和管理新的后端服务接口。
针对这类需求,一个典型的实现方案通常包含以下两个核心部分:
-
接口接入:通过配置形式保存接口的元数据信息,例如 PSM、方法名、请求结构与响应结构等;
-
接口执行:定义统一的执行接口
ExecuteAction
,屏蔽不同接口的调用差异,实现通用、可扩展的调用能力。ExcuteAction做的事情就是统一接口调用,包括接口的 request和resposne。-
request1234567891011{"jsonrpc": "2.0","method": "tools/call","params": {"name": "ActionName", // 执行一个接口名称"arguments": {"parma1": ..,"parma2": ..,}}}
-
Response1234{"jsonrpc": "2.0","result": {}// 每个接口返回的json数据。}
-
1.3 没有mcp之前,agent如何把tool调用标准化
在没有MCP时,一个Agent应用,如何快速接入业务的接口呢?可以采用上面泛化调用的方案。
部分大模型不具备function call能力,而且每个模型的functaion call方式不同 ,为了兼容这些场景,我们需要开发一个执行Tool的模块:包含工具识别+执行:

这个方案只是一个agent应用粒度的统一tool调用的方案。MCP协议是从行业角度提出的tool调用的统一方案,而且MCP还推动了更多业务方主动提供MCP tool。
2 MCP协议理解
2.1 介绍
MCP作用就是:将tool调用标准化化。即 request和response 转换成Json-rpc 2.0协议格式。
Model Context Protocol(模型上下文协议)是 Anthropic 在推出的用于 LLM 应用和外部数据源(Resources)或工具(Tools)通信的标准协议,遵循 JSON-RPC 2.0 的基础消息格式。
可以把 MCP 想象成 AI 应用程序的 USB-C 接口,规范了应用程序如何为 LLMs 提供上下文。

采用客户端-服务器架构,包含以下核心组件:
MCP主机(MCP Hosts)
-
发起请求的LLM应用程序
-
可以是IDE、AI工具或其他应用
MCP客户端(MCP Clients)
-
运行在主机程序内部
-
与MCP Server保持1:1连接
-
负责协议通信和请求转发
MCP服务器(MCP Servers)
-
提供工具、资源和提示词服务
-
运行在本地或远程环境
-
支持多种传输协议
资源层(Resources)
-
本地资源:文件、数据库等
-
远程资源:API、云服务等

2.2 四种资源tool/resource/prompt/resourcetemplate
2.2.1 为什么要划分为四种?
这四种类型完美地概括了计算机科学中最基本的交互范式,任何你能想到的客户端-服务器交互,几乎都可以被归类为“执行一个动作”、“获取一些数据”、“获取如何操作的指导”或“发现如何获取数据”。
-
执行(Do) ->
Tool
:“我能为你做什么?” (函数调用) -
读取(Read/Observe) ->
Resource
:“我能让你读/观察什么?” (静态或动态数据) -
指导(Guide) ->
Prompt
:“我能为你提供什么预制提示词?” (可复用的指令模板) -
发现(Discover) ->
ResourceTemplate
:“你需要读取什么,告诉我参数,我帮你生成资源URI?” (参数化资源URI生成器)
2.2.2 每一种资源统一一个reqeust/response
每种资源的request和reponse报文:
1、 执行特定的TOOL
Data 定义了tool返回的的数据通用结构
Reqeust
1 2 3 4 5 6 7 8 9 10 11 12 |
{ "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "test-tool", "arguments": { "parameter-1": "Hello Mcp TOOL" } } } |
Response
1 2 3 4 5 6 7 8 9 10 |
{ "jsonrpc": "2.0", "result": { "content": [{ "type": "text", "text": "Input parameter: Hello MCP TOOL" }] } } |
2、获取特定prompt
Data 定义了message的数据通用结构
Request
1 2 3 4 5 6 7 8 9 10 11 12 |
{ "jsonrpc": "2.0", "id": 1, "method": "prompts/get", "params": { "name": "code_review", "arguments": { "temperature": "0.6", "style":"严肃" } } } |
Response
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
{ "jsonrpc": "2.0", "result": { "description": "A complect with arguments", "messages": [{ "role": "system", "content": { "type": "text", "text": "this is a complextxx.." } }, { "role": "user", "content": { "type": "text", "text": "this is a complextxx.." } } ] } } |
3、读取特定 Resource
定义了resource资源的数据格式
Request
1 2 3 4 5 6 7 8 |
{ "jsonrpc": "2.0", "id": 1, "method": "resources/read", "params": { "uri": "docs://readme123" } } |
Response
1 2 3 4 5 6 7 8 9 10 |
{ "jsonrpc": "2.0", "result": { "contents": [{ "uri": "docs://readme123", "mimeType": "text/markdown", "text": "文章内容" }] } } |
2.3 三种传输协议
-
Stdio:本地调用场景
-
Streamable HTTP
: 远程调用场景
SE不支持 多实例部署StreamableHTTP 传输天然支持多实例部署
-
SSE(HTTP+Server-Sent Events):远程调用场景(已废弃)
2.3.1 SSE Server – 异步双向通信
SSE 介绍
HTTP SSE(Server-Sent Events,服务器发送事件)不支持客户端主动向服务器多次发送请求,其核心设计是单向通信模式。为了实现异步的双向通信模式,mcp引入了/sse和/messge两个endpoint 来实现异步双向传输
SSE通过双端点实现异步双向通信:
-
/sse
端点:建立持久SSE连接用于接收服务器推送,负责服务器到客户端的实时推送(单向流),用于传递响应和通知。 -
/message
端点:用来接受用户请求,,通过协程异步把返回放到SSE的队列中,通过SSE发送出去
搭建SSE Server
使用 github.com/mark3labs/mcp-go 搭建举例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
import ( "github.com/mark3labs/mcp-go/mcp" mcpServer "github.com/mark3labs/mcp-go/server" "me/mcp_serrver/mcp_handler" ) func main() { // 启动McpServer sseServer := newMcpSSEServer() sseServer.Start(":8889") } func newMcpSSEServer() *mcpServer.SSEServer { // Create a new MCP server s := mcpServer.NewMCPServer( "demo", "1.0.0", mcpServer.WithResourceCapabilities(true, true), mcpServer.WithToolCapabilities(true), ) // 2.1 Add remote tool tool := mcp.NewTool( "test-tool", mcp.WithDescription("Test tool"), mcp.WithString("parameter-1", mcp.Description("A string tool parameter")), mcp.WithTitleAnnotation("Test Tool Annotation Title"), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), mcp.WithIdempotentHintAnnotation(true), mcp.WithOpenWorldHintAnnotation(false)) s.AddTool(tool, mcp_handler.HelloHandler) // 2.2Add dynamic resources // . Add prompts // 4. Start the sse server sseServer := mcpServer.NewSSEServer(s, mcpServer.WithBaseURL("http://localhost:8889")) return sseServer } func HelloHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.TextContent{ Type: "text", Text: "Input parameter: " + request.GetArguments()["parameter-1"].(string), }, }, }, nil } |
SSE Server验证
方式1 postman验证
-
STEP1 通过 /sse接口建立 SSE 连接。执行结果会获取到一个/messge?sessionId=xxx
-
STEP2 通过/message访问发送请求。 通过第一步获取到 sessionid信息
操作 | 参数 | ||
tool_list | 通过/message发送一个请求
结果通过sse发送过去 |
||
call_tool | 通过/message发送一个请求
结果通过sse发送过去 出现错误时
|
方式2 通过定义client验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
import ( "context" "encoding/json" "fmt" "github.com/mark3labs/mcp-go/client" "github.com/mark3labs/mcp-go/mcp" "testing" "time" ) // TOOLS func TestListTools(t *testing.T) { c, _ := client.NewSSEMCPClient("http://localhost:8889/sse") defer c.Close() ctx := context.Background() // Start the client if err := c.Start(ctx); err != nil { errorInfo := fmt.Sprintf("Failed to start client: %v", err) panic(errorInfo) } <strong>// Initialize</strong> initRequest := mcp.InitializeRequest{} initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION initRequest.Params.ClientInfo = mcp.Implementation{ Name: "test-client", Version: "1.0.0", } result, err := c.Initialize(ctx, initRequest) if err != nil { errorInfo := fmt.Sprintf("Failed to initialize: %v", err) panic(errorInfo) } fmt.Println("InitializeRequest............") fmt.Printf("init result: %+v", result) // ====== ListTools ======== toolsRequest := mcp.ListToolsRequest{} toolsResult, err := c.ListTools(ctx, toolsRequest) if err != nil { errorInfo := fmt.Sprintf("Failed to list tools: %v", err) panic(errorInfo) } fmt.Println("ListTools............") resultBytes, _ := json.Marshal(toolsResult) logs.CtxInfo(ctx, "tools: %s", string(resultBytes)) // ====== call tool ====== toolName := "GetLarkDocContent" toolCallRequest := mcp.CallToolRequest{} toolCallRequest.Params.Name = toolName toolCallRequest.Params.Arguments = map[string]interface{}{ "doc_id": "RSMLdC83joCEF4xX3docjK6Jn1g", } oneResult, err := c.CallTool(ctx, toolCallRequest) if err != nil { logs.CtxError(ctx, "call err %v", err) } else { resultBytes, _ = json.Marshal(oneResult) logs.CtxInfo(ctx, "tools: %s", string(resultBytes)) } // ===== TestResources ===== // list resourcesList := mcp.ListResourcesRequest{} resourcesResult, err := c.ListResources(ctx, resourcesList) if err != nil { errorInfo := fmt.Sprintf("Failed to list resources: %v", err) panic(errorInfo) } fmt.Println("ListResources............") resultBytes, _ := json.Marshal(resourcesResult) logs.CtxInfo(ctx, "resources list: %s", string(resultBytes)) // call one resource resourceCallRequest := mcp.ReadResourceRequest{} resourceCallRequest.Params.URI = "docs://readme123" resourceResult, _ := c.ReadResource(ctx, resourceCallRequest) logs.CtxInfo(ctx, "one resource : %s", resourceResult) // === TestPrompts ==== // list promptListCallReq := mcp.ListPromptsRequest{} promptsResult, err := c.ListPrompts(ctx, promptListCallReq) if err != nil { logs.CtxError(ctx, "list err %v", err) } resultBytes, _ := json.Marshal(promptsResult) logs.CtxInfo(ctx, "prompts list: %s", string(resultBytes)) promptCall := mcp.GetPromptRequest{} promptCall.Params.Name = "start_role" onepromptResult, _ := c.GetPrompt(ctx, promptCall) resultBytes, _ = json.Marshal(onepromptResult) logs.CtxInfo(ctx, "one prompt: %v", string(resultBytes)) time.Sleep(3 * time.Second) } |
2.3.2 StreamableHttp Server – 同步双向通信
StreamableHttp 介绍
StreamableHTTP实际上实现的是同步双向通信,StreamableHTTP使用单一端点(/mcp)处理请求,支持两种响应模式:
-
标准同步模式:
Content-Type: application/json
,立即返回完整响应 -
流式模式:
Content-Type: text/event-stream
,当需要发送通知时升级为SSE流。

搭建Streamable Http Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
import ( "github.com/mark3labs/mcp-go/mcp" mcpServer "github.com/mark3labs/mcp-go/server" ) func OnlyStartStreamServer() { // 启动McpServer streambleServer := NewMcpStreamServer() streambleServer.Start(":8889") } func NewMcpStreamServer() *mcpServer.StreamableHTTPServer { // Create a new MCP server s := mcpServer.NewMCPServer( "demo", "1.0.0", mcpServer.WithResourceCapabilities(true, true), mcpServer.WithToolCapabilities(true), ) // 支持tool/resource/prompt三种资源 // 2.1 Add remote tool tool := mcp.NewTool( "test-tool", mcp.WithDescription("Test tool"), mcp.WithString("parameter-1", mcp.Description("A string tool parameter")), mcp.WithTitleAnnotation("Test Tool Annotation Title"), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), mcp.WithIdempotentHintAnnotation(true), mcp.WithOpenWorldHintAnnotation(false)) // add tool s.AddTool(tool, HelloHandler) // 2.2 Add dynamic resources // Create a simple resource resource := mcp.NewResource( "docs://readme123", // URI - unique identifier "Project README", // Name - human-readable mcp.WithResourceDescription("Main project documentation"), mcp.WithMIMEType("text/markdown"), ) // Add a static file resource s.AddResource(resource, HandleReadmeFile) // 2.3 Add prompts // Create a simple prompt prompt := mcp.NewPrompt("code_review", mcp.WithPromptDescription("Review code for best practices and issues"), mcp.WithArgument("temperature", mcp.ArgumentDescription("The temperature parameter for generation"), mcp.RequiredArgument(), ), mcp.WithArgument("style", mcp.ArgumentDescription("The style to use for the response"), mcp.RequiredArgument(), ), ) s.AddPrompt(prompt, HandleComplexPrompt) // 4. Start the sse server return mcpServer.NewStreamableHTTPServer(s) } |
EndPoint配置
-
默认StreamableHTTP传输协议使用单一端点来处理所有请求,默认端点路径是
/mcp
。 streamable_http.go:142 -
自定义: 您可以通过
WithEndpointPath()
选项自定义端点路径:
1 2 |
httpServer := server.NewStreamableHTTPServer(mcpServer, server.WithEndpointPath("/xx/mcp"), ) |
Streamable Http Server 验证
下面例子是部署了有状态server(默认是有状态),需要通过init获取mcp-session-id,在线上部署时,可以部署无状态server。
操作 | 参数 |
初始化 | ![]() |
执行tool | ![]() |
Prompt 相关请求 | ![]() |
resources | ![]() |
需要升级 SSE 的核心场景
Demo
-
注册一个streamable tool,如下通过server.SendNotificationToClient 可以把chunk内容发送到streamableHttpSession#notificationChannel, 再通过SSE 把notificationChannel数据发送到客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
// stream接口注册 s.AddTool(mcp.NewTool( "streaming-tool", mcp.WithDescription("返回流式响应的示例工具")), StreamHandler) func StreamHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { server := server.ServerFromContext(ctx) for i := 0; i < 10; i++ { _ = server.SendNotificationToClient(ctx, "test/notification", map[string]any{ "value": i, }) time.Sleep(10 * time.Millisecond) } // send final response return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.TextContent{ Type: "text", Text: "done", }, }, }, nil } |
-
通过postman验证
-
通过client来验证。
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
func TestStreamClient(t *testing.T) { c, _ := client.NewStreamableHttpClient("http://localhost:8889/mcp") ctx := context.Background() // 启动客户端 if err := c.Start(ctx); err != nil { log.Fatalf("Failed to start client: %v", err) } // 接受流式内容 c.OnNotification(func(notification mcp.JSONRPCNotification) { // 处理服务器发送的流式通知(如test/notification) if notification.Method == "test/notification" { data, _ := json.Marshal(notification.Params.AdditionalFields) t.Logf("收到流式通知: %s", string(data)) // 打印中间结果 } }) // Initialize initRequest := mcp.InitializeRequest{} initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION initRequest.Params.ClientInfo = mcp.Implementation{ Name: "test-client", Version: "1.0.0", } result, err := c.Initialize(ctx, initRequest) if err != nil { errorInfo := fmt.Sprintf("Failed to initialize: %v", err) panic(errorInfo) } fmt.Println("InitializeRequest............") fmt.Printf("init result: %+v", result) // 执行tool toolCallRequest := mcp.CallToolRequest{} toolCallRequest.Params.Name = "streaming-tool" toolCallRequest.Params.Arguments = map[string]interface{}{} oneResult, err := c.CallTool(ctx, toolCallRequest) if err != nil { logs.CtxError(ctx, "call err %v", err) } else { resultBytes, _ := json.Marshal(oneResult) logs.CtxInfo(ctx, "tools: %s", string(resultBytes)) } time.Sleep(1 * time.Second) } |
输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
## 流式内容 mcp_client_test.go:31: 收到流式通知: {"value":1} mcp_client_test.go:31: 收到流式通知: {"value":2} mcp_client_test.go:31: 收到流式通知: {"value":3} mcp_client_test.go:31: 收到流式通知: {"value":4} mcp_client_test.go:31: 收到流式通知: {"value":5} mcp_client_test.go:31: 收到流式通知: {"value":6} mcp_client_test.go:31: 收到流式通知: {"value":7} mcp_client_test.go:31: 收到流式通知: {"value":8} mcp_client_test.go:31: 收到流式通知: {"value":9} ## 接口内容 tools: {"content":[{"type":"text","text":"done"} |
如何实现
-
【服务端】McpServer,自动升级SSE的代码如下:
-
stream_http server会自动开启一个SSE链接,监听streamableHttpSession#notificationChannel,把notificationChannel中数据通过sse发送到客户端。
-
通过监听notificationChannel来实现SSE自动升级,如果有升级SSE的场景,直接往notificationChannel写入数据就行。由notificationChannel定义,接受请求都是JSONRPCNotification类型的。 所以,SSE都是针对notify类型的。
1 2 3 4 5 6 |
ttype streamableHttpSession struct { sessionID string notificationChannel chan mcp.JSONRPCNotification // server -> client notifications .... } |
-
【客户端】根据response content_type自动识别SSE,如果是sse则将流式结果全部处理完成之后在封装成mcp.ToolCallResul
业务接口是一个SSE 流式接口如何封装成一个Tool
直接注册tool时时候,把流式内容转成一次返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
// 注册tool s.AddTool(mcp.NewTool( "streaming_tool-tool", mcp.WithDescription("返回流式响应的示例工具")), StreamHandler) func StreamHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { // ---- // 处理流式resposne,等全部数据返回之后,拼装一个CallToolResult // ---- return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.TextContent{ Type: "text", Text: "stream chat tool", }, }, }, nil } |
2.3.3 Stdio Server
使用场景
-
命令行工具
-
CLI 实用程序,可被 LLM 调用
-
本地开发和测试 MCP 实现
-
系统管理工具
2. 桌面应用集成
-
IDE 插件和文本编辑器扩展
-
本地生产力工具
-
文件系统浏览器
3. 子进程通信
-
父进程管理 MCP 服务器
-
进程级隔离和安全性
-
单用户场景
搭建Stdio Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
func main() { StartMcpStdioServer() } func StartMcpStdioServer() { // Create a new MCP server s := server.NewMCPServer( "Demo 🚀", "1.0.0", server.WithToolCapabilities(true), server.WithResourceCapabilities(true, true), ) // Add tool tool := mcp.NewTool("hello_world", mcp.WithDescription("Say hello to someone"), mcp.WithString("name", mcp.Required(), mcp.Description("Name of the person to greet"), ), ) // Add tool handler s.AddTool(tool, HelloHandler) // Start the stdio server if err := server.ServeStdio(s); err != nil { fmt.Printf("Server error: %v\n", err) } } |
服务启动之后,验证

客户端使用Stdio Server
不需要启动Stdio server,直接使用 client.NewStdioMCPClient 即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
func TestStdio(t *testing.T) { // 创建 STDIO 客户端 c, err := client.NewStdioMCPClient( "go", nil, "run", "/Users/bytedance/go/src/me/mcp_server/main.go", ) if err != nil { log.Fatal(err) } defer c.Close() // Initialize initRequest := mcp.InitializeRequest{} initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION initRequest.Params.ClientInfo = mcp.Implementation{ Name: "test-client", Version: "1.0.0", } ctx := context.Background() result, err := c.Initialize(ctx, initRequest) if err != nil { errorInfo := fmt.Sprintf("Failed to initialize: %v", err) fmt.Printf(errorInfo) } fmt.Println("InitializeRequest............") fmt.Printf("init result: %+v", result) // ListTools toolsRequest := mcp.ListToolsRequest{} toolsResult, err := c.ListTools(ctx, toolsRequest) if err != nil { errorInfo := fmt.Sprintf("Failed to list tools: %v", err) panic(errorInfo) } fmt.Println("ListTools............") resultBytes, _ := json.Marshal(toolsResult) logs.CtxInfo(ctx, "tools: %s", string(resultBytes)) } |
3 MCP Server生产级部署方案
如果要线上可用的话,需要满足如下功能:
MCPServer支持多实例部署 MCPServer集成hertz服务中 McpClient支持多服务器客户端管理,即需要连接到多个不同类型的MCP服务器 Mcp调用过程支持header中添加k/v
3.1 开源工具
使用 github.com/mark3labs/mcp-go,如果SDK有问题,可以直接https://deepwiki.com/mark3labs/mcp-go 这里问答吧go get github.com/mark3labs/mcp-go@latest
3.2 Servers: 多实例部署
最终方案:使用Streamble HTTP 部署 + 无状态模式
-
Streamable HTTP Server VS SSE Server
SSE不支持 多实例部署。🔥Streamable HTTP 传输天然支持多实例部署 。因为:
SSE 是由 /sse 和 /message 两个endpoint组成 Streamabel Http 是由一个/mcp endpoint组成
-
无状态模式是多实例部署的理想选择。因为它消除了会话粘性的需求。客户端可以与任何服务器实例通信,负载均衡器可以自由地将请求分发到不同的实例。这种模式提供了最佳的可扩展性和容错能力。
1 |
<em>// 服务器端使用 WithStateLess(true) 配置</em> httpServer := server.NewStreamableHTTPServer(s, server.WithStateLess(true), <em>// 启用无状态模式 </em> ) |
3.3 Client:多服务器客户端的管理(支持连接到多个不同类型的MCP服务器)
官网没有定义MltiServerClient,而是在 index.mdx:367-419中给出了思路和demo代码。这种方案实现比较简单.
这里需要问题就是在定义tool到时候指定一个serverName。这里有个方案就是在注册tool时,必须保证tool的name是唯一的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
type MultiServerClient struct { clients map[string]client.Clientmutex sync.RWMutex } func NewMultiServerClient() *MultiServerClient { return &MultiServerClient{ clients: make(map[string]client.Client), } } func (msc *MultiServerClient) AddServer(name, address string, clientType string) error { msc.mutex.Lock() defer msc.mutex.Unlock() var c client.Clientvar err errorswitch clientType { case "http": c = client.NewStreamableHttpClient(address) case "sse": c = client.NewSSEMCPClient(address) default: return fmt.Errorf("unsupported client type: %s", clientType) } ctx := context.Background() if err := c.Initialize(ctx); err != nil { return fmt.Errorf("failed to initialize client for %s: %w", name, err) } msc.clients[name] = creturn nil } func (msc *MultiServerClient) CallTool(ctx context.Context, serverName, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error) { msc.mutex.RLock() c, exists := msc.clients[serverName] msc.mutex.RUnlock() if !exists { return nil, fmt.Errorf("server not found: %s", serverName) } return c.CallTool(ctx, mcp.CallToolRequest{ Params: mcp.CallToolRequestParams{ Name: toolName, Arguments: args, }, }) } func (msc *MultiServerClient) GetAllTools(ctx context.Context) (map[string][]mcp.Tool, error) { msc.mutex.RLock() defer msc.mutex.RUnlock() allTools := make(map[string][]mcp.Tool) for serverName, c := range msc.clients { tools, err := c.ListTools(ctx) if err != nil { return nil, fmt.Errorf("failed to get tools from %s: %w", serverName, err) } allTools[serverName] = tools.Tools } return allTools, nil } |
2.4 支持header中添加k/v
直接使用transport.WithHTTPHeaders就可以,不需要修改源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func TestAidenServer(t *testing.T) { c, _ := client.NewStreamableHttpClient("https://xxx/mcp", transport.WithHTTPHeaders(map[string]string{ "key1": "xx1", "key2": "xx2", })) ctx := context.Background() // 启动客户端 if err := c.Start(ctx); err != nil { log.Fatalf("Failed to start client: %v", err) } ..... } |
2.5 集成hertz
使用 hertz最新版本提供的工具类adaptor.HertzHandler 来集成MCP Server。
2.5.1 Streamable Http Server + Hertz
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
"github.com/cloudwego/hertz/pkg/app" "github.com/cloudwego/hertz/pkg/app/server" "github.com/cloudwego/hertz/pkg/common/adaptor" func StartStreamableHttpWithHertz() { h := server.Default(server.WithHostPorts(":8889")) // 启动McpServer=== 在h.Spin之前 streambleServer := newMcpStreamServer() h.POST("/mcp", adaptor.HertzHandler(streambleServer)) h.Spin() } func NewMcpStreamServer() *mcpServer.StreamableHTTPServer { // Create a new MCP server s := mcpServer.NewMCPServer( "demo", "1.0.0", mcpServer.WithResourceCapabilities(true, true), mcpServer.WithToolCapabilities(true), ) // 支持tool/resource/prompt三种资源 // 2.1 Add remote tool tool := mcp.NewTool( "test-tool", mcp.WithDescription("Test tool"), mcp.WithString("parameter-1", mcp.Description("A string tool parameter")), mcp.WithTitleAnnotation("Test Tool Annotation Title"), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), mcp.WithIdempotentHintAnnotation(true), mcp.WithOpenWorldHintAnnotation(false)) // add tool s.AddTool(tool, HelloHandler) // stream接口注册 s.AddTool(mcp.NewTool( "streaming_tool-tool", mcp.WithDescription("返回流式响应的示例工具")), StreamHandler) // 2.2 Add dynamic resources // Create a simple resource resource := mcp.NewResource( "docs://readme123", // URI - unique identifier "Project README", // Name - human-readable mcp.WithResourceDescription("Main project documentation"), mcp.WithMIMEType("text/markdown"), ) // Add a static file resource s.AddResource(resource, HandleReadmeFile) // 2.3 Add prompts // Create a simple prompt prompt := mcp.NewPrompt("code_review", mcp.WithPromptDescription("Review code for best practices and issues"), mcp.WithArgument("temperature", mcp.ArgumentDescription("The temperature parameter for generation"), mcp.RequiredArgument(), ), mcp.WithArgument("style", mcp.ArgumentDescription("The style to use for the response"), mcp.RequiredArgument(), ), ) s.AddPrompt(prompt, HandleComplexPrompt) // 4. Start the sse server // 启用无状态模式 return mcpServer.NewStreamableHTTPServer(s, mcpServer.WithStateLess(true)) } |
2.5.2 SSE Server + Hertz (不建议再构建SSE mcp server)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
func StartSSEWithHertz() { h := server.Default(server.WithHostPorts(":8889")) // 启动McpServer sseServer := NewMcpSSEServer() h.GET("/sse", adaptor.HertzHandler(sseServer)) h.POST("/message", adaptor.HertzHandler(sseServer)) h.Spin() } func NewMcpSSEServer() *mcpServer.SSEServer { // Create a new MCP server s := mcpServer.NewMCPServer( "demo", "1.0.0", mcpServer.WithResourceCapabilities(true, true), mcpServer.WithToolCapabilities(true), ) // 支持tool/resource/prompt三种资源 // 2.1 Add remote tool tool := mcp.NewTool( "test-tool", mcp.WithDescription("Test tool"), mcp.WithString("parameter-1", mcp.Description("A string tool parameter")), mcp.WithTitleAnnotation("Test Tool Annotation Title"), mcp.WithReadOnlyHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), mcp.WithIdempotentHintAnnotation(true), mcp.WithOpenWorldHintAnnotation(false)) // add tool s.AddTool(tool, HelloHandler) // 2.2 Add dynamic resources // Create a simple resource resource := mcp.NewResource( "docs://readme123", // URI - unique identifier "Project README", // Name - human-readable mcp.WithResourceDescription("Main project documentation"), mcp.WithMIMEType("text/markdown"), ) // Add a static file resource s.AddResource(resource, HandleReadmeFile) // 2.3 Add prompts // Create a simple prompt prompt := mcp.NewPrompt("code_review", mcp.WithPromptDescription("Review code for best practices and issues"), mcp.WithArgument("temperature", mcp.ArgumentDescription("The temperature parameter for generation"), mcp.RequiredArgument(), ), mcp.WithArgument("style", mcp.ArgumentDescription("The style to use for the response"), mcp.RequiredArgument(), ), ) s.AddPrompt(prompt, HandleComplexPrompt) // 4. Start the sse server sseServer := mcpServer.NewSSEServer(s, mcpServer.WithBaseURL("http://localhost:8889")) return sseServer } |
4 Agent集成MCP
-
提示词1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253// 提示词const (// 构建专门用于工具判断的提示词toolDetectionPrompt = `## 任务你是一个AI的助手,可以调用工具解决问题。你需要根据用户的问题、历史对话和工具执行结果,给出完整、友好的回答。如果需要多轮调用工具才能完成任务,请继续调用,直到可以给出最终答案## 可用的工具包括:- calculate: 执行数学乘积{"need_tool": true,"tool_name": "calculate","server_path": "./mcp-server","desc": "执行数学运算(加、减、乘、除)","parameters": {"left_param": "左值","right_param": "右值"}}## LLM输出样式请确保只返回 JSON,不要添加任何其他解释,分为如下两个场景:- 如果需要调用工具,请以以下 JSON 格式回复:{"need_tool": true,"tool_name": "calculate","parameters": {"left_param": "左值","right_param": "右值"}}- 如果不需要调用工具或有最终答案,请以下 JSON 格式回复,其中result表示LLM最终的结果:{"need_tool": false,"result": "xx"}## 工具返回结果遵循JSON-RPC 2.0协议协议比如调用calculate工具返回结果为:{"jsonrpc": "2.0","result": {"content": [{"type": "text","text": "{ \"value\": 2 }"}]}}`)
-
-
执行流程123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141/ 消息角色常量const (RoleSystem = "system" // 系统指令和提示RoleUser = "user" // 用户输入RoleAssistant = "assistant" // AI 助手的回复RoleTool = "tool" // 工具执行结果)// McpAgentExecute 功能包括:根据LLM获取 工具name和参数 ;执行MCP工具; 根据工具返回内容构建Context// userQuery := "请计算 135 乘以 28 等于多少?然后告诉我结果的平方是多少?"func McpAgentExecute(userQuery string) {// 1. 如果需要工具,则调用相应的工具conversationHistory := []doubao.DoubaoMessage{}conversationHistory = append(conversationHistory,doubao.DoubaoMessage{Role: RoleSystem, Content: toolDetectionPrompt},doubao.DoubaoMessage{Role: RoleUser, Content: userQuery},)// 2.设置一个最大循环次数,避免无限制循环maxIterations := 5for iteration := 0; iteration < maxIterations; iteration++ {logs.Info("===== 第%v轮对话历史=======", iteration+1)toolCall, err := callLLM(conversationHistory)if err != nil {log.Printf("工具判断失败: %v", err)break}if toolCall.NeedTool {toolResult, err := callMcpTool(toolCall)if err != nil {log.Printf("调用工具失败: %v", err)break} else {log.Printf("工具执行结果: %v", toolResult)// 将工具结果添加到对话历史. 这里遇到一个问题,如果设置role为tool,LLM返回就会报错了conversationHistory = append(conversationHistory,doubao.DoubaoMessage{Role: RoleUser, Content: "工具执行结果: " + toolResult},)}} else {logs.Info("最终结果: %s", toolCall.Result)break}}logs.Info("===== 最终对话历史=======")time.Sleep(1 * time.Second)}// callLLM 调用LLM: 识别工具func callLLM(conversationHistory []doubao.DoubaoMessage) (ToolCall, error) {// 调用豆包 APIresponse, err := doubao.ChatSync(context.Background(), conversationHistory)// 尝试解析 JSONtoolCall := ToolCall{}if err != nil {toolCall.NeedTool = falsereturn toolCall, fmt.Errorf("调用豆包 API 失败: %v", err)}if len(response.Choices) == 0 {toolCall.NeedTool = falsereturn toolCall, errors.New("response.Choices为空")}// 解析模型回复reply := response.Choices[0].Message.Contentif err := json.Unmarshal([]byte(reply), &toolCall); err != nil {// 如果无法解析为 JSON,假设不需要工具log.Printf("无法解析模型回复为 JSON: %v, 回复内容: %s", err, reply)toolCall.NeedTool = falsereturn toolCall, errors.New("无法解析模型回复")}return toolCall, nil}func callMcpTool(toolCall ToolCall) (string, error) {leftParam := toolCall.Parameters["left_param"].(string)rightParam := toolCall.Parameters["right_param"].(string)left, _ := strconv.ParseInt(leftParam, 10, 64)right, _ := strconv.ParseInt(rightParam, 10, 64)value := left * rightvaluleResult := map[string]interface{}{"value": value,}valueStr, _ := json.Marshal(valuleResult)cItem := []ContentItem{{Type: "text",Text: string(valueStr),},}toolResult := &ToolResult{JSONRPC: "2.0",Result: ResponseResult{Content: cItem,},}toolResultStr, _ := json.Marshal(toolResult)return string(toolResultStr), nil}// 工具调用结构type ToolCall struct {NeedTool bool `json:"need_tool"`ToolName string `json:"tool_name"`Description string `json:"description"`Parameters map[string]interface{} `json:"parameters"`ServerPath string `json:"server_path"`Result string `json:"result"`}type ToolResult struct {JSONRPC string `json:"jsonrpc"` // JSON-RPC版本,固定为"2.0"ID int `json:"id"` // 请求ID,与请求中的ID对应Result ResponseResult `json:"result"` // 响应结果}// ResponseResult 表示响应中的result字段type ResponseResult struct {Content []ContentItem `json:"content"` // 内容数组}// ContentItem 表示content数组中的每个元素type ContentItem struct {Type string `json:"type"` // 内容类型,如"text"Text string `json:"text"` // 内容文本,这里是一个JSON字符串}
-
-
调用豆包接口,可以参考方舟平台demo封装下如下1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071// ChatSync 同步接口func ChatSync(ctx context.Context, messages []DoubaoMessage) (*DoubaoResponse, error) {// 定义请求的URLurl := domain + chat_url// 定义请求体requestBody := DoubaoRequest{Model: model_id,Messages: messages,}// 将请求体转换为JSON格式jsonBody, err := json.Marshal(requestBody)if err != nil {logs.CtxError(ctx, "json.Marshal:%v", err)return nil, err}// 创建HTTP请求req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody))if err != nil {logs.CtxError(ctx, "NewRequest:%v", err)return nil, err}// 设置请求头req.Header.Set("Content-Type", "application/json")req.Header.Set("Authorization", "Bearer "+api_key)// 发送HTTP请求client := &http.Client{}resp, err := client.Do(req)if err != nil {logs.CtxError(ctx, "Response:%v", err)return nil, err}defer resp.Body.Close()// 读取响应doubResponse := &DoubaoResponse{}err = json.NewDecoder(resp.Body).Decode(doubResponse)if err != nil {logs.CtxError(ctx, "Response:%v", err)return nil, err}// 打印响应logs.CtxInfo(ctx, "Response:%v", doubResponse)// 提取响应中的内容return doubResponse, nil}type DoubaoResponse struct {Choices []struct {Message DoubaoMessage `json:"message"`FinishReason string `json:"finish_reason"`} `json:"choices"`}// 定义与豆包 API 交互所需的数据结构type DoubaoMessage struct {Role string `json:"role"`Content string `json:"content"`}type DoubaoRequest struct {Model string `json:"model"` // 例如 "doubao-1-5-pro-256k"Messages []DoubaoMessage `json:"messages"`Stream bool `json:"stream"`}
-
-
测试:
-
-
-
1 2 3 4 5 |
func main() { mcp_agent.McpAgentExecute("7 * 8 =?") } |