# Data Processing
A batch data processing topology with scale blocks, custom tools, metering, and MCP servers
This example demonstrates a batch data processing topology that handles large datasets. It covers scale blocks for concurrency control, custom tools, metering for cost tracking, and MCP server integration.
## The Complete Topology
```
topology data-pipeline : [pipeline, fan-out] {
meta {
version: "1.0.0"
description: "Batch data processing with validation and transformation"
}
orchestrator {
model: sonnet
handles: [intake, done]
}
mcp-servers {
server database {
command: "npx"
args: ["-y", "@modelcontextprotocol/server-postgres"]
env: { DATABASE_URL: "$DATABASE_URL" }
}
server file-store {
command: "npx"
args: ["-y", "@modelcontextprotocol/server-filesystem"]
env: { ROOT_DIR: "./data" }
}
}
agent extractor {
model: sonnet
phase: 1
tools: [Read, Bash, mcp:database]
writes: ["workspace/raw-data.json"]
outputs: { record-count: number }
}
agent validator {
model: sonnet
phase: 2
tools: [Read, Write]
reads: ["workspace/raw-data.json"]
writes: ["workspace/validated-data.json", "workspace/errors.json"]
outputs: { valid-count: number, error-count: number }
}
agent transformer {
model: sonnet
phase: 3
tools: [Read, Write, mcp:file-store]
reads: ["workspace/validated-data.json"]
writes: ["workspace/transformed-data.json"]
}
agent loader {
model: haiku
phase: 4
tools: [Read, mcp:database, mcp:file-store]
reads: ["workspace/transformed-data.json"]
outputs: { rows-loaded: number }
}
flow {
intake -> extractor
extractor -> validator
validator -> transformer
transformer -> loader
loader -> done
}
scale {
extractor { concurrency: 1 }
validator { concurrency: 4, batch-size: 500 }
transformer { concurrency: 4, batch-size: 500 }
loader { concurrency: 2, batch-size: 100 }
}
metering {
budget: "$5.00"
alert-at: 80%
per-agent {
extractor { max: "$1.00" }
validator { max: "$1.50" }
transformer { max: "$1.50" }
loader { max: "$1.00" }
}
}
}
```

## Walkthrough
### MCP Servers
```
mcp-servers {
server database {
command: "npx"
args: ["-y", "@modelcontextprotocol/server-postgres"]
env: { DATABASE_URL: "$DATABASE_URL" }
}
server file-store {
command: "npx"
args: ["-y", "@modelcontextprotocol/server-filesystem"]
env: { ROOT_DIR: "./data" }
}
}
```

MCP (Model Context Protocol) servers give agents access to external systems. In this topology:
- `database` — provides PostgreSQL read/write access over MCP
- `file-store` — provides structured filesystem access for the data directory
Agents reference MCP servers with the `mcp:` prefix in their `tools` list:
```
agent extractor {
tools: [Read, Bash, mcp:database]
}
```

Environment variables use `$` syntax to reference values from the runtime environment, keeping secrets out of the topology file.
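The contrast is already visible in the two servers above: `DATABASE_URL` is resolved from the process environment at launch, while the file-store's `ROOT_DIR: "./data"` is an ordinary literal committed to the file. A hypothetical third-party server needing a secret would follow the same pattern, e.g. `API_KEY: "$MY_SERVICE_API_KEY"` (both names made up), with the variable exported in the shell before the run.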
### Custom Tools via MCP
Each MCP server exposes tools that agents can call. The `mcp:database` tool gives the extractor access to run queries, read schemas, and fetch data. The `mcp:file-store` tool gives the transformer and loader structured access to the data directory.
This is more secure than giving agents raw Bash access — MCP servers can enforce access controls, audit queries, and limit operations.
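To make the trade-off concrete, here is a hypothetical variant of the extractor that shells out to the database instead of going through MCP; this agent is not part of the topology and exists only for contrast:

```
agent extractor-direct {
  model: sonnet
  phase: 1
  tools: [Read, Bash]
  writes: ["workspace/raw-data.json"]
}
```

With `Bash` in its tool list, nothing constrains what the agent executes. With `mcp:database` instead, every query passes through the server process, which can apply the access controls and auditing described above.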
### Scale Blocks
```
scale {
extractor { concurrency: 1 }
validator { concurrency: 4, batch-size: 500 }
transformer { concurrency: 4, batch-size: 500 }
loader { concurrency: 2, batch-size: 100 }
}
```

The `scale` block controls how agents handle large workloads:
| Property | Description |
|---|---|
| `concurrency` | How many instances of this agent can run simultaneously |
| `batch-size` | How many records each instance processes at a time |
The `extractor` runs as a single instance (it reads from one source). The `validator` and `transformer` run 4 instances each, processing 500 records per batch. The `loader` uses lower concurrency (2) with smaller batches (100) to avoid overwhelming the database with writes.
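As a rough worked example (the scheduling model here is an assumption; the runtime may interleave batches differently): on a 10,000-record run, the validator's 4 instances at 500 records per batch face 20 batches, about 5 waves of parallel work, while the loader's 2 instances at 100 records per batch face 100 batches, about 50 waves. The load stage is deliberately the slowest and gentlest.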
### Metering
```
metering {
budget: "$5.00"
alert-at: 80%
per-agent {
extractor { max: "$1.00" }
validator { max: "$1.50" }
transformer { max: "$1.50" }
loader { max: "$1.00" }
}
}
```

The `metering` block sets cost controls:
- `budget` — total maximum spend for one run of the topology
- `alert-at` — percentage of budget that triggers a warning (80% = warn at $4.00)
- `per-agent max` — individual agent spending caps
When any limit is reached, AgenTopology pauses execution and reports the overage. This prevents runaway costs during batch processing of large datasets.
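To put numbers on it: with the configuration above, the alert fires once total spend crosses $4.00 (80% of $5.00). Separately, if any single agent reaches its own cap, say the validator hitting $1.50, execution pauses even if total spend is still well under budget. And since the four per-agent caps sum to exactly $5.00, a per-agent cap will always trip at or before the global limit; the global `budget` mainly anchors the 80% alert.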
### Agent Pipeline
The four agents form a classic ETL (Extract, Transform, Load) pipeline with a validation step:
- `extractor` (phase 1) — pulls raw data from the database and writes it to a JSON file
- `validator` (phase 2) — checks data quality, separates valid records from errors
- `transformer` (phase 3) — applies business logic transformations to validated data
- `loader` (phase 4) — writes the final transformed data back to the database
Each agent explicitly declares what it reads and writes, making data dependencies clear and auditable.
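Laying those declarations side by side gives the data-flow contract of the run, derived directly from the agent definitions above:

| Producer | Artifact | Consumer |
|---|---|---|
| `extractor` | `workspace/raw-data.json` | `validator` |
| `validator` | `workspace/validated-data.json` | `transformer` |
| `validator` | `workspace/errors.json` | no declared reader |
| `transformer` | `workspace/transformed-data.json` | `loader` |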
### Outputs as Metrics
Agents produce structured outputs that serve as pipeline metrics:
```
agent extractor {
outputs: { record-count: number }
}
agent validator {
outputs: { valid-count: number, error-count: number }
}
agent loader {
outputs: { rows-loaded: number }
}
```

These outputs are available to the orchestrator and can be used in the final report to summarize the pipeline run: how many records were extracted, how many passed validation, and how many were loaded.
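As a purely illustrative summary (all numbers invented): "extracted 12,400 records; 12,162 valid, 238 errors; 12,162 rows loaded." When `rows-loaded` matches `valid-count`, a reader can confirm at a glance that nothing was dropped between validation and load.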
### Flow Diagram
```
intake -> extractor -> validator -> transformer -> loader -> done
              |            |             |            |
          (concur:1)   (concur:4)    (concur:4)   (concur:2)
                       (batch:500)   (batch:500)  (batch:100)
```

## Adapting This Example
- Add error handling — route `validator` errors to a `fixer` agent that attempts automatic correction (see the sketch after this list)
- Add a human gate — require approval before the `loader` writes to a production database
- Fan-out validation — split validation into parallel `schema-validator` and `business-rule-validator` agents
- Add observability — include an `observability` block for logging and tracing
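A minimal sketch of the first adaptation, using only constructs that appear in this topology. The `fixer` agent, its files, and its output are hypothetical, and `transformer` and `loader` would shift to phases 4 and 5 so the fixer can run between validation and transformation:

```
agent fixer {
  model: sonnet
  phase: 3
  tools: [Read, Write]
  reads: ["workspace/errors.json"]
  writes: ["workspace/fixed-data.json"]
  outputs: { fixed-count: number }
}

flow {
  validator -> fixer
  fixer -> transformer
}
```

The transformer would then declare `workspace/fixed-data.json` alongside `workspace/validated-data.json` in its `reads`, and `error-count` minus `fixed-count` tells the orchestrator how many records remained unrecoverable.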