Best Practices Guide Summary
- Goal: Production-ready MCP integration with comprehensive testing
- SDK Version: v2.0.32 or higher
- Prerequisites: Understanding of testing frameworks and monitoring
- Reference: XPander issue #401
Best Practices Overview
This guide provides comprehensive best practices for implementing, testing, and maintaining MCP integrations in production environments.
Development Best Practices
Configuration Management
Environment-based configuration with validation and fallbacks
Error Handling
Comprehensive error handling with retry logic and circuit breakers
Monitoring & Observability
Detailed logging, metrics, and health checks for operational visibility
Testing Strategy
Unit, integration, and end-to-end testing for reliable deployments
Configuration Best Practices
Environment-Based Configuration
config_best_practices.py
Copy
Ask AI
import os
import json
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class MCPServerConfig:
    """Configuration for a single MCP server.

    This is a plain data holder; semantic checks are performed by
    ``MCPConfig.validate``.
    """

    # Program used to start the server (``default_config`` uses ``npx``).
    command: str
    # Arguments passed along with ``command``.
    args: list[str] = field(default_factory=list)
    # Environment variables associated with the server.
    env: Dict[str, str] = field(default_factory=dict)
    # NOTE(review): units are not visible here -- presumably seconds; confirm.
    timeout: int = 30
    retry_attempts: int = 3
    health_check_interval: int = 60


@dataclass
class MCPConfig:
    """Complete MCP configuration: named servers plus global settings."""

    servers: Dict[str, MCPServerConfig] = field(default_factory=dict)
    global_timeout: int = 30
    max_concurrent_connections: int = 10
    health_check_enabled: bool = True
    metrics_enabled: bool = True

    @classmethod
    def from_env(cls) -> 'MCPConfig':
        """Load configuration using environment variables.

        Sources are tried in order:

        1. ``MCP_CONFIG_JSON`` -- a JSON document held in the variable itself.
        2. ``MCP_CONFIG_PATH`` -- path to a JSON file
           (default ``~/.mcp/config.json``), used only if it is a regular file.
        3. ``default_config()``.

        Raises:
            json.JSONDecodeError: If the selected source is not valid JSON.
        """
        config_json = os.getenv('MCP_CONFIG_JSON')
        if config_json:
            return cls.from_dict(json.loads(config_json))

        config_path = os.getenv('MCP_CONFIG_PATH', '~/.mcp/config.json')
        config_file = Path(config_path).expanduser()
        # is_file() rather than exists(): a directory at this path should fall
        # through to the defaults instead of failing inside open().
        if config_file.is_file():
            with open(config_file, encoding='utf-8') as f:
                return cls.from_dict(json.load(f))

        return cls.default_config()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MCPConfig':
        """Create configuration from a dictionary.

        Each entry under ``"servers"`` is expanded into ``MCPServerConfig``
        keyword arguments, so unknown keys raise ``TypeError``. Missing global
        settings fall back to the same defaults as the dataclass fields.
        """
        servers = {
            name: MCPServerConfig(**server_data)
            for name, server_data in data.get('servers', {}).items()
        }
        return cls(
            servers=servers,
            global_timeout=data.get('global_timeout', 30),
            max_concurrent_connections=data.get('max_concurrent_connections', 10),
            health_check_enabled=data.get('health_check_enabled', True),
            metrics_enabled=data.get('metrics_enabled', True),
        )

    @classmethod
    def default_config(cls) -> 'MCPConfig':
        """Create the default configuration.

        A filesystem server rooted at ``/tmp`` is always included; a GitHub
        server is added only when ``GITHUB_TOKEN`` is set in the environment.
        """
        servers = {
            # Registered unconditionally -- no availability probe is performed.
            'filesystem': MCPServerConfig(
                command='npx',
                args=['@modelcontextprotocol/server-filesystem', '/tmp'],
                env={},
            ),
        }

        github_token = os.getenv('GITHUB_TOKEN')
        if github_token:
            servers['github'] = MCPServerConfig(
                command='npx',
                args=['@modelcontextprotocol/server-github'],
                env={'GITHUB_TOKEN': github_token},
            )
        return cls(servers=servers)

    def validate(self) -> list[str]:
        """Validate the configuration.

        Returns:
            A list of human-readable error messages; empty when valid.
        """
        errors = []
        if not self.servers:
            errors.append("No MCP servers configured")

        for name, server in self.servers.items():
            if not server.command:
                errors.append(f"Server {name}: command is required")
            if server.timeout <= 0:
                errors.append(f"Server {name}: timeout must be positive")
            if server.retry_attempts < 0:
                errors.append(f"Server {name}: retry_attempts must not be negative")

        # Global limits that are zero or negative cannot describe a usable setup.
        if self.global_timeout <= 0:
            errors.append("global_timeout must be positive")
        if self.max_concurrent_connections <= 0:
            errors.append("max_concurrent_connections must be positive")
        return errors
