elizaOS

WebSocket Communication

Real-time communication with Socket.IO in elizaOS

elizaOS provides real-time communication over WebSocket connections, enabling instant message delivery, channel management, and live system monitoring through log streaming. The implementation is built on Socket.IO and connects to the central message bus through the SocketIORouter class.

Architecture

SocketIORouter Class

The SocketIORouter class manages WebSocket connections and routes messages between clients and the central message bus:

export class SocketIORouter {
  private agents: Map<UUID, IAgentRuntime>;
  private connections: Map<string, UUID>; // socket.id -> agentId
  private logStreamConnections: Map<string, { agentName?: string; level?: string }>;
  private serverInstance: AgentServer;
}

WebSocket Server Setup

The WebSocket server is automatically initialized when the AgentServer starts:

import { AgentServer } from "@elizaos/server";

const server = new AgentServer();
await server.initialize();

// WebSocket server is automatically created and configured
// Available at ws://localhost:3000 (same port as HTTP server)
await server.start(3000);

The server configuration includes:

const io = new SocketIOServer(server, {
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
  },
});

Connection Events

Establishing Connection

// Client-side connection
import { io } from "socket.io-client";

const socket = io("http://localhost:3000");

socket.on("connect", () => {
  console.log("Connected to elizaOS WebSocket server");
  console.log("Socket ID:", socket.id);
});

socket.on("connection_established", (data) => {
  console.log("Server confirmed connection:", data);
  // Output: { message: 'Connected to Eliza Socket.IO server', socketId: 'abc123' }
});

Disconnection Handling

socket.on("disconnect", (reason) => {
  console.log("Disconnected:", reason);

  // Handle different disconnect reasons
  if (reason === "io server disconnect") {
    // Server initiated disconnect, reconnect manually
    socket.connect();
  }
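  // Transport-level drops (e.g. "ping timeout", "transport close", "transport error")
  // reconnect automatically; "io client disconnect" (socket.disconnect()) does not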
});
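
Which disconnect reasons need a manual reconnect is easy to get wrong, so it can help to isolate the decision in one small function. The sketch below is illustrative only: `reconnectAction` and `ReconnectAction` are hypothetical names, while the reason strings are the ones the Socket.IO client passes to the disconnect handler.

```typescript
// Hypothetical helper (not part of elizaOS or Socket.IO): maps a Socket.IO
// disconnect reason to what the client has to do about it.
type ReconnectAction = "manual" | "automatic" | "none";

function reconnectAction(reason: string): ReconnectAction {
  switch (reason) {
    case "io server disconnect":
      // The server closed the socket; the client must call socket.connect()
      return "manual";
    case "io client disconnect":
      // The client called socket.disconnect() itself; nothing to do
      return "none";
    default:
      // e.g. "ping timeout", "transport close", "transport error":
      // the client retries on its own
      return "automatic";
  }
}
```

Inside the disconnect handler, a result of "manual" would be the cue to call socket.connect().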

Message Types

The WebSocket server supports the message types defined in SOCKET_MESSAGE_TYPE:

enum SOCKET_MESSAGE_TYPE {
  ROOM_JOINING = "room_joining",
  SEND_MESSAGE = "send_message",
}

Event Handling

The SocketIORouter handles these core events:

  • connection - New client connection
  • room_joining / ROOM_JOINING - Channel joining requests
  • send_message / SEND_MESSAGE - Message submission
  • message - Generic message handling
  • subscribe_logs - Log streaming subscription
  • unsubscribe_logs - Log streaming unsubscription
  • update_log_filters - Log filter updates
  • disconnect - Client disconnection
  • error - Socket errors

Channel Management

Joining Channels

// Join a channel/room
socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, {
  channelId: "550e8400-e29b-41d4-a716-446655440000",
  agentId: "550e8400-e29b-41d4-a716-446655440001", // Optional
  entityId: "550e8400-e29b-41d4-a716-446655440002",
  serverId: "00000000-0000-0000-0000-000000000000",
  metadata: {
    isDm: false,
    channelType: "GROUP",
  },
});

// Alternative: Use roomId for backward compatibility
socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, {
  roomId: "550e8400-e29b-41d4-a716-446655440000", // Same as channelId
  entityId: "550e8400-e29b-41d4-a716-446655440002",
  serverId: "00000000-0000-0000-0000-000000000000",
});

// Listen for join confirmation
socket.on("channel_joined", (data) => {
  console.log("Successfully joined channel:", data);
  // {
  //   message: 'Socket abc123 successfully joined channel 550e8400-e29b-41d4-a716-446655440000',
  //   channelId: '550e8400-e29b-41d4-a716-446655440000',
  //   roomId: '550e8400-e29b-41d4-a716-446655440000',
  //   agentId: '550e8400-e29b-41d4-a716-446655440001'
  // }
});

// Backward compatibility event
socket.on("room_joined", (data) => {
  console.log("Room joined (legacy):", data);
});
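
Because join requests may carry either channelId or the legacy roomId, a client or server helper has to resolve the two into one target. The following is a minimal sketch of that resolution, assuming the payload shape shown in the examples above; `JoinPayload`, `JoinResult`, and `resolveJoinTarget` are hypothetical names, and the error text mirrors the messageError example in this document.

```typescript
// Hypothetical payload shape, inferred from the join examples above.
interface JoinPayload {
  channelId?: string;
  roomId?: string; // legacy alias for channelId
  entityId?: string;
  serverId?: string;
}

type JoinResult =
  | { ok: true; channelId: string }
  | { ok: false; error: string };

// Prefer channelId, fall back to the legacy roomId, and fail if neither is set.
function resolveJoinTarget(payload: JoinPayload): JoinResult {
  const channelId = payload.channelId ?? payload.roomId;
  if (!channelId) {
    return { ok: false, error: "channelId is required for joining." };
  }
  return { ok: true, channelId };
}
```

Validating the payload before emitting avoids a round trip that would only end in a messageError event.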

ENTITY_JOINED Event

When joining a channel with entityId, the server emits an ENTITY_JOINED event for world/entity creation:

// Server-side event emission (automatic)
runtime.emitEvent(EventType.ENTITY_JOINED, {
  entityId: entityId,
  runtime,
  worldId: serverId,
  roomId: channelId,
  metadata: {
    type: isDm ? ChannelType.DM : ChannelType.GROUP,
    isDm,
    ...metadata,
  },
  source: "socketio",
});

Channel Types

enum ChannelType {
  DM = "DM",
  GROUP = "GROUP",
  VOICE_DM = "VOICE_DM",
  FEED = "FEED",
}

Real-time Messaging

Sending Messages

// Send a message to a channel
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
  channelId: "550e8400-e29b-41d4-a716-446655440000",
  serverId: "00000000-0000-0000-0000-000000000000",
  senderId: "550e8400-e29b-41d4-a716-446655440001",
  senderName: "John Doe",
  message: "Hello, how can you help me?",
  messageId: "client-msg-123", // Client-generated ID for tracking
  source: "web_client",
  metadata: {
    isDm: false,
    channelType: "GROUP",
    targetUserId: "550e8400-e29b-41d4-a716-446655440002", // For DM channels
  },
  attachments: [
    {
      url: "https://example.com/image.jpg",
      contentType: "image/jpeg",
      title: "Screenshot",
    },
  ],
});

// Alternative: Use roomId for backward compatibility
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
  roomId: "550e8400-e29b-41d4-a716-446655440000",
  serverId: "00000000-0000-0000-0000-000000000000",
  senderId: "550e8400-e29b-41d4-a716-446655440001",
  senderName: "John Doe",
  message: "Hello, how can you help me?",
});

Auto-Channel Creation

If a channel doesn't exist, the server automatically creates it:

// For DM channels
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
  channelId: "new-dm-channel-id",
  serverId: "00000000-0000-0000-0000-000000000000",
  senderId: "user-id-1",
  message: "Hello in new DM!",
  metadata: {
    isDm: true,
    channelType: "DM",
    targetUserId: "user-id-2", // Second participant
  },
});

The server will:

  1. Check if channel exists
  2. Create channel if it doesn't exist
  3. Set up participants (for DM channels)
  4. Store the message in the central database
  5. Emit to internal message bus for agent processing
  6. Broadcast to all channel participants
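
The six steps above can be sketched with in-memory stand-ins for the database, the message bus, and the socket rooms. This is not the elizaOS implementation: `InMemoryServer`, `IncomingMessage`, and `Channel` are hypothetical names, and the arrays only record what a real server would persist, publish, or broadcast.

```typescript
// Hypothetical message shape, reduced to the fields the steps need.
interface IncomingMessage {
  channelId: string;
  senderId: string;
  message: string;
  metadata?: { isDm?: boolean; targetUserId?: string };
}

interface Channel {
  id: string;
  participants: Set<string>;
  messages: string[];
}

class InMemoryServer {
  channels = new Map<string, Channel>();
  busEvents: IncomingMessage[] = [];
  broadcasts: { channelId: string; text: string }[] = [];

  handleSendMessage(msg: IncomingMessage): void {
    // Steps 1-2: look the channel up and create it on first use
    let channel = this.channels.get(msg.channelId);
    if (!channel) {
      channel = { id: msg.channelId, participants: new Set(), messages: [] };
      this.channels.set(channel.id, channel);
      // Step 3: for DM channels, register both participants
      const target = msg.metadata?.isDm ? msg.metadata.targetUserId : undefined;
      if (target) {
        channel.participants.add(msg.senderId);
        channel.participants.add(target);
      }
    }
    // Step 4: persist the message
    channel.messages.push(msg.message);
    // Step 5: hand it to the internal bus for agent processing
    this.busEvents.push(msg);
    // Step 6: broadcast to everyone in the channel
    this.broadcasts.push({ channelId: channel.id, text: msg.message });
  }
}
```

Note that channel creation and participant setup run only for the first message; later messages to the same channel skip straight to step 4.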

Message Acknowledgment

// Listen for message acknowledgment
socket.on("messageAck", (data) => {
  console.log("Message acknowledged:", data);
  // {
  //   clientMessageId: 'client-msg-123',
  //   messageId: '550e8400-e29b-41d4-a716-446655440002',
  //   status: 'received_by_server_and_processing',
  //   channelId: '550e8400-e29b-41d4-a716-446655440000'
  // }
});

Receiving Messages

// Listen for incoming messages
socket.on("messageBroadcast", (message) => {
  console.log("New message received:", message);
  // {
  //   id: '550e8400-e29b-41d4-a716-446655440002',
  //   senderId: '550e8400-e29b-41d4-a716-446655440001',
  //   senderName: 'Agent Alice',
  //   text: 'Hello! I can help you with various tasks.',
  //   channelId: '550e8400-e29b-41d4-a716-446655440000',
  //   roomId: '550e8400-e29b-41d4-a716-446655440000', // Backward compatibility
  //   serverId: '00000000-0000-0000-0000-000000000000',
  //   createdAt: 1640995200000,
  //   source: 'agent_response',
  //   attachments: [],
  //   thought: 'The user is asking for help',  // Agent's internal thought
  //   actions: ['respond'],  // Agent's actions
  //   clientMessageId: 'client-msg-123'  // Original client message ID (if applicable)
  // }
});

// Listen for message completion
socket.on("messageComplete", (data) => {
  console.log("Message processing complete:", data);
  // {
  //   channelId: '550e8400-e29b-41d4-a716-446655440000',
  //   serverId: '00000000-0000-0000-0000-000000000000'
  // }
});

Generic Message Handling

The generic message event wraps a type and a payload in a single envelope, which the server routes by type:

// Send generic message
socket.emit("message", {
  type: SOCKET_MESSAGE_TYPE.SEND_MESSAGE,
  payload: {
    channelId: "550e8400-e29b-41d4-a716-446655440000",
    message: "Hello from generic handler",
  },
});

// The server will route this to the appropriate handler
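
Routing by envelope type is essentially a lookup in a handler table. The sketch below shows that idea in isolation; `MessageDispatcher` and `Handler` are hypothetical names, and the handlers return strings only so the result is easy to inspect.

```typescript
// Hypothetical dispatcher: one handler per envelope type.
type Handler = (payload: unknown) => string;

class MessageDispatcher {
  private handlers = new Map<string, Handler>();

  register(type: string, handler: Handler): void {
    this.handlers.set(type, handler);
  }

  // Route the envelope to the handler registered for its type.
  dispatch(envelope: { type: string; payload: unknown }): string {
    const handler = this.handlers.get(envelope.type);
    if (!handler) {
      return `unknown message type: ${envelope.type}`;
    }
    return handler(envelope.payload);
  }
}
```

Unknown types are reported instead of thrown, so one malformed envelope cannot take down the connection handler.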

Error Handling

Message Errors

// Listen for message errors
socket.on("messageError", (error) => {
  console.error("Message error:", error);
  // { error: 'channelId is required for joining.' }
});

Socket Errors

socket.on("error", (error) => {
  console.error("Socket error:", error);
});

socket.on("connect_error", (error) => {
  console.error("Connection error:", error);
});

Log Streaming

The WebSocket server provides real-time log streaming capabilities:

Subscribe to Logs

// Subscribe to log stream
socket.emit("subscribe_logs");

socket.on("log_subscription_confirmed", (data) => {
  console.log("Log subscription:", data);
  // { subscribed: true, message: 'Successfully subscribed to log stream' }
});

Receive Log Entries

// Listen for log entries
socket.on("log_stream", (logData) => {
  console.log("Log entry:", logData);
  // {
  //   type: 'log_entry',
  //   payload: {
  //     level: 30,
  //     time: 1640995200000,
  //     msg: 'Agent registered successfully',
  //     agentName: 'Alice'
  //   }
  // }
});

Filter Logs

// Update log filters
socket.emit("update_log_filters", {
  agentName: "Alice",
  level: "info",
});

socket.on("log_filters_updated", (response) => {
  console.log("Log filters updated:", response);
  // { success: true, filters: { agentName: 'Alice', level: 'info' } }
});
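
The log entries above carry a numeric level (30) while the filter uses a name ("info"). A filter check therefore needs a name-to-number table. The sketch below assumes pino-style numeric levels and treats the filter level as a minimum threshold; both are assumptions about server behavior, and `matchesFilters`, `LogEntry`, and `LogFilters` are hypothetical names.

```typescript
// Assumed pino-style level numbers; the actual server mapping may differ.
const LEVELS: Record<string, number> = {
  trace: 10,
  debug: 20,
  info: 30,
  warn: 40,
  error: 50,
  fatal: 60,
};

interface LogEntry {
  level: number;
  msg: string;
  agentName?: string;
}

interface LogFilters {
  agentName?: string;
  level?: string;
}

// True when the entry passes every filter that is set.
function matchesFilters(entry: LogEntry, filters: LogFilters): boolean {
  if (filters.agentName && entry.agentName !== filters.agentName) {
    return false;
  }
  if (filters.level) {
    const min = LEVELS[filters.level];
    // Unknown level names are ignored rather than rejecting every entry
    if (min !== undefined && entry.level < min) {
      return false;
    }
  }
  return true;
}
```

The same function can run on the client to filter a log_stream that was subscribed to without filters.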

Unsubscribe from Logs

// Unsubscribe from log stream
socket.emit("unsubscribe_logs");

socket.on("log_subscription_confirmed", (data) => {
  console.log("Log unsubscription:", data);
  // { subscribed: false, message: 'Successfully unsubscribed from log stream' }
});

Auto-Channel Creation

The WebSocket server automatically creates channels when messages are sent to non-existent channels:

// Send message to non-existent channel
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
  channelId: "new-channel-id",
  serverId: "00000000-0000-0000-0000-000000000000",
  senderId: "user-id",
  message: "Hello in new channel",
  metadata: {
    isDm: true,
    targetUserId: "another-user-id",
  },
});

// Server will:
// 1. Create the channel automatically
// 2. Set up participants for DM channels
// 3. Process the message
// 4. Broadcast to all participants

Event Integration

The WebSocket server integrates with the elizaOS event system:

// Events emitted by WebSocket actions:
// - ENTITY_JOINED when users join channels
// - MESSAGE_RECEIVED when messages are processed
// - Various agent lifecycle events

// These events are handled by the bootstrap plugin
// and other registered event handlers

Best Practices

Connection Management

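import { io } from "socket.io-client";

class WebSocketManager {
  constructor(serverUrl) {
    this.socket = io(serverUrl);
    // Join payloads keyed by channelId, replayed after every (re)connect
    this.joinedChannels = new Map();
    this.setupEventHandlers();
  }

  setupEventHandlers() {
    this.socket.on("connect", () => {
      console.log("Connected to elizaOS server");
      this.rejoinChannels();
    });

    this.socket.on("disconnect", (reason) => {
      console.log("Disconnected:", reason);
      if (reason === "io server disconnect") {
        // The server closed the socket; Socket.IO will not reconnect on its own
        this.socket.connect();
      }
    });
  }

  joinChannel(payload) {
    // Remember the payload so it can be re-sent after a reconnect
    this.joinedChannels.set(payload.channelId, payload);
    if (this.socket.connected) {
      this.socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, payload);
    }
    // If not connected yet, the "connect" handler replays the join
  }

  rejoinChannels() {
    // Rejoin all channels after reconnection
    this.joinedChannels.forEach((payload) => {
      this.joinChannel(payload);
    });
  }
}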

Message Queue

class MessageQueue {
  constructor(socket) {
    this.socket = socket;
    this.pendingMessages = new Map();
    this.messageTimeout = 30000; // 30 seconds

    this.socket.on("messageAck", (ack) => {
      this.handleAcknowledgment(ack);
    });
  }

  sendMessage(message) {
    const messageId = this.generateMessageId();
    message.messageId = messageId;

    // Store message for timeout handling
    this.pendingMessages.set(messageId, {
      message,
      timestamp: Date.now(),
      timeout: setTimeout(() => {
        this.handleTimeout(messageId);
      }, this.messageTimeout),
    });

    this.socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, message);
  }

  handleAcknowledgment(ack) {
    const pending = this.pendingMessages.get(ack.clientMessageId);
    if (pending) {
      clearTimeout(pending.timeout);
      this.pendingMessages.delete(ack.clientMessageId);
    }
  }

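  handleTimeout(messageId) {
    const pending = this.pendingMessages.get(messageId);
    if (pending) {
      console.warn("Message timeout:", messageId);
      this.pendingMessages.delete(messageId);
      // Implement retry logic if needed
    }
  }

  generateMessageId() {
    // Client-side tracking ID; the server echoes it back as clientMessageId in messageAck
    return `client-msg-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
  }
}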
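
The timeout handler above leaves retry logic open. One common option is exponential backoff with a cap and an attempt limit, sketched below. This is a swapped-in technique rather than anything elizaOS prescribes, and `RetryPolicy`, `backoffDelay`, and `nextRetryDelay` are hypothetical names.

```typescript
interface RetryPolicy {
  maxAttempts: number; // how many resends are allowed in total
  baseMs: number; // delay before the first resend
  maxMs: number; // upper bound on any single delay
}

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMs.
function backoffDelay(attempt: number, policy: RetryPolicy): number {
  return Math.min(policy.maxMs, policy.baseMs * 2 ** attempt);
}

// Delay before the next resend, or null when the message should be given up on.
function nextRetryDelay(attemptsSoFar: number, policy: RetryPolicy): number | null {
  if (attemptsSoFar >= policy.maxAttempts) {
    return null;
  }
  return backoffDelay(attemptsSoFar, policy);
}
```

A timeout handler could store an attempt counter next to each pending message, call nextRetryDelay with it, and either schedule a re-emit after the returned delay or surface a failure to the user when the result is null.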

Typing Indicators

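// Note: "typing_start" and "typing_stop" are not among the core events listed
// under Event Handling; this pattern assumes the server registers handlers for them.
class TypingIndicator {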
  constructor(socket) {
    this.socket = socket;
    this.typingTimeout = null;
    this.typingDelay = 3000; // 3 seconds
  }

  startTyping(channelId) {
    this.socket.emit("typing_start", { channelId });

    // Auto-stop typing after delay
    clearTimeout(this.typingTimeout);
    this.typingTimeout = setTimeout(() => {
      this.stopTyping(channelId);
    }, this.typingDelay);
  }

  stopTyping(channelId) {
    this.socket.emit("typing_stop", { channelId });
    clearTimeout(this.typingTimeout);
  }
}

Security Considerations

Connection Validation

// Server-side connection validation
// (validateToken is an application-defined function, not provided by Socket.IO)
io.use((socket, next) => {
  const token = socket.handshake.auth.token;

  if (!validateToken(token)) {
    return next(new Error("Authentication failed"));
  }

  next();
});

Rate Limiting

// Implement client-side rate limiting
class RateLimiter {
  constructor(maxRequests = 10, windowMs = 60000) {
    this.requests = [];
    this.maxRequests = maxRequests;
    this.windowMs = windowMs;
  }

  canMakeRequest() {
    const now = Date.now();
    this.requests = this.requests.filter((time) => now - time < this.windowMs);

    if (this.requests.length >= this.maxRequests) {
      return false;
    }

    this.requests.push(now);
    return true;
  }
}
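
The limiter above only answers whether a request is allowed; it still has to be wired into the send path. The sketch below shows one way to do that, using a variant of the same sliding-window logic with an injectable clock so it can be tested deterministically. `SlidingWindowLimiter` and `rateLimited` are hypothetical names, and `emit` stands in for any function that sends a payload.

```typescript
// Same sliding-window logic as the class above, with an injectable clock.
class SlidingWindowLimiter {
  private requests: number[] = [];

  constructor(
    private maxRequests = 10,
    private windowMs = 60000,
    private now: () => number = Date.now,
  ) {}

  canMakeRequest(): boolean {
    const t = this.now();
    // Drop timestamps that have left the window
    this.requests = this.requests.filter((time) => t - time < this.windowMs);
    if (this.requests.length >= this.maxRequests) {
      return false;
    }
    this.requests.push(t);
    return true;
  }
}

// Wrap an emit-like function; returns false when the call was dropped.
function rateLimited<T>(
  emit: (payload: T) => void,
  limiter: SlidingWindowLimiter,
): (payload: T) => boolean {
  return (payload) => {
    if (!limiter.canMakeRequest()) {
      return false;
    }
    emit(payload);
    return true;
  };
}
```

Client-side limiting only keeps a well-behaved client from flooding the server; it does not replace server-side enforcement.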

The WebSocket implementation in elizaOS covers real-time messaging, automatic channel creation, log streaming, and error reporting. The client-side patterns above (reconnection handling, acknowledgment tracking, and rate limiting) help applications use it reliably.