elizaOS

Tasks

Understanding task management in ElizaOS, including Task and TaskWorker interfaces, scheduling patterns, and the TaskService implementation from plugin-bootstrap

Tasks enable agents to perform scheduled or queued operations independently of message processing. They support background processing, recurring activities, and complex workflows that extend beyond simple message-response patterns.

Task Interface

From @elizaos/core/src/types/task.ts:

/**
 * Represents a task to be performed, often in the background or at a later time.
 * Tasks are managed by the AgentRuntime and processed by registered TaskWorkers.
 */
export interface Task {
  /** Optional. A Universally Unique Identifier for the task. Generated if not provided. */
  id?: UUID;

  /** The name of the task, which should correspond to a registered TaskWorker.name. */
  name: string;

  /** Optional. Timestamp of the last update to this task. */
  updatedAt?: number;

  /** Optional. Metadata associated with the task, conforming to TaskMetadata. */
  metadata?: TaskMetadata;

  /** A human-readable description of what the task does or its purpose. */
  description: string;

  /** Optional. The UUID of the room this task is associated with. */
  roomId?: UUID;

  /** Optional. The UUID of the world this task is associated with. */
  worldId?: UUID;

  entityId?: UUID;

  tags: string[];
}

/**
 * Defines metadata associated with a Task.
 * This can include scheduling information like updateInterval or UI-related details.
 */
export type TaskMetadata = {
  /** Optional. If the task is recurring, this specifies the interval in milliseconds between updates or executions. */
  updateInterval?: number;

  /** Optional. Describes options or parameters that can be configured for this task, often for UI presentation. */
  options?: {
    name: string;
    description: string;
  }[];

  /** Allows for other dynamic metadata properties related to the task. */
  [key: string]: unknown;
};

Task Workers

From @elizaos/core/src/types/task.ts:

/**
 * Defines the contract for a Task Worker, which is responsible for executing a specific type of task.
 * Task workers are registered with the AgentRuntime and are invoked when a Task of their designated name needs processing.
 */
export interface TaskWorker {
  /** The unique name of the task type this worker handles. This name links Task instances to this worker. */
  name: string;

  /**
   * The core execution logic for the task. This function is called by the runtime when a task needs to be processed.
   * It receives the AgentRuntime, task-specific options, and the Task object itself.
   */
  execute: (
    runtime: IAgentRuntime,
    options: { [key: string]: unknown },
    task: Task
  ) => Promise<void>;

  /**
   * Optional validation function that can be used to determine if a task is valid or should be executed,
   * often based on the current message and state. This might be used by an action or evaluator
   * before creating or queueing a task.
   */
  validate?: (runtime: IAgentRuntime, message: Memory, state: State) => Promise<boolean>;
}

Creating Task Workers

Basic Task Worker Structure

import type { IAgentRuntime, Task, TaskWorker } from "@elizaos/core";

export const reminderTaskWorker: TaskWorker = {
  name: "SEND_REMINDER",

  execute: async (runtime: IAgentRuntime, options: { [key: string]: unknown }, task: Task) => {
    const { message, timestamp } = options as { message: string; timestamp: number };

    // Check if it's time to send the reminder
    if (Date.now() < timestamp) {
      // Task will be checked again on next tick
      return;
    }

    // Send the reminder
    await runtime.messageManager.sendMessage({
      roomId: task.roomId!,
      content: {
        text: `⏰ Reminder: ${message}`,
        metadata: {
          taskId: task.id,
          type: "reminder",
        },
      },
    });

    // Non-repeating tasks are automatically deleted after execution
    // No need to manually mark as completed
  },

  validate: async (runtime: IAgentRuntime, message: Memory, state: State) => {
    // Optional: Validate if reminders are enabled
    return runtime.character.settings?.enableReminders !== false;
  },
};

Recurring Task Worker

export const analyticsTaskWorker: TaskWorker = {
  name: "COLLECT_ANALYTICS",

  execute: async (runtime: IAgentRuntime, options: any, task: Task) => {
    // Collect analytics data
    const messages = await runtime.getMemories({
      tableName: "messages",
      roomId: task.roomId,
      startTime: task.updatedAt || Date.now() - 3600000, // Last hour
    });

    const analytics = {
      messageCount: messages.length,
      uniqueUsers: new Set(messages.map((m) => m.entityId)).size,
      averageResponseTime: calculateAvgResponseTime(messages),
      sentiment: analyzeSentiment(messages),
    };

    // Store analytics
    await runtime.createMemory(
      {
        entityId: runtime.agentId,
        content: {
          text: "Hourly analytics collected",
          metadata: analytics,
        },
        roomId: task.roomId,
      },
      "analytics"
    );

    // Update task for next run
    await runtime.updateTask({
      ...task,
      updatedAt: Date.now(),
      metadata: {
        ...task.metadata,
        lastRun: Date.now(),
        runCount: (task.metadata?.runCount || 0) + 1,
      },
    });
  },

  validate: async (runtime: IAgentRuntime, message: Memory) => {
    // Only enable for specific rooms or conditions
    const settings = runtime.character.settings;
    return settings?.enableAnalytics === true;
  },
};

