anth-load-scale

Implement load testing, auto-scaling, and capacity planning for Claude API. Use when running performance benchmarks, planning for traffic spikes, or configuring horizontal scaling for Claude-powered services. Trigger with phrases like "anthropic load test", "claude scaling", "anthropic capacity planning", "scale claude api".

claude-code
5 Tools
anthropic-pack Plugin
saas packs Category

Allowed Tools

ReadWriteEditBash(npm:*)Grep

Provided by Plugin

anthropic-pack

Claude Code skill pack for Anthropic (30 skills)

saas packs v1.0.0
View Plugin

Installation

This skill is included in the anthropic-pack plugin:

/plugin install anthropic-pack@claude-code-plugins-plus

Click to copy

Instructions

Anthropic Load & Scale

Overview

Capacity planning and load testing for Claude API integrations. Key constraint: your rate limits (RPM/ITPM/OTPM) are the ceiling, not your infrastructure.

Capacity Planning


# Calculate required tier based on traffic
def plan_capacity(
    requests_per_minute: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    model: str = "claude-sonnet-4-20250514"
) -> dict:
    itpm = requests_per_minute * avg_input_tokens
    otpm = requests_per_minute * avg_output_tokens

    # Estimate monthly cost
    pricing = {
        "claude-haiku-4-20250514": (0.80, 4.00),
        "claude-sonnet-4-20250514": (3.00, 15.00),
        "claude-opus-4-20250514": (15.00, 75.00),
    }
    rates = pricing[model]
    cost_per_request = (avg_input_tokens * rates[0] + avg_output_tokens * rates[1]) / 1_000_000
    monthly_cost = cost_per_request * requests_per_minute * 60 * 24 * 30

    return {
        "rpm_needed": requests_per_minute,
        "itpm_needed": itpm,
        "otpm_needed": otpm,
        "cost_per_request": f"${cost_per_request:.4f}",
        "monthly_estimate": f"${monthly_cost:,.0f}",
        "recommendation": "Contact Anthropic sales for Scale tier" if requests_per_minute > 500 else "Self-serve tiers sufficient",
    }

print(plan_capacity(100, 500, 200))

Load Testing Script


import anthropic
import asyncio
import time
from dataclasses import dataclass

@dataclass
class LoadTestResult:
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    rate_limited: int = 0
    avg_latency_ms: float = 0
    p99_latency_ms: float = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0

async def load_test(
    concurrency: int = 10,
    total_requests: int = 100,
    model: str = "claude-haiku-4-20250514"
) -> LoadTestResult:
    client = anthropic.Anthropic()
    result = LoadTestResult()
    latencies = []
    semaphore = asyncio.Semaphore(concurrency)

    async def single_request():
        async with semaphore:
            start = time.monotonic()
            try:
                msg = client.messages.create(
                    model=model,
                    max_tokens=64,
                    messages=[{"role": "user", "content": "Respond with exactly: OK"}]
                )
                duration = (time.monotonic() - start) * 1000
                latencies.append(duration)
                result.successful += 1
                result.total_input_tokens += msg.usage.input_tokens
                result.total_output_tokens += msg.usage.output_tokens
            except anthropic.RateLimitError:
                result.rate_limited += 1
            except Exception:
                result.failed += 1
            result.total_requests += 1

    tasks = [single_request() for _ in range(total_requests)]
    await asyncio.gather(*tasks)

    if latencies:
        latencies.sort()
        result.avg_latency_ms = sum(latencies) / len(latencies)
        result.p99_latency_ms = latencies[int(len(latencies) * 0.99)]

    return result

# Run: asyncio.run(load_test(concurrency=10, total_requests=50))

Scaling Strategies

Strategy When Implementation
Queue-based processing > 50 RPM sustained Redis/SQS queue + worker pool
Model routing Mixed workloads Haiku for simple, Sonnet for complex
Message Batches Offline processing 100K requests, 50% cheaper, no RPM impact
Prompt caching Repeated system prompts 90% input token savings
Request coalescing Duplicate prompts Cache identical request hashes

Horizontal Scaling Pattern


# Multiple application instances sharing the same API key
# Rate limits are per-organization, NOT per-instance
# Use a shared rate limiter (Redis) to coordinate

import redis

r = redis.Redis()

def check_rate_limit(key: str = "claude:rpm", limit: int = 100, window: int = 60) -> bool:
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)
    return current <= limit

Error Handling

Issue Cause Fix
429 during load test Exceeded tier limits Reduce concurrency or upgrade tier
Increasing latency under load Output queue saturation Reduce max_tokens
Uneven request distribution No load balancing Use queue for fair distribution

Resources

Next Steps

For reliability patterns, see anth-reliability-patterns.

Ready to use anthropic-pack?