Compose AI agents with functional programming primitives. Built on Vercel AI SDK.
An agent is a function — (input: string) => Promise<Result<O, AgentError>> —
created with a tiny builder and composed with pipe, fanOut, zip,
parallelSettled, branch, and memoize. No classes, no decorators, no
runtime magic. Outputs are validated by Zod. Errors are values, not exceptions.
📐 New here?
docs/ARCHITECTURE.md is a visual mental map of the library: the central type, the 5 combinators side by side, the data flow of a typical call, and the design decisions behind it all. Read that first if you want to navigate the codebase.
// The power of agentic-fp on a single screen:
// agents are functions, errors are data, everything composes.
import { agent, pipe, branch, memoize, withTools, agentAsTool } from 'agentic-fp'
import { isOk, formatError } from 'agentic-fp/result'
// 1. An agent is just: (input: string) => Promise<Result<O, AgentError>>
const classify = agent({ /* ...returns { category: 'refund' | 'tracking' | 'sales' } */ })
const translate = agent({ /* ...returns string (EN → ES) */ })
const summarize = agent({ /* ...returns string */ })
const salesRep = agent({ /* ...returns string */ })
const supportRep = agent({ /* ...returns string */ })
// 2. Pure tools: just a schema + a function. No framework.
const lookupOrder = { description: '...', inputSchema: /*z*/, execute: ({id}) => db.orders[id] }
const computeRefund = { description: '...', inputSchema: /*z*/, execute: ({ids}) => sum(ids) }
// 3. A sub-agent becomes a tool in ONE line.
const summarizeTool = agentAsTool(summarize, {
description: 'Summarize a case in one sentence',
inputSchema: /*z*/,
inputToString: ({ text }) => text,
})
// 4. An agent with tools is still... an agent. Same signature.
const concierge = withTools(
agent({ /* concierge config */ }),
{ lookupOrder, computeRefund, fileTicket: summarizeTool },
{ maxSteps: 8 }
)
// 5. Here's the magic: everything is composable because everything is Agent<O>.
// A real pipeline, no orchestrator, no classes, no hidden state.
const supportFlow = pipe(
translate, // EN → ES
memoize( // cache by identical input
branch(classify, { // route by category
refund: concierge, // ← an agent with tools
tracking: concierge, // ← reused, not duplicated
sales: pipe(salesRep, summarize) // ← another nested pipeline
})
)
)
// 6. Call it like any function. Errors are data, not exceptions.
const result = await supportFlow('I was charged twice for order A-1003')
if (isOk(result)) {
console.log('reply:', result.value)
} else {
// result.error is a discriminated sum type: timeout | rate_limit |
// schema_validation | model_error | composition | tool_loop_exceeded
console.error(formatError(result.error))
}What you just saw, and what you did NOT have to think about:
- Parallelism: tools in the same turn run in parallel automatically.
- Errors: every step returns Result; a failure in translate short-circuits the pipeline and surfaces as composition({ step: 0, cause }) — no try/catch.
- Smart cache: if supportFlow fails, memoize does NOT cache the error.
- Reentrancy: concierge is used in two branches of branch — no shared state because it's a pure function.
- Observability: configure({ onEvent }) intercepts EVERY event (start, success, error, branch.*, tool.*) without touching the flow.
- Types: result.value has the correct type inferred through pipe → memoize → branch → withTools → agentAsTool. End-to-end.
The flow is an expression, not a program. You read top to bottom and understand what it does. No class, no scattered awaits, no try/catch, no setup. And yet: parallelism, cache, typed errors, tools, sub-agents, observability — all in.
npm install agentic-fp ai zod
# Plus at least one provider:
npm install @ai-sdk/anthropic   # or @ai-sdk/openai, etc.
Requires Node.js ≥ 22.
An Agent<O> is a pure function:
type Agent<O> = (input: string) => Promise<Result<O, AgentError>>
Input is always string. Output is whatever the Zod schema infers. Errors
are returned as values, never thrown.
type Result<T, E> =
| { ok: true; value: T }
| { ok: false; error: E }
Helpers in agentic-fp/result: ok, err, isOk, isErr, map, flatMap,
mapErr, unwrap.
Discriminated by kind:
type AgentError =
| { kind: 'timeout'; agent: string }
| { kind: 'rate_limit'; agent: string; retryAfter?: number }
| { kind: 'schema_validation'; agent: string; issues: ZodIssue[] }
| { kind: 'model_error'; agent: string; message: string }
| { kind: 'composition'; step: number; cause: AgentError }
agent({...}) accepts five required fields: name, model, description,
instruction, outputSchema. When all five are present, it returns the
Agent<O> directly. You can also feed it partial objects and finish the build
in multiple calls — currying is automatic.
const a = agent({
name: 'classifier',
model: haiku,
description: 'classifies sentiment',
instruction: 'Classify the input as positive, negative, or neutral.',
outputSchema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number().min(0).max(1),
}),
modelParams: { temperature: 0, maxTokens: 200 },
onEvent: (e) => console.log(e),
})
const r = await a('I love this product!')
outputSchema: z.string() is treated specially and uses generateText; any
other schema uses generateObject (structured output / tool use).
import { pipe } from 'agentic-fp/combinators'
const flow = pipe(
extractor, // Agent<{ products: string[] }>
({ products }) => products.join(', '), // adapter: pure, sync, infallible
summarizer, // Agent<string>
)
Adapters between agents are synchronous, infallible functions
(input) => string. If you need async or fallible work, wrap it in another
agent. Two consecutive agents are allowed only when the previous output is
string.
If any agent in the pipe returns an error, the pipe short-circuits and wraps
the error in compositionError(step, cause).
import { fanOut } from 'agentic-fp/combinators'
const triage = fanOut([sentiment, entities, language] as const)
// ^ Agent<[Sentiment, Entities, Language]>
const r = await triage('Apple launched a new iPhone today.')
Returns a tuple of outputs. If any agent fails, the result is Err with that
agent's error.
import { zip } from 'agentic-fp/combinators'
const z2 = zip([translator, detector] as const)
// ^ (inputs: [string, string]) => Promise<Result<[Translation, Lang], AgentError>>
const r = await z2(['Hola mundo', 'Bonjour'])
Note: zip returns a function that takes a tuple of inputs — it is not
itself an Agent.
import { parallelSettled } from 'agentic-fp/combinators'
const all = parallelSettled([a, b, c] as const)
const r = await all('input')
// Ok(tuple) if all succeed; Err(AgentError[]) with every failure if any fail.
Difference from fanOut: parallelSettled waits for all to finish and
collects every error. Useful when you want partial visibility of a batch.
A router agent picks one of N branches and forwards an input to it. All
branches share the same output type, so branch(...) is itself an
Agent<O> and composes with the rest of the combinators.
import { branch } from 'agentic-fp/combinators'
// Each branch is an Agent<O> with the same O.
const flow = branch(router, { ventas, soporte, facturacion })
// flow: Agent<{ answer: string }>
The router must return a single-key object. The key picks the branch; the value is the input passed to that branch. TypeScript forces exhaustiveness — the router output union and the branches map must match exactly.
In practice, the cleanest router is a small classifier agent + an adapter to the discriminated shape:
const classifier = agent({
/* ... */
outputSchema: z.object({
category: z.enum(['ventas', 'soporte', 'facturacion']),
query: z.string(),
}),
})
type RouterOut =
| { ventas: string }
| { soporte: string }
| { facturacion: string }
const router = async (input: string) => {
const r = await classifier(input)
if (!r.ok) return r
const { category, query } = r.value
return { ok: true as const, value: { [category]: query } as RouterOut }
}
const flow = branch(router, { ventas, soporte, facturacion })
Errors from the router and the selected branch propagate unwrapped
(no compositionError envelope). Only runtime safety nets — router
returning the wrong number of keys, or a key not in the branches map —
surface as compositionError(0, modelError(...)).
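The dispatch rule itself is tiny. Here is a sketch under simplifying assumptions (agents as plain async functions, errors thrown instead of returned); branchSketch is an illustrative name, not the library's internals.

```typescript
// Sketch of branch dispatch: the router returns a single-key object,
// the key selects a branch, the value becomes that branch's input.
type Agent<O> = (input: string) => Promise<O>

const branchSketch = <O>(
  router: (input: string) => Promise<Record<string, string>>,
  branches: Record<string, Agent<O>>,
) =>
  async (input: string): Promise<O> => {
    const out = await router(input)
    const keys = Object.keys(out)
    // Runtime safety net, analogous to the compositionError case above.
    if (keys.length !== 1 || !(keys[0] in branches)) {
      throw new Error('router must return exactly one known branch key')
    }
    return branches[keys[0]](out[keys[0]])
  }
```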
Events emitted via the global onEvent hook: branch.start,
branch.selected { key }, branch.complete { key, durationMs },
branch.error { key, error, durationMs }.
See examples/05-branch.ts for a runnable end-to-end example.
import { withTools, agentAsTool } from 'agentic-fp/combinators'
import type { Tool } from 'agentic-fp/combinators'
const add: Tool<{ a: number; b: number }, number> = {
description: 'Add two integers and return the sum.',
inputSchema: z.object({ a: z.number(), b: z.number() }),
execute: ({ a, b }) => a + b,
}
const calculator = agent({
name: 'calculator',
model: anthropic('claude-haiku-4-5-20251001'),
description: 'Solves arithmetic step by step.',
instruction: 'Use the tools for every operation.',
outputSchema: z.string(),
})
const calc = withTools(calculator, { add }, { maxSteps: 8 })
// calc: Agent<string>
withTools(agent, tools, opts?) decorates an Agent<string> with the
ability to call tools. The SDK runs the LLM ↔ tool_use ↔ tool_result
loop for up to maxSteps iterations (default 10). Tools requested in
the same turn are executed in parallel.
Tool shape:
type Tool<I, O> = {
description: string
inputSchema: z.ZodType<I>
execute: (input: I) =>
| O | Promise<O>
| Result<O, ToolError> | Promise<Result<O, ToolError>>
timeoutMs?: number // default 180_000 (3 min) per call
}
execute may return a plain value, a Promise, or a Result<O, ToolError>.
Non-string outputs are JSON.stringify'd before being fed back to the
model. Each call is bounded by timeoutMs (or 3 min by default).
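One plausible way to bound a tool call, sketched with Promise.race — this is an assumption about the mechanism, not the library's actual implementation (withTimeout is an illustrative name):

```typescript
// Sketch: bound a promise with a timeout via Promise.race.
// The timer is always cleared so it doesn't leak.
const withTimeout = <T>(p: Promise<T>, timeoutMs: number): Promise<T> => {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error('tool timeout')), timeoutMs)
  })
  return Promise.race([p, timeout]).finally(() => {
    if (timer) clearTimeout(timer)
  })
}
```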
Resilience by default: tool failures (schema invalid, timeout, thrown
exceptions, returned Result.err) are reinjected to the model as error
tool results so it can recover. The loop only fails as a whole when
maxSteps is exceeded — then withTools returns
err({ kind: 'tool_loop_exceeded', agent, steps, lastError? }).
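The shape of that bounded loop can be sketched as follows. All names here are illustrative (the real loop lives inside the Vercel AI SDK), and the "model" is faked as a plain function:

```typescript
// Sketch of the bounded tool loop: ask the model, run any requested
// tools in parallel, feed results back, give up at maxSteps.
type ModelTurn =
  | { kind: 'final'; text: string }
  | { kind: 'tool_calls'; calls: { tool: string; input: number }[] }

const toolLoopSketch = async (
  model: (history: string[]) => Promise<ModelTurn>,
  tools: Record<string, (input: number) => Promise<number>>,
  maxSteps: number,
) => {
  const history: string[] = []
  for (let step = 0; step < maxSteps; step++) {
    const turn = await model(history)
    if (turn.kind === 'final') return { ok: true as const, value: turn.text }
    // Tools requested in the same turn run in parallel.
    const results = await Promise.all(
      turn.calls.map(async (c) => `${c.tool}=${await tools[c.tool](c.input)}`),
    )
    history.push(...results)
  }
  return { ok: false as const, error: { kind: 'tool_loop_exceeded', steps: maxSteps } }
}
```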
Sub-agents as tools: use agentAsTool to expose any Agent<O>
through the Tool<I, O> interface, with inputToString translating the
structured input into the string the wrapped agent expects.
const summarize = agentAsTool(summarizerAgent, {
description: 'Summarize a piece of text in one sentence.',
inputSchema: z.object({ text: z.string() }),
inputToString: ({ text }) => text,
})
const writer = withTools(writerAgent, { summarize })
Tool events:
type AgentEvent =
| /* existing */
| { type: 'tool.start'; agent; tool; input; timestamp }
| { type: 'tool.result'; agent; tool; durationMs; output }
| { type: 'tool.error'; agent; tool; durationMs; error: ToolError }
| { type: 'tool.loop_exceeded'; agent; steps; timestamp }
v0.3.0 limitations (deliberate, see ARCHITECTURE.md):
- Output schema for withTools is restricted to z.string(). Structured outputs around the loop are out of scope for v1.
- No AbortSignal / cancellation.
- Reentrant withTools(withTools(a, t1), t2) is unsupported (undefined behaviour).
import { memoize } from 'agentic-fp/combinators'
const cachedClassifier = memoize(classifier)
// cachedClassifier: Agent<O> with the same shape as classifier
memoize: Agent<O> → Agent<O> returns a new agent that caches successful
results keyed by the input string. Two consumers calling the agent with the
same input share a single underlying call:
- Hits return the cached value immediately, without invoking the inner agent.
- Failures are NOT cached — a transient error today should not block a future success. The next call with the same input re-runs the inner agent.
- In-flight requests coalesce: concurrent calls with the same input share the same promise, so the inner agent is invoked exactly once.
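All three behaviours fit in a short sketch over a generic Result-returning async function. This is illustrative (memoizeSketch is not the library's code), but it captures the contract described above:

```typescript
// Sketch of memoize: cache successes only, coalesce in-flight calls.
type Result<T> = { ok: true; value: T } | { ok: false; error: string }

const memoizeSketch = (fn: (input: string) => Promise<Result<number>>) => {
  const cache = new Map<string, Result<number>>()
  const inflight = new Map<string, Promise<Result<number>>>()
  return async (input: string): Promise<Result<number>> => {
    const hit = cache.get(input)
    if (hit) return hit                  // cached success: no inner call
    const pending = inflight.get(input)
    if (pending) return pending          // coalesce concurrent calls
    const p = fn(input).then((r) => {
      if (r.ok) cache.set(input, r)      // failures are NOT cached
      inflight.delete(input)
      return r
    })
    inflight.set(input, p)
    return p
  }
}
```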
Pass { maxSize } to cap the cache (LRU — a hit moves the entry to the most-recent position):
const cached = memoize(classifier, { maxSize: 1024 })
Default cache is unbounded — fine for short-lived processes (CLIs, scripts);
set maxSize for long-running services. The cache is per-agent (each
memoize(...) call returns a fresh agent with its own internal cache).
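The LRU behaviour described above (a hit becomes most-recent; eviction drops the oldest entry) can be sketched with a plain Map, exploiting its insertion order. Illustrative only, not the library's implementation:

```typescript
// Sketch of a Map-based LRU: re-inserting on hit refreshes recency;
// the first key in iteration order is the least-recently used.
const lruSketch = (maxSize: number) => {
  const cache = new Map<string, string>()
  return {
    get(key: string): string | undefined {
      const v = cache.get(key)
      if (v === undefined) return undefined
      cache.delete(key)
      cache.set(key, v) // refresh recency
      return v
    },
    set(key: string, value: string) {
      if (cache.has(key)) cache.delete(key)
      cache.set(key, value)
      if (cache.size > maxSize) {
        const oldest = cache.keys().next().value as string
        cache.delete(oldest) // evict least-recently used
      }
    },
  }
}
```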
memoize is itself an Agent<O>, so it composes with every other combinator:
const cachedAndRouted = branch(memoize(router), { ventas, soporte })
Pass modelParams and onEvent directly in the agent({...}) config.
import { configure } from 'agentic-fp/config'
configure({
onEvent: (e) => myLogger.log(e),
})
The cascade is: per-agent → global (configure) → SDK env vars
(ANTHROPIC_API_KEY, OPENAI_API_KEY, …). Both per-agent and global
onEvent hooks run; per-agent first, then global. Hook exceptions are
swallowed so they never break the agent.
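That dispatch order and the swallowing behaviour can be sketched in a few lines (emit and Hook are illustrative names, not the library's internals):

```typescript
// Sketch: per-agent hook first, then global, exceptions swallowed
// so a misbehaving logger can never break the agent call.
type AgentEventSketch = { type: string; agent: string }
type Hook = (e: AgentEventSketch) => void

const emit = (
  e: AgentEventSketch,
  perAgent: Hook | undefined,
  global: Hook | undefined,
) => {
  for (const hook of [perAgent, global]) {
    if (!hook) continue
    try {
      hook(e)
    } catch {
      // Hook exceptions are swallowed by design.
    }
  }
}
```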
Three event types per agent invocation:
type AgentEvent =
| { type: 'start'; agent: string; input: string; timestamp: number }
| { type: 'success'; agent: string; output: unknown;
latencyMs: number; usage?: { inputTokens: number; outputTokens: number } }
| { type: 'error'; agent: string; error: AgentError; latencyMs: number }
Combinators (pipe, fanOut, zip, parallelSettled, memoize) do not
emit their own events — only the agents underneath do. Two exceptions:
branch emits branch.start / branch.selected / branch.complete /
branch.error, and withTools emits tool.start / tool.result /
tool.error / tool.loop_exceeded for the tool-use loop.
Tree-shakeable entry points:
| Import | What you get |
|---|---|
| agentic-fp | agent, types |
| agentic-fp/combinators | pipe, fanOut, zip, parallelSettled, branch, memoize, withTools, agentAsTool |
| agentic-fp/result | Result, ok, err, isOk, isErr, map, flatMap, mapErr, unwrap |
| agentic-fp/config | configure, getGlobalConfig, resetConfig |
| agentic-fp/errors | AgentError, constructors, formatError |
ESM and CJS builds are emitted; .d.ts and .d.cts are both shipped.
- No streaming. Uses generateText / generateObject semantics; partials are not exposed.
- description is metadata only. Reserved for v2 (router/handoff). Not injected into the system prompt.
- pipe typing covers up to 8 stages with explicit overloads. Beyond that the result falls back to Agent<unknown>.
- Cancellation in fanOut is best-effort. When one agent fails, others in flight are not aggressively aborted.
npm test # types + unit (109 tests)
npm run test:e2e # E2E against Anthropic (requires .env.local)
npm run test:e2e:stability   # 3 consecutive E2E runs, fail on flake
cp .env.example .env.local
# Edit .env.local and set ANTHROPIC_API_KEY=sk-ant-...
.env.local is gitignored. The E2E config (vitest.e2e.config.ts) loads
it automatically.
E2E tests use claude-haiku-4-5-20251001 with temperature: 0 and
maxTokens ≤ 200. A typical run costs less than $0.01 USD
(see COST.md for the budget).
tests/e2e/_setup.ts registers a global onEvent hook that aggregates
token usage and prints a per-file summary at the end:
─── E2E telemetry ───
success calls : 5
error calls : 1
input tokens : 822
output tokens : 84
total latency : 4809ms
est. cost : $0.001242 USD
─────────────────────
ANTHROPIC_RATE_LIMIT_TEST=1 npm run test:e2e
Bursts ~40 concurrent requests to provoke a real 429 and assert the mapping
to kind: 'rate_limit'. Skipped by default to preserve quota.
npm run audit:secrets
Greps tracked and untracked-but-not-ignored files for sk-ant-, sk- (OpenAI),
xoxb-, AKIA… patterns. A unit test also verifies that .env.local is
covered by .gitignore.
Recommended setup:
- Unit suite is always blocking: npm test.
- E2E suite is optional: it only runs when the ANTHROPIC_API_KEY secret is available. In GitHub Actions:
- name: E2E (optional)
if: ${{ secrets.ANTHROPIC_API_KEY != '' }}
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
run: npm run test:e2e
Never echo or print the secret. The audit script is a cheap pre-merge check to catch accidental commits of credentials.
Runnable examples live under examples/. Each is a single
TypeScript file you can run with tsx once your .env.local is set up.
npx tsx examples/01-quick-start.ts
npx tsx examples/02-pipe.ts
npx tsx examples/03-fanout.ts
npx tsx examples/04-parallel-settled.ts
npx tsx examples/05-branch.ts
npx tsx examples/06-memoize.ts
For a visual mental map of the library — the central type, the 5 combinators
side by side, the data flow of a typical call, and the design decisions —
see docs/ARCHITECTURE.md.
MIT.