# Usage example
def load_validated_config() -> MCPConfig:
    """Build the configuration via ``MCPConfig.from_env`` and reject invalid ones.

    Returns:
        The loaded configuration when ``validate()`` reports no problems.

    Raises:
        ValueError: If ``validate()`` returns one or more error messages.
    """
    loaded = MCPConfig.from_env()
    problems = loaded.validate()
    if not problems:
        return loaded
    raise ValueError(f"Configuration validation failed: {problems}")
Secure Configuration Management
secure_config.py
Copy
Ask AI
import os
from cryptography.fernet import Fernet
import base64
class SecureMCPConfig:
    """Secure configuration management for MCP.

    Wraps a ``Fernet`` instance to encrypt and decrypt configuration strings
    and to read encrypted tokens from ``MCP_TOKEN_*`` environment variables.
    """

    # Prefix of the environment variables that carry encrypted server tokens.
    _TOKEN_PREFIX = 'MCP_TOKEN_'

    def __init__(self):
        """Initialise the cipher from ``MCP_ENCRYPTION_KEY`` or a fresh key."""
        key = os.getenv('MCP_ENCRYPTION_KEY')
        if key:
            self.fernet = Fernet(key.encode())
        else:
            # NOTE: the generated key exists only on this instance, so data
            # encrypted with it cannot be decrypted by another instance or
            # after a restart. Set MCP_ENCRYPTION_KEY for anything persistent.
            self.fernet = Fernet(Fernet.generate_key())

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt ``data`` and return it as URL-safe base64 text."""
        return base64.urlsafe_b64encode(
            self.fernet.encrypt(data.encode())
        ).decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Reverse ``encrypt_sensitive_data`` and return the plain text."""
        return self.fernet.decrypt(
            base64.urlsafe_b64decode(encrypted_data.encode())
        ).decode()

    def load_server_tokens(self) -> dict[str, str]:
        """Decrypt every ``MCP_TOKEN_<SERVER>`` environment variable.

        Returns:
            Mapping of lower-cased server name to decrypted token. Variables
            that fail to decrypt are logged as warnings and left out.
        """
        # Imported locally so the method does not rely on a module-level
        # ``logger`` that this file section never defines.
        import logging

        log = logging.getLogger(__name__)
        prefix = self._TOKEN_PREFIX
        tokens: dict[str, str] = {}

        for key, value in os.environ.items():
            if not key.startswith(prefix):
                continue
            # Strip only the leading prefix; str.replace would also remove
            # later occurrences inside the server name.
            server_name = key[len(prefix):].lower()
            try:
                tokens[server_name] = self.decrypt_sensitive_data(value)
            except Exception as e:
                # Best effort: one bad token must not block the others.
                log.warning("Failed to decrypt token for %s: %s", server_name, e)
        return tokens
Error Handling & Resilience
Circuit Breaker Pattern
circuit_breaker.py
Copy
Ask AI
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
class CircuitState(Enum):
    """States of a ``CircuitBreaker``."""

    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected until the recovery timeout
    HALF_OPEN = "half_open"  # a trial call is allowed through


class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is open.

    Subclasses ``Exception`` so callers that caught the previous generic
    exception keep working.
    """


class CircuitBreaker:
    """Circuit breaker for MCP server connections.

    Consecutive failures are counted; once ``failure_threshold`` is reached
    the breaker opens and rejects calls until ``recovery_timeout`` seconds
    have elapsed, after which a trial call is let through.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        """
        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to wait before a trial call is allowed.
            expected_exception: Exception type(s) counted as failures; other
                exceptions propagate without touching the breaker state.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        # time.monotonic() reading of the latest failure (not a wall-clock time).
        self.last_failure_time: Optional[float] = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` under circuit breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Any exception raised by ``func`` is re-raised.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            self.state = CircuitState.HALF_OPEN

        try:
            result = await func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise  # bare raise keeps the original traceback intact
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Check whether enough time has passed to attempt a reset."""
        if self.last_failure_time is None:
            return True
        # Monotonic clock: unaffected by system clock adjustments.
        return time.monotonic() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self):
        """Handle successful execution: reset the counter and close."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed execution: count it and open when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # A failed trial call reopens immediately, whatever the counter says.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN
# Usage with MCP tools
class ResilientMCPTools:
    """Wrap an MCP tools object so each server gets its own circuit breaker."""

    def __init__(self, mcp_tools):
        """Store the wrapped tools object; breakers are created on demand."""
        self.mcp_tools = mcp_tools
        self.circuit_breakers = {}

    def get_circuit_breaker(self, server_name: str) -> CircuitBreaker:
        """Return the breaker registered for ``server_name``, creating it if absent."""
        try:
            return self.circuit_breakers[server_name]
        except KeyError:
            breaker = CircuitBreaker(
                failure_threshold=3,
                recovery_timeout=30
            )
            self.circuit_breakers[server_name] = breaker
            return breaker

    async def call_tool_with_protection(self, server_name: str, tool_name: str, args: dict):
        """Route ``mcp_tools.call_tool`` through the server's circuit breaker."""
        breaker = self.get_circuit_breaker(server_name)
        outcome = await breaker.call(
            self.mcp_tools.call_tool, server_name, tool_name, args
        )
        return outcome
