Skip to main content
Best Practices Guide Summary
  • Goal: Production-ready MCP integration with comprehensive testing
  • SDK Version: v2.0.32 or higher
  • Prerequisites: Understanding of testing frameworks and monitoring
  • Reference: XPander issue #401

Best Practices Overview

This guide provides comprehensive best practices for implementing, testing, and maintaining MCP integrations in production environments.

Development Best Practices

Configuration Management

Environment-based configuration with validation and fallbacks

Error Handling

Comprehensive error handling with retry logic and circuit breakers

Monitoring & Observability

Detailed logging, metrics, and health checks for operational visibility

Testing Strategy

Unit, integration, and end-to-end testing for reliable deployments

Configuration Best Practices

Environment-Based Configuration

config_best_practices.py
import os
import json
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class MCPServerConfig:
    """Launch and runtime settings for a single MCP server."""
    # Executable used to start the server; must be non-empty to pass validation.
    command: str
    # Arguments passed to ``command``.
    args: list[str] = field(default_factory=list)
    # Environment variables supplied for the server (e.g. access tokens).
    env: Dict[str, str] = field(default_factory=dict)
    # Per-server timeout; must be positive (presumably seconds - confirm).
    timeout: int = 30
    # Number of retries; must not be negative.
    retry_attempts: int = 3
    # Interval between health checks; must be positive (presumably seconds).
    health_check_interval: int = 60


@dataclass
class MCPConfig:
    """Top-level MCP configuration: named servers plus global limits.

    Build an instance directly, from a dictionary (``from_dict``), from the
    process environment (``from_env``) or from built-in defaults
    (``default_config``), then call ``validate`` before using it.
    """
    servers: Dict[str, MCPServerConfig] = field(default_factory=dict)
    global_timeout: int = 30
    max_concurrent_connections: int = 10
    health_check_enabled: bool = True
    metrics_enabled: bool = True

    @classmethod
    def from_env(cls) -> 'MCPConfig':
        """Load configuration using the environment.

        Sources are tried in order:

        1. ``MCP_CONFIG_JSON`` - a JSON document held in the variable itself.
        2. The file named by ``MCP_CONFIG_PATH`` (default
           ``~/.mcp/config.json``), if it exists.
        3. ``default_config()``.

        Raises:
            json.JSONDecodeError: if the JSON text or file is malformed.
            TypeError: if a server entry contains keys that
                ``MCPServerConfig`` does not accept.
        """
        config_json = os.getenv('MCP_CONFIG_JSON')
        if config_json:
            return cls.from_dict(json.loads(config_json))

        config_path = os.getenv('MCP_CONFIG_PATH', '~/.mcp/config.json')
        config_file = Path(config_path).expanduser()

        if config_file.exists():
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(config_file, encoding='utf-8') as f:
                return cls.from_dict(json.load(f))

        return cls.default_config()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MCPConfig':
        """Create a configuration from a plain dictionary.

        ``data['servers']`` maps a server name to the keyword arguments of
        ``MCPServerConfig``; missing top-level keys fall back to the same
        defaults as the dataclass fields.
        """
        servers = {
            name: MCPServerConfig(**server_data)
            for name, server_data in data.get('servers', {}).items()
        }

        return cls(
            servers=servers,
            global_timeout=data.get('global_timeout', 30),
            max_concurrent_connections=data.get('max_concurrent_connections', 10),
            health_check_enabled=data.get('health_check_enabled', True),
            metrics_enabled=data.get('metrics_enabled', True)
        )

    @classmethod
    def default_config(cls) -> 'MCPConfig':
        """Create the built-in default configuration.

        The filesystem server (rooted at ``/tmp``) is always included; no
        availability check is performed. The GitHub server is added only
        when ``GITHUB_TOKEN`` is set in the environment.
        """
        servers = {
            'filesystem': MCPServerConfig(
                command='npx',
                args=['@modelcontextprotocol/server-filesystem', '/tmp'],
                env={}
            )
        }

        github_token = os.getenv('GITHUB_TOKEN')
        if github_token:
            servers['github'] = MCPServerConfig(
                command='npx',
                args=['@modelcontextprotocol/server-github'],
                env={'GITHUB_TOKEN': github_token}
            )

        return cls(servers=servers)

    def validate(self) -> list[str]:
        """Check the configuration and return a list of error messages.

        An empty list means the configuration is valid. Nothing is raised;
        callers decide how to react to the reported problems.
        """
        errors = []

        if not self.servers:
            errors.append("No MCP servers configured")

        if self.global_timeout <= 0:
            errors.append("global_timeout must be positive")

        if self.max_concurrent_connections <= 0:
            errors.append("max_concurrent_connections must be positive")

        for name, server in self.servers.items():
            if not server.command:
                errors.append(f"Server {name}: command is required")

            if server.timeout <= 0:
                errors.append(f"Server {name}: timeout must be positive")

            if server.retry_attempts < 0:
                errors.append(f"Server {name}: retry_attempts must not be negative")

            if server.health_check_interval <= 0:
                errors.append(f"Server {name}: health_check_interval must be positive")

        return errors

