Scheduler

scheduler

Scheduler actor — time-driven dispatch of goals and tasks.

The scheduler is a long-lived actor that reads a YAML config defining cron expressions and fixed-interval timers. When a timer fires, it publishes either an OrchestratorGoal or a TaskMessage to the appropriate NATS subject.
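
As a rough illustration of the config described above, the snippet below shows the dict a YAML loader might produce for two schedules. The top-level schedules key and the entry field names follow the ScheduleEntry fields documented on this page. The entry names, the contents of goal_config and task_config, and the timer_kind helper are hypothetical. The rule that exactly one of cron and interval_seconds must be set is also an assumption.

```python
from typing import Any

# Hypothetical config, shown as the dict a YAML loader would produce.
# Field names mirror ScheduleEntry; all values are invented for illustration.
EXAMPLE_CONFIG: dict[str, Any] = {
    "schedules": [
        {
            "name": "nightly_summary",
            "cron": "0 2 * * *",  # standard cron syntax: every day at 02:00
            "dispatch_type": "goal",
            "goal_config": {"instruction": "Summarize today's activity"},
        },
        {
            "name": "heartbeat",
            "interval_seconds": 30,
            "dispatch_type": "task",
            "task_config": {"worker_type": "health_probe", "payload": {}},
        },
    ]
}


def timer_kind(entry: dict[str, Any]) -> str:
    """Classify an entry as 'cron' or 'interval' (illustrative helper).

    Assumes exactly one of the two timer fields is set per entry.
    """
    has_cron = entry.get("cron") is not None
    has_interval = entry.get("interval_seconds") is not None
    if has_cron == has_interval:
        raise ValueError(
            f"{entry.get('name')}: set exactly one of cron / interval_seconds"
        )
    return "cron" if has_cron else "interval"


kinds = {entry["name"]: timer_kind(entry) for entry in EXAMPLE_CONFIG["schedules"]}
```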

Design
  • Extends BaseActor (long-lived, not TaskWorker)
  • Overrides run() to launch a background timer loop alongside the standard message subscription
  • handle_message() is a minimal no-op (satisfies the ABC)
  • Uses croniter for cron parsing, asyncio.sleep for intervals
  • Graceful shutdown clears the _running flag, then cancels and awaits the timer task
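
The polling design listed above can be sketched roughly as follows. This is a simplified stand-in, not the library's _timer_loop: IntervalEntry, pop_due, and timer_loop are hypothetical names, only interval timers are modelled, and a cron entry would presumably derive next_fire from croniter instead of adding a fixed interval.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass


@dataclass
class IntervalEntry:
    """Simplified stand-in for ScheduleEntry (interval timers only)."""

    name: str
    interval_seconds: float
    next_fire: float = 0.0


def pop_due(entries: list[IntervalEntry], now: float) -> list[str]:
    """Return the names of due entries and advance their next_fire."""
    fired: list[str] = []
    for entry in entries:
        if now >= entry.next_fire:
            fired.append(entry.name)
            entry.next_fire = now + entry.interval_seconds
    return fired


async def timer_loop(
    entries: list[IntervalEntry],
    is_running: Callable[[], bool],
    on_fire: Callable[[str], Awaitable[None]],
    tick: float = 1.0,
) -> None:
    """Check the schedules every `tick` seconds until is_running() is False."""
    while is_running():
        for name in pop_due(entries, time.time()):
            await on_fire(name)
        await asyncio.sleep(tick)
```

Keeping the due check in a pure function such as pop_due makes the schedule arithmetic testable without sleeping or touching the message bus.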

NATS subjects:

  • Subscribes to: loom.scheduler.{name} (health checks / future control)
  • Publishes to: loom.goals.incoming (for dispatch_type "goal") or loom.tasks.incoming (for dispatch_type "task")
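
The subject routing above amounts to a small lookup. The sketch below uses the subject strings from this page. The helper names subject_for and control_subject are hypothetical, and raising on an unknown dispatch_type is an assumption about how invalid config might be handled.

```python
GOAL_SUBJECT = "loom.goals.incoming"
TASK_SUBJECT = "loom.tasks.incoming"


def subject_for(dispatch_type: str) -> str:
    """Map a schedule's dispatch_type to the subject it publishes on."""
    if dispatch_type == "goal":
        return GOAL_SUBJECT
    if dispatch_type == "task":
        return TASK_SUBJECT
    # Assumption: unknown types are rejected rather than silently ignored.
    raise ValueError(f"unknown dispatch_type: {dispatch_type!r}")


def control_subject(name: str) -> str:
    """Build the subject a scheduler named `name` subscribes to."""
    return f"loom.scheduler.{name}"
```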

ScheduleEntry dataclass

ScheduleEntry(name: str, cron: str | None, interval_seconds: float | None, dispatch_type: str, goal_config: dict[str, Any] | None = None, task_config: dict[str, Any] | None = None, expand_from: str | None = None, next_fire: float = 0.0)

Parsed schedule entry from YAML config.

If expand_from is set, the scheduler calls the referenced function before each fire. The function must return a list of context dicts, and one goal or task is dispatched per dict. Each dict is merged into the payload (for tasks) or the context (for goals). This enables per-session dispatch: for example, an expansion function that queries for active sessions might return [{"session_id": "s1"}, {"session_id": "s2"}], producing one dispatch per session.
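
The expansion step can be pictured with the following sketch. It is not the scheduler's own code: active_sessions, expand_task, and expand_goal are hypothetical names, the config contents are invented, and letting context keys override existing keys on collision is an assumption.

```python
from collections.abc import Callable
from copy import deepcopy
from typing import Any

ContextFn = Callable[[], list[dict[str, Any]]]


def active_sessions() -> list[dict[str, Any]]:
    """Hypothetical expansion function; a real one would query a session store."""
    return [{"session_id": "s1"}, {"session_id": "s2"}]


def expand_task(task_config: dict[str, Any], expand: ContextFn) -> list[dict[str, Any]]:
    """Produce one task message per context dict, merged into 'payload'."""
    messages = []
    for ctx in expand():
        msg = deepcopy(task_config)  # never mutate the shared config
        msg["payload"] = {**msg.get("payload", {}), **ctx}
        messages.append(msg)
    return messages


def expand_goal(goal_config: dict[str, Any], expand: ContextFn) -> list[dict[str, Any]]:
    """Produce one goal message per context dict, merged into 'context'."""
    messages = []
    for ctx in expand():
        msg = deepcopy(goal_config)
        msg["context"] = {**msg.get("context", {}), **ctx}
        messages.append(msg)
    return messages


tasks = expand_task(
    {"worker_type": "session_digest", "payload": {"depth": 1}},
    active_sessions,
)
goals = expand_goal({"instruction": "Review session"}, active_sessions)
```

With two active sessions, each call yields two messages that differ only in the merged session_id.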

SchedulerActor

SchedulerActor(actor_id: str, config_path: str, nats_url: str = 'nats://nats:4222', *, bus: MessageBus | None = None)

Bases: BaseActor

Time-driven actor that dispatches goals and tasks on schedule.

All schedules are defined at startup via YAML config. The actor maintains a background timer loop that checks schedules every second and fires due entries by publishing to the appropriate NATS subject.

Source code in src/loom/scheduler/scheduler.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    nats_url: str = "nats://nats:4222",
    *,
    bus: MessageBus | None = None,
) -> None:
    super().__init__(actor_id, nats_url, bus=bus)
    self.config = self._load_config(config_path)
    self._schedules: list[ScheduleEntry] = self._parse_schedules(
        self.config.get("schedules", [])
    )
    self._timer_task: asyncio.Task | None = None

run async

run(subject: str, queue_group: str | None = None) -> None

Start the scheduler with background timer loop and subscription.

Source code in src/loom/scheduler/scheduler.py
async def run(self, subject: str, queue_group: str | None = None) -> None:
    """Start the scheduler with background timer loop and subscription."""
    self._shutdown_event = asyncio.Event()
    self._semaphore = asyncio.Semaphore(self.max_concurrent)

    await self.connect()
    await self.subscribe(subject, queue_group)
    self._running = True
    self._install_signal_handlers()

    self._initialize_fire_times()

    logger.info(
        "scheduler.running",
        actor_id=self.actor_id,
        subject=subject,
        schedule_count=len(self._schedules),
    )

    # Launch background timer loop
    self._timer_task = asyncio.create_task(self._timer_loop())
    self._background_tasks: set[asyncio.Task[None]] = set()

    try:
        # Standard subscription loop (mirrors BaseActor.run)
        async for data in self._sub:
            if not self._running:
                break
            if self.max_concurrent == 1:
                await self._process_one(data)
            else:
                task = asyncio.create_task(self._process_one(data))
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
    except asyncio.CancelledError:
        pass
    finally:
        self._running = False
        if self._timer_task and not self._timer_task.done():
            self._timer_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._timer_task
        await self.disconnect()
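
The shape of run(), a background task running beside a main loop and cancelled in a finally block, can be tried in isolation with the sketch below. It uses only asyncio and contextlib. The function name run_with_background is hypothetical, and a plain list of messages stands in for the subscription iterator.

```python
import asyncio
import contextlib


async def run_with_background(
    ticks: list[int], messages: list[str]
) -> tuple[list[str], bool]:
    """Handle messages while a timer task runs, then cancel the timer."""
    handled: list[str] = []

    async def timer_loop() -> None:
        # Stands in for the scheduler's timer loop: tick until cancelled.
        while True:
            ticks.append(1)
            await asyncio.sleep(0.01)

    timer_task = asyncio.create_task(timer_loop())
    try:
        for msg in messages:  # stands in for `async for data in self._sub`
            handled.append(msg)
            await asyncio.sleep(0.02)
    finally:
        # Same shutdown order as run(): cancel, then await so the task
        # finishes unwinding before the caller proceeds.
        if not timer_task.done():
            timer_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await timer_task
    return handled, timer_task.cancelled()
```

Awaiting the cancelled task inside contextlib.suppress ensures the timer has actually stopped before cleanup such as disconnect() runs.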

handle_message async

handle_message(data: dict[str, Any]) -> None

No-op message handler. The scheduler is timer-driven.

Satisfies BaseActor's abstract requirement. A future enhancement could respond to health-check or status queries here.

Source code in src/loom/scheduler/scheduler.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """No-op message handler.  The scheduler is timer-driven.

    Satisfies BaseActor's abstract requirement.  A future enhancement
    could respond to health-check or status queries here.
    """
    logger.debug(
        "scheduler.message_received",
        actor_id=self.actor_id,
        keys=list(data.keys()),
    )