Retry Logic with Exponential Backoff
retry_logic.py
Copy
Ask AI
import asyncio
import random
from typing import Callable, Any, Optional
import logging
logger = logging.getLogger(__name__)


async def retry_with_exponential_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_multiplier: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,)
):
    """Execute ``func`` with exponential backoff retry logic.

    Args:
        func: Zero-argument callable returning an awaitable.
        max_retries: Retries after the first attempt (total calls are
            ``max_retries + 1``). Must be non-negative.
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds for any single delay.
        backoff_multiplier: Factor applied to the delay per attempt.
        jitter: When true, scale each delay by a random factor in [0.5, 1.0).
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        The result of the first successful ``await func()``.

    Raises:
        ValueError: If ``max_retries`` is negative.
        Exception: The last exception from ``func`` once retries are exhausted.
    """
    if max_retries < 0:
        # Previously the loop body never ran and None was returned silently.
        raise ValueError("max_retries must be non-negative")

    for attempt in range(max_retries + 1):
        try:
            return await func()
        except exceptions as e:
            if attempt == max_retries:
                logger.error(
                    "Function failed after %d attempts: %s", max_retries + 1, e
                )
                raise  # bare raise keeps the original traceback intact

            # Exponential backoff, capped at max_delay.
            try:
                delay = min(base_delay * (backoff_multiplier ** attempt), max_delay)
            except OverflowError:
                # Float power overflowed; the cap is the right answer anyway.
                delay = max_delay

            # Jitter spreads retries out to avoid a thundering herd.
            if jitter:
                delay *= 0.5 + random.random() * 0.5

            logger.warning(
                "Attempt %d failed: %s. Retrying in %.2fs", attempt + 1, e, delay
            )
            await asyncio.sleep(delay)
# Usage example
async def connect_with_retry(mcp_tools, server_name: str):
    """Connect to ``server_name``, retrying on connection and timeout errors.

    Delegates to ``retry_with_exponential_backoff`` with up to three retries
    and a one-second base delay.
    """
    async def attempt_connection():
        # Zero-argument wrapper, as the retry helper calls ``func()`` bare.
        return await mcp_tools.connect(server_name)

    retryable = (ConnectionError, TimeoutError)
    outcome = await retry_with_exponential_backoff(
        attempt_connection,
        max_retries=3,
        base_delay=1.0,
        exceptions=retryable
    )
    return outcome
Monitoring & Observability
Comprehensive Logging
mcp_logging.py
Copy
Ask AI
import logging
import json
import time
from contextlib import contextmanager
from typing import Dict, Any
class MCPLogger:
    """Structured logging for MCP operations.

    Each record is a prefix followed by a JSON object holding the operation
    name, server name, a timestamp and any extra keyword arguments.
    """

    def __init__(self, name: str):
        """Configure the named logger with a stream handler at INFO level."""
        self.logger = logging.getLogger(name)

        # logging.getLogger returns the same object for the same name, so only
        # attach a handler when none exists; otherwise every additional
        # MCPLogger(name) would duplicate each emitted line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_operation(self, operation: str, server_name: str, **kwargs):
        """Log an MCP operation at INFO level with structured data."""
        log_data = {
            "operation": operation,
            "server_name": server_name,
            "timestamp": time.time(),
            **kwargs
        }
        # default=str: logging must never raise because a caller passed a
        # value that is not JSON-serializable.
        self.logger.info(f"MCP Operation: {json.dumps(log_data, default=str)}")

    def log_error(self, operation: str, server_name: str, error: Exception, **kwargs):
        """Log an MCP error at ERROR level with the exception type and message."""
        log_data = {
            "operation": operation,
            "server_name": server_name,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": time.time(),
            **kwargs
        }
        self.logger.error(f"MCP Error: {json.dumps(log_data, default=str)}")

    @contextmanager
    def operation_context(self, operation: str, server_name: str, **kwargs):
        """Context manager that logs start, completion or failure with duration.

        Exceptions raised inside the ``with`` body are logged through
        ``log_error`` and then re-raised unchanged.
        """
        start_time = time.time()
        try:
            self.log_operation(operation, server_name, status="started", **kwargs)
            yield
            duration = time.time() - start_time
            self.log_operation(
                operation, server_name,
                status="completed",
                duration_ms=duration * 1000,
                **kwargs
            )
        except Exception as e:
            duration = time.time() - start_time
            self.log_error(
                operation, server_name, e,
                status="failed",
                duration_ms=duration * 1000,
                **kwargs
            )
            raise