Complex Workflow Task

export const contentGenerationTaskWorker: TaskWorker = {
  name: "GENERATE_CONTENT",

  execute: async (runtime: IAgentRuntime, options: any, task: Task) => {
    const { topic, format, schedule } = options;

    try {
      // Step 1: Research topic
      const research = await runtime.executeAction("RESEARCH_TOPIC", {
        topic,
        depth: "comprehensive",
      });

      // Step 2: Generate content
      const content = await runtime.useModel(ModelType.CREATIVE, {
        prompt: `Generate ${format} content about ${topic} based on: ${research.summary}`,
        temperature: 0.8,
      });

      // Step 3: Review and refine
      const refined = await runtime.executeAction("REFINE_CONTENT", {
        content,
        style: runtime.character.style,
      });

      // Step 4: Publish or schedule
      if (schedule === "immediate") {
        await runtime.sendMessage({
          roomId: task.roomId!,
          content: {
            text: refined.content,
            metadata: {
              type: "generated_content",
              topic,
              format,
            },
          },
        });
      } else {
        // Create scheduled post task
        await runtime.createTask({
          name: "SCHEDULED_POST",
          description: `Post content about ${topic}`,
          metadata: {
            content: refined.content,
            scheduledTime: schedule,
          },
          roomId: task.roomId,
          tags: ["content", "scheduled"],
        });
      }

      // Complete task
      await runtime.completeTask(task.id!);
    } catch (error) {
      runtime.logger.error("Content generation failed", {
        taskId: task.id,
        error: error.message,
      });

      // Retry logic
      const retryCount = task.metadata?.retryCount || 0;
      if (retryCount < 3) {
        await runtime.updateTask({
          ...task,
          metadata: {
            ...task.metadata,
            retryCount: retryCount + 1,
            lastError: error.message,
          },
        });
      } else {
        // Max retries reached
        await runtime.failTask(task.id!, error.message);
      }
    }
  },
};

Creating Tasks

Creating Tasks from Actions

import type { Action } from "@elizaos/core";

export const scheduleReminderAction: Action = {
  name: "SCHEDULE_REMINDER",

  handler: async (runtime, message, state, options, callback) => {
    const { reminderText, delay } = extractReminderDetails(message.content.text);

    // Register the task worker if not already registered
    runtime.registerTaskWorker(reminderTaskWorker);

    // Create task with proper tags for the TaskService
    const task = await runtime.createTask({
      name: "SEND_REMINDER",
      description: `Reminder: ${reminderText}`,
      metadata: {
        message: reminderText,
        timestamp: Date.now() + delay,
        createdBy: message.entityId,
      },
      roomId: message.roomId,
      entityId: message.entityId,
      tags: ["queue", "reminder"], // 'queue' tag required for TaskService to pick it up
    });

    const response = {
      text: `I'll remind you about "${reminderText}" in ${formatDelay(delay)}.`,
      actions: ["SCHEDULE_REMINDER"],
      metadata: { taskId: task.id },
    };

    if (callback) {
      await callback(response);
    }

    return task;
  },
};

From Evaluators

export const maintenanceEvaluator: Evaluator = {
  name: "SYSTEM_MAINTENANCE",

  handler: async (runtime, message, state) => {
    const memoryUsage = await runtime.getMemoryUsage();

    if (memoryUsage.percentage > 80) {
      // Create cleanup task
      await runtime.createTask({
        name: "MEMORY_CLEANUP",
        description: "Clean up old memories and optimize storage",
        metadata: {
          triggerPercentage: memoryUsage.percentage,
          targetPercentage: 60,
        },
        tags: ["maintenance", "system"],
      });
    }
  },
};

Task Management

Querying Tasks

// Get all tasks for a room
const roomTasks = await runtime.getTasks({
  roomId: message.roomId,
});

// Get tasks by type
const reminders = await runtime.getTasks({
  name: "SEND_REMINDER",
  entityId: userId,
});

// Get tasks by tags
const scheduledTasks = await runtime.getTasks({
  tags: ["scheduled"],
  status: "pending",
});

Updating Tasks

// Update task metadata
await runtime.updateTask({
  ...task,
  metadata: {
    ...task.metadata,
    progress: 50,
    lastUpdate: Date.now(),
  },
});

