langchain-webhooks-events
Dispatch LangChain 1.0 chain/agent events to external systems — webhooks, Kafka, Redis Streams, SNS — via async fire-and-forget callbacks, subgraph-aware wiring, and HMAC-signed delivery with idempotency keys. Use when firing webhooks on tool calls, pushing telemetry to Kafka / Redis Streams, or fanning progress to multiple subscribers without blocking the chain. Trigger with "langchain webhook", "langchain event dispatch", "langchain callback kafka", "langchain pubsub", "langchain per-tool webhook", "BaseCallbackHandler webhook", "on_tool_end webhook", "langchain analytics event".
Allowed Tools
Provided by Plugin
langchain-py-pack
Claude Code skill pack for LangChain 1.0 + LangGraph 1.0 (Python) - 34 skills covering chains, agents, RAG, middleware, checkpointing, HITL, streaming, and production patterns
Installation
This skill is included in the langchain-py-pack plugin:
/plugin install langchain-py-pack@claude-code-plugins-plus
Instructions
LangChain Webhooks and Event Dispatch (Python)
Overview
A team wires per-tool webhook dispatch from their LangChain agent via FastAPI
BackgroundTasks — analytics is always N seconds late because BackgroundTasks
fire after the HTTP response closes, not during the stream (P60). Worse:
the BaseCallbackHandler they attached via .with_config(callbacks=[h])
fires on the outer agent but is dark on the subagent's tool calls — custom
callbacks are not inherited by LangGraph subgraphs (P28), they must be
passed via config["callbacks"] at invoke time.
Pain-catalog anchors handled here:
- P28: Callbacks via with_config don't propagate to subgraphs
- P46: SSE streams dropped by buffering proxies (see langchain-langgraph-streaming)
- P47: astream_events(v2) emits thousands of events; never forward raw
- P48: Sync invoke() inside async endpoint blocks the event loop
- P60: BackgroundTasks fire post-response; wrong for per-event dispatch
This skill walks through an async AsyncCallbackHandler with fire-and-forget
dispatch, per-target sinks for HTTP / Kafka / Redis Streams / SNS, HMAC-signed
delivery with 1s/5s/30s retry and DLQ, idempotency keys = `run_id + event_type + step_index`,
and `config["callbacks"]` wiring that makes subagent calls visible.
Typical webhook latency budget: <500ms per event. Pin: langchain-core 1.0.x,
langgraph 1.0.x. Scope: server-to-server dispatch only — UI streaming is in
langchain-langgraph-streaming.
Prerequisites
- Python 3.10+
- langchain-core >= 1.0, < 2.0, langgraph >= 1.0, < 2.0
- httpx >= 0.27 for async HTTP (or aiohttp)
- One of: aiokafka, redis[hiredis] >= 5, aioboto3 (per target)
- An event sink: a webhook endpoint, Kafka topic, Redis Stream, or SNS topic
- A shared secret (for HMAC) stored in your secret manager, not env
Instructions
Step 1 — Write an async handler that fire-and-forget dispatches
Sync dispatch from a callback blocks the chain: a slow HTTP POST during
on_tool_end serializes all downstream tokens behind it (P48). Use
asyncio.create_task(...) so the dispatch runs alongside the chain:
import asyncio
import uuid
from typing import Any

from langchain_core.callbacks import AsyncCallbackHandler


class EventDispatchHandler(AsyncCallbackHandler):
    """Fire-and-forget dispatch to external sinks.

    IMPORTANT: subclass AsyncCallbackHandler (not BaseCallbackHandler) so
    on_* methods are awaited. Mixing sync and async handlers is a silent
    footgun: sync on_* blocks the event loop (P48).
    """

    # 4000 char cap: keep payload under typical webhook body limits
    # while preserving enough context for downstream analytics.
    MAX_OUTPUT_CHARS = 4000

    def __init__(self, sink, *, run_id: str | None = None):
        self.sink = sink  # dispatch target (Step 3)
        self.run_id = run_id or str(uuid.uuid4())
        self._tasks: set[asyncio.Task] = set()
        self._step_index = 0  # monotonic per-run counter (Step 5)
        self._chain_names: dict[Any, str] = {}  # chain run_id -> name seen at start

    def _dispatch(self, event_type: str, payload: dict) -> None:
        # Fire-and-forget. Keep a strong reference so the task isn't GC'd
        # mid-flight (the event loop only holds weak references to tasks).
        step_index = self._step_index
        self._step_index += 1
        task = asyncio.create_task(
            self.sink.send(
                idempotency_key=f"{self.run_id}:{event_type}:{step_index}",
                event_type=event_type,
                payload=payload,
            )
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def on_tool_end(self, output: Any, *, run_id, parent_run_id=None, **kwargs):
        self._dispatch(
            "tool_end",
            {"output": str(output)[: self.MAX_OUTPUT_CHARS], "run_id": str(run_id)},
        )

    async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
        # The runnable name arrives with the start event; remember it because
        # on_chain_end is not guaranteed to carry it.
        name = kwargs.get("name")
        if name:
            self._chain_names[run_id] = name

    async def on_chain_end(self, outputs: dict, *, run_id, **kwargs):
        # Only named chains: skip the unnamed LCEL inner nodes (P47)
        name = self._chain_names.pop(run_id, None) or kwargs.get("name")
        if not name or name.startswith("RunnableLambda"):
            return
        self._dispatch("chain_end", {"name": name, "run_id": str(run_id)})

    async def drain(self, timeout: float = 5.0) -> None:
        """Call before process exit so in-flight dispatches complete."""
        if self._tasks:
            await asyncio.wait(self._tasks, timeout=timeout)
See Async Callback Handler for the full
handler: on_llm_end, filtering, sync-vs-async decision.
Step 2 — Pass callbacks via config so subgraphs inherit them
P28: Runnable.with_config(callbacks=[h]) binds at definition and is not
inherited by LangGraph subgraphs. Pass callbacks via config at invocation:
# WRONG: subagent tool calls never fire the handler
agent_with_handler = agent.with_config({"callbacks": [handler]})
await agent_with_handler.ainvoke({"messages": [...]})

# RIGHT: callbacks in config propagate into subgraphs
await agent.ainvoke(
    {"messages": [...]},
    config={"callbacks": [handler], "configurable": {"thread_id": "t1"}},
)
Validate propagation with a probe that counts events by kwargs["name"] and
asserts the subagent's name appears. See Subgraph Propagation.
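A minimal sketch of such a probe follows. The class name `NameProbe`, its `assert_saw` helper, and the subagent name `"planner"` are hypothetical; the sketch assumes the runnable name arrives as the `name` keyword on start events and falls back to the `serialized` dict otherwise. The import fallback exists only so the snippet runs without LangChain installed.

```python
import asyncio
from collections import Counter
from uuid import uuid4

try:
    from langchain_core.callbacks import AsyncCallbackHandler
except ImportError:  # fallback so the sketch runs without LangChain installed
    AsyncCallbackHandler = object


class NameProbe(AsyncCallbackHandler):
    """Hypothetical probe: counts chain and tool start events by runnable name."""

    def __init__(self) -> None:
        super().__init__()
        self.seen = Counter()

    def _record(self, serialized, kwargs) -> None:
        # Assumption: the name is passed as a keyword; fall back to serialized.
        name = kwargs.get("name") or (serialized or {}).get("name") or "<unnamed>"
        self.seen[name] += 1

    async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs) -> None:
        self._record(serialized, kwargs)

    async def on_tool_start(self, serialized, input_str, *, run_id, **kwargs) -> None:
        self._record(serialized, kwargs)

    def assert_saw(self, name: str) -> None:
        assert self.seen[name] > 0, f"no events from {name!r}: callbacks did not propagate"


async def simulate() -> NameProbe:
    # Stand-in for a real run: in practice pass config={"callbacks": [probe]}
    # to agent.ainvoke(...) and let LangChain call these methods.
    probe = NameProbe()
    await probe.on_chain_start({}, {}, run_id=uuid4(), name="planner")
    await probe.on_tool_start({"name": "search_docs"}, "query", run_id=uuid4())
    return probe
```

In a real test, replace `simulate()` with an actual `agent.ainvoke(..., config={"callbacks": [probe]})` call and then run `probe.assert_saw("planner")` with your subagent's name.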
Step 3 — Pick a dispatch target by delivery semantics
Match the event to the transport:
| Target | Delivery | Typical latency | Use when | Failure mode |
|---|---|---|---|---|
| HTTP webhook | At-least-once (with retry) | 50-500ms | Partner integrations, Zapier/Make, customer-owned endpoints | Endpoint 5xx → retry 1s/5s/30s → DLQ |
| Kafka (aiokafka) | At-least-once (idempotent producer) | 5-20ms intra-region | High-volume telemetry, analytics fan-in | Broker unavailable → retry + local buffer |
| Redis Streams (XADD) | At-least-once (consumer groups) | 1-5ms | Near-realtime worker queues, progress fan-out | Redis down → retry or spill to disk |
| SNS | At-most-once (best-effort) | 10-100ms | Fan-out to multiple SQS/Lambda subscribers | Best-effort only; accept loss or front with SQS FIFO |
Handler stays provider-agnostic; only the sink changes. A minimal HTTP sink:
import asyncio
import hashlib
import hmac
import json
import logging
import os

import httpx

logger = logging.getLogger(__name__)

WEBHOOK_URL = os.environ["WEBHOOK_URL"]
# Read from env here for brevity; in production load the secret from your
# secret manager (see Prerequisites).
SIGNING_SECRET = os.environ["WEBHOOK_SIGNING_SECRET"].encode()


class WebhookSink:
    # 256-bit HMAC: industry-standard signature strength (GitHub, Stripe use same)
    SIG_ALG = hashlib.sha256
    # Retry schedule: 1s absorbs transient blips, 5s absorbs brief 503s,
    # 30s absorbs autoscaler / cold-start incidents. Beyond 30s = stale event.
    RETRY_DELAYS = (1, 5, 30)
    REQUEST_TIMEOUT_S = 5.0

    def __init__(self, client: httpx.AsyncClient):
        self.client = client

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        body = json.dumps({"event": event_type, "data": payload}, sort_keys=True).encode()
        sig = hmac.new(SIGNING_SECRET, body, self.SIG_ALG).hexdigest()
        headers = {
            "Content-Type": "application/json",
            "Idempotency-Key": idempotency_key,
            "X-Signature-256": f"sha256={sig}",
        }
        # One initial attempt, then one retry after each delay in RETRY_DELAYS.
        for delay in (0, *self.RETRY_DELAYS):
            if delay:
                await asyncio.sleep(delay)
            try:
                resp = await self.client.post(
                    WEBHOOK_URL, content=body, headers=headers, timeout=self.REQUEST_TIMEOUT_S
                )
                if 200 <= resp.status_code < 300:
                    return
                if resp.status_code < 500 and resp.status_code != 429:
                    break  # 4xx (except 429) is not retryable: straight to DLQ
            except (httpx.TimeoutException, httpx.TransportError):
                pass  # network failure: retry
        await self._dead_letter(idempotency_key, event_type, payload)

    async def _dead_letter(self, idempotency_key: str, event_type: str, payload: dict) -> None:
        # Placeholder: persist to your DLQ (Step 5). Logging alone is an
        # assumption made here so the class is complete.
        logger.error("webhook dead-lettered: %s (%s)", idempotency_key, event_type)
See Dispatch Targets for Kafka / Redis Streams
/ SNS sinks and per-target DLQ patterns.
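Because the handler only ever calls `sink.send(...)` with three keyword arguments, the sink contract can be written down as a structural type. The sketch below is one way to do that; the names `Sink`, `InMemorySink`, and `emit` are hypothetical and not part of the pack. The in-memory sink is a test double for exercising the handler without a network.

```python
import asyncio
from typing import Protocol


class Sink(Protocol):
    """Anything with this async send() can be plugged into the handler."""

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        ...


class InMemorySink:
    """Hypothetical test double: records events instead of sending them."""

    def __init__(self) -> None:
        self.events = []

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        self.events.append((idempotency_key, event_type, payload))


async def emit(sink: Sink) -> None:
    # The same call shape the handler's _dispatch uses.
    await sink.send(
        idempotency_key="run-1:tool_end:0",
        event_type="tool_end",
        payload={"output": "ok"},
    )
```

Passing an `InMemorySink` to `EventDispatchHandler` in unit tests lets you assert on `sink.events` after `drain()` without standing up a webhook endpoint.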
Step 4 — Filter events so you don't saturate the downstream
astream_events(version="v2") emits thousands of events per invocation (P47).
Never forward raw — dispatch only what the downstream consumes:
| Callback method | Typical decision | Why |
|---|---|---|
| on_llm_start | Skip | Prompt content often contains PII; low value without masking |
| on_llm_new_token | Skip for dispatch (UI only) | 1 event per token; N/A to analytics |
| on_llm_end | Dispatch for named chains only | Token usage, final response: high value, low volume |
| on_chain_start | Skip (P47 noise) | LCEL emits one per inner runnable |
| on_chain_end | Dispatch for named subgraphs only | Stage completion: what analytics cares about |
| on_tool_start | Optional (dispatch for audit log) | Matters for compliance / tool-use audit |
| on_tool_end | Dispatch always | The key analytics signal in agent flows |
| on_agent_action | Dispatch | Cleaner signal than on_tool_start in agent graphs |
| on_agent_finish | Dispatch | Terminal event for the run |
Rule of thumb: dispatch on_tool_end + named on_chain_end + on_llm_end.
Everything else is noise.
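The decision table can be encoded as a small predicate that the handler consults before dispatching. This is a sketch under assumptions: the function name `should_dispatch`, the `allowed_names` allow-list, and the `audit` flag are illustrative and not part of the pack.

```python
from typing import FrozenSet, Optional

ALWAYS = frozenset({"on_tool_end", "on_agent_action", "on_agent_finish"})
NAMED_ONLY = frozenset({"on_llm_end", "on_chain_end"})
AUDIT_ONLY = frozenset({"on_tool_start"})


def should_dispatch(
    event_type: str,
    name: Optional[str],
    allowed_names: FrozenSet[str],
    audit: bool = False,
) -> bool:
    """Return True when the event is worth sending downstream."""
    if event_type in ALWAYS:
        return True
    if event_type in NAMED_ONLY:
        # Only chains/subgraphs you explicitly named; inner LCEL nodes are noise.
        return name is not None and name in allowed_names
    if event_type in AUDIT_ONLY:
        return audit
    # on_llm_start, on_llm_new_token, on_chain_start, and anything unknown.
    return False
```

An explicit allow-list is a design choice here: it keeps the filter from silently passing new framework-generated runnable names.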
Step 5 — Build idempotency keys and a retry budget
At-least-once transports mean duplicates. Build the key deterministically:
# run_id — unique per chain invocation (propagates into subgraphs)
# event_type — on_tool_end / on_chain_end / on_llm_end
# step_index — monotonic per-run counter you maintain in the handler
idempotency_key = f"{run_id}:{event_type}:{step_index}"
Retry budget: 1s → 5s → 30s (~36s total) then DLQ. Retry on 5xx / 429 /
network only; 4xx (except 429) goes straight to DLQ. DLQ is a Redis Stream
or S3 prefix keyed by YYYY/MM/DD/run_id/idempotency_key.json; alarm on depth
growth. See Idempotency and Retry for
HMAC verify, at-least-once vs at-most-once, and 24h de-dup window sizing.
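The retry rule and the DLQ key layout above can be sketched as two pure functions. The names `is_retryable` and `dlq_object_key` are hypothetical, and the sketch assumes the date in the key is the UTC time at which the event is dead-lettered.

```python
from datetime import datetime, timezone
from typing import Optional

RETRY_DELAYS_S = (1, 5, 30)  # total wait before giving up: 36 seconds


def is_retryable(status_code: int) -> bool:
    """Retry on 429 and 5xx only; every other status goes to the DLQ."""
    return status_code == 429 or 500 <= status_code < 600


def dlq_object_key(run_id: str, idempotency_key: str, now: Optional[datetime] = None) -> str:
    """Build the YYYY/MM/DD/run_id/idempotency_key.json key described above."""
    # Assumption: date partition uses UTC at dead-letter time.
    now = now or datetime.now(timezone.utc)
    return f"{now:%Y/%m/%d}/{run_id}/{idempotency_key}.json"
```

Network errors have no status code, so they are handled in the sink's exception branch and not by `is_retryable`.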
Step 6 — Never dispatch from BackgroundTasks (P60)
FastAPI BackgroundTasks run after the response closes — exactly wrong for
per-event dispatch. Events must go out during the chain:
# WRONG: events fire all at once after the stream ends
@app.post("/chat")
async def chat(req: Request, bg: BackgroundTasks):
    bg.add_task(agent.ainvoke, {"messages": [...]})  # late + no streaming
    return {"status": "accepted"}


# RIGHT: handler fires during the chain, each on_tool_end dispatches immediately
@app.post("/chat")
async def chat(req: ChatReq):
    handler = EventDispatchHandler(sink=webhook_sink, run_id=req.run_id)
    try:
        result = await agent.ainvoke(
            {"messages": req.messages},
            config={"callbacks": [handler], "configurable": {"thread_id": req.thread_id}},
        )
    finally:
        await handler.drain(timeout=5.0)  # flush in-flight dispatches
    return {"result": result}
drain() awaits in-flight asyncio.create_task() dispatches up to 5s so
events aren't lost when the pod scales down mid-request.
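The drain timeout is a budget, not a guarantee: a dispatch still sleeping through its 30s retry will outlive a 5s drain. The standalone sketch below shows the strong-reference pattern together with a drain that reports how many tasks were still pending. `TaskTracker` and `demo` are hypothetical names, and `asyncio.sleep` stands in for a real sink call.

```python
import asyncio


class TaskTracker:
    """Holds strong references to fire-and-forget tasks until they finish."""

    def __init__(self) -> None:
        self._tasks = set()

    def spawn(self, coro) -> None:
        task = asyncio.create_task(coro)
        self._tasks.add(task)                        # keep the task alive
        task.add_done_callback(self._tasks.discard)  # release it when done

    async def drain(self, timeout: float) -> int:
        """Wait up to `timeout` seconds; return the number of tasks still pending."""
        if not self._tasks:
            return 0
        _, pending = await asyncio.wait(self._tasks, timeout=timeout)
        return len(pending)


async def demo() -> int:
    tracker = TaskTracker()
    tracker.spawn(asyncio.sleep(0.01))  # fast dispatch: finishes within the budget
    tracker.spawn(asyncio.sleep(10))    # slow dispatch: outlives the budget
    return await tracker.drain(timeout=0.2)
```

Logging the pending count on shutdown is one way to notice events that the drain did not flush.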
Output
- AsyncCallbackHandler subclass with asyncio.create_task() fire-and-forget
- Callbacks wired via config["callbacks"] at invoke time (subgraph-safe)
- Per-target sink abstraction: HTTP / Kafka / Redis Streams / SNS
- HMAC-signed webhook payloads with 1s/5s/30s retry and DLQ fallback
- Idempotency key = run_id + event_type + step_index
- Event-taxonomy filter: dispatch on_tool_end + named on_chain_end + on_llm_end
- drain() on shutdown to flush in-flight dispatches
Error Handling
| Error | Cause | Fix |
|---|---|---|
| Handler fires on outer agent but not subagent | Bound via with_config at definition (P28) | Pass via config["callbacks"] at invoke(...) time |
| Webhook analytics lags generation duration | BackgroundTasks fire post-response (P60) | Dispatch from the callback handler, never from BackgroundTasks |
| Browser / Kafka saturates on long generations | Forwarded astream_events(v2) raw (P47) | Filter events server-side; dispatch only on_tool_end / on_chain_end / on_llm_end |
| SSE stream hangs, no end event | Proxy buffering (P46) | Set X-Accel-Buffering: no; see langchain-langgraph-streaming |
| Event loop freezes on slow webhook | Sync POST in callback (P48) | Subclass AsyncCallbackHandler; use asyncio.create_task() |
| Duplicate events downstream | At-least-once dispatch + retry | Receiver dedupes on Idempotency-Key header with 24h cache |
| Orphan asyncio.create_task never runs | GC collected the task | Hold a strong reference in self._tasks and discard on completion |
| Events lost on pod shutdown | In-flight tasks cancelled | Call await handler.drain(timeout=5.0) in endpoint finally block |
| 4xx webhook errors retrying 3x | Retry logic retrying everything | Retry only on 5xx / 429 / network; 4xx goes straight to DLQ |
| Signature verification fails on receiver | Body re-serialized with different key order | Sign the exact bytes you send; use sort_keys=True in json.dumps |
Examples
Named subgraph dispatch with a LangGraph agent
Planner subagent runs search_docs → summarize; outer agent needs a webhook
on summarize completion. Full wiring in Subgraph Propagation.
Fan-out: webhook + Kafka + Redis Streams in one invocation
CompositeSink dispatches to multiple child sinks via
asyncio.gather(..., return_exceptions=True) — one sink's failure doesn't block
others. See Dispatch Targets.
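A minimal sketch of that fan-out, assuming each child exposes the same `send(...)` coroutine as the HTTP sink. The demo classes `RecordingSink` and `FailingSink` are hypothetical test doubles, and logging failures is an assumption about how you want them surfaced.

```python
import asyncio
import logging

logger = logging.getLogger("dispatch")


class CompositeSink:
    """Fan one event out to several child sinks concurrently."""

    def __init__(self, *sinks) -> None:
        self.sinks = sinks

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        results = await asyncio.gather(
            *(
                s.send(idempotency_key=idempotency_key, event_type=event_type, payload=payload)
                for s in self.sinks
            ),
            return_exceptions=True,  # a failing child must not cancel the others
        )
        for sink, result in zip(self.sinks, results):
            if isinstance(result, Exception):
                logger.warning("sink %s failed: %r", type(sink).__name__, result)


class RecordingSink:
    """Hypothetical child sink that records what it receives."""

    def __init__(self) -> None:
        self.keys = []

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        self.keys.append(idempotency_key)


class FailingSink:
    """Hypothetical child sink that always raises."""

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        raise ConnectionError("broker unavailable")
```

Every child receives the same idempotency key, so downstream consumers on different transports can still correlate and de-duplicate the event.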
HMAC-signed webhook receiver with de-dup
Verify X-Signature-256, SETNX Idempotency-Key against Redis with 24h TTL,
200 on both replay and first-seen. See Idempotency and Retry.
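The receiver side can be sketched with the standard library alone. Here an in-memory dict stands in for Redis `SET NX` with a TTL, and the names `verify_signature`, `SeenKeys`, and `handle_delivery` are hypothetical. Returning 401 on a bad signature is an assumption; the source only specifies 200 for replay and first-seen.

```python
import hashlib
import hmac
import time
from typing import Optional, Tuple


def verify_signature(secret: bytes, body: bytes, header: str) -> bool:
    """Recompute the HMAC over the raw request bytes and compare in constant time."""
    expected = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected.encode(), header.encode())


class SeenKeys:
    """In-memory stand-in for Redis SET NX with a 24h TTL."""

    def __init__(self, ttl_s: float = 24 * 3600) -> None:
        self.ttl_s = ttl_s
        self._expiry = {}

    def first_seen(self, key: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        expiry = self._expiry.get(key)
        if expiry is not None and expiry > now:
            return False  # replay inside the de-dup window
        self._expiry[key] = now + self.ttl_s
        return True


def handle_delivery(
    secret: bytes, body: bytes, signature: str, idempotency_key: str, seen: SeenKeys
) -> Tuple[int, str]:
    if not verify_signature(secret, body, signature):
        return 401, "bad signature"  # assumption: status code not given in the source
    if not seen.first_seen(idempotency_key):
        return 200, "duplicate"  # acknowledge so the sender stops retrying
    return 200, "processed"
```

Verify against the raw bytes as received; parsing and re-serializing the JSON before verification can change the bytes and break the signature.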
Resources
- LangChain callbacks concepts
- BaseCallbackHandler / AsyncCallbackHandler API
- astream_events v2 events reference
- LangGraph streaming concepts
- FastAPI BackgroundTasks (note the "after response" semantics)
- aiokafka producer idempotence
- Redis Streams XADD / consumer groups
- Cross-reference in this pack: langchain-langgraph-streaming (UI streaming), langchain-debug-bundle (callback-propagation debugging)
- Pack pain catalog: docs/pain-catalog.md (entries P28, P46, P47, P48, P60)