# Usage example
def load_validated_config() -> MCPConfig:
    """Build the MCP configuration from the environment and reject it if invalid.

    Returns:
        The configuration produced by ``MCPConfig.from_env()``.

    Raises:
        ValueError: when ``validate()`` reports at least one problem; the
            message carries the full list of problems.
    """
    loaded = MCPConfig.from_env()
    problems = loaded.validate()

    # Guard clause: the happy path returns immediately.
    if not problems:
        return loaded

    raise ValueError(f"Configuration validation failed: {problems}")

Secure Configuration Management

secure_config.py
import os
from cryptography.fernet import Fernet
import base64

class SecureMCPConfig:
    """Encrypt and decrypt sensitive MCP configuration values with Fernet.

    The key is read from ``MCP_ENCRYPTION_KEY``. When that variable is not
    set, a fresh key is generated for this instance only, so anything it
    encrypts cannot be decrypted by another instance or after a restart.
    """

    # Prefix of the environment variables that hold encrypted server tokens.
    _TOKEN_ENV_PREFIX = 'MCP_TOKEN_'

    def __init__(self):
        import logging

        key = os.getenv('MCP_ENCRYPTION_KEY')
        if key:
            self.fernet = Fernet(key.encode())
        else:
            # Make the ephemeral-key fallback visible instead of silent.
            logging.getLogger(__name__).warning(
                "MCP_ENCRYPTION_KEY is not set; using a temporary key, "
                "encrypted values will not be recoverable later"
            )
            self.fernet = Fernet(Fernet.generate_key())

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt ``data`` and return it as URL-safe base64 text.

        The Fernet output is base64-encoded once more; the matching decode
        happens in ``decrypt_sensitive_data``.
        """
        return base64.urlsafe_b64encode(
            self.fernet.encrypt(data.encode())
        ).decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Reverse ``encrypt_sensitive_data`` and return the plain text.

        Raises whatever the base64 decoder or ``self.fernet.decrypt`` raises
        for malformed or wrongly-keyed input.
        """
        return self.fernet.decrypt(
            base64.urlsafe_b64decode(encrypted_data.encode())
        ).decode()

    def load_server_tokens(self) -> dict[str, str]:
        """Decrypt every ``MCP_TOKEN_<NAME>`` environment variable.

        Returns:
            A mapping of lower-cased server name (the part after the
            ``MCP_TOKEN_`` prefix) to the decrypted token. Variables that
            cannot be decrypted are skipped and a warning is logged.
        """
        import logging

        log = logging.getLogger(__name__)
        prefix = self._TOKEN_ENV_PREFIX
        tokens: dict[str, str] = {}

        for key, value in os.environ.items():
            if not key.startswith(prefix):
                continue

            # Strip only the leading prefix; str.replace would also remove
            # later occurrences inside the server name.
            server_name = key[len(prefix):].lower()
            try:
                tokens[server_name] = self.decrypt_sensitive_data(value)
            except Exception as e:
                # Deliberately best-effort: one bad token must not prevent
                # the remaining tokens from loading.
                log.warning(f"Failed to decrypt token for {server_name}: {e}")

        return tokens

Error Handling & Resilience

Circuit Breaker Pattern

circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional

class CircuitState(Enum):
    """States of a ``CircuitBreaker``."""
    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected until the recovery timeout elapses
    HALF_OPEN = "half_open"  # a trial call is allowed through after the timeout


class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when the circuit is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working, while letting callers tell a rejection from a real failure.
    """


class CircuitBreaker:
    """Circuit breaker for MCP server connections.

    Counts consecutive failures of the wrapped call. At ``failure_threshold``
    the circuit opens and calls are rejected until ``recovery_timeout``
    seconds have passed since the last failure; the next call is then let
    through, and its outcome closes or re-opens the circuit.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        """
        Args:
            failure_threshold: consecutive failures that open the circuit.
            recovery_timeout: seconds to wait after the last failure before
                allowing a trial call.
            expected_exception: exception type (or tuple of types) counted
                as a failure; anything else propagates uncounted.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` under circuit breaker protection.

        Returns:
            Whatever the awaited call returns.

        Raises:
            CircuitBreakerOpenError: the circuit is open and the recovery
                timeout has not elapsed; ``func`` is not called.
            Exception: anything raised by ``func`` is re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            self.state = CircuitState.HALF_OPEN

        try:
            result = await func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise  # bare raise keeps the original traceback intact

        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` has passed since the last failure."""
        if self.last_failure_time is None:
            return True

        return time.time() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self):
        """Reset the failure counter and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record a failure and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        # A failed trial call re-opens the circuit immediately; otherwise
        # the circuit opens once the threshold is reached.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

# Usage with MCP tools
class ResilientMCPTools:
    """Wrap an MCP tools object so each server's calls go through its own circuit breaker."""

    def __init__(self, mcp_tools):
        # Breakers are created lazily, keyed by server name.
        self.circuit_breakers = {}
        self.mcp_tools = mcp_tools

    def get_circuit_breaker(self, server_name: str) -> CircuitBreaker:
        """Return the breaker for ``server_name``, creating it on first request.

        New breakers open after 3 failures and allow a retry after 30 seconds.
        """
        try:
            return self.circuit_breakers[server_name]
        except KeyError:
            breaker = CircuitBreaker(
                failure_threshold=3,
                recovery_timeout=30
            )
            self.circuit_breakers[server_name] = breaker
            return breaker

    async def call_tool_with_protection(self, server_name: str, tool_name: str, args: dict):
        """Invoke ``mcp_tools.call_tool`` through the server's circuit breaker.

        Returns the tool result; exceptions from the breaker or from the
        tool call propagate to the caller.
        """
        breaker = self.get_circuit_breaker(server_name)
        guarded_call = breaker.call(
            self.mcp_tools.call_tool,
            server_name,
            tool_name,
            args
        )
        return await guarded_call

