# Create Gateways
Gateways in Solace Agent Mesh (SAM) serve as bridges between external systems and the A2A (Agent-to-Agent) ecosystem. They enable your agents to receive information from and send responses to diverse external platforms like chat systems, web applications, IoT devices, APIs, and file systems.
This guide walks you through the steps of creating custom gateways, from basic concepts to advanced implementations.
## What Are Gateways?
A gateway acts as a translator and coordinator that:

- Receives events, messages, or data from external systems
- Authenticates and authorizes external interactions
- Translates external data into the standardized A2A `Task` format
- Submits tasks to target A2A agents for processing
- Receives responses and status updates from agents
- Translates A2A responses back into the external system's format
- Sends results back to the originating external system
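The translate/submit/translate-back loop above can be sketched without any framework at all. The names below (`translate_to_task`, `translate_from_task`, the dict shapes) are purely illustrative stand-ins, not the SAM API; the real base classes provide equivalents such as `_translate_external_input`:

```python
# A minimal, framework-free sketch of the gateway lifecycle described above.
# All names and dict shapes here are illustrative only.

def translate_to_task(external_payload: dict, target_agent: str) -> dict:
    """Wrap an external payload into a task-like dict for a target agent."""
    return {
        "target_agent": target_agent,
        "parts": [{"type": "text", "text": external_payload["message"]}],
        "context": {"source": external_payload.get("source", "unknown")},
    }

def translate_from_task(task_result: dict) -> str:
    """Flatten a task-like result back into plain text for the external system."""
    return " ".join(
        p["text"] for p in task_result.get("parts", []) if p.get("type") == "text"
    )

# Inbound: external event -> task; outbound: task result -> external reply
event = {"message": "new order received", "source": "webhook"}
task = translate_to_task(event, "OrchestratorAgent")
reply = translate_from_task({"parts": [{"type": "text", "text": "order logged"}]})
```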
## Quick Start: Creating Your First Gateway
You can create a gateway directly using the SAM CLI `sam add gateway` command:

```sh
sam add gateway my-custom-gateway
```

This command:

- Launches an interactive setup (or use `--gui` for browser-based configuration)
- Generates the necessary files and configuration
- Sets up the basic gateway structure
### CLI Options
You can customize the gateway creation with these options:
```sh
sam add gateway my-gateway \
  --namespace "myorg/dev" \
  --gateway-id "my-custom-gw-id" \
  --artifact-service-type "filesystem" \
  --artifact-service-base-path "var/data/my-gateway-artifacts" \
  --system-purpose "This gateway processes external data feeds" \
  --response-format "Agents should respond with structured JSON"
```
For a complete list of options, run:
```sh
sam add gateway --help
```
## Gateway Architecture
Every SAM gateway consists of two main components:
### Gateway App (`app.py`)

- Defines the configuration schema
- Manages gateway-level settings
- Links to the gateway component

### Gateway Component (`component.py`)

- Contains the core business logic
- Handles external system integration
- Implements the required abstract methods
## Step-by-Step Tutorial
Let's create a practical example: a Directory Monitor Gateway that watches a directory for new files and sends them to agents for processing.

You can create a gateway either directly, with the `sam add gateway <your_gateway_name>` command, or as a gateway plugin, with the `sam plugin create <your_gateway_plugin_name> --type gateway` command. Implementing a gateway as a plugin lets you package your gateway logic and reuse it across different projects.

To create a plugin of type gateway:

```sh
sam plugin create <your_gateway_plugin_name> --type gateway
```

For a complete list of options, run `sam plugin create --help`.

To create a gateway instance based on a plugin:

```sh
sam plugin add <your_gateway_name> --plugin <your_gateway_plugin>
```

For a complete list of options, run `sam plugin add --help`.

Although a plugin's directory structure differs from a standalone gateway's, the core concepts remain the same, and so do the core files: `app.py`, `component.py`, and the YAML configuration file.
### Step 1: Generate the Gateway Structure
This tutorial creates a new gateway with the `sam add gateway` command:

```sh
sam add gateway dir-monitor
```

This creates:

- `configs/gateways/dir_monitor_config.yaml` - Configuration file
- `src/dir_monitor/app.py` - Gateway app class
- `src/dir_monitor/component.py` - Gateway component class
### Step 2: Define the Configuration Schema (`app.py`)
```python
# src/dir_monitor/app.py
from typing import Any, Dict, List, Type

from solace_ai_connector.common.log import log
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent

from .component import DirMonitorGatewayComponent

# Module info required by SAC
info = {
    "class_name": "DirMonitorGatewayApp",
    "description": "Custom App class for the A2A DirMonitor Gateway.",
}


class DirMonitorGatewayApp(BaseGatewayApp):
    """
    Directory Monitor Gateway App

    Extends BaseGatewayApp with directory-monitoring-specific configuration.
    """

    # Define gateway-specific configuration parameters
    SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]] = [
        {
            "name": "directory_path",
            "required": True,
            "type": "string",
            "description": "The directory path to monitor for changes.",
        },
        {
            "name": "target_agent_name",
            "required": False,
            "type": "string",
            "default": "OrchestratorAgent",
            "description": "The A2A agent to send tasks to.",
        },
        {
            "name": "default_user_identity",
            "required": False,
            "type": "string",
            "default": "dir_monitor_user",
            "description": "Default user identity for A2A tasks.",
        },
        {
            "name": "error_directory_path",
            "required": True,
            "type": "string",
            "description": "Directory to move files to if processing fails.",
        },
    ]

    def __init__(self, app_info: Dict[str, Any], **kwargs):
        log_prefix = app_info.get("name", "DirMonitorGatewayApp")
        log.info("[%s] Initializing Directory Monitor Gateway App...", log_prefix)
        super().__init__(app_info=app_info, **kwargs)
        log.info("[%s] Directory Monitor Gateway App initialized.", self.name)

    def _get_gateway_component_class(self) -> Type[BaseGatewayComponent]:
        """Returns the gateway component class for this app."""
        return DirMonitorGatewayComponent
```
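Each entry in `SPECIFIC_APP_SCHEMA_PARAMS` becomes a validated key under the gateway's `app_config` block. As a rough illustration (paths are placeholders), the four parameters above might be supplied like this:

```yaml
app_config:
  directory_path: /data/inbox              # required
  error_directory_path: /data/errors       # required
  target_agent_name: "OrchestratorAgent"   # optional, shown with its default
  default_user_identity: "dir_monitor_user"  # optional, shown with its default
```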
### Step 3: Implement Core Logic (`component.py`)
```python
# src/dir_monitor/component.py
import asyncio
import os
import shutil
import mimetypes
import threading
from typing import Any, Dict, List, Optional, Tuple, Union
from datetime import datetime, timezone

from solace_ai_connector.common.log import log

# Import watchdog for file system monitoring
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False
    Observer = None
    # Fallback base class so the nested handler class below can still be
    # defined when watchdog is missing; __init__ raises before it is used.
    FileSystemEventHandler = object

from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from solace_agent_mesh.common.types import (
    Part as A2APart,
    TextPart,
    FilePart,
    Task,
    TaskStatusUpdateEvent,
    TaskArtifactUpdateEvent,
    JSONRPCError,
    FileContent,
)
from solace_agent_mesh.agent.utils.artifact_helpers import save_artifact_with_metadata

# Component info
info = {
    "class_name": "DirMonitorGatewayComponent",
    "description": "Monitors directories for new files and processes them via A2A agents.",
}


class DirMonitorGatewayComponent(BaseGatewayComponent):
    """
    Directory Monitor Gateway Component

    Watches a directory and creates A2A tasks for new files.
    """

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        log.info("%s Initializing Directory Monitor Gateway Component...", self.log_identifier)

        # Check if watchdog is available
        if not WATCHDOG_AVAILABLE:
            log.error("%s Watchdog library not found. Install with: pip install watchdog",
                      self.log_identifier)
            raise ImportError("Watchdog library required for directory monitoring")

        # Load configuration
        try:
            self.directory_path = self.get_config("directory_path")
            self.target_agent_name = self.get_config("target_agent_name", "OrchestratorAgent")
            self.default_user_identity_id = self.get_config("default_user_identity", "dir_monitor_user")
            self.error_directory_path = self.get_config("error_directory_path")

            # Validate directories
            if not os.path.isdir(self.directory_path):
                raise ValueError(f"Monitor directory not found: {self.directory_path}")
            os.makedirs(self.error_directory_path, exist_ok=True)

            log.info("%s Monitoring: %s, Error dir: %s",
                     self.log_identifier, self.directory_path, self.error_directory_path)
        except Exception as e:
            log.error("%s Configuration error: %s", self.log_identifier, e)
            raise

        # Initialize monitoring components
        self.observer: Optional["Observer"] = None
        self.watchdog_thread: Optional[threading.Thread] = None
        log.info("%s Directory Monitor Gateway Component initialized.", self.log_identifier)

    class DirWatchEventHandler(FileSystemEventHandler):
        """Handles file system events from Watchdog."""

        def __init__(self, component_ref: "DirMonitorGatewayComponent"):
            super().__init__()
            self.component_ref = component_ref
            self.log_identifier = f"{component_ref.log_identifier}[FileHandler]"

        def on_created(self, event):
            if event.is_directory:
                return
            file_path = event.src_path
            log.info("%s New file detected: %s", self.log_identifier, file_path)

            # Bridge from the watchdog thread to the async loop
            if self.component_ref.async_loop and self.component_ref.async_loop.is_running():
                asyncio.run_coroutine_threadsafe(
                    self.component_ref._process_new_file(file_path),
                    self.component_ref.async_loop,
                )
            else:
                log.error("%s Async loop not available for file: %s",
                          self.log_identifier, file_path)

    def generate_uuid(self) -> str:
        """Generate a unique identifier."""
        import uuid
        return str(uuid.uuid4())

    def _start_listener(self) -> None:
        """Start the directory monitoring listener."""
        log_id_prefix = f"{self.log_identifier}[StartListener]"
        log.info("%s Starting directory monitor for: %s", log_id_prefix, self.directory_path)

        if not WATCHDOG_AVAILABLE:
            log.error("%s Watchdog not available", log_id_prefix)
            self.stop_signal.set()
            return

        # Set up file system observer
        self.observer = Observer()
        event_handler = self.DirWatchEventHandler(self)
        self.observer.schedule(event_handler, self.directory_path, recursive=False)

        # Start observer in a separate thread
        self.watchdog_thread = threading.Thread(
            target=self._run_observer,
            name=f"{self.name}_WatchdogThread",
            daemon=True,
        )
        self.watchdog_thread.start()
        log.info("%s Directory monitor started", log_id_prefix)

    def _run_observer(self):
        """Run the watchdog observer."""
        if not self.observer:
            return
        log_id_prefix = f"{self.log_identifier}[Observer]"
        try:
            log.info("%s Starting file system observer...", log_id_prefix)
            self.observer.start()

            # Wait for stop signal
            while not self.stop_signal.is_set() and self.observer.is_alive():
                self.stop_signal.wait(timeout=1)
            log.info("%s Observer loop exiting", log_id_prefix)
        except Exception as e:
            log.exception("%s Observer error: %s", log_id_prefix, e)
            self.stop_signal.set()
        finally:
            if self.observer.is_alive():
                self.observer.stop()
                self.observer.join()
            log.info("%s Observer stopped", log_id_prefix)

    def _stop_listener(self) -> None:
        """Stop the directory monitoring listener."""
        log_id_prefix = f"{self.log_identifier}[StopListener]"
        log.info("%s Stopping directory monitor...", log_id_prefix)

        if self.observer and self.observer.is_alive():
            log.info("%s Stopping observer...", log_id_prefix)
            self.observer.stop()
        if self.watchdog_thread and self.watchdog_thread.is_alive():
            log.info("%s Joining observer thread...", log_id_prefix)
            self.watchdog_thread.join(timeout=5)
            if self.watchdog_thread.is_alive():
                log.warning("%s Observer thread did not join cleanly", log_id_prefix)
        log.info("%s Directory monitor stopped", log_id_prefix)

    async def _process_new_file(self, file_path: str):
        """Process a newly detected file."""
        log_id_prefix = f"{self.log_identifier}[ProcessFile:{os.path.basename(file_path)}]"
        log.info("%s Processing new file: %s", log_id_prefix, file_path)

        error_context = {
            "file_path": file_path,
            "a2a_session_id": f"dir_monitor-error-{self.generate_uuid()}",
        }
        try:
            # Step 1: Authenticate and enrich the user
            user_identity_profile = await self.authenticate_and_enrich_user(file_path)
            if not user_identity_profile:
                log.error("%s Authentication failed for file: %s", log_id_prefix, file_path)
                error_obj = JSONRPCError(code=-32001, message="Authentication failed")
                await self._send_error_to_external(error_context, error_obj)
                return

            # Step 2: Translate external input to A2A format
            target_agent_name, a2a_parts, external_request_context = await self._translate_external_input(
                file_path, user_identity_profile
            )
            if not target_agent_name or not a2a_parts:
                log.error("%s Failed to translate file to A2A task: %s", log_id_prefix, file_path)
                error_obj = JSONRPCError(code=-32002, message="Failed to translate file to A2A task")
                final_error_context = {**error_context, **external_request_context}
                await self._send_error_to_external(final_error_context, error_obj)
                return

            # Step 3: Submit the A2A task
            log.info("%s Submitting A2A task for file: %s to agent: %s",
                     log_id_prefix, file_path, target_agent_name)
            await self.submit_a2a_task(
                target_agent_name=target_agent_name,
                a2a_parts=a2a_parts,
                external_request_context=external_request_context,
                user_identity=user_identity_profile,
            )
            log.info("%s A2A task submitted for file: %s", log_id_prefix, file_path)
        except FileNotFoundError:
            log.error("%s File not found during processing: %s", log_id_prefix, file_path)
        except Exception as e:
            log.exception("%s Unexpected error processing file %s: %s", log_id_prefix, file_path, e)
            error_obj = JSONRPCError(code=-32000, message=f"Unexpected error: {e}")
            await self._send_error_to_external(error_context, error_obj)

    async def _extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]:
        """Extract user identity claims from a file event."""
        file_path = str(external_event_data)
        log_id_prefix = f"{self.log_identifier}[ExtractClaims:{os.path.basename(file_path)}]"
        claims = {
            "id": self.default_user_identity_id,
            "source": "dir_monitor",
            "file_path": file_path,
        }
        log.debug("%s Extracted claims for file %s: %s", log_id_prefix, file_path, claims)
        return claims

    async def _translate_external_input(
        self, external_event_data: Any, authenticated_user_identity: Dict[str, Any]
    ) -> Tuple[Optional[str], List[A2APart], Dict[str, Any]]:
        """Translate a file event into A2A task format."""
        file_path = str(external_event_data)
        log_id_prefix = f"{self.log_identifier}[TranslateInput:{os.path.basename(file_path)}]"

        user_id_for_a2a = authenticated_user_identity.get("id", self.default_user_identity_id)
        a2a_session_id = f"dir_monitor-session-{self.generate_uuid()}"

        # Prepare external request context
        external_request_context: Dict[str, Any] = {
            "file_path": file_path,
            "user_id_for_a2a": user_id_for_a2a,
            "app_name_for_artifacts": self.gateway_id,
            "user_id_for_artifacts": user_id_for_a2a,
            "a2a_session_id": a2a_session_id,
        }
        a2a_parts: List[A2APart] = []
        try:
            # Check that the file exists
            if not os.path.exists(file_path):
                log.error("%s File does not exist: %s", log_id_prefix, file_path)
                raise FileNotFoundError(f"File not found: {file_path}")

            # Read file content
            with open(file_path, "rb") as f:
                content_bytes = f.read()

            # Determine the MIME type
            mime_type, _ = mimetypes.guess_type(file_path)
            if mime_type is None:
                mime_type = "application/octet-stream"

            # Save the file as an artifact
            if not self.shared_artifact_service:
                log.error("%s Artifact service not available for file: %s",
                          log_id_prefix, os.path.basename(file_path))
                return None, [], external_request_context

            artifact_metadata = {
                "source": "dir_monitor_gateway",
                "original_filename": os.path.basename(file_path),
                "detected_mime_type": mime_type,
                "processing_timestamp_utc": datetime.now(timezone.utc).isoformat(),
            }
            log.debug("%s Saving artifact for file: %s", log_id_prefix, file_path)
            save_result = await save_artifact_with_metadata(
                artifact_service=self.shared_artifact_service,
                app_name=self.gateway_id,
                user_id=str(user_id_for_a2a),
                session_id=a2a_session_id,
                filename=os.path.basename(file_path),
                content_bytes=content_bytes,
                mime_type=mime_type,
                metadata_dict=artifact_metadata,
                timestamp=datetime.now(timezone.utc),
            )
            if save_result["status"] not in ["success", "partial_success"]:
                log.error("%s Failed to save file as artifact: %s",
                          log_id_prefix, save_result.get("message"))
                return None, [], external_request_context

            # Create the artifact URI
            data_version = save_result.get("data_version", 0)
            artifact_uri = f"artifact://{self.gateway_id}/{str(user_id_for_a2a)}/{a2a_session_id}/{os.path.basename(file_path)}?version={data_version}"
            log.info("%s Saved file as artifact: %s", log_id_prefix, artifact_uri)

            # Create A2A parts
            file_content_obj = FileContent(
                name=os.path.basename(file_path),
                uri=artifact_uri,
                mimeType=mime_type,
            )
            a2a_parts.append(FilePart(file=file_content_obj))
            a2a_parts.append(TextPart(
                text=f"Please analyze and summarize the content of: {os.path.basename(file_path)}"
            ))

            log.info("%s Successfully translated file %s into A2A parts", log_id_prefix, file_path)
            return self.target_agent_name, a2a_parts, external_request_context
        except Exception as e:
            log.exception("%s Error translating file %s: %s", log_id_prefix, file_path, e)
            return None, [], external_request_context

    async def _send_final_response_to_external(
        self, external_request_context: Dict[str, Any], task_data: Task
    ) -> None:
        """Handle the final response from an A2A agent."""
        log_id_prefix = f"{self.log_identifier}[SendFinalResponse]"
        file_path = external_request_context.get("file_path", "Unknown file")
        task_id = task_data.id

        # Extract the summary from the response
        summary_text = "Summary not available."
        if task_data.status and task_data.status.message and task_data.status.message.parts:
            for part in task_data.status.message.parts:
                if isinstance(part, TextPart):
                    summary_text = part.text
                    break

        log.info("%s Task %s completed for file '%s'. Status: %s",
                 log_id_prefix, task_id, os.path.basename(file_path),
                 task_data.status.state if task_data.status else "Unknown")
        log.info("%s Summary: %s", log_id_prefix,
                 summary_text[:200] + "..." if len(summary_text) > 200 else summary_text)

    async def _send_error_to_external(
        self, external_request_context: Dict[str, Any], error_data: JSONRPCError
    ) -> None:
        """Handle errors by moving files to the error directory."""
        log_id_prefix = f"{self.log_identifier}[SendError]"
        file_path = external_request_context.get("file_path")
        log.error("%s A2A Error for file '%s'. Code: %s, Message: %s",
                  log_id_prefix,
                  os.path.basename(file_path) if file_path else "Unknown file",
                  error_data.code, error_data.message)

        # Move the problematic file to the error directory
        if file_path and os.path.exists(file_path):
            try:
                os.makedirs(self.error_directory_path, exist_ok=True)
                base_name = os.path.basename(file_path)
                error_file_path = os.path.join(self.error_directory_path, base_name)

                # Handle filename conflicts
                counter = 0
                while os.path.exists(error_file_path):
                    counter += 1
                    name, ext = os.path.splitext(base_name)
                    error_file_path = os.path.join(
                        self.error_directory_path, f"{name}_error_{counter}{ext}"
                    )

                shutil.move(file_path, error_file_path)
                log.info("%s Moved problematic file %s to %s", log_id_prefix, file_path, error_file_path)
            except Exception as e:
                log.exception("%s Failed to move file %s to error directory: %s",
                              log_id_prefix, file_path, e)

    async def _send_update_to_external(
        self,
        external_request_context: Dict[str, Any],
        event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent],
        is_final_chunk_of_update: bool,
    ) -> None:
        """Handle intermediate updates (optional for this gateway)."""
        log_id_prefix = f"{self.log_identifier}[SendUpdate]"
        task_id = event_data.id
        file_path = external_request_context.get("file_path", "Unknown file")
        log.debug("%s Received update for task %s (file %s). Updates not processed by this gateway.",
                  log_id_prefix, task_id, os.path.basename(file_path))

    def cleanup(self):
        """Clean up resources."""
        log.info("%s Cleaning up Directory Monitor Gateway Component...", self.log_identifier)
        super().cleanup()
        log.info("%s Directory Monitor Gateway Component cleanup finished.", self.log_identifier)
```
### Step 4: Configure the Gateway (`dir_monitor_config.yaml`)
```yaml
# configs/gateways/dir_monitor_config.yaml
log:
  stdout_log_level: INFO
  log_file_level: DEBUG
  log_file: "dir_monitor_gateway.log"

!include ../shared_config.yaml

apps:
  - name: dir_monitor_gateway_app
    app_base_path: .
    app_module: src.dir_monitor.app
    broker:
      <<: *broker_connection
    app_config:
      namespace: ${NAMESPACE}
      gateway_id: dir-monitor-gateway

      # Artifact service configuration
      artifact_service: *default_artifact_service

      # Authorization service
      authorization_service:
        type: "none"

      # System purpose for A2A context
      system_purpose: >
        This system monitors directories for new files and processes them automatically.
        Analyze and summarize file contents. Always provide useful insights about the files.
        Your external name is Directory Monitor Agent.

      response_format: >
        Responses should be clear, concise, and professionally formatted.
        Provide structured analysis of file contents in Markdown format.

      # Gateway-specific configuration
      directory_path: /path/to/monitor/directory
      error_directory_path: /path/to/error/directory
      target_agent_name: "OrchestratorAgent"
      default_user_identity: "dir_monitor_system"
```
### Step 5: Install Dependencies

Add the required dependency to your project:

```sh
pip install watchdog
```
### Step 6: Run Your Gateway

```sh
sam run configs/gateways/dir_monitor_config.yaml
```
## Advanced Gateway Patterns

### Authentication and Authorization

Gateways can implement sophisticated authentication:
```python
import hashlib

async def _extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]:
    """Extract user claims with API key validation."""
    request = external_event_data.get("request")

    # Validate the API key
    api_key = request.headers.get("X-API-Key")
    if not api_key or not self._validate_api_key(api_key):
        return None

    # Extract user information
    user_id = request.headers.get("X-User-ID", "anonymous")
    return {
        "id": user_id,
        "source": "api_gateway",
        "api_key_hash": hashlib.sha256(api_key.encode()).hexdigest()[:8],
        "roles": self._get_user_roles(user_id),
    }
```
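The `_validate_api_key` helper referenced above is not part of the SAM base classes; one plausible implementation compares a hash of the presented key against a configured set of known key hashes in constant time. This is an illustrative sketch under those assumptions, not the library's API:

```python
import hashlib
import hmac

# Hypothetical store: keep only SHA-256 hashes of valid keys (e.g. loaded
# from configuration at startup), never the raw keys themselves.
VALID_KEY_HASHES = {hashlib.sha256(b"example-secret-key").hexdigest()}

def validate_api_key(api_key: str) -> bool:
    """Return True if the presented key hashes to a known value.

    hmac.compare_digest avoids timing side channels during comparison.
    """
    presented = hashlib.sha256(api_key.encode()).hexdigest()
    return any(hmac.compare_digest(presented, known) for known in VALID_KEY_HASHES)
```

Storing hashes rather than raw keys means a leaked configuration file does not leak usable credentials.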
### File Handling with Artifacts

For gateways that handle files:
```python
async def _save_file_as_artifact(self, file_content: bytes, filename: str,
                                 mime_type: str, session_id: str) -> Optional[str]:
    """Save file content as an artifact and return its URI."""
    if not self.shared_artifact_service:
        return None
    try:
        save_result = await save_artifact_with_metadata(
            artifact_service=self.shared_artifact_service,
            app_name=self.gateway_id,
            user_id="system",
            session_id=session_id,
            filename=filename,
            content_bytes=file_content,
            mime_type=mime_type,
            metadata_dict={
                "source": "my_gateway",
                "upload_timestamp": datetime.now(timezone.utc).isoformat(),
            },
            timestamp=datetime.now(timezone.utc),
        )
        if save_result["status"] in ["success", "partial_success"]:
            version = save_result.get("data_version", 0)
            return f"artifact://{self.gateway_id}/system/{session_id}/{filename}?version={version}"
    except Exception as e:
        log.error("Failed to save artifact: %s", e)
    return None
```
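The `artifact://` URI built above is a plain string; if your gateway also needs to read such URIs back, the standard library round-trips them cleanly. The helpers below are an illustrative sketch of that URI scheme, not a SAM API:

```python
from urllib.parse import urlparse, parse_qs, quote

def build_artifact_uri(app_name: str, user_id: str, session_id: str,
                       filename: str, version: int = 0) -> str:
    """Compose an artifact:// URI matching the pattern used in the examples."""
    return f"artifact://{app_name}/{user_id}/{session_id}/{quote(filename)}?version={version}"

def parse_artifact_uri(uri: str) -> dict:
    """Split an artifact:// URI back into its components."""
    parsed = urlparse(uri)
    # netloc carries the app name; the path carries user/session/filename
    user_id, session_id, filename = parsed.path.lstrip("/").split("/", 2)
    return {
        "app_name": parsed.netloc,
        "user_id": user_id,
        "session_id": session_id,
        "filename": filename,
        "version": int(parse_qs(parsed.query).get("version", ["0"])[0]),
    }
```

Centralizing URI construction and parsing in one place keeps the two sides from drifting apart as the scheme evolves.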
### Streaming Responses

Handle streaming responses from agents:
```python
async def _send_update_to_external(
    self, external_request_context: Dict[str, Any],
    event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent],
    is_final_chunk_of_update: bool,
) -> None:
    """Send streaming updates to the external system."""
    if isinstance(event_data, TaskStatusUpdateEvent):
        if event_data.status and event_data.status.message:
            for part in event_data.status.message.parts:
                if isinstance(part, TextPart):
                    # Send partial text to the external system
                    await self._send_partial_response(
                        external_request_context,
                        part.text,
                        is_final=is_final_chunk_of_update,
                    )
```
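Some external protocols cannot display partial text and need the chunks reassembled first. A small, framework-free sketch of per-task chunk buffering (the class and its names are illustrative, not part of SAM):

```python
from typing import Dict, List, Optional

class StreamBuffer:
    """Accumulate partial text chunks per task until the final chunk arrives."""

    def __init__(self):
        self._buffers: Dict[str, List[str]] = {}

    def add_chunk(self, task_id: str, text: str, is_final: bool) -> Optional[str]:
        """Buffer a chunk; return the full text once the final chunk is seen."""
        self._buffers.setdefault(task_id, []).append(text)
        if is_final:
            # Pop the buffer so finished tasks do not leak memory
            return "".join(self._buffers.pop(task_id))
        return None
```

A gateway's `_send_update_to_external` could feed each `TextPart` into such a buffer and only forward the joined result when `is_final_chunk_of_update` is true.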
### Error Handling and Retry Logic

Implement robust error handling:
```python
async def _process_with_retry(self, data: Any, max_retries: int = 3):
    """Process data with retry logic."""
    for attempt in range(max_retries):
        try:
            return await self._process_data(data)
        except TemporaryError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                log.warning("Attempt %d failed, retrying in %ds: %s",
                            attempt + 1, wait_time, e)
                await asyncio.sleep(wait_time)
            else:
                raise
        except PermanentError:
            # Don't retry permanent errors
            raise
```
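The retry pattern above can be exercised end-to-end with a stub operation. Here `TemporaryError` and `PermanentError` are assumed application-defined exceptions (as in the snippet above), and the base delay is made configurable so the demo stays fast:

```python
import asyncio

class TemporaryError(Exception):
    """Transient failure worth retrying (assumed application-defined)."""

class PermanentError(Exception):
    """Failure that retrying cannot fix (assumed application-defined)."""

async def process_with_retry(operation, max_retries: int = 3, base_delay: float = 0.01):
    """Run `operation` with exponential backoff on TemporaryError."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except TemporaryError:
            if attempt == max_retries - 1:
                raise  # retries exhausted
            await asyncio.sleep(base_delay * (2 ** attempt))  # 1x, 2x, 4x, ...
        except PermanentError:
            raise  # never retry permanent errors

# Demo: an operation that fails twice before succeeding
attempts = {"count": 0}

async def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TemporaryError("transient glitch")
    return "done"

result = asyncio.run(process_with_retry(flaky))
```

Distinguishing temporary from permanent errors up front is what makes the backoff safe: retrying a permanent failure only adds load and latency.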
## Best Practices

### 1. Configuration Management

- Use environment variables for sensitive data
- Provide sensible defaults
- Validate configuration at startup
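The three points above can be combined in a small startup helper that reads environment variables, applies defaults, and fails fast on missing required values. The variable names here are illustrative, not keys SAM itself defines:

```python
import os

def load_gateway_config(env=os.environ) -> dict:
    """Load and validate gateway settings at startup."""
    config = {
        # Required: a credential has no safe default
        "api_token": env.get("GATEWAY_API_TOKEN"),
        # Optional: sensible defaults
        "poll_interval_s": int(env.get("GATEWAY_POLL_INTERVAL", "5")),
        "target_agent": env.get("GATEWAY_TARGET_AGENT", "OrchestratorAgent"),
    }
    # Fail fast at startup rather than at first use
    if not config["api_token"]:
        raise ValueError("GATEWAY_API_TOKEN environment variable is required")
    if config["poll_interval_s"] <= 0:
        raise ValueError("GATEWAY_POLL_INTERVAL must be positive")
    return config
```

Accepting the environment as a parameter keeps the function trivially testable without touching the real process environment.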
### 2. Error Handling

- Implement comprehensive error handling
- Use appropriate HTTP status codes
- Log errors with sufficient context
- Provide meaningful error messages

### 3. Security

- Validate all external inputs
- Use secure authentication methods
- Implement rate limiting where appropriate
- Store secrets securely (use environment variables)
- Follow the principle of least privilege

### 4. Performance

- Use async/await for I/O operations
- Implement connection pooling for external APIs
- Monitor resource usage
- Handle backpressure appropriately

### 5. Monitoring and Logging

- Use structured logging
- Include correlation IDs
- Monitor key metrics (latency, error rates, throughput)
- Set up health checks
## Common Gateway Patterns

### HTTP/REST API Gateway

For HTTP-based integrations:
```python
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.security import HTTPBearer

class HTTPAPIGatewayComponent(BaseGatewayComponent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.app = FastAPI()
        self.security = HTTPBearer()
        self._setup_routes()

    def _setup_routes(self):
        @self.app.post("/webhook/{endpoint_id}")
        async def webhook_handler(endpoint_id: str, request: Request,
                                  token: str = Depends(self.security)):
            # Authenticate the request
            user_identity = await self.authenticate_and_enrich_user({
                "token": token,
                "endpoint_id": endpoint_id,
                "request": request,
            })
            if not user_identity:
                raise HTTPException(status_code=401, detail="Unauthorized")

            # Process the webhook
            body = await request.json()
            target_agent, parts, context = await self._translate_external_input(
                body, user_identity
            )
            task_id = await self.submit_a2a_task(
                target_agent_name=target_agent,
                a2a_parts=parts,
                external_request_context=context,
                user_identity=user_identity,
            )
            return {"task_id": task_id, "status": "accepted"}
```
### WebSocket Gateway

For real-time bidirectional communication:
```python
import websockets
import json

class WebSocketGatewayComponent(BaseGatewayComponent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.connections = {}

    async def _start_listener(self):
        """Start the WebSocket server."""
        self.server = await websockets.serve(
            self.handle_websocket,
            self.get_config("websocket_host", "localhost"),
            self.get_config("websocket_port", 8765),
        )
        log.info("%s WebSocket server started", self.log_identifier)

    async def handle_websocket(self, websocket, path):
        """Handle WebSocket connections."""
        connection_id = self.generate_uuid()
        self.connections[connection_id] = websocket
        try:
            async for message in websocket:
                data = json.loads(message)
                await self.process_websocket_message(connection_id, data)
        except websockets.exceptions.ConnectionClosed:
            log.info("%s WebSocket connection closed: %s", self.log_identifier, connection_id)
        finally:
            self.connections.pop(connection_id, None)

    async def process_websocket_message(self, connection_id: str, data: dict):
        """Process an incoming WebSocket message."""
        user_identity = await self.authenticate_and_enrich_user({
            "connection_id": connection_id,
            "data": data,
        })
        if user_identity:
            target_agent, parts, context = await self._translate_external_input(
                data, user_identity
            )
            context["connection_id"] = connection_id
            await self.submit_a2a_task(
                target_agent_name=target_agent,
                a2a_parts=parts,
                external_request_context=context,
                user_identity=user_identity,
            )

    async def _send_final_response_to_external(self, context: Dict[str, Any], task_data: Task):
        """Send the response back via WebSocket."""
        connection_id = context.get("connection_id")
        websocket = self.connections.get(connection_id)
        if websocket:
            response = {
                "task_id": task_data.id,
                "status": task_data.status.state.value if task_data.status else "unknown",
                "result": self._extract_text_from_task(task_data),
            }
            await websocket.send(json.dumps(response))
```
### Message Queue Gateway

For integration with message queues:
```python
import asyncio
import json

import aio_pika

class MessageQueueGatewayComponent(BaseGatewayComponent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.connection = None
        self.channel = None

    async def _start_listener(self):
        """Connect to the message queue and start consuming."""
        connection_url = self.get_config("rabbitmq_url")
        queue_name = self.get_config("input_queue_name")

        self.connection = await aio_pika.connect_robust(connection_url)
        self.channel = await self.connection.channel()
        queue = await self.channel.declare_queue(queue_name, durable=True)
        await queue.consume(self.process_message)
        log.info("%s Started consuming from queue: %s", self.log_identifier, queue_name)

    async def process_message(self, message: aio_pika.IncomingMessage):
        """Process an incoming queue message."""
        async with message.process():
            try:
                data = json.loads(message.body.decode())
                user_identity = await self.authenticate_and_enrich_user(data)
                if not user_identity:
                    log.warning("%s Authentication failed for message", self.log_identifier)
                    return

                target_agent, parts, context = await self._translate_external_input(
                    data, user_identity
                )
                context["message_id"] = message.message_id
                context["reply_to"] = message.reply_to

                await self.submit_a2a_task(
                    target_agent_name=target_agent,
                    a2a_parts=parts,
                    external_request_context=context,
                    user_identity=user_identity,
                )
            except Exception as e:
                log.exception("%s Error processing message: %s", self.log_identifier, e)

    async def _send_final_response_to_external(self, context: Dict[str, Any], task_data: Task):
        """Send the response back to the reply queue."""
        reply_to = context.get("reply_to")
        if reply_to and self.channel:
            response = {
                "task_id": task_data.id,
                "status": task_data.status.state.value if task_data.status else "unknown",
                "result": self._extract_text_from_task(task_data),
            }
            await self.channel.default_exchange.publish(
                aio_pika.Message(json.dumps(response).encode()),
                routing_key=reply_to,
            )
```
## Packaging as a Plugin

For distribution and reusability, package your gateway as a plugin.
### 1. Create Plugin Structure

The following structure is created when running the `sam plugin create my-gateway-plugin --type gateway` command:

```
my-gateway-plugin/
├── pyproject.toml
├── README.md
├── src/
│   └── sam_my_gateway/
│       ├── __init__.py
│       ├── app.py
│       └── component.py
├── config.yaml
└── examples/
    └── my_gateway_example.yaml
```
### 2. Configure `pyproject.toml`

Update the `pyproject.toml` file to include your gateway dependencies:

```toml
# ...
dependencies = [
    "watchdog>=3.0.0",  # Add your specific dependencies
]
# ...
```
### 3. Build and Install

```sh
# Build the plugin
sam plugin build

# Install the plugin from the local wheel file
sam plugin add my-gateway --plugin dist/sam_my_gateway-0.1.0-py3-none-any.whl
```
## Troubleshooting

### Common Issues

**Gateway Fails to Start**

- Check configuration schema validation
- Verify all required parameters are provided
- Ensure external dependencies are installed

**Tasks Not Reaching Agents**

- Verify the namespace configuration matches your agents
- Check Solace broker connectivity
- Confirm agent names are correct

**Authentication Failures**

- Validate the user identity extraction logic
- Check the authorization service configuration
- Verify the claims format matches expectations

**File/Artifact Issues**

- Ensure the artifact service is properly configured
- Check file permissions and paths
- Verify artifact URI construction
### Debugging Tips

1. **Enable Debug Logging**:

   ```yaml
   log:
     stdout_log_level: DEBUG
     log_file_level: DEBUG
   ```

2. **Use Test Agents**: Create simple echo agents for testing gateway integration.

3. **Monitor Solace Topics**: Use Solace monitoring tools to trace message flow.

4. **Add Correlation IDs**: Include unique identifiers in logs for request tracing.
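Tips 1 and 4 combine naturally. A minimal sketch using the standard `logging` module, where a `LoggerAdapter` stamps every record with a per-request correlation ID (the field name `corr_id` is illustrative, not something SAM prescribes):

```python
import logging
import uuid

# Formatter that includes the correlation id carried in each record's extras
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s [%(corr_id)s] %(message)s"))

logger = logging.getLogger("my_gateway")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def request_logger(base: logging.Logger) -> logging.LoggerAdapter:
    """Create a per-request logger that stamps every message with a correlation id."""
    return logging.LoggerAdapter(base, {"corr_id": uuid.uuid4().hex[:8]})

# One adapter per incoming request; every log line it emits shares the same id
log_for_request = request_logger(logger)
log_for_request.info("task submitted")
```

Passing the same adapter through a request's whole code path lets you grep one correlation ID and see the full lifecycle of that request across the gateway.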