Message Bus Architecture
Event-driven communication system for inter-component messaging in ElizaOS
Message Bus Architecture
The ElizaOS message bus provides a centralized event-driven communication system that enables loose coupling between components. It facilitates message distribution from the server to agents and handles various system events throughout the application lifecycle.
Architecture Overview
The message bus follows a publish-subscribe pattern where:
- Publishers: Components that emit events (server, agents, plugins)
- Subscribers: Components that listen for events (agents, services, plugins)
- Events: Structured messages with type and payload information
Core Architecture Principles
elizaOS uses a centralized message bus architecture with the following key characteristics:
- Central Bus: All messages flow through a central bus (channel ID:
00000000-0000-0000-0000-000000000000
) - Organizational DM Channels: Direct message channels for conversation persistence
- Single Socket.IO Connection: Clients maintain one connection with channel-based message filtering
- Centralized Database: UUID-based identifiers for all entities
Hierarchical Entity Model
The system follows a hierarchical structure:
World (Server/Platform)
├── Rooms (Channels/Conversations)
│ ├── Participants
│ └── Messages
└── Entities (Users/Agents)
└── Memories (Agent-specific message records)
Internal Message Bus
The core message bus is implemented as a simple in-memory EventEmitter:
// packages/server/src/bus.ts
import EventEmitter from "events";
class InternalMessageBus extends EventEmitter {}
const internalMessageBus = new InternalMessageBus();
// Increased listener limit for multiple agents
internalMessageBus.setMaxListeners(50);
export default internalMessageBus;
Message Bus Service
The MessageBusService
connects individual agents to the central message bus:
import { MessageBusService } from "@elizaos/server";
// Service is automatically registered with agents
export const messageBusConnectorPlugin = {
name: "internal-message-bus-connector",
description: "Internal service to connect agent to the message bus",
services: [MessageBusService],
};
Event Types
The message bus handles several core event types:
Message Events
// New message from server to agents
internalMessageBus.emit("new_message", {
id: "msg-123",
channel_id: "channel-456",
server_id: "server-789",
author_id: "user-abc",
content: "Hello, world!",
created_at: Date.now(),
metadata: { source: "web_client" },
});
Server Events
// Agent server association updates
internalMessageBus.emit("server_agent_update", {
type: "agent_added_to_server",
agentId: "agent-123",
serverId: "server-456",
});
internalMessageBus.emit("server_agent_update", {
type: "agent_removed_from_server",
agentId: "agent-123",
serverId: "server-456",
});
Message Management Events
// Message deletion
internalMessageBus.emit("message_deleted", {
messageId: "msg-123",
channelId: "channel-456",
});
// Channel clearing
internalMessageBus.emit("channel_cleared", {
channelId: "channel-456",
messageCount: 25,
});
Message Flow
Complete Message Flow
The message flow follows this sequence:
- Client sends message via REST API →
/api/messaging/submit
- API validates and stores message → Database persistence
- Message Bus emits event →
new_message
event - MessageBusService processes for each agent → Agent-specific handling
- Agent validates channel participation → Access control
- Agent processes message, generates response → AI processing
- Response sent back to central bus → Via REST API
- Socket.IO broadcasts to connected clients → Real-time updates
1. Message Submission
When a message is created through the server:
// Server creates message and publishes to bus
const createdMessage = await server.createMessage({
channelId: "channel-123",
authorId: "user-456",
content: "Hello, agent!",
sourceType: "web_client",
});
// Transform to bus format
const messageForBus = {
id: createdMessage.id,
channel_id: createdMessage.channelId,
server_id: channel.messageServerId,
author_id: createdMessage.authorId,
content: createdMessage.content,
created_at: createdMessage.createdAt.getTime(),
metadata: createdMessage.metadata,
};
internalMessageBus.emit("new_message", messageForBus);
2. Agent Processing
Agents receive messages through the MessageBusService:
export class MessageBusService extends Service {
private async handleIncomingMessage(message: MessageServiceMessage) {
// Validate server subscription
if (!this.subscribedServers.has(message.server_id)) {
return;
}
// Check channel participation
const participants = await this.getChannelParticipants(message.channel_id);
if (!participants.includes(this.runtime.agentId)) {
return;
}
// Process message and emit to agent runtime
await this.runtime.emitEvent(EventType.MESSAGE_RECEIVED, {
runtime: this.runtime,
message: agentMemory,
callback: this.handleAgentResponse.bind(this),
});
}
}
3. Agent Response
When agents respond, they send back through the bus:
private async sendAgentResponseToBus(content: Content) {
const payload = {
channel_id: channelId,
server_id: serverId,
author_id: this.runtime.agentId,
content: content.text,
source_type: 'agent_response',
metadata: {
agent_id: this.runtime.agentId,
agentName: this.runtime.character.name
}
};
// Send to central server API
await fetch('/api/messaging/submit', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
}
Service Registration
The MessageBusService is automatically registered with agents:
// Auto-registration in AgentServer
export class AgentServer {
async registerAgent(runtime: IAgentRuntime) {
// Register the MessageBusConnector plugin
if (messageBusConnectorPlugin) {
await runtime.registerPlugin(messageBusConnectorPlugin);
}
// Service starts automatically
// - Connects to message bus
// - Fetches subscribed servers
// - Validates channel access
}
}
Channel Management
Server Subscription
Agents maintain subscriptions to specific servers:
private async fetchAgentServers() {
const response = await fetch(
`/api/messaging/agents/${this.runtime.agentId}/servers`
);
const data = await response.json();
this.subscribedServers = new Set(data.servers);
}
Channel Validation
Before processing messages, agents validate channel access:
private async getChannelParticipants(channelId: UUID): Promise<string[]> {
const response = await fetch(
`/api/messaging/central-channels/${channelId}/participants`
);
const data = await response.json();
return data.success ? data.data : [];
}
Error Handling
Connection Resilience
export class MessageBusService extends Service {
async stop(): Promise<void> {
// Clean up event listeners
internalMessageBus.off("new_message", this.boundHandleIncomingMessage);
internalMessageBus.off("server_agent_update", this.boundHandleServerAgentUpdate);
internalMessageBus.off("message_deleted", this.boundHandleMessageDeleted);
internalMessageBus.off("channel_cleared", this.boundHandleChannelCleared);
}
}
Message Validation
private async validateMessage(message: MessageServiceMessage): Promise<boolean> {
// Server subscription check
if (!this.subscribedServers.has(message.server_id)) {
return false;
}
// Self-message check
if (message.author_id === this.runtime.agentId) {
return false;
}
// Channel participation check
const participants = await this.getChannelParticipants(message.channel_id);
return participants.includes(this.runtime.agentId);
}
Critical Implementation Details
Channel Participation
Agents must be explicitly added to channels to receive messages:
// Agents validate channel participation before processing
const participants = await this.getChannelParticipants(message.channel_id);
if (!participants.includes(this.runtime.agentId)) {
return; // Agent not in channel, skip message
}
Central Channel Importance
The central channel (00000000-0000-0000-0000-000000000000
) is crucial for:
- Agent-to-agent communication
- System-wide broadcasts
- Coordination between multiple agents
Message Transformation
Messages undergo transformation between different formats:
- Client Format: User-friendly structure
- Bus Format: Internal event structure
- Agent Format: Runtime-specific memory format
Multi-Process Considerations
The current implementation is designed for single-process deployments:
/**
* A simple in-memory message bus for distributing messages from the server
* to subscribed MessageBusService instances within the same process.
*
* For multi-process or multi-server deployments, this would need to be replaced
* with a more robust solution like Redis Pub/Sub, Kafka, RabbitMQ, etc.
*/
Scaling Options
For production deployments requiring multi-process support:
Redis Pub/Sub
import Redis from "ioredis";
class RedisMessageBus {
private pub: Redis;
private sub: Redis;
constructor() {
this.pub = new Redis(process.env.REDIS_URL);
this.sub = new Redis(process.env.REDIS_URL);
}
emit(event: string, data: any) {
this.pub.publish(`eliza:${event}`, JSON.stringify(data));
}
on(event: string, handler: Function) {
this.sub.subscribe(`eliza:${event}`);
this.sub.on("message", (channel, message) => {
if (channel === `eliza:${event}`) {
handler(JSON.parse(message));
}
});
}
}
Apache Kafka
import { Kafka } from "kafkajs";
class KafkaMessageBus {
private kafka: Kafka;
private producer: any;
private consumer: any;
constructor() {
this.kafka = new Kafka({
clientId: "eliza-message-bus",
brokers: [process.env.KAFKA_BROKER],
});
}
async emit(event: string, data: any) {
await this.producer.send({
topic: "eliza-events",
messages: [
{
key: event,
value: JSON.stringify(data),
},
],
});
}
async on(event: string, handler: Function) {
await this.consumer.subscribe({ topic: "eliza-events" });
await this.consumer.run({
eachMessage: async ({ message }) => {
if (message.key?.toString() === event) {
handler(JSON.parse(message.value?.toString() || "{}"));
}
},
});
}
}
Performance Monitoring
Event Metrics
class MessageBusMetrics {
private eventCounts = new Map<string, number>();
private eventLatencies = new Map<string, number[]>();
trackEvent(eventType: string, startTime: number) {
// Count events
this.eventCounts.set(eventType, (this.eventCounts.get(eventType) || 0) + 1);
// Track latency
const latency = Date.now() - startTime;
const latencies = this.eventLatencies.get(eventType) || [];
latencies.push(latency);
this.eventLatencies.set(eventType, latencies.slice(-100)); // Keep last 100
}
getMetrics() {
return {
eventCounts: Object.fromEntries(this.eventCounts),
averageLatencies: Object.fromEntries(
Array.from(this.eventLatencies.entries()).map(([event, latencies]) => [
event,
latencies.reduce((a, b) => a + b, 0) / latencies.length,
])
),
};
}
}
Best Practices
Event Naming
// Use descriptive, hierarchical event names
const EVENT_TYPES = {
MESSAGE: {
RECEIVED: "message:received",
SENT: "message:sent",
DELETED: "message:deleted",
},
AGENT: {
CONNECTED: "agent:connected",
DISCONNECTED: "agent:disconnected",
ERROR: "agent:error",
},
};
Memory Management
// Prevent memory leaks with listener limits
internalMessageBus.setMaxListeners(50);
// Clean up event listeners
class EventManager {
private handlers = new Map<string, Function>();
on(event: string, handler: Function) {
this.handlers.set(event, handler);
internalMessageBus.on(event, handler);
}
removeAll() {
this.handlers.forEach((handler, event) => {
internalMessageBus.off(event, handler);
});
this.handlers.clear();
}
}
Error Boundaries
private async handleIncomingMessage(message: MessageServiceMessage) {
try {
await this.processMessage(message);
} catch (error) {
logger.error('Message processing failed:', error);
// Emit error event for monitoring
internalMessageBus.emit('message:error', {
messageId: message.id,
error: error.message,
agentId: this.runtime.agentId
});
}
}
Deployment Architectures
The messaging system supports multiple client deployment patterns:
Web Clients
- Connect through CORS proxy
- Use Socket.IO for real-time updates
- Handle authentication via API keys
Native Applications
- Direct connection to server
- Full Socket.IO capabilities
- Can implement custom protocols
Server-Side Clients
- Direct API access
- Can bypass Socket.IO if needed
- Suitable for batch processing
The message bus provides a flexible, event-driven foundation for communication in elizaOS, enabling scalable and maintainable agent interactions across different deployment scenarios.