Retry Logic with Exponential Backoff

retry_logic.py
import asyncio
import random
from typing import Callable, Any, Optional
import logging

logger = logging.getLogger(__name__)

async def retry_with_exponential_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_multiplier: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,)
):
    """Await ``func()`` and retry it with exponential backoff on failure.

    Args:
        func: zero-argument callable returning an awaitable.
        max_retries: retries after the first attempt, so ``func`` is called
            at most ``max_retries + 1`` times. Must be >= 0.
        base_delay: delay before the first retry, in seconds.
        max_delay: upper bound for the delay before jitter is applied.
        backoff_multiplier: factor applied to the delay after each attempt.
        jitter: when True, scale each delay by a random factor in [0.5, 1.0).
        exceptions: exception types that trigger a retry; others propagate
            immediately.

    Returns:
        The result of the first successful ``func()`` call.

    Raises:
        ValueError: if ``max_retries`` is negative.
        Exception: the last caught exception once all attempts are used up.
    """
    # A negative value would make the loop body never run, silently
    # returning None without calling func at all.
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await func()

        except exceptions as e:
            if attempt == max_retries:
                logger.error(f"Function failed after {max_retries + 1} attempts: {e}")
                raise  # bare raise keeps the original traceback intact

            # Exponential backoff, capped at max_delay.
            delay = min(base_delay * (backoff_multiplier ** attempt), max_delay)

            # Jitter spreads out retries to prevent a thundering herd.
            if jitter:
                delay = delay * (0.5 + random.random() * 0.5)

            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)

# Usage example
async def connect_with_retry(mcp_tools, server_name: str):
    """Connect to ``server_name``, retrying on connection errors and timeouts.

    Makes up to four attempts (one initial plus three retries) starting
    from a one-second backoff, and returns the result of
    ``mcp_tools.connect``.
    """
    retryable = (ConnectionError, TimeoutError)

    # The retry helper awaits whatever this callable returns.
    return await retry_with_exponential_backoff(
        lambda: mcp_tools.connect(server_name),
        max_retries=3,
        base_delay=1.0,
        exceptions=retryable
    )

Monitoring & Observability

Comprehensive Logging

mcp_logging.py
import logging
import json
import time
from contextlib import contextmanager
from typing import Dict, Any

class MCPLogger:
    """Structured logging for MCP operations.

    Each record is one log line whose message embeds a JSON object with the
    operation name, server name, a timestamp and any extra keyword data.
    """

    def __init__(self, name: str):
        """Bind to the logger called ``name`` and make sure it can emit.

        ``logging.getLogger`` returns the same object for the same name, so
        a stream handler is attached only when the logger has none yet.
        Adding one on every instantiation would duplicate each log line.
        """
        self.logger = logging.getLogger(name)

        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        self.logger.setLevel(logging.INFO)

    @staticmethod
    def _to_json(log_data: Dict[str, Any]) -> str:
        """Serialise ``log_data``, falling back to ``str`` for unsupported values.

        The fallback is deliberate: a log call must not raise because a
        caller passed a non-JSON-serialisable keyword value.
        """
        return json.dumps(log_data, default=str)

    def log_operation(self, operation: str, server_name: str, **kwargs):
        """Log an MCP operation at INFO level; ``kwargs`` are added to the record."""
        log_data = {
            "operation": operation,
            "server_name": server_name,
            "timestamp": time.time(),
            **kwargs
        }

        self.logger.info(f"MCP Operation: {self._to_json(log_data)}")

    def log_error(self, operation: str, server_name: str, error: Exception, **kwargs):
        """Log an MCP error at ERROR level with the exception type and message."""
        log_data = {
            "operation": operation,
            "server_name": server_name,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": time.time(),
            **kwargs
        }

        self.logger.error(f"MCP Error: {self._to_json(log_data)}")

    @contextmanager
    def operation_context(self, operation: str, server_name: str, **kwargs):
        """Context manager that logs the start, outcome and duration of an operation.

        Emits a ``started`` record on entry, then either a ``completed``
        record or, if the body raises, a ``failed`` error record. The
        exception from the body is always re-raised.
        """
        start_time = time.time()
        self.log_operation(operation, server_name, status="started", **kwargs)

        try:
            yield
        except Exception as e:
            duration = time.time() - start_time
            self.log_error(
                operation, server_name, e,
                status="failed",
                duration_ms=duration * 1000,
                **kwargs
            )
            raise
        else:
            # Logged outside the try so a problem while logging success is
            # not misreported as a failure of the operation itself.
            duration = time.time() - start_time
            self.log_operation(
                operation, server_name,
                status="completed",
                duration_ms=duration * 1000,
                **kwargs
            )

