Integration Guide Summary
  • Goal: Implement MCP (Model Context Protocol) integration using MultiMCPTools
  • SDK Version: v2.0.32 or higher
  • Prerequisites: xpander-sdk, knowledge of MCP protocol
  • Reference: XPander issue #401

Overview

The MultiMCPTools integration lets XPander agents call tools exposed by multiple MCP (Model Context Protocol) servers. This guide covers basic setup, lifecycle management with the on_boot, on_task, and on_shutdown decorators, error handling, and status monitoring.

Key Features

  • MultiMCPTools Integration: Seamless integration with multiple MCP servers and tools
  • Lifecycle Management: Built-in support for on_boot, on_shutdown, and on_task decorators
  • Status Management: Real-time status monitoring and health checks for MCP connections
  • Error Handling: Robust error handling with automatic reconnection capabilities

Installation and Setup

Environment Setup

python3 -m venv .venv
source .venv/bin/activate
pip install "xpander-sdk[agno]>=2.0.32"

Required Dependencies

The MultiMCPTools integration relies on the following:
  • xpander-sdk (v2.0.32+)
  • mcp-client (automatically installed)
  • asyncio (part of the Python standard library; nothing to install)

Implementation Example

Here’s the canonical implementation example demonstrating MCP integration with lifecycle decorators:
mcp_integration_example.py
from dotenv import load_dotenv
load_dotenv()

from xpander_sdk import Backend, on_boot, on_shutdown, on_task
from xpander_sdk.tools import MultiMCPTools
from agno.agent import Agent
import asyncio
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global MCP tools instance
mcp_tools = None
mcp_status = {"connected": False, "servers": {}}

@on_boot
async def initialize_mcp():
    """Initialize MCP connections during application boot."""
    global mcp_tools, mcp_status
    
    logger.info("🔌 Initializing MCP connections...")
    
    try:
        # Configure MCP servers
        mcp_servers = {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            },
            "github": {
                "command": "npx", 
                "args": ["@modelcontextprotocol/server-github"]
            },
            "brave": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-brave-search"]
            }
        }
        
        # Initialize MultiMCPTools
        mcp_tools = MultiMCPTools(servers=mcp_servers)
        await mcp_tools.connect_all()
        
        # Update status tracking
        mcp_status["connected"] = True
        for server_name in mcp_servers.keys():
            mcp_status["servers"][server_name] = {
                "status": "connected",
                "last_ping": "boot_time",
                "tools_available": len(await mcp_tools.list_tools(server_name))
            }
        
        logger.info(f"✅ MCP initialization complete. Connected to {len(mcp_servers)} servers")
        
    except Exception as e:
        logger.error(f"❌ MCP initialization failed: {e}")
        mcp_status["connected"] = False
        raise

@on_boot
def validate_mcp_environment():
    """Validate MCP-specific environment variables."""
    import os
    
    logger.info("🔍 Validating MCP environment...")
    
    # Check for optional MCP configuration
    mcp_config_path = os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json")
    logger.info(f"📁 MCP config path: {mcp_config_path}")
    
    # Validate GitHub token for MCP GitHub server (if used)
    github_token = os.getenv("GITHUB_TOKEN")
    if github_token:
        logger.info("✅ GitHub token found for MCP integration")
    else:
        logger.warning("⚠️  No GitHub token found - MCP GitHub server may have limited functionality")
    
    logger.info("✅ MCP environment validation complete")

@on_task
async def handle_mcp_request(task):
    """Handle incoming tasks using MCP capabilities."""
    global mcp_tools, mcp_status
    
    logger.info(f"📨 Processing MCP task: {task.id}")
    
    if not mcp_status["connected"] or not mcp_tools:
        task.result = {
            "error": "MCP tools not available",
            "status": "failed",
            "message": "MCP integration is not initialized"
        }
        return task
    
    try:
        # Analyze task to determine which MCP server to use
        task_text = task.input.text.lower()
        
        if "file" in task_text or "directory" in task_text:
            # Use filesystem MCP server
            server_name = "filesystem"
            available_tools = await mcp_tools.list_tools(server_name)
            
            if "read_file" in [tool["name"] for tool in available_tools]:
                # Example: reading a file
                if "read" in task_text:
                    file_path = extract_file_path(task.input.text)  # Custom helper function
                    result = await mcp_tools.call_tool(server_name, "read_file", {"path": file_path})
                    
                    task.result = {
                        "content": result["content"],
                        "file_path": file_path,
                        "server_used": server_name,
                        "status": "completed"
                    }
        
        elif "github" in task_text or "repository" in task_text:
            # Use GitHub MCP server
            server_name = "github"
            available_tools = await mcp_tools.list_tools(server_name)
            
            if "search_repositories" in [tool["name"] for tool in available_tools]:
                # Example: searching GitHub repositories
                query = extract_github_query(task.input.text)  # Custom helper function
                result = await mcp_tools.call_tool(server_name, "search_repositories", {"query": query})
                
                task.result = {
                    "repositories": result["items"],
                    "query": query,
                    "server_used": server_name,
                    "status": "completed"
                }
        
        elif "search" in task_text:
            # Use Brave search MCP server
            server_name = "brave"
            available_tools = await mcp_tools.list_tools(server_name)
            
            if "brave_search" in [tool["name"] for tool in available_tools]:
                query = extract_search_query(task.input.text)  # Custom helper function
                result = await mcp_tools.call_tool(server_name, "brave_search", {"query": query})
                
                task.result = {
                    "search_results": result["results"],
                    "query": query,
                    "server_used": server_name,
                    "status": "completed"
                }
        
        else:
            # Default handling - list available capabilities
            all_capabilities = {}
            for server in mcp_status["servers"]:
                if mcp_status["servers"][server]["status"] == "connected":
                    tools = await mcp_tools.list_tools(server)
                    all_capabilities[server] = [tool["name"] for tool in tools]
            
            task.result = {
                "message": "MCP integration ready",
                "available_capabilities": all_capabilities,
                "status": "info"
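            }

        # If no branch above set a result (for example, a "file" task without
        # "read", or a server that lacks the expected tool), report it rather
        # than returning an empty task. Assumes task.result defaults to None.
        if getattr(task, "result", None) is None:
            task.result = {
                "error": "No matching MCP tool for this request",
                "status": "failed"
            }

    except Exception as e: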
        logger.error(f"❌ MCP task processing failed: {e}")
        task.result = {
            "error": str(e),
            "status": "failed",
            "server_status": mcp_status
        }
    
    # Logged on both success and failure; the outcome is in task.result
    logger.info(f"📨 Finished processing task {task.id}")
    return task

@on_shutdown
async def cleanup_mcp_connections():
    """Clean up MCP connections during shutdown."""
    global mcp_tools, mcp_status
    
    logger.info("🔌 Shutting down MCP connections...")
    
    if mcp_tools:
        try:
            await mcp_tools.disconnect_all()
            logger.info("✅ All MCP connections closed")
        except Exception as e:
            logger.error(f"❌ Error closing MCP connections: {e}")
    
    # Reset status
    mcp_status = {"connected": False, "servers": {}}
    mcp_tools = None

@on_shutdown
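def save_mcp_metrics():
    """Save MCP usage metrics before shutdown.

    Note: cleanup_mcp_connections resets mcp_status. If shutdown handlers run
    in registration order, this handler sees an empty status and logs nothing;
    register it before cleanup_mcp_connections to capture real metrics.
    """
    global mcp_status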
    
    logger.info("📊 Saving MCP metrics...")
    
    if mcp_status.get("servers"):
        total_servers = len(mcp_status["servers"])
        connected_servers = sum(1 for s in mcp_status["servers"].values() if s.get("status") == "connected")
        
        metrics = {
            "total_servers": total_servers,
            "connected_servers": connected_servers,
            "connection_rate": connected_servers / total_servers if total_servers > 0 else 0
        }
        
        logger.info(f"📈 MCP metrics: {metrics}")
        # In production, save to persistent storage or monitoring system

# Helper functions for task processing
def extract_file_path(text):
    """Extract file path from task text."""
    # Simple implementation - in production, use more sophisticated parsing
    words = text.split()
    for word in words:
        # Drop surrounding quotes and sentence punctuation before matching
        candidate = word.strip("'\"`,;:!?()").rstrip(".")
        if "/" in candidate and not candidate.startswith("http"):
            return candidate
    return "/tmp/example.txt"  # Default for demo

def extract_github_query(text):
    """Extract GitHub search query from task text."""
    # Remove common words and extract meaningful search terms
    stop_words = {"github", "repository", "search", "find", "look", "for"}
    words = [w for w in text.lower().split() if w not in stop_words]
    return " ".join(words[:5])  # Limit to first 5 meaningful words

def extract_search_query(text):
    """Extract search query from task text."""
    import re

    # Remove the word "search" in any letter case ("Search", "search", ...)
    # and return the rest
    return re.sub(r"search", "", text, flags=re.IGNORECASE).strip()

# Initialize backend and agent
backend = Backend()
agno_agent = Agent(**backend.get_args())

# The agent is now ready with MCP integration
logger.info("🚀 Agent ready with MCP integration!")
logger.info("📋 Try asking: 'Search for Python MCP examples on GitHub'")
logger.info("📋 Try asking: 'Read the file /tmp/config.json'")
logger.info("📋 Try asking: 'Search for MCP protocol documentation'")

# Example usage
if __name__ == "__main__":
    agno_agent.print_response(message="Search for Python MCP examples on GitHub")
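The keyword checks in handle_mcp_request can also be expressed as a small routing table, which keeps the matching order explicit and can be tested without any MCP server running. This is a minimal sketch that assumes the same three server names as the example above; `ROUTES` and `pick_server` are hypothetical names, not part of xpander-sdk.

```python
from typing import Optional

# Ordered (keywords, server name) pairs. The first match wins, mirroring
# the if/elif order used in handle_mcp_request.
ROUTES = [
    (("file", "directory"), "filesystem"),
    (("github", "repository"), "github"),
    (("search",), "brave"),
]


def pick_server(task_text: str) -> Optional[str]:
    """Return the first server whose keywords appear in the text, else None."""
    lowered = task_text.lower()
    for keywords, server_name in ROUTES:
        if any(keyword in lowered for keyword in keywords):
            return server_name
    return None
```

Like the original checks, this matches substrings, so a word such as "profile" would still route to the filesystem server. Matching on whole words may be preferable in production.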

Status Management and Health Checks

Monitor your MCP integration health with these utilities:
mcp_status_monitoring.py
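import asyncio
import logging
from datetime import datetime

from xpander_sdk import on_boot, on_task

# mcp_tools and mcp_status are the module-level globals defined in
# mcp_integration_example.py; this snippet assumes it lives in that module.
logger = logging.getLogger(__name__)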

@on_task
async def check_mcp_health(task):
    """Health check endpoint for MCP integration."""
    global mcp_tools, mcp_status
    
    health_report = {
        "timestamp": datetime.now().isoformat(),
        "overall_status": "healthy" if mcp_status["connected"] else "unhealthy",
        "servers": {}
    }
    
    if mcp_tools:
        for server_name in mcp_status["servers"]:
            try:
                # Ping server
                tools = await mcp_tools.list_tools(server_name)
                health_report["servers"][server_name] = {
                    "status": "healthy",
                    "tools_count": len(tools),
                    "last_check": datetime.now().isoformat()
                }
            except Exception as e:
                health_report["servers"][server_name] = {
                    "status": "unhealthy",
                    "error": str(e),
                    "last_check": datetime.now().isoformat()
                }
    
    task.result = health_report
    return task

async def periodic_health_check():
    """Periodic health check for MCP connections."""
    while True:
        if mcp_tools and mcp_status["connected"]:
            try:
                # Check each server
                for server_name in mcp_status["servers"]:
                    await mcp_tools.list_tools(server_name)
                    mcp_status["servers"][server_name]["last_ping"] = datetime.now().isoformat()
                
                logger.info("🔍 MCP health check passed")
            except Exception as e:
                logger.warning(f"⚠️ MCP health check failed: {e}")
                # Attempt reconnection logic here
        
        await asyncio.sleep(300)  # Check every 5 minutes

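# Keep a reference so the background task is not garbage-collected
health_check_task = None

# Start health monitoring in background
@on_boot
async def start_health_monitoring():
    """Start background health monitoring."""
    global health_check_task
    health_check_task = asyncio.create_task(periodic_health_check())
    logger.info("📊 MCP health monitoring started")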
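The periodic check above leaves the reconnection step as a placeholder. One common way to fill it is a retry loop with exponential backoff. The sketch below is generic: it takes any zero-argument async callable, and `retry_with_backoff` is a hypothetical helper name rather than an xpander-sdk function.

```python
import asyncio
from typing import Awaitable, Callable


async def retry_with_backoff(
    operation: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> bool:
    """Run `operation` until it succeeds or `attempts` is exhausted.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between tries.
    Returns True on success and False if every attempt raised.
    """
    for attempt in range(attempts):
        try:
            await operation()
            return True
        except Exception:
            # Sleep only if another attempt will follow.
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    return False
```

Inside the except branch of periodic_health_check, this could be called as `await retry_with_backoff(lambda: mcp_tools.reconnect(server_name))`, assuming `reconnect` behaves as shown in the Troubleshooting section.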

Configuration Examples

Multiple Server Configuration

mcp_multi_server_config.py
import os

# Advanced MCP server configuration
mcp_servers = {
    "filesystem": {
        "command": "npx",
        "args": ["@modelcontextprotocol/server-filesystem", "/workspace"],
        "env": {"FILESYSTEM_ROOT": "/workspace"}
    },
    "github": {
        "command": "npx",
        "args": ["@modelcontextprotocol/server-github"],
        "env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")}
    },
    "postgres": {
        "command": "npx",
        "args": ["@modelcontextprotocol/server-postgres"],
        "env": {
            "POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL")
        }
    },
    "slack": {
        "command": "npx",
        "args": ["@modelcontextprotocol/server-slack"],
        "env": {
            "SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN"),
            "SLACK_USER_TOKEN": os.getenv("SLACK_USER_TOKEN")
        }
    }
}
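Note that os.getenv returns None when a variable is unset, so the env mappings above can end up containing None values. How MultiMCPTools treats those is not stated here. A cautious option is to pass only variables that are actually set, as in this sketch (`build_env` is a hypothetical helper):

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def build_env(
    names: Iterable[str], source: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Return only the variables from `names` that are set and non-empty."""
    # Default to the process environment; `source` exists mainly for testing.
    source = os.environ if source is None else source
    return {name: source[name] for name in names if source.get(name)}
```

For the Slack entry, this would read `"env": build_env(["SLACK_BOT_TOKEN", "SLACK_USER_TOKEN"])`.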

Environment-based Configuration

mcp_env_config.py
import os
import json

def load_mcp_config():
    """Load MCP configuration from environment or file."""
    
    # Try environment variable first
    config_json = os.getenv("MCP_CONFIG")
    if config_json:
        return json.loads(config_json)
    
    # Try config file
    config_path = os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json")
    expanded_path = os.path.expanduser(config_path)
    
    if os.path.exists(expanded_path):
        with open(expanded_path, 'r') as f:
            return json.load(f)
    
    # Default configuration
    return {
        "servers": {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        }
    }
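Because load_mcp_config accepts JSON from an environment variable or a file, a malformed entry would otherwise only surface when a server fails to start. The sketch below checks the shape used by the default configuration above (a "servers" mapping whose entries have "command" and optional "args"). `validate_mcp_config` is a hypothetical helper, and the rules it enforces are assumptions drawn from the examples in this guide.

```python
from typing import Any, Dict, List


def validate_mcp_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems: List[str] = []
    servers = config.get("servers")
    if not isinstance(servers, dict) or not servers:
        return ["'servers' must be a non-empty mapping"]
    for name, entry in servers.items():
        if not isinstance(entry, dict):
            problems.append(f"{name}: entry must be a mapping")
            continue
        if not isinstance(entry.get("command"), str) or not entry["command"]:
            problems.append(f"{name}: 'command' must be a non-empty string")
        args = entry.get("args", [])
        if not isinstance(args, list) or not all(isinstance(a, str) for a in args):
            problems.append(f"{name}: 'args' must be a list of strings")
    return problems
```

Calling this right after load_mcp_config() and logging each returned problem gives a clear startup error instead of a failed subprocess later.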

Best Practices

  1. Always use lifecycle decorators for proper initialization and cleanup
  2. Implement health checks to monitor server connectivity
  3. Handle connection failures gracefully with retry logic
  4. Use environment variables for server configuration

  1. Catch and log MCP-specific exceptions
  2. Provide fallback mechanisms when MCP servers are unavailable
  3. Implement circuit breaker patterns for unreliable servers
  4. Return meaningful error messages to users

  1. Pool connections for frequently used servers
  2. Cache server capabilities to avoid repeated discovery calls
  3. Use async/await throughout for non-blocking operations
  4. Implement request timeouts to prevent hanging operations
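The circuit breaker practice listed above can be sketched in a few lines. After a set number of consecutive failures the breaker "opens" and calls are skipped until a cooldown passes, at which point a trial call is allowed. `CircuitBreaker` is a hypothetical class, not something xpander-sdk provides, and the thresholds are illustrative.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Stop calling a failing server until a cooldown has elapsed."""

    def __init__(
        self,
        failure_threshold: int = 3,
        cooldown_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        """Return True if a call may be attempted right now."""
        if self._opened_at is None:
            return True
        # After the cooldown, let a trial call through (half-open state).
        return self._clock() - self._opened_at >= self.cooldown_seconds

    def record_success(self) -> None:
        """Close the circuit and forget earlier failures."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure and open the circuit once the threshold is hit."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

One breaker per server name works well with the task handler: check allow_request() before calling a tool, then call record_success() or record_failure() depending on the outcome.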

Troubleshooting

Connection Failures

# Check server status
if not await mcp_tools.is_connected(server_name):
    logger.error(f"Server {server_name} is not connected")
    await mcp_tools.reconnect(server_name)
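A connection check or reconnect can itself hang if the server process is unresponsive. One way to bound the wait is asyncio.wait_for from the standard library. In the sketch below, `call_with_timeout` is a hypothetical wrapper that returns a fallback value instead of raising.

```python
import asyncio
from typing import Any, Awaitable


async def call_with_timeout(
    awaitable: Awaitable[Any], seconds: float, default: Any = None
) -> Any:
    """Await `awaitable`; return `default` if it exceeds `seconds`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        # wait_for cancels the pending operation before raising.
        return default
```

For example, `await call_with_timeout(mcp_tools.reconnect(server_name), 10.0)` would give up after ten seconds, assuming `reconnect` is a coroutine as shown above.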

Tool Discovery Issues

# List available tools for debugging
tools = await mcp_tools.list_tools(server_name)
logger.info(f"Available tools for {server_name}: {[t['name'] for t in tools]}")
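The examples in this guide call list_tools before almost every tool invocation. If discovery turns out to be slow, the results can be cached for a short time. This sketch wraps any async function that takes a server name and returns a list; `ToolListCache` is a hypothetical class, and the five-minute default lifetime is an arbitrary choice.

```python
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple

ToolLister = Callable[[str], Awaitable[List[Any]]]


class ToolListCache:
    """Cache per-server tool lists for `ttl_seconds`."""

    def __init__(self, list_tools: ToolLister, ttl_seconds: float = 300.0) -> None:
        self._list_tools = list_tools
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, List[Any]]] = {}

    async def get(self, server_name: str) -> List[Any]:
        """Return cached tools, refreshing them when the entry has expired."""
        cached = self._entries.get(server_name)
        now = time.monotonic()
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]
        tools = await self._list_tools(server_name)
        self._entries[server_name] = (now, tools)
        return tools

    def invalidate(self, server_name: str) -> None:
        """Drop the cached entry, e.g. after a reconnect."""
        self._entries.pop(server_name, None)
```

It could be created once at boot with `ToolListCache(mcp_tools.list_tools)` and queried with `await cache.get(server_name)`. While debugging discovery issues, call invalidate() first so stale results are not mistaken for the live tool list.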

Environment Problems

# Validate MCP server commands
import shutil

for server_name, config in mcp_servers.items():
    command = config["command"]
    if not shutil.which(command):
        logger.error(f"Command not found: {command} for server {server_name}")

Next Steps

  1. Explore Advanced Features: Learn about MCP resource management and streaming
  2. Custom Server Integration: Build your own MCP servers for specific use cases
  3. Production Deployment: Scale your MCP integration with proper monitoring
  4. Security Considerations: Implement authentication and authorization for MCP servers
