AI智能体培训第六天:MCP工具使用与开发

浏览:150 | 发布于 2025-11-22

到了mcp实操环节了,上午扣子空间MCP工具测试,下午本地简单MCP工具开发。

1、上午:扣子空间MCP工具测试

之前使用扣子都是在扣子的开发平台完成,上午的任务都是在扣子空间操作的,就是测试而已。

扣子空间地址:https://www.coze.cn/space-preview

1.1、任务1:帮我搜索下截止到11月22日全运会的金牌榜单以及破赛会记录的情况,并返回11月22日天气情况,帮我做成一个播客,并生成网页

示例对话:

调用 墨迹天气 工具,帮我搜索下截止到11月22日全运会的金牌榜单以及破赛会记录的情况,并返回11月22日天气情况,帮我做成一个播客,并生成网页

核心要求就是要会调用墨迹天气MCP工具,顺便看看扣子空间的强大能力,能生成带音频的播客网页

1.1.1、调用墨迹天气MCP工具

1.1.2、生成的网页

1.2、任务2:介绍家乡(自定义)的美食,帮我做成一个播客,并生成网页

核心要求:调用高德地图或Deeptrip旅行专家工具

1.3、任务3:利用Deeptrip 旅行专家,生成一份自驾阿里大北线的15天旅游行程网页,4人出行,预算充足,出行时间为9-10月。(地点可自定义)

核心要求:调用Deeptrip旅行专家工具

 

2、下午1:基于Coze平台开发自己的MCP插件

2.1、Coze平台开发插件流程

2.1.1、打开开放平台,在左侧菜单栏点击创建

2.1.2、创建应用,选择低代码平台

2.1.3、点击创建空白应用

2.1.4、填写应用名称、应用介绍、生成图标、完成后确认

2.1.5、在工作流里面新建工作流

2.1.6、填写工作流名称和工作流描述并确认

2.1.7、到了熟悉的工作流设计界面,尽情发挥吧

2.2、任务1:完成思维导图MCP工具的开发,并在扣子空间对话框中完成调用

例如:帮我搜索一下西藏旅游的相关知识,然后将这些知识整理为一个思维导图

2.2.1、添加树图插件

2.2.2、完成工作流

MCP工作流已开发完成,下面发布MCP工具。

2.2.3、点击右上角的发布按钮

2.2.4、点击页面最下面的扣子空间扩展库配置

2.2.5、勾选并确认

2.2.6、点击右上角的发布

2.2.8、提交并等待审核,通常几秒钟完成审核

2.2.9、在扣子空间新建对话,调用刚刚发布的test_mcp_mindmap工具,输入”新疆旅游“,得到如下思维导图

2.3、任务2:完成儿童绘本MCP工具的开发,并在扣子空间对话框中完成调用。

工作流比较复杂,就不记录想起过程了

3、下午2:本地天气查询MCP工具开发

3.1、uv工具

3.1.1、uv工具介绍

安装 uv 后,你可以像 pip 一样使用它,但它的语法更简洁,速度也更快。注意,以下为使用语法示例,不用实际运行。

3.1.2、安装uv

pip install uv

3.1.3、uv命令介绍

# uv install
uv pip install requests  # 与 pip install requests 类似,但更快

# 创建虚拟环境
uv venv mcpenv  # 等效于 python -m venv myenv,但更高效

#激活虚拟环境
source mcpenv/bin/activate  # Linux/macOS
mcpenv\Scripts\activate     # Windows

#安装  requirements .txt 
uv pip install -r requirements.txt
# 直接运行 Python 项目
# 如果项目中包含 pyproject.toml ,你可以直接运行:
uv run python script.py

# 上面的代码等效于下面的,但是uv更快
pip install -r requirements.txt 
python script.py

3.2、本地天气查询mcp工具开发

3.2.1、激活虚拟环境

conda activate myenv

3.2.2、创建项目目录mcp1122,并进入

mkdir mcp1122
cd mcp1122

3.2.3、安装uv工具

pip install uv

3.2.4、初始化项目 mcp-weatherbot

uv init mcp-weatherbot

3.2.5、进入 mcp-weatherbot 目录

cd mcp-weatherbot

3.2.6、创建 mcpenv 虚拟环境