# Usage example
mcp_logger = MCPLogger("mcp_integration")
async def logged_mcp_operation(mcp_tools, server_name: str, tool_name: str, args: dict):
    """Call an MCP tool inside ``mcp_logger.operation_context``.

    The context manager logs the start and then either the completion or the
    failure of the ``call_tool`` invocation.
    """
    log_scope = mcp_logger.operation_context("call_tool", server_name, tool_name=tool_name)
    with log_scope:
        outcome = await mcp_tools.call_tool(server_name, tool_name, args)
        return outcome
Metrics Collection
mcp_metrics.py
Copy
Ask AI
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from collections import defaultdict, deque
import threading
@dataclass
class MCPMetrics:
    """Comprehensive MCP metrics collection.

    Every ``record_*`` method and ``get_summary`` run under one
    ``threading.Lock`` created in ``__post_init__``.
    """

    # Connection metrics
    connection_attempts: int = 0
    successful_connections: int = 0
    failed_connections: int = 0
    active_connections: int = 0
    # Operation metrics
    total_operations: int = 0
    successful_operations: int = 0
    failed_operations: int = 0
    # Performance metrics (durations as passed to record_operation; the
    # summary multiplies the average by 1000 and labels it milliseconds)
    average_response_time: float = 0.0
    response_times: deque = field(default_factory=lambda: deque(maxlen=1000))
    # Server-specific metrics, keyed by server name
    server_metrics: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    # Error tracking
    error_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    last_errors: Dict[str, str] = field(default_factory=dict)
    # Health metrics
    last_health_check: Optional[float] = None
    health_check_failures: int = 0

    def __post_init__(self):
        # Plain attribute, not a dataclass field, so the lock stays out of the
        # generated __init__, __repr__ and __eq__.
        self._lock = threading.Lock()

    def _server_entry(self, server_name: str) -> Dict[str, Any]:
        """Return the counters for ``server_name``, creating them on first use.

        Callers must already hold ``self._lock``.
        """
        if server_name not in self.server_metrics:
            self.server_metrics[server_name] = {
                "connection_attempts": 0,
                "successful_connections": 0,
                "operations": 0,
                "errors": 0
            }
        return self.server_metrics[server_name]

    def record_connection_attempt(self, server_name: str, success: bool):
        """Record a connection attempt, globally and for the server."""
        with self._lock:
            self.connection_attempts += 1
            if success:
                self.successful_connections += 1
                self.active_connections += 1
            else:
                self.failed_connections += 1

            entry = self._server_entry(server_name)
            entry["connection_attempts"] += 1
            if success:
                entry["successful_connections"] += 1

    def record_operation(self, server_name: str, duration: float, success: bool, error: Optional[str] = None):
        """Record one operation's outcome and duration.

        Args:
            server_name: Server the operation ran against.
            duration: Elapsed time of the operation.
            success: Whether the operation succeeded.
            error: Optional error text; counted only when ``success`` is false.
        """
        with self._lock:
            self.total_operations += 1
            if success:
                self.successful_operations += 1
            else:
                self.failed_operations += 1
                if error:
                    self.error_counts[error] += 1
                    self.last_errors[server_name] = error

            # Rolling average over the retained (at most 1000) samples.
            self.response_times.append(duration)
            self.average_response_time = sum(self.response_times) / len(self.response_times)

            # Create the entry on demand so operations are counted even when
            # no connection attempt was recorded for this server first.
            entry = self._server_entry(server_name)
            entry["operations"] += 1
            if not success:
                entry["errors"] += 1

    def record_health_check(self, success: bool):
        """Record a health check result and the time it happened."""
        with self._lock:
            self.last_health_check = time.time()
            if not success:
                self.health_check_failures += 1

    def get_summary(self) -> Dict[str, Any]:
        """Return a snapshot of all metrics as a nested dictionary.

        Success rates are ``0`` when nothing has been recorded yet.
        ``top_errors`` holds the five most frequent error strings.
        """
        with self._lock:
            success_rate = (
                self.successful_operations / self.total_operations
                if self.total_operations > 0 else 0
            )
            connection_success_rate = (
                self.successful_connections / self.connection_attempts
                if self.connection_attempts > 0 else 0
            )
            top_errors = sorted(
                self.error_counts.items(),
                key=lambda item: item[1],
                reverse=True
            )[:5]
            return {
                "timestamp": time.time(),
                "connections": {
                    "attempts": self.connection_attempts,
                    "successful": self.successful_connections,
                    "failed": self.failed_connections,
                    "active": self.active_connections,
                    "success_rate": connection_success_rate
                },
                "operations": {
                    "total": self.total_operations,
                    "successful": self.successful_operations,
                    "failed": self.failed_operations,
                    "success_rate": success_rate
                },
                "performance": {
                    "average_response_time_ms": self.average_response_time * 1000,
                    "response_count": len(self.response_times)
                },
                "health": {
                    "last_check": self.last_health_check,
                    "failures": self.health_check_failures
                },
                # Copy the inner dicts too, so callers cannot mutate live
                # counters through the snapshot.
                "servers": {
                    name: dict(stats) for name, stats in self.server_metrics.items()
                },
                "top_errors": dict(top_errors)
            }


