A Worked Example of Calling MCP from Python
Author: 龙井茶Sky
Introduction
MCP was all the rage a while back; now that things have cooled off a bit, I happen to be digging into it myself, and most of what you find online still stops at "MCP is the USB port for AI", using MCP through Cursor or Cline, or calling a stdio-style MCP server from code.
But how do you actually use MCP inside your own project, in your own code? Very little is written about that. For example: how do you connect to an SSE-style MCP server? How do you connect to multiple MCP servers from Python?
Today I want to share what I have learned recently.
What Is MCP?
MCP stands for Model Context Protocol.
How to make sense of that? A large language model is really just a "brain": it can think, but it has no hands or feet, so it cannot act. "Acting" here means things like looking up the weather, querying a map, or looking up company information, as well as querying and processing business data inside an enterprise.
When large models first appeared, their ability to act (tool use) was extended through function calling, but each vendor defined these functions (technically, API endpoints) in its own way, with no shared standard. An application that ran fine might break on a different model because of compatibility issues.
First-rate companies set standards, so the company behind the strongest coding model defined MCP as that standard protocol: from then on, every function follows it, and as long as the model supports it too, any function can be combined with any model without friction. In other words, MCP decouples large models from API endpoints.
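To make the decoupling concrete, here is a small sketch of the kind of tool declaration being standardized. The `query_weather` tool, its description, and its parameters are all made up for illustration; the point is that both OpenAI-style function calling and MCP tool definitions describe tools with the same JSON-Schema shape, so any model that understands the schema can use any tool declared in it.

```python
# Hypothetical "query_weather" tool, declared in the JSON-Schema style that
# both OpenAI function calling and MCP tool definitions share. The name and
# parameters here are illustrative, not a real API.
weather_tool = {
    "type": "function",
    "function": {
        "name": "query_weather",
        "description": "Look up the current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
            },
            "required": ["city"],
        },
    },
}

print(weather_tool["function"]["name"])  # -> query_weather
```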

Some MCP Concepts
Host: the actual AI application, such as AI coding tools like Cursor and Cline, chat tools like Cherry Studio, agents built on platforms like Dify or Coze, or AI applications we write ourselves in Python or Java.
Client: the component that talks to an MCP server, with one client per server. The client connects to the server and fetches its list of tools, which is how the model learns what tools are available to call.
Server: the MCP server itself. Servers come in roughly two flavors: one runs locally, is installed via uv, and exchanges messages over stdio (the command line); the other runs remotely, needs no local install, and streams messages over SSE.
Service: the thing the model actually ends up calling, such as a weather-lookup service. The server above is essentially just a forwarding layer.
One note on servers: a locally installed stdio server does not open an HTTP port of its own; it exchanges messages with the client over stdin/stdout, and it is the server process that then reaches out to the actual underlying service.
How Do You Use MCP?
There is plenty of material online about driving MCP from Cursor, Cline, Cherry Studio, and similar tools, so I will skip that and focus on the questions I raised at the start.
Connecting to an SSE MCP Server
To test this yourself, you can hand-write a local SSE server, or use the Amap (高德地图) MCP server. For a local SSE server, all you need to do is set the transport type to sse and give the server a port to listen on.
The core server code:
from mcp.server.fastmcp import FastMCP

mcp = FastMCP(
    name="myMCP",
    host="0.0.0.0",
    port=8888,
    description="miscellaneous mcp",
    sse_path="/sse",
)

if __name__ == "__main__":
    # Initialize and run the server
    try:
        print("Starting server...")
        mcp.run(transport="sse")
    except Exception as e:
        print(f"Error: {e}")
Then connect to the SSE server:
import json
import asyncio
import os
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client
from dotenv import load_dotenv
from openai import AsyncOpenAI

load_dotenv()
api_key = os.getenv("api_key")
base_url = os.getenv("base_url")


class Client:
    def __init__(self):
        self._exit_stack: Optional[AsyncExitStack] = None
        self.session: Optional[ClientSession] = None
        self._lock = asyncio.Lock()
        self.is_connected = False
        self.client = AsyncOpenAI(
            base_url=base_url,
            api_key=api_key,
        )
        self.model = "qwen-plus-2025-04-28"
        self.messages = []

    async def connect_server(self, server_config):
        async with self._lock:
            url = server_config["mcpServers"]["amap-amap-sse"]["url"]
            print(f"Trying to connect to: {url}")
            self._exit_stack = AsyncExitStack()
            sse_cm = sse_client(url)
            streams = await self._exit_stack.enter_async_context(sse_cm)
            session_cm = ClientSession(streams[0], streams[1])
            self.session = await self._exit_stack.enter_async_context(session_cm)
            await self.session.initialize()
            response = await self.session.list_tools()
            self.tools = {tool.name: tool for tool in response.tools}
            print(f"Fetched {len(self.tools)} tools.")
            # Convert the tools to OpenAI's function-call format
            self.openai_tools = [
                self.convert_mcp_tool_to_openai_tool(tool) for tool in response.tools
            ]
            self.is_connected = True
            print("Connected and ready.")

    def convert_mcp_tool_to_openai_tool(self, mcp_tool):
        """Convert an MCP tool into OpenAI's function-call format."""
        return {
            "type": "function",
            "function": {
                "name": mcp_tool.name,
                "description": mcp_tool.description,
                "parameters": mcp_tool.inputSchema,
            },
        }

    async def chat(self, prompt, role="user"):
        self.messages.append({"role": role, "content": prompt})
        # Call the LLM API, passing in the tools parameter
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
            tools=self.openai_tools,
            tool_choice="auto",  # or pin a specific tool by name
        )
        if response.choices[0].finish_reason == "tool_calls":
            # Take the first tool_call
            tool_call = response.choices[0].message.tool_calls[0]
            print(f"[info] calling tool {tool_call.function.name}")
            result = await self.session.call_tool(
                tool_call.function.name, json.loads(tool_call.function.arguments)
            )
            # Append the tool_call to messages
            self.messages.append(
                {
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [tool_call.model_dump()],
                }
            )
            # Append the tool response to messages
            self.messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,  # required by the OpenAI API
                    "name": tool_call.function.name,
                    "content": str(result),
                }
            )
            # Let the model generate the final answer
            followup_response = await self.client.chat.completions.create(
                model=self.model,
                messages=self.messages,
            )
            content = followup_response.choices[0].message.content
            self.messages.append({"role": "assistant", "content": content})
            return content
        else:
            content = response.choices[0].message.content
            self.messages.append({"role": "assistant", "content": content})
            return content

    async def chat_loop(self):
        print("MCP client started")
        print("Type /bye to quit")
        while True:
            prompt = input(">>> ").strip()
            if "/bye" in prompt.lower():
                break
            response = await self.chat(prompt)
            print(response)

    async def disconnect(self):
        """Close the session and the connection."""
        if self._exit_stack is not None:
            await self._exit_stack.aclose()
        self.is_connected = False
        print("Client closed.")


def load_server_config(config_file):
    with open(config_file) as f:
        return json.load(f)


async def main():
    client = None
    try:
        server_config = load_server_config("servers_config.json")
        client = Client()
        await client.connect_server(server_config)
        await client.chat_loop()
    except Exception as e:
        print(f"Main program error: {type(e).__name__}: {e}")
    finally:
        print("\nShutting down the client...")
        if client is not None:
            await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
The heart of this code is the Client class's connect_server method: after connecting to the server, it fetches the available tools via list_tools and hands them to the model.
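The connect_server method reads the URL from the key path mcpServers → amap-amap-sse, but the config file itself is never shown. A minimal servers_config.json matching that lookup might look like the following; the Amap endpoint and key placeholder are my assumption, so substitute the URL and key for whatever server you are actually using:

```json
{
    "mcpServers": {
        "amap-amap-sse": {
            "url": "https://mcp.amap.com/sse?key=YOUR_AMAP_KEY"
        }
    }
}
```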
How Do You Connect to Multiple MCP Servers?
As the code above shows, the Client class already carries both the model and the chat logic, so what happens when there are more servers to connect? The way this class is organized starts to look questionable.
As the concepts section said, MCP splits into Host, Client, and Server. Following that split, I refactored the current Client class: it keeps only the server-connection logic, while the model and chat logic really belong to the Host, so I added a Host class. The Host iterates over the MCP config file, creates one Client per server, and aggregates the tools gathered by each Client before handing them all to the model.
One more point: when the model decides to call a tool and multiple servers are involved, we have to work out which server's tool it is, walking through the clients and their tools to find the owning Client before the tool can be executed.
import json
import asyncio
import re
import os
import traceback
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client
from dotenv import load_dotenv
from openai import AsyncOpenAI

load_dotenv()
api_key = os.getenv("api_key")
base_url = os.getenv("base_url")


def format_tools_for_llm(tool) -> str:
    """Format a tool for the prompt.

    Returns:
        A formatted description of the tool.
    """
    args_desc = []
    if "properties" in tool.inputSchema:
        for param_name, param_info in tool.inputSchema["properties"].items():
            arg_desc = (
                f"- {param_name}: {param_info.get('description', 'No description')}"
            )
            if param_name in tool.inputSchema.get("required", []):
                arg_desc += " (required)"
            args_desc.append(arg_desc)
    return f"Tool: {tool.name}\nDescription: {tool.description}\nArguments:\n{chr(10).join(args_desc)}"


class Client:
    def __init__(self, url: str):
        self._exit_stack: Optional[AsyncExitStack] = None
        self.session: Optional[ClientSession] = None
        self._lock = asyncio.Lock()  # guard against concurrent connect/disconnect
        self.is_connected = False
        self.server_url = url

    async def connect_server(self):
        async with self._lock:  # prevent concurrent calls to connect
            url = self.server_url
            print(f"Trying to connect to: {url}")
            self._exit_stack = AsyncExitStack()
            # 1. Enter the SSE context without exiting it: enter_async_context
            #    calls __aenter__ to get the streams and keeps the context
            #    manager on the stack so it can be exited later
            sse_cm = sse_client(url)
            streams = await self._exit_stack.enter_async_context(sse_cm)
            print("SSE streams acquired.")
            # 2. Enter the session context, again without exiting it
            session_cm = ClientSession(streams[0], streams[1])
            self.session = await self._exit_stack.enter_async_context(session_cm)
            print("ClientSession created.")
            # 3. Initialize the session
            await self.session.initialize()
            print("Session initialized.")
            # 4. Fetch and store the tool list
            response = await self.session.list_tools()
            self.tools = {tool.name: tool for tool in response.tools}
            print(f"Fetched {len(self.tools)} tools:")
            for name, tool in self.tools.items():
                # Print part of each description
                print(f"  - {name}: {(tool.description or '')[:50]}...:{tool.annotations}")
            self.is_connected = True
            print("Connected and ready.")

    async def disconnect(self):
        """Close the session and the connection."""
        async with self._lock:
            if self._exit_stack is not None:
                await self._exit_stack.aclose()


class Host:
    def __init__(self):
        self.client = AsyncOpenAI(
            base_url=base_url,
            api_key=api_key,
        )
        self.model = "qwen-plus-2025-04-28"
        self.messages = []
        self.all_tools = []
        self.mcp_clients = []

    async def connect_mcp_servers(self, server_config):
        servers = server_config["mcpServers"]
        for name, server_info in servers.items():
            try:
                server_url = server_info["url"]
                client = Client(server_url)
                print(f"Connecting to MCP server: {name} {server_url}")
                await client.connect_server()
                self.mcp_clients.append(client)
                response = await client.session.list_tools()
                self.all_tools.extend(response.tools)
            except Exception as e:
                print(f"Failed to connect to MCP server {name}: {type(e).__name__}: {e}")
        tools_description = "\n".join(
            [format_tools_for_llm(tool) for tool in self.all_tools]
        )
        # System prompt describing the available tools
        system_prompt = (
            "You are a helpful assistant with access to these tools:\n\n"
            f"{tools_description}\n"
            "Choose the appropriate tool based on the user's question. "
            "If no tool is needed, reply directly.\n\n"
            "IMPORTANT: When you need to use a tool, you must ONLY respond with "
            "the exact JSON object format below, nothing else:\n"
            "{\n"
            '    "tool": "tool-name",\n'
            '    "arguments": {\n'
            '        "argument-name": "value"\n'
            "    }\n"
            "}\n\n"
            '"```json" is not allowed\n\n'
            "After receiving a tool's response:\n"
            "1. Transform the raw data into a natural, conversational response\n"
            "2. Keep responses concise but informative\n"
            "3. Focus on the most relevant information\n"
            "4. Use appropriate context from the user's question\n"
            "5. Avoid simply repeating the raw data\n\n"
            "Please use only the tools that are explicitly defined above."
        )
        self.messages.append({"role": "system", "content": system_prompt})

    async def disconnect_mcp_servers(self):
        for mcp_client in self.mcp_clients:
            await mcp_client.disconnect()

    async def chat(self, prompt, role="user"):
        """One round trip to the LLM."""
        self.messages.append({"role": role, "content": prompt})
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
        )
        return response.choices[0].message.content

    async def find_client_tool(self, tool_name: str):
        """Find which client owns the named tool."""
        for client in self.mcp_clients:
            response = await client.session.list_tools()
            for tool in response.tools:
                if tool.name == tool_name:
                    return client, tool
        return None, None

    async def execute_tool(self, llm_response: str):
        """Process the LLM response and execute tools if needed.

        Args:
            llm_response: The response from the LLM.

        Returns:
            The result of tool execution, or the original response.
        """
        print(f"LLM Response: {llm_response}")
        try:
            # Strip a ```json fence if the model emitted one anyway
            pattern = r"```json\n(.*?)\n?```"
            match = re.search(pattern, llm_response, re.DOTALL)
            if match:
                llm_response = match.group(1)
            tool_call = json.loads(llm_response)
            if "tool" in tool_call and "arguments" in tool_call:
                # With multiple clients we first have to find the client that
                # owns the tool, then call the tool through that client
                target_client, target_tool = await self.find_client_tool(
                    tool_call["tool"]
                )
                if target_client is not None and target_tool is not None:
                    try:
                        print(f"[info] calling tool {tool_call['tool']}")
                        result = await target_client.session.call_tool(
                            tool_call["tool"], tool_call["arguments"]
                        )
                        if isinstance(result, dict) and "progress" in result:
                            progress = result["progress"]
                            total = result["total"]
                            percentage = (progress / total) * 100
                            print(f"Progress: {progress}/{total} ({percentage:.1f}%)")
                        print(f"[result]: {result}")
                        return f"Tool execution result: {result}"
                    except Exception as e:
                        error_msg = f"Error executing tool: {str(e)}"
                        print(error_msg)
                        return error_msg
                return f"No server found with tool: {tool_call['tool']}"
            return llm_response
        except json.JSONDecodeError:
            return llm_response

    async def chat_loop(self):
        """Run the interactive chat loop."""
        print("MCP client started")
        print("Type /bye to quit")
        while True:
            prompt = input(">>> ").strip()
            if "/bye" in prompt.lower():
                break
            response = await self.chat(prompt)
            self.messages.append({"role": "assistant", "content": response})
            result = await self.execute_tool(response)
            # Keep feeding tool results back until the model stops calling tools
            while result != response:
                response = await self.chat(result, "system")
                self.messages.append({"role": "assistant", "content": response})
                result = await self.execute_tool(response)
            print(response)


def load_server_config(config_file):
    with open(config_file) as f:
        return json.load(f)


async def main():
    host = None
    try:
        server_config = load_server_config("servers_config.json")
        host = Host()
        await host.connect_mcp_servers(server_config)
        await host.chat_loop()
    except Exception as e:
        print(f"Main program error: {type(e).__name__}: {e}")
        # Print the full traceback
        traceback.print_exc()
    finally:
        # Always try to disconnect and clean up resources
        print("\nShutting down the client...")
        if host is not None:
            await host.disconnect_mcp_servers()
        print("Client closed.")


if __name__ == "__main__":
    # Example prompt: "I'm traveling to the Jinan Olympic Sports Center on
    # business; find hotels within 5 km and plan my itinerary."
    asyncio.run(main())
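One design note on the code above: find_client_tool calls list_tools() on every client each time a tool fires, even though the tool set rarely changes. The lookup could instead be built once at connect time. Here is a minimal sketch of that idea, with stub classes standing in for the real MCP clients and made-up tool names standing in for real ones:

```python
# Sketch: build a tool-name -> (client, tool) registry once, instead of
# listing tools on every call. Stubs stand in for the real MCP objects.
class StubTool:
    def __init__(self, name):
        self.name = name

class StubClient:
    def __init__(self, tools):
        self.tools = tools  # what list_tools() would return

    def list_tools(self):
        return self.tools

def build_tool_registry(clients):
    """Map each tool name to the client that owns it (first client wins)."""
    registry = {}
    for client in clients:
        for tool in client.list_tools():
            registry.setdefault(tool.name, (client, tool))
    return registry

# Illustrative tool names, not guaranteed to match any real server
amap = StubClient([StubTool("maps_text_search"), StubTool("maps_weather")])
misc = StubClient([StubTool("say_hello")])
registry = build_tool_registry([amap, misc])

client, tool = registry["maps_weather"]
print(tool.name)  # -> maps_weather
```

With the real clients, the registry would be filled inside connect_mcp_servers right after each client connects, and execute_tool would become a single dictionary lookup.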
I implemented this directly on top of the official MCP SDK. You can also use LangChain's langchain-mcp-adapters library, which is simpler still: it can work straight from the MCP config JSON to fetch the tools and execute them.
Conclusion
That is all I wanted to cover today. Keep in mind that MCP is not only about tool calling; it also covers resources, prompts, and more, although tools are by far the most used part.
Later on, I plan to write about using LangChain to call MCP more efficiently,
and about how to expose an existing application API as an MCP server.