uv venv mcpenv

conda的虚拟环境针对的是所有项目,uv的虚拟环境针对的是当前项目

3.2.7、激活虚拟环境

source mcpenv/bin/activate

3.2.8、添加依赖

uv add mcp openai python-dotenv httpx

3.2.9、添加.env文件

BASE_URL=https://api.deepseek.com
MODEL=deepseek-chat
#OPENAI_API_KEY="sk-463136c040bf477e8cf38599305e0a80"
LLM_API_KEY="sk-463136c040bf477e8cf38599305e0a80"

3.2.10、添加servers_config.json文件

{
    "mcpServers": {
      "weather": {
        "command": "python",
        "args": ["weather_server.py"]
      }
    }
}

3.2.11、编写 weather_server.py 文件

import os
import json
import httpx
from typing import Any
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

# 初始化 MCP 服务器
mcp = FastMCP("WeatherServer")

# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = "xxx"
USER_AGENT = "weather-app/1.0"

async def fetch_weather(city: str) -> dict[str, Any] | None:
    """
    从 OpenWeather API 获取天气信息。
    :param city: 城市名称(需使用英文,如 Beijing)
    :return: 天气数据字典;若出错返回包含 error 信息的字典
    """
    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "zh_cn"
    }
    headers = {"User-Agent": USER_AGENT}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(OPENWEATHER_API_BASE, params=params, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()  # 返回字典类型
        except httpx.HTTPStatusError as e:
            return {"error": f"HTTP 错误: {e.response.status_code}"}
        except Exception as e:
            return {"error": f"请求失败 : {str(e)}"}

def format_weather(data: dict[str, Any] | str) -> str:
    """
    将天气数据格式化为易读文本。
    :param data: 天气数据(可以是字典或 JSON 字符串)
    :return: 格式化后的天气信息字符串
    """
    # 如果传入的是字符串,则先转换为字典
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            return f"无法解析天气数据: {e}"

    # 如果数据中包含错误信息,直接返回错误提示
    if "error" in data:
        return f" ▲  {data['error']}"

    # 提取数据时做容错处理
    city = data.get("name", "未知")
    country = data.get("sys", {}).get("country", "未知")
    temp = data.get("main", {}).get("temp", "N/A")
    humidity = data.get("main", {}).get("humidity", "N/A")
    wind_speed = data.get("wind", {}).get("speed", "N/A")
    # weather 可能为空列表,因此用 [0] 前先提供默认字典
    weather_list = data.get("weather", [{}])
    description = weather_list[0].get("description", "未知")

    return (
        f" ●  {city}, {country}\n"
        f" 温度 : {temp}°C\n"
        f" 』  湿度 : {humidity}%\n"
        f" 风速 : {wind_speed} m/s\n"
        f" 讪  天气 : {description}\n"
    )

@mcp.tool()
async def query_weather(city: str) -> str:
    """
    输入指定城市的英文名称,返回今日天气查询结果。
    :param city: 城市名称(需使用英文)
    :return: 格式化后的天气信息
    """
    data = await fetch_weather(city)
    return format_weather(data)

if __name__ == "__main__":
    # 以标准 I/O 方式运行 MCP 服务器
    mcp.run(transport='stdio')

注意:API Key 需要换成自己的,在 https://openweathermap.org/ 中申请

3.2.12、修改 main.py 文件

import asyncio
import json
import logging
import os
import shutil
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional

import httpx
from dotenv import load_dotenv
from openai import OpenAI  # OpenAI Python SDK
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# =============================
# 配置加载类(支持环境变量及配置文件)
# =============================
class Configuration:
    """管理 MCP 客户端的环境变量和配置文件"""

    def __init__(self) -> None:
        load_dotenv()
        # 从环境变量中加载 API key, base_url 和 model
        self.api_key = os.getenv("LLM_API_KEY")
        self.base_url = os.getenv("BASE_URL")
        self.model = os.getenv("MODEL")
        if not self.api_key:
            raise ValueError(" ×  未找到 LLM_API_KEY,请在 .env 文件中配置")

    @staticmethod
    def load_config(file_path: str) -> Dict[str, Any]:
        """
        从 JSON 文件加载服务器配置

        Args:
            file_path: JSON 配置文件路径

        Returns:
            包含服务器配置的字典
        """
        with open(file_path, "r") as f:
            return json.load(f)

# =============================
# MCP 服务器客户端类
# =============================
class Server:
    """管理单个 MCP 服务器连接和工具调用"""

    def __init__(self, name: str, config: Dict[str, Any]) -> None:
        self.name: str = name
        self.config: Dict[str, Any] = config
        self.session: Optional[ClientSession] = None
        self.exit_stack: AsyncExitStack = AsyncExitStack()
        self._cleanup_lock = asyncio.Lock()

    async def initialize(self) -> None:
        """初始化与 MCP 服务器的连接"""
        # command 字段直接从配置获取
        command = self.config["command"]
        if command is None:
            raise ValueError("command 不能为空")

        server_params = StdioServerParameters(
            command=command,
            args=self.config["args"],
            env={**os.environ, **self.config["env"]}
            if self.config.get("env")
            else None,
        )
        try:
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read_stream, write_stream = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read_stream, write_stream)
            )
            await session.initialize()
            self.session = session
        except Exception as e:
            logging.error(f"Error initializing server {self.name}: {e}")
            await self.cleanup()
            raise

    async def list_tools(self) -> List[Any]:
        """获取服务器可用的工具列表

        Returns:
            工具列表
        """
        if not self.session:
            raise RuntimeError(f"Server {self.name} not initialized")
        tools_response = await self.session.list_tools()
        tools = []
        for item in tools_response:
            if isinstance(item, tuple) and item[0] == "tools":
                for tool in item[1]:
                    tools.append(Tool(tool.name, tool.description, tool.inputSchema))
        return tools

    async def execute_tool(
        self, tool_name: str, arguments: Dict[str, Any], retries: int = 2, delay: float = 1.0
    ) -> Any:
        """执行指定工具,并支持重试机制

        Args:
            tool_name: 工具名称
            arguments: 工具参数
            retries: 重试次数
            delay: 重试间隔秒数

        Returns:
            工具调用结果
        """
        if not self.session:
            raise RuntimeError(f"Server {self.name} not initialized")
        attempt = 0
        while attempt < retries:
            try:
                logging.info(f"Executing {tool_name} on server {self.name}...")
                result = await self.session.call_tool(tool_name, arguments)
                return result
            except Exception as e:
                attempt += 1
                logging.warning(f"Error executing tool: {e}. Attempt {attempt} of {retries}.")
                if attempt < retries:
                    logging.info(f"Retrying in {delay} seconds...")
                    await asyncio.sleep(delay)
                else:
                    logging.error("Max retries reached. Failing.")
                    raise

    async def cleanup(self) -> None:
        """清理服务器资源"""
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
            except Exception as e:
                logging.error(f"Error during cleanup of server {self.name}: {e}")