# Global metrics instance
mcp_metrics = MCPMetrics()
# Usage decorators
def track_mcp_operation(server_name: str):
    """Decorator factory that records each call of an async function.

    The wrapped coroutine's duration and outcome are reported to the
    module-level ``mcp_metrics`` via ``record_operation``; exceptions are
    recorded with ``str(exception)`` as the error text and then re-raised.

    Args:
        server_name: Server name under which the operations are recorded.
    """
    # Local import keeps this block self-contained within the file.
    import functools

    def decorator(func):
        # wraps() preserves the decorated function's __name__ and __doc__.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # perf_counter: monotonic and high resolution, suited to durations.
            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                duration = time.perf_counter() - start_time
                mcp_metrics.record_operation(server_name, duration, False, str(e))
                raise
            duration = time.perf_counter() - start_time
            mcp_metrics.record_operation(server_name, duration, True)
            return result
        return wrapper
    return decorator
Testing Strategy
Unit Tests
test_mcp_unit.py
Copy
Ask AI
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from xpander_sdk.tools import MultiMCPTools
class TestMCPIntegration:
    """Unit tests for MCP integration.

    The connection, discovery and execution tests run against a mocked
    ``MultiMCPTools``; the remaining tests exercise ``MCPMetrics``,
    ``CircuitBreaker`` and ``MCPConfig`` directly.
    """

    @pytest.fixture
    def mock_mcp_tools(self):
        """Mock MCP tools with async stand-ins for the methods used below."""
        tools = Mock(spec=MultiMCPTools)
        tools.connect = AsyncMock()
        tools.disconnect = AsyncMock()
        tools.list_tools = AsyncMock()
        tools.call_tool = AsyncMock()
        return tools

    @pytest.fixture
    def sample_server_config(self):
        """Sample server configuration for testing."""
        return {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        }

    @pytest.mark.asyncio
    async def test_successful_connection(self, mock_mcp_tools, sample_server_config):
        """Test successful MCP server connection."""
        mock_mcp_tools.connect.return_value = True

        await mock_mcp_tools.connect("filesystem")

        mock_mcp_tools.connect.assert_called_once_with("filesystem")

    @pytest.mark.asyncio
    async def test_connection_failure(self, mock_mcp_tools):
        """Test MCP connection failure handling."""
        mock_mcp_tools.connect.side_effect = ConnectionError("Connection failed")

        with pytest.raises(ConnectionError):
            await mock_mcp_tools.connect("filesystem")

    @pytest.mark.asyncio
    async def test_tool_discovery(self, mock_mcp_tools):
        """Test MCP tool discovery."""
        expected_tools = [
            {"name": "read_file", "description": "Read file contents"},
            {"name": "write_file", "description": "Write file contents"}
        ]
        mock_mcp_tools.list_tools.return_value = expected_tools

        tools = await mock_mcp_tools.list_tools("filesystem")

        assert tools == expected_tools
        mock_mcp_tools.list_tools.assert_called_once_with("filesystem")

    @pytest.mark.asyncio
    async def test_tool_execution(self, mock_mcp_tools):
        """Test MCP tool execution."""
        expected_result = {"content": "file contents"}
        mock_mcp_tools.call_tool.return_value = expected_result

        result = await mock_mcp_tools.call_tool(
            "filesystem",
            "read_file",
            {"path": "/tmp/test.txt"}
        )

        assert result == expected_result
        mock_mcp_tools.call_tool.assert_called_once_with(
            "filesystem", "read_file", {"path": "/tmp/test.txt"}
        )

    def test_metrics_collection(self):
        """Test metrics collection functionality."""
        metrics = MCPMetrics()

        # Record some operations
        metrics.record_connection_attempt("filesystem", True)
        metrics.record_operation("filesystem", 0.5, True)
        metrics.record_operation("filesystem", 1.0, False, "timeout")

        summary = metrics.get_summary()
        assert summary["connections"]["attempts"] == 1
        assert summary["connections"]["successful"] == 1
        assert summary["operations"]["total"] == 2
        assert summary["operations"]["successful"] == 1
        assert summary["operations"]["failed"] == 1

    def test_circuit_breaker_open(self):
        """Test circuit breaker opens after failures."""
        circuit_breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1)

        # _on_failure() only updates counters and state. It is called directly,
        # without a bare ``except: pass`` that would hide a real error.
        for _ in range(3):
            circuit_breaker._on_failure()

        assert circuit_breaker.state == CircuitState.OPEN

    def test_configuration_validation(self):
        """Test configuration validation."""
        config = MCPConfig()

        # Empty configuration should have errors
        errors = config.validate()
        assert len(errors) > 0

        # Valid configuration should pass
        config.servers["test"] = MCPServerConfig(command="test", args=[])
        errors = config.validate()
        assert len(errors) == 0
