AgenTopology

Data Processing

A batch data processing topology with scale blocks, custom tools, metering, and MCP servers

Data Processing

This example demonstrates a batch data processing topology that handles large datasets. It covers scale blocks for concurrency control, custom tools, metering for cost tracking, and MCP server integration.

The Complete Topology

topology data-pipeline : [pipeline, fan-out] {
  meta {
    version: "1.0.0"
    description: "Batch data processing with validation and transformation"
  }

  orchestrator {
    model: sonnet
    handles: [intake, done]
  }

  mcp-servers {
    server database {
      command: "npx"
      args: ["-y", "@modelcontextprotocol/server-postgres"]
      env: { DATABASE_URL: "$DATABASE_URL" }
    }

    server file-store {
      command: "npx"
      args: ["-y", "@modelcontextprotocol/server-filesystem"]
      env: { ROOT_DIR: "./data" }
    }
  }

  agent extractor {
    model: sonnet
    phase: 1
    tools: [Read, Bash, mcp:database]
    writes: ["workspace/raw-data.json"]
    outputs: { record-count: number }
  }

  agent validator {
    model: sonnet
    phase: 2
    tools: [Read, Write]
    reads: ["workspace/raw-data.json"]
    writes: ["workspace/validated-data.json", "workspace/errors.json"]
    outputs: { valid-count: number, error-count: number }
  }

  agent transformer {
    model: sonnet
    phase: 3
    tools: [Read, Write, mcp:file-store]
    reads: ["workspace/validated-data.json"]
    writes: ["workspace/transformed-data.json"]
  }

  agent loader {
    model: haiku
    phase: 4
    tools: [Read, mcp:database, mcp:file-store]
    reads: ["workspace/transformed-data.json"]
    outputs: { rows-loaded: number }
  }

  flow {
    intake -> extractor
    extractor -> validator
    validator -> transformer
    transformer -> loader
    loader -> done
  }

  scale {
    extractor   { concurrency: 1 }
    validator   { concurrency: 4, batch-size: 500 }
    transformer { concurrency: 4, batch-size: 500 }
    loader      { concurrency: 2, batch-size: 100 }
  }

  metering {
    budget: "$5.00"
    alert-at: 80%
    per-agent {
      extractor   { max: "$1.00" }
      validator   { max: "$1.50" }
      transformer { max: "$1.50" }
      loader      { max: "$1.00" }
    }
  }
}

Walkthrough

MCP Servers

mcp-servers {
  server database {
    command: "npx"
    args: ["-y", "@modelcontextprotocol/server-postgres"]
    env: { DATABASE_URL: "$DATABASE_URL" }
  }

  server file-store {
    command: "npx"
    args: ["-y", "@modelcontextprotocol/server-filesystem"]
    env: { ROOT_DIR: "./data" }
  }
}

MCP (Model Context Protocol) servers give agents access to external systems. In this topology:

  • database — provides PostgreSQL read/write capabilities via the MCP protocol
  • file-store — provides structured filesystem access for the data directory

Agents reference MCP servers with the mcp: prefix in their tools list:

agent extractor {
  tools: [Read, Bash, mcp:database]
}

Environment variables use $ syntax to reference values from the runtime environment, keeping secrets out of the topology file.

Custom Tools via MCP

Each MCP server exposes tools that agents can call. The mcp:database tool gives the extractor access to run queries, read schemas, and fetch data. The mcp:file-store tool gives the loader structured access to the output directory.

This is more secure than giving agents raw Bash access — MCP servers can enforce access controls, audit queries, and limit operations.

Scale Blocks

scale {
  extractor   { concurrency: 1 }
  validator   { concurrency: 4, batch-size: 500 }
  transformer { concurrency: 4, batch-size: 500 }
  loader      { concurrency: 2, batch-size: 100 }
}

The scale block controls how agents handle large workloads:

PropertyDescription
concurrencyHow many instances of this agent can run simultaneously
batch-sizeHow many records each instance processes at a time

The extractor runs as a single instance (it reads from one source). The validator and transformer run 4 instances each, processing 500 records per batch. The loader uses lower concurrency (2) with smaller batches (100) to avoid overwhelming the database with writes.

Metering

metering {
  budget: "$5.00"
  alert-at: 80%
  per-agent {
    extractor   { max: "$1.00" }
    validator   { max: "$1.50" }
    transformer { max: "$1.50" }
    loader      { max: "$1.00" }
  }
}

The metering block sets cost controls:

  • budget — total maximum spend for one run of the topology
  • alert-at — percentage of budget that triggers a warning (80% = warn at $4.00)
  • per-agent max — individual agent spending caps

When any limit is reached, AgenTopology pauses execution and reports the overage. This prevents runaway costs during batch processing of large datasets.

Agent Pipeline

The four agents form a classic ETL (Extract, Transform, Load) pipeline with a validation step:

  1. extractor (phase 1) — pulls raw data from the database and writes it to a JSON file
  2. validator (phase 2) — checks data quality, separates valid records from errors
  3. transformer (phase 3) — applies business logic transformations to validated data
  4. loader (phase 4) — writes the final transformed data back to the database

Each agent explicitly declares what it reads and writes, making data dependencies clear and auditable.

Outputs as Metrics

Agents produce structured outputs that serve as pipeline metrics:

agent extractor {
  outputs: { record-count: number }
}

agent validator {
  outputs: { valid-count: number, error-count: number }
}

agent loader {
  outputs: { rows-loaded: number }
}

These outputs are available to the orchestrator and can be used in the final report to summarize the pipeline run: how many records were extracted, how many passed validation, and how many were loaded.

Flow Diagram

intake -> extractor -> validator -> transformer -> loader -> done
             |             |             |           |
          (concur:1)   (concur:4)    (concur:4)  (concur:2)
                       (batch:500)   (batch:500) (batch:100)

Adapting This Example

  • Add error handling — route validator errors to a fixer agent that attempts automatic correction
  • Add a human gate — require approval before the loader writes to a production database
  • Fan-out validation — split validation into parallel schema-validator and business-rule-validator agents
  • Add observability — include an observability block for logging and tracing

On this page