WebSocket Events
Real-time communication using Socket.IO in ElizaOS
Overview
ElizaOS uses Socket.IO for real-time bidirectional communication between clients and the server. This enables instant messaging, live updates, and event streaming without the need for polling.
Connection Management
Establishing Connection
import { io } from "socket.io-client";
// Open the client connection. Both options are spelled out so the intended
// behavior (connect immediately, reconnect after a drop) is explicit.
const socket = io("http://localhost:3000", { autoConnect: true, reconnection: true });

// Fired each time the underlying connection is established.
socket.on("connect", () => console.log("Connected to server"));
Connection Events
// Connection established: the payload carries the socket ID for this connection.
socket.on("connection_established", (data) => {
  console.log("Connected with socket ID:", data.socketId);
});

// Connection lost: `reason` describes why the socket was disconnected.
socket.on("disconnect", (reason) => {
  console.log("Disconnected:", reason);
});

// NOTE: since Socket.IO v3 the reconnection lifecycle events are emitted by
// the Manager (available as `socket.io`), not by the Socket itself.
// Listeners registered with `socket.on("reconnect", ...)` never fire.

// Reconnection attempts
socket.io.on("reconnect_attempt", (attemptNumber) => {
  console.log("Reconnection attempt:", attemptNumber);
});

// Successfully reconnected
socket.io.on("reconnect", (attemptNumber) => {
  console.log("Reconnected after", attemptNumber, "attempts");
});

// Connection error: still emitted on the Socket.
socket.on("connect_error", (error) => {
  console.error("Connection error:", error);
});
Message Types
ElizaOS uses typed message events for different operations:
// Numeric discriminator sent in the `type` field of a "message" event.
// NOTE(review): these values go over the wire; confirm them against the enum
// exported by the ElizaOS version you are targeting before relying on them.
enum SOCKET_MESSAGE_TYPE {
// Request to join a channel.
ROOM_JOINING = 0,
// Submit a chat message to a channel.
SEND_MESSAGE = 1,
}
Channel Management
Joining a Channel
// Join a channel in order to receive its messages.
const joinRequest = {
  type: 0, // ROOM_JOINING
  payload: {
    channelId: "channel-uuid",
    entityId: "user-uuid",
    serverId: "server-uuid",
    metadata: {
      isDm: false,
      channelType: "group",
    },
  },
};
socket.emit("message", joinRequest);

// Confirmation event for the join request.
socket.on("channel_joined", ({ channelId }) => {
  console.log("Joined channel:", channelId);
});

// Legacy confirmation event, kept for backward compatibility.
socket.on("room_joined", ({ roomId }) => {
  console.log("Joined room:", roomId);
});
Channel Events
// The channel was cleared (all of its messages were deleted).
socket.on("channelCleared", ({ channelId }) => {
  console.log("Channel cleared:", channelId);
});

// The channel itself was deleted.
socket.on("channelDeleted", ({ channelId }) => {
  console.log("Channel deleted:", channelId);
});
Messaging
Sending Messages
// Build the outgoing chat message, then emit it on the generic "message" event.
const outgoingMessage = {
  type: 1, // SEND_MESSAGE
  payload: {
    senderId: "user-uuid",
    senderName: "User Name",
    message: "Hello, world!",
    channelId: "channel-uuid",
    serverId: "server-uuid",
    // Generated by the client; presumably echoed back as `clientMessageId`
    // in "messageAck" (verify against the server implementation).
    messageId: "client-generated-uuid",
    source: "web_client",
    attachments: [],
    metadata: {
      isDm: false,
    },
  },
};
socket.emit("message", outgoingMessage);
Receiving Messages
// A message was broadcast to the channel by any participant.
socket.on(
  "messageBroadcast",
  ({
    id,
    senderId,
    senderName,
    text,
    channelId,
    createdAt,
    source,
    attachments,
    thought, // Agent's internal thought
    actions, // Actions taken by agent
    prompt, // LLM prompt used
  }) => {
    console.log("New message:", {
      id,
      senderId,
      senderName,
      text,
      channelId,
      createdAt,
      source,
      attachments,
      thought,
      actions,
      prompt,
    });
  },
);

// Acknowledgment for a message this client sent.
socket.on("messageAck", ({ clientMessageId, messageId, status, channelId }) => {
  console.log("Message acknowledged:", {
    clientMessageId,
    messageId,
    status,
    channelId,
  });
});

