機器學習|MCP(Model Context Protocol)實戰
最近 MCP 這么火,了解了一段時間也該寫篇總結,那就開始吧。
1. 什么是 MCP
MCP(Model Context Protocol,模型上下文協議) ,2024年11月底,由 Anthropic 推出的一種開放標準,旨在統一大型語言模型(LLM)與外部數據源和工具之間的通信協議。官網的介紹: https://modelcontextprotocol.io/introduction
MCP 包括幾個核心功能:
- Resources 是允許服務器公開可由客戶端讀取并用作 LLM 交互上下文的數據和內容,包括文件內容,數據庫,API,圖片等;
- Prompts 方便定義的 Prompt 模板,支持動態參數等;
- Tools 類似 function call;
- Sampling 主要是在完成某個事項前的中間代理,保護數據隱私;
- Roots 根目錄,它定義了服務器可以運行的邊界,它們為客戶端提供了一種方式,可以告知服務器相關資源及其位置;
- Transports MCP 使用JSON-RPC 2.0 作為其傳輸格式,Transports 負責將 MCP 協議消息轉換為 JSON-RPC 格式進行傳輸,并將接收到的 JSON-RPC 消息轉換回 MCP 協議消息,目前支持兩種協議:stdio(標準輸入輸出),SSE(服務端發送協議);
2. 開發 MCP Server
假設我們提供 web 搜索功能,那么怎么通過 MCP 對接到大模型上呢?通過開發 MCP Server,于是我基于 duckduckgo 提供了文本,圖片和視頻搜索的 API,參考如下:
class DuckDuckGoSearch:
"""DuckDuckGo 搜索功能封裝"""
def __init__(self):
self.ddgs = DDGS()
def search(self, keywords: str, max_results: int = 10, safesearch: str = 'Off',
timelimit: str = 'y') -> Dict[str, List[Dict[str, Any]]]:
"""通用文本搜索
Args:
keywords: 搜索關鍵詞
max_results: 最大結果數量
safesearch: 安全搜索選項 ('On' or 'Off')
timelimit: 時間限制 ('d', 'w', 'm', 'y')
Returns:
包含搜索結果的字典
"""
try:
results = []
ddgs_gen = self.ddgs.text(
keywords,
safesearch=safesearch,
timelimit=timelimit,
backend="lite"
)
for r in islice(ddgs_gen, max_results):
results.append(r)
return {'results': results}
except Exception as e:
return {'results': [], 'error': str(e)}
def search_answers(self, keywords: str, max_results: int = 5) -> Dict[str, List[Dict[str, Any]]]:
"""問答搜索
Args:
keywords: 搜索關鍵詞
max_results: 最大結果數量
Returns:
包含答案的字典
"""
try:
results = []
# 使用 text 方法替代 answers 方法
ddgs_gen = self.ddgs.text(
keywords,
safesearch='Off',
timelimit='y',
backend="lite",
reginotallow='wt-wt'# 使用全球區域
)
for r in islice(ddgs_gen, max_results):
results.append(r)
return {'results': results}
except Exception as e:
return {'results': [], 'error': str(e)}
def search_images(self, keywords: str, max_results: int = 10,
safesearch: str = 'Off') -> Dict[str, List[Dict[str, Any]]]:
"""圖片搜索
Args:
keywords: 搜索關鍵詞
max_results: 最大結果數量
safesearch: 安全搜索選項 ('On' or 'Off')
Returns:
包含圖片信息的字典
"""
try:
results = []
ddgs_gen = self.ddgs.images(
keywords,
safesearch=safesearch,
timelimit=None
)
for r in islice(ddgs_gen, max_results):
results.append(r)
return {'results': results}
except Exception as e:
return {'results': [], 'error': str(e)}
def search_videos(self, keywords: str, max_results: int = 10,
safesearch: str = 'Off', resolution: str = "high") -> Dict[str, List[Dict[str, Any]]]:
"""視頻搜索
Args:
keywords: 搜索關鍵詞
max_results: 最大結果數量
safesearch: 安全搜索選項 ('On' or 'Off')
resolution: 視頻分辨率 ("high" or "standard")
Returns:
包含視頻信息的字典
"""
try:
results = []
ddgs_gen = self.ddgs.videos(
keywords,
safesearch=safesearch,
timelimit=None,
resolutinotallow=resolution
)
for r in islice(ddgs_gen, max_results):
results.append(r)
return {'results': results}
except Exception as e:
return {'results': [], 'error': str(e)}
以上是對于 duckduckgo 封裝,除了提供搜索以外,我們需要按照規范開發 MCP Server,代碼如下:
# 初始化 FastMCP 服務器
app = FastMCP('web-search')
@app.tool()
async def web_search(query: str) -> str:
"""
搜索互聯網內容
Args:
query: 要搜索內容
Returns:
搜索結果的總結
"""
ddg = DuckDuckGoSearch()
return ddg.search(query)
if __name__ == "__main__":
app.run(transport='stdio')
- 創建 FastMCP
- 提供 app.tool,web_search 的接口和文檔信息
- 啟動 FastMCP
最終引入庫如下:
# !pip install duckduckgo-search
# !pip install mcp
from itertools import islice
from typing import List, Dict, Any, Optional
from mcp.server import FastMCP
from duckduckgo_search import DDGS
3. 調試 MCP Server
開發完上述的 MCP Server,通常我們是需要調試功能,使用官方的 Inspector 可視化工具來執行(首先需要安裝 nodejs,確保 npx 命令可以使用),命令如下:
npx -y @modelcontextprotocol/inspector <command> <arg1> <arg2>
按照我上述文件名為 ??mcp_server.py?
??,啟動:??npx -y @modelcontextprotocol/inspector python3.11 mcp_server.py?
?,執行界面如下:
然后打開本地瀏覽器:??http://127.0.0.1:6274?
?,就可以進入調試界面:
4. 開發 MCP Client
上面開發了 MCP Server,那么怎么讓大模型調用 MCP Server 呢?步驟如下:
- 首先將支持本地的 MCP Tools 列表提供給大模型
- 其次約束大模型在回答某一類問題,或者不能獲取知識時讓系統調用 MCP Server
- 最后將 MCP Server 返回的內容提供給大模型總結
代碼如下(注意這里需要通過環境變量配置 OPENAI_API_KEY 和 OPENAI_API_BASE):
import json
import asyncio
import os
from typing import Optional
from contextlib import AsyncExitStack
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_API_BASE"),
)
self.mode_name = "gpt-4o-mini"
asyncdef connect_to_server(self):
server_params = StdioServerParameters(
# 服務器執行的命令
command='python3.11',
# 運行的參數
args=['mcp_server.py'],
# 環境變量,默認為 None,表示使用當前環境變量
# env=None
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params))
stdio, write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(stdio, write))
await self.session.initialize()
asyncdef process_query(self, query: str) -> str:
system_prompt = (
"You are a helpful assistant."
"You have the function of online search. "
"Please MUST call web_search tool to search the Internet content before answering."
"Please do not lose the user's question information when searching,"
"and try to maintain the completeness of the question content as much as possible."
"When there is a date related question in the user's question,"
"please use the search function directly to search and PROHIBIT inserting specific time."
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": query}
]
# 獲取所有 mcp 服務器 工具列表信息
response = await self.session.list_tools()
# 生成 function call 的描述信息
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}
} for tool in response.tools]
print(f"\n\n ========> Available tools:\n{response}\n")
# 請求 function call 的描述信息通過 tools 參數傳入
response = self.client.chat.completions.create(
model=self.mode_name,
messages=messages,
tools=available_tools,
)
# 處理返回的內容
content = response.choices[0]
if content.finish_reason == "tool_calls":
# 如何是需要使用工具,就解析工具
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 執行工具
result = await self.session.call_tool(tool_name, tool_args)
print(f"\n\nCalling tool [{tool_name}] with args [{tool_args}]\nCalling tool response: [{result}]\n\n")
# 將返回的調用哪個工具數據和工具執行完成后的數據都存入messages中
messages.append(content.message.model_dump())
messages.append({
"role": "tool",
"content": result.content[0].text,
"tool_call_id": tool_call.id,
})
# 將上面的結果再返回給模型用于生產最終的結果
response = self.client.chat.completions.create(
model=self.mode_name,
messages=messages,
)
return response.choices[0].message.content
return content.message.content
asyncdef chat(self):
whileTrue:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
import traceback
traceback.print_exc()
asyncdef cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
asyncdef main():
client = MCPClient()
try:
await client.connect_to_server()
await client.chat()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
5. Sampling
Sampling 是采樣,就是允許服務器通過客戶端請求 LLM 完成,從而實現復雜的代理行為,同時保持安全性和隱私性,通俗的講就是可以確認某個流程是否可以繼續執行,執行順序如下:
- MCP 服務器向 MCP 客戶端發送sampling/createMessage請求
- MCP 客戶端審查該請求,并可以進行修改
- MCP 客戶端從 LLM 中生成一個結果
- MCP 客戶端審查生成的結果
- MCP 客戶端將結果返回給 MCP 服務器
代碼如下:
@app.tool()
asyncdef shell(cmd: str) -> str:
"""
執行 shell 腳本
Args:
cmd: 要執行的 shell 命令
Returns:
獲取返回的結果
"""
# 創建 SamplingMessage 用于觸發 sampling callback 函數
result = await app.get_context().session.create_message(
messages=[
SamplingMessage(
role='user', cnotallow=TextContent(
type='text', text=f'是否可以執行當前命令: {cmd} (Y/N)')
)
],
max_tokens=1024
)
print(f"result.content: {result.content}")
# 獲取到 sampling callback 函數的返回值,并根據返回值進行處理
if result.content.text == 'Y':
print(f'執行命令: {cmd}')
import subprocess
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return result.stdout
else:
print(f'拒絕執行命令: {cmd}')
returnf'命令執行被拒絕, content: {result.content}'
可以在調試界面中確認是否繼續往下執行:
6. Prompts
MCP 中提供了 Prompts 的功能,通過傳入參數可以自定義 Prompt 模板,主要是方便后續可以動態生成,或者根據輸入邏輯控制 LLM,樣例代碼如下:
@app.prompt("代碼專家")
def ask_review(code_snippet: str) -> str:
return f"Please review the following code snippet for potential bugs and style issues:\n```python\n{code_snippet}\n```"
if __name__ == "__main__":
app.run(transport='stdio')
調試工具中可以直接使用:
7. Resources
MCP 中提供了可以使用的資源列表,允許服務器公開可由客戶端讀取并用作 LLM 交互上下文的數據和內容,其中資源協議格式:??[protocol]://[host]/[path]?
?,比如可以提供文件,數據庫等。
- 文件:file:///home/user/aaa.txt
- 數據庫:postgres://database/customers/schema
- 屏幕:screen://localhost/display1
樣例代碼如下:
@app.resource("db://users/{user_id}/email")
async def get_user_email(user_id: str) -> str:
"""Retrieves the email address for a given user ID."""
# Replace with actual database lookup
emails = {"123": "alice@example.com", "456": "bob@example.com"}
return emails.get(user_id, "not_found@example.com")
調試工具中可以直接使用:
8. 生命周期
MCP Server 本身是沒有生命周期,但是 FastMCP 為了能結合業務本身的邏輯,提供了生命周期的控制,分別是:初始化,交互通信中,服務被關閉,那么在代碼中怎么控制呢?
@dataclass
class AppContext:
histories: dict
def __init__(self, histories: dict):
self.histories = histories
print(f"初始化 AppContext: {self.histories}")
@asynccontextmanager
asyncdef app_lifespan(server):
# 在 MCP 初始化時執行
histories = {}
try:
yield AppContext(histories=histories)
finally:
print(f"關閉服務器:{histories}")
# 初始化 FastMCP 服務器
app = FastMCP(
'mcp-server',
lifespan=app_lifespan,
)
9. LangChain 中使用 MCP Server
做 LLM 應用開發,基本上所有的工具都集成到 LangChain,MCP 也不例外,如下是如何在 LangChain 中使用的代碼:
# !pip install langchain_mcp_adapters
# !pip install langgraph
# !pip install langchain_openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import os
import asyncio
model = ChatOpenAI(
openai_api_base=os.getenv("OPENAI_API_BASE"),
openai_api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
)
server_params = StdioServerParameters(
# 服務器執行的命令
command='python3.11',
# 運行的參數
args=['mcp_server.py'],
# 環境變量,默認為 None,表示使用當前環境變量
# env=None
)
asyncdef main():
asyncwith stdio_client(server_params) as (read, write):
asyncwith ClientSession(read, write) as session:
await session.initialize()
# 獲取工具列表
tools = await load_mcp_tools(session)
# 創建并使用 ReAct agent
agent = create_react_agent(model, tools)
agent_response = await agent.ainvoke({'messages': '深圳天氣如何?'})
print(f"agent_response: {agent_response}")
if __name__ == "__main__":
asyncio.run(main())
10. 其他
(1)配置 Cursor
打開 mcp.json 可以手動配置:
{
"mcpServers": {
"mcp-server": {
"command": "python3.11",
"args": ["/Volumes/my/mpserver/blog/機器學習/code/mcp/mcp-server.py"]
}
}
}
也可以參考官方配置 SSE 協議:
{
"mcpServers": {
"server-name": {
"url": "http://localhost:3000/sse",
"env": {
"API_KEY": "value"
}
}
}
}
(2)開源的 MCP 資源或者項目
MCP 官方提供了很多服務,可以參考:https://mcp.so/。另外也有一些開源項目,有興趣可以看看:https://github.com/yzfly/Awesome-MCP-ZH?tab=readme-ov-file。
參考
(1)https://modelcontextprotocol.io/tutorials/building-mcp-with-llms(2)https://github.com/yzfly/Awesome-MCP-ZH?tab=readme-ov-file
本文轉載自??周末程序猿??,作者:周末程序猿