# =============================
# 工具封装类
# =============================
class Tool:
    """封装 MCP 返回的工具信息"""

    def __init__(self, name: str, description: str, input_schema: Dict[str, Any]) -> None:
        self.name: str = name
        self.description: str = description
        self.input_schema: Dict[str, Any] = input_schema

    def format_for_llm(self) -> str:
        """生成用于 LLM 提示的工具描述"""
        args_desc = []
        if "properties" in self.input_schema:
            for param_name, param_info in self.input_schema["properties"].items():
                arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"
                if param_name in self.input_schema.get("required", []):
                    arg_desc += " (required)"
                args_desc.append(arg_desc)
        return f"""
Tool: {self.name}
Description: {self.description}
Arguments:
{chr(10).join(args_desc)}
"""

# =============================
# LLM 客户端封装类(使用 OpenAI SDK)
# =============================
class LLMClient:
    """使用 OpenAI SDK 与大模型交互"""

    def __init__(self, api_key: str, base_url: Optional[str], model: str) -> None:
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model = model

    def get_response(self, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]] = None) -> Any:
        """
        发送消息给大模型 API,支持传入工具参数(function calling 格式)
        """
        payload = {
            "model": self.model,
            "messages": messages,
            "tools": tools,
        }
        try:
            response = self.client.chat.completions.create(**payload)
            return response
        except Exception as e:
            logging.error(f"Error during LLM call: {e}")
            raise