// Reschedule task
await runtime.updateTask({
  ...task,
  metadata: {
    ...task.metadata,
    updateInterval: 3600000, // Change to hourly
  },
});

Task Lifecycle

// Create task
const task = await runtime.createTask({
  name: "PROCESS_DATA",
  description: "Process user data batch",
  tags: ["data-processing"],
});

// Task is picked up by worker
// Worker executes task logic

// Complete task
await runtime.completeTask(task.id);

// Or fail task
await runtime.failTask(task.id, "Processing error");

// Or cancel task
await runtime.cancelTask(task.id);

Recurring Tasks

Tasks with updateInterval run periodically:

// Create daily report task
await runtime.createTask({
  name: "DAILY_REPORT",
  description: "Generate daily activity report",
  metadata: {
    updateInterval: 86400000, // 24 hours
    reportType: "summary",
    recipients: ["admin@example.com"],
    updatedAt: Date.now(), // Initial timestamp
  },
  tags: ["queue", "repeat", "reporting"], // 'queue' and 'repeat' tags required
});

// Task worker handles recurrence
export const dailyReportWorker: TaskWorker = {
  name: "DAILY_REPORT",

  execute: async (runtime, options, task) => {
    // Generate report
    const report = await generateDailyReport(runtime, task.roomId);

    // Send report
    await sendReport(report, task.metadata?.recipients);

    // Task automatically reschedules based on updateInterval
  },
};

Task Coordination

Tasks can create and manage other tasks:

export const orchestratorTaskWorker: TaskWorker = {
  name: "CONTENT_PIPELINE",

  execute: async (runtime, options, task) => {
    const { contentType, topics } = options;

    // Create subtasks for each topic
    const subtasks = await Promise.all(
      topics.map((topic) =>
        runtime.createTask({
          name: "RESEARCH_TOPIC",
          description: `Research ${topic}`,
          metadata: {
            topic,
            parentTaskId: task.id,
            stage: "research",
          },
          roomId: task.roomId,
          tags: ["subtask", "research"],
        })
      )
    );

    // Update parent task with subtask IDs
    await runtime.updateTask({
      ...task,
      metadata: {
        ...task.metadata,
        subtaskIds: subtasks.map((t) => t.id),
        status: "awaiting_subtasks",
      },
    });

    // Parent task will check subtask completion
  },
};

Error Handling

Robust error handling for tasks:

execute: async (runtime, options, task) => {
  const maxRetries = task.metadata?.maxRetries || 3;
  const retryCount = task.metadata?.retryCount || 0;

  try {
    // Execute task logic
    await performTaskOperation(options);

    // Success - complete task
    await runtime.completeTask(task.id!);
  } catch (error) {
    runtime.logger.error("Task execution failed", {
      taskId: task.id,
      taskName: task.name,
      error: error.message,
      retryCount,
    });

    if (retryCount < maxRetries) {
      // Retry with exponential backoff
      const backoffDelay = Math.pow(2, retryCount) * 1000;

      await runtime.updateTask({
        ...task,
        metadata: {
          ...task.metadata,
          retryCount: retryCount + 1,
          nextRetry: Date.now() + backoffDelay,
          lastError: error.message,
        },
      });
    } else {
      // Max retries exceeded
      await runtime.failTask(task.id!, `Failed after ${maxRetries} attempts: ${error.message}`);

      // Optionally notify
      await runtime.sendMessage({
        roomId: task.roomId!,
        content: {
          text: `Task "${task.description}" failed after multiple attempts.`,
          metadata: { taskId: task.id, error: error.message },
        },
      });
    }
  }
};

Task Service

The bootstrap plugin provides a comprehensive task service from @elizaos/plugin-bootstrap/src/services/task.ts:

import { type IAgentRuntime, Service, ServiceType } from "@elizaos/core";

/**
 * TaskService class representing a service that schedules and executes tasks.
 * - Checks for tasks every second (configurable via TICK_INTERVAL)
 * - Manages recurring tasks with updateInterval
 * - Handles one-time tasks that are deleted after execution
 * - Supports task validation before execution
 */
export class TaskService extends Service {
  private timer: NodeJS.Timeout | null = null;
  private readonly TICK_INTERVAL = 1000; // Check every second
  static serviceType = ServiceType.TASK;
  capabilityDescription = "The agent is able to schedule and execute tasks";

  static async start(runtime: IAgentRuntime): Promise<Service> {
    const service = new TaskService(runtime);
    await service.startTimer();
    return service;
  }

