The Job engine

The former Task, BackgroundTask, and Workflow abstractions were unified into a single Job aggregate. The runtime path consists of a runner (PgBossJobRunner or InMemoryJobRunner), a StepRegistry, and a shared processJob() loop. It runs end-to-end in production.

The pieces

  • Job<T> — the aggregate root. Immutable transitions (start(), complete(result), fail(error)) return a new instance. It carries status, progress, currentStep, itemProgress, result, and a metadata value object. Errors live on job.result?.errors.
  • JobStep — a data-shaped step in the chain. The runtime contract is IWorkflowStep / BaseWorkflowStep.
  • JobPreset — the single source of truth mapping a preset to its JobStep[], plus a per-preset input schema. CreateJobUsecase resolves steps from it server-side, so presets and the runtime can’t drift.
  • JobSchedule — a cron expression + a template; its constructor validates the name and cron.
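The immutable-transition idea behind Job<T> can be sketched as follows. This is a minimal illustration, not the project's actual class: the field set is reduced to id, status, and result, and the create() factory, the string-typed errors, and the status names are assumptions made for the example.

```typescript
// Hypothetical, simplified shapes. The real Job<T> also carries progress,
// currentStep, itemProgress, and a metadata value object.
type JobStatus = "pending" | "running" | "completed" | "failed";

interface JobResult<T> {
  data?: T;
  errors: string[];
}

class Job<T> {
  readonly id: string;
  readonly status: JobStatus;
  readonly result?: JobResult<T>;

  private constructor(id: string, status: JobStatus, result?: JobResult<T>) {
    this.id = id;
    this.status = status;
    this.result = result;
  }

  // Assumed factory; the source does not say how jobs are constructed.
  static create<T>(id: string): Job<T> {
    return new Job<T>(id, "pending");
  }

  // Each transition returns a new instance and leaves `this` untouched.
  start(): Job<T> {
    return new Job<T>(this.id, "running", this.result);
  }

  complete(data: T): Job<T> {
    return new Job<T>(this.id, "completed", { data, errors: [] });
  }

  fail(error: string): Job<T> {
    const errors = [...(this.result?.errors ?? []), error];
    return new Job<T>(this.id, "failed", { ...this.result, errors });
  }
}

const pending = Job.create<number>("job-1");
const running = pending.start();
const failed = running.fail("step timed out");
```

Because every transition returns a fresh object, earlier snapshots such as `pending` remain valid after `start()` is called, and errors stay reachable through `failed.result?.errors`.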

Runtime selection

apps/api/index.ts selects PgBossJobRunner when the job queue starts successfully; otherwise it falls back to InMemoryJobRunner (dev mode without Postgres, or tests). Both runners share the same processJob() step loop, so steps execute the same way under either runner.
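The selection logic can be pictured roughly like this. It is a sketch under stated assumptions rather than the code in apps/api/index.ts: JobRunner, QueueBackedRunner, InMemoryRunner, selectRunner, and the startQueue callback are hypothetical stand-ins, and processJob() here only records step names instead of executing real steps.

```typescript
// Hypothetical runner shape, for illustration only.
interface JobRunner {
  readonly kind: string;
  run(steps: string[]): Promise<string[]>;
}

// Shared step loop: both runners delegate here, so step order and
// handling are the same whichever runner is selected.
async function processJob(steps: string[]): Promise<string[]> {
  const done: string[] = [];
  for (const step of steps) {
    done.push(step); // a real loop would execute the step here
  }
  return done;
}

// Stand-in for the queue-backed runner (PgBossJobRunner in the text).
class QueueBackedRunner implements JobRunner {
  readonly kind = "queue";
  run(steps: string[]): Promise<string[]> {
    return processJob(steps);
  }
}

// Stand-in for the fallback runner (InMemoryJobRunner in the text).
class InMemoryRunner implements JobRunner {
  readonly kind = "in-memory";
  run(steps: string[]): Promise<string[]> {
    return processJob(steps);
  }
}

// startQueue stands in for whatever starts the job queue; if it rejects,
// the in-memory runner is used instead.
async function selectRunner(startQueue: () => Promise<void>): Promise<JobRunner> {
  try {
    await startQueue();
    return new QueueBackedRunner();
  } catch {
    return new InMemoryRunner();
  }
}
```

Calling `selectRunner(async () => { throw new Error("no database"); })` resolves to the in-memory stand-in, while a `startQueue` that resolves yields the queue-backed one.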

Hexagonal split

Port | Adapter(s)
IJobRepository | DrizzleJobRepository
IJobRunner<T> | PgBossJobRunner (default), InMemoryJobRunner (fallback / tests)
IJobScheduleRepository | DrizzleJobScheduleRepository, InMemoryJobScheduleRepository
IWorkflowStep<T> | concrete steps discovered via StepRegistry
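As a rough picture of the last row, a step port plus a registry might look like the sketch below. The WorkflowStep interface, the register() and resolve() methods, and the runChain() helper are assumptions for illustration; the real IWorkflowStep<T> and StepRegistry signatures are not shown in this page and may differ.

```typescript
// Hypothetical, simplified port; the real IWorkflowStep<T> may differ.
interface WorkflowStep<T> {
  readonly name: string;
  execute(input: T): T;
}

// Hypothetical registry: maps step names to concrete step implementations.
class StepRegistry<T> {
  private readonly steps = new Map<string, WorkflowStep<T>>();

  register(step: WorkflowStep<T>): void {
    this.steps.set(step.name, step);
  }

  resolve(name: string): WorkflowStep<T> {
    const step = this.steps.get(name);
    if (!step) throw new Error(`Unknown step: ${name}`);
    return step;
  }
}

// Resolve each named step through the registry and run them in order,
// feeding each step's output into the next.
function runChain<T>(registry: StepRegistry<T>, names: string[], input: T): T {
  return names.reduce((acc, stepName) => registry.resolve(stepName).execute(acc), input);
}

const registry = new StepRegistry<string[]>();
registry.register({ name: "fetch", execute: (log) => [...log, "fetched"] });
registry.register({ name: "index", execute: (log) => [...log, "indexed"] });

const log = runChain<string[]>(registry, ["fetch", "index"], []);
```

The calling code depends only on the port and on step names, so a concrete step can be swapped without touching the loop that runs the chain.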

Read on: the steps that make up a pipeline, scheduling, and monitoring.