from dotenv import load_dotenv
load_dotenv()
from xpander_sdk import Backend, on_boot, on_shutdown, on_task
from xpander_sdk.tools import MultiMCPTools
from agno.agent import Agent
import asyncio
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global MCP tools instance
mcp_tools = None
mcp_status = {"connected": False, "servers": {}}
@on_boot
async def initialize_mcp():
"""Initialize MCP connections during application boot."""
global mcp_tools, mcp_status
logger.info("🔌 Initializing MCP connections...")
try:
# Configure MCP servers
mcp_servers = {
"filesystem": {
"command": "npx",
"args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
},
"github": {
"command": "npx",
"args": ["@modelcontextprotocol/server-github"]
},
"brave": {
"command": "npx",
"args": ["@modelcontextprotocol/server-brave-search"]
}
}
# Initialize MultiMCPTools
mcp_tools = MultiMCPTools(servers=mcp_servers)
await mcp_tools.connect_all()
# Update status tracking
mcp_status["connected"] = True
for server_name in mcp_servers.keys():
mcp_status["servers"][server_name] = {
"status": "connected",
"last_ping": "boot_time",
"tools_available": len(await mcp_tools.list_tools(server_name))
}
logger.info(f"✅ MCP initialization complete. Connected to {len(mcp_servers)} servers")
except Exception as e:
logger.error(f"❌ MCP initialization failed: {e}")
mcp_status["connected"] = False
raise
@on_boot
def validate_mcp_environment():
"""Validate MCP-specific environment variables."""
import os
logger.info("🔍 Validating MCP environment...")
# Check for optional MCP configuration
mcp_config_path = os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json")
logger.info(f"📁 MCP config path: {mcp_config_path}")
# Validate GitHub token for MCP GitHub server (if used)
github_token = os.getenv("GITHUB_TOKEN")
if github_token:
logger.info("✅ GitHub token found for MCP integration")
else:
logger.warning("⚠️ No GitHub token found - MCP GitHub server may have limited functionality")
logger.info("✅ MCP environment validation complete")
@on_task
async def handle_mcp_request(task):
"""Handle incoming tasks using MCP capabilities."""
global mcp_tools, mcp_status
logger.info(f"📨 Processing MCP task: {task.id}")
if not mcp_status["connected"] or not mcp_tools:
task.result = {
"error": "MCP tools not available",
"status": "failed",
"message": "MCP integration is not initialized"
}
return task
try:
# Analyze task to determine which MCP server to use
task_text = task.input.text.lower()
if "file" in task_text or "directory" in task_text:
# Use filesystem MCP server
server_name = "filesystem"
available_tools = await mcp_tools.list_tools(server_name)
if "read_file" in [tool["name"] for tool in available_tools]:
# Example: reading a file
if "read" in task_text:
file_path = extract_file_path(task.input.text) # Custom helper function
result = await mcp_tools.call_tool(server_name, "read_file", {"path": file_path})
task.result = {
"content": result["content"],
"file_path": file_path,
"server_used": server_name,
"status": "completed"
}
elif "github" in task_text or "repository" in task_text:
# Use GitHub MCP server
server_name = "github"
available_tools = await mcp_tools.list_tools(server_name)
if "search_repositories" in [tool["name"] for tool in available_tools]:
# Example: searching GitHub repositories
query = extract_github_query(task.input.text) # Custom helper function
result = await mcp_tools.call_tool(server_name, "search_repositories", {"query": query})
task.result = {
"repositories": result["items"],
"query": query,
"server_used": server_name,
"status": "completed"
}
elif "search" in task_text:
# Use Brave search MCP server
server_name = "brave"
available_tools = await mcp_tools.list_tools(server_name)
if "brave_search" in [tool["name"] for tool in available_tools]:
query = extract_search_query(task.input.text) # Custom helper function
result = await mcp_tools.call_tool(server_name, "brave_search", {"query": query})
task.result = {
"search_results": result["results"],
"query": query,
"server_used": server_name,
"status": "completed"
}
else:
# Default handling - list available capabilities
all_capabilities = {}
for server in mcp_status["servers"]:
if mcp_status["servers"][server]["status"] == "connected":
tools = await mcp_tools.list_tools(server)
all_capabilities[server] = [tool["name"] for tool in tools]
task.result = {
"message": "MCP integration ready",
"available_capabilities": all_capabilities,
"status": "info"
}
except Exception as e:
logger.error(f"❌ MCP task processing failed: {e}")
task.result = {
"error": str(e),
"status": "failed",
"server_status": mcp_status
}
logger.info(f"✅ Task {task.id} completed")
return task
@on_shutdown
async def cleanup_mcp_connections():
"""Clean up MCP connections during shutdown."""
global mcp_tools, mcp_status
logger.info("🔌 Shutting down MCP connections...")
if mcp_tools:
try:
await mcp_tools.disconnect_all()
logger.info("✅ All MCP connections closed")
except Exception as e:
logger.error(f"❌ Error closing MCP connections: {e}")
# Reset status
mcp_status = {"connected": False, "servers": {}}
mcp_tools = None
@on_shutdown
def save_mcp_metrics():
"""Save MCP usage metrics before shutdown."""
global mcp_status
logger.info("📊 Saving MCP metrics...")
if mcp_status.get("servers"):
total_servers = len(mcp_status["servers"])
connected_servers = sum(1 for s in mcp_status["servers"].values() if s.get("status") == "connected")
metrics = {
"total_servers": total_servers,
"connected_servers": connected_servers,
"connection_rate": connected_servers / total_servers if total_servers > 0 else 0
}
logger.info(f"📈 MCP metrics: {metrics}")
# In production, save to persistent storage or monitoring system
# Helper functions for task processing
def extract_file_path(text):
"""Extract file path from task text."""
# Simple implementation - in production, use more sophisticated parsing
words = text.split()
for word in words:
if "/" in word and not word.startswith("http"):
return word
return "/tmp/example.txt" # Default for demo
def extract_github_query(text):
"""Extract GitHub search query from task text."""
# Remove common words and extract meaningful search terms
stop_words = {"github", "repository", "search", "find", "look", "for"}
words = [w for w in text.lower().split() if w not in stop_words]
return " ".join(words[:5]) # Limit to first 5 meaningful words
def extract_search_query(text):
"""Extract search query from task text."""
# Remove the word "search" and return the rest
return text.replace("search", "").strip()
# Initialize backend and agent
backend = Backend()
agno_agent = Agent(**backend.get_args())
# The agent is now ready with MCP integration
logger.info("🚀 Agent ready with MCP integration!")
logger.info("📋 Try asking: 'Search for Python MCP examples on GitHub'")
logger.info("📋 Try asking: 'Read the file /tmp/config.json'")
logger.info("📋 Try asking: 'Search for MCP protocol documentation'")
# Example usage
if __name__ == "__main__":
agno_agent.print_response(message="Search for Python MCP examples on GitHub")