langchain-langgraph-streaming
Pick the correct LangGraph 1.0 stream_mode ("messages" vs "updates" vs "values"), wire it into SSE or WebSocket without proxy-buffering gotchas, and filter astream_events(v2) server-side before forwarding to the browser. Use when building a live-token chat UI, a per-node progress bar, a debug/time-travel view, or diagnosing a LangGraph stream that hangs over a production proxy. Trigger with "langgraph streaming", "stream_mode messages", "stream_mode updates", "stream_mode values", "langgraph SSE", "langgraph astream_events", "SSE hangs behind nginx", "cloud run streaming".
Allowed Tools
Provided by Plugin
langchain-py-pack
Claude Code skill pack for LangChain 1.0 + LangGraph 1.0 (Python) - 34 skills covering chains, agents, RAG, middleware, checkpointing, HITL, streaming, and production patterns
Installation
This skill is included in the langchain-py-pack plugin:
/plugin install langchain-py-pack@claude-code-plugins-plus
Instructions
LangGraph Streaming (Python)
Overview
An engineer ships stream_mode="values" to a token-level chat UI because it
"seemed the most complete." Every single token causes the full graph state —
message history, scratchpad, plan — to be re-sent and re-rendered. At ~60
tokens/sec the browser overdraws, the React reconciler can't keep up, the tab
freezes, and users blame the model. The correct answer was stream_mode="messages",
which emits an AIMessageChunk delta per token (typically 5-50 bytes) — one
token's worth of DOM work. This is pain-catalog entry P19 and it is the #1
LangGraph integration mistake in the 1.0 generation.
Then the same UI ships to Cloud Run and hangs forever. No error. No logs. The
server is emitting tokens; they just never reach the browser. Default proxy
buffering (Nginx, Cloud Run's HTTP/1.1 path, Cloudflare Free) holds the last
chunk waiting for more bytes. This is P46 — SSE streams from LangGraph
drop the final end event over proxies that buffer — and the fix is three
headers: X-Accel-Buffering: no, Cache-Control: no-cache, Connection: keep-alive.
And then the debug view starts crashing browser tabs on long runs. The engineer
forwarded astream_events(version="v2") raw to the client because "it has more
detail" — but v2 emits thousands of events per invocation (per-token, per-node,
per-runnable lifecycle), and a 60-second agent run easily hits 3,000 events.
Browsers freeze on the JSON deserialize queue. This is P47: filter
server-side and forward only on_chat_model_stream tokens (and optionally
on_tool_start / on_tool_end).
This skill ships the decision matrix, a production-grade FastAPI SSE endpoint
with the anti-buffering headers and a 15-second heartbeat, a server-side v2
event filter that drops ~90% of noise, and a WebSocket variant with
reconnect-by-thread_id that resumes from the LangGraph checkpointer. Pin:
langgraph 1.0.x, langchain-core 1.0.x. Pain-catalog anchors: **P19, P46,
P47, P48, P67**, plus P16 for the thread_id rule and P22 for checkpointer
persistence.
Prerequisites
- Python 3.10+
- langgraph >= 1.0, < 2.0 and langchain-core >= 1.0, < 2.0
- fastapi >= 0.110 and uvicorn[standard] (for SSE/WebSocket hosting)
- A checkpointer: langgraph.checkpoint.memory.MemorySaver for dev, or
  langgraph.checkpoint.postgres.aio.AsyncPostgresSaver for prod (the async
  variant, which matches the async handlers used throughout this skill)
- Access to deploy behind your actual proxy (Nginx / Cloud Run / Cloudflare) —
localhost does not reproduce the buffering class of bugs
Instructions
Step 1 — Pick the right stream_mode for your UI
The three modes emit fundamentally different payloads. Match the mode to the
UI shape before writing any server code.
| UI type | stream_mode | Payload each tick | Emit rate | Overdraw risk | Typical bandwidth per 5s run |
|---|---|---|---|---|---|
| Live-token chat | "messages" | (AIMessageChunk, metadata) delta | ~30-80 tokens/sec | Low | ~5-15 KB |
| Per-node progress bar / status line | "updates" | {node_name: state_diff} | 1 per node (~2-20 per run) | Low | ~1-5 KB |
| Debug / time-travel / state replay | "values" | Entire graph state dict | 1 per node (~2-20 per run) | High (state size × steps) | ~20 KB to MBs |
| Hybrid (progress + tokens) | ["updates", "messages"] | (mode, payload) interleaved | Sum of above | Depends on inner modes | Sum |
| Non-browser observability | astream_events(v2) + filter | Filtered dicts | Depends on filter | Low (server-controlled) | Controlled |
Decision tree:
```text
Do you need LLM tokens rendered live in the UI?
├── Yes → stream_mode="messages"
│   (add "updates" to the list if you also want per-node progress)
└── No, I need per-step progress
    ├── Full state for debug/replay? → stream_mode="values"
    └── Just what changed (most UIs) → stream_mode="updates"
```
Full payload samples and combined-mode examples are in
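When the hybrid row of the table applies, LangGraph yields (mode, payload) tuples instead of bare payloads. Below is a minimal sketch of a router that turns each tuple into a browser event. It assumes langchain-core 1.0, where message .text is a property, and a hypothetical {"type": ...} event shape. route_stream_item is an illustrative name, not a LangGraph API. SimpleNamespace stands in for an AIMessageChunk so the sketch runs without LangGraph installed.

```python
from types import SimpleNamespace
from typing import Any, Optional


def route_stream_item(mode: str, payload: Any) -> Optional[dict]:
    """Turn one (mode, payload) item from a multi-mode astream into a UI event.

    Assumes stream_mode=["updates", "messages"]; the event shapes returned
    here are hypothetical, chosen for this sketch.
    """
    if mode == "messages":
        # payload is (message_chunk, metadata)
        chunk, metadata = payload
        text = getattr(chunk, "text", None)
        if isinstance(text, str) and text:
            return {"type": "token", "text": text,
                    "node": metadata.get("langgraph_node")}
        return None  # e.g. tool-call chunks that carry no text
    if mode == "updates":
        # payload is {node_name: state_diff}; the UI only needs node names
        return {"type": "progress", "nodes": list(payload)}
    return None  # ignore modes this UI does not render


if __name__ == "__main__":
    fake_chunk = SimpleNamespace(text="Hel")  # stand-in for an AIMessageChunk
    print(route_stream_item("messages", (fake_chunk, {"langgraph_node": "agent"})))
    print(route_stream_item("updates", {"tools": {"messages": []}}))
```

In a handler, this would sit inside async for mode, payload in graph.astream(inputs, config=config, stream_mode=["updates", "messages"]). Each non-None result goes to the SSE or WebSocket sender.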
Step 2 — Wire a minimal SSE endpoint
```python
import asyncio, json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from app.graph import build_graph

app = FastAPI()
graph = build_graph(checkpointer=MemorySaver())


def sse(event: str, data: dict) -> str:
    return f"event: {event}\ndata: {json.dumps(data, default=str)}\n\n"


async def stream_tokens(thread_id: str, user_input: str):
    config = {"configurable": {"thread_id": thread_id}}
    async for chunk, metadata in graph.astream(
        {"messages": [HumanMessage(user_input)]},
        config=config,
        stream_mode="messages",
    ):
        # chunk.content may be list[dict] on Claude tool-use turns (P02)
        text = chunk.text if hasattr(chunk, "text") else (
            chunk.content if isinstance(chunk.content, str) else None
        )
        if text:
            yield sse("token", {"text": text, "node": metadata.get("langgraph_node")})
    yield sse("done", {"thread_id": thread_id})
```
Always use graph.astream(...) (async). Never call graph.stream(...) (sync)
from inside an async handler — it blocks the event loop and one slow request
blocks every other connection (P48).
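The P48 failure is easy to reproduce without LangGraph. In this toy sketch, time.sleep stands in for a sync graph.stream() / graph.invoke() call, and asyncio.sleep stands in for an awaited graph.astream() / graph.ainvoke(). The handler names are hypothetical. Two concurrent "requests" take roughly twice as long when the handler blocks, because the event loop cannot switch between them.

```python
import asyncio
import time


async def blocking_handler() -> None:
    # Stand-in for a sync graph.stream()/invoke() inside an async route:
    # time.sleep holds the event loop, so no other request can progress.
    time.sleep(0.2)


async def async_handler() -> None:
    # Stand-in for graph.astream()/ainvoke(): awaiting hands the loop back.
    await asyncio.sleep(0.2)


async def serve_two(handler) -> float:
    """Run two 'requests' concurrently and return the wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(handler(), handler())
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sync inside async: {asyncio.run(serve_two(blocking_handler)):.2f}s")
    print(f"async all the way: {asyncio.run(serve_two(async_handler)):.2f}s")
```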
Step 3 — Set the anti-buffering headers
```python
@app.get("/stream")
async def stream(thread_id: str, q: str):
    return StreamingResponse(
        stream_tokens(thread_id, q),
        media_type="text/event-stream",
        headers={
            "X-Accel-Buffering": "no",   # Nginx / Cloud Run / Cloudflare
            "Cache-Control": "no-cache",  # Block intermediate caches
            "Connection": "keep-alive",   # Hold the TCP connection
        },
    )
```
These three headers are non-negotiable in production. Without them, your
stream works on localhost and hangs on Cloud Run. See SSE Endpoint Template
for the full template with a 15-second heartbeat (required to survive Cloud
Run's 60s idle timeout and corporate-proxy timeouts) plus reverse-proxy
snippets for Nginx, Traefik, and Cloud Run.
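A minimal sketch of the 15-second heartbeat follows. It assumes the stream is an async generator of already-formatted SSE strings, like stream_tokens in Step 2. with_heartbeat is a hypothetical helper, not a FastAPI or LangGraph API. It emits an SSE comment line whenever the inner stream has been silent for interval seconds. EventSource clients ignore comment lines, so proxies stop seeing an idle connection and the UI receives no extra events.

```python
import asyncio
from typing import AsyncIterator

HEARTBEAT = ": heartbeat\n\n"  # SSE comment line; EventSource ignores it
_DONE = object()               # sentinel: the source stream has finished


async def with_heartbeat(source: AsyncIterator[str],
                         interval: float = 15.0) -> AsyncIterator[str]:
    """Yield frames from source, adding a heartbeat after interval idle seconds."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump() -> None:
        # Read the source in its own task so the idle timer keeps running
        # even while the graph is stuck in a long tool call.
        try:
            async for frame in source:
                await queue.put(frame)
        except Exception as exc:  # hand errors to the consumer
            queue.put_nowait(exc)
        finally:
            queue.put_nowait(_DONE)

    pump_task = asyncio.create_task(pump())
    get_task = None
    try:
        while True:
            if get_task is None:
                get_task = asyncio.ensure_future(queue.get())
            # asyncio.wait does not cancel get_task on timeout, so no frame is lost
            done, _ = await asyncio.wait({get_task}, timeout=interval)
            if not done:
                yield HEARTBEAT
                continue
            item = get_task.result()
            get_task = None
            if item is _DONE:
                break
            if isinstance(item, Exception):
                raise item
            yield item
    finally:
        # Runs on normal end and on client disconnect (generator closed)
        if get_task is not None:
            get_task.cancel()
        pump_task.cancel()
```

To wire this into the Step 3 route, pass with_heartbeat(stream_tokens(thread_id, q)) to StreamingResponse instead of the bare generator. The key design choice is reading the source in a separate task. A timestamp check inside a single async for loop could not fire while the loop is waiting on the graph.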
Step 4 — Filter astream_events(version="v2") server-side
If your UI needs richer events than "messages" provides — tool start/end,
progress markers, retrieval events — do not forward astream_events
raw. A single 60-second agent run can emit 3,000+ events. Filter on the
server and forward only what the browser uses.
```python
FORWARD = {"on_chat_model_stream", "on_tool_start", "on_tool_end"}


async def filtered(graph, inputs, config):
    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event["event"]
        if kind not in FORWARD:
            # Drop: on_chain_*, on_parser_*, on_prompt_*, on_retriever_* (P47)
            continue
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            text = chunk.text if hasattr(chunk, "text") else None
            if text:
                yield {"type": "token", "text": text,
                       "node": event["metadata"].get("langgraph_node")}
        elif kind == "on_tool_start":
            yield {"type": "tool_start", "tool": event["name"]}
        elif kind == "on_tool_end":
            yield {"type": "tool_end", "tool": event["name"]}
```
Never use astream_log() in new code — soft-deprecated in 1.0 (P67), scheduled
for removal in 2.0. Use astream_events(version="v2") instead. Full event
taxonomy and compression/backpressure patterns in
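Below is one possible backpressure pattern. It is an assumption, not necessarily the pattern in the referenced taxonomy: coalesce the filtered token stream so the browser receives fewer, larger frames. coalesce_tokens is a hypothetical helper. It takes plain text tokens, for example the "text" field of the token dicts yielded by filtered, and flushes when a size or time threshold is reached.

```python
import asyncio
from typing import AsyncIterator


async def coalesce_tokens(tokens: AsyncIterator[str], max_delay: float = 0.05,
                          max_chars: int = 256) -> AsyncIterator[str]:
    """Merge small token deltas into fewer, larger frames.

    Flushes when the buffer reaches max_chars, or when a token arrives at
    least max_delay seconds after the previous flush. A stalled source
    holds the buffer until the next token or the end of the stream.
    """
    loop = asyncio.get_running_loop()
    buf: list[str] = []
    size = 0
    last_flush = loop.time()
    async for token in tokens:
        buf.append(token)
        size += len(token)
        now = loop.time()
        if size >= max_chars or now - last_flush >= max_delay:
            yield "".join(buf)
            buf.clear()
            size = 0
            last_flush = now
    if buf:  # flush whatever remains when the stream ends
        yield "".join(buf)
```

This helper only flushes when a new token arrives. It therefore caps the frame rate during fast generation but does nothing during tool waits, which the heartbeat covers.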
Step 5 — WebSocket variant with reconnect
Use WebSocket instead of SSE when the user may cancel, interrupt, or send
follow-up messages mid-stream. WebSocket also sidesteps Cloudflare Free's
default response buffering.
```python
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/ws/{thread_id}")
async def ws(websocket: WebSocket, thread_id: str):
    await websocket.accept()
    config = {"configurable": {"thread_id": thread_id}}  # P16 — always
    try:
        while True:
            msg = json.loads(await websocket.receive_text())
            if msg["type"] == "user_message":
                async for chunk, metadata in graph.astream(
                    {"messages": [HumanMessage(msg["text"])]},
                    config=config,
                    stream_mode="messages",
                ):
                    text = chunk.text if hasattr(chunk, "text") else None
                    if text:
                        await websocket.send_json({"type": "token", "text": text})
                await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass  # Checkpointer persists state; reconnect with same thread_id resumes
```
Because LangGraph checkpointers persist state per thread_id, a client that
reconnects to /ws/{same-thread-id} automatically sees the prior conversation
history on the next turn — no special "resume" handshake required for
between-turn reconnects. For mid-stream reconnects and cancellation handling,
Step 6 — Run the proxy-readiness checklist before shipping
A stream that works on uvicorn --reload main:app on your laptop will hang
behind Cloud Run. Before you ship, walk this checklist:
- [ ] Deployed endpoint returns Content-Type: text/event-stream (or 101 Switching Protocols for WebSocket)
- [ ] Deployed endpoint returns X-Accel-Buffering: no and Cache-Control: no-cache
- [ ] curl -N https://your.app/stream?... shows tokens arriving incrementally, not all at once at the end
- [ ] Cloud Run deployed with --use-http2 (HTTP/2 end-to-end flushes chunks reliably). This requires the container to serve cleartext HTTP/2 (h2c). uvicorn speaks only HTTP/1.1, so use an HTTP/2-capable ASGI server such as Hypercorn for this path.
- [ ] If behind Cloudflare: the Free plan may buffer. Test on Pro, use a paid page rule to disable response buffering, or switch to WebSocket.
- [ ] 15-second heartbeat configured (a ": heartbeat\n\n" SSE comment every 15s) so idle streams don't get killed by the 60s timeout
- [ ] Long-idle load test: run a tool-using agent that waits 45s on an API call; the stream should stay alive
Test behind your actual proxy, not just localhost.
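To automate the curl -N check, a smoke test needs to decode frames that may be split across network reads. Below is a minimal stdlib sketch of an SSE frame parser. It assumes LF line endings and JSON data lines, as produced by the sse() helper in Step 2; parse_sse is a hypothetical name. Following the EventSource rules, it skips comment lines such as heartbeats and dispatches only frames that carry data.

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(chunks: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Decode (event, data) pairs from SSE text that may arrive in pieces.

    Assumes LF line endings and one JSON payload per frame.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while "\n\n" in buffer:  # a blank line terminates one frame
            frame, buffer = buffer.split("\n\n", 1)
            event, data_lines = "message", []  # "message" is the SSE default
            for line in frame.split("\n"):
                if line.startswith(":"):
                    continue  # comment line, e.g. ": heartbeat"
                field, _, value = line.partition(":")
                if value.startswith(" "):
                    value = value[1:]  # one leading space is part of the syntax
                if field == "event":
                    event = value
                elif field == "data":
                    data_lines.append(value)
            if data_lines:  # frames without data (heartbeats) are not dispatched
                yield event, json.loads("\n".join(data_lines))
```

In the smoke test, feed it decoded text chunks from your streaming HTTP client and record a timestamp for each yielded event. If every timestamp clusters at the end of the run, something between client and server is still buffering.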
Output
- stream_mode chosen deliberately from the decision matrix ("messages" for tokens, "updates" for progress, "values" for debug)
- Minimal FastAPI SSE endpoint using graph.astream(..., stream_mode="messages") in an async handler
- Required anti-buffering headers (X-Accel-Buffering, Cache-Control, Connection) on the StreamingResponse
- Server-side astream_events(version="v2") filter that forwards only on_chat_model_stream + on_tool_start + on_tool_end
- Optional WebSocket variant with thread_id required at the route and checkpointer-backed resume
- Proxy-readiness checklist walked end-to-end before shipping past localhost
Error Handling
| Symptom | Cause | Fix |
|---|---|---|
| Browser tab freezes on token stream | Shipped stream_mode="values" to a token UI; full state on every tick (P19) | Switch to stream_mode="messages", which emits per-token deltas only |
| Per-node progress bar never advances | Shipped stream_mode="messages" to a per-node UI; no node-boundary events (P19) | Switch to stream_mode="updates" |
| Stream works on localhost, hangs on Cloud Run | Proxy buffering holds last chunk (P46) | Add X-Accel-Buffering: no, Cache-Control: no-cache headers; deploy Cloud Run with --use-http2 |
| Stream closes after ~60s with no data | Idle-connection timeout on proxy | Send a ": heartbeat\n\n" SSE comment every 15s |
| Browser tab freezes on long astream_events run | Forwarded unfiltered v2 events; 3,000+ events per run (P47) | Filter server-side: forward only on_chat_model_stream + optional tool events |
| DeprecationWarning: astream_log is deprecated | Using soft-deprecated API (P67) | Migrate to astream_events(version="v2") |
| Agent has amnesia on every WebSocket message | Missing thread_id in config (P16) | Require thread_id at route; assert in middleware |
| AttributeError: 'list' object has no attribute 'lower' on chunk.content | Claude streams content blocks, not plain strings, on tool-use turns (P02) | Use chunk.text (1.0+) or check isinstance(chunk.content, str) before calling string methods |
| One slow request blocks all other WebSocket clients | Sync graph.stream() or graph.invoke() inside async handler (P48) | Always use graph.astream() / graph.ainvoke() in async contexts |
| Cloudflare Free plan buffers SSE | Free-tier response buffering | Upgrade plan with page rule to disable buffering, or switch endpoint to WebSocket |
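The P02 row recommends chunk.text on 1.0. Some code only has the raw .content value, for example messages loaded from a log or a store. Below is a minimal stdlib fallback for that case. It assumes Anthropic-style content blocks of the form {"type": "text", "text": ...} mixed with tool-use blocks. content_to_text is a hypothetical helper name.

```python
from typing import Any


def content_to_text(content: Any) -> str:
    """Return plain text from content that may be a str or a list of blocks.

    Keeps {"type": "text"} blocks and bare strings; drops tool-use and other
    non-text blocks. Anything else yields an empty string.
    """
    if isinstance(content, str):
        return content
    parts = []
    if isinstance(content, list):
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                parts.append(block.get("text", ""))
    return "".join(parts)
```

Because the helper always returns a str, string methods such as .lower() are safe to call on the result, which avoids the AttributeError in the table.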
Examples
Live-token chat UI (the default case)
stream_mode="messages" plus SSE plus the three anti-buffering headers. One
token per SSE frame (~5-50 bytes each), 30-80 frames/sec during active model
generation, heartbeat every 15s during tool waits. See
SSE Endpoint Template for the complete
FastAPI example including heartbeat, reverse-proxy config, and the client
EventSource code.
Per-node progress bar for a multi-step agent
stream_mode="updates" yields one event per node (typically 2-20 per
invocation). Render as discrete status ticks: "Planning..." → "Searching..." →
"Summarizing..." → "Done." Payload is tiny (~100 bytes per tick). Combine with
"messages" (stream_mode=["updates", "messages"]) to show both progress
ticks and streaming tokens in the active node's pane. Full payload samples in
Debug / time-travel view with "values"
stream_mode="values" yields the entire graph state after each node. Useful
for state replay, test recording, observability pipelines — not for
browser UIs where state size × steps × re-render quickly freezes the tab.
Pipe to a server-side log (or LangSmith), not to the browser. Example and
caveats in Stream Mode Comparison.
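Below is a minimal sketch of a server-side sink for "values" snapshots. It assumes the snapshots come from async for state in graph.astream(inputs, config=config, stream_mode="values") and that a JSON Lines file is an acceptable log format. log_values_snapshots and the record fields (thread_id, seq, state) are hypothetical. json.dumps(default=str) mirrors the sse() helper, so message objects serialize as strings instead of raising.

```python
import asyncio
import io
import json
from typing import AsyncIterable, TextIO


async def log_values_snapshots(snapshots: AsyncIterable[dict], sink: TextIO,
                               thread_id: str) -> int:
    """Append each full-state snapshot to sink as one JSON line; return the count."""
    count = 0
    async for state in snapshots:
        record = {"thread_id": thread_id, "seq": count, "state": state}
        # default=str stringifies message objects and other non-JSON values
        sink.write(json.dumps(record, default=str) + "\n")
        count += 1
    return count


if __name__ == "__main__":
    async def fake_values_stream():
        # Stand-in for graph.astream(..., stream_mode="values")
        yield {"messages": ["hi"]}
        yield {"messages": ["hi", "hello"]}

    buf = io.StringIO()
    written = asyncio.run(log_values_snapshots(fake_values_stream(), buf, "t1"))
    print(written)
    print(buf.getvalue())
```

The writes here block briefly on each snapshot. For large states or high traffic, move them off the event loop, or send the records to LangSmith instead.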
WebSocket with reconnect-by-thread_id
When users can cancel mid-stream or send follow-up messages before the
previous turn finishes. The thread_id is required at the route; the
checkpointer persists history; reconnecting with the same thread_id
automatically sees prior turns. Cancellation is implemented via
asyncio.Task.cancel() on the active astream iteration. Worked example
with half-open connection detection in
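The loop in Step 5 cannot read a cancel message while it is streaming. Below is a minimal sketch of one way around that: run the stream as a task and race it against the next receive. It assumes a hypothetical {"type": "cancel"} client message. run_cancellable_turn, produce, and receive are illustrative names. In FastAPI, receive could be websocket.receive_json, and produce a coroutine wrapping the astream loop from Step 5.

```python
import asyncio
from typing import Awaitable, Callable


async def run_cancellable_turn(produce: Callable[[], Awaitable[None]],
                               receive: Callable[[], Awaitable[dict]]) -> str:
    """Stream one turn while listening for a cancel message.

    produce: sends the turn's tokens (e.g. an astream loop calling send_json).
    receive: returns the next client message (e.g. websocket.receive_json).
    Returns "completed" or "cancelled".
    """
    stream_task = asyncio.create_task(produce())
    try:
        while True:
            recv_task = asyncio.create_task(receive())
            done, _ = await asyncio.wait({stream_task, recv_task},
                                         return_when=asyncio.FIRST_COMPLETED)
            if stream_task in done:
                recv_task.cancel()    # caveat: a message landing right now is dropped
                stream_task.result()  # re-raise any error from the stream
                return "completed"
            msg = recv_task.result()  # WebSocketDisconnect would propagate here
            if msg.get("type") == "cancel":
                stream_task.cancel()  # CancelledError is raised inside the astream loop
                try:
                    await stream_task
                except asyncio.CancelledError:
                    pass
                return "cancelled"
            # Other mid-stream messages are ignored in this sketch.
    finally:
        stream_task.cancel()  # no-op if done; stops the stream if receive() raised
```

A production version would queue non-cancel messages that arrive mid-stream instead of ignoring them.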
Resources
- LangGraph streaming how-to
- LangGraph streaming concepts
- LangChain astream_events v2
- FastAPI StreamingResponse
- FastAPI WebSockets
- Cloud Run HTTP/2 end-to-end
- Nginx proxy_buffering directive
- Pack pain catalog: docs/pain-catalog.md (entries P16, P19, P22, P46, P47, P48, P67)