Event Streaming & Real-Time Agent UX — Part 9
Build real-time streaming agent interfaces in LangChain using astream_events(). Learn to differentiate token streams, tool call events, and node transitions — then render them in a live dashboard for multi-agent workflows.

TL;DR: Static loading spinners lead to poor user experiences. This post demonstrates how to use astream_events() to capture granular lifecycle events and build a real-time console dashboard for multi-agent graphs.
The Dead Spinner Problem
In Part 1 we covered basic streaming with llm.stream(). That gives you tokens as they arrive. It works for a single model call.
But the Tech News Daily newsroom from Part 6 is a multi-agent graph. When a user submits an article, up to four things happen in sequence: the supervisor decides, the researcher runs, the writer composes, the notifier formats. With basic streaming, you see nothing until the very last agent finishes. Users stare at a blank screen for 15–30 seconds thinking the app has frozen.
astream_events() solves this. Instead of streaming final tokens, it streams events — typed lifecycle signals that fire at every meaningful moment:
| Event type | When it fires |
|---|---|
| on_chain_start | A node or chain starts executing |
| on_chat_model_start | The LLM begins processing |
| on_chat_model_stream | A single token arrives from the model |
| on_chat_model_end | The LLM finishes its response |
| on_tool_start | A tool call begins |
| on_tool_end | A tool call completes |
| on_chain_end | A node or chain finishes |
You subscribe to the events you care about and build whatever UX you need — a spinner, a live transcript, a progress bar, a dashboard.
Why is this different from llm.stream()?
llm.stream() gives you chunks of the final answer. It has no concept of tools, agents, or graph nodes. astream_events() is aware of the entire execution stack: it tells you which component produced each event, not just what the text content is. For multi-agent systems, that attribution is what lets the UI show which agent is working at any moment.
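To make "subscribe to the events you care about" concrete, here is a minimal sketch of a dispatcher that maps event types to render functions and ignores everything else. The `HANDLERS` registry, `render()`, and the sample events are hypothetical; the samples are hand-written dicts shaped like the v2 schema and use plain strings where LangChain would supply message chunk objects.

```python
from typing import Any, Callable, Optional

Event = dict[str, Any]


def chunk_text(event: Event) -> str:
    """Return the text of a stream chunk; real chunks expose .content."""
    chunk = event["data"]["chunk"]
    return getattr(chunk, "content", chunk)


# Hypothetical registry: event type -> function that renders one status line.
HANDLERS: dict[str, Callable[[Event], str]] = {
    "on_chain_start": lambda e: f"START {e['name']}",
    "on_chat_model_stream": lambda e: f"TOKEN {chunk_text(e)}",
    "on_tool_start": lambda e: f"TOOL {e['name']}",
    "on_chain_end": lambda e: f"END {e['name']}",
}


def render(event: Event) -> Optional[str]:
    """Render an event if a handler is registered, otherwise ignore it."""
    handler = HANDLERS.get(event["event"])
    return handler(event) if handler else None


# Hand-written sample events (not real model output).
sample = [
    {"event": "on_chain_start", "name": "researcher", "data": {}},
    {"event": "on_chat_model_start", "name": "researcher_llm", "data": {}},
    {"event": "on_chat_model_stream", "name": "researcher_llm", "data": {"chunk": "Stripe"}},
    {"event": "on_chain_end", "name": "researcher", "data": {}},
]
lines = [line for e in sample if (line := render(e)) is not None]
print(lines)
```

Event types without a handler, such as on_chat_model_start here, are simply dropped, which is how a UI stays quiet about events it does not display.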
Setup
source langchain-env/bin/activate
pip install langchain-google-genai langchain-core langgraph python-dotenv
astream_events() is async, so all examples in this post use asyncio.
Part 1: Understanding the Event Stream
Before building a dashboard, understand the raw event format.
# create a file: 26_event_streaming.py
import asyncio
import os
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI
load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0)
# -------------------------------------------------------
# A simple single-chain example to understand event types
# before adding the complexity of a full agent graph
# -------------------------------------------------------
headline_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a news headline writer for Tech News Daily."),
        ("human", "Write 3 headline options for this story: {story}")
    ])
    | llm.with_config({"run_name": "headline_llm"})  # name shows up in events
    | StrOutputParser()
)
async def inspect_events():
    """Print every event with its type and relevant data."""
    story = "Cloudflare reported a 99.99% uptime SLA breach affecting European customers for 47 minutes on June 13, 2026. The cause was a BGP routing misconfiguration."
    print("=== Raw Event Stream ===\n")
    async for event in headline_chain.astream_events(
        {"story": story},
        version="v2"  # always specify version; v2 is current stable
    ):
        kind = event["event"]
        name = event.get("name", "unknown")
        # Handle only the event types of interest; anything else
        # (for example on_chat_model_start) is silently ignored
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"].content
            if chunk:  # filter empty chunks
                print(f" TOKEN: '{chunk}'", end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n TOOL START → {event['data']['input']}")
        elif kind == "on_tool_end":
            print(f" TOOL END → output length: {len(str(event['data']['output']))}")
        elif kind in ("on_chain_start", "on_chain_end"):
            print(f"\n {kind.upper()}: {name}")
        elif kind == "on_chat_model_end":
            # usage_metadata is a dict of token counts and may be None for some providers
            print(f"\n\n MODEL DONE, token usage: {event['data']['output'].usage_metadata}")

asyncio.run(inspect_events())
Run it:
python 26_event_streaming.py
You will see the chain events fire in sequence: chain starts, model starts, tokens stream in real time, model ends, chain ends. Now you have the raw material to build any UX on top of.
Why version="v2" and not "v1" or "v3"?
v2 is the current stable version of the astream_events schema. v1 is deprecated. v3 is in preview and the event schema may still change. Always specify the version explicitly: depending on the installed langchain-core release the argument is either required or has a default, and pinning it keeps the event schema your handlers parse from changing when you upgrade.
Tip (filter events by run_name): In a multi-agent graph, dozens of events fire. Use .with_config({"run_name": "my_node"}) on any chain, LLM, or tool, then filter events by event["name"] == "my_node" in your handler. Without naming, events arrive with generic identifiers (typically the component's class name), which makes it hard to tell which agent produced which token.
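As a rough sketch of the name-based filtering described in the tip, the helper below groups streamed text by event name so each agent gets its own transcript. `text_by_run_name` and the sample events are hypothetical; the events are hand-written dicts shaped like the v2 schema, with plain strings standing in for message chunks.

```python
from collections import defaultdict
from typing import Any, Iterable


def text_by_run_name(events: Iterable[dict[str, Any]]) -> dict[str, str]:
    """Concatenate streamed chunk text per event name."""
    buffers: dict[str, list[str]] = defaultdict(list)
    for event in events:
        if event["event"] != "on_chat_model_stream":
            continue  # only model stream events carry chunk text
        chunk = event["data"]["chunk"]
        text = getattr(chunk, "content", chunk)  # real chunks expose .content
        if text:  # skip empty chunks
            buffers[event.get("name", "unknown")].append(text)
    return {name: "".join(parts) for name, parts in buffers.items()}


# Hand-written sample events (not real model output).
sample_events = [
    {"event": "on_chat_model_stream", "name": "researcher_llm", "data": {"chunk": "1. USDC "}},
    {"event": "on_chat_model_stream", "name": "writer_llm", "data": {"chunk": "Stripe "}},
    {"event": "on_chat_model_stream", "name": "researcher_llm", "data": {"chunk": "support"}},
    {"event": "on_chain_end", "name": "writer", "data": {}},
    {"event": "on_chat_model_stream", "name": "writer_llm", "data": {"chunk": ""}},
]
transcripts = text_by_run_name(sample_events)
print(transcripts)
```

astream_events() also accepts an include_names argument that applies a name filter before events reach your loop; filtering in the handler, as above, keeps the full stream available for other consumers.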
Part 2: Live Terminal Dashboard for the Newsroom
Now build a real dashboard. This streams the multi-agent newsroom graph from Part 6, showing each agent's status in real time.
# create a file: 27_live_dashboard.py
import asyncio
import os
import sys
import time
from dotenv import load_dotenv
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_google_genai import ChatGoogleGenerativeAI
load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0, streaming=True)
# -------------------------------------------------------
# Newsroom state (same shape as Part 6)
# -------------------------------------------------------
class NewsroomState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    task: str
    research_findings: str
    draft_content: str
    next_agent: str
# -------------------------------------------------------
# Dashboard renderer — prints status without clearing the screen
# Compatible with any terminal (no ANSI tricks needed)
# -------------------------------------------------------
class NewsDashboard:
    def __init__(self):
        self.current_agent = None
        self.token_count = 0  # counts streamed chunks; a chunk may hold several tokens
        self.start_time = time.time()
        self.tool_calls = []

    def agent_started(self, agent_name: str):
        elapsed = time.time() - self.start_time
        print(f"\n[{elapsed:.1f}s] ▶ {agent_name.upper()} started", flush=True)
        self.current_agent = agent_name

    def agent_token(self, token: str):
        print(token, end="", flush=True)
        self.token_count += 1

    def agent_done(self, agent_name: str):
        elapsed = time.time() - self.start_time
        print(f"\n[{elapsed:.1f}s] ✓ {agent_name.upper()} done", flush=True)

    def tool_called(self, tool_name: str, input_summary: str):
        elapsed = time.time() - self.start_time
        print(f"\n[{elapsed:.1f}s] 🔧 Tool: {tool_name} | Input: {input_summary[:60]}", flush=True)
        self.tool_calls.append(tool_name)

    def summary(self):
        elapsed = time.time() - self.start_time
        print(f"\n{'='*60}")
        print(f"Pipeline complete in {elapsed:.1f}s")
        print(f"Total tokens streamed: {self.token_count}")
        print(f"Tool calls: {self.tool_calls or 'none'}")

dashboard = NewsDashboard()
# -------------------------------------------------------
# Specialist agents — each named for event filtering
# -------------------------------------------------------
def make_researcher(llm):
    from langchain_core.prompts import ChatPromptTemplate
    return (
        ChatPromptTemplate.from_messages([
            ("system", "You are the Tech News Daily researcher. Gather key facts."),
            ("human", "Research task: {task}\nProvide 4 numbered findings.")
        ])
        | llm.with_config({"run_name": "researcher_llm"})
    )

def make_writer(llm):
    from langchain_core.prompts import ChatPromptTemplate
    return (
        ChatPromptTemplate.from_messages([
            ("system", "You are the Tech News Daily writer. Write sharp, factual summaries."),
            ("human", "Task: {task}\n\nFindings:\n{findings}\n\nWrite a 2-paragraph summary.")
        ])
        | llm.with_config({"run_name": "writer_llm"})
    )

researcher_chain = make_researcher(llm)
writer_chain = make_writer(llm)
# -------------------------------------------------------
# Graph nodes — each is also named for event filtering
# -------------------------------------------------------
async def researcher_node(state: NewsroomState) -> dict:
    response = await researcher_chain.ainvoke(
        {"task": state["task"]},
        config={"run_name": "researcher_node"}
    )
    return {
        "research_findings": response.content,
        "messages": [AIMessage(content=response.content, name="researcher")]
    }

async def writer_node(state: NewsroomState) -> dict:
    response = await writer_chain.ainvoke(
        {"task": state["task"], "findings": state["research_findings"]},
        config={"run_name": "writer_node"}
    )
    return {
        "draft_content": response.content,
        "messages": [AIMessage(content=response.content, name="writer")]
    }

async def notifier_node(state: NewsroomState) -> dict:
    content = f"📰 PUBLISHED: {state['draft_content'][:200]}..."
    return {
        "messages": [AIMessage(content=content, name="notifier")]
    }

def supervisor_node(state: NewsroomState) -> dict:
    research_done = bool(state.get("research_findings", "").strip())
    writing_done = bool(state.get("draft_content", "").strip())
    notifier_ran = any(getattr(m, "name", "") == "notifier" for m in state.get("messages", []))
    if not research_done:
        return {"next_agent": "researcher"}
    elif not writing_done:
        return {"next_agent": "writer"}
    elif not notifier_ran:
        return {"next_agent": "notifier"}
    else:
        return {"next_agent": "FINISH"}

def route_to_agent(state: NewsroomState) -> str:
    n = state.get("next_agent", "FINISH")
    return END if n == "FINISH" else n
# Build the graph
graph = StateGraph(NewsroomState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
graph.add_node("notifier", notifier_node)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {"researcher": "researcher", "writer": "writer", "notifier": "notifier", END: END},
)
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("notifier", "supervisor")
newsroom_graph = graph.compile()
# -------------------------------------------------------
# The streaming runner — the key logic is here
# -------------------------------------------------------
async def run_with_dashboard(task: str):
    print(f"\n{'='*60}")
    print(f"Tech News Daily - Processing: {task[:70]}")
    print(f"{'='*60}")
    initial_state = {
        "task": task,
        "messages": [],
        "research_findings": "",
        "draft_content": "",
        "next_agent": "",
    }
    # Track which agent is "active" for event attribution
    active_agent = None
    async for event in newsroom_graph.astream_events(
        initial_state,
        version="v2",
        config={"recursion_limit": 20}
    ):
        kind = event["event"]
        name = event.get("name", "")
        # Detect which agent node is starting
        if kind == "on_chain_start" and name in ("researcher", "writer", "notifier", "supervisor"):
            dashboard.agent_started(name)
            active_agent = name
        # Stream tokens from the LLM, filtered to researcher and writer
        elif kind == "on_chat_model_stream":
            if active_agent in ("researcher", "writer"):
                chunk = event["data"]["chunk"].content
                if chunk:
                    dashboard.agent_token(chunk)
        # Tool calls (if any are used in future)
        elif kind == "on_tool_start":
            tool_name = name
            tool_input = str(event["data"].get("input", ""))
            dashboard.tool_called(tool_name, tool_input)
        # Agent node completed
        elif kind == "on_chain_end" and name in ("researcher", "writer", "notifier"):
            dashboard.agent_done(name)
    dashboard.summary()

# -------------------------------------------------------
# Run it
# -------------------------------------------------------
story_task = "Stripe announced native stablecoin payment support for USD Coin (USDC) and Tether (USDT), targeting cross-border B2B payments. The feature launches first in 25 countries with settlement in under 2 minutes."
asyncio.run(run_with_dashboard(story_task))
Run it:
python 27_live_dashboard.py
You will see each agent's name appear as it starts, tokens stream in real time during the researcher and writer phases, and a summary of total time and tokens when the pipeline completes.
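The runner attributes tokens to whichever node started most recently, which holds as long as nodes run one at a time. A hedged alternative is to read the owning node from the event itself: LangGraph attaches the node name to each event's metadata under the langgraph_node key, though it is worth confirming the key against the version you have installed. The helper names and the sample events below are hypothetical and hand-written.

```python
from typing import Any, Optional

# Agents whose model output the dashboard should display.
STREAMING_AGENTS = {"researcher", "writer"}


def owning_node(event: dict[str, Any]) -> Optional[str]:
    """Return the graph node that produced the event, if the metadata records one."""
    return event.get("metadata", {}).get("langgraph_node")


def should_render_token(event: dict[str, Any]) -> bool:
    """True only for model stream events emitted inside a displayed agent node."""
    return (
        event["event"] == "on_chat_model_stream"
        and owning_node(event) in STREAMING_AGENTS
    )


# Hand-written sample events (not real output); metadata key is an assumption to verify.
events = [
    {"event": "on_chat_model_stream", "name": "writer_llm",
     "metadata": {"langgraph_node": "writer"}, "data": {"chunk": "Stripe"}},
    {"event": "on_chat_model_stream", "name": "some_llm",
     "metadata": {"langgraph_node": "supervisor"}, "data": {"chunk": "x"}},
    {"event": "on_chain_start", "name": "writer",
     "metadata": {"langgraph_node": "writer"}, "data": {}},
    {"event": "on_chat_model_stream", "name": "standalone_llm", "data": {"chunk": "y"}},
]
rendered = [should_render_token(e) for e in events]
print(rendered)
```

Because the decision depends only on the event, this approach would keep working if two agent nodes ever ran in parallel, where a single active_agent variable would misattribute tokens.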
Why streaming=True on the LLM constructor?
The flag asks the model to stream even when a node calls it through ainvoke(), so on_chat_model_stream events arrive chunk by chunk rather than in one burst at the end. Recent langchain-core releases already switch a chat model into streaming mode whenever astream_events() is listening, so the flag may be redundant on your version; check the behavior against the release you have installed. Note: this only affects the LLM; tool calls and node transitions are always reported as they happen.
What is the performance cost of astream_events()?
Small. The events are generated by the LangChain runtime itself, not by extra model calls, so the total number of tokens consumed is identical to ainvoke(). The only overhead is the per-event callback and asyncio dispatch work, which is minor next to model latency. Use it freely.
Tip (filter events by tags for granular control): Add tags=["researcher"] to a chain's config: llm.with_config({"tags": ["researcher"]}). Then filter events by "researcher" in event["tags"]. This is cleaner than name matching when you have many nested chains.
Async vs Sync — When to Use Which
| Scenario | Use |
|---|---|
| CLI scripts, one-off tasks | asyncio.run(run_with_dashboard(...)) |
| FastAPI endpoints | async def endpoint() + async for event in ... |
| Streamlit apps | asyncio.run() in a non-async context |
| Jupyter notebooks | await run_with_dashboard(...) directly |
| LangServe / LangGraph Platform | Built-in SSE streaming — no manual event loop |
Why does this post use async when earlier parts were synchronous?
astream_events() only exists as an async API. Streaming events from a graph means waiting on many small pieces of I/O, and cooperative multitasking lets one component yield control while it waits for a token so that other work, such as rendering or a concurrent agent, can run. Synchronous code can only approximate this with threads or a background event loop. Once you move to multi-agent streaming, async is the natural choice.
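For the web-facing rows of the table, the usual transport is Server-Sent Events. The sketch below shows, under stated assumptions, how an async event stream could be turned into SSE frames using only the standard library. `fake_events()` is a hypothetical stand-in for newsroom_graph.astream_events(...), and the payload shape (name plus text) is an illustrative choice, not a LangChain format.

```python
import asyncio
import json
from typing import Any, AsyncIterator


async def fake_events() -> AsyncIterator[dict[str, Any]]:
    """Stand-in for graph.astream_events(...); yields hand-written events."""
    for event in (
        {"event": "on_chain_start", "name": "researcher", "data": {}},
        {"event": "on_chat_model_stream", "name": "researcher_llm", "data": {"chunk": "Stripe"}},
        {"event": "on_chain_end", "name": "researcher", "data": {}},
    ):
        await asyncio.sleep(0)  # yield control, as a real network wait would
        yield event


async def to_sse(events: AsyncIterator[dict[str, Any]]) -> AsyncIterator[str]:
    """Convert each event into one Server-Sent Events frame."""
    async for event in events:
        chunk = event["data"].get("chunk")
        text = getattr(chunk, "content", chunk)  # real chunks expose .content
        payload = {"name": event["name"], "text": text}
        # An SSE frame is an "event:" line, a "data:" line, then a blank line.
        yield f"event: {event['event']}\ndata: {json.dumps(payload)}\n\n"


async def collect() -> list[str]:
    """Drain the stream into a list so the frames can be inspected."""
    return [frame async for frame in to_sse(fake_events())]


frames = asyncio.run(collect())
print("".join(frames))
```

In a FastAPI endpoint, a generator like to_sse() could be passed to a StreamingResponse with media_type="text/event-stream" instead of being collected into a list.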
What Comes Next
The newsroom now gives real-time feedback. Editors see exactly which agent is working and what it is producing, without staring at a dead spinner.
The final capability gap: long-horizon tasks. A major investigative story takes days — gather sources on Monday, verify facts on Tuesday, draft on Wednesday. No single session handles this. In the next post, we build agents that plan, save their work to files, and resume across sessions.
Continue to Part 10: Deep Agents — Planning, Files & Long-Horizon Tasks →
FAQs
Q: How does astream_events() differ from llm.stream()?
A: llm.stream() only yields token chunks from a single chat model call. astream_events() is an execution-aware async stream that captures events across the entire graph call stack, including node transitions, tool starts/ends, and nested LLM runs.
Q: Why is version="v2" required when calling astream_events()?
A: Specifying version="v2" pins the current stable event schema. Whether the argument is required or has a default depends on the installed langchain-core release, so passing it explicitly keeps your event parsing stable across upgrades and avoids the deprecated v1 schema.
Q: Does streaming add extra token costs or overhead to LLM requests?
A: No. Streaming events are generated internally by the LangChain framework wrapper and do not trigger extra LLM API requests. The token cost is identical to a standard ainvoke() call, with minimal async context-switching latency.