  private async checkTasks() {
    try {
      // Get all tasks with "queue" tag
      const allTasks = await this.runtime.getTasks({
        tags: ["queue"],
      });

      // Validate the tasks
      const tasks = await this.validateTasks(allTasks);
      const now = Date.now();

      for (const task of tasks) {
        // Non-repeating tasks execute immediately
        if (!task.tags?.includes("repeat")) {
          await this.executeTask(task);
          continue;
        }

        // Check if recurring task should run
        const taskStartTime = task.updatedAt || task.metadata?.updatedAt || 0;
        const updateIntervalMs = task.metadata?.updateInterval ?? 0;

        if (now - taskStartTime >= updateIntervalMs) {
          await this.executeTask(task);
        }
      }
    } catch (error) {
      logger.error("[Bootstrap] Error checking tasks:", error);
    }
  }

  private async executeTask(task: Task) {
    try {
      const worker = this.runtime.getTaskWorker(task.name);
      if (!worker) {
        logger.debug(`[Bootstrap] No worker found for task type: ${task.name}`);
        return;
      }

      // Handle repeating vs non-repeating tasks
      if (task.tags?.includes("repeat")) {
        // Update timestamp for next run
        await this.runtime.updateTask(task.id, {
          metadata: {
            ...task.metadata,
            updatedAt: Date.now(),
          },
        });
      }

      logger.debug(`[Bootstrap] Executing task ${task.name} (${task.id})`);
      await worker.execute(this.runtime, task.metadata || {}, task);

      // Delete non-repeating tasks after execution
      if (!task.tags?.includes("repeat")) {
        await this.runtime.deleteTask(task.id);
      }
    } catch (error) {
      logger.error(`[Bootstrap] Error executing task ${task.id}:`, error);
    }
  }
}

Best Practices

1. Task Design

  • Keep tasks focused and single-purpose
  • Use descriptive names and descriptions
  • Include relevant metadata for debugging
  • Tag tasks appropriately for organization

2. Idempotency

execute: async (runtime, options, task) => {
  // Check if already processed
  const processed = await runtime.getCache(`task-${task.id}-processed`);
  if (processed) {
    runtime.logger.info("Task already processed", { taskId: task.id });
    return;
  }

  // Process task
  await processTask(options);

  // Mark as processed
  await runtime.setCache(`task-${task.id}-processed`, true);
};

3. Resource Management

execute: async (runtime, options, task) => {
  // Acquire resources
  const lock = await runtime.acquireLock(`resource-${task.roomId}`);

  try {
    // Use resources
    await performOperation();
  } finally {
    // Always release
    await lock.release();
  }
};

4. Monitoring

// Track task metrics
const taskMetrics = {
  startTime: Date.now(),
  taskId: task.id,
  taskName: task.name,
};

try {
  await executeTask();

  taskMetrics.duration = Date.now() - taskMetrics.startTime;
  taskMetrics.status = "success";
} catch (error) {
  taskMetrics.duration = Date.now() - taskMetrics.startTime;
  taskMetrics.status = "failed";
  taskMetrics.error = error.message;
} finally {
  await runtime.logMetrics("task_execution", taskMetrics);
}

5. Testing Tasks

import { testTaskWorker } from "@elizaos/core/testing";

describe("ReminderTaskWorker", () => {
  it("should send reminder at correct time", async () => {
    const task = {
      id: "test-task-1",
      name: "SEND_REMINDER",
      description: "Test reminder",
      metadata: {
        message: "Test message",
        timestamp: Date.now() + 1000,
      },
      roomId: "test-room",
      tags: ["test"],
    };

    const result = await testTaskWorker(reminderTaskWorker, {
      task,
      options: task.metadata,
      expectedMessages: 1,
      expectedCompletion: true,
    });

    expect(result.completed).toBe(true);
    expect(result.messages[0].content.text).toContain("Test message");
  });
});

Common Task Patterns

Scheduled Messages

await runtime.createTask({
  name: "SCHEDULED_MESSAGE",
  metadata: {
    message: "Good morning! Ready to start the day?",
    scheduleTime: getTomorrowMorning(),
    recurring: true,
    timezone: "America/New_York",
  },
});

Data Synchronization

await runtime.createTask({
  name: "SYNC_DATA",
  metadata: {
    updateInterval: 300000, // 5 minutes
    source: "external_api",
    lastSync: Date.now(),
  },
});

Batch Processing

await runtime.createTask({
  name: "BATCH_PROCESS",
  metadata: {
    batchSize: 100,
    totalItems: 1000,
    currentOffset: 0,
    operation: "import_contacts",
  },
});

Next Steps

  • Review Agents to understand the runtime context
  • Explore Actions that create tasks
  • Learn about Evaluators that monitor task performance