# Usage example: module-level MCPLogger named "mcp_integration", used by
# logged_mcp_operation below.
mcp_logger = MCPLogger("mcp_integration")

async def logged_mcp_operation(mcp_tools, server_name: str, tool_name: str, args: dict):
    """Call an MCP tool while logging its start, outcome and duration.

    Returns the result of ``mcp_tools.call_tool``; any exception it raises
    is logged by the context manager and then propagates.
    """
    log_scope = mcp_logger.operation_context("call_tool", server_name, tool_name=tool_name)

    with log_scope:
        outcome = await mcp_tools.call_tool(server_name, tool_name, args)

    return outcome

Metrics Collection

mcp_metrics.py
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from collections import defaultdict, deque
import threading

@dataclass
class MCPMetrics:
    """In-memory counters for MCP connections, operations, errors and health checks.

    All mutating methods and ``get_summary`` take an internal
    ``threading.Lock``, so one instance can be shared between threads.
    """

    # Connection metrics
    connection_attempts: int = 0
    successful_connections: int = 0
    failed_connections: int = 0
    active_connections: int = 0

    # Operation metrics
    total_operations: int = 0
    successful_operations: int = 0
    failed_operations: int = 0

    # Performance metrics: the average is taken over the most recent
    # 1000 recorded durations only.
    average_response_time: float = 0.0
    response_times: deque = field(default_factory=lambda: deque(maxlen=1000))

    # Server-specific metrics, keyed by server name
    server_metrics: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    # Error tracking: counts keyed by error text, last error text per server
    error_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    last_errors: Dict[str, str] = field(default_factory=dict)

    # Health metrics
    last_health_check: Optional[float] = None
    health_check_failures: int = 0

    def __post_init__(self):
        # Not a dataclass field, so it stays out of __init__/__repr__/__eq__.
        self._lock = threading.Lock()

    def _server_entry(self, server_name: str) -> Dict[str, Any]:
        """Return the counters for ``server_name``, creating them on first use.

        The caller must already hold ``self._lock``.
        """
        if server_name not in self.server_metrics:
            self.server_metrics[server_name] = {
                "connection_attempts": 0,
                "successful_connections": 0,
                "operations": 0,
                "errors": 0
            }
        return self.server_metrics[server_name]

    def record_connection_attempt(self, server_name: str, success: bool):
        """Record one connection attempt and whether it succeeded."""
        with self._lock:
            self.connection_attempts += 1

            if success:
                self.successful_connections += 1
                self.active_connections += 1
            else:
                self.failed_connections += 1

            entry = self._server_entry(server_name)
            entry["connection_attempts"] += 1
            if success:
                entry["successful_connections"] += 1

    def record_disconnection(self, server_name: str):
        """Record that a connection was closed.

        Without this, ``active_connections`` could only ever grow. The
        counter is clamped at zero; ``server_name`` is accepted for symmetry
        with ``record_connection_attempt`` and is currently unused.
        """
        with self._lock:
            if self.active_connections > 0:
                self.active_connections -= 1

    def record_operation(self, server_name: str, duration: float, success: bool, error: Optional[str] = None):
        """Record one operation.

        Args:
            server_name: server the operation ran against.
            duration: elapsed time in seconds.
            success: whether the operation succeeded.
            error: error text for a failed operation; counted in
                ``error_counts`` and kept as the server's last error.
        """
        with self._lock:
            self.total_operations += 1

            if success:
                self.successful_operations += 1
            else:
                self.failed_operations += 1
                if error:
                    self.error_counts[error] += 1
                    self.last_errors[server_name] = error

            self.response_times.append(duration)
            self.average_response_time = sum(self.response_times) / len(self.response_times)

            # Create the entry on demand so operations are counted even when
            # no connection attempt was recorded for this server.
            entry = self._server_entry(server_name)
            entry["operations"] += 1
            if not success:
                entry["errors"] += 1

    def record_health_check(self, success: bool):
        """Record a health check result and the time it happened."""
        with self._lock:
            self.last_health_check = time.time()
            if not success:
                self.health_check_failures += 1

    def get_summary(self) -> Dict[str, Any]:
        """Return a snapshot of all metrics as plain dictionaries.

        Success rates are 0 when nothing has been recorded yet. The
        ``servers`` section is copied, so later updates do not change a
        snapshot that was already handed out. ``top_errors`` holds the five
        most frequent error texts.
        """
        with self._lock:
            success_rate = (
                self.successful_operations / self.total_operations
                if self.total_operations > 0 else 0
            )

            connection_success_rate = (
                self.successful_connections / self.connection_attempts
                if self.connection_attempts > 0 else 0
            )

            top_errors = sorted(
                self.error_counts.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]

            return {
                "timestamp": time.time(),
                "connections": {
                    "attempts": self.connection_attempts,
                    "successful": self.successful_connections,
                    "failed": self.failed_connections,
                    "active": self.active_connections,
                    "success_rate": connection_success_rate
                },
                "operations": {
                    "total": self.total_operations,
                    "successful": self.successful_operations,
                    "failed": self.failed_operations,
                    "success_rate": success_rate
                },
                "performance": {
                    "average_response_time_ms": self.average_response_time * 1000,
                    "response_count": len(self.response_times)
                },
                "health": {
                    "last_check": self.last_health_check,
                    "failures": self.health_check_failures
                },
                "servers": {
                    name: dict(counters)
                    for name, counters in self.server_metrics.items()
                },
                "top_errors": dict(top_errors)
            }

