Python Integration Guide
Complete guide for Python developers to integrate with ElizaOS, including API clients, workarounds, and best practices
ElizaOS is primarily a TypeScript/Node.js framework, and developer feedback has repeatedly cited a "steep learning curve with TypeScript." This guide addresses that challenge with practical ways for Python developers to work with ElizaOS without writing TypeScript.
Good News: While ElizaOS core is TypeScript-based, you can effectively integrate with it using Python through its REST API and WebSocket interfaces.
Overview
ElizaOS provides several integration points for Python developers:
- REST API - Full HTTP API for all agent operations
- WebSocket/Socket.IO - Real-time communication
- API Client Libraries - Python wrappers for ElizaOS
- External Services - Run Python services alongside ElizaOS
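Before choosing an approach, you can confirm that a locally running ElizaOS server is reachable. This quick check uses the same health endpoint referenced in the Troubleshooting section below:
import requests

# Minimal connectivity check against a local ElizaOS server
response = requests.get("http://localhost:3000/api/health")
print(response.status_code, response.json())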
Quick Start with Python
Install Python Dependencies
Create a Python environment and install required packages:
# Create virtual environment
python -m venv eliza-env
source eliza-env/bin/activate # On Windows: eliza-env\Scripts\activate
# Install dependencies
pip install requests websocket-client python-socketio aiohttp
Basic API Client
Create a simple Python client to interact with ElizaOS:
import requests
import json
from typing import Dict, Any, Optional
class ElizaClient:
"""Python client for ElizaOS REST API"""
def __init__(self, base_url: str = "http://localhost:3000"):
self.base_url = base_url
self.session = requests.Session()
self.agent_id = None
def create_agent(self, character: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new agent with character definition"""
response = self.session.post(
f"{self.base_url}/api/agents",
json={"character": character}
)
response.raise_for_status()
result = response.json()
self.agent_id = result["agent"]["id"]
return result
def send_message(self, text: str, user_id: str = "python-user") -> Dict[str, Any]:
"""Send a message to the agent"""
if not self.agent_id:
raise ValueError("No agent created yet")
response = self.session.post(
f"{self.base_url}/api/messaging/submit",
json={
"agentId": self.agent_id,
"text": text,
"userId": user_id,
"roomId": f"room-{user_id}"
}
)
response.raise_for_status()
return response.json()
def get_memories(self, count: int = 10) -> Dict[str, Any]:
"""Retrieve agent memories"""
response = self.session.get(
f"{self.base_url}/api/memory/{self.agent_id}/all",
params={"count": count}
)
response.raise_for_status()
return response.json()
# Example usage
if __name__ == "__main__":
client = ElizaClient()
# Create an agent
character = {
"name": "PythonBot",
"bio": ["A helpful assistant created from Python"],
"settings": {
"model": "gpt-4",
"temperature": 0.7
}
}
agent = client.create_agent(character)
print(f"Created agent: {agent['agent']['id']}")
# Send a message
response = client.send_message("Hello from Python!")
print(f"Response: {response}")
Real-time Communication with Socket.IO
For real-time chat functionality:
import socketio
import asyncio
from typing import Callable, Optional
class ElizaRealtimeClient:
"""Real-time client for ElizaOS using Socket.IO"""
def __init__(self, url: str = "http://localhost:3000"):
self.url = url
self.sio = socketio.AsyncClient()
self.agent_id = None
self.room_id = None
# Register event handlers
self.sio.on("connect", self._on_connect)
self.sio.on("disconnect", self._on_disconnect)
self.sio.on("message", self._on_message)
self.sio.on("error", self._on_error)
async def connect(self, agent_id: str, user_id: str = "python-user"):
"""Connect to ElizaOS WebSocket"""
self.agent_id = agent_id
self.room_id = f"room-{user_id}"
await self.sio.connect(
self.url,
auth={"agentId": agent_id, "userId": user_id}
)
# Join room
await self.sio.emit("join", {
"agentId": agent_id,
"roomId": self.room_id
})
async def send_message(self, text: str):
"""Send message through WebSocket"""
await self.sio.emit("message", {
"agentId": self.agent_id,
"roomId": self.room_id,
"text": text
})
async def _on_connect(self):
print("Connected to ElizaOS")
async def _on_disconnect(self):
print("Disconnected from ElizaOS")
async def _on_message(self, data):
print(f"Received: {data}")
async def _on_error(self, error):
print(f"Error: {error}")
async def disconnect(self):
await self.sio.disconnect()
# Example usage
async def main():
client = ElizaRealtimeClient()
# Connect to existing agent
await client.connect("your-agent-id")
# Send messages
await client.send_message("Hello from Python WebSocket!")
# Keep connection alive
await asyncio.sleep(30)
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
Advanced Integration Patterns
Pattern 1: Python Service Integration
Run Python services alongside ElizaOS for specialized functionality:
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# ElizaOS connection
ELIZA_URL = "http://localhost:3000"
@app.route("/analyze", methods=["POST"])
def analyze_sentiment():
"""Python service for sentiment analysis"""
data = request.json
text = data.get("text", "")
# Perform Python-specific analysis
# (using libraries like TextBlob, NLTK, etc.)
sentiment = analyze_text_sentiment(text)
# Send results back to ElizaOS
response = requests.post(
f"{ELIZA_URL}/api/memory/{data['agentId']}/add",
json={
"type": "sentiment_analysis",
"content": {
"text": text,
"sentiment": sentiment,
"source": "python_analyzer"
}
}
)
return jsonify({
"sentiment": sentiment,
"stored": response.ok
})
def analyze_text_sentiment(text: str) -> dict:
"""Your Python-specific analysis logic"""
    # Example using TextBlob (recent TextBlob releases removed language
    # detection, so only polarity and subjectivity are returned here)
    from textblob import TextBlob
    blob = TextBlob(text)
    return {
        "polarity": blob.sentiment.polarity,
        "subjectivity": blob.sentiment.subjectivity
    }
if __name__ == "__main__":
app.run(port=5000)
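To exercise the service locally once it is running, you can call the /analyze endpoint directly; the agent ID below is a placeholder:
import requests

# Call the sentiment-analysis service directly (agent ID is a placeholder)
resp = requests.post(
    "http://localhost:5000/analyze",
    json={"agentId": "your-agent-id", "text": "I really enjoy using this framework!"},
)
print(resp.json())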
Pattern 2: Custom Action Handler
Create Python-based custom actions:
import asyncio
from typing import Dict, Any
import httpx
class PythonActionHandler:
"""Handle custom actions in Python"""
def __init__(self, eliza_url: str = "http://localhost:3000"):
self.eliza_url = eliza_url
self.actions = {
"ANALYZE_DATA": self.analyze_data,
"GENERATE_REPORT": self.generate_report,
"PROCESS_IMAGE": self.process_image
}
async def listen_for_actions(self, agent_id: str):
"""Poll for actions that need Python processing"""
async with httpx.AsyncClient() as client:
while True:
try:
# Check for pending actions
response = await client.get(
f"{self.eliza_url}/api/agents/{agent_id}/pending-actions"
)
if response.status_code == 200:
actions = response.json().get("actions", [])
for action in actions:
await self.handle_action(action, client)
await asyncio.sleep(1) # Poll interval
except Exception as e:
print(f"Error: {e}")
await asyncio.sleep(5)
async def handle_action(self, action: Dict[str, Any], client: httpx.AsyncClient):
"""Process individual action"""
action_type = action.get("type")
if action_type in self.actions:
result = await self.actions[action_type](action["data"])
# Send result back to ElizaOS
await client.post(
f"{self.eliza_url}/api/actions/{action['id']}/complete",
json={"result": result}
)
async def analyze_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Custom data analysis in Python"""
# Use pandas, numpy, scikit-learn, etc.
import pandas as pd
df = pd.DataFrame(data["records"])
analysis = {
"summary": df.describe().to_dict(),
"correlation": df.corr().to_dict()
}
return analysis
async def generate_report(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate reports using Python libraries"""
# Use matplotlib, seaborn, reportlab, etc.
return {"report_url": "/reports/generated.pdf"}
async def process_image(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process images with Python CV libraries"""
# Use OpenCV, PIL, etc.
return {"processed": True, "features": []}
Working with ElizaOS from Jupyter Notebooks
For data scientists and researchers:
# Cell 1: Setup
import requests
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display, JSON
class ElizaNotebook:
"""Jupyter-friendly ElizaOS client"""
def __init__(self, base_url="http://localhost:3000"):
self.base_url = base_url
self.agent_id = None
self.conversation_history = []
def chat(self, message: str):
"""Interactive chat with display"""
response = requests.post(
f"{self.base_url}/api/messaging/submit",
json={
"agentId": self.agent_id,
"text": message,
"userId": "notebook-user"
}
)
result = response.json()
self.conversation_history.append({
"user": message,
"agent": result.get("text", "")
})
# Display formatted response
display(JSON(result))
return result
# Cell 2: Create client and agent
eliza = ElizaNotebook()
# Create agent with your character
character = {
"name": "DataBot",
"bio": ["I help analyze data and create visualizations"],
"settings": {"model": "gpt-4"}
}
response = requests.post(
f"{eliza.base_url}/api/agents",
json={"character": character}
)
eliza.agent_id = response.json()["agent"]["id"]
# Cell 3: Interactive chat
eliza.chat("Can you help me analyze some sales data?")
# Cell 4: Visualize conversation
df = pd.DataFrame(eliza.conversation_history)
df['user_length'] = df['user'].str.len()
df['agent_length'] = df['agent'].str.len()
plt.figure(figsize=(10, 6))
plt.plot(df.index, df['user_length'], label='User message length')
plt.plot(df.index, df['agent_length'], label='Agent response length')
plt.xlabel('Message number')
plt.ylabel('Character count')
plt.legend()
plt.title('Conversation Analysis')
plt.show()
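If you want to keep the transcript between sessions, one more cell can persist it; the filename is arbitrary:
# Cell 5: Save the conversation history for later analysis
df.to_csv("conversation_history.csv", index=False)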
Common Python Integration Patterns
1. Async Event Processing
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
class AsyncElizaProcessor:
"""Process ElizaOS events asynchronously"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=4)
async def process_messages_batch(self, messages: list):
"""Process multiple messages in parallel"""
async with aiohttp.ClientSession() as session:
tasks = [
self.process_single_message(session, msg)
for msg in messages
]
return await asyncio.gather(*tasks)
async def process_single_message(self, session, message):
"""Process with CPU-intensive Python operations"""
# Run heavy computation in thread pool
        result = await asyncio.get_running_loop().run_in_executor(
self.executor,
self.heavy_computation,
message
)
# Send result to ElizaOS
async with session.post(
"http://localhost:3000/api/messaging/submit",
json={
"agentId": message["agentId"],
"text": f"Processed: {result}",
"metadata": {"processor": "python_async"}
}
) as response:
return await response.json()
def heavy_computation(self, message):
"""CPU-intensive Python operation"""
# Example: NLP, ML inference, data processing
text = message.get("text", "")
# Simulate heavy processing
import time
time.sleep(0.1)
return f"Processed {len(text)} characters"
2. Data Pipeline Integration
import json
import time

import pandas as pd
import requests
import schedule
from sqlalchemy import create_engine
class ElizaDataPipeline:
"""ETL pipeline for ElizaOS data"""
def __init__(self, eliza_db_path: str):
self.engine = create_engine(f'sqlite:///{eliza_db_path}')
self.eliza_api = "http://localhost:3000"
def extract_memories(self) -> pd.DataFrame:
"""Extract memories from ElizaOS database"""
query = """
SELECT
id,
type,
content,
embedding,
created_at
FROM memories
WHERE created_at > datetime('now', '-1 day')
"""
return pd.read_sql(query, self.engine)
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Transform and enrich data"""
# Parse JSON content
df['content_parsed'] = df['content'].apply(json.loads)
# Extract text
df['text'] = df['content_parsed'].apply(
lambda x: x.get('text', '')
)
# Add sentiment analysis
df['sentiment'] = df['text'].apply(self.analyze_sentiment)
# Add topic modeling
df['topics'] = self.extract_topics(df['text'].tolist())
return df
def load_insights(self, df: pd.DataFrame):
"""Load insights back to ElizaOS"""
insights = self.generate_insights(df)
# Send insights as memories
for insight in insights:
requests.post(
f"{self.eliza_api}/api/memory/add",
json={
"type": "insight",
"content": insight
}
)
    def analyze_sentiment(self, text: str) -> float:
        """Placeholder sentiment score; swap in TextBlob, VADER, etc."""
        return 0.0

    def extract_topics(self, texts: list) -> list:
        """Placeholder topic labels; swap in gensim, BERTopic, etc."""
        return [[] for _ in texts]

    def generate_insights(self, df: pd.DataFrame) -> list:
        """Placeholder insight generation over the transformed frame"""
        return [{"summary": f"Processed {len(df)} memories in the last day"}]

    def run_pipeline(self):
"""Run complete ETL pipeline"""
df = self.extract_memories()
df_transformed = self.transform_data(df)
self.load_insights(df_transformed)
print(f"Processed {len(df)} memories")
# Schedule pipeline
pipeline = ElizaDataPipeline("./eliza.db")
schedule.every(1).hours.do(pipeline.run_pipeline)
while True:
schedule.run_pending()
time.sleep(60)
Best Practices for Python Integration
Pro Tip: Use Python for what it does best - data analysis, ML, scientific computing - while letting ElizaOS handle the agent orchestration and conversation flow.
1. Use Appropriate Tools
Task | Use ElizaOS | Use Python |
---|---|---|
Agent orchestration | ✅ Built for this | ❌ Complex to implement |
Conversation management | ✅ Native support | ❌ Requires extra work |
Data analysis | ⚠️ Limited libraries | ✅ Rich ecosystem |
Machine learning | ⚠️ Basic support | ✅ Full ML stack |
Scientific computing | ❌ Not designed for | ✅ NumPy, SciPy, etc. |
Image processing | ⚠️ Basic only | ✅ OpenCV, PIL, etc. |
2. Error Handling
class RobustElizaClient:
"""Client with comprehensive error handling"""
def __init__(self, base_url: str, max_retries: int = 3):
self.base_url = base_url
self.max_retries = max_retries
def _make_request(self, method: str, endpoint: str, **kwargs):
"""Make request with retry logic"""
from time import sleep
for attempt in range(self.max_retries):
try:
response = requests.request(
method,
f"{self.base_url}{endpoint}",
**kwargs
)
if response.status_code == 429: # Rate limited
sleep(2 ** attempt) # Exponential backoff
continue
response.raise_for_status()
return response.json()
except requests.exceptions.ConnectionError:
if attempt == self.max_retries - 1:
raise
sleep(1)
except requests.exceptions.HTTPError as e:
if 500 <= e.response.status_code < 600:
# Server error, retry
if attempt < self.max_retries - 1:
sleep(2 ** attempt)
continue
                raise
        # All retries exhausted without a successful response
        raise RuntimeError(f"Request to {endpoint} failed after {self.max_retries} attempts")
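Usage mirrors the basic client; for example, sending a message through the retrying helper (the agent ID is a placeholder):
# Example: send a message with retries and backoff (agent ID is a placeholder)
client = RobustElizaClient("http://localhost:3000")
reply = client._make_request(
    "POST",
    "/api/messaging/submit",
    json={"agentId": "your-agent-id", "text": "Hello with retries!", "userId": "python-user"},
)
print(reply)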
3. Type Safety with Pydantic
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
class ElizaMessage(BaseModel):
"""Type-safe message model"""
agent_id: str = Field(alias="agentId")
text: str
user_id: str = Field(alias="userId")
room_id: Optional[str] = Field(alias="roomId", default=None)
metadata: Optional[dict] = None
class ElizaResponse(BaseModel):
"""Type-safe response model"""
id: str
text: str
action: Optional[str] = None
timestamp: datetime
confidence: Optional[float] = None
class TypedElizaClient:
    """Type-safe client using Pydantic"""
    def __init__(self, base_url: str = "http://localhost:3000"):
        self.base_url = base_url

    def send_message(self, message: ElizaMessage) -> ElizaResponse:
response = requests.post(
f"{self.base_url}/api/messaging/submit",
            json=message.dict(by_alias=True)  # on Pydantic v2, use model_dump(by_alias=True)
)
return ElizaResponse(**response.json())
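A usage sketch, constructing the message with its API aliases (the agent ID is a placeholder):
# Example usage of the typed client (agent ID is a placeholder)
client = TypedElizaClient("http://localhost:3000")
message = ElizaMessage(agentId="your-agent-id", text="Hello!", userId="python-user")
reply = client.send_message(message)
print(reply.text)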
Deployment Considerations
Running Python Services with ElizaOS
A docker-compose.yml that runs ElizaOS, a companion Python service, and Postgres together:
version: "3.8"
services:
eliza:
image: elizaos/eliza:latest
ports:
- "3000:3000"
environment:
- DATABASE_URL=postgresql://postgres:password@db:5432/eliza
depends_on:
- db
python-service:
build: ./python-service
ports:
- "5000:5000"
environment:
- ELIZA_URL=http://eliza:3000
depends_on:
- eliza
db:
image: postgres:15
environment:
- POSTGRES_PASSWORD=password
- POSTGRES_DB=eliza
The Python service's Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
Troubleshooting Python Integration
Common Issues and Solutions
- Connection Refused

  # Check if ElizaOS is running
  try:
      response = requests.get("http://localhost:3000/api/health")
      print(f"ElizaOS status: {response.json()}")
  except requests.exceptions.ConnectionError:
      print("ElizaOS is not running. Start with: bun start")

- Authentication Errors

  # Some endpoints require authentication
  headers = {
      "Authorization": f"Bearer {your_api_key}",
      "Content-Type": "application/json"
  }

- Encoding Issues

  # Handle Unicode properly
  response = requests.post(
      url,
      json=data,
      headers={"Content-Type": "application/json; charset=utf-8"}
  )
Next Steps
- REST API Reference - Complete API documentation
- WebSocket Events - Real-time communication guide
- Deployment Guide - Deploy Python + ElizaOS
- Example Projects - Python integration examples
Remember: You don't need to rewrite ElizaOS in Python. Use Python for what it excels at while leveraging ElizaOS's agent capabilities through its APIs.