// The server reported an error for a message.
socket.on("messageError", ({ error }) => {
  console.error("Message error:", error);
});
Message Deletion
// A message was removed from a channel.
socket.on("messageDeleted", ({ messageId, channelId }) => {
  console.log("Message deleted:", { messageId, channelId });
});
Control Messages
Control messages allow the server to control client UI behavior:
// React to server-driven UI control messages; unknown actions are ignored.
socket.on("controlMessage", (data) => {
  const { action } = data;

  if (action === "disable_input") {
    // Disable input while agent is processing
    console.log("Disable input for channel:", data.channelId);
  } else if (action === "enable_input") {
    // Re-enable input after processing
    console.log("Enable input for channel:", data.channelId);
  }
});
Log Streaming
Subscribe to Logs
// Request the real-time log stream.
socket.emit("subscribe_logs");

// The server confirms the subscription state.
socket.on("log_subscription_confirmed", ({ subscribed }) => {
  console.log("Log subscription:", subscribed);
});
Log Stream Events
// Handle streamed log events; only "log_entry" events are printed.
socket.on("log_stream", (data) => {
  if (data.type !== "log_entry") {
    return;
  }

  const { level, time, msg, agentId, agentName, channelId } = data.payload;
  console.log("Log:", { level, time, msg, agentId, agentName, channelId });
});
Filter Logs
// Narrow the log stream to a single agent and a minimum level.
const logFilters = {
  agentName: "specific-agent", // or 'all'
  level: "info", // minimum log level
};
socket.emit("update_log_filters", logFilters);

// Filter update confirmation; only successful updates are logged.
socket.on("log_filters_updated", ({ success, filters }) => {
  if (!success) {
    return;
  }
  console.log("Filters updated:", filters);
});
Unsubscribe from Logs
// Unsubscribe from the log stream; this is the counterpart of the
// "subscribe_logs" request.
socket.emit("unsubscribe_logs");
Authentication
If the server requires authentication:
// Called when the server reports the client as unauthorized.
const handleUnauthorized = (reason) => {
  console.error("Unauthorized:", reason);
  // Handle authentication failure
};
socket.on("unauthorized", handleUnauthorized);
Complete Client Implementation Example
/**
 * Minimal chat client that wraps a Socket.IO connection.
 *
 * Usage: call `connect()`, then `joinChannel()`, then `sendMessage()`.
 * `displayMessage`, `disableInput` and `enableInput` are placeholder UI hooks;
 * replace or override them with real rendering code.
 */
class ChatClient {
  constructor() {
    this.socket = null;
    this.activeChannel = null;
    // Server ID of the channel joined last; reused when sending messages.
    this.activeServerId = null;
    this.userId = "user-uuid";
  }

  /** Opens the connection and registers the event handlers. */
  connect() {
    this.socket = io("http://localhost:3000");
    // Setup event handlers
    this.socket.on("connect", () => this.onConnect());
    this.socket.on("disconnect", () => this.onDisconnect());
    this.socket.on("messageBroadcast", (data) => this.onMessage(data));
    this.socket.on("messageError", (data) => this.onError(data));
    this.socket.on("controlMessage", (data) => this.onControl(data));
  }

  /**
   * Joins a channel and makes it the target of later `sendMessage` calls.
   * @param {string} channelId - Channel to join.
   * @param {string} serverId - Server that owns the channel.
   */
  joinChannel(channelId, serverId) {
    this.activeChannel = channelId;
    this.activeServerId = serverId ?? null;
    this.socket.emit("message", {
      type: 0, // ROOM_JOINING
      payload: {
        channelId,
        entityId: this.userId,
        serverId,
      },
    });
  }

  /**
   * Sends a text message to the active channel; does nothing when no channel
   * has been joined yet.
   * @param {string} text - Message body.
   */
  sendMessage(text) {
    if (!this.activeChannel) return;
    this.socket.emit("message", {
      type: 1, // SEND_MESSAGE
      payload: {
        senderId: this.userId,
        senderName: "User",
        message: text,
        channelId: this.activeChannel,
        // Use the server the channel was joined on; fall back to the all-zero
        // ID only when joinChannel was called without a server ID.
        serverId: this.activeServerId ?? "00000000-0000-0000-0000-000000000000",
        messageId: this.generateUUID(),
        source: "web_client",
      },
    });
  }

  onConnect() {
    console.log("Connected to server");
  }

  onDisconnect() {
    console.log("Disconnected from server");
  }

  onMessage(data) {
    // Handle incoming message
    this.displayMessage(data);
  }

  onError(data) {
    console.error("Message error:", data.error);
  }

  /** Toggles the input UI according to the server's control message. */
  onControl(data) {
    if (data.action === "disable_input") {
      this.disableInput();
    } else if (data.action === "enable_input") {
      this.enableInput();
    }
  }

