Concurrency + 429 retry
Run independent nodes in parallel, fan-out iterations concurrently, and automatically back off on rate-limit errors.
When you need this
- Your graph has multiple nodes with no data dependency between them and you want them to run in parallel.
- A for_each fan-out calls an LLM and you want N simultaneous calls instead of N sequential ones.
- You are hitting 429 rate-limit errors from the Anthropic or OpenAI API.
- You want to tune the wall-clock time of a multi-node experience.
The minimal example
Enable global parallelism for the experience:
```yaml
runtime:
  concurrency: 4
```

All nodes whose predecessors have already completed are collected into a "wave" and dispatched up to concurrency at a time. No code changes required.
For for_each fan-outs, set concurrency per node:
```yaml
- id: review
  kind: agent
  prompt: ./prompts/review.md
  for_each:
    source: $.dimensions
    concurrency: 3
  reads: [diff, dimensions]
  writes: [findings]
```

How it works
ParallelScheduler (packages/core/src/graph/parallel-scheduler.ts) extends SequentialScheduler. When runtime.concurrency > 1, it replaces the sequential topological walk with a wave-based loop:
- Build a map of each node's unmet predecessor count (counting only nodes in the main DAG pass, excluding pipeline stage nodes and loop body nodes).
- Collect all nodes where the unmet count is 0 — these are "ready".
- Dispatch the ready wave through runWithLimit(waveItems, concurrency, fn).
- After the wave completes, decrement successor counts for each completed node.
- Repeat until no nodes remain.
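The wave loop above can be sketched in TypeScript as follows. This is a simplified illustration, not the ParallelScheduler source: the GraphNode shape (an id plus a list of predecessor ids), the runNode callback, and the runInWaves name are hypothetical, and a minimal worker pool stands in for runWithLimit.

```typescript
// Hypothetical node shape: an id plus the ids of its predecessors.
interface GraphNode {
  id: string;
  deps: string[];
}

// Minimal worker pool standing in for runWithLimit: at most `limit` calls in flight.
async function runWithLimit<T>(items: T[], limit: number, fn: (item: T) => Promise<void>): Promise<void> {
  let next = 0;
  const worker = async () => {
    while (next < items.length) await fn(items[next++]); // claim the index before awaiting
  };
  await Promise.all(Array.from({ length: Math.min(Math.max(limit, 1), items.length) }, worker));
}

// Wave-based loop (hypothetical helper). Returns the node ids grouped by the wave they ran in.
async function runInWaves(
  nodes: GraphNode[],
  concurrency: number,
  runNode: (id: string) => Promise<void>,
): Promise<string[][]> {
  // 1. Unmet predecessor count per node, plus an index of successors.
  const unmet = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const node of nodes) {
    unmet.set(node.id, node.deps.length);
    for (const dep of node.deps) {
      const list = successors.get(dep) ?? [];
      list.push(node.id);
      successors.set(dep, list);
    }
  }

  const waves: string[][] = [];
  while (unmet.size > 0) {
    // 2. Ready = every remaining node whose unmet count is 0.
    const ready = Array.from(unmet)
      .filter(([, count]) => count === 0)
      .map(([id]) => id);
    if (ready.length === 0) throw new Error("cycle detected: no node is ready");
    for (const id of ready) unmet.delete(id);

    // 3. Dispatch the whole wave through the worker pool and wait for it to finish.
    await runWithLimit(ready, concurrency, runNode);
    waves.push(ready);

    // 4. Decrement successor counts for each completed node.
    for (const id of ready) {
      for (const succ of successors.get(id) ?? []) {
        unmet.set(succ, (unmet.get(succ) ?? 0) - 1);
      }
    }
  }
  return waves;
}
```

Because each wave waits for its slowest member, a node whose own predecessors finished early still waits for the rest of the previous wave. That is the trade-off a wave loop makes for simplicity, compared with dispatching each node the moment its own predecessors complete.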
runWithLimit (packages/core/src/graph/scheduler.ts) is a simple worker-pool: it starts min(limit, items.length) concurrent workers, each pulling from a shared index. This gives natural backpressure — a new item starts only when a worker slot frees.
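A worker pool of the shape just described might look like this. It is an illustrative re-implementation, not the code in scheduler.ts. The parameter order mirrors the runWithLimit(waveItems, concurrency, fn) call above; returning results in input order and passing the item index to fn are assumptions.

```typescript
// Worker pool: starts min(limit, items.length) workers that pull from a shared index.
async function runWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0; // shared cursor; safe because JS runs one callback at a time

  const worker = async (): Promise<void> => {
    while (nextIndex < items.length) {
      const i = nextIndex++; // claim an item synchronously, before awaiting
      results[i] = await fn(items[i], i);
    }
  };

  // Clamp the limit to at least 1; an empty input starts no workers.
  const workerCount = Math.min(Math.max(limit, 1), items.length);
  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}
```

Claiming the index before the first await is what keeps two workers from taking the same item. Because workers pull the next item only after finishing the previous one, no more than limit calls are ever pending, which is where the backpressure comes from.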
for_each.concurrency uses the same runWithLimit function, scoped to the iterations of a single node. The global runtime.concurrency and the per-node for_each.concurrency are independent — both apply simultaneously if set.
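To see how the two dials compose, here is a small simulation. It assumes, as described above, that each node's fan-out pool runs inside the callback of the global pool. The FanOutNode shape, the pool helper, and peakInFlight are hypothetical names used only for this sketch.

```typescript
// Compact worker pool (same idea as runWithLimit).
async function pool<T>(items: T[], limit: number, fn: (item: T) => Promise<void>): Promise<void> {
  let next = 0;
  const worker = async () => {
    while (next < items.length) await fn(items[next++]);
  };
  await Promise.all(Array.from({ length: Math.min(Math.max(limit, 1), items.length) }, worker));
}

// Hypothetical fan-out node: `iterations` items, at most `perNode` in flight.
interface FanOutNode {
  id: string;
  iterations: number;
  perNode: number;
}

// Runs one wave under the global cap; each node fans out under its own cap.
// Returns the peak number of iterations that were in flight at the same time.
async function peakInFlight(wave: FanOutNode[], globalLimit: number, taskMs = 5): Promise<number> {
  let inFlight = 0;
  let peak = 0;
  await pool(wave, globalLimit, async (node) => {
    const iterations = Array.from({ length: node.iterations }, (_, i) => i);
    await pool(iterations, node.perNode, async () => {
      inFlight++;
      peak = Math.max(peak, inFlight);
      await new Promise<void>((resolve) => setTimeout(resolve, taskMs)); // simulated LLM call
      inFlight--;
    });
  });
  return peak;
}
```

In this sketch, two fan-out nodes with a per-node limit of 3 in one wave under a global limit of 2 reach 2 × 3 = 6 concurrent iterations; with a global limit of 1 the peak drops to 3. When sizing both dials against an API rate limit, it is the product that matters.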
Pipeline and loop nodes are always run sequentially by the inherited pass, regardless of runtime.concurrency.
429 retry is handled in AnthropicLLMClient.callWithRetry and OpenAILLMClient.callWithRetry: up to 4 attempts by default, with exponential backoff that starts at 1 second and doubles before each retry (1s, 2s, 4s). This is separate from the node-level on_error: { policy: retry }; it operates at the HTTP call level, transparently to the scheduler.
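The 429 handling can be sketched as a generic wrapper. This is not the callWithRetry source. The HttpError class, the status check, and the injectable sleep parameter are assumptions made so the example is self-contained. The option names follow the retry settings shown under Variations, and the defaults (4 attempts, 1-second base, doubling) follow the description above.

```typescript
// Hypothetical error type carrying an HTTP status code.
class HttpError extends Error {
  constructor(public readonly status: number, message: string) {
    super(message);
  }
}

interface RetryOptions {
  max_attempts: number; // total attempts, including the first call
  base_ms: number; // delay before the first retry; doubles after that
}

const defaultSleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retries only on HTTP 429; any other error is rethrown immediately.
async function callWithRetry<T>(
  call: () => Promise<T>,
  opts: RetryOptions = { max_attempts: 4, base_ms: 1000 },
  sleep: (ms: number) => Promise<void> = defaultSleep,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await call();
    } catch (err) {
      const isRateLimit = err instanceof HttpError && err.status === 429;
      if (!isRateLimit || attempt >= opts.max_attempts) throw err; // give up: caller (the node) sees the error
      // Exponential backoff: base, 2 * base, 4 * base, ...
      await sleep(opts.base_ms * 2 ** (attempt - 1));
    }
  }
}
```

With the defaults in this sketch, a call that keeps returning 429 is attempted 4 times and waits 1s + 2s + 4s = 7 seconds in total before the error propagates. Only at that point would a node-level on_error policy get a chance to act.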
Variations
Sequential (default): omit runtime entirely, or set concurrency: 1:

```yaml
runtime:
  concurrency: 1
```

Aggressive parallel with per-node fan-out:
```yaml
runtime:
  concurrency: 8
graph:
  nodes:
    - id: fetch_issues
      kind: tool
      impl: ./tools/fetch.mjs
      writes: [issues]
    - id: triage
      kind: agent
      prompt: ./prompts/triage.md
      for_each:
        source: $.issues
        concurrency: 5
      reads: [issues]
      writes: [labels]
```

Here up to 8 ready nodes run in parallel within each wave, and within the triage node, up to 5 issue-triage calls run simultaneously.
Custom retry settings at the LLM-client level (programmatic API):
```typescript
import { AnthropicLLMClient } from '@openexpertise/node-kinds-agent'

// Override the default 429 retry policy: up to 6 attempts, backoff starting at 500 ms.
const client = new AnthropicLLMClient({
  retry: { max_attempts: 6, base_ms: 500 },
})
```

Node-level retry for transient errors (network flakes, not just 429s):
```yaml
- id: call_api
  kind: tool
  impl: ./tools/call-api.mjs
  on_error:
    policy: retry
    attempts: 3
    backoff: exponential
    base_ms: 200
```

Gotchas
- State writes are not transactional across concurrent nodes. If two nodes write the same field simultaneously, the last write wins (SQLite serializes writes, but the order is non-deterministic). Use merge: array_append for fields written by fan-out iterations.
- runtime.concurrency applies to the main DAG pass only. Pipeline stages and loop bodies are always sequential.
- The 429 retry inside the LLM client is invisible to the scheduler. A node does not emit node.failed during a 429 retry; it just takes longer. The node-level on_error retry fires only if the LLM client exhausts all its attempts and throws.
- Setting for_each.concurrency high while runtime.concurrency is 1 means multiple iterations of that one node run in parallel, but no two different nodes run at the same time. Both dials are independent.
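The first gotcha can be illustrated with a toy state store. This is a hypothetical in-memory model, not the SQLite-backed store. It contrasts a plain overwrite (last write wins) with an append merge, assuming that merge: array_append concatenates each write onto the existing array. Whether the real merge flattens array values is an assumption here.

```typescript
type State = Record<string, unknown>;
type MergeMode = "overwrite" | "array_append";

// Applies one write to the state according to the field's merge mode.
function applyWrite(state: State, field: string, value: unknown, mode: MergeMode): void {
  if (mode === "array_append") {
    const current = Array.isArray(state[field]) ? (state[field] as unknown[]) : [];
    // Assumption: array values are flattened one level into the existing array.
    state[field] = [...current, ...(Array.isArray(value) ? value : [value])];
  } else {
    state[field] = value; // last write wins
  }
}

// Simulates fan-out iterations that finish in a random order, each writing `findings`.
async function fanOutWrites(mode: MergeMode, iterations: number): Promise<State> {
  const state: State = {};
  await Promise.all(
    Array.from({ length: iterations }, async (_, i) => {
      await new Promise<void>((r) => setTimeout(r, Math.random() * 10)); // random finish time
      applyWrite(state, "findings", [`finding-${i}`], mode);
    }),
  );
  return state;
}
```

Under overwrite, only one iteration's findings survive, and which one depends on timing. Under append, every iteration's contribution is kept, though the order of entries still varies from run to run.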
See also
- Scheduler concept
- Error policies (on_error) — retry at the node level
- Resume + cache — skip completed nodes on re-run
- oe run CLI reference