WebSocket Communication
Real-time communication with Socket.IO in ElizaOS
ElizaOS provides real-time communication capabilities through WebSocket connections, enabling
instant message delivery, channel management, and live system monitoring. The WebSocket
implementation is built on Socket.IO and integrated with the central message bus through the
SocketIORouter class.
Architecture
SocketIORouter Class
The SocketIORouter class manages WebSocket connections and routes messages between clients and the
central message bus:
// Excerpt showing the router's internal state (methods omitted)
export class SocketIORouter {
  private agents: Map<UUID, IAgentRuntime>;
  private connections: Map<string, UUID>; // socket.id -> agentId
  private logStreamConnections: Map<string, { agentName?: string; level?: string }>; // log subscribers and their filters
  private serverInstance: AgentServer;
}
WebSocket Server Setup
The WebSocket server is automatically initialized when the AgentServer starts:
import { AgentServer } from "@elizaos/server";
const server = new AgentServer();
await server.initialize();
// WebSocket server is automatically created and configured
// Available at ws://localhost:3000 (same port as HTTP server)
await server.start(3000);
The server configuration includes:
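import { Server as SocketIOServer } from "socket.io";

// `server` here is the HTTP server that Socket.IO attaches to
const io = new SocketIOServer(server, {
  cors: {
    origin: "*", // Accepts requests from any origin
    methods: ["GET", "POST"],
  },
});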
Connection Events
Establishing Connection
// Client-side connection
import { io } from "socket.io-client";
const socket = io("http://localhost:3000");
socket.on("connect", () => {
console.log("Connected to elizaOS WebSocket server");
console.log("Socket ID:", socket.id);
});
socket.on("connection_established", (data) => {
console.log("Server confirmed connection:", data);
// Output: { message: 'Connected to Eliza Socket.IO server', socketId: 'abc123' }
});
Disconnection Handling
socket.on("disconnect", (reason) => {
  console.log("Disconnected:", reason);
  // Handle different disconnect reasons
  if (reason === "io server disconnect") {
    // Server initiated disconnect, reconnect manually
    socket.connect();
  }
  // Transport errors and ping timeouts reconnect automatically;
  // a manual socket.disconnect() ("io client disconnect") does not
});
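The reconnection rule in the handler above can be isolated into a small predicate. This is a minimal sketch: `willAutoReconnect` and `MANUAL_RECONNECT_REASONS` are hypothetical names, and the reason strings are the ones the Socket.IO client reports in its `disconnect` event.

```typescript
// Reasons after which the Socket.IO client does not reconnect on its own.
const MANUAL_RECONNECT_REASONS = ["io server disconnect", "io client disconnect"];

// Hypothetical helper: true when the client will retry the connection by itself.
function willAutoReconnect(reason: string): boolean {
  return MANUAL_RECONNECT_REASONS.indexOf(reason) === -1;
}

console.log(willAutoReconnect("ping timeout")); // true
console.log(willAutoReconnect("io server disconnect")); // false
```

A disconnect handler could call `socket.connect()` only when this returns `false` and the disconnect was not requested by the application itself.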
Message Types
The WebSocket server supports several message types defined in SOCKET_MESSAGE_TYPE:
enum SOCKET_MESSAGE_TYPE {
ROOM_JOINING = "room_joining",
SEND_MESSAGE = "send_message",
}
Event Handling
The SocketIORouter handles these core events:
- connection: New client connection
- room_joining / ROOM_JOINING: Channel joining requests
- send_message / SEND_MESSAGE: Message submission
- message: Generic message handling
- subscribe_logs: Log streaming subscription
- unsubscribe_logs: Log streaming unsubscription
- update_log_filters: Log filter updates
- disconnect: Client disconnection
- error: Socket errors
Channel Management
Joining Channels
// Join a channel/room
socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, {
channelId: "550e8400-e29b-41d4-a716-446655440000",
agentId: "550e8400-e29b-41d4-a716-446655440001", // Optional
entityId: "550e8400-e29b-41d4-a716-446655440002",
serverId: "00000000-0000-0000-0000-000000000000",
metadata: {
isDm: false,
channelType: "GROUP",
},
});
// Alternative: Use roomId for backward compatibility
socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, {
roomId: "550e8400-e29b-41d4-a716-446655440000", // Same as channelId
entityId: "550e8400-e29b-41d4-a716-446655440002",
serverId: "00000000-0000-0000-0000-000000000000",
});
// Listen for join confirmation
socket.on("channel_joined", (data) => {
console.log("Successfully joined channel:", data);
// {
// message: 'Socket abc123 successfully joined channel 550e8400-e29b-41d4-a716-446655440000',
// channelId: '550e8400-e29b-41d4-a716-446655440000',
// roomId: '550e8400-e29b-41d4-a716-446655440000',
// agentId: '550e8400-e29b-41d4-a716-446655440001'
// }
});
// Backward compatibility event
socket.on("room_joined", (data) => {
console.log("Room joined (legacy):", data);
});
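Because a join request may identify the channel by either `channelId` or the legacy `roomId`, a client can normalize the payload before emitting it. The sketch below is illustrative only: `JoinRequest` and `normalizeJoinRequest` are hypothetical names, and the field list is taken from the examples above rather than from the server's type definitions.

```typescript
// Fields of a join payload as used in the examples above.
interface JoinRequest {
  channelId?: string;
  roomId?: string; // legacy alias for channelId
  entityId?: string;
  serverId?: string;
  agentId?: string;
  metadata?: Record<string, unknown>;
}

// Hypothetical client-side helper: resolve the legacy alias and fail fast
// before emitting, rather than waiting for the server to report the problem.
function normalizeJoinRequest(req: JoinRequest): JoinRequest & { channelId: string } {
  const channelId = req.channelId ?? req.roomId;
  if (!channelId) {
    throw new Error("channelId is required for joining.");
  }
  return { ...req, channelId };
}

const joinPayload = normalizeJoinRequest({
  roomId: "550e8400-e29b-41d4-a716-446655440000",
  entityId: "550e8400-e29b-41d4-a716-446655440002",
  serverId: "00000000-0000-0000-0000-000000000000",
});
console.log(joinPayload.channelId); // 550e8400-e29b-41d4-a716-446655440000
```

The normalized object can then be passed directly to `socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, joinPayload)`.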
ENTITY_JOINED Event
When joining a channel with entityId, the server emits an ENTITY_JOINED event for world/entity creation:
// Server-side event emission (automatic)
runtime.emitEvent(EventType.ENTITY_JOINED, {
entityId: entityId,
runtime,
worldId: serverId,
roomId: channelId,
metadata: {
type: isDm ? ChannelType.DM : ChannelType.GROUP,
isDm,
...metadata,
},
source: "socketio",
});
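The `metadata` object in the emission above is built with a spread, so the order of keys matters. The following sketch reproduces only that object construction; `buildJoinMetadata` is a hypothetical helper, and the enum is reduced to the two members the expression uses.

```typescript
enum ChannelType {
  DM = "DM",
  GROUP = "GROUP",
}

// Hypothetical helper reproducing the metadata object from the emission above.
function buildJoinMetadata(
  isDm: boolean,
  metadata: Record<string, unknown> = {}
): Record<string, unknown> {
  return {
    type: isDm ? ChannelType.DM : ChannelType.GROUP,
    isDm,
    ...metadata, // spread last: client-supplied keys override type and isDm
  };
}

console.log(buildJoinMetadata(false, { channelType: "GROUP" }));
// { type: 'GROUP', isDm: false, channelType: 'GROUP' }
console.log(buildJoinMetadata(true, { type: "FEED" }).type); // FEED
```

The second call shows the consequence of the spread order: a `type` key supplied in the client's metadata replaces the value derived from `isDm`.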
Channel Types
enum ChannelType {
DM = "DM",
GROUP = "GROUP",
VOICE_DM = "VOICE_DM",
FEED = "FEED",
}
Real-time Messaging
Sending Messages
// Send a message to a channel
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
channelId: "550e8400-e29b-41d4-a716-446655440000",
serverId: "00000000-0000-0000-0000-000000000000",
senderId: "550e8400-e29b-41d4-a716-446655440001",
senderName: "John Doe",
message: "Hello, how can you help me?",
messageId: "client-msg-123", // Client-generated ID for tracking
source: "web_client",
metadata: {
isDm: false,
channelType: "GROUP",
targetUserId: "550e8400-e29b-41d4-a716-446655440002", // For DM channels
},
attachments: [
{
url: "https://example.com/image.jpg",
contentType: "image/jpeg",
title: "Screenshot",
},
],
});
// Alternative: Use roomId for backward compatibility
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
roomId: "550e8400-e29b-41d4-a716-446655440000",
serverId: "00000000-0000-0000-0000-000000000000",
senderId: "550e8400-e29b-41d4-a716-446655440001",
senderName: "John Doe",
message: "Hello, how can you help me?",
});
Auto-Channel Creation
If a channel doesn't exist, the server automatically creates it:
// For DM channels
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
channelId: "new-dm-channel-id",
serverId: "00000000-0000-0000-0000-000000000000",
senderId: "user-id-1",
message: "Hello in new DM!",
metadata: {
isDm: true,
channelType: "DM",
targetUserId: "user-id-2", // Second participant
},
});
The server will:
- Check if channel exists
- Create channel if it doesn't exist
- Set up participants (for DM channels)
- Store the message in the central database
- Emit to internal message bus for agent processing
- Broadcast to all channel participants
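The steps above can be sketched with in-memory stand-ins. This is not the server's implementation: `InMemoryChannelServer` and all of its fields are hypothetical, and the database, message bus, and broadcast are replaced by plain arrays so the order of operations is visible.

```typescript
interface IncomingMessage {
  channelId: string;
  senderId: string;
  message: string;
  metadata?: { isDm?: boolean; targetUserId?: string };
}

interface Channel {
  id: string;
  participants: string[];
}

// Hypothetical stand-in for the server-side flow; storage is plain memory.
class InMemoryChannelServer {
  channels: Record<string, Channel> = {};
  stored: IncomingMessage[] = []; // stands in for the central database
  busEvents: IncomingMessage[] = []; // stands in for the internal message bus
  broadcasts: { channelId: string; text: string }[] = [];

  handleSend(msg: IncomingMessage): void {
    // Steps 1-2: look up the channel and create it on first use.
    let channel = this.channels[msg.channelId];
    if (!channel) {
      channel = { id: msg.channelId, participants: [] };
      this.channels[channel.id] = channel;
      // Step 3: for DM channels, register both participants.
      if (msg.metadata?.isDm && msg.metadata.targetUserId) {
        channel.participants.push(msg.senderId, msg.metadata.targetUserId);
      }
    }
    // Steps 4-6: persist, hand to the bus, broadcast to the channel.
    this.stored.push(msg);
    this.busEvents.push(msg);
    this.broadcasts.push({ channelId: channel.id, text: msg.message });
  }
}
```

Sending a second message to the same `channelId` skips creation and participant setup and goes straight to the store, bus, and broadcast steps.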
Message Acknowledgment
// Listen for message acknowledgment
socket.on("messageAck", (data) => {
console.log("Message acknowledged:", data);
// {
// clientMessageId: 'client-msg-123',
// messageId: '550e8400-e29b-41d4-a716-446655440002',
// status: 'received_by_server_and_processing',
// channelId: '550e8400-e29b-41d4-a716-446655440000'
// }
});
Receiving Messages
// Listen for incoming messages
socket.on("messageBroadcast", (message) => {
console.log("New message received:", message);
// {
// id: '550e8400-e29b-41d4-a716-446655440002',
// senderId: '550e8400-e29b-41d4-a716-446655440001',
// senderName: 'Agent Alice',
// text: 'Hello! I can help you with various tasks.',
// channelId: '550e8400-e29b-41d4-a716-446655440000',
// roomId: '550e8400-e29b-41d4-a716-446655440000', // Backward compatibility
// serverId: '00000000-0000-0000-0000-000000000000',
// createdAt: 1640995200000,
// source: 'agent_response',
// attachments: [],
// thought: 'The user is asking for help', // Agent's internal thought
// actions: ['respond'], // Agent's actions
// clientMessageId: 'client-msg-123' // Original client message ID (if applicable)
// }
});
// Listen for message completion
socket.on("messageComplete", (data) => {
console.log("Message processing complete:", data);
// {
// channelId: '550e8400-e29b-41d4-a716-446655440000',
// serverId: '00000000-0000-0000-0000-000000000000'
// }
});
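Both `messageAck` and `messageBroadcast` carry the client-generated ID, so a client can follow each outgoing message through its lifecycle. The tracker below is a sketch under that assumption; `MessageTracker` and its status names are hypothetical. Note that `messageComplete` carries only channel and server IDs in the example above, so it is not correlated per message here.

```typescript
type TrackedStatus = "sent" | "acknowledged" | "broadcast";

interface TrackedMessage {
  status: TrackedStatus;
  serverMessageId?: string;
  texts: string[];
}

// Hypothetical client-side tracker keyed by the client-generated message ID.
class MessageTracker {
  private byClientId: Record<string, TrackedMessage> = {};

  markSent(clientMessageId: string): void {
    this.byClientId[clientMessageId] = { status: "sent", texts: [] };
  }

  // Feed with the payload of a "messageAck" event.
  onAck(ack: { clientMessageId: string; messageId: string }): void {
    const tracked = this.byClientId[ack.clientMessageId];
    if (tracked) {
      tracked.status = "acknowledged";
      tracked.serverMessageId = ack.messageId;
    }
  }

  // Feed with the payload of a "messageBroadcast" event.
  onBroadcast(msg: { text: string; clientMessageId?: string }): void {
    if (!msg.clientMessageId) return; // not tied to one of our messages
    const tracked = this.byClientId[msg.clientMessageId];
    if (tracked) {
      tracked.status = "broadcast";
      tracked.texts.push(msg.text);
    }
  }

  get(clientMessageId: string): TrackedMessage | undefined {
    return this.byClientId[clientMessageId];
  }
}
```

In use, `markSent` is called just before emitting, and the other two methods are registered as handlers, for example `socket.on("messageAck", (ack) => tracker.onAck(ack))`.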
Generic Message Handling
For custom message types, use the generic message event:
// Send generic message
socket.emit("message", {
type: SOCKET_MESSAGE_TYPE.SEND_MESSAGE,
payload: {
channelId: "550e8400-e29b-41d4-a716-446655440000",
message: "Hello from generic handler",
},
});
// The server will route this to the appropriate handler
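Routing a generic `{ type, payload }` envelope amounts to a lookup in a handler table. The sketch below illustrates the idea only; `handlers` and `routeGenericMessage` are hypothetical names, the handlers return strings purely for demonstration, and throwing on an unknown type is an assumption (the source does not say how the server treats unknown types).

```typescript
enum SOCKET_MESSAGE_TYPE {
  ROOM_JOINING = "room_joining",
  SEND_MESSAGE = "send_message",
}

type Handler = (payload: Record<string, unknown>) => string;

// Hypothetical routing table: one handler per message type.
const handlers: Record<string, Handler> = {
  [SOCKET_MESSAGE_TYPE.ROOM_JOINING]: (p) => `join:${String(p.channelId)}`,
  [SOCKET_MESSAGE_TYPE.SEND_MESSAGE]: (p) => `send:${String(p.channelId)}`,
};

function routeGenericMessage(envelope: {
  type: string;
  payload: Record<string, unknown>;
}): string {
  const handler = handlers[envelope.type];
  if (!handler) {
    // Assumption: unknown types are rejected rather than silently dropped.
    throw new Error(`Unknown message type: ${envelope.type}`);
  }
  return handler(envelope.payload);
}

console.log(
  routeGenericMessage({
    type: SOCKET_MESSAGE_TYPE.SEND_MESSAGE,
    payload: { channelId: "c1", message: "Hello from generic handler" },
  })
); // send:c1
```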
Error Handling
Message Errors
// Listen for message errors
socket.on("messageError", (error) => {
console.error("Message error:", error);
// { error: 'channelId is required for joining.' }
});
Socket Errors
socket.on("error", (error) => {
console.error("Socket error:", error);
});
socket.on("connect_error", (error) => {
console.error("Connection error:", error);
});
Log Streaming
The WebSocket server provides real-time log streaming capabilities:
Subscribe to Logs
// Subscribe to log stream
socket.emit("subscribe_logs");
socket.on("log_subscription_confirmed", (data) => {
console.log("Log subscription:", data);
// { subscribed: true, message: 'Successfully subscribed to log stream' }
});
Receive Log Entries
// Listen for log entries
socket.on("log_stream", (logData) => {
console.log("Log entry:", logData);
// {
// type: 'log_entry',
// payload: {
// level: 30,
// time: 1640995200000,
// msg: 'Agent registered successfully',
// agentName: 'Alice'
// }
// }
});
Filter Logs
// Update log filters
socket.emit("update_log_filters", {
agentName: "Alice",
level: "info",
});
socket.on("log_filters_updated", (response) => {
console.log("Log filters updated:", response);
// { success: true, filters: { agentName: 'Alice', level: 'info' } }
});
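Log entries arrive with a numeric `level` while the filter uses a level name, so a client that wants to apply the same filter locally needs a mapping between the two. The sketch below assumes pino-style numeric levels (30 = info, matching the example entry above) and treats the level filter as a minimum threshold; both are assumptions, and `matchesFilters` is a hypothetical name.

```typescript
// Assumed pino-style level numbers.
const LOG_LEVELS: Record<string, number> = {
  trace: 10,
  debug: 20,
  info: 30,
  warn: 40,
  error: 50,
  fatal: 60,
};

interface LogPayload {
  level: number;
  time: number;
  msg: string;
  agentName?: string;
}

interface LogFilters {
  agentName?: string;
  level?: string;
}

// Hypothetical client-side check mirroring the filters sent to the server.
function matchesFilters(entry: LogPayload, filters: LogFilters): boolean {
  if (filters.agentName && entry.agentName !== filters.agentName) return false;
  if (filters.level) {
    const minimum = LOG_LEVELS[filters.level];
    // Assumption: the level filter is a lower bound, not an exact match.
    if (minimum !== undefined && entry.level < minimum) return false;
  }
  return true;
}

const sampleEntry = { level: 30, time: 1640995200000, msg: "Agent registered successfully", agentName: "Alice" };
console.log(matchesFilters(sampleEntry, { agentName: "Alice", level: "info" })); // true
console.log(matchesFilters(sampleEntry, { level: "warn" })); // false
```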
Unsubscribe from Logs
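// Unsubscribe from log stream
socket.emit("unsubscribe_logs");
// The server replies on the same "log_subscription_confirmed" event used for
// subscribing, so one handler can cover both cases by checking data.subscribed
socket.on("log_subscription_confirmed", (data) => {
  console.log(data.subscribed ? "Log subscription:" : "Log unsubscription:", data);
  // { subscribed: false, message: 'Successfully unsubscribed from log stream' }
});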
Auto-Channel Creation
The WebSocket server automatically creates channels when messages are sent to non-existent channels:
// Send message to non-existent channel
socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, {
channelId: "new-channel-id",
serverId: "00000000-0000-0000-0000-000000000000",
senderId: "user-id",
message: "Hello in new channel",
metadata: {
isDm: true,
targetUserId: "another-user-id",
},
});
// Server will:
// 1. Create the channel automatically
// 2. Set up participants for DM channels
// 3. Process the message
// 4. Broadcast to all participants
Event Integration
The WebSocket server integrates with the elizaOS event system:
// Events emitted by WebSocket actions:
// - ENTITY_JOINED when users join channels
// - MESSAGE_RECEIVED when messages are processed
// - Various agent lifecycle events
// These events are handled by the bootstrap plugin
// and other registered event handlers
Best Practices
Connection Management
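import { io } from "socket.io-client";

class WebSocketManager {
  constructor(serverUrl) {
    this.socket = io(serverUrl);
    // Join payloads keyed by channelId, replayed after every reconnect
    this.joinedChannels = new Map();
    this.setupEventHandlers();
  }

  setupEventHandlers() {
    this.socket.on("connect", () => {
      console.log("Connected to elizaOS server");
      this.rejoinChannels();
    });
    this.socket.on("disconnect", (reason) => {
      console.log("Disconnected:", reason);
      if (reason === "io server disconnect") {
        this.socket.connect();
      }
    });
  }

  // Join a channel and remember its payload (same shape as a ROOM_JOINING payload)
  joinChannel(channel) {
    this.joinedChannels.set(channel.channelId, channel);
    this.socket.emit(SOCKET_MESSAGE_TYPE.ROOM_JOINING, channel);
  }

  rejoinChannels() {
    // Rejoin all channels after reconnection
    this.joinedChannels.forEach((channel) => {
      this.joinChannel(channel);
    });
  }
}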
Message Queue
class MessageQueue {
  constructor(socket) {
    this.socket = socket;
    this.pendingMessages = new Map();
    this.messageTimeout = 30000; // 30 seconds
    this.socket.on("messageAck", (ack) => {
      this.handleAcknowledgment(ack);
    });
  }

  generateMessageId() {
    // Any client-unique string works; the server echoes it back as ack.clientMessageId
    return `client-msg-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
  }

  sendMessage(message) {
    const messageId = this.generateMessageId();
    message.messageId = messageId;
    // Store message for timeout handling
    this.pendingMessages.set(messageId, {
      message,
      timestamp: Date.now(),
      timeout: setTimeout(() => {
        this.handleTimeout(messageId);
      }, this.messageTimeout),
    });
    this.socket.emit(SOCKET_MESSAGE_TYPE.SEND_MESSAGE, message);
  }

  handleAcknowledgment(ack) {
    const pending = this.pendingMessages.get(ack.clientMessageId);
    if (pending) {
      clearTimeout(pending.timeout);
      this.pendingMessages.delete(ack.clientMessageId);
    }
  }

  handleTimeout(messageId) {
    const pending = this.pendingMessages.get(messageId);
    if (pending) {
      console.warn("Message timeout:", messageId);
      this.pendingMessages.delete(messageId);
      // Implement retry logic if needed
    }
  }
}
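The retry logic left open in `handleTimeout` usually needs two decisions: how long to wait before resending and when to give up. The sketch below shows one common choice, capped exponential backoff. The function names and default values are hypothetical, and whether the server deduplicates a resent message is not stated in this document, so resending should reuse the original `messageId`.

```typescript
// Hypothetical backoff policy for resending a message whose ack timed out.
function retryDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  // attempt 0 -> baseMs, doubling on each further attempt, capped at maxMs
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Hypothetical cut-off: stop after maxAttempts resends.
function shouldRetry(attempt: number, maxAttempts = 3): boolean {
  return attempt < maxAttempts;
}

console.log([0, 1, 2, 3].map((a) => retryDelayMs(a))); // [ 1000, 2000, 4000, 8000 ]
console.log(shouldRetry(3)); // false
```

Inside `handleTimeout`, a queue could keep an attempt counter per pending message and schedule the resend with `setTimeout(resend, retryDelayMs(attempt))` while `shouldRetry(attempt)` holds.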
Typing Indicators
// Note: typing_start and typing_stop are not among the core events listed under
// Event Handling, so this pattern requires matching server-side handlers
class TypingIndicator {
  constructor(socket) {
    this.socket = socket;
    this.typingTimeout = null;
    this.typingDelay = 3000; // 3 seconds
  }

  startTyping(channelId) {
    this.socket.emit("typing_start", { channelId });
    // Auto-stop typing after delay
    clearTimeout(this.typingTimeout);
    this.typingTimeout = setTimeout(() => {
      this.stopTyping(channelId);
    }, this.typingDelay);
  }

  stopTyping(channelId) {
    this.socket.emit("typing_stop", { channelId });
    clearTimeout(this.typingTimeout);
  }
}
Security Considerations
Connection Validation
// Server-side connection validation
// (validateToken is an application-defined function, not shown here)
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!validateToken(token)) {
    return next(new Error("Authentication failed"));
  }
  next();
});
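The middleware's decision can be separated from Socket.IO so it is easy to test. The following is a sketch with hypothetical names: `makeTokenValidator` stands in for whatever `validateToken` does in a real deployment (here, a simple in-memory allow list), and `checkHandshake` mirrors the accept/reject branch of the middleware.

```typescript
// The part of the Socket.IO handshake this check reads.
interface HandshakeLike {
  auth: { token?: unknown };
}

// Hypothetical validator: accepts tokens found in an in-memory allow list.
function makeTokenValidator(allowed: string[]): (token: unknown) => boolean {
  return (token) => typeof token === "string" && allowed.indexOf(token) !== -1;
}

// Returns an Error to reject the connection, or undefined to accept it.
function checkHandshake(
  handshake: HandshakeLike,
  validate: (token: unknown) => boolean
): Error | undefined {
  return validate(handshake.auth.token) ? undefined : new Error("Authentication failed");
}

const validateDemoToken = makeTokenValidator(["secret-token"]);
console.log(checkHandshake({ auth: { token: "secret-token" } }, validateDemoToken)); // undefined
console.log(checkHandshake({ auth: {} }, validateDemoToken)?.message); // Authentication failed
```

On the client, the token reaches `socket.handshake.auth` through the `auth` option of `socket.io-client`, for example `io("http://localhost:3000", { auth: { token } })`. A rejected connection surfaces in the client's `connect_error` handler.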
Rate Limiting
// Implement client-side rate limiting
class RateLimiter {
constructor(maxRequests = 10, windowMs = 60000) {
this.requests = [];
this.maxRequests = maxRequests;
this.windowMs = windowMs;
}
canMakeRequest() {
const now = Date.now();
this.requests = this.requests.filter((time) => now - time < this.windowMs);
if (this.requests.length >= this.maxRequests) {
return false;
}
this.requests.push(now);
return true;
}
}
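To put a limiter like the one above in front of a socket, the emit call can be wrapped so that over-limit messages are dropped before they leave the client. This is a minimal sketch: `guardedEmit` is a hypothetical name, and it depends only on an object exposing `canMakeRequest()`, which the `RateLimiter` class above provides.

```typescript
// Anything with a canMakeRequest() method, such as the RateLimiter above.
interface Limiter {
  canMakeRequest(): boolean;
}

type Emit = (event: string, payload: unknown) => void;

// Hypothetical wrapper: returns a function that emits only while under the limit.
function guardedEmit(emit: Emit, limiter: Limiter): (event: string, payload: unknown) => boolean {
  return (event, payload) => {
    if (!limiter.canMakeRequest()) {
      return false; // dropped: the caller may queue the message or notify the user
    }
    emit(event, payload);
    return true;
  };
}
```

With a real socket, the wrapper could be created as `guardedEmit((event, payload) => socket.emit(event, payload), new RateLimiter(10, 60000))`. Client-side limiting only protects well-behaved clients; it does not replace limits enforced on the server.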
The WebSocket implementation in elizaOS covers channel joining, automatic channel creation, message acknowledgment and broadcast, and log streaming. The client-side patterns above (reconnection handling, message queuing, and rate limiting) build on those events.