Integration Tests
test_mcp_integration.py
Copy
Ask AI
import pytest
import asyncio
import tempfile
import json
from pathlib import Path
class TestMCPIntegrationE2E:
    """End-to-end integration tests for MCP.

    These tests talk to a real filesystem MCP server started through ``npx``
    and are marked ``integration`` so they can be selected or excluded.
    """

    # NOTE(review): with pytest-asyncio in strict mode, async fixtures have to
    # be declared with ``pytest_asyncio.fixture`` -- confirm the project's mode.
    @pytest.fixture
    async def real_mcp_tools(self):
        """Real MCP tools instance for integration testing."""
        # Skip the test when the ``mcp`` package cannot be imported.
        pytest.importorskip("mcp")

        config = {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        }
        tools = MultiMCPTools(servers=config)
        try:
            await tools.connect_all()
            yield tools
        finally:
            await tools.disconnect_all()

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_filesystem_operations(self, real_mcp_tools):
        """Test real filesystem operations."""
        # Create test file
        test_file = "/tmp/mcp_test.txt"
        test_content = "MCP integration test content"
        with open(test_file, 'w') as f:
            f.write(test_content)

        try:
            # Test file reading
            result = await real_mcp_tools.call_tool(
                "filesystem",
                "read_file",
                {"path": test_file}
            )
            assert result["content"] == test_content
        finally:
            # Cleanup
            Path(test_file).unlink(missing_ok=True)

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_server_health_check(self, real_mcp_tools):
        """Test server health checking."""
        # List available tools
        tools = await real_mcp_tools.list_tools("filesystem")
        assert len(tools) > 0
        assert any(tool["name"] == "read_file" for tool in tools)

    @pytest.mark.integration
    # The asyncio marker was missing here although every sibling coroutine
    # test carries it; without it the coroutine may never be awaited.
    @pytest.mark.asyncio
    async def test_connection_recovery(self, real_mcp_tools):
        """Test connection recovery after disconnection."""
        # Disconnect and reconnect
        await real_mcp_tools.disconnect("filesystem")
        await real_mcp_tools.connect("filesystem")

        # Verify connection works
        tools = await real_mcp_tools.list_tools("filesystem")
        assert len(tools) > 0
@pytest.mark.load
class TestMCPLoadTesting:
    """Load testing for MCP integration."""

    @pytest.mark.asyncio
    async def test_concurrent_operations(self):
        """Run 100 concurrent ``call_tool`` invocations against a mock."""
        mock_tools = Mock(spec=MultiMCPTools)
        mock_tools.call_tool = AsyncMock(return_value={"result": "success"})

        # Simulate concurrent operations
        tasks = [
            mock_tools.call_tool("filesystem", "read_file", {"path": f"/tmp/file_{i}.txt"})
            for i in range(100)
        ]
        results = await asyncio.gather(*tasks)

        assert len(results) == 100
        assert all(r["result"] == "success" for r in results)
        assert mock_tools.call_tool.call_count == 100

    @pytest.mark.asyncio
    async def test_connection_pool_limits(self):
        """Test connection pool behavior under load."""
        # This would test actual connection pooling limits
        # Implementation depends on your connection pooling strategy
        # Skip explicitly: an empty body would be reported as a passing test
        # and suggest coverage that does not exist.
        pytest.skip("connection pool limit test not implemented yet")
Test Configuration
conftest.py
Copy
Ask AI
import pytest
import asyncio
import os
from unittest.mock import patch
def pytest_configure(config):
    """Register the custom ``integration`` and ``load`` markers with pytest."""
    marker_lines = (
        "integration: integration tests requiring real MCP servers",
        "load: load testing scenarios",
    )
    for marker_line in marker_lines:
        config.addinivalue_line("markers", marker_line)
@pytest.fixture(scope="session")
def event_loop():
    """Provide one new asyncio event loop for the whole test session.

    The loop is installed as the current event loop before being handed to
    tests and is closed once the session-scoped fixture is torn down.
    """
    session_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(session_loop)
    yield session_loop
    session_loop.close()
