elizaOS

Message Bus Architecture

Event-driven communication system for inter-component messaging in elizaOS

The elizaOS message bus is a centralized, event-driven communication system that keeps components loosely coupled. It distributes messages from the server to agents and carries system events throughout the application lifecycle.

Architecture Overview

The message bus follows a publish-subscribe pattern where:

  • Publishers: Components that emit events (server, agents, plugins)
  • Subscribers: Components that listen for events (agents, services, plugins)
  • Events: Structured messages with type and payload information

Core Architecture Principles

elizaOS uses a centralized message bus architecture with the following key characteristics:

  • Central Bus: All messages flow through a central bus (channel ID: 00000000-0000-0000-0000-000000000000)
  • Organizational DM Channels: Direct message channels for conversation persistence
  • Single Socket.IO Connection: Clients maintain one connection with channel-based message filtering
  • Centralized Database: UUID-based identifiers for all entities

Hierarchical Entity Model

The system follows a hierarchical structure:

World (Server/Platform)
├── Rooms (Channels/Conversations)
│   ├── Participants
│   └── Messages
└── Entities (Users/Agents)
    └── Memories (Agent-specific message records)
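The hierarchy above can be read as a set of nested record types. The sketch below is one possible way to express it in TypeScript; every interface and field name here is an illustrative assumption, not the actual elizaOS schema.

```typescript
type UUID = string;

// All field names below are illustrative assumptions, not the real schema.
interface Entity {
  id: UUID;
  kind: "user" | "agent";
  name: string;
}

interface Message {
  id: UUID;
  roomId: UUID;
  authorId: UUID;
  content: string;
}

// An agent-specific record of a message the agent has seen.
interface Memory {
  id: UUID;
  agentId: UUID;
  messageId: UUID;
}

interface Room {
  id: UUID;
  participantIds: UUID[];
  messages: Message[];
}

interface World {
  id: UUID;
  rooms: Room[];
  entities: Entity[];
}

// Returns the rooms in a world that a given entity participates in.
function roomsForEntity(world: World, entityId: UUID): Room[] {
  return world.rooms.filter((room) => room.participantIds.includes(entityId));
}
```

A helper such as the hypothetical roomsForEntity shows why participants are modeled per room: membership is a property of the room, not of the entity.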

Internal Message Bus

The core message bus is implemented as a simple in-memory EventEmitter:

// packages/server/src/bus.ts
import EventEmitter from "events";

class InternalMessageBus extends EventEmitter {}

const internalMessageBus = new InternalMessageBus();

// Increased listener limit for multiple agents
internalMessageBus.setMaxListeners(50);

export default internalMessageBus;
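As a rough illustration of the publish-subscribe flow on such a bus, the sketch below creates a local EventEmitter, registers one subscriber, publishes one event, and then unsubscribes. The bus instance and the DemoMessage type are stand-ins for illustration, not the actual exports of packages/server.

```typescript
import { EventEmitter } from "events";

// Hypothetical payload shape, modeled on fields used elsewhere on this page.
interface DemoMessage {
  id: string;
  channel_id: string;
  content: string;
}

// Stand-in for the internal bus defined in packages/server/src/bus.ts.
const bus = new EventEmitter();
bus.setMaxListeners(50);

const received: DemoMessage[] = [];

// Subscriber: keep a reference to the handler so it can be removed later.
const onNewMessage = (message: DemoMessage): void => {
  received.push(message);
};
bus.on("new_message", onNewMessage);

// Publisher: emit() calls every registered listener synchronously,
// in the order the listeners were registered.
bus.emit("new_message", { id: "msg-1", channel_id: "channel-1", content: "Hello" });

// Unsubscribe using the same function reference that was registered.
bus.off("new_message", onNewMessage);

// No listener remains, so this event is not recorded.
bus.emit("new_message", { id: "msg-2", channel_id: "channel-1", content: "Ignored" });
```

Because off() matches listeners by reference, a handler registered as an inline arrow function or a fresh bind() result cannot be removed later unless that exact function object is stored.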

Message Bus Service

The MessageBusService connects individual agents to the central message bus:

import { MessageBusService } from "@elizaos/server";

// Service is automatically registered with agents
export const messageBusConnectorPlugin = {
  name: "internal-message-bus-connector",
  description: "Internal service to connect agent to the message bus",
  services: [MessageBusService],
};

Event Types

The message bus handles several core event types:

Message Events

// New message from server to agents
internalMessageBus.emit("new_message", {
  id: "msg-123",
  channel_id: "channel-456",
  server_id: "server-789",
  author_id: "user-abc",
  content: "Hello, world!",
  created_at: Date.now(),
  metadata: { source: "web_client" },
});

Server Events

// Agent server association updates
internalMessageBus.emit("server_agent_update", {
  type: "agent_added_to_server",
  agentId: "agent-123",
  serverId: "server-456",
});

internalMessageBus.emit("server_agent_update", {
  type: "agent_removed_from_server",
  agentId: "agent-123",
  serverId: "server-456",
});

Message Management Events

// Message deletion
internalMessageBus.emit("message_deleted", {
  messageId: "msg-123",
  channelId: "channel-456",
});

// Channel clearing
internalMessageBus.emit("channel_cleared", {
  channelId: "channel-456",
  messageCount: 25,
});
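The payloads above can be captured in a single event map so that the compiler checks each emit and on call. This is a minimal sketch under the assumption that the payload shapes are exactly those shown in the examples; TypedBus and BusEvents are hypothetical names, not part of elizaOS.

```typescript
import { EventEmitter } from "events";

// Payload shapes copied from the examples on this page.
interface BusEvents {
  new_message: {
    id: string;
    channel_id: string;
    server_id: string;
    author_id: string;
    content: string;
    created_at: number;
    metadata?: Record<string, unknown>;
  };
  server_agent_update: {
    type: "agent_added_to_server" | "agent_removed_from_server";
    agentId: string;
    serverId: string;
  };
  message_deleted: { messageId: string; channelId: string };
  channel_cleared: { channelId: string; messageCount: number };
}

// Hypothetical thin wrapper that ties each event name to its payload type.
class TypedBus {
  private readonly emitter = new EventEmitter();

  on<K extends keyof BusEvents>(event: K, handler: (payload: BusEvents[K]) => void): void {
    this.emitter.on(event, handler);
  }

  off<K extends keyof BusEvents>(event: K, handler: (payload: BusEvents[K]) => void): void {
    this.emitter.off(event, handler);
  }

  // Returns true if at least one listener received the event.
  emit<K extends keyof BusEvents>(event: K, payload: BusEvents[K]): boolean {
    return this.emitter.emit(event, payload);
  }
}
```

With this wrapper, emitting "channel_cleared" without a messageCount, or subscribing to a misspelled event name, fails at compile time instead of silently doing nothing at runtime.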

Message Flow

Complete Message Flow

The message flow follows this sequence:

  1. Client sends message via REST API → /api/messaging/submit
  2. API validates and stores message → Database persistence
  3. Message Bus emits event → new_message event
  4. MessageBusService processes for each agent → Agent-specific handling
  5. Agent validates channel participation → Access control
  6. Agent processes message, generates response → AI processing
  7. Response sent back to central bus → Via REST API
  8. Socket.IO broadcasts to connected clients → Real-time updates

1. Message Submission

When a message is created through the server:

// Server creates message and publishes to bus
const createdMessage = await server.createMessage({
  channelId: "channel-123",
  authorId: "user-456",
  content: "Hello, agent!",
  sourceType: "web_client",
});

// Transform to bus format
const messageForBus = {
  id: createdMessage.id,
  channel_id: createdMessage.channelId,
  server_id: channel.messageServerId,
  author_id: createdMessage.authorId,
  content: createdMessage.content,
  created_at: createdMessage.createdAt.getTime(),
  metadata: createdMessage.metadata,
};

internalMessageBus.emit("new_message", messageForBus);

2. Agent Processing

Agents receive messages through the MessageBusService:

export class MessageBusService extends Service {
  private async handleIncomingMessage(message: MessageServiceMessage) {
    // Validate server subscription
    if (!this.subscribedServers.has(message.server_id)) {
      return;
    }

    // Check channel participation
    const participants = await this.getChannelParticipants(message.channel_id);
    if (!participants.includes(this.runtime.agentId)) {
      return;
    }

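    // Process message and emit to agent runtime.
    // agentMemory is the bus message converted to the agent's memory format
    // (the conversion is omitted in this excerpt).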
    await this.runtime.emitEvent(EventType.MESSAGE_RECEIVED, {
      runtime: this.runtime,
      message: agentMemory,
      callback: this.handleAgentResponse.bind(this),
    });
  }
}

3. Agent Response

When agents respond, they send the response back to the central server through the REST API:

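private async sendAgentResponseToBus(content: Content) {
  // channelId and serverId refer to the channel and server of the message being
  // answered; how they are resolved is omitted in this excerpt.
  const payload = {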
    channel_id: channelId,
    server_id: serverId,
    author_id: this.runtime.agentId,
    content: content.text,
    source_type: 'agent_response',
    metadata: {
      agent_id: this.runtime.agentId,
      agentName: this.runtime.character.name
    }
  };

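  // Send to central server API. The path is shown relative for brevity;
  // server-side fetch needs an absolute URL that includes the server's base address.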
  await fetch('/api/messaging/submit', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload)
  });
}

Service Registration

The MessageBusService is automatically registered with agents:

// Auto-registration in AgentServer
export class AgentServer {
  async registerAgent(runtime: IAgentRuntime) {
    // Register the MessageBusConnector plugin
    if (messageBusConnectorPlugin) {
      await runtime.registerPlugin(messageBusConnectorPlugin);
    }

    // Service starts automatically
    // - Connects to message bus
    // - Fetches subscribed servers
    // - Validates channel access
  }
}

Channel Management

Server Subscription

Agents maintain subscriptions to specific servers:

private async fetchAgentServers() {
  const response = await fetch(
    `/api/messaging/agents/${this.runtime.agentId}/servers`
  );

  const data = await response.json();
  this.subscribedServers = new Set(data.servers);
}
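The subscription set fetched above presumably has to stay current when server_agent_update events arrive. The sketch below shows one way that update could be applied; applyServerAgentUpdate is a hypothetical helper and the event shape is taken from the Server Events examples, so treat it as an assumption rather than the actual service code.

```typescript
// Event shape taken from the server_agent_update examples on this page.
type ServerAgentUpdate =
  | { type: "agent_added_to_server"; agentId: string; serverId: string }
  | { type: "agent_removed_from_server"; agentId: string; serverId: string };

// Hypothetical helper: mutates the agent's subscription set in place.
function applyServerAgentUpdate(
  subscribedServers: Set<string>,
  ownAgentId: string,
  update: ServerAgentUpdate
): void {
  // Every agent's service sees every update on the bus, so ignore other agents.
  if (update.agentId !== ownAgentId) {
    return;
  }

  switch (update.type) {
    case "agent_added_to_server":
      subscribedServers.add(update.serverId);
      break;
    case "agent_removed_from_server":
      subscribedServers.delete(update.serverId);
      break;
  }
}
```

Keeping the set in memory means the server subscription check on each incoming message is a constant-time lookup rather than an API call.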

Channel Validation

Before processing messages, agents validate channel access:

private async getChannelParticipants(channelId: UUID): Promise<string[]> {
  const response = await fetch(
    `/api/messaging/central-channels/${channelId}/participants`
  );

  const data = await response.json();
  return data.success ? data.data : [];
}

Error Handling

Connection Resilience

export class MessageBusService extends Service {
  async stop(): Promise<void> {
    // Clean up event listeners
    internalMessageBus.off("new_message", this.boundHandleIncomingMessage);
    internalMessageBus.off("server_agent_update", this.boundHandleServerAgentUpdate);
    internalMessageBus.off("message_deleted", this.boundHandleMessageDeleted);
    internalMessageBus.off("channel_cleared", this.boundHandleChannelCleared);
  }
}

Message Validation

private async validateMessage(message: MessageServiceMessage): Promise<boolean> {
  // Server subscription check
  if (!this.subscribedServers.has(message.server_id)) {
    return false;
  }

  // Self-message check
  if (message.author_id === this.runtime.agentId) {
    return false;
  }

  // Channel participation check
  const participants = await this.getChannelParticipants(message.channel_id);
  return participants.includes(this.runtime.agentId);
}

Critical Implementation Details

Channel Participation

Agents must be explicitly added to channels to receive messages:

// Agents validate channel participation before processing
const participants = await this.getChannelParticipants(message.channel_id);
if (!participants.includes(this.runtime.agentId)) {
  return; // Agent not in channel, skip message
}

Central Channel Importance

The central channel (00000000-0000-0000-0000-000000000000) is crucial for:

  • Agent-to-agent communication
  • System-wide broadcasts
  • Coordination between multiple agents
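Since the central channel is identified by a fixed all-zero UUID, code that treats it specially can compare against a named constant instead of repeating the literal. The constant and helper names below are hypothetical.

```typescript
// The all-zero UUID used for the central channel, as stated on this page.
const CENTRAL_CHANNEL_ID = "00000000-0000-0000-0000-000000000000";

// Hypothetical helper: UUID strings may differ in letter case, so normalize
// before comparing (the all-zero UUID has no letters, but inputs might be padded
// or cased differently by callers).
function isCentralChannel(channelId: string): boolean {
  return channelId.trim().toLowerCase() === CENTRAL_CHANNEL_ID;
}
```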

Message Transformation

Messages undergo transformation between different formats:

  • Client Format: User-friendly structure
  • Bus Format: Internal event structure
  • Agent Format: Runtime-specific memory format
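The three formats can be pictured as two small mapping functions. In the sketch below only the bus format follows the fields shown in the Message Submission example; the ClientMessage and AgentMemory shapes, and both function names, are simplified assumptions made for illustration.

```typescript
// Hypothetical client-side shape.
interface ClientMessage {
  channelId: string;
  authorId: string;
  text: string;
  sentAt: Date;
}

// Bus format: field names follow the messageForBus example on this page.
interface BusMessage {
  id: string;
  channel_id: string;
  server_id: string;
  author_id: string;
  content: string;
  created_at: number;
  metadata?: Record<string, unknown>;
}

// Hypothetical, simplified agent-side memory shape.
interface AgentMemory {
  entityId: string;
  roomId: string;
  content: { text: string; source?: string };
  createdAt: number;
}

// Client → bus: camelCase fields become snake_case and the Date becomes epoch ms.
function toBusMessage(id: string, serverId: string, msg: ClientMessage): BusMessage {
  return {
    id,
    channel_id: msg.channelId,
    server_id: serverId,
    author_id: msg.authorId,
    content: msg.text,
    created_at: msg.sentAt.getTime(),
    metadata: { source: "web_client" },
  };
}

// Bus → agent: the channel maps to a room and the author to an entity.
function toAgentMemory(msg: BusMessage): AgentMemory {
  const source = msg.metadata?.source;
  return {
    entityId: msg.author_id,
    roomId: msg.channel_id,
    content: { text: msg.content, source: typeof source === "string" ? source : undefined },
    createdAt: msg.created_at,
  };
}
```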

Multi-Process Considerations

The current implementation is designed for single-process deployments:

/**
 * A simple in-memory message bus for distributing messages from the server
 * to subscribed MessageBusService instances within the same process.
 *
 * For multi-process or multi-server deployments, this would need to be replaced
 * with a more robust solution like Redis Pub/Sub, Kafka, RabbitMQ, etc.
 */
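One way to prepare for such a replacement is to have callers depend on a small bus interface rather than on EventEmitter directly. The sketch below assumes a promise-based interface so that a network-backed implementation could satisfy it later; MessageBus, BusHandler, and InMemoryMessageBus are hypothetical names, not part of elizaOS.

```typescript
import { EventEmitter } from "events";

type BusHandler = (data: unknown) => void;

// Hypothetical abstraction: promise-based so that brokers with network
// round trips (Redis, Kafka, ...) can implement the same contract.
interface MessageBus {
  emit(event: string, data: unknown): Promise<void>;
  on(event: string, handler: BusHandler): Promise<void>;
  off(event: string, handler: BusHandler): Promise<void>;
}

// Single-process implementation backed by an in-memory EventEmitter.
class InMemoryMessageBus implements MessageBus {
  private readonly emitter = new EventEmitter();

  async emit(event: string, data: unknown): Promise<void> {
    this.emitter.emit(event, data);
  }

  async on(event: string, handler: BusHandler): Promise<void> {
    this.emitter.on(event, handler);
  }

  async off(event: string, handler: BusHandler): Promise<void> {
    this.emitter.off(event, handler);
  }
}
```

Code written against MessageBus would not need to change if the in-memory class were swapped for a broker-backed one, although delivery would then become asynchronous and payloads would have to survive JSON serialization.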

Scaling Options

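For production deployments that require multi-process support, the in-memory bus can be replaced with an external message broker. The following classes are illustrative sketches of that approach: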

Redis Pub/Sub

import Redis from "ioredis";

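class RedisMessageBus {
  private pub: Redis;
  private sub: Redis;

  constructor() {
    // A Redis connection in subscriber mode cannot issue other commands,
    // so publishing and subscribing use separate connections.
    // The localhost fallback is an assumption for local development.
    const url = process.env.REDIS_URL ?? "redis://localhost:6379";
    this.pub = new Redis(url);
    this.sub = new Redis(url);
  }

  async emit(event: string, data: unknown): Promise<void> {
    await this.pub.publish(`eliza:${event}`, JSON.stringify(data));
  }

  async on(event: string, handler: (data: unknown) => void): Promise<void> {
    const channel = `eliza:${event}`;
    await this.sub.subscribe(channel);
    // The "message" listener fires for every subscribed channel, so filter by name
    this.sub.on("message", (receivedChannel: string, message: string) => {
      if (receivedChannel === channel) {
        handler(JSON.parse(message));
      }
    });
  }
}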

Apache Kafka

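import { Kafka, Producer, Consumer } from "kafkajs";

class KafkaMessageBus {
  private kafka: Kafka;
  private producer: Producer;
  private consumer: Consumer;

  constructor() {
    this.kafka = new Kafka({
      clientId: "eliza-message-bus",
      // The fallback broker address is an assumption for local development
      brokers: [process.env.KAFKA_BROKER ?? "localhost:9092"],
    });
    this.producer = this.kafka.producer();
    // Consumers require a group ID; this name is a placeholder
    this.consumer = this.kafka.consumer({ groupId: "eliza-message-bus" });
  }

  // Call once before emit() or on()
  async connect() {
    await this.producer.connect();
    await this.consumer.connect();
  }

  async emit(event: string, data: unknown) {
    await this.producer.send({
      topic: "eliza-events",
      messages: [
        {
          key: event,
          value: JSON.stringify(data),
        },
      ],
    });
  }

  // kafkajs expects subscribe() before run(), so this sketch supports
  // a single on() call per instance
  async on(event: string, handler: (data: unknown) => void) {
    await this.consumer.subscribe({ topic: "eliza-events" });
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        if (message.key?.toString() === event) {
          handler(JSON.parse(message.value?.toString() || "{}"));
        }
      },
    });
  }
}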

Performance Monitoring

Event Metrics

class MessageBusMetrics {
  private eventCounts = new Map<string, number>();
  private eventLatencies = new Map<string, number[]>();

  trackEvent(eventType: string, startTime: number) {
    // Count events
    this.eventCounts.set(eventType, (this.eventCounts.get(eventType) || 0) + 1);

    // Track latency
    const latency = Date.now() - startTime;
    const latencies = this.eventLatencies.get(eventType) || [];
    latencies.push(latency);
    this.eventLatencies.set(eventType, latencies.slice(-100)); // Keep last 100
  }

  getMetrics() {
    return {
      eventCounts: Object.fromEntries(this.eventCounts),
      averageLatencies: Object.fromEntries(
        Array.from(this.eventLatencies.entries()).map(([event, latencies]) => [
          event,
          latencies.reduce((a, b) => a + b, 0) / latencies.length,
        ])
      ),
    };
  }
}

Best Practices

Event Naming

// Use descriptive, hierarchical event names
const EVENT_TYPES = {
  MESSAGE: {
    RECEIVED: "message:received",
    SENT: "message:sent",
    DELETED: "message:deleted",
  },
  AGENT: {
    CONNECTED: "agent:connected",
    DISCONNECTED: "agent:disconnected",
    ERROR: "agent:error",
  },
};
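A nested constant like this can also drive a compile-time union of valid event names and a runtime check for untrusted input. The sketch below repeats the constant with `as const` so that the literal types are preserved; EventName, ALL_EVENT_NAMES, and isEventName are hypothetical helpers.

```typescript
// Same structure as above; `as const` keeps the string literal types.
const EVENT_TYPES = {
  MESSAGE: {
    RECEIVED: "message:received",
    SENT: "message:sent",
    DELETED: "message:deleted",
  },
  AGENT: {
    CONNECTED: "agent:connected",
    DISCONNECTED: "agent:disconnected",
    ERROR: "agent:error",
  },
} as const;

type Values<T> = T[keyof T];

// Union of all six literal names, e.g. "message:received" | "agent:error" | ...
type EventName = Values<{
  [Group in keyof typeof EVENT_TYPES]: Values<(typeof EVENT_TYPES)[Group]>;
}>;

// Flatten the nested constant into a set for runtime lookups.
const ALL_EVENT_NAMES = new Set<string>();
for (const group of Object.values(EVENT_TYPES)) {
  for (const name of Object.values(group)) {
    ALL_EVENT_NAMES.add(name);
  }
}

// Type guard: narrows an arbitrary string to a known event name.
function isEventName(value: string): value is EventName {
  return ALL_EVENT_NAMES.has(value);
}
```

Note that these hierarchical names are a naming recommendation; the core events shown earlier on this page use names such as new_message and channel_cleared.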

Memory Management

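// Raise the listener limit: by default Node.js warns about a possible leak
// above 10 listeners per event, which multiple agents can exceed legitimately
internalMessageBus.setMaxListeners(50);

// Clean up event listeners
type Handler = (...args: any[]) => void;

class EventManager {
  // Track every handler per event so none is lost when an event has several
  private handlers = new Map<string, Set<Handler>>();

  on(event: string, handler: Handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, new Set());
    }
    this.handlers.get(event)!.add(handler);
    internalMessageBus.on(event, handler);
  }

  removeAll() {
    this.handlers.forEach((handlers, event) => {
      handlers.forEach((handler) => internalMessageBus.off(event, handler));
    });
    this.handlers.clear();
  }
}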

Error Boundaries

private async handleIncomingMessage(message: MessageServiceMessage) {
  try {
    await this.processMessage(message);
  } catch (error) {
    logger.error('Message processing failed:', error);

    // Emit error event for monitoring
    internalMessageBus.emit('message:error', {
      messageId: message.id,
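      // Catch variables are typed unknown under strict TypeScript, so narrow first
      error: error instanceof Error ? error.message : String(error),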
      agentId: this.runtime.agentId
    });
  }
}
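The same pattern can be factored into a reusable wrapper so that every listener gets a boundary. This matters because, by default, EventEmitter does not catch a rejected promise returned by an async listener. The sketch below is an assumption-level illustration; withErrorBoundary and toErrorMessage are hypothetical helpers, and the "message:error" event name follows the example above.

```typescript
import { EventEmitter } from "events";

// Normalizes any thrown value (Error, string, object, ...) to a message string.
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Wraps a sync or async handler so that failures are reported on the bus
// as "message:error" events instead of escaping as unhandled rejections.
function withErrorBoundary<T>(
  errorBus: EventEmitter,
  handler: (payload: T) => Promise<void> | void
): (payload: T) => Promise<void> {
  return async (payload: T): Promise<void> => {
    try {
      await handler(payload);
    } catch (error) {
      errorBus.emit("message:error", { error: toErrorMessage(error) });
    }
  };
}
```

A listener would then be registered as, for example, bus.on("new_message", withErrorBoundary(bus, handleIncomingMessage)), keeping the try/catch out of each individual handler.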

Deployment Architectures

The messaging system supports multiple client deployment patterns:

Web Clients

  • Connect through CORS proxy
  • Use Socket.IO for real-time updates
  • Handle authentication via API keys

Native Applications

  • Direct connection to server
  • Full Socket.IO capabilities
  • Can implement custom protocols

Server-Side Clients

  • Direct API access
  • Can bypass Socket.IO if needed
  • Suitable for batch processing

The message bus provides a flexible, event-driven foundation for communication in elizaOS, enabling scalable and maintainable agent interactions across different deployment scenarios.