# Global metrics instance: module-level MCPMetrics that the
# track_mcp_operation decorator records into.
mcp_metrics = MCPMetrics()

# Usage decorators
def track_mcp_operation(server_name: str):
    """Decorator factory that records timing and outcome of an async function.

    Each call of the decorated coroutine function is timed and reported to
    the module-level ``mcp_metrics`` under ``server_name``: as a success
    when it returns, or as a failure (with ``str(exception)`` as the error
    text) when it raises. The exception is re-raised unchanged.
    """
    import functools

    def decorator(func):
        # functools.wraps keeps the wrapped function's __name__, __doc__
        # and other metadata, which a plain wrapper would hide.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                duration = time.time() - start_time
                mcp_metrics.record_operation(server_name, duration, False, str(e))
                raise
            duration = time.time() - start_time
            mcp_metrics.record_operation(server_name, duration, True)
            return result
        return wrapper
    return decorator

Testing Strategy

Unit Tests

test_mcp_unit.py
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from xpander_sdk.tools import MultiMCPTools

class TestMCPIntegration:
    """Unit tests for MCP integration.

    The MCP client is replaced by a ``Mock`` whose methods are ``AsyncMock``
    objects, so no real server is contacted.

    NOTE(review): MCPMetrics, CircuitBreaker, CircuitState, MCPConfig and
    MCPServerConfig are used below but are not among this module's visible
    imports - confirm they are importable here.
    """

    @pytest.fixture
    def mock_mcp_tools(self):
        """Mock MCP tools with async connect/disconnect/list_tools/call_tool."""
        tools = Mock(spec=MultiMCPTools)
        tools.connect = AsyncMock()
        tools.disconnect = AsyncMock()
        tools.list_tools = AsyncMock()
        tools.call_tool = AsyncMock()
        return tools

    @pytest.fixture
    def sample_server_config(self):
        """Sample server configuration for tests that need one."""
        return {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        }

    @pytest.mark.asyncio
    async def test_successful_connection(self, mock_mcp_tools):
        """A successful connect is awaited once with the server name."""
        mock_mcp_tools.connect.return_value = True

        connected = await mock_mcp_tools.connect("filesystem")

        assert connected is True
        mock_mcp_tools.connect.assert_called_once_with("filesystem")

    @pytest.mark.asyncio
    async def test_connection_failure(self, mock_mcp_tools):
        """A failing connect propagates ConnectionError to the caller."""
        mock_mcp_tools.connect.side_effect = ConnectionError("Connection failed")

        with pytest.raises(ConnectionError):
            await mock_mcp_tools.connect("filesystem")

    @pytest.mark.asyncio
    async def test_tool_discovery(self, mock_mcp_tools):
        """list_tools returns the tool descriptions supplied by the server."""
        expected_tools = [
            {"name": "read_file", "description": "Read file contents"},
            {"name": "write_file", "description": "Write file contents"}
        ]
        mock_mcp_tools.list_tools.return_value = expected_tools

        tools = await mock_mcp_tools.list_tools("filesystem")

        assert tools == expected_tools
        mock_mcp_tools.list_tools.assert_called_once_with("filesystem")

    @pytest.mark.asyncio
    async def test_tool_execution(self, mock_mcp_tools):
        """call_tool forwards its arguments and returns the tool result."""
        expected_result = {"content": "file contents"}
        mock_mcp_tools.call_tool.return_value = expected_result

        result = await mock_mcp_tools.call_tool(
            "filesystem",
            "read_file",
            {"path": "/tmp/test.txt"}
        )

        assert result == expected_result
        mock_mcp_tools.call_tool.assert_called_once_with(
            "filesystem", "read_file", {"path": "/tmp/test.txt"}
        )

    def test_metrics_collection(self):
        """Connection and operation counters show up in the summary."""
        metrics = MCPMetrics()

        metrics.record_connection_attempt("filesystem", True)
        metrics.record_operation("filesystem", 0.5, True)
        metrics.record_operation("filesystem", 1.0, False, "timeout")

        summary = metrics.get_summary()

        assert summary["connections"]["attempts"] == 1
        assert summary["connections"]["successful"] == 1
        assert summary["operations"]["total"] == 2
        assert summary["operations"]["successful"] == 1
        assert summary["operations"]["failed"] == 1

    def test_circuit_breaker_open(self):
        """The circuit opens once the failure threshold is reached."""
        circuit_breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1)

        # _on_failure only updates counters and state, so no exception
        # handling is needed; a bare except here would hide real errors.
        for _ in range(3):
            circuit_breaker._on_failure()

        assert circuit_breaker.state == CircuitState.OPEN

    def test_configuration_validation(self):
        """An empty configuration is rejected; one with a valid server passes."""
        config = MCPConfig()

        errors = config.validate()
        assert len(errors) > 0

        config.servers["test"] = MCPServerConfig(command="test", args=[])
        errors = config.validate()
        assert len(errors) == 0

