Lifecycle Guide Summary
- Goal: Master lifecycle management for MCP-enabled applications
- SDK Version: v2.0.32 or higher
- Prerequisites: Basic understanding of async/await and decorators
- Reference: XPander issue #401, SDK v2.0.32
Overview
This guide demonstrates advanced lifecycle management patterns specifically designed for MCP (Model Context Protocol) integrations. You’ll learn how to properly initialize, manage, and clean up MCP resources using XPander’s lifecycle decorators.
Lifecycle Phases
Boot Phase
Initialize MCP servers, validate configuration, and establish connections
Runtime Phase
Handle tasks with MCP tools, manage connections, and monitor health
Shutdown Phase
Gracefully disconnect from servers, save state, and cleanup resources
Complete Implementation
Prerequisites and Setup
Prerequisites and Setup
Virtual Environment Setup
setup.sh
Copy
Ask AI
python3 -m venv .venv
source .venv/bin/activate
pip install "xpander-sdk[agno]>=2.0.32"
Environment Configuration
.env
Copy
Ask AI
XPANDER_API_KEY=your_api_key
XPANDER_ORGANIZATION_ID=your_org_id
GITHUB_TOKEN=your_github_token
SLACK_BOT_TOKEN=your_slack_token
MCP_CONFIG_PATH=~/.mcp/config.json
advanced_lifecycle_mcp.py
Copy
Ask AI
from dotenv import load_dotenv
load_dotenv()
from xpander_sdk import Backend, on_boot, on_shutdown, on_task
from xpander_sdk.tools import MultiMCPTools
from agno.agent import Agent
import asyncio
import logging
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
import signal
# Configure comprehensive logging
# Root logging setup: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every lifecycle handler in this file.
logger = logging.getLogger(__name__)
# Global state management
class MCPState:
    """Mutable container for the MCP integration's runtime state."""

    def __init__(self):
        # Tool handle and overall connectivity flag; both start "not connected".
        self.tools: Optional[MultiMCPTools] = None
        self.connected: bool = False
        # Per-server bookkeeping, keyed by server name.
        self.servers: Dict[str, Dict[str, Any]] = {}
        # Session counters; "boot_time" stays None until it is recorded.
        self.metrics: Dict[str, Any] = dict(
            boot_time=None,
            tasks_processed=0,
            errors_encountered=0,
            reconnections=0,
        )
        # Polled by the background service loops to know when to stop.
        self.shutdown_requested: bool = False


# Single shared instance used by all lifecycle handlers in this module.
mcp_state = MCPState()
# === BOOT PHASE ===
@on_boot
def setup_signal_handlers():
    """Register SIGINT/SIGTERM handlers that flag a shutdown request on ``mcp_state``."""

    def _request_shutdown(signum, frame):
        # Only record the request; the background loops poll this flag.
        logger.info(f"📡 Received signal {signum}, initiating graceful shutdown...")
        mcp_state.shutdown_requested = True

    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, _request_shutdown)
    logger.info("📡 Signal handlers configured")