@pytest.fixture
def mock_environment():
    """Patch ``os.environ`` with test credentials for the duration of a test.

    Yields the dictionary of injected variables; ``patch.dict`` restores the
    environment when the fixture is torn down.
    """
    overrides = {
        "XPANDER_API_KEY": "test-api-key",
        "XPANDER_ORGANIZATION_ID": "test-org-id",
        "GITHUB_TOKEN": "test-github-token"
    }
    patched_env = patch.dict(os.environ, overrides)
    with patched_env:
        yield overrides
@pytest.fixture
def temp_mcp_config():
    """Create a temporary MCP configuration file and yield its path.

    The file holds a single filesystem server and a ``global_timeout`` of 30.
    It is removed when the fixture is torn down.
    """
    import tempfile
    import json

    config = {
        "servers": {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        },
        "global_timeout": 30
    }

    # delete=False because the path must outlive the handle; leaving the
    # ``with`` block closes (and so flushes) the file before any test reads it.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(config, f)

    try:
        yield f.name
    finally:
        # Always remove the file, even if teardown is reached through an error.
        os.unlink(f.name)
Production Deployment Checklist
Pre-Deployment
Pre-Deployment
Configuration Validation
- Environment variables are properly set
- MCP server configurations are validated
- Security tokens are encrypted
- Configuration schema is validated
Testing
- Unit tests pass
- Integration tests pass
- Load tests meet performance requirements
- Security tests validate token handling
Monitoring Setup
- Logging is configured
- Metrics collection is enabled
- Health checks are implemented
- Alerting is configured
Deployment
Deployment
Infrastructure
- MCP servers are installed and available
- Network connectivity is verified
- Resource limits are configured
- Backup and recovery procedures are in place
Application
- Application is deployed with proper configuration
- Health checks are passing
- Metrics are being collected
- Logs are being generated and stored
Performance Optimization
Connection Pooling
connection_pool.py
Copy
Ask AI
import asyncio
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class ConnectionPool:
    """Connection pool for MCP servers.

    Keeps one ``asyncio.Queue`` of idle connections per server together with
    a count of the connections created for it.
    """

    # Upper bound on connections created per server (also the queue size).
    max_connections: int = 10
    # NOTE(review): not referenced by any method of this class.
    min_connections: int = 2
    # Seconds to wait for an idle connection once the limit is reached.
    connection_timeout: int = 30

    def __post_init__(self):
        self.pools: Dict[str, asyncio.Queue] = {}
        self.active_connections: Dict[str, int] = {}

    async def get_connection(self, server_name: str):
        """Get a connection for ``server_name`` from the pool.

        An idle connection is reused when available; otherwise a new one is
        created while the server is below ``max_connections``; otherwise the
        call waits up to ``connection_timeout`` seconds for one to be returned.

        Raises:
            asyncio.TimeoutError: If no connection is returned in time.
        """
        if server_name not in self.pools:
            self.pools[server_name] = asyncio.Queue(maxsize=self.max_connections)
            self.active_connections[server_name] = 0

        pool = self.pools[server_name]
        try:
            # Try to get an existing idle connection first.
            return pool.get_nowait()
        except asyncio.QueueEmpty:
            pass

        if self.active_connections[server_name] < self.max_connections:
            # Reserve the slot BEFORE awaiting. Incrementing only after the
            # await lets concurrent callers all pass the limit check and
            # create more than max_connections connections.
            self.active_connections[server_name] += 1
            try:
                return await self._create_connection(server_name)
            except BaseException:
                # Creation failed or was cancelled: release the slot.
                self.active_connections[server_name] -= 1
                raise

        # At the limit: wait for a connection to be returned.
        return await asyncio.wait_for(pool.get(), timeout=self.connection_timeout)

    async def return_connection(self, server_name: str, connection):
        """Return ``connection`` to the pool of ``server_name``.

        Connections for unknown servers are ignored. If the idle queue is
        full, the connection is closed and its slot released.
        """
        if server_name in self.pools:
            try:
                self.pools[server_name].put_nowait(connection)
            except asyncio.QueueFull:
                # Pool is full, close connection
                await self._close_connection(connection)
                self.active_connections[server_name] -= 1

    async def _create_connection(self, server_name: str):
        """Create a new connection to the MCP server (placeholder)."""
        # Implementation depends on MCP client library
        pass

    async def _close_connection(self, connection):
        """Close an MCP connection (placeholder)."""
        # Implementation depends on MCP client library
        pass
