langchain-langgraph-streaming

Pick the correct LangGraph 1.0 stream_mode ("messages" vs "updates" vs "values"), wire it into SSE or WebSocket without proxy-buffering gotchas, and filter astream_events(v2) server-side before forwarding to the browser. Use when building a live-token chat UI, a per-node progress bar, a debug/time-travel view, or diagnosing a LangGraph stream that hangs over a production proxy. Trigger with "langgraph streaming", "stream_mode messages", "stream_mode updates", "stream_mode values", "langgraph SSE", "langgraph astream_events", "SSE hangs behind nginx", "cloud run streaming".


Allowed Tools

Read, Write, Edit, Bash(python:*)

Provided by Plugin

langchain-py-pack

Claude Code skill pack for LangChain 1.0 + LangGraph 1.0 (Python) - 34 skills covering chains, agents, RAG, middleware, checkpointing, HITL, streaming, and production patterns

saas packs v2.0.0

Installation

This skill is included in the langchain-py-pack plugin:

/plugin install langchain-py-pack@claude-code-plugins-plus


Instructions

LangGraph Streaming (Python)

Overview

An engineer ships stream_mode="values" to a token-level chat UI because it "seemed the most complete." Every single token causes the full graph state — message history, scratchpad, plan — to be re-sent and re-rendered. At ~60 tokens/sec the browser overdraws, the React reconciler can't keep up, the tab freezes, and users blame the model. The correct answer was stream_mode="messages", which emits an AIMessageChunk delta per token (typically 5-50 bytes) — one token's worth of DOM work. This is pain-catalog entry P19, and it is the #1 LangGraph integration mistake in the 1.0 generation.

Then the same UI ships to Cloud Run and hangs forever. No error. No logs. The server is emitting tokens; they just never reach the browser. Default proxy buffering (Nginx, Cloud Run's HTTP/1.1 path, Cloudflare Free) holds the last chunk waiting for more bytes. This is P46 — SSE streams from LangGraph drop the final end event over proxies that buffer — and the fix is three headers: X-Accel-Buffering: no, Cache-Control: no-cache, Connection: keep-alive.

And then the debug view starts crashing browser tabs on long runs. The engineer forwarded astream_events(version="v2") raw to the client because "it has more detail" — but v2 emits thousands of events per invocation (per-token, per-node, per-runnable lifecycle), and a 60-second agent run easily hits 3,000 events. Browsers freeze on the JSON deserialize queue. This is P47 — filter server-side and forward only on_chat_model_stream tokens (and optionally on_tool_start / on_tool_end).

This skill ships the decision matrix, a production-grade FastAPI SSE endpoint with the anti-buffering headers and a 15-second heartbeat, a server-side v2 event filter that drops ~90% of the noise, and a WebSocket variant with reconnect-by-thread_id that resumes from the LangGraph checkpointer. Pin: langgraph 1.0.x, langchain-core 1.0.x. Pain-catalog anchors: **P19, P46, P47, P48, P67**, plus P16 for the thread_id rule and P22 for checkpointer persistence.

Prerequisites

  • Python 3.10+
  • langgraph >= 1.0, < 2.0, langchain-core >= 1.0, < 2.0
  • fastapi >= 0.110, uvicorn[standard] (for SSE/WebSocket hosting)
  • A checkpointer: langgraph.checkpoint.memory.MemorySaver for dev, or langgraph.checkpoint.postgres.PostgresSaver for prod
  • Access to deploy behind your actual proxy (Nginx / Cloud Run / Cloudflare) — localhost does not reproduce the buffering class of bugs

Instructions

Step 1 — Pick the right stream_mode for your UI

The three modes emit fundamentally different payloads. Match the mode to the UI shape before writing any server code.

| UI type | stream_mode | Payload each tick | Emit rate | Overdraw risk | Typical bandwidth per 5s run |
| --- | --- | --- | --- | --- | --- |
| Live-token chat | "messages" | (AIMessageChunk, metadata) delta | ~30-80 tokens/sec | Low | ~5-15 KB |
| Per-node progress bar / status line | "updates" | {node_name: state_diff} | 1 per node (~2-20 per run) | Low | ~1-5 KB |
| Debug / time-travel / state replay | "values" | Entire graph state dict | 1 per node (~2-20 per run) | High (state size × steps) | ~20 KB to MBs |
| Hybrid (progress + tokens) | ["updates", "messages"] | (mode, payload) interleaved | Sum of above | Depends on inner modes | Sum |
| Non-browser observability | astream_events(v2) + filter | Filtered dicts | Depends on filter | Low (server-controlled) | Controlled |

Decision tree:


Do you need LLM tokens rendered live in the UI?
├── Yes → stream_mode="messages"
│         (add "updates" to the list if you also want per-node progress)
└── No, I need per-step progress
    ├── Full state for debug/replay? → stream_mode="values"
    └── Just what changed (most UIs)  → stream_mode="updates"

Full payload samples and combined-mode examples are in Stream Mode Comparison.

Step 2 — Wire a minimal SSE endpoint


import asyncio, json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from app.graph import build_graph

app = FastAPI()
graph = build_graph(checkpointer=MemorySaver())


def sse(event: str, data: dict) -> str:
    return f"event: {event}\ndata: {json.dumps(data, default=str)}\n\n"


async def stream_tokens(thread_id: str, user_input: str):
    config = {"configurable": {"thread_id": thread_id}}
    async for chunk, metadata in graph.astream(
        {"messages": [HumanMessage(user_input)]},
        config=config,
        stream_mode="messages",
    ):
        # chunk.content may be list[dict] on Claude tool-use turns (P02)
        text = chunk.text if hasattr(chunk, "text") else (
            chunk.content if isinstance(chunk.content, str) else None
        )
        if text:
            yield sse("token", {"text": text, "node": metadata.get("langgraph_node")})
    yield sse("done", {"thread_id": thread_id})

Always use graph.astream(...) (async). Never call graph.stream(...) (sync) from inside an async handler — it blocks the event loop, and one slow request blocks every other connection (P48).

Step 3 — Set the anti-buffering headers


@app.get("/stream")
async def stream(thread_id: str, q: str):
    return StreamingResponse(
        stream_tokens(thread_id, q),
        media_type="text/event-stream",
        headers={
            "X-Accel-Buffering": "no",    # Nginx / Cloud Run / Cloudflare
            "Cache-Control": "no-cache",  # Block intermediate caches
            "Connection": "keep-alive",   # Hold the TCP connection
        },
    )

These three headers are non-negotiable in production. Without them, your stream works on localhost and hangs on Cloud Run. See SSE Endpoint Template for the full template with a 15-second heartbeat (required to survive Cloud Run's 60s idle timeout and corporate-proxy timeouts) plus reverse-proxy snippets for Nginx, Traefik, and Cloud Run.
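The heartbeat can be layered onto any SSE frame generator without touching the LangGraph loop. A sketch under the assumption that frames is an async generator of ready-to-send SSE strings (the function name is illustrative; the referenced full template is authoritative):

```python
import asyncio

HEARTBEAT = ": heartbeat\n\n"  # SSE comment frame; EventSource clients ignore it


async def with_heartbeat(frames, interval: float = 15.0):
    """Re-yield SSE frames, inserting a heartbeat comment whenever the
    source generator is idle longer than `interval` seconds (tool waits)."""
    anext_task = asyncio.ensure_future(frames.__anext__())
    while True:
        # asyncio.wait (not wait_for) is deliberate: wait_for would cancel
        # the pending __anext__ on timeout and kill the underlying stream.
        done, _ = await asyncio.wait({anext_task}, timeout=interval)
        if not done:
            yield HEARTBEAT          # source still busy: keep the proxy alive
            continue
        try:
            frame = anext_task.result()
        except StopAsyncIteration:
            return
        yield frame
        anext_task = asyncio.ensure_future(frames.__anext__())
```

Wrap the endpoint's generator as StreamingResponse(with_heartbeat(stream_tokens(...)), ...) and idle streams stay alive through 60s proxy timeouts.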

Step 4 — Filter astream_events(version="v2") server-side

If your UI needs richer events than "messages" provides — tool start/end,

progress markers, retrieval events — do not forward astream_events

raw. A single 60-second agent run can emit 3,000+ events. Filter on the

server and forward only what the browser uses.


FORWARD = {"on_chat_model_stream", "on_tool_start", "on_tool_end"}

async def filtered(graph, inputs, config):
    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            text = chunk.text if hasattr(chunk, "text") else None
            if text:
                yield {"type": "token", "text": text,
                       "node": event["metadata"].get("langgraph_node")}
        elif kind == "on_tool_start":
            yield {"type": "tool_start", "tool": event["name"]}
        elif kind == "on_tool_end":
            yield {"type": "tool_end", "tool": event["name"]}
        # Drop: on_chain_*, on_parser_*, on_prompt_*, on_retriever_* (P47)
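The filter can be smoke-tested without a live model by substituting a stub that emits the v2 event shape. FakeGraph and its event payloads below are illustrative test fixtures, not LangGraph APIs, and the filter body is reproduced in condensed form so the sketch runs standalone:

```python
import asyncio


async def filtered(graph, inputs, config):
    # Condensed copy of the server-side filter for standalone testing.
    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            text = getattr(event["data"]["chunk"], "text", None)
            if text:
                yield {"type": "token", "text": text}
        elif kind in ("on_tool_start", "on_tool_end"):
            yield {"type": kind.removeprefix("on_"), "tool": event["name"]}


class FakeChunk:
    def __init__(self, text):
        self.text = text


class FakeGraph:
    """Stub emitting the minimal v2 event shape the filter inspects."""
    async def astream_events(self, inputs, config=None, version="v2"):
        yield {"event": "on_chain_start", "name": "agent"}  # should be dropped
        yield {"event": "on_tool_start", "name": "search"}
        yield {"event": "on_chat_model_stream", "data": {"chunk": FakeChunk("Hi")}}
        yield {"event": "on_tool_end", "name": "search"}


async def collect():
    return [e async for e in filtered(FakeGraph(), {}, {})]

out = asyncio.run(collect())
# Three frames survive: tool_start, token, tool_end; on_chain_start is dropped
```

A stub like this makes the "drops ~90% of noise" claim checkable in CI before any token touches a browser.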

Never use astream_log() in new code — it is soft-deprecated in 1.0 (P67) and scheduled for removal in 2.0. Use astream_events(version="v2") instead. Full event taxonomy and compression/backpressure patterns are in Astream Events Filtering.

Step 5 — WebSocket variant with reconnect

Use WebSocket instead of SSE when the user may cancel, interrupt, or send follow-up messages mid-stream. WebSocket also sidesteps Cloudflare Free's default response buffering.


from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{thread_id}")
async def ws(websocket: WebSocket, thread_id: str):
    await websocket.accept()
    config = {"configurable": {"thread_id": thread_id}}  # P16 — always
    try:
        while True:
            msg = json.loads(await websocket.receive_text())
            if msg["type"] == "user_message":
                async for chunk, metadata in graph.astream(
                    {"messages": [HumanMessage(msg["text"])]},
                    config=config,
                    stream_mode="messages",
                ):
                    text = chunk.text if hasattr(chunk, "text") else None
                    if text:
                        await websocket.send_json({"type": "token", "text": text})
                await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass  # Checkpointer persists state; reconnect with same thread_id resumes

Because LangGraph checkpointers persist state per thread_id, a client that reconnects to /ws/{same-thread-id} automatically sees the prior conversation history on the next turn — no special "resume" handshake is required for between-turn reconnects. For mid-stream reconnects and cancellation handling, see WebSocket & Reconnect.
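Mid-stream cancellation follows the standard asyncio pattern: run the token loop as a Task so a client "cancel" message can interrupt it with Task.cancel(). A sketch with illustrative names (pump, send — not part of the LangGraph or FastAPI APIs):

```python
import asyncio


async def pump(stream, send):
    """Forward tokens until the stream ends or the surrounding task is cancelled."""
    try:
        async for token in stream:
            await send({"type": "token", "text": token})
        await send({"type": "done"})
    except asyncio.CancelledError:
        # Completed steps are already in the checkpointer; just inform the client.
        await send({"type": "cancelled"})
        raise
```

In the WebSocket handler, start it with task = asyncio.create_task(pump(token_stream, websocket.send_json)) and call task.cancel() when a {"type": "cancel"} message arrives.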

Step 6 — Run the proxy-readiness checklist before shipping

A stream that works with uvicorn --reload main:app on your laptop will hang behind Cloud Run. Before you ship, walk this checklist:

  • [ ] Deployed endpoint returns Content-Type: text/event-stream (or 101 Switching Protocols for WebSocket)
  • [ ] Deployed endpoint returns X-Accel-Buffering: no and Cache-Control: no-cache
  • [ ] curl -N https://your.app/stream?... shows tokens arriving incrementally — NOT all at once at the end
  • [ ] Cloud Run deployed with --use-http2 (HTTP/2 end-to-end flushes chunks reliably)
  • [ ] If behind Cloudflare: Free plan may buffer — test on Pro or use a paid page rule to disable response buffering (or switch to WebSocket)
  • [ ] 15-second heartbeat configured (: heartbeat\n\n SSE comment every 15s) so idle streams don't get killed by the 60s timeout
  • [ ] Long-idle load test: run a tool-using agent that waits 45s on an API call; stream should stay alive

Test behind your actual proxy, not just localhost.
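The curl check can be automated: capture (arrival_time, chunk) pairs from any streaming client and flag the all-at-once signature of a buffering proxy. The helper name and threshold are illustrative assumptions:

```python
def looks_buffered(arrivals, min_spread: float = 1.0) -> bool:
    """True when every chunk arrived within `min_spread` seconds of each
    other — the signature of a proxy that held the stream and flushed it
    in one burst instead of forwarding chunks incrementally."""
    if len(arrivals) < 2:
        return False                       # nothing to measure
    times = [t for t, _ in arrivals]
    return (max(times) - min(times)) < min_spread
```

Feed it timestamps from httpx/aiohttp chunk iteration in a smoke test against the deployed URL; a healthy token stream spreads arrivals over the whole generation window.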

Output

  • stream_mode chosen deliberately from the decision matrix ("messages" for tokens, "updates" for progress, "values" for debug)
  • Minimal FastAPI SSE endpoint using graph.astream(..., stream_mode="messages") in an async handler
  • Required anti-buffering headers (X-Accel-Buffering, Cache-Control, Connection) on the StreamingResponse
  • Server-side astream_events(version="v2") filter that forwards only on_chat_model_stream + on_tool_start + on_tool_end
  • Optional WebSocket variant with thread_id required at the route and checkpointer-backed resume
  • Proxy-readiness checklist walked end-to-end before shipping past localhost

Error Handling

| Symptom | Cause | Fix |
| --- | --- | --- |
| Browser tab freezes on token stream | Shipped stream_mode="values" to a token UI; full state on every tick (P19) | Switch to stream_mode="messages" — emits per-token deltas only |
| Per-node progress bar never advances | Shipped stream_mode="messages" to a per-node UI; no node-boundary events (P19) | Switch to stream_mode="updates" |
| Stream works on localhost, hangs on Cloud Run | Proxy buffering holds the last chunk (P46) | Add X-Accel-Buffering: no and Cache-Control: no-cache headers; deploy Cloud Run with --use-http2 |
| Stream closes after ~60s with no data | Idle-connection timeout on proxy | Send a : heartbeat\n\n SSE comment every 15s |
| Browser tab freezes on long astream_events run | Forwarded unfiltered v2 events; 3,000+ events per run (P47) | Filter server-side: forward only on_chat_model_stream + optional tool events |
| DeprecationWarning: astream_log is deprecated | Using a soft-deprecated API (P67) | Migrate to astream_events(version="v2") |
| Agent has amnesia on every WebSocket message | Missing thread_id in config (P16) | Require thread_id at the route; assert in middleware |
| AttributeError: 'list' object has no attribute 'lower' on chunk.content | Claude streams content blocks, not plain strings, on tool-use turns (P02) | Use chunk.text (1.0+) or check isinstance(chunk.content, str) before calling string methods |
| One slow request blocks all other WebSocket clients | Sync graph.stream() or graph.invoke() inside an async handler (P48) | Always use graph.astream() / graph.ainvoke() in async contexts |
| Cloudflare Free plan buffers SSE | Free-tier response buffering | Upgrade the plan with a page rule to disable buffering, or switch the endpoint to WebSocket |
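The chunk.content pitfall (P02) is easiest to fix with one defensive normalizer used everywhere text is extracted. A sketch — the list-of-blocks shape assumed here is the Anthropic-style {"type": "text", "text": ...} content block:

```python
def chunk_text(chunk) -> str:
    """Best-effort plain-text extraction from an AIMessageChunk-like object."""
    text = getattr(chunk, "text", None)
    if isinstance(text, str):              # langchain-core 1.0+ .text property
        return text
    content = getattr(chunk, "content", "")
    if isinstance(content, str):           # plain-string content
        return content
    if isinstance(content, list):          # Claude tool-use turns: content blocks
        return "".join(b.get("text", "") for b in content
                       if isinstance(b, dict) and b.get("type") == "text")
    return ""
```

Routing every extraction through one helper also gives a single place to log unexpected content shapes from new model versions.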

Examples

Live-token chat UI (the default case)

stream_mode="messages" plus SSE plus the three anti-buffering headers. One token per SSE frame (~5-50 bytes each), 30-80 frames/sec during active model generation, heartbeat every 15s during tool waits. See SSE Endpoint Template for the complete FastAPI example including heartbeat, reverse-proxy config, and the client EventSource code.

Per-node progress bar for a multi-step agent

stream_mode="updates" yields one event per node (typically 2-20 per invocation). Render as discrete status ticks: "Planning..." → "Searching..." → "Summarizing..." → "Done." The payload is tiny (~100 bytes per tick). Combine with "messages" (stream_mode=["updates", "messages"]) to show both progress ticks and streaming tokens in the active node's pane. Full payload samples are in Stream Mode Comparison.

Debug / time-travel view with "values"

stream_mode="values" yields the entire graph state after each node. Useful for state replay, test recording, and observability pipelines — not for browser UIs, where state size × steps × re-renders quickly freezes the tab. Pipe to a server-side log (or LangSmith), not to the browser. Example and caveats in Stream Mode Comparison.
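Piping "values" snapshots to a server-side artifact can be as simple as appending JSONL, one line per step, replayable offline. A minimal sketch — path and record schema are illustrative:

```python
import json


def record_snapshots(states, path: str) -> None:
    """Append one JSON line per graph-state snapshot for offline replay.

    `states` is any iterable of state dicts, e.g. the payloads yielded by
    graph.astream(..., stream_mode="values"); default=str handles message
    objects that are not natively JSON-serializable.
    """
    with open(path, "a") as f:
        for step, state in enumerate(states):
            f.write(json.dumps({"step": step, "state": state}, default=str) + "\n")
```

A time-travel view then reads the file line by line instead of holding megabytes of state in the browser.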

WebSocket with reconnect-by-thread_id

Use this when users can cancel mid-stream or send follow-up messages before the previous turn finishes. The thread_id is required at the route; the checkpointer persists history; reconnecting with the same thread_id automatically sees prior turns. Cancellation is implemented via asyncio.Task.cancel() on the active astream iteration. Worked example with half-open connection detection in WebSocket & Reconnect.