@on_boot
def validate_mcp_environment():
    """Validate environment variables and the optional MCP config file.

    Raises:
        EnvironmentError: if XPANDER_API_KEY or XPANDER_ORGANIZATION_ID is
            unset or empty.
        json.JSONDecodeError: if the MCP config file exists but does not
            contain valid JSON.
    """
    logger.info("🔍 Validating MCP environment...")

    # Required environment variables: boot cannot continue without them.
    required_vars = ["XPANDER_API_KEY", "XPANDER_ORGANIZATION_ID"]
    missing_required = [var for var in required_vars if not os.getenv(var)]
    if missing_required:
        raise EnvironmentError(f"❌ Missing required variables: {missing_required}")

    # Optional but recommended variables: only warn when absent.
    optional_vars = {
        "GITHUB_TOKEN": "GitHub MCP server functionality will be limited",
        "SLACK_BOT_TOKEN": "Slack MCP server will not be available",
        "MCP_CONFIG_PATH": "Using default MCP configuration path",
    }
    for var, warning in optional_vars.items():
        if os.getenv(var):
            logger.info(f"✅ {var} configured")
        else:
            logger.warning(f"⚠️ {var} not set: {warning}")

    # Validate the MCP configuration file when one is present.
    config_path = os.path.expanduser(os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json"))
    if os.path.exists(config_path):
        try:
            # JSON text is UTF-8, so do not depend on the locale's default encoding.
            with open(config_path, 'r', encoding="utf-8") as f:
                # Parsed purely to validate the syntax; the result is not needed here.
                json.load(f)
                logger.info(f"✅ MCP configuration loaded from {config_path}")
        except json.JSONDecodeError as e:
            logger.error(f"❌ Invalid JSON in MCP config: {e}")
            raise
    else:
        logger.info(f"📄 No MCP config file found at {config_path}, using defaults")

    logger.info("✅ Environment validation completed")
@on_boot
async def initialize_mcp_tools():
    """Create the MCP tool handle, connect to the configured servers and record the outcome.

    Stamps the boot time, builds ``MultiMCPTools`` from the loaded server
    configuration, connects with retries, stores a status record per server
    on ``mcp_state`` and then runs capability discovery. Any exception marks
    the state as disconnected and is re-raised.
    """
    logger.info("🔌 Initializing MCP tools and servers...")
    mcp_state.metrics["boot_time"] = datetime.now()

    try:
        servers_config = load_server_configuration()
        mcp_state.tools = MultiMCPTools(servers=servers_config)
        connection_results = await connect_servers_with_retry(servers_config)

        # One successful connection is enough to count as "connected".
        mcp_state.connected = any(connection_results.values())

        server_records = {}
        for name, connected in connection_results.items():
            server_records[name] = {
                "status": "connected" if connected else "failed",
                "last_ping": datetime.now().isoformat() if connected else None,
                "tools_available": 0,
                "reconnection_attempts": 0,
            }
        mcp_state.servers = server_records

        # Fill in tool/resource details for the servers that connected.
        await discover_server_capabilities()

        if not mcp_state.connected:
            logger.error("❌ Failed to connect to any MCP servers")
        else:
            logger.info(f"✅ MCP initialization complete. Connected to {sum(connection_results.values())}/{len(servers_config)} servers")
    except Exception as e:
        logger.error(f"❌ MCP initialization failed: {e}")
        mcp_state.connected = False
        raise
# Strong references to the running background-service tasks. The event loop
# only keeps weak references to tasks, so a task whose handle is discarded can
# be garbage-collected before it finishes.
_background_tasks = set()


@on_boot
async def start_background_services():
    """Start the health-monitoring, metrics and connection-maintenance loops.

    Each service coroutine is scheduled as an asyncio task on the running
    loop. The task handles are held in ``_background_tasks`` until they
    complete so they cannot disappear mid-execution.
    """
    logger.info("🔄 Starting background services...")

    services = (
        health_monitoring_service,       # health monitoring
        metrics_collection_service,      # metrics collection
        connection_maintenance_service,  # connection maintenance
    )
    for service in services:
        task = asyncio.create_task(service())
        _background_tasks.add(task)
        # Drop the reference once the task is done so the set does not grow.
        task.add_done_callback(_background_tasks.discard)

    logger.info("✅ Background services started")
# === RUNTIME PHASE ===
@on_task
async def handle_mcp_task(task):
    """Analyze an incoming task, route it to an MCP server and attach the outcome.

    Always returns ``task`` with ``task.result`` set: a "failed" payload when
    MCP is unavailable or processing raises, otherwise a "completed" payload
    built from whatever ``route_task_to_server`` returned.
    """
    logger.info(f"📨 Processing MCP task: {task.id}")
    mcp_state.metrics["tasks_processed"] += 1

    # Bail out early when there is no usable MCP connection.
    mcp_available = mcp_state.connected and mcp_state.tools
    if not mcp_available:
        mcp_state.metrics["errors_encountered"] += 1
        task.result = {
            "error": "MCP services unavailable",
            "status": "failed",
            "details": "MCP integration is not properly initialized",
            "available_servers": list(mcp_state.servers.keys()),
        }
        return task

    try:
        task_analysis = analyze_task_requirements(task.input.text)
        routed = await route_task_to_server(task_analysis)
        task.result = {
            "status": "completed",
            "server_used": routed.get("server_name"),
            "operation": routed.get("operation"),
            "data": routed.get("data"),
            "processing_time_ms": routed.get("processing_time"),
            "timestamp": datetime.now().isoformat(),
        }
        logger.info(f"✅ Task {task.id} completed successfully")
    except Exception as e:
        logger.error(f"❌ Task processing failed: {e}")
        mcp_state.metrics["errors_encountered"] += 1
        server_status = {name: info["status"] for name, info in mcp_state.servers.items()}
        task.result = {
            "error": str(e),
            "status": "failed",
            "server_status": server_status,
            "timestamp": datetime.now().isoformat(),
        }

    return task
@on_task
async def handle_health_check(task):
    """Attach a health report to tasks whose text mentions "health" or "status".

    Tasks that mention neither keyword are returned untouched.
    """
    request_text = task.input.text.lower()
    if any(keyword in request_text for keyword in ("health", "status")):
        task.result = {
            "health_report": await generate_health_report(),
            "status": "completed",
            "timestamp": datetime.now().isoformat(),
        }
    return task
# === SHUTDOWN PHASE ===
@on_shutdown
async def save_mcp_metrics():
    """Write the session metrics, server status and a final health report to a JSON file.

    Failures while writing are logged and swallowed so shutdown can continue.
    """
    logger.info("📊 Saving MCP metrics and state...")

    # Record uptime only when boot actually stamped a start time.
    booted_at = mcp_state.metrics["boot_time"]
    if booted_at:
        mcp_state.metrics["uptime_seconds"] = (datetime.now() - booted_at).total_seconds()

    # A health report is only generated while still connected.
    if mcp_state.connected:
        final_health = await generate_health_report()
    else:
        final_health = None

    final_metrics = {
        "session_metrics": mcp_state.metrics,
        "server_status": mcp_state.servers,
        "final_health_check": final_health,
        "shutdown_timestamp": datetime.now().isoformat(),
    }

    # Save to file (in production, send to monitoring system)
    metrics_path = "/tmp/mcp_session_metrics.json"
    try:
        with open(metrics_path, 'w') as f:
            # default=str stringifies values json cannot encode (e.g. the boot_time datetime).
            json.dump(final_metrics, f, indent=2, default=str)
        logger.info(f"📈 Metrics saved to {metrics_path}")
    except Exception as e:
        logger.error(f"❌ Failed to save metrics: {e}")
@on_shutdown
async def graceful_mcp_shutdown():
    """Disconnect from every connected MCP server, then reset the shared state.

    Each server is disconnected inside its own try/except so that one failing
    disconnect does not stop the remaining servers from being closed. The
    "connected" flag and the tool handle are always cleared at the end.
    """
    logger.info("🔌 Shutting down MCP connections...")

    tools = mcp_state.tools
    if tools:
        failed_servers = []
        # Iterate over a snapshot: the loop awaits, and other coroutines may
        # touch mcp_state.servers while this one is suspended.
        for server_name, server_info in list(mcp_state.servers.items()):
            if server_info["status"] != "connected":
                continue
            logger.info(f"🔌 Disconnecting from {server_name}...")
            try:
                await tools.disconnect(server_name)
            except Exception as e:
                logger.error(f"❌ Error during MCP shutdown: {e}")
                failed_servers.append(server_name)
            else:
                server_info["status"] = "disconnected"

        if failed_servers:
            logger.warning(f"⚠️ MCP shutdown finished with errors for: {failed_servers}")
        else:
            logger.info("✅ All MCP connections closed gracefully")

    # Reset state
    mcp_state.connected = False
    mcp_state.tools = None
@on_shutdown
def cleanup_resources():
    """Redact stored auth tokens and delete the MCP temp/cache files under /tmp."""
    import glob

    logger.info("🧹 Performing final resource cleanup...")

    # Overwrite any auth tokens still held in the per-server records.
    for server_info in (mcp_state.servers or {}).values():
        if "auth_token" in server_info:
            server_info["auth_token"] = "[REDACTED]"

    # Remove temporary files; a failure on one file must not stop the rest.
    for pattern in ("/tmp/mcp_temp_*", "/tmp/mcp_cache_*"):
        for file_path in glob.glob(pattern):
            try:
                os.remove(file_path)
                logger.info(f"🗑️ Cleaned up {file_path}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to clean {file_path}: {e}")

    logger.info("✅ Resource cleanup completed")
# === HELPER FUNCTIONS ===
def load_server_configuration() -> Dict[str, Dict[str, Any]]:
    """Build the MCP server configuration from defaults plus the optional config file.

    Entries under the config file's "servers" key override or extend the
    built-in "filesystem" and "github" defaults. The "github" entry is
    dropped when GITHUB_TOKEN is not set.
    """
    servers = {
        "filesystem": {
            "command": "npx",
            "args": ["@modelcontextprotocol/server-filesystem", "/tmp"],
            "env": {},
        },
        "github": {
            "command": "npx",
            "args": ["@modelcontextprotocol/server-github"],
            "env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN", "")},
        },
    }

    # Overlay servers from the config file; a broken file only produces a warning.
    config_path = os.path.expanduser(os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json"))
    if os.path.exists(config_path):
        try:
            with open(config_path, 'r') as f:
                file_config = json.load(f)
                servers.update(file_config.get("servers", {}))
        except Exception as e:
            logger.warning(f"⚠️ Failed to load config from {config_path}: {e}")

    # The github server is unusable without a token, so leave it out.
    if "github" in servers and not os.getenv("GITHUB_TOKEN"):
        logger.info("⏭️ Skipping github server (missing GITHUB_TOKEN)")
        return {name: config for name, config in servers.items() if name != "github"}
    return dict(servers)
async def connect_servers_with_retry(servers_config: Dict[str, Dict[str, Any]], max_retries: int = 3) -> Dict[str, bool]:
    """Connect to each configured MCP server, retrying with exponential backoff.

    Args:
        servers_config: Mapping of server name to its configuration; only the
            names are used here.
        max_retries: Maximum connection attempts per server.

    Returns:
        Mapping of every server name in ``servers_config`` to True when a
        connection succeeded and False otherwise. A server is reported as
        False even when ``max_retries`` is zero or negative, so callers
        always get one entry per configured server.
    """
    connection_results = {}
    for server_name in servers_config:
        logger.info(f"🔗 Connecting to {server_name}...")
        # Assume failure until an attempt succeeds.
        connection_results[server_name] = False
        for attempt in range(max_retries):
            try:
                await mcp_state.tools.connect(server_name)
            except Exception as e:
                logger.warning(f"⚠️ Connection attempt {attempt + 1}/{max_retries} failed for {server_name}: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                connection_results[server_name] = True
                logger.info(f"✅ Connected to {server_name}")
                break
        else:
            # The loop ended without a successful attempt.
            logger.error(f"❌ Failed to connect to {server_name} after {max_retries} attempts")
    return connection_results
async def discover_server_capabilities():
    """Query each connected server for its tools (and resources, if supported) and cache them.

    Results are written into the server's record on ``mcp_state.servers``;
    a failure for one server is logged and does not affect the others.
    """
    for server_name, server_info in mcp_state.servers.items():
        if server_info["status"] != "connected":
            continue
        try:
            tools = await mcp_state.tools.list_tools(server_name)
            # Resource listing is optional on the tools object.
            if hasattr(mcp_state.tools, 'list_resources'):
                resources = await mcp_state.tools.list_resources(server_name)
            else:
                resources = []

            server_info["tools_available"] = len(tools)
            server_info["resources_available"] = len(resources)

            tool_names = [tool["name"] for tool in tools]
            resource_uris = [resource["uri"] for resource in resources] if resources else []
            server_info["capabilities"] = {
                "tools": tool_names,
                "resources": resource_uris,
            }
            logger.info(f"🔍 Discovered {len(tools)} tools for {server_name}")
        except Exception as e:
            logger.warning(f"⚠️ Failed to discover capabilities for {server_name}: {e}")
def analyze_task_requirements(task_text: str) -> Dict[str, Any]:
    """Derive the MCP servers, candidate operations and priority from the task text.

    Matching is a case-insensitive substring search on the whole text, so a
    keyword also matches inside longer words.
    """
    lowered = task_text.lower()

    # (keywords, server to require, operations to suggest), checked in this order.
    routing_rules = (
        (("file", "directory", "read", "write"), "filesystem",
         ["read_file", "list_directory", "write_file"]),
        (("github", "repository", "repo", "git"), "github",
         ["search_repositories", "get_file_contents"]),
        (("search", "web", "internet"), "brave",
         ["web_search"]),
    )

    required_servers = []
    suggested_operations = []
    for keywords, server, operations in routing_rules:
        if any(keyword in lowered for keyword in keywords):
            required_servers.append(server)
            suggested_operations.extend(operations)

    is_urgent = any(keyword in lowered for keyword in ("urgent", "asap", "immediately"))

    return {
        "text": lowered,
        "required_servers": required_servers,
        "suggested_operations": suggested_operations,
        "priority": "high" if is_urgent else "normal",
    }
async def route_task_to_server(task_analysis: Dict[str, Any]) -> Dict[str, Any]:
    """Run the first suggested operation that a connected, required server offers.

    Servers are tried in the order listed in ``task_analysis``. If no server
    can run an operation (or every attempt raises), a "list_capabilities"
    payload describing the known servers is returned instead.
    """
    start_time = datetime.now()

    def _elapsed_ms():
        return (datetime.now() - start_time).total_seconds() * 1000

    for server_name in task_analysis["required_servers"]:
        server_info = mcp_state.servers.get(server_name)
        if server_name not in mcp_state.servers or server_info["status"] != "connected":
            continue

        # Tools discovered for this server, if capability discovery ran.
        available_tools = server_info.get("capabilities", {}).get("tools", [])
        matching_ops = [
            candidate
            for candidate in task_analysis["suggested_operations"]
            if candidate in available_tools
        ]
        if not matching_ops:
            continue

        # Execute the first matching operation (simplified routing)
        operation = matching_ops[0]
        try:
            result = await execute_mcp_operation(server_name, operation, task_analysis)
            processing_time = _elapsed_ms()
            return {
                "server_name": server_name,
                "operation": operation,
                "data": result,
                "processing_time": processing_time,
            }
        except Exception as e:
            logger.error(f"❌ Operation {operation} failed on {server_name}: {e}")
            continue

    # Fallback: return available capabilities
    fallback_data = {
        "available_servers": list(mcp_state.servers.keys()),
        "server_status": {name: info["status"] for name, info in mcp_state.servers.items()},
    }
    return {
        "server_name": "system",
        "operation": "list_capabilities",
        "data": fallback_data,
        "processing_time": _elapsed_ms(),
    }
async def execute_mcp_operation(server_name: str, operation: str, task_analysis: Dict[str, Any]) -> Any:
    """Call one MCP tool with placeholder arguments (simplified example).

    Only "read_file", "search_repositories" and "web_search" are wired up;
    any other operation returns a "not implemented" message without
    contacting the server.
    """
    # This is a simplified implementation - in practice, you'd parse the task
    # and extract proper parameters for each operation
    if operation not in ("read_file", "search_repositories", "web_search"):
        return {"message": f"Operation {operation} not implemented"}

    call_tool = mcp_state.tools.call_tool
    if operation == "read_file":
        arguments = {"path": "/tmp/example.txt"}  # Simplified: fixed path
    elif operation == "search_repositories":
        arguments = {"query": "MCP protocol"}  # Simplified: fixed query
    else:
        # web_search uses the task text as the query.
        arguments = {"query": task_analysis["text"]}
    return await call_tool(server_name, operation, arguments)
async def generate_health_report() -> Dict[str, Any]:
    """Build a health snapshot: overall status, uptime, a metrics copy and per-server health.

    A server is reported healthy only when its record says "connected", a
    tool handle exists and a ``list_tools`` call on it succeeds.
    """
    health_report = {
        "timestamp": datetime.now().isoformat(),
        "overall_status": "healthy" if mcp_state.connected else "unhealthy",
        "uptime": None,
        "servers": {},
        "metrics": mcp_state.metrics.copy(),
    }

    # Uptime is only known once boot has recorded a start time.
    booted_at = mcp_state.metrics["boot_time"]
    if booted_at:
        health_report["uptime"] = (datetime.now() - booted_at).total_seconds()

    server_reports = health_report["servers"]
    for server_name, server_info in mcp_state.servers.items():
        try:
            reachable = server_info["status"] == "connected" and mcp_state.tools
            if not reachable:
                server_reports[server_name] = {
                    "status": "unhealthy",
                    "reason": "Not connected",
                    "reconnection_attempts": server_info.get("reconnection_attempts", 0),
                }
                continue

            # Listing tools doubles as the liveness probe.
            tools = await mcp_state.tools.list_tools(server_name)
            server_reports[server_name] = {
                "status": "healthy",
                "tools_count": len(tools),
                "last_successful_ping": datetime.now().isoformat(),
                "reconnection_attempts": server_info.get("reconnection_attempts", 0),
            }
        except Exception as e:
            server_reports[server_name] = {
                "status": "unhealthy",
                "error": str(e),
                "last_error_time": datetime.now().isoformat(),
            }

    return health_report
# === BACKGROUND SERVICES ===
async def health_monitoring_service():
    """Every minute, check server health and try to reconnect unhealthy servers.

    Runs until ``mcp_state.shutdown_requested`` is set; errors are logged and
    the loop continues after the usual delay.
    """
    logger.info("🩺 Health monitoring service started")

    while not mcp_state.shutdown_requested:
        try:
            if mcp_state.connected:
                report = await generate_health_report()
                unhealthy_servers = []
                for name, status in report["servers"].items():
                    if status.get("status") != "healthy":
                        unhealthy_servers.append(name)

                if unhealthy_servers:
                    logger.warning(f"⚠️ Unhealthy servers detected: {unhealthy_servers}")
                    # Trigger reconnection attempts
                    for server_name in unhealthy_servers:
                        await attempt_server_reconnection(server_name)

            await asyncio.sleep(60)  # Check every minute
        except Exception as e:
            logger.error(f"❌ Health monitoring error: {e}")
            await asyncio.sleep(60)

    logger.info("🩺 Health monitoring service stopped")
async def metrics_collection_service():
    """Every five minutes, stamp the metrics and log a summary at each multiple of 100 tasks.

    Runs until ``mcp_state.shutdown_requested`` is set.
    """
    logger.info("📊 Metrics collection service started")

    while not mcp_state.shutdown_requested:
        try:
            metrics = mcp_state.metrics
            metrics["timestamp"] = datetime.now().isoformat()

            # Log a summary only when the count sits exactly on a non-zero multiple of 100.
            processed = metrics["tasks_processed"]
            if processed > 0 and processed % 100 == 0:
                logger.info(f"📈 Processed {mcp_state.metrics['tasks_processed']} tasks, {mcp_state.metrics['errors_encountered']} errors")

            await asyncio.sleep(300)  # Update every 5 minutes
        except Exception as e:
            logger.error(f"❌ Metrics collection error: {e}")
            await asyncio.sleep(300)

    logger.info("📊 Metrics collection service stopped")
async def connection_maintenance_service():
    """Every ten minutes, ping each connected server and mark failures as "unhealthy".

    Runs until ``mcp_state.shutdown_requested`` is set.
    """
    logger.info("🔧 Connection maintenance service started")

    while not mcp_state.shutdown_requested:
        try:
            if mcp_state.connected:
                for server_name, server_info in mcp_state.servers.items():
                    if server_info["status"] != "connected":
                        continue
                    # Periodic ping to keep connection alive
                    try:
                        await mcp_state.tools.list_tools(server_name)
                    except Exception as e:
                        logger.warning(f"⚠️ Connection maintenance failed for {server_name}: {e}")
                        server_info["status"] = "unhealthy"
                    else:
                        server_info["last_ping"] = datetime.now().isoformat()

            await asyncio.sleep(600)  # Maintenance every 10 minutes
        except Exception as e:
            logger.error(f"❌ Connection maintenance error: {e}")
            await asyncio.sleep(600)

    logger.info("🔧 Connection maintenance service stopped")
async def attempt_server_reconnection(server_name: str):
    """Try once to reconnect to ``server_name`` and update its record and the counters.

    Unknown server names are ignored. The server's status becomes
    "connected" on success and "failed" when the attempt raises.
    """
    server_info = mcp_state.servers.get(server_name)
    if not server_info:
        return

    # Count the attempt before trying, so failed attempts are recorded too.
    attempt_number = server_info.get("reconnection_attempts", 0) + 1
    server_info["reconnection_attempts"] = attempt_number
    mcp_state.metrics["reconnections"] += 1
    logger.info(f"🔄 Attempting reconnection to {server_name} (attempt #{attempt_number})")

    try:
        await mcp_state.tools.reconnect(server_name)
        server_info["status"] = "connected"
        server_info["last_ping"] = datetime.now().isoformat()
        logger.info(f"✅ Successfully reconnected to {server_name}")
    except Exception as e:
        logger.error(f"❌ Reconnection failed for {server_name}: {e}")
        server_info["status"] = "failed"
# Initialize backend and agent.
# NOTE(review): presumably Backend() picks up the XPANDER_* settings loaded from
# .env and get_args() returns the keyword arguments Agent expects — confirm
# against the SDK documentation.
backend = Backend()
agno_agent = Agent(**backend.get_args())
# Startup banner followed by example prompts to try against the agent.
logger.info("🚀 Agent ready with advanced MCP lifecycle management!")
logger.info("📋 Try: 'Check MCP health status'")
logger.info("📋 Try: 'Search for Python examples on GitHub'")
logger.info("📋 Try: 'Read the file /tmp/config.json'")
# Example usage
if __name__ == "__main__":
    # Example usage: run one prompt; Ctrl+C is logged instead of producing a traceback.
    try:
        agno_agent.print_response(
            message="Check MCP health status and show available capabilities",
        )
    except KeyboardInterrupt:
        logger.info("🔄 Graceful shutdown initiated...")
Production Deployment Considerations
Monitoring Integration
Copy
Ask AI
# Send metrics to monitoring system
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

# prometheus_client metric constructors take (name, documentation, ...); the
# documentation (help text) argument is required, so each metric passes one.
mcp_tasks_total = Counter('mcp_tasks_processed_total', 'Total number of MCP tasks processed')
mcp_errors_total = Counter('mcp_errors_total', 'Total number of MCP errors encountered')
mcp_connection_status = Gauge('mcp_server_connected', 'MCP server connection status')
Configuration Management
Copy
Ask AI
# Environment-based configuration
class MCPConfig:
    """MCP settings holder that can be populated from environment variables."""

    def __init__(self, servers=None, timeouts=30, retry_attempts=3):
        """Store the configuration values.

        Args:
            servers: Mapping of server name to server configuration; an empty
                dict is used when omitted.
            timeouts: Timeout value as an integer (from_env reads it from
                MCP_TIMEOUT).
            retry_attempts: Number of retry attempts as an integer.
        """
        # Build a fresh dict per instance instead of sharing a mutable default.
        self.servers = {} if servers is None else servers
        self.timeouts = timeouts
        self.retry_attempts = retry_attempts

    @classmethod
    def from_env(cls):
        """Build a config from MCP_SERVERS, MCP_TIMEOUT and MCP_RETRY_ATTEMPTS.

        Unset variables fall back to '{}', '30' and '3' respectively.

        Raises:
            json.JSONDecodeError: if MCP_SERVERS is not valid JSON.
            ValueError: if MCP_TIMEOUT or MCP_RETRY_ATTEMPTS is not an integer.
        """
        return cls(
            servers=json.loads(os.getenv('MCP_SERVERS', '{}')),
            timeouts=int(os.getenv('MCP_TIMEOUT', '30')),
            retry_attempts=int(os.getenv('MCP_RETRY_ATTEMPTS', '3')),
        )
Testing Your Lifecycle Implementation
test_lifecycle.py
Copy
Ask AI
import pytest
import asyncio
@pytest.mark.asyncio
async def test_mcp_initialization():
    """Test that initialization leaves the shared state connected with known servers."""
    # Run the boot-phase initializer directly.
    await initialize_mcp_tools()

    # Compare against the singleton with `is`, not `==`.
    assert mcp_state.connected is True
    assert len(mcp_state.servers) > 0
@pytest.mark.asyncio
async def test_graceful_shutdown():
    """Test that graceful shutdown clears the connection flag and the tool handle."""
    # Setup
    await initialize_mcp_tools()

    # Trigger shutdown
    await graceful_mcp_shutdown()

    # Verify cleanup; singletons are compared with `is`, not `==`.
    assert mcp_state.connected is False
    assert mcp_state.tools is None
Key Lifecycle Patterns
Initialization Patterns
Initialization Patterns
- Environment Validation First - Always validate before connecting
- Graceful Degradation - Continue with partial functionality if some servers fail
- Background Services - Start monitoring and maintenance services
- Signal Handling - Setup proper signal handlers for graceful shutdown
Runtime Patterns
Runtime Patterns
- Health Monitoring - Continuous monitoring of server health
- Connection Pooling - Efficient connection management
- Circuit Breaking - Fail fast for unhealthy servers
- Metrics Collection - Comprehensive operational metrics
Shutdown Patterns
Shutdown Patterns
- Signal Handling - Respond to system shutdown signals
- State Preservation - Save important state before shutdown
- Connection Cleanup - Gracefully close all connections
- Resource Cleanup - Clean up temporary files and sensitive data
Best Practices Summary
- Always use lifecycle decorators for proper resource management
- Implement comprehensive health checking with automated recovery
- Use background services for monitoring and maintenance
- Handle errors gracefully with fallback mechanisms
- Save operational metrics for monitoring and debugging
- Clean up resources thoroughly during shutdown
- Use environment-based configuration for flexibility
- Implement proper signal handling for graceful shutdowns
Next Steps
- Production Monitoring: Integrate with your monitoring stack
- Custom MCP Servers: Build domain-specific MCP servers
- Load Testing: Test your implementation under load
- Security Hardening: Implement authentication and authorization
References
- XPander Issue: #401
- SDK Version: v2.0.32
- Related Examples: MCP Integration Guide, Basic Lifecycle Management