Integration Tests

test_mcp_integration.py
import pytest
import asyncio
import tempfile
import json
from pathlib import Path

class TestMCPIntegrationE2E:
    """End-to-end integration tests for MCP.

    These tests start a real filesystem MCP server rooted at ``/tmp`` and
    are marked ``integration`` so they can be selected or skipped as a group.
    """

    @pytest.fixture
    async def real_mcp_tools(self):
        """Yield a connected ``MultiMCPTools`` and disconnect it afterwards.

        The whole test is skipped when the ``mcp`` package is not installed.

        NOTE(review): this is an async generator fixture; under
        pytest-asyncio strict mode it presumably has to be declared with
        ``pytest_asyncio.fixture`` - confirm against the project's settings.
        """
        pytest.importorskip("mcp")

        config = {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        }

        tools = MultiMCPTools(servers=config)

        try:
            await tools.connect_all()
            yield tools
        finally:
            await tools.disconnect_all()

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_filesystem_operations(self, real_mcp_tools):
        """A file written locally can be read back through the MCP server."""
        test_content = "MCP integration test content"

        # Use a unique name under /tmp (the server's root) so parallel or
        # repeated runs cannot collide on a fixed path.
        with tempfile.NamedTemporaryFile(
            mode='w', dir='/tmp', prefix='mcp_test_', suffix='.txt', delete=False
        ) as f:
            f.write(test_content)
            test_file = f.name

        try:
            result = await real_mcp_tools.call_tool(
                "filesystem",
                "read_file",
                {"path": test_file}
            )

            assert result["content"] == test_content

        finally:
            Path(test_file).unlink(missing_ok=True)

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_server_health_check(self, real_mcp_tools):
        """The server lists its tools, including ``read_file``."""
        tools = await real_mcp_tools.list_tools("filesystem")

        assert len(tools) > 0
        assert any(tool["name"] == "read_file" for tool in tools)

    @pytest.mark.integration
    @pytest.mark.asyncio  # was missing; the other coroutine tests all carry it
    async def test_connection_recovery(self, real_mcp_tools):
        """The server is usable again after a disconnect/connect cycle."""
        await real_mcp_tools.disconnect("filesystem")
        await real_mcp_tools.connect("filesystem")

        tools = await real_mcp_tools.list_tools("filesystem")
        assert len(tools) > 0

@pytest.mark.load
class TestMCPLoadTesting:
    """Load-oriented tests for the MCP integration (run against mocks)."""

    @pytest.mark.asyncio
    async def test_concurrent_operations(self):
        """One hundred tool calls gathered concurrently all succeed."""
        fake_tools = Mock(spec=MultiMCPTools)
        fake_tools.call_tool = AsyncMock(return_value={"result": "success"})

        # Create every call up front, then await them together.
        pending = [
            fake_tools.call_tool("filesystem", "read_file", {"path": f"/tmp/file_{idx}.txt"})
            for idx in range(100)
        ]

        outcomes = await asyncio.gather(*pending)

        assert len(outcomes) == 100
        assert all(r["result"] == "success" for r in outcomes)
        assert fake_tools.call_tool.call_count == 100

    @pytest.mark.asyncio
    async def test_connection_pool_limits(self):
        """Placeholder for testing connection pool behaviour under load."""
        # Intentionally empty: what to assert depends on the connection
        # pooling strategy that is actually in use.
        pass

Test Configuration

conftest.py
import pytest
import asyncio
import os
from unittest.mock import patch

def pytest_configure(config):
    """Register the custom ``integration`` and ``load`` markers with pytest."""
    marker_lines = (
        "integration: integration tests requiring real MCP servers",
        "load: load testing scenarios",
    )
    for marker_line in marker_lines:
        config.addinivalue_line("markers", marker_line)

@pytest.fixture(scope="session")
def event_loop():
    """Provide one asyncio event loop for the whole test session.

    A new loop is created and installed as the current loop, handed to the
    tests, and closed when the session ends.
    """
    session_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(session_loop)
    yield session_loop
    # Teardown: runs once after the last test of the session.
    session_loop.close()

@pytest.fixture
def mock_environment():
    """Patch ``os.environ`` with test credentials for the duration of a test.

    Yields the dictionary of injected variables; the environment is
    restored when the test finishes.
    """
    fake_env = dict(
        XPANDER_API_KEY="test-api-key",
        XPANDER_ORGANIZATION_ID="test-org-id",
        GITHUB_TOKEN="test-github-token",
    )
    env_patch = patch.dict(os.environ, fake_env)

    with env_patch:
        yield fake_env

