elizaOS

Python Integration Guide

Complete guide for Python developers to integrate with ElizaOS, including API clients, workarounds, and best practices

This guide addresses the common challenge Python developers face when working with ElizaOS, which is primarily a TypeScript/Node.js framework. Based on developer feedback about the "steep learning curve with TypeScript," this guide provides practical solutions for Python developers.

Good News: While ElizaOS core is TypeScript-based, you can effectively integrate with it using Python through its REST API and WebSocket interfaces.

Overview

ElizaOS provides several integration points for Python developers:

  1. REST API - Full HTTP API for all agent operations
  2. WebSocket/Socket.IO - Real-time communication
  3. API Client Libraries - Python wrappers for ElizaOS
  4. External Services - Run Python services alongside ElizaOS

Quick Start with Python

Install Python Dependencies

Create a Python environment and install required packages:

# Create virtual environment
python -m venv eliza-env
source eliza-env/bin/activate  # On Windows: eliza-env\Scripts\activate

# Install dependencies
pip install requests websocket-client python-socketio aiohttp

Basic API Client

Create a simple Python client to interact with ElizaOS:

eliza_client.py
import requests
import json
from typing import Dict, Any, Optional

class ElizaClient:
    """Python client for ElizaOS REST API"""
    
    def __init__(self, base_url: str = "http://localhost:3000"):
        self.base_url = base_url
        self.session = requests.Session()
        self.agent_id = None
    
    def create_agent(self, character: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new agent with character definition"""
        response = self.session.post(
            f"{self.base_url}/api/agents",
            json={"character": character}
        )
        response.raise_for_status()
        result = response.json()
        self.agent_id = result["agent"]["id"]
        return result
    
    def send_message(self, text: str, user_id: str = "python-user") -> Dict[str, Any]:
        """Send a message to the agent"""
        if not self.agent_id:
            raise ValueError("No agent created yet")
        
        response = self.session.post(
            f"{self.base_url}/api/messaging/submit",
            json={
                "agentId": self.agent_id,
                "text": text,
                "userId": user_id,
                "roomId": f"room-{user_id}"
            }
        )
        response.raise_for_status()
        return response.json()
    
    def get_memories(self, count: int = 10) -> Dict[str, Any]:
        """Retrieve agent memories"""
        response = self.session.get(
            f"{self.base_url}/api/memory/{self.agent_id}/all",
            params={"count": count}
        )
        response.raise_for_status()
        return response.json()

# Example usage
if __name__ == "__main__":
    client = ElizaClient()
    
    # Create an agent
    character = {
        "name": "PythonBot",
        "bio": ["A helpful assistant created from Python"],
        "settings": {
            "model": "gpt-4",
            "temperature": 0.7
        }
    }
    
    agent = client.create_agent(character)
    print(f"Created agent: {agent['agent']['id']}")
    
    # Send a message
    response = client.send_message("Hello from Python!")
    print(f"Response: {response}")

Real-time Communication with Socket.IO

For real-time chat functionality:

eliza_socketio.py
import socketio
import asyncio
from typing import Callable, Optional

class ElizaRealtimeClient:
    """Real-time client for ElizaOS using Socket.IO"""
    
    def __init__(self, url: str = "http://localhost:3000"):
        self.url = url
        self.sio = socketio.AsyncClient()
        self.agent_id = None
        self.room_id = None
        
        # Register event handlers
        self.sio.on("connect", self._on_connect)
        self.sio.on("disconnect", self._on_disconnect)
        self.sio.on("message", self._on_message)
        self.sio.on("error", self._on_error)
    
    async def connect(self, agent_id: str, user_id: str = "python-user"):
        """Connect to ElizaOS WebSocket"""
        self.agent_id = agent_id
        self.room_id = f"room-{user_id}"
        
        await self.sio.connect(
            self.url,
            auth={"agentId": agent_id, "userId": user_id}
        )
        
        # Join room
        await self.sio.emit("join", {
            "agentId": agent_id,
            "roomId": self.room_id
        })
    
    async def send_message(self, text: str):
        """Send message through WebSocket"""
        await self.sio.emit("message", {
            "agentId": self.agent_id,
            "roomId": self.room_id,
            "text": text
        })
    
    async def _on_connect(self):
        print("Connected to ElizaOS")
    
    async def _on_disconnect(self):
        print("Disconnected from ElizaOS")
    
    async def _on_message(self, data):
        print(f"Received: {data}")
    
    async def _on_error(self, error):
        print(f"Error: {error}")
    
    async def disconnect(self):
        await self.sio.disconnect()

# Example usage
async def main():
    client = ElizaRealtimeClient()
    
    # Connect to existing agent
    await client.connect("your-agent-id")
    
    # Send messages
    await client.send_message("Hello from Python WebSocket!")
    
    # Keep connection alive
    await asyncio.sleep(30)
    
    await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Integration Patterns

Pattern 1: Python Service Integration

Run Python services alongside ElizaOS for specialized functionality:

python_service.py
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

# ElizaOS connection
ELIZA_URL = "http://localhost:3000"

@app.route("/analyze", methods=["POST"])
def analyze_sentiment():
    """Python service for sentiment analysis"""
    data = request.json
    text = data.get("text", "")
    
    # Perform Python-specific analysis
    # (using libraries like TextBlob, NLTK, etc.)
    sentiment = analyze_text_sentiment(text)
    
    # Send results back to ElizaOS
    response = requests.post(
        f"{ELIZA_URL}/api/memory/{data['agentId']}/add",
        json={
            "type": "sentiment_analysis",
            "content": {
                "text": text,
                "sentiment": sentiment,
                "source": "python_analyzer"
            }
        }
    )
    
    return jsonify({
        "sentiment": sentiment,
        "stored": response.ok
    })

def analyze_text_sentiment(text: str) -> dict:
    """Your Python-specific analysis logic"""
    # Example using TextBlob
    from textblob import TextBlob
    blob = TextBlob(text)
    
    return {
        "polarity": blob.sentiment.polarity,
        "subjectivity": blob.sentiment.subjectivity,
        "language": blob.detect_language()
    }

if __name__ == "__main__":
    app.run(port=5000)

Pattern 2: Custom Action Handler

Create Python-based custom actions:

custom_actions.py
import asyncio
from typing import Dict, Any
import httpx

class PythonActionHandler:
    """Handle custom actions in Python"""
    
    def __init__(self, eliza_url: str = "http://localhost:3000"):
        self.eliza_url = eliza_url
        self.actions = {
            "ANALYZE_DATA": self.analyze_data,
            "GENERATE_REPORT": self.generate_report,
            "PROCESS_IMAGE": self.process_image
        }
    
    async def listen_for_actions(self, agent_id: str):
        """Poll for actions that need Python processing"""
        async with httpx.AsyncClient() as client:
            while True:
                try:
                    # Check for pending actions
                    response = await client.get(
                        f"{self.eliza_url}/api/agents/{agent_id}/pending-actions"
                    )
                    
                    if response.status_code == 200:
                        actions = response.json().get("actions", [])
                        
                        for action in actions:
                            await self.handle_action(action, client)
                    
                    await asyncio.sleep(1)  # Poll interval
                    
                except Exception as e:
                    print(f"Error: {e}")
                    await asyncio.sleep(5)
    
    async def handle_action(self, action: Dict[str, Any], client: httpx.AsyncClient):
        """Process individual action"""
        action_type = action.get("type")
        
        if action_type in self.actions:
            result = await self.actions[action_type](action["data"])
            
            # Send result back to ElizaOS
            await client.post(
                f"{self.eliza_url}/api/actions/{action['id']}/complete",
                json={"result": result}
            )
    
    async def analyze_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Custom data analysis in Python"""
        # Use pandas, numpy, scikit-learn, etc.
        import pandas as pd
        
        df = pd.DataFrame(data["records"])
        analysis = {
            "summary": df.describe().to_dict(),
            "correlation": df.corr().to_dict()
        }
        
        return analysis
    
    async def generate_report(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate reports using Python libraries"""
        # Use matplotlib, seaborn, reportlab, etc.
        return {"report_url": "/reports/generated.pdf"}
    
    async def process_image(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process images with Python CV libraries"""
        # Use OpenCV, PIL, etc.
        return {"processed": True, "features": []}

Working with ElizaOS from Jupyter Notebooks

For data scientists and researchers:

eliza_notebook.ipynb
# Cell 1: Setup
import requests
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display, JSON

class ElizaNotebook:
    """Jupyter-friendly ElizaOS client"""
    
    def __init__(self, base_url="http://localhost:3000"):
        self.base_url = base_url
        self.agent_id = None
        self.conversation_history = []
    
    def chat(self, message: str):
        """Interactive chat with display"""
        response = requests.post(
            f"{self.base_url}/api/messaging/submit",
            json={
                "agentId": self.agent_id,
                "text": message,
                "userId": "notebook-user"
            }
        )
        
        result = response.json()
        self.conversation_history.append({
            "user": message,
            "agent": result.get("text", "")
        })
        
        # Display formatted response
        display(JSON(result))
        return result

# Cell 2: Create client and agent
eliza = ElizaNotebook()

# Create agent with your character
character = {
    "name": "DataBot",
    "bio": ["I help analyze data and create visualizations"],
    "settings": {"model": "gpt-4"}
}

response = requests.post(
    f"{eliza.base_url}/api/agents",
    json={"character": character}
)
eliza.agent_id = response.json()["agent"]["id"]

# Cell 3: Interactive chat
eliza.chat("Can you help me analyze some sales data?")

# Cell 4: Visualize conversation
df = pd.DataFrame(eliza.conversation_history)
df['user_length'] = df['user'].str.len()
df['agent_length'] = df['agent'].str.len()

plt.figure(figsize=(10, 6))
plt.plot(df.index, df['user_length'], label='User message length')
plt.plot(df.index, df['agent_length'], label='Agent response length')
plt.xlabel('Message number')
plt.ylabel('Character count')
plt.legend()
plt.title('Conversation Analysis')
plt.show()

Common Python Integration Patterns

1. Async Event Processing

async_processor.py
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import numpy as np

class AsyncElizaProcessor:
    """Process ElizaOS events asynchronously"""
    
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    async def process_messages_batch(self, messages: list):
        """Process multiple messages in parallel"""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.process_single_message(session, msg)
                for msg in messages
            ]
            return await asyncio.gather(*tasks)
    
    async def process_single_message(self, session, message):
        """Process with CPU-intensive Python operations"""
        # Run heavy computation in thread pool
        result = await asyncio.get_event_loop().run_in_executor(
            self.executor,
            self.heavy_computation,
            message
        )
        
        # Send result to ElizaOS
        async with session.post(
            "http://localhost:3000/api/messaging/submit",
            json={
                "agentId": message["agentId"],
                "text": f"Processed: {result}",
                "metadata": {"processor": "python_async"}
            }
        ) as response:
            return await response.json()
    
    def heavy_computation(self, message):
        """CPU-intensive Python operation"""
        # Example: NLP, ML inference, data processing
        text = message.get("text", "")
        
        # Simulate heavy processing
        import time
        time.sleep(0.1)
        
        return f"Processed {len(text)} characters"

2. Data Pipeline Integration

data_pipeline.py
import pandas as pd
from sqlalchemy import create_engine
import schedule
import time

class ElizaDataPipeline:
    """ETL pipeline for ElizaOS data"""
    
    def __init__(self, eliza_db_path: str):
        self.engine = create_engine(f'sqlite:///{eliza_db_path}')
        self.eliza_api = "http://localhost:3000"
    
    def extract_memories(self) -> pd.DataFrame:
        """Extract memories from ElizaOS database"""
        query = """
        SELECT 
            id,
            type,
            content,
            embedding,
            created_at
        FROM memories
        WHERE created_at > datetime('now', '-1 day')
        """
        return pd.read_sql(query, self.engine)
    
    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform and enrich data"""
        # Parse JSON content
        df['content_parsed'] = df['content'].apply(json.loads)
        
        # Extract text
        df['text'] = df['content_parsed'].apply(
            lambda x: x.get('text', '')
        )
        
        # Add sentiment analysis
        df['sentiment'] = df['text'].apply(self.analyze_sentiment)
        
        # Add topic modeling
        df['topics'] = self.extract_topics(df['text'].tolist())
        
        return df
    
    def load_insights(self, df: pd.DataFrame):
        """Load insights back to ElizaOS"""
        insights = self.generate_insights(df)
        
        # Send insights as memories
        for insight in insights:
            requests.post(
                f"{self.eliza_api}/api/memory/add",
                json={
                    "type": "insight",
                    "content": insight
                }
            )
    
    def run_pipeline(self):
        """Run complete ETL pipeline"""
        df = self.extract_memories()
        df_transformed = self.transform_data(df)
        self.load_insights(df_transformed)
        print(f"Processed {len(df)} memories")

# Schedule pipeline
pipeline = ElizaDataPipeline("./eliza.db")
schedule.every(1).hours.do(pipeline.run_pipeline)

while True:
    schedule.run_pending()
    time.sleep(60)

Best Practices for Python Integration

Pro Tip: Use Python for what it does best - data analysis, ML, scientific computing - while letting ElizaOS handle the agent orchestration and conversation flow.

1. Use Appropriate Tools

TaskUse ElizaOSUse Python
Agent orchestration✅ Built for this❌ Complex to implement
Conversation management✅ Native support❌ Requires extra work
Data analysis⚠️ Limited libraries✅ Rich ecosystem
Machine learning⚠️ Basic support✅ Full ML stack
Scientific computing❌ Not designed for✅ NumPy, SciPy, etc.
Image processing⚠️ Basic only✅ OpenCV, PIL, etc.

2. Error Handling

class RobustElizaClient:
    """Client with comprehensive error handling"""
    
    def __init__(self, base_url: str, max_retries: int = 3):
        self.base_url = base_url
        self.max_retries = max_retries
    
    def _make_request(self, method: str, endpoint: str, **kwargs):
        """Make request with retry logic"""
        from time import sleep
        
        for attempt in range(self.max_retries):
            try:
                response = requests.request(
                    method,
                    f"{self.base_url}{endpoint}",
                    **kwargs
                )
                
                if response.status_code == 429:  # Rate limited
                    sleep(2 ** attempt)  # Exponential backoff
                    continue
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.ConnectionError:
                if attempt == self.max_retries - 1:
                    raise
                sleep(1)
                
            except requests.exceptions.HTTPError as e:
                if 500 <= e.response.status_code < 600:
                    # Server error, retry
                    if attempt < self.max_retries - 1:
                        sleep(2 ** attempt)
                        continue
                raise

3. Type Safety with Pydantic

from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime

class ElizaMessage(BaseModel):
    """Type-safe message model"""
    agent_id: str = Field(alias="agentId")
    text: str
    user_id: str = Field(alias="userId")
    room_id: Optional[str] = Field(alias="roomId", default=None)
    metadata: Optional[dict] = None

class ElizaResponse(BaseModel):
    """Type-safe response model"""
    id: str
    text: str
    action: Optional[str] = None
    timestamp: datetime
    confidence: Optional[float] = None

class TypedElizaClient:
    """Type-safe client using Pydantic"""
    
    def send_message(self, message: ElizaMessage) -> ElizaResponse:
        response = requests.post(
            f"{self.base_url}/api/messaging/submit",
            json=message.dict(by_alias=True)
        )
        
        return ElizaResponse(**response.json())

Deployment Considerations

Running Python Services with ElizaOS

docker-compose.yml
version: "3.8"

services:
  eliza:
    image: elizaos/eliza:latest
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/eliza
    depends_on:
      - db

  python-service:
    build: ./python-service
    ports:
      - "5000:5000"
    environment:
      - ELIZA_URL=http://eliza:3000
    depends_on:
      - eliza

  db:
    image: postgres:15
    environment:
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=eliza
python-service/Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]

Troubleshooting Python Integration

Common Issues and Solutions

  1. Connection Refused

    # Check if ElizaOS is running
    try:
        response = requests.get("http://localhost:3000/api/health")
        print(f"ElizaOS status: {response.json()}")
    except requests.exceptions.ConnectionError:
        print("ElizaOS is not running. Start with: bun start")
  2. Authentication Errors

    # Some endpoints require authentication
    headers = {
        "Authorization": f"Bearer {your_api_key}",
        "Content-Type": "application/json"
    }
  3. Encoding Issues

    # Handle Unicode properly
    response = requests.post(
        url,
        json=data,
        headers={"Content-Type": "application/json; charset=utf-8"}
    )

Next Steps

Remember: You don't need to rewrite ElizaOS in Python. Use Python for what it excels at while leveraging ElizaOS's agent capabilities through its APIs.