Caching Strategy
mcp_caching.py
Copy
Ask AI
import asyncio
import time
from typing import Any, Optional, Dict, Tuple
import hashlib
import json
class MCPCache:
    """Caching layer for MCP operations.

    Results are kept in memory as ``(result, expiry_time)`` tuples, keyed by
    a hash of the server name, tool name and arguments.
    """

    def __init__(self, default_ttl: int = 300):
        """
        Args:
            default_ttl: Lifetime in seconds for entries stored without an
                explicit ``ttl``.
        """
        self.cache: Dict[str, Tuple[Any, float]] = {}
        self.default_ttl = default_ttl
        self._lock = asyncio.Lock()

    def _generate_cache_key(self, server_name: str, tool_name: str, args: dict) -> str:
        """Generate a cache key from the operation parameters.

        ``args`` must be JSON-serializable; keys are sorted so that equal
        dictionaries always give the same key.
        """
        key_data = {
            "server": server_name,
            "tool": tool_name,
            "args": args
        }
        key_str = json.dumps(key_data, sort_keys=True)
        # MD5 serves only as a compact fingerprint here, not for security.
        return hashlib.md5(key_str.encode()).hexdigest()

    async def get(self, server_name: str, tool_name: str, args: dict) -> Optional[Any]:
        """Return the cached result, or ``None`` when missing or expired."""
        cache_key = self._generate_cache_key(server_name, tool_name, args)
        async with self._lock:
            entry = self.cache.get(cache_key)
            if entry is None:
                return None
            result, expiry_time = entry
            if time.time() < expiry_time:
                return result
            # Remove expired entry
            del self.cache[cache_key]
            return None

    async def set(self, server_name: str, tool_name: str, args: dict, result: Any, ttl: Optional[int] = None):
        """Cache an operation result.

        Args:
            ttl: Lifetime in seconds; ``None`` selects ``default_ttl``. A
                ``ttl`` of ``0`` is honoured and expires the entry at once.
        """
        cache_key = self._generate_cache_key(server_name, tool_name, args)
        # Explicit None check: ``ttl or default`` would turn ttl=0 into the
        # default lifetime.
        effective_ttl = self.default_ttl if ttl is None else ttl
        expiry_time = time.time() + effective_ttl
        async with self._lock:
            self.cache[cache_key] = (result, expiry_time)

    async def invalidate(self, server_name: str, tool_name: str, args: dict):
        """Invalidate a specific cache entry; missing entries are ignored."""
        cache_key = self._generate_cache_key(server_name, tool_name, args)
        async with self._lock:
            self.cache.pop(cache_key, None)

    async def clear_expired(self):
        """Remove every entry whose expiry time has passed."""
        current_time = time.time()
        async with self._lock:
            expired_keys = [
                key for key, (_, expiry_time) in self.cache.items()
                if current_time >= expiry_time
            ]
            for key in expired_keys:
                del self.cache[key]
# Usage with MCP tools
class CachedMCPTools:
    """Wrap an MCP tools object with an ``MCPCache`` in front of ``call_tool``."""

    def __init__(self, mcp_tools, cache_ttl: int = 300):
        """Keep the wrapped tools and build a cache using ``cache_ttl`` as default TTL."""
        self.mcp_tools = mcp_tools
        self.cache = MCPCache(default_ttl=cache_ttl)

    async def call_tool_cached(self, server_name: str, tool_name: str, args: dict, use_cache: bool = True):
        """Call an MCP tool, serving and storing results through the cache.

        With ``use_cache`` false the cache is neither read nor written. A
        cached value of ``None`` is treated as a miss.
        """
        if not use_cache:
            # Bypass: straight to the underlying tool, nothing stored.
            return await self.mcp_tools.call_tool(server_name, tool_name, args)

        hit = await self.cache.get(server_name, tool_name, args)
        if hit is not None:
            return hit

        fresh = await self.mcp_tools.call_tool(server_name, tool_name, args)
        await self.cache.set(server_name, tool_name, args, fresh)
        return fresh
Security Best Practices
Authentication & Authorization
Authentication & Authorization
- Token Management: Store tokens securely using environment variables or secret management systems
- Token Rotation: Implement automatic token rotation for long-lived tokens
- Access Control: Limit MCP server access based on user roles and permissions
- Audit Logging: Log all MCP operations for security auditing
Network Security
Network Security
- TLS Encryption: Ensure all MCP communication uses TLS
- Network Segmentation: Isolate MCP servers in secure network segments
- Firewall Rules: Configure strict firewall rules for MCP server access
- VPN/Private Networks: Use VPNs for remote MCP server access
Data Security
Data Security
- Data Encryption: Encrypt sensitive data at rest and in transit
- Data Sanitization: Sanitize data before passing to MCP servers
- Data Retention: Implement proper data retention and deletion policies
- Backup Security: Secure backup data with encryption and access controls
Summary
This comprehensive guide covers:
- Configuration Management: Environment-based, secure, and validated configuration
- Error Handling: Circuit breakers, retry logic, and graceful degradation
- Monitoring: Structured logging, metrics collection, and health checks
- Testing: Unit, integration, and load testing strategies
- Performance: Connection pooling, caching, and optimization techniques
- Security: Authentication, authorization, and data protection
References
- XPander Issue: #401
- SDK Version: v2.0.32
- Related Examples: MCP Integration Guide, Advanced Lifecycle Management