Tasks
Understanding task management in ElizaOS, including Task and TaskWorker interfaces, scheduling patterns, and the TaskService implementation from plugin-bootstrap
Tasks enable agents to perform scheduled or queued operations independently of message processing. They support background processing, recurring activities, and complex workflows that extend beyond simple message-response patterns.
Task Interface
From `@elizaos/core/src/types/task.ts`:
/**
* Represents a task to be performed, often in the background or at a later time.
* Tasks are managed by the AgentRuntime and processed by registered TaskWorkers.
*/
export interface Task {
/** Optional. A Universally Unique Identifier for the task. Generated if not provided. */
id?: UUID;
/** The name of the task, which should correspond to a registered TaskWorker.name. */
name: string;
/** Optional. Timestamp of the last update to this task. */
updatedAt?: number;
/** Optional. Metadata associated with the task, conforming to TaskMetadata. */
metadata?: TaskMetadata;
/** A human-readable description of what the task does or its purpose. */
description: string;
/** Optional. The UUID of the room this task is associated with. */
roomId?: UUID;
/** Optional. The UUID of the world this task is associated with. */
worldId?: UUID;
/** Optional. The UUID of the entity this task is associated with (for example, the entity whose message created it). */
entityId?: UUID;
/** Required. Labels used to categorize tasks and to filter them when querying by tag. */
tags: string[];
}
/**
* Defines metadata associated with a Task.
* This can include scheduling information like updateInterval or UI-related details.
* Apart from the two declared properties, any other key is allowed; those extra
* values are typed `unknown` and must be narrowed before use.
*/
export type TaskMetadata = {
/** Optional. If the task is recurring, this specifies the interval in milliseconds between updates or executions. */
updateInterval?: number;
/** Optional. Describes options or parameters that can be configured for this task, often for UI presentation. */
options?: {
name: string;
description: string;
}[];
/** Allows for other dynamic metadata properties related to the task. */
[key: string]: unknown;
};
Task Workers
From `@elizaos/core/src/types/task.ts`:
/**
* Defines the contract for a Task Worker, which is responsible for executing a specific type of task.
* Task workers are registered with the AgentRuntime and are invoked when a Task of their designated name needs processing.
*/
export interface TaskWorker {
/** The unique name of the task type this worker handles. This name links Task instances to this worker. */
name: string;
/**
* The core execution logic for the task. This function is called by the runtime when a task needs to be processed.
* It receives the AgentRuntime, task-specific options, and the Task object itself.
* Option values are typed `unknown`, so implementations must narrow them before use.
* The returned promise carries no value; it only signals that processing has finished.
*/
execute: (
runtime: IAgentRuntime,
options: { [key: string]: unknown },
task: Task
) => Promise<void>;
/**
* Optional validation function that can be used to determine if a task is valid or should be executed,
* often based on the current message and state. This might be used by an action or evaluator
* before creating or queueing a task.
* Resolves to a boolean.
*/
validate?: (runtime: IAgentRuntime, message: Memory, state: State) => Promise<boolean>;
}
Creating Task Workers
Basic Task Worker Structure
import type { IAgentRuntime, Task, TaskWorker } from "@elizaos/core";
/**
 * Task worker that posts a one-off reminder into the task's room once the
 * scheduled time has passed.
 *
 * Expected options (taken from the task metadata):
 * - `message`: the reminder text
 * - `timestamp`: epoch milliseconds at which the reminder becomes due
 */
export const reminderTaskWorker: TaskWorker = {
  name: "SEND_REMINDER",

  execute: async (runtime: IAgentRuntime, options: { [key: string]: unknown }, task: Task) => {
    // Option values are `unknown`; narrow them instead of asserting their types.
    const { message, timestamp } = options;
    if (typeof message !== "string" || typeof timestamp !== "number") {
      runtime.logger.error("Reminder task has invalid options", { taskId: task.id });
      return;
    }
    if (!task.roomId) {
      runtime.logger.error("Reminder task has no roomId", { taskId: task.id });
      return;
    }

    if (Date.now() < timestamp) {
      // Not due yet. The TaskService deletes a non-repeating task as soon as
      // execute() returns, so simply returning would silently drop the
      // reminder. Queue an identical copy (with a fresh id) so it is checked
      // again on a later tick. Repeating tasks are kept by the service and
      // therefore need no copy.
      if (!task.tags?.includes("repeat")) {
        const { id: _expiredId, ...pendingReminder } = task;
        await runtime.createTask(pendingReminder);
      }
      return;
    }

    // Send the reminder
    await runtime.messageManager.sendMessage({
      roomId: task.roomId,
      content: {
        text: `⏰ Reminder: ${message}`,
        metadata: {
          taskId: task.id,
          type: "reminder",
        },
      },
    });
    // Non-repeating tasks are deleted by the TaskService after execution,
    // so there is no need to mark the task as completed here.
  },

  // Reminders are enabled unless the character settings explicitly disable them.
  // The message and state arguments are not needed, so they are omitted.
  validate: async (runtime) => runtime.character.settings?.enableReminders !== false,
};
Recurring Task Worker
/**
 * Recurring task worker that summarizes the messages of the task's room since
 * the previous run (or the last hour on the first run), stores the summary as
 * a memory in the "analytics" table, and records run bookkeeping on the task.
 */
export const analyticsTaskWorker: TaskWorker = {
  name: "COLLECT_ANALYTICS",

  execute: async (runtime, _options, task) => {
    // Read the clock once so every timestamp written below agrees.
    const now = Date.now();

    // Collect analytics data
    const messages = await runtime.getMemories({
      tableName: "messages",
      roomId: task.roomId,
      startTime: task.updatedAt || now - 3600000, // Last hour
    });
    const analytics = {
      messageCount: messages.length,
      uniqueUsers: new Set(messages.map((m) => m.entityId)).size,
      averageResponseTime: calculateAvgResponseTime(messages),
      sentiment: analyzeSentiment(messages),
    };

    // Store analytics
    await runtime.createMemory(
      {
        entityId: runtime.agentId,
        content: {
          text: "Hourly analytics collected",
          metadata: analytics,
        },
        roomId: task.roomId,
      },
      "analytics"
    );

    // Without an id there is no stored task to update.
    if (!task.id) {
      return;
    }

    // Metadata values are `unknown`; narrow before doing arithmetic.
    const previousRuns =
      typeof task.metadata?.runCount === "number" ? task.metadata.runCount : 0;

    // Update task for next run. updateTask takes the task id plus a patch,
    // matching how the TaskService calls it. `task.metadata` is the snapshot
    // taken before the TaskService stamped `metadata.updatedAt`, so that field
    // is written again here to avoid reverting the scheduler's timestamp.
    await runtime.updateTask(task.id, {
      updatedAt: now,
      metadata: {
        ...task.metadata,
        updatedAt: now,
        lastRun: now,
        runCount: previousRuns + 1,
      },
    });
  },

  // Only enable when the character settings opt in explicitly.
  validate: async (runtime) => runtime.character.settings?.enableAnalytics === true,
};
Complex Workflow Task
/**
 * Multi-step workflow worker: researches a topic, generates content, refines
 * it, then either posts it immediately or queues a SCHEDULED_POST task.
 *
 * Expected options: `topic`, `format`, and `schedule` ("immediate" or a
 * scheduled time that is forwarded to the follow-up task).
 * Failures are logged and counted in `metadata.retryCount`; after three
 * recorded retries the task is marked as failed.
 */
export const contentGenerationTaskWorker: TaskWorker = {
  name: "GENERATE_CONTENT",

  execute: async (runtime, options, task) => {
    const { topic, format, schedule } = options;

    // Posting immediately needs a target room; bail out before doing any work.
    if (schedule === "immediate" && !task.roomId) {
      runtime.logger.error("Content generation task has no roomId", { taskId: task.id });
      return;
    }

    try {
      // Step 1: Research topic
      const research = await runtime.executeAction("RESEARCH_TOPIC", {
        topic,
        depth: "comprehensive",
      });

      // Step 2: Generate content
      const content = await runtime.useModel(ModelType.CREATIVE, {
        prompt: `Generate ${format} content about ${topic} based on: ${research.summary}`,
        temperature: 0.8,
      });

      // Step 3: Review and refine
      const refined = await runtime.executeAction("REFINE_CONTENT", {
        content,
        style: runtime.character.style,
      });

      // Step 4: Publish or schedule
      if (schedule === "immediate") {
        await runtime.sendMessage({
          roomId: task.roomId,
          content: {
            text: refined.content,
            metadata: {
              type: "generated_content",
              topic,
              format,
            },
          },
        });
      } else {
        // Create scheduled post task. The 'queue' tag is what makes the
        // TaskService pick the follow-up task up.
        await runtime.createTask({
          name: "SCHEDULED_POST",
          description: `Post content about ${topic}`,
          metadata: {
            content: refined.content,
            scheduledTime: schedule,
          },
          roomId: task.roomId,
          tags: ["queue", "content", "scheduled"],
        });
      }

      // Complete task
      if (task.id) {
        await runtime.completeTask(task.id);
      }
    } catch (error) {
      // Catch variables are `unknown`; extract a message safely.
      const reason = error instanceof Error ? error.message : String(error);
      runtime.logger.error("Content generation failed", {
        taskId: task.id,
        error: reason,
      });

      // Without an id there is no stored task to update or fail.
      if (!task.id) {
        return;
      }

      // Retry logic
      const retryCount =
        typeof task.metadata?.retryCount === "number" ? task.metadata.retryCount : 0;
      if (retryCount < 3) {
        // updateTask takes the task id plus a patch.
        await runtime.updateTask(task.id, {
          metadata: {
            ...task.metadata,
            retryCount: retryCount + 1,
            lastError: reason,
          },
        });
      } else {
        // Max retries reached
        await runtime.failTask(task.id, reason);
      }
    }
  },
};
Creating Tasks
Creating Tasks from Actions
import type { Action } from "@elizaos/core";
/**
 * Action that parses a reminder request from the incoming message, queues a
 * SEND_REMINDER task for it, and confirms the schedule to the user.
 * Returns the created task.
 */
export const scheduleReminderAction: Action = {
  name: "SCHEDULE_REMINDER",

  handler: async (runtime, message, state, options, callback) => {
    const { reminderText, delay } = extractReminderDetails(message.content.text);

    // Register the task worker only if it is not already registered, so that
    // repeated invocations of this action do not re-register it every time.
    if (!runtime.getTaskWorker(reminderTaskWorker.name)) {
      runtime.registerTaskWorker(reminderTaskWorker);
    }

    // Create task with proper tags for the TaskService
    const task = await runtime.createTask({
      name: "SEND_REMINDER",
      description: `Reminder: ${reminderText}`,
      metadata: {
        message: reminderText,
        timestamp: Date.now() + delay,
        createdBy: message.entityId,
      },
      roomId: message.roomId,
      entityId: message.entityId,
      tags: ["queue", "reminder"], // 'queue' tag required for TaskService to pick it up
    });

    const response = {
      text: `I'll remind you about "${reminderText}" in ${formatDelay(delay)}.`,
      actions: ["SCHEDULE_REMINDER"],
      metadata: { taskId: task.id },
    };
    if (callback) {
      await callback(response);
    }
    return task;
  },
};
Creating Tasks from Evaluators
/**
 * Evaluator that queues a MEMORY_CLEANUP task whenever reported memory usage
 * exceeds 80%.
 */
export const maintenanceEvaluator: Evaluator = {
  name: "SYSTEM_MAINTENANCE",

  handler: async (runtime, message, state) => {
    const memoryUsage = await runtime.getMemoryUsage();
    if (memoryUsage.percentage <= 80) {
      return;
    }

    // Create cleanup task. The 'queue' tag is required for the TaskService to
    // pick the task up; without it the cleanup would never be executed.
    await runtime.createTask({
      name: "MEMORY_CLEANUP",
      description: "Clean up old memories and optimize storage",
      metadata: {
        triggerPercentage: memoryUsage.percentage,
        targetPercentage: 60,
      },
      tags: ["queue", "maintenance", "system"],
    });
  },
};
Task Management
Querying Tasks
// Every task attached to the current room
const roomFilter = { roomId: message.roomId };
const roomTasks = await runtime.getTasks(roomFilter);

// One entity's tasks of a single type (matched by task name)
const reminderFilter = { name: "SEND_REMINDER", entityId: userId };
const reminders = await runtime.getTasks(reminderFilter);

// Pending tasks carrying the "scheduled" tag
const scheduledFilter = { tags: ["scheduled"], status: "pending" };
const scheduledTasks = await runtime.getTasks(scheduledFilter);
Updating Tasks
// updateTask takes the id of the stored task plus a patch with the fields to
// change (the same call shape the TaskService uses), so an id is required.
if (!task.id) {
  throw new Error("Cannot update a task that has no id");
}

// Update task metadata
await runtime.updateTask(task.id, {
  metadata: {
    ...task.metadata,
    progress: 50,
    lastUpdate: Date.now(),
  },
});

// Reschedule task
await runtime.updateTask(task.id, {
  metadata: {
    ...task.metadata,
    updateInterval: 3600000, // Change to hourly
  },
});
Task Lifecycle
// Create task
const task = await runtime.createTask({
  name: "PROCESS_DATA",
  description: "Process user data batch",
  // 'queue' is required for the TaskService to pick the task up
  tags: ["queue", "data-processing"],
});
// Task is picked up by worker
// Worker executes task logic
// The calls below are alternatives; a task ends in exactly one of these ways.
// Complete task
await runtime.completeTask(task.id);
// Or fail task
await runtime.failTask(task.id, "Processing error");
// Or cancel task
await runtime.cancelTask(task.id);
Recurring Tasks
Tasks that set `updateInterval` in their metadata and carry both the `queue` and `repeat` tags run periodically:
// Queue a report task that repeats once a day
const dailyReportMetadata = {
  updateInterval: 86400000, // 24 hours, in milliseconds
  reportType: "summary",
  recipients: ["admin@example.com"],
  updatedAt: Date.now(), // Initial timestamp
};
await runtime.createTask({
  name: "DAILY_REPORT",
  description: "Generate daily activity report",
  metadata: dailyReportMetadata,
  // Both 'queue' and 'repeat' tags are required for a recurring task
  tags: ["queue", "repeat", "reporting"],
});
/**
 * Worker for DAILY_REPORT tasks: builds the report for the task's room and
 * sends it to the recipients listed in the task metadata.
 * The worker does no rescheduling itself; recurrence is driven by the task's
 * updateInterval.
 */
export const dailyReportWorker: TaskWorker = {
  name: "DAILY_REPORT",
  execute: async (runtime, options, task) => {
    const recipients = task.metadata?.recipients;
    const dailyReport = await generateDailyReport(runtime, task.roomId);
    await sendReport(dailyReport, recipients);
  },
};
Task Coordination
Tasks can create and manage other tasks:
/**
 * Orchestrating worker: fans a CONTENT_PIPELINE task out into one
 * RESEARCH_TOPIC subtask per topic and records the subtask ids on the parent.
 *
 * Expected options: `topics`, an array of topics to research.
 */
export const orchestratorTaskWorker: TaskWorker = {
  name: "CONTENT_PIPELINE",

  execute: async (runtime, options, task) => {
    // Option values are `unknown`; make sure `topics` is a usable array.
    const { topics } = options;
    if (!Array.isArray(topics) || topics.length === 0) {
      runtime.logger.error("Content pipeline task has no topics", { taskId: task.id });
      return;
    }

    // Create subtasks for each topic. The 'queue' tag is required for the
    // TaskService to pick the subtasks up.
    const subtasks = await Promise.all(
      topics.map((topic) =>
        runtime.createTask({
          name: "RESEARCH_TOPIC",
          description: `Research ${topic}`,
          metadata: {
            topic,
            parentTaskId: task.id,
            stage: "research",
          },
          roomId: task.roomId,
          tags: ["queue", "subtask", "research"],
        })
      )
    );

    // Without an id there is no stored parent task to update.
    if (!task.id) {
      return;
    }

    // Update parent task with subtask IDs (updateTask takes the id plus a patch)
    await runtime.updateTask(task.id, {
      metadata: {
        ...task.metadata,
        subtaskIds: subtasks.map((t) => t.id),
        status: "awaiting_subtasks",
      },
    });
    // Parent task will check subtask completion
  },
};
Error Handling
Robust error handling for tasks:
/**
 * Runs the task operation and records the outcome: completes the task on
 * success; on failure either records another retry (with an exponential
 * backoff hint in `metadata.nextRetry`) or fails the task and notifies the room.
 */
execute: async (runtime, options, task) => {
  // Metadata values are `unknown`; narrow them. Checking the type (rather than
  // using `||`) also lets an explicit `maxRetries: 0` disable retries.
  const maxRetries =
    typeof task.metadata?.maxRetries === "number" ? task.metadata.maxRetries : 3;
  const retryCount =
    typeof task.metadata?.retryCount === "number" ? task.metadata.retryCount : 0;

  // Every status update below needs the id of the stored task.
  const taskId = task.id;
  if (!taskId) {
    runtime.logger.error("Task has no id", { taskName: task.name });
    return;
  }

  try {
    // Execute task logic
    await performTaskOperation(options);
    // Success - complete task
    await runtime.completeTask(taskId);
  } catch (error) {
    // Catch variables are `unknown`; extract a message safely.
    const reason = error instanceof Error ? error.message : String(error);
    runtime.logger.error("Task execution failed", {
      taskId,
      taskName: task.name,
      error: reason,
      retryCount,
    });

    if (retryCount < maxRetries) {
      // Retry with exponential backoff: 1s, 2s, 4s, ...
      const backoffDelay = Math.pow(2, retryCount) * 1000;
      // updateTask takes the task id plus a patch.
      await runtime.updateTask(taskId, {
        metadata: {
          ...task.metadata,
          retryCount: retryCount + 1,
          nextRetry: Date.now() + backoffDelay,
          lastError: reason,
        },
      });
    } else {
      // Max retries exceeded
      await runtime.failTask(taskId, `Failed after ${maxRetries} attempts: ${reason}`);
      // Optionally notify, when there is a room to notify
      if (task.roomId) {
        await runtime.sendMessage({
          roomId: task.roomId,
          content: {
            text: `Task "${task.description}" failed after multiple attempts.`,
            metadata: { taskId, error: reason },
          },
        });
      }
    }
  }
};
Task Service
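The bootstrap plugin provides a comprehensive task service in `@elizaos/plugin-bootstrap/src/services/task.ts`. The excerpt below is abridged: helpers it calls, such as `startTimer` and `validateTasks`, are not shown.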
import { type IAgentRuntime, Service, ServiceType } from "@elizaos/core";
/**
* TaskService class representing a service that schedules and executes tasks.
* - Checks for tasks every second (configurable via TICK_INTERVAL)
* - Manages recurring tasks with updateInterval
* - Handles one-time tasks that are deleted after execution
* - Supports task validation before execution
*
* NOTE(review): startTimer() and validateTasks() are called below but are not
* defined in this excerpt; presumably they live in the full source file.
*/
export class TaskService extends Service {
// Handle of the polling timer; null while no timer is stored.
private timer: NodeJS.Timeout | null = null;
private readonly TICK_INTERVAL = 1000; // Check every second
static serviceType = ServiceType.TASK;
capabilityDescription = "The agent is able to schedule and execute tasks";
/**
* Creates a TaskService for the given runtime, starts its timer and returns it.
*/
static async start(runtime: IAgentRuntime): Promise<Service> {
const service = new TaskService(runtime);
await service.startTimer();
return service;
}
/**
* Runs one scheduling pass: loads the tasks tagged "queue", filters them
* through validateTasks, then executes every task that is due.
* Any error thrown during the pass is logged and not rethrown.
*/
private async checkTasks() {
try {
// Get all tasks with "queue" tag
const allTasks = await this.runtime.getTasks({
tags: ["queue"],
});
// Validate the tasks
const tasks = await this.validateTasks(allTasks);
const now = Date.now();
// Tasks are executed one after another; each execution is awaited.
for (const task of tasks) {
// Non-repeating tasks execute immediately
if (!task.tags?.includes("repeat")) {
await this.executeTask(task);
continue;
}
// Check if recurring task should run
// The last-run time comes from task.updatedAt, then metadata.updatedAt, else 0.
const taskStartTime = task.updatedAt || task.metadata?.updatedAt || 0;
// A repeating task without updateInterval gets an interval of 0, so it is due on every pass.
const updateIntervalMs = task.metadata?.updateInterval ?? 0;
if (now - taskStartTime >= updateIntervalMs) {
await this.executeTask(task);
}
}
} catch (error) {
logger.error("[Bootstrap] Error checking tasks:", error);
}
}
/**
* Executes a single task with the worker registered under task.name.
* If no worker is registered the task is skipped and left in place.
* Errors thrown while executing are logged and not rethrown; in that case
* the deletion of a non-repeating task is skipped as well.
*/
private async executeTask(task: Task) {
try {
const worker = this.runtime.getTaskWorker(task.name);
if (!worker) {
logger.debug(`[Bootstrap] No worker found for task type: ${task.name}`);
return;
}
// Handle repeating vs non-repeating tasks
if (task.tags?.includes("repeat")) {
// Update timestamp for next run
// This is written before the worker runs, so a failing run still advances the timestamp.
await this.runtime.updateTask(task.id, {
metadata: {
...task.metadata,
updatedAt: Date.now(),
},
});
}
logger.debug(`[Bootstrap] Executing task ${task.name} (${task.id})`);
// The task metadata doubles as the worker's options argument.
await worker.execute(this.runtime, task.metadata || {}, task);
// Delete non-repeating tasks after execution
if (!task.tags?.includes("repeat")) {
await this.runtime.deleteTask(task.id);
}
} catch (error) {
logger.error(`[Bootstrap] Error executing task ${task.id}:`, error);
}
}
}
Best Practices
1. Task Design
- Keep tasks focused and single-purpose
- Use descriptive names and descriptions
- Include relevant metadata for debugging
- Tag tasks appropriately for organization
2. Idempotency
/**
 * Idempotent execution: a cache flag keyed by the task id records that the
 * task has been processed, so a second run does nothing but log.
 */
execute: async (runtime, options, task) => {
  const processedKey = `task-${task.id}-processed`;

  const alreadyProcessed = await runtime.getCache(processedKey);
  if (!alreadyProcessed) {
    // First run: do the work, then remember that it was done.
    await processTask(options);
    await runtime.setCache(processedKey, true);
    return;
  }

  runtime.logger.info("Task already processed", { taskId: task.id });
};
3. Resource Management
/**
 * Runs the operation while holding a lock named after the task's room, and
 * releases the lock whether the operation succeeds or throws.
 */
execute: async (runtime, options, task) => {
  const lockName = `resource-${task.roomId}`;
  const roomLock = await runtime.acquireLock(lockName);
  try {
    await performOperation();
  } finally {
    // Runs on success and on failure alike
    await roomLock.release();
  }
};
4. Monitoring
// Track task metrics. The outcome fields are declared up front as optional so
// they can be assigned later without type errors.
const taskMetrics: {
  startTime: number;
  taskId: string | undefined;
  taskName: string;
  duration?: number;
  status?: "success" | "failed";
  error?: string;
} = {
  startTime: Date.now(),
  taskId: task.id,
  taskName: task.name,
};
try {
  await executeTask();
  taskMetrics.status = "success";
} catch (error) {
  taskMetrics.status = "failed";
  // Catch variables are `unknown`; extract a message safely.
  taskMetrics.error = error instanceof Error ? error.message : String(error);
} finally {
  // The duration is recorded for both outcomes, just before the metrics are logged.
  taskMetrics.duration = Date.now() - taskMetrics.startTime;
  await runtime.logMetrics("task_execution", taskMetrics);
}
5. Testing Tasks
import { testTaskWorker } from "@elizaos/core/testing";
describe("ReminderTaskWorker", () => {
  it("should send reminder at correct time", async () => {
    const task = {
      id: "test-task-1",
      name: "SEND_REMINDER",
      description: "Test reminder",
      metadata: {
        message: "Test message",
        // The worker only sends once the timestamp has passed, so use a time
        // in the past; a future timestamp would make it return without sending.
        timestamp: Date.now() - 1000,
      },
      roomId: "test-room",
      tags: ["test"],
    };

    const result = await testTaskWorker(reminderTaskWorker, {
      task,
      options: task.metadata,
      expectedMessages: 1,
      expectedCompletion: true,
    });

    expect(result.completed).toBe(true);
    expect(result.messages[0].content.text).toContain("Test message");
  });
});
Common Task Patterns
Scheduled Messages
// `description` and `tags` are required Task fields, and the 'queue' tag is
// what makes the TaskService pick the task up.
// NOTE(review): the TaskService only repeats tasks tagged 'repeat' that set
// metadata.updateInterval; here recurrence and timing depend on how the
// SCHEDULED_MESSAGE worker interprets `scheduleTime` and `recurring` - confirm.
await runtime.createTask({
  name: "SCHEDULED_MESSAGE",
  description: "Send a scheduled good-morning message",
  metadata: {
    message: "Good morning! Ready to start the day?",
    scheduleTime: getTomorrowMorning(),
    recurring: true,
    timezone: "America/New_York",
  },
  tags: ["queue", "scheduled-message"],
});
Data Synchronization
// `description` and `tags` are required Task fields. A recurring task needs
// both the 'queue' and 'repeat' tags for its updateInterval to take effect.
await runtime.createTask({
  name: "SYNC_DATA",
  description: "Synchronize data from the external API",
  metadata: {
    updateInterval: 300000, // 5 minutes
    source: "external_api",
    lastSync: Date.now(),
  },
  tags: ["queue", "repeat", "sync"],
});
Batch Processing
// `description` and `tags` are required Task fields, and the 'queue' tag is
// what makes the TaskService pick the task up.
await runtime.createTask({
  name: "BATCH_PROCESS",
  description: "Import contacts in batches",
  metadata: {
    batchSize: 100,
    totalItems: 1000,
    currentOffset: 0,
    operation: "import_contacts",
  },
  tags: ["queue", "batch"],
});
Next Steps
- Review Agents to understand the runtime context
- Explore Actions that create tasks
- Learn about Evaluators that monitor task performance