  /** Placeholder UI hook: render an incoming message. */
  displayMessage(data) {
    console.log("New message:", data);
  }

  /** Placeholder UI hook: block the message input. */
  disableInput() {
    console.log("Input disabled");
  }

  /** Placeholder UI hook: unblock the message input. */
  enableInput() {
    console.log("Input enabled");
  }

  /**
   * Generates a fresh ID for an outgoing message.
   * Relies on the Web Crypto global, which browsers expose in secure contexts
   * (HTTPS or localhost).
   * @returns {string} A random UUID string.
   */
  generateUUID() {
    return crypto.randomUUID();
  }
}
Server-Side Implementation
/**
 * Routes Socket.IO events from connected clients to this router's handler
 * methods.
 */
export class SocketIORouter {
  /**
   * Registers a "connection" listener on the server. Each new socket gets its
   * own listeners for the typed events, the generic "message" event, log
   * subscription and disconnection.
   */
  setupListeners(io: SocketIOServer) {
    io.on("connection", (socket: Socket) => {
      console.log("New connection:", socket.id);

      // The typed events use the stringified enum value as the event name.
      const joinEvent = String(SOCKET_MESSAGE_TYPE.ROOM_JOINING);
      const sendEvent = String(SOCKET_MESSAGE_TYPE.SEND_MESSAGE);

      // Channel joining
      socket.on(joinEvent, (payload) => this.handleChannelJoining(socket, payload));

      // Message sending
      socket.on(sendEvent, (payload) => this.handleMessageSubmission(socket, payload));

      // Generic "message" envelope
      socket.on("message", (data) => this.handleGenericMessage(socket, data));

      // Log subscriptions
      socket.on("subscribe_logs", () => this.handleLogSubscription(socket));

      // Disconnection
      socket.on("disconnect", () => this.handleDisconnect(socket));
    });
  }
}
Message Flow Architecture
- Client Connects → Server assigns socket ID
- Client Joins Channel → Server adds to channel room
- Client Sends Message → Server validates and stores
- Server Broadcasts → All channel participants receive
- Agent Processes → Response broadcast to channel
Best Practices
Client-Side
- Implement Reconnection Logic
// Placeholder handler for lost connections.
socket.on("disconnect", () => {
  // Attempt to rejoin channels on reconnect
});
- Handle Network Failures
// Placeholder handler for failed connection attempts.
socket.on("connect_error", (error) => {
  // Show offline indicator
  // Queue messages for retry
});
- Message Deduplication
// IDs of broadcasts already shown, so a repeated delivery is rendered once.
const processedMessages = new Set();

socket.on("messageBroadcast", (data) => {
  if (processedMessages.has(data.id)) {
    return;
  }
  processedMessages.add(data.id);
  displayMessage(data);
});
- Clean Up Listeners
componentWillUnmount() { socket.off('messageBroadcast'); socket.off('messageError'); socket.disconnect(); }
Server-Side
- Validate Input
if (!validateUuid(channelId)) { socket.emit('messageError', { error: 'Invalid channel ID' }); return; }
- Rate Limiting
// Track and limit messages per socket
const messageRateLimit = new Map();
- Error Handling
// Report a failed message creation back to the client that sent it.
try {
  await this.serverInstance.createMessage(data);
} catch (err) {
  this.sendErrorResponse(socket, err.message);
}
- Resource Cleanup
// Placeholder handler for releasing per-socket state on disconnect.
socket.on("disconnect", () => {
  // Remove from active connections
  // Clean up subscriptions
});
Performance Considerations
- Connection Pooling: Reuse a single socket connection
- Event Batching: Group multiple updates
- Compression: Enable Socket.IO compression
- Binary Data: Use ArrayBuffer for large payloads
- Room Management: Leave unused channels
Security Considerations
- Input Validation: Validate all incoming data
- Authentication: Implement proper auth checks
- Rate Limiting: Prevent spam and DoS
- Channel Access: Verify user permissions
- Data Sanitization: Clean user input
Debugging
Enable Socket.IO debugging in development:
// Client-side: set the `debug` key in localStorage to turn on the Socket.IO
// client's debug output.
localStorage.setItem("debug", "socket.io-client:*");
# Server-side (run from a shell, not from JavaScript)
DEBUG='socket.io:*' node server.js
Monitor events in browser DevTools:
// Catch-all listener: logs every incoming event together with its arguments.
const logAnyEvent = (eventName, ...eventArgs) => {
  console.log(`Event: ${eventName}`, eventArgs);
};
socket.onAny(logAnyEvent);