
Architecture

MCPOmni Connect is built with a modular, extensible architecture designed for scalability, reliability, and ease of use. This document provides a comprehensive overview of the system's design and components.

System Overview

MCPOmni Connect acts as an intelligent gateway between users and the Model Context Protocol (MCP) ecosystem, providing AI-powered automation and orchestration capabilities.

```mermaid
graph TB
    User[User] --> CLI[CLI Interface]
    CLI --> Core[Core Engine]

    Core --> LLM[LLM Integration]
    Core --> Memory[Memory Management]
    Core --> Session[Session Management]
    Core --> Transport[Transport Layer]

    LLM --> Providers[LLM Providers]
    Memory --> Redis[Redis]
    Memory --> Files[File Storage]

    Transport --> Stdio[Stdio]
    Transport --> SSE[SSE]
    Transport --> HTTP[HTTP]

    Stdio --> LocalMCP[Local MCP Servers]
    SSE --> RemoteMCP[Remote MCP Servers]
    HTTP --> APIMCP[API MCP Servers]

    Core --> Agent[Agent System]
    Agent --> Chat[Chat Mode]
    Agent --> Auto[Autonomous Mode]
    Agent --> Orch[Orchestrator Mode]
```

Core Components

1. CLI Interface Layer

The user-facing command-line interface that handles input/output and user interactions.

Responsibilities:

- Command parsing and validation
- User input handling
- Output formatting and display
- Interactive prompts and confirmations
- Error message presentation

Key Features:

- Rich text formatting with syntax highlighting
- Interactive command completion
- Real-time status updates
- Debug mode visualization
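As a rough sketch of the parsing responsibility above, the CLI layer splits a raw input line into a command name and arguments before anything else runs. The `Command` dataclass, the `/`-prefix convention, and the function name are illustrative, not the actual MCPOmni Connect implementation:

```python
from dataclasses import dataclass, field


@dataclass
class Command:
    name: str
    args: list[str] = field(default_factory=list)


def parse_command(line: str) -> Command:
    """Split a raw input line into a command name and its arguments."""
    parts = line.strip().split()
    if not parts:
        raise ValueError("empty command")
    # Commands are assumed here to use a '/' prefix, e.g. '/tools list'
    return Command(name=parts[0].lstrip("/"), args=parts[1:])
```

Validation (unknown commands, bad argument counts) would then happen against a command registry before anything reaches the Core Engine.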

2. Core Engine

The central orchestrator that coordinates all system components.

Responsibilities:

- Component lifecycle management
- Event coordination and messaging
- Configuration management
- Error handling and recovery
- System state management

Components:

```python
class CoreEngine:
    def __init__(self):
        self.session_manager = SessionManager()
        self.transport_layer = TransportLayer()
        self.llm_integration = LLMIntegration()
        self.memory_manager = MemoryManager()
        self.agent_system = AgentSystem()
```

3. Agent System

The AI-powered decision-making and execution engine.

```mermaid
graph LR
    Agent[Agent System] --> ReAct[ReAct Engine]
    Agent --> Orchestrator[Orchestrator]
    Agent --> Context[Context Manager]

    ReAct --> Reasoning[Reasoning]
    ReAct --> Acting[Acting]
    ReAct --> Observing[Observing]

    Orchestrator --> Planning[Planning]
    Orchestrator --> Coordination[Coordination]
    Orchestrator --> Monitoring[Monitoring]
```

Mode Architecture:

```python
class ChatMode:
    def process_request(self, user_input):
        # 1. Parse user intent
        intent = self.parse_intent(user_input)

        # 2. Plan actions
        actions = self.plan_actions(intent)

        # 3. Request approval for each action
        for action in actions:
            if self.request_approval(action):
                result = self.execute_action(action)
                self.present_result(result)


class AutonomousMode:
    def process_request(self, user_input):
        # 1. Parse and understand goal
        goal = self.parse_goal(user_input)

        # 2. ReAct loop: reason, act, observe until the goal is met
        state = self.initial_state(goal)
        while not self.goal_achieved(goal):
            thought = self.think(state)
            action = self.plan_action(thought)
            observation = self.execute_action(action)
            state = self.update_state(observation)

        # 3. Report completion
        return self.generate_report()


class OrchestratorMode:
    def process_request(self, user_input):
        # 1. Strategic analysis
        strategy = self.analyze_requirements(user_input)

        # 2. Multi-phase planning
        phases = self.create_execution_plan(strategy)

        # 3. Coordinate execution
        for phase in phases:
            agents = self.allocate_agents(phase)
            results = self.execute_parallel(agents)
            self.merge_results(results)

        return self.final_report()
```

Transport Layer

Transport Architecture

```mermaid
graph TB
    TL[Transport Layer] --> TM[Transport Manager]
    TM --> Registry[Transport Registry]
    TM --> Factory[Transport Factory]

    Factory --> StdioT[Stdio Transport]
    Factory --> SSET[SSE Transport]
    Factory --> HTTPT[HTTP Transport]

    StdioT --> Process[Process Manager]
    SSET --> EventStream[Event Stream]
    HTTPT --> AuthManager[Auth Manager]

    AuthManager --> OAuth[OAuth Handler]
    AuthManager --> Bearer[Bearer Token]
    AuthManager --> Custom[Custom Headers]
```
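The factory in the diagram can be sketched as a simple type-to-class lookup; the placeholder classes below stand in for the transport implementations shown later, and the type-string keys are assumptions rather than the project's actual identifiers:

```python
# Placeholder transport classes; the real implementations appear below.
class StdioTransport: ...
class SSETransport: ...
class HTTPTransport: ...

# Map configured transport type strings to implementations.
TRANSPORTS = {
    "stdio": StdioTransport,
    "sse": SSETransport,
    "streamable_http": HTTPTransport,
}


def create_transport(transport_type: str):
    """Instantiate the transport registered for this type."""
    try:
        return TRANSPORTS[transport_type]()
    except KeyError:
        raise ValueError(f"unknown transport type: {transport_type}")
```

Registering new transports then only requires adding an entry to the registry, which is what makes the layer extensible.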

Transport Implementations

Stdio Transport

```python
import asyncio


class StdioTransport:
    def __init__(self, command, args):
        self.command = command
        self.args = args
        self.process = None

    async def connect(self):
        # Spawn the server as an async subprocess so reads/writes can be awaited
        self.process = await asyncio.create_subprocess_exec(
            self.command, *self.args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    async def send_message(self, message: bytes):
        self.process.stdin.write(message)
        await self.process.stdin.drain()

    async def receive_message(self) -> bytes:
        return await self.process.stdout.readline()
```

SSE Transport

```python
import json

import httpx


class SSETransport:
    def __init__(self, url, headers):
        self.url = url
        self.headers = headers
        self.client = httpx.AsyncClient()

    async def receive_events(self):
        # httpx's stream() is an async context manager yielding the response
        async with self.client.stream(
            "GET", self.url, headers=self.headers
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])
```

HTTP Transport

```python
import httpx


class HTTPTransport:
    def __init__(self, url, auth_config):
        self.url = url
        self.auth = self.setup_auth(auth_config)
        self.client = httpx.AsyncClient()

    async def send_request(self, data):
        response = await self.client.post(
            self.url,
            json=data,
            headers=self.auth.get_headers()
        )
        return response.json()
```
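The `setup_auth` step above can take several forms; one plausible sketch builds request headers from the server's auth configuration. The configuration field names (`method`, `token`, `headers`) are assumptions for illustration, not the project's actual schema:

```python
def build_auth_headers(auth_config: dict) -> dict:
    """Translate an auth configuration into HTTP request headers."""
    headers = {}
    method = auth_config.get("method")
    if method == "bearer":
        # Standard bearer-token scheme
        headers["Authorization"] = f"Bearer {auth_config['token']}"
    elif method == "custom":
        # Pass through arbitrary configured headers, e.g. API keys
        headers.update(auth_config.get("headers", {}))
    return headers
```

An OAuth flow would instead obtain the token dynamically before building the same `Authorization` header.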

Session Management

Session Architecture

```mermaid
graph LR
    SM[Session Manager] --> SL[Session Lifecycle]
    SM --> CR[Connection Registry]
    SM --> HM[Health Monitor]

    CR --> Servers[Server Connections]
    HM --> Heartbeat[Heartbeat]
    HM --> Recovery[Recovery]

    Servers --> Active[Active]
    Servers --> Idle[Idle]
    Servers --> Failed[Failed]
```

Connection Management

```python
class SessionManager:
    def __init__(self):
        self.connections = {}
        self.health_monitor = HealthMonitor()
        self.recovery_manager = RecoveryManager()

    async def connect_server(self, server_config):
        transport = self.create_transport(server_config)
        connection = await transport.connect()

        self.connections[server_config.name] = connection
        self.health_monitor.add_connection(connection)

        return connection

    async def health_check(self):
        for name, connection in self.connections.items():
            if not await connection.is_healthy():
                await self.recovery_manager.recover(name, connection)
```

LLM Integration

LiteLLM Integration Architecture

```mermaid
graph TB
    LLM[LLM Integration] --> LiteLLM[LiteLLM]
    LLM --> Config[Config Manager]
    LLM --> Context[Context Manager]

    LiteLLM --> OpenAI[OpenAI]
    LiteLLM --> Anthropic[Anthropic]
    LiteLLM --> Google[Google]
    LiteLLM --> Others[... Others]

    Context --> Window[Context Window]
    Context --> History[History]
    Context --> Pruning[Pruning]
```
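The pruning branch of the diagram can be sketched as evicting the oldest non-system messages until the history fits a token budget. The 4-characters-per-token estimate is a rough heuristic for illustration, not the project's actual tokenizer:

```python
def estimate_tokens(message: dict) -> int:
    """Crude token estimate: roughly 4 characters per token."""
    return max(1, len(message.get("content", "")) // 4)


def prune_history(messages: list[dict], max_tokens: int) -> list[dict]:
    """Keep system messages; evict oldest other messages until under budget."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    while rest and sum(map(estimate_tokens, system + rest)) > max_tokens:
        rest.pop(0)  # evict the oldest conversational turn first
    return system + rest
```

A production context manager would use the provider's real tokenizer and might summarize evicted turns instead of dropping them outright.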

LLM Integration Implementation

```python
import litellm


class LLMIntegration:
    def __init__(self, config):
        self.config = config
        self.context_manager = ContextManager()
        self.client = self.setup_litellm()

    def setup_litellm(self):
        # Use the async completion entry point so calls can be awaited
        return litellm.acompletion

    async def generate_response(self, messages, tools=None):
        # Prepare context
        context = self.context_manager.prepare_context(messages)

        # Call LLM
        response = await self.client(
            model=f"{self.config.provider}/{self.config.model}",
            messages=context,
            tools=tools,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )

        return response
```

Memory Management

Memory Architecture

```mermaid
graph TB
    MM[Memory Manager] --> SM[Session Memory]
    MM --> RM[Redis Memory]
    MM --> FM[File Memory]

    SM --> Current[Current Context]
    SM --> Buffer[Message Buffer]

    RM --> Persistence[Persistence]
    RM --> TTL[TTL Management]

    FM --> Save[Save Operations]
    FM --> Load[Load Operations]
    FM --> Backup[Backup]
```

Memory Implementation

```python
class MemoryManager:
    def __init__(self, config):
        self.session_memory = SessionMemory()
        self.redis_memory = RedisMemory(config.redis) if config.redis else None
        self.file_memory = FileMemory()
        self.enabled = False

    async def store_message(self, message):
        # Always store in session
        self.session_memory.add(message)

        # Store in Redis if enabled
        if self.enabled and self.redis_memory:
            await self.redis_memory.store(message)

    async def get_context(self, limit=None):
        # Get from Redis if available
        if self.enabled and self.redis_memory:
            return await self.redis_memory.get_context(limit)

        # Fall back to session memory
        return self.session_memory.get_context(limit)
```

Tool Management

Tool Discovery and Execution

```mermaid
graph LR
    TM[Tool Manager] --> Discovery[Discovery]
    TM --> Registry[Registry]
    TM --> Executor[Executor]

    Discovery --> Servers[Server Tools]
    Registry --> Metadata[Tool Metadata]
    Registry --> Routing[Routing Rules]

    Executor --> Parallel[Parallel Exec]
    Executor --> Serial[Serial Exec]
    Executor --> Fallback[Fallback]
```

Tool Execution Engine

```python
class ToolManager:
    def __init__(self):
        self.registry = ToolRegistry()
        self.executor = ToolExecutor()
        self.router = ToolRouter()

    async def discover_tools(self, connections):
        for connection in connections:
            tools = await connection.list_tools()
            for tool in tools:
                self.registry.register(tool, connection)

    async def execute_tool(self, tool_name, parameters):
        # Route to the appropriate server
        connection = self.router.route(tool_name)

        # Execute with timeout and retry
        return await self.executor.execute(
            connection, tool_name, parameters
        )
```
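The "execute with timeout and retry" behavior delegated to the executor can be sketched with standard asyncio primitives. The timeout, retry count, and exception set are illustrative defaults, not the project's configured values:

```python
import asyncio


async def execute_with_retry(call, *, timeout=30.0, retries=3):
    """Run an async call with a per-attempt timeout and exponential backoff."""
    last_error = None
    for attempt in range(retries):
        try:
            return await asyncio.wait_for(call(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as e:
            last_error = e
            # Back off 1s, 2s, 4s, ... between attempts
            await asyncio.sleep(2 ** attempt)
    raise last_error
```

A real executor would also distinguish permanent tool errors (which should not be retried) from transient transport failures.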

Security Architecture

Security Layers

```mermaid
graph TB
    Security[Security] --> Auth[Authentication]
    Security --> Authz[Authorization]
    Security --> Encryption[Encryption]
    Security --> Isolation[Isolation]

    Auth --> OAuth[OAuth 2.0]
    Auth --> Tokens[Bearer Tokens]
    Auth --> Custom[Custom Auth]

    Authz --> ServerLevel[Server Level]
    Authz --> ToolLevel[Tool Level]

    Encryption --> Transit[In Transit]
    Encryption --> Rest[At Rest]

    Isolation --> ServerIso[Server Isolation]
    Isolation --> DataIso[Data Isolation]
```

Security Implementation

```python
class SecurityManager:
    def __init__(self):
        self.auth_manager = AuthenticationManager()
        self.authz_manager = AuthorizationManager()
        self.crypto = CryptographyManager()

    async def authenticate_server(self, server_config):
        if server_config.auth_method == "oauth":
            return await self.auth_manager.oauth_flow(server_config)
        elif server_config.auth_method == "bearer":
            return self.auth_manager.bearer_token(server_config)
        else:
            raise ValueError(f"unsupported auth method: {server_config.auth_method}")

    def encrypt_sensitive_data(self, data):
        return self.crypto.encrypt(data)

    def authorize_tool_access(self, tool, user_context):
        return self.authz_manager.check_permission(tool, user_context)
```

Performance and Scalability

Performance Architecture

```mermaid
graph LR
    Perf[Performance] --> Caching[Caching]
    Perf --> Pooling[Connection Pooling]
    Perf --> Async[Async Processing]
    Perf --> Monitoring[Monitoring]

    Caching --> ToolCache[Tool Results]
    Caching --> ContextCache[Context Cache]

    Pooling --> ConnPool[Connection Pool]
    Pooling --> ThreadPool[Thread Pool]

    Async --> EventLoop[Event Loop]
    Async --> Coroutines[Coroutines]
```

Performance Optimizations

```python
class PerformanceManager:
    def __init__(self):
        self.cache = CacheManager()
        self.connection_pool = ConnectionPool()
        self.metrics = MetricsCollector()

    async def execute_with_cache(self, tool_call):
        cache_key = self.generate_cache_key(tool_call)

        # Check cache first
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            self.metrics.record_cache_hit(tool_call)
            return cached_result

        # Execute and cache the result
        result = await self.execute_tool(tool_call)
        await self.cache.set(cache_key, result, ttl=300)

        self.metrics.record_cache_miss(tool_call)
        return result
```

Configuration System

Configuration Architecture

```mermaid
graph TB
    Config[Configuration] --> Env[Environment]
    Config --> JSON[JSON Config]
    Config --> Runtime[Runtime Config]

    Env --> APIKeys[API Keys]
    Env --> Redis[Redis Config]
    Env --> Debug[Debug Settings]

    JSON --> LLMConfig[LLM Config]
    JSON --> Servers[Server Config]
    JSON --> AgentConfig[Agent Config]

    Runtime --> Dynamic[Dynamic Updates]
    Runtime --> Validation[Validation]
```

Configuration Management

```python
import os


class ConfigurationManager:
    def __init__(self):
        self.env_config = self.load_env_config()
        self.json_config = self.load_json_config()
        self.runtime_config = {}
        self.validators = ConfigValidators()

    def load_env_config(self):
        return {
            'llm_api_key': os.getenv('LLM_API_KEY'),
            'redis_host': os.getenv('REDIS_HOST', 'localhost'),
            'redis_port': int(os.getenv('REDIS_PORT', 6379)),
            'debug': os.getenv('DEBUG', 'false').lower() == 'true'
        }

    def validate_configuration(self):
        errors = []

        # Validate environment variables
        if not self.env_config.get('llm_api_key'):
            errors.append("LLM_API_KEY is required")

        # Validate JSON configuration
        if not self.json_config.get('LLM'):
            errors.append("LLM configuration is required")

        if errors:
            raise ConfigurationError(errors)
```

Error Handling and Recovery

Error Handling Strategy

```mermaid
graph TB
    Error[Error Handling] --> Detection[Detection]
    Error --> Classification[Classification]
    Error --> Recovery[Recovery]
    Error --> Reporting[Reporting]

    Detection --> Monitoring[Monitoring]
    Detection --> Logging[Logging]

    Classification --> Transient[Transient]
    Classification --> Permanent[Permanent]
    Classification --> Unknown[Unknown]

    Recovery --> Retry[Retry]
    Recovery --> Fallback[Fallback]
    Recovery --> Graceful[Graceful Degradation]
```

Recovery Implementation

```python
import asyncio


class ErrorRecoveryManager:
    def __init__(self):
        self.retry_policies = RetryPolicies()
        self.fallback_strategies = FallbackStrategies()
        self.circuit_breakers = CircuitBreakerRegistry()

    async def handle_error(self, error, context):
        error_type = self.classify_error(error)

        if error_type == ErrorType.TRANSIENT:
            return await self.retry_with_backoff(context)
        elif error_type == ErrorType.PERMANENT:
            return await self.execute_fallback(context)
        else:
            return await self.graceful_degradation(context)

    async def retry_with_backoff(self, context, max_retries=3):
        for attempt in range(max_retries):
            try:
                return await context.retry()
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff between failed attempts: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** attempt)
```

Monitoring and Observability

Observability Stack

```mermaid
graph LR
    Obs[Observability] --> Metrics[Metrics]
    Obs --> Logging[Logging]
    Obs --> Tracing[Tracing]
    Obs --> Health[Health Checks]

    Metrics --> Performance[Performance]
    Metrics --> Usage[Usage]
    Metrics --> Errors[Errors]

    Logging --> Structured[Structured]
    Logging --> Levels[Log Levels]

    Tracing --> Requests[Request Tracing]
    Tracing --> Dependencies[Dependency Tracing]
```

Monitoring Implementation

```python
class MonitoringManager:
    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.logger = StructuredLogger()
        self.tracer = DistributedTracer()
        self.health_checker = HealthChecker()

    def record_tool_execution(self, tool_name, duration, success):
        self.metrics_collector.increment(
            'tool_executions_total',
            tags={'tool': tool_name, 'success': success}
        )
        self.metrics_collector.histogram(
            'tool_execution_duration',
            duration,
            tags={'tool': tool_name}
        )

    def log_user_interaction(self, user_input, response, context):
        self.logger.info(
            "user_interaction",
            user_input=user_input,
            response_length=len(response),
            mode=context.mode,
            servers_connected=len(context.servers)
        )
```

Extensibility and Plugin System

Plugin Architecture

```mermaid
graph TB
    Plugin[Plugin System] --> Registry[Plugin Registry]
    Plugin --> Loader[Plugin Loader]
    Plugin --> Lifecycle[Lifecycle Manager]

    Registry --> Transport[Transport Plugins]
    Registry --> LLM[LLM Plugins]
    Registry --> Tool[Tool Plugins]
    Registry --> Memory[Memory Plugins]

    Loader --> Discovery[Discovery]
    Loader --> Validation[Validation]
    Loader --> Installation[Installation]
```
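The registry and validation steps in the diagram can be sketched as a category-keyed registry that checks a plugin's contract before installing it. The category names mirror the diagram, but the `setup()` hook and class names are hypothetical, not the actual plugin API:

```python
PLUGIN_CATEGORIES = {"transport", "llm", "tool", "memory"}


class PluginRegistry:
    def __init__(self):
        self._plugins = {cat: {} for cat in PLUGIN_CATEGORIES}

    def register(self, category: str, name: str, plugin) -> None:
        """Validate and install a plugin under a known category."""
        if category not in PLUGIN_CATEGORIES:
            raise ValueError(f"unknown plugin category: {category}")
        # Validation step: require the lifecycle hook before installation
        if not callable(getattr(plugin, "setup", None)):
            raise TypeError("plugin must define a setup() hook")
        self._plugins[category][name] = plugin

    def get(self, category: str, name: str):
        return self._plugins[category][name]
```

The loader's discovery step would populate such a registry, e.g. by scanning installed packages for entry points.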

This architecture provides a solid foundation for MCPOmni Connect's current capabilities while allowing for future expansion and customization.


Next: API Reference →