Architecture¶
MCPOmni Connect is built with a modular, extensible architecture designed for scalability, reliability, and ease of use. This document provides a comprehensive overview of the system's design and components.
System Overview¶
MCPOmni Connect acts as an intelligent gateway between users and the Model Context Protocol (MCP) ecosystem, providing AI-powered automation and orchestration capabilities.
graph TB
User[User] --> CLI[CLI Interface]
CLI --> Core[Core Engine]
Core --> LLM[LLM Integration]
Core --> Memory[Memory Management]
Core --> Session[Session Management]
Core --> Transport[Transport Layer]
LLM --> Providers[LLM Providers]
Memory --> Redis[Redis]
Memory --> Files[File Storage]
Transport --> Stdio[Stdio]
Transport --> SSE[SSE]
Transport --> HTTP[HTTP]
Stdio --> LocalMCP[Local MCP Servers]
SSE --> RemoteMCP[Remote MCP Servers]
HTTP --> APIMCP[API MCP Servers]
Core --> Agent[Agent System]
Agent --> Chat[Chat Mode]
Agent --> Auto[Autonomous Mode]
Agent --> Orch[Orchestrator Mode]
Core Components¶
1. CLI Interface Layer¶
The user-facing command-line interface that handles input/output and user interactions.
Responsibilities:
- Command parsing and validation
- User input handling
- Output formatting and display
- Interactive prompts and confirmations
- Error message presentation
Key Features:
- Rich text formatting with syntax highlighting
- Interactive command completion
- Real-time status updates
- Debug mode visualization
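As an illustration, a minimal command loop along these lines could sit behind the interface. The class and command names below are hypothetical sketches, not the shipped CLI:

```python
import shlex

class CLIInterface:
    """Minimal sketch of a CLI layer: parse, validate, dispatch, format."""

    def __init__(self):
        # Hypothetical command registry; the real CLI exposes many more commands.
        self.commands = {
            "connect": self.cmd_connect,
            "help": self.cmd_help,
        }

    def parse(self, line):
        # Command parsing and validation
        parts = shlex.split(line)
        if not parts or parts[0] not in self.commands:
            raise ValueError(f"Unknown command: {line!r}")
        return parts[0], parts[1:]

    def dispatch(self, line):
        # Route a validated command to its handler
        name, args = self.parse(line)
        return self.commands[name](*args)

    def cmd_connect(self, server):
        return f"Connecting to {server}..."

    def cmd_help(self):
        return "Available commands: " + ", ".join(sorted(self.commands))
```

A registry-plus-dispatch shape like this keeps error presentation in one place: invalid input fails at `parse`, before any handler runs.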
2. Core Engine¶
The central orchestrator that coordinates all system components.
Responsibilities:
- Component lifecycle management
- Event coordination and messaging
- Configuration management
- Error handling and recovery
- System state management
Components:
class CoreEngine:
    def __init__(self):
        self.session_manager = SessionManager()
        self.transport_layer = TransportLayer()
        self.llm_integration = LLMIntegration()
        self.memory_manager = MemoryManager()
        self.agent_system = AgentSystem()
3. Agent System¶
The AI-powered decision-making and execution engine.
graph LR
Agent[Agent System] --> ReAct[ReAct Engine]
Agent --> Orchestrator[Orchestrator]
Agent --> Context[Context Manager]
ReAct --> Reasoning[Reasoning]
ReAct --> Acting[Acting]
ReAct --> Observing[Observing]
Orchestrator --> Planning[Planning]
Orchestrator --> Coordination[Coordination]
Orchestrator --> Monitoring[Monitoring]
Mode Architecture:
class ChatMode:
    def process_request(self, user_input):
        # 1. Parse user intent
        intent = self.parse_intent(user_input)
        # 2. Plan actions
        actions = self.plan_actions(intent)
        # 3. Request approval for each action
        for action in actions:
            if self.request_approval(action):
                result = self.execute_action(action)
                self.present_result(result)
class AutonomousMode:
    def process_request(self, user_input):
        # 1. Parse and understand the goal
        goal = self.parse_goal(user_input)
        # 2. ReAct loop: think, act, observe until the goal is met
        while not self.goal_achieved(goal):
            thought = self.think(self.current_state)
            action = self.plan_action(thought)
            observation = self.execute_action(action)
            self.update_state(observation)
        # 3. Report completion
        return self.generate_report()
class OrchestratorMode:
    def process_request(self, user_input):
        # 1. Strategic analysis
        strategy = self.analyze_requirements(user_input)
        # 2. Multi-phase planning
        phases = self.create_execution_plan(strategy)
        # 3. Coordinate execution across phases
        for phase in phases:
            agents = self.allocate_agents(phase)
            results = self.execute_parallel(agents)
            self.merge_results(results)
        return self.final_report()
Transport Layer¶
Transport Architecture¶
graph TB
TL[Transport Layer] --> TM[Transport Manager]
TM --> Registry[Transport Registry]
TM --> Factory[Transport Factory]
Factory --> StdioT[Stdio Transport]
Factory --> SSET[SSE Transport]
Factory --> HTTPT[HTTP Transport]
StdioT --> Process[Process Manager]
SSET --> EventStream[Event Stream]
HTTPT --> AuthManager[Auth Manager]
AuthManager --> OAuth[OAuth Handler]
AuthManager --> Bearer[Bearer Token]
AuthManager --> Custom[Custom Headers]
Transport Implementations¶
Stdio Transport¶
class StdioTransport:
    def __init__(self, command, args):
        self.command = command
        self.args = args
        self.process = None

    async def connect(self):
        # Spawn the local MCP server as an asyncio subprocess so that
        # reads and writes can be awaited without blocking the event loop
        self.process = await asyncio.create_subprocess_exec(
            self.command, *self.args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    async def send_message(self, message):
        self.process.stdin.write(message)
        await self.process.stdin.drain()

    async def receive_message(self):
        return await self.process.stdout.readline()
SSE Transport¶
class SSETransport:
    def __init__(self, url, headers):
        self.url = url
        self.headers = headers
        self.client = httpx.AsyncClient()

    async def receive_events(self):
        # httpx's stream() returns an async context manager;
        # iterate the event lines inside it
        async with self.client.stream(
            "GET", self.url, headers=self.headers
        ) as stream:
            async for line in stream.aiter_lines():
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])
HTTP Transport¶
class HTTPTransport:
    def __init__(self, url, auth_config):
        self.url = url
        self.auth = self.setup_auth(auth_config)
        self.client = httpx.AsyncClient()

    async def send_request(self, data):
        response = await self.client.post(
            self.url,
            json=data,
            headers=self.auth.get_headers()
        )
        return response.json()
Session Management¶
Session Architecture¶
graph LR
SM[Session Manager] --> SL[Session Lifecycle]
SM --> CR[Connection Registry]
SM --> HM[Health Monitor]
CR --> Servers[Server Connections]
HM --> Heartbeat[Heartbeat]
HM --> Recovery[Recovery]
Servers --> Active[Active]
Servers --> Idle[Idle]
Servers --> Failed[Failed]
Connection Management¶
class SessionManager:
    def __init__(self):
        self.connections = {}
        self.health_monitor = HealthMonitor()
        self.recovery_manager = RecoveryManager()

    async def connect_server(self, server_config):
        transport = self.create_transport(server_config)
        connection = await transport.connect()
        self.connections[server_config.name] = connection
        self.health_monitor.add_connection(connection)
        return connection

    async def health_check(self):
        for name, connection in self.connections.items():
            if not await connection.is_healthy():
                await self.recovery_manager.recover(name, connection)
LLM Integration¶
LiteLLM Integration Architecture¶
graph TB
LLM[LLM Integration] --> LiteLLM[LiteLLM]
LLM --> Config[Config Manager]
LLM --> Context[Context Manager]
LiteLLM --> OpenAI[OpenAI]
LiteLLM --> Anthropic[Anthropic]
LiteLLM --> Google[Google]
LiteLLM --> Others[... Others]
Context --> Window[Context Window]
Context --> History[History]
Context --> Pruning[Pruning]
LLM Integration Implementation¶
class LLMIntegration:
    def __init__(self, config):
        self.config = config
        self.context_manager = ContextManager()
        self.client = self.setup_litellm()

    def setup_litellm(self):
        # acompletion is the awaitable variant of litellm.completion
        return litellm.acompletion

    async def generate_response(self, messages, tools=None):
        # Prepare context
        context = self.context_manager.prepare_context(messages)
        # Call LLM
        response = await self.client(
            model=f"{self.config.provider}/{self.config.model}",
            messages=context,
            tools=tools,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        return response
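The context-window pruning shown in the diagram above can be sketched roughly as follows. The token-estimation heuristic and the `ContextManager` internals here are illustrative assumptions, not the actual implementation:

```python
class ContextManager:
    """Sketch of context-window pruning: keep the system prompt plus the
    most recent messages that fit within a token budget."""

    def __init__(self, max_tokens=4096):
        self.max_tokens = max_tokens

    def estimate_tokens(self, message):
        # Crude heuristic (~4 characters per token); a real implementation
        # would use the provider's tokenizer
        return max(1, len(message["content"]) // 4)

    def prepare_context(self, messages):
        system = [m for m in messages if m["role"] == "system"]
        rest = [m for m in messages if m["role"] != "system"]
        budget = self.max_tokens - sum(self.estimate_tokens(m) for m in system)
        kept = []
        # Walk newest-first, keeping messages until the budget is exhausted
        for message in reversed(rest):
            cost = self.estimate_tokens(message)
            if budget - cost < 0:
                break
            budget -= cost
            kept.append(message)
        # Restore chronological order, system prompt first
        return system + list(reversed(kept))
```

Pruning oldest-first like this preserves the system prompt and the recent turns that matter most for the next completion call.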
Memory Management¶
Memory Architecture¶
graph TB
MM[Memory Manager] --> SM[Session Memory]
MM --> RM[Redis Memory]
MM --> FM[File Memory]
SM --> Current[Current Context]
SM --> Buffer[Message Buffer]
RM --> Persistence[Persistence]
RM --> TTL[TTL Management]
FM --> Save[Save Operations]
FM --> Load[Load Operations]
FM --> Backup[Backup]
Memory Implementation¶
class MemoryManager:
    def __init__(self, config):
        self.session_memory = SessionMemory()
        self.redis_memory = RedisMemory(config.redis) if config.redis else None
        self.file_memory = FileMemory()
        self.enabled = False

    async def store_message(self, message):
        # Always store in session memory
        self.session_memory.add(message)
        # Store in Redis if enabled
        if self.enabled and self.redis_memory:
            await self.redis_memory.store(message)

    async def get_context(self, limit=None):
        # Get from Redis if available
        if self.enabled and self.redis_memory:
            return await self.redis_memory.get_context(limit)
        # Fall back to session memory
        return self.session_memory.get_context(limit)
Tool Management¶
Tool Discovery and Execution¶
graph LR
TM[Tool Manager] --> Discovery[Discovery]
TM --> Registry[Registry]
TM --> Executor[Executor]
Discovery --> Servers[Server Tools]
Registry --> Metadata[Tool Metadata]
Registry --> Routing[Routing Rules]
Executor --> Parallel[Parallel Exec]
Executor --> Serial[Serial Exec]
Executor --> Fallback[Fallback]
Tool Execution Engine¶
class ToolManager:
    def __init__(self):
        self.registry = ToolRegistry()
        self.executor = ToolExecutor()
        self.router = ToolRouter()

    async def discover_tools(self, connections):
        for connection in connections:
            tools = await connection.list_tools()
            for tool in tools:
                self.registry.register(tool, connection)

    async def execute_tool(self, tool_name, parameters):
        # Route to the appropriate server
        connection = self.router.route(tool_name)
        # Execute with timeout and retry
        return await self.executor.execute(
            connection, tool_name, parameters
        )
Security Architecture¶
Security Layers¶
graph TB
Security[Security] --> Auth[Authentication]
Security --> Authz[Authorization]
Security --> Encryption[Encryption]
Security --> Isolation[Isolation]
Auth --> OAuth[OAuth 2.0]
Auth --> Tokens[Bearer Tokens]
Auth --> Custom[Custom Auth]
Authz --> ServerLevel[Server Level]
Authz --> ToolLevel[Tool Level]
Encryption --> Transit[In Transit]
Encryption --> Rest[At Rest]
Isolation --> ServerIso[Server Isolation]
Isolation --> DataIso[Data Isolation]
Security Implementation¶
class SecurityManager:
    def __init__(self):
        self.auth_manager = AuthenticationManager()
        self.authz_manager = AuthorizationManager()
        self.crypto = CryptographyManager()

    async def authenticate_server(self, server_config):
        if server_config.auth_method == "oauth":
            return await self.auth_manager.oauth_flow(server_config)
        elif server_config.auth_method == "bearer":
            return self.auth_manager.bearer_token(server_config)

    def encrypt_sensitive_data(self, data):
        return self.crypto.encrypt(data)

    def authorize_tool_access(self, tool, user_context):
        return self.authz_manager.check_permission(tool, user_context)
Performance and Scalability¶
Performance Architecture¶
graph LR
Perf[Performance] --> Caching[Caching]
Perf --> Pooling[Connection Pooling]
Perf --> Async[Async Processing]
Perf --> Monitoring[Monitoring]
Caching --> ToolCache[Tool Results]
Caching --> ContextCache[Context Cache]
Pooling --> ConnPool[Connection Pool]
Pooling --> ThreadPool[Thread Pool]
Async --> EventLoop[Event Loop]
Async --> Coroutines[Coroutines]
Performance Optimizations¶
class PerformanceManager:
    def __init__(self):
        self.cache = CacheManager()
        self.connection_pool = ConnectionPool()
        self.metrics = MetricsCollector()

    async def execute_with_cache(self, tool_call):
        cache_key = self.generate_cache_key(tool_call)
        # Check the cache first
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            self.metrics.record_cache_hit(tool_call)
            return cached_result
        # Execute and cache the result
        result = await self.execute_tool(tool_call)
        await self.cache.set(cache_key, result, ttl=300)
        self.metrics.record_cache_miss(tool_call)
        return result
Configuration System¶
Configuration Architecture¶
graph TB
Config[Configuration] --> Env[Environment]
Config --> JSON[JSON Config]
Config --> Runtime[Runtime Config]
Env --> APIKeys[API Keys]
Env --> Redis[Redis Config]
Env --> Debug[Debug Settings]
JSON --> LLMConfig[LLM Config]
JSON --> Servers[Server Config]
JSON --> AgentConfig[Agent Config]
Runtime --> Dynamic[Dynamic Updates]
Runtime --> Validation[Validation]
Configuration Management¶
class ConfigurationManager:
    def __init__(self):
        self.env_config = self.load_env_config()
        self.json_config = self.load_json_config()
        self.runtime_config = {}
        self.validators = ConfigValidators()

    def load_env_config(self):
        return {
            'llm_api_key': os.getenv('LLM_API_KEY'),
            'redis_host': os.getenv('REDIS_HOST', 'localhost'),
            'redis_port': int(os.getenv('REDIS_PORT', 6379)),
            'debug': os.getenv('DEBUG', 'false').lower() == 'true'
        }

    def validate_configuration(self):
        errors = []
        # Validate environment variables
        if not self.env_config.get('llm_api_key'):
            errors.append("LLM_API_KEY is required")
        # Validate JSON configuration
        if not self.json_config.get('LLM'):
            errors.append("LLM configuration is required")
        if errors:
            raise ConfigurationError(errors)
Error Handling and Recovery¶
Error Handling Strategy¶
graph TB
Error[Error Handling] --> Detection[Detection]
Error --> Classification[Classification]
Error --> Recovery[Recovery]
Error --> Reporting[Reporting]
Detection --> Monitoring[Monitoring]
Detection --> Logging[Logging]
Classification --> Transient[Transient]
Classification --> Permanent[Permanent]
Classification --> Unknown[Unknown]
Recovery --> Retry[Retry]
Recovery --> Fallback[Fallback]
Recovery --> Graceful[Graceful Degradation]
Recovery Implementation¶
class ErrorRecoveryManager:
    def __init__(self):
        self.retry_policies = RetryPolicies()
        self.fallback_strategies = FallbackStrategies()
        self.circuit_breakers = CircuitBreakerRegistry()

    async def handle_error(self, error, context):
        error_type = self.classify_error(error)
        if error_type == ErrorType.TRANSIENT:
            return await self.retry_with_backoff(context)
        elif error_type == ErrorType.PERMANENT:
            return await self.execute_fallback(context)
        else:
            return await self.graceful_degradation(context)

    async def retry_with_backoff(self, context, max_retries=3):
        for attempt in range(max_retries):
            try:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                return await context.retry()
            except Exception:
                if attempt == max_retries - 1:
                    raise
Monitoring and Observability¶
Observability Stack¶
graph LR
Obs[Observability] --> Metrics[Metrics]
Obs --> Logging[Logging]
Obs --> Tracing[Tracing]
Obs --> Health[Health Checks]
Metrics --> Performance[Performance]
Metrics --> Usage[Usage]
Metrics --> Errors[Errors]
Logging --> Structured[Structured]
Logging --> Levels[Log Levels]
Tracing --> Requests[Request Tracing]
Tracing --> Dependencies[Dependency Tracing]
Monitoring Implementation¶
class MonitoringManager:
    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.logger = StructuredLogger()
        self.tracer = DistributedTracer()
        self.health_checker = HealthChecker()

    def record_tool_execution(self, tool_name, duration, success):
        self.metrics_collector.increment(
            'tool_executions_total',
            tags={'tool': tool_name, 'success': success}
        )
        self.metrics_collector.histogram(
            'tool_execution_duration',
            duration,
            tags={'tool': tool_name}
        )

    def log_user_interaction(self, user_input, response, context):
        self.logger.info(
            "user_interaction",
            user_input=user_input,
            response_length=len(response),
            mode=context.mode,
            servers_connected=len(context.servers)
        )
Extensibility and Plugin System¶
Plugin Architecture¶
graph TB
Plugin[Plugin System] --> Registry[Plugin Registry]
Plugin --> Loader[Plugin Loader]
Plugin --> Lifecycle[Lifecycle Manager]
Registry --> Transport[Transport Plugins]
Registry --> LLM[LLM Plugins]
Registry --> Tool[Tool Plugins]
Registry --> Memory[Memory Plugins]
Loader --> Discovery[Discovery]
Loader --> Validation[Validation]
Loader --> Installation[Installation]
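A typed plugin registry along these lines could support the registration and validation flow shown in the diagram. The interface below is a sketch under assumed names (`PluginRegistry`, `EchoTransport`), not the shipped plugin API:

```python
class PluginRegistry:
    """Sketch of a plugin registry keyed by category (transport, llm, tool, memory)."""

    CATEGORIES = {"transport", "llm", "tool", "memory"}

    def __init__(self):
        self._plugins = {category: {} for category in self.CATEGORIES}

    def register(self, category, name, plugin_cls):
        # Validation: only known categories may be registered
        if category not in self.CATEGORIES:
            raise ValueError(f"Unknown plugin category: {category}")
        self._plugins[category][name] = plugin_cls

    def create(self, category, name, **kwargs):
        # Instantiate a registered plugin by name
        return self._plugins[category][name](**kwargs)

class EchoTransport:
    """Hypothetical transport plugin used only for illustration."""
    def __init__(self, url=None):
        self.url = url

registry = PluginRegistry()
registry.register("transport", "echo", EchoTransport)
transport = registry.create("transport", "echo", url="stdio://local")
```

Keeping categories explicit lets the loader validate a plugin's declared type before installation, matching the Discovery, Validation, Installation steps above.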
This architecture provides a solid foundation for MCPOmni Connect's current capabilities while allowing for future expansion and customization.
Next: API Reference →