@pytest.fixture
def temp_mcp_config():
    """Write a minimal MCP configuration to a temporary JSON file.

    Yields the path of the file, which is removed again after the test.
    """
    import tempfile
    import json

    config = {
        "servers": {
            "filesystem": {
                "command": "npx",
                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        },
        "global_timeout": 30
    }

    # Write and close the file before handing out its path: a file that is
    # still open cannot be reopened or deleted on every platform.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(config, f)
        config_path = f.name

    try:
        yield config_path
    finally:
        os.unlink(config_path)

Production Deployment Checklist

Configuration Validation

  • Environment variables are properly set
  • MCP server configurations are validated
  • Security tokens are encrypted
  • Configuration schema is validated

Testing

  • Unit tests pass
  • Integration tests pass
  • Load tests meet performance requirements
  • Security tests validate token handling

Monitoring Setup

  • Logging is configured
  • Metrics collection is enabled
  • Health checks are implemented
  • Alerting is configured

Infrastructure

  • MCP servers are installed and available
  • Network connectivity is verified
  • Resource limits are configured
  • Backup and recovery procedures are in place

Application

  • Application is deployed with proper configuration
  • Health checks are passing
  • Metrics are being collected
  • Logs are being generated and stored

Monitoring

  • Monitor connection success rates
  • Track operation response times
  • Watch for error patterns
  • Monitor resource usage

Maintenance

  • Regular health checks
  • Log analysis and cleanup
  • Performance optimization
  • Security updates

Performance Optimization

Connection Pooling

connection_pool.py
import asyncio
from typing import Dict, Optional
from dataclasses import dataclass

@dataclass
class ConnectionPool:
    """Connection pool for MCP servers.

    Keeps one queue of idle connections per server and a count of the
    connections created for it, never exceeding ``max_connections``.

    NOTE(review): ``min_connections`` is stored but not used by any method
    in this class.
    """
    max_connections: int = 10
    min_connections: int = 2
    connection_timeout: int = 30

    def __post_init__(self):
        # Idle connections per server, and how many exist per server.
        self.pools: Dict[str, asyncio.Queue] = {}
        self.active_connections: Dict[str, int] = {}

    async def get_connection(self, server_name: str):
        """Return a connection for ``server_name``.

        An idle pooled connection is reused when available. Otherwise a new
        one is created while the server is below ``max_connections``; at the
        limit the call waits up to ``connection_timeout`` seconds for a
        connection to be returned.

        Raises:
            asyncio.TimeoutError: no connection became available in time.
        """
        if server_name not in self.pools:
            self.pools[server_name] = asyncio.Queue(maxsize=self.max_connections)
            self.active_connections[server_name] = 0

        pool = self.pools[server_name]

        try:
            return pool.get_nowait()
        except asyncio.QueueEmpty:
            pass

        if self.active_connections[server_name] < self.max_connections:
            # Reserve the slot BEFORE awaiting: other coroutines can run
            # while the connection is being created, and counting afterwards
            # would let them all pass the limit check above.
            self.active_connections[server_name] += 1
            try:
                return await self._create_connection(server_name)
            except BaseException:
                # Creation failed or was cancelled: release the slot.
                self.active_connections[server_name] -= 1
                raise

        # At the limit: wait for a connection to be returned.
        return await asyncio.wait_for(pool.get(), timeout=self.connection_timeout)

    async def return_connection(self, server_name: str, connection):
        """Hand ``connection`` back to the pool of ``server_name``.

        If the pool is full the connection is closed and no longer counted.
        A connection for a server this pool has never seen is closed rather
        than silently dropped.
        """
        if server_name not in self.pools:
            await self._close_connection(connection)
            return

        try:
            self.pools[server_name].put_nowait(connection)
        except asyncio.QueueFull:
            await self._close_connection(connection)
            self.active_connections[server_name] -= 1

    async def _create_connection(self, server_name: str):
        """Create a new connection to the MCP server (stub)."""
        # Implementation depends on MCP client library
        pass

    async def _close_connection(self, connection):
        """Close an MCP connection (stub)."""
        # Implementation depends on MCP client library
        pass

Caching Strategy

mcp_caching.py
import asyncio
import time
from typing import Any, Optional, Dict, Tuple
import hashlib
import json

class MCPCache:
    """In-memory cache with per-entry expiry for MCP tool results.

    Entries are keyed by a digest of ``(server, tool, args)`` and stored as
    ``(result, expiry_time)`` tuples, where ``expiry_time`` is an absolute
    ``time.time()`` timestamp. Every read and write of the backing dict is
    performed while holding a single ``asyncio.Lock``.
    """

    def __init__(self, default_ttl: int = 300):
        """Create an empty cache.

        Args:
            default_ttl: Lifetime in seconds applied by ``set`` when the caller
                does not pass an explicit ``ttl``.
        """
        self.cache: Dict[str, Tuple[Any, float]] = {}
        self.default_ttl = default_ttl
        self._lock = asyncio.Lock()

    def _generate_cache_key(self, server_name: str, tool_name: str, args: dict) -> str:
        """Return a deterministic hex digest identifying one tool invocation.

        ``args`` is serialised with sorted keys so that dicts with the same
        content but different insertion order map to the same key.

        Raises:
            TypeError: If ``args`` contains values ``json.dumps`` cannot encode.
        """
        key_data = {
            "server": server_name,
            "tool": tool_name,
            "args": args
        }
        key_str = json.dumps(key_data, sort_keys=True)
        # MD5 is used only to derive a compact lookup key, not for security.
        return hashlib.md5(key_str.encode()).hexdigest()

    async def get(self, server_name: str, tool_name: str, args: dict) -> Optional[Any]:
        """Return the cached result, or ``None`` if missing or expired.

        An expired entry is removed as a side effect of the lookup. A cached
        value of ``None`` cannot be told apart from a miss.
        """
        cache_key = self._generate_cache_key(server_name, tool_name, args)

        async with self._lock:
            entry = self.cache.get(cache_key)
            if entry is None:
                return None

            result, expiry_time = entry
            if time.time() < expiry_time:
                return result

            # Expired: drop it so stale entries do not accumulate.
            del self.cache[cache_key]
            return None

    async def set(self, server_name: str, tool_name: str, args: dict, result: Any, ttl: Optional[int] = None):
        """Store ``result`` for the given invocation.

        Args:
            server_name: Server the tool was called on.
            tool_name: Name of the tool that produced ``result``.
            args: Arguments of the call; part of the cache key.
            result: Value to cache.
            ttl: Lifetime in seconds. ``None`` selects ``default_ttl``. An
                explicit ``0`` is honoured and makes the entry expire
                immediately.
        """
        cache_key = self._generate_cache_key(server_name, tool_name, args)
        # Test against None explicitly: ``ttl or default`` would silently turn
        # an intentional ttl of 0 into the default lifetime.
        effective_ttl = self.default_ttl if ttl is None else ttl
        expiry_time = time.time() + effective_ttl

        async with self._lock:
            self.cache[cache_key] = (result, expiry_time)

    async def invalidate(self, server_name: str, tool_name: str, args: dict):
        """Remove the entry for the given invocation, if one exists."""
        cache_key = self._generate_cache_key(server_name, tool_name, args)

        async with self._lock:
            self.cache.pop(cache_key, None)

    async def clear_expired(self):
        """Remove every entry whose expiry time has been reached."""
        current_time = time.time()

        async with self._lock:
            # Collect keys first; deleting while iterating the dict would raise.
            expired_keys = [
                key for key, (_, expiry_time) in self.cache.items()
                if current_time >= expiry_time
            ]

            for key in expired_keys:
                del self.cache[key]

# Usage with MCP tools
class CachedMCPTools:
    """Wrap an MCP tools object so repeated calls can be served from a cache."""

    def __init__(self, mcp_tools, cache_ttl: int = 300):
        # A fresh cache per wrapper; ``cache_ttl`` becomes its default lifetime.
        self.cache = MCPCache(default_ttl=cache_ttl)
        self.mcp_tools = mcp_tools

    async def call_tool_cached(self, server_name: str, tool_name: str, args: dict, use_cache: bool = True):
        """Invoke a tool, consulting and updating the cache unless disabled.

        With ``use_cache=False`` the cache is neither read nor written. A
        cached value of ``None`` is treated as a miss, so the tool is called
        again in that case.
        """
        if not use_cache:
            # Bypass: go straight to the underlying tool.
            return await self.mcp_tools.call_tool(server_name, tool_name, args)

        hit = await self.cache.get(server_name, tool_name, args)
        if hit is not None:
            return hit

        # Miss: perform the real call and remember its result.
        fresh = await self.mcp_tools.call_tool(server_name, tool_name, args)
        await self.cache.set(server_name, tool_name, args, fresh)
        return fresh

Security Best Practices

  1. Token Management: Store tokens securely using environment variables or secret management systems
  2. Token Rotation: Implement automatic token rotation for long-lived tokens
  3. Access Control: Limit MCP server access based on user roles and permissions
  4. Audit Logging: Log all MCP operations for security auditing
  1. TLS Encryption: Ensure all MCP communication uses TLS
  2. Network Segmentation: Isolate MCP servers in secure network segments
  3. Firewall Rules: Configure strict firewall rules for MCP server access
  4. VPN/Private Networks: Use VPNs for remote MCP server access
  1. Data Encryption: Encrypt sensitive data at rest and in transit
  2. Data Sanitization: Sanitize data before passing to MCP servers
  3. Data Retention: Implement proper data retention and deletion policies
  4. Backup Security: Secure backup data with encryption and access controls

Summary

This comprehensive guide covers:
  1. Configuration Management: Environment-based, secure, and validated configuration
  2. Error Handling: Circuit breakers, retry logic, and graceful degradation
  3. Monitoring: Structured logging, metrics collection, and health checks
  4. Testing: Unit, integration, and load testing strategies
  5. Performance: Connection pooling, caching, and optimization techniques
  6. Security: Authentication, authorization, and data protection
Following these best practices will ensure your MCP integration is production-ready, maintainable, and secure.

References

I