# =============================
# 多服务器 MCP 客户端类(集成配置文件、工具格式转换与 OpenAI SDK 调用)
# =============================
class MultiServerMCPClient:
    def __init__(self) -> None:
        """
        管理多个 MCP 服务器,并使用 OpenAI Function Calling 风格的接口调用大模型
        """
        self.exit_stack = AsyncExitStack()
        config = Configuration()
        self.openai_api_key = config.api_key
        self.base_url = config.base_url
        self.model = config.model
        self.client = LLMClient(self.openai_api_key, self.base_url, self.model)
        # (server_name -> Server 对象)
        self.servers: Dict[str, Server] = {}
        # 各个 server 的工具列表
        self.tools_by_server: Dict[str, List[Any]] = {}
        self.all_tools: List[Dict[str, Any]] = []

    async def connect_to_servers(self, servers_config: Dict[str, Any]) -> None:
        """
        根据配置文件同时启动多个服务器并获取工具
        servers_config 的格式为:
        {
            "mcpServers": {
                "sqlite": { "command": "uvx", "args": [ ... ] },
                "puppeteer": { "command": "npx", "args": [ ... ] },
                ...
            }
        }
        """
        mcp_servers = servers_config.get("mcpServers", {})
        for server_name, srv_config in mcp_servers.items():
            server = Server(server_name, srv_config)
            await server.initialize()
            self.servers[server_name] = server
            tools = await server.list_tools()
            self.tools_by_server[server_name] = tools

            for tool in tools:
                # 统一重命名:serverName_toolName
                function_name = f"{server_name}_{tool.name}"
                self.all_tools.append(
                    {
                        "type": "function",
                        "function": {
                            "name": function_name,
                            "description": tool.description,
                            "input_schema": tool.input_schema,
                        },
                    }
                )

        # 转换为 OpenAI Function Calling 所需格式
        self.all_tools = await self.transform_json(self.all_tools)

        logging.info("\n✅   已连接到下列服务器:")
        for name in self.servers:
            srv_cfg = mcp_servers[name]
            logging.info(f"  - {name}: command={srv_cfg['command']}, args= {srv_cfg['args']}")
        logging.info("\n汇总的工具:")
        for t in self.all_tools:
            logging.info(f"  - {t['function']['name']}")

    async def transform_json(self, json_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        将工具的 input_schema 转换为 OpenAI 所需的 parameters 格式,并删除多余字段
        """
        result = []
        for item in json_data:
            if not isinstance(item, dict) or "type" not in item or "function" not in item:
                continue
            old_func = item["function"]
            if (
                not isinstance(old_func, dict)
                or "name" not in old_func
                or "description" not in old_func
            ):
                continue
            new_func = {
                "name": old_func["name"],
                "description": old_func["description"],
                "parameters": {},
            }
            if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):
                old_schema = old_func["input_schema"]
                new_func["parameters"]["type"] = old_schema.get("type", "object")
                new_func["parameters"]["properties"] = old_schema.get("properties", {})
                new_func["parameters"]["required"] = old_schema.get("required", [])
            new_item = {"type": item["type"], "function": new_func}
            result.append(new_item)
        return result

    async def chat_base(self, messages: List[Dict[str, Any]]) -> Any:
        """
        使用 OpenAI 接口进行对话,并支持多次工具调用(Function Calling)。
        如果返回 finish_reason 为 "tool_calls",则进行工具调用后再发起请求。
        """
        response = self.client.get_response(messages, tools=self.all_tools)
        # 如果模型返回工具调用
        if response.choices[0].finish_reason == "tool_calls":
            while True:
                messages = await self.create_function_response_messages(messages, response)
                response = self.client.get_response(messages, tools=self.all_tools)
                if response.choices[0].finish_reason != "tool_calls":
                    break
        return response

    async def create_function_response_messages(
        self, messages: List[Dict[str, Any]], response: Any
    ) -> List[Dict[str, Any]]:
        """
        将模型返回的工具调用解析执行,并将结果追加到消息队列中
        """
        function_call_messages = response.choices[0].message.tool_calls
        messages.append(response.choices[0].message.model_dump())
        for function_call_message in function_call_messages:
            tool_name = function_call_message.function.name
            tool_args = json.loads(function_call_message.function.arguments)
            # 调用 MCP 工具
            function_response = await self._call_mcp_tool(tool_name, tool_args)
            messages.append(
                {
                    "role": "tool",
                    "content": function_response,
                    "tool_call_id": function_call_message.id,
                }
            )
        return messages

    async def process_query(self, user_query: str) -> str:
        """
        OpenAI Function Calling 流程:
        1. 发送用户消息 + 工具信息
        2. 若模型返回 finish_reason 为 "tool_calls",则解析并调用 MCP 工具
        3. 将工具调用结果返回给模型,获得最终回答
        """
        messages = [{"role": "user", "content": user_query}]
        response = self.client.get_response(messages, tools=self.all_tools)
        content = response.choices[0]
        logging.info(content)
        if content.finish_reason == "tool_calls":
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            logging.info(f"\n[ 调用工具 : {tool_name}, 参数 : {tool_args} ]\n")
            result = await self._call_mcp_tool(tool_name, tool_args)
            messages.append(content.message.model_dump())
            messages.append(
                {
                    "role": "tool",
                    "content": result,
                    "tool_call_id": tool_call.id,
                }
            )
            response = self.client.get_response(messages, tools=self.all_tools)
            return response.choices[0].message.content
        return content.message.content

    async def _call_mcp_tool(self, tool_full_name: str, tool_args: Dict[str, Any]) -> str:
        """
        根据 "serverName_toolName" 格式调用相应 MCP 工具
        """
        parts = tool_full_name.split("_", 1)
        if len(parts) != 2:
            return f"无效的工具名称: {tool_full_name}"
        server_name, tool_name = parts
        server = self.servers.get(server_name)
        if not server:
            return f"找不到服务器: {server_name}"
        resp = await server.execute_tool(tool_name, tool_args)
        
        # 处理返回内容,确保转换为字符串格式
        if not resp.content:
            return "工具执行无输出"
        
        # 如果content是列表,需要转换为字符串
        if isinstance(resp.content, list):
            # 提取所有文本内容并合并
            text_parts = []
            for item in resp.content:
                if isinstance(item, dict) and "text" in item:
                    text_parts.append(item["text"])
                elif isinstance(item, str):
                    text_parts.append(item)
                else:
                    text_parts.append(str(item))
            return "\n".join(text_parts) if text_parts else "工具执行无输出"
        
        return str(resp.content)

    async def chat_loop(self) -> None:
        """多服务器 MCP + OpenAI Function Calling 客户端主循环"""
        logging.info("\n疊  多服务器 MCP + Function Calling 客户端已启动!输入  'quit'退出。")
        messages: List[Dict[str, Any]] = []
        while True:
            query = input("\n你 : ").strip()
            if query.lower() == "quit":
                break
            try:
                messages.append({"role": "user", "content": query})
                messages = messages[-20:]  # 保持最新 20 条上下文
                response = await self.chat_base(messages)
                messages.append(response.choices[0].message.model_dump())
                result = response.choices[0].message.content
                # logging.info(f"\nAI: {result}")
                print(f"\nAI: {result}")
            except Exception as e:
                print(f"\n▲    调用过程出错: {e}")

    async def cleanup(self) -> None:
        """关闭所有资源"""
        await self.exit_stack.aclose()

# =============================
# 主函数
# =============================
async def main() -> None:
    # 从配置文件加载服务器配置
    config = Configuration()
    servers_config = config.load_config("servers_config.json")
    client = MultiServerMCPClient()
    try:
        await client.connect_to_servers(servers_config)
        await client.chat_loop()
    finally:
        try:
            await asyncio.sleep(0.1)
            await client.cleanup()
        except RuntimeError as e:
            # 如果是因为退出 cancel scope 导致的异常,可以选择忽略
            if "Attempted to exit cancel scope" in str(e):
                logging.info("退出时检测到 cancel scope 异常,已忽略。")
            else:
                raise

if __name__ == "__main__":
    asyncio.run(main())

3.2.13、运行 main.py 文件并测试效果

uv run main.py

 

海涛贝塔(https://haitaobeta.com)属于海涛个人博客,欢迎浏览使用

联系方式:qq:52292959 邮箱:52292959@qq.com

备案号:粤ICP备18108585号 友情链接