Back to Blog
This post is Part 9 of 10 in the series: LangChain v1.x Core Series

Event Streaming & Real-Time Agent UX — Part 9

Build real-time streaming agent interfaces in LangChain using astream_events(). Learn to differentiate token streams, tool call events, and node transitions — then render them in a live dashboard for multi-agent workflows.

Share Editorial
Event Streaming & Real-Time Agent UX — Part 9

TL;DR: Static loading spinners lead to poor user experiences. This post demonstrates how to use astream_events() to capture granular lifecycle events and build a real-time console dashboard for multi-agent graphs.

The Dead Spinner Problem

In Part 1 we covered basic streaming with llm.stream(). That gives you tokens as they arrive. It works for a single model call.

But the Tech News Daily newsroom from Part 6 is a multi-agent graph. When a user submits an article, up to four things happen in sequence: the supervisor decides, the researcher runs, the writer composes, the notifier formats. With basic streaming, you see nothing until the very last agent finishes. Users stare at a blank screen for 15–30 seconds thinking the app has frozen.

astream_events() solves this. Instead of streaming final tokens, it streams events — typed lifecycle signals that fire at every meaningful moment:

Event typeWhen it fires
on_chain_startA node or chain starts executing
on_chat_model_startThe LLM begins processing
on_chat_model_streamA single token arrives from the model
on_chat_model_endThe LLM finishes its response
on_tool_startA tool call begins
on_tool_endA tool call completes
on_chain_endA node or chain finishes

You subscribe to the events you care about and build whatever UX you need — a spinner, a live transcript, a progress bar, a dashboard.

Why is this different from llm.stream()? llm.stream() gives you chunks of the final answer. It has no concept of tools, agents, or graph nodes. astream_events() is aware of the entire execution stack — it tells you which component produced each event, not just what the text content is. For multi-agent systems, this distinction is everything.


Setup

bash
source langchain-env/bin/activate
pip install langchain-google-genai langchain-core langgraph python-dotenv

astream_events() is async — all examples in this post use asyncio.


Part 1: Understanding the Event Stream

Before building a dashboard, understand the raw event format.

python
# create a file: 26_event_streaming.py
import asyncio
import os
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI

load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0)

# -------------------------------------------------------
# A simple single-chain example to understand event types
# before adding the complexity of a full agent graph
# -------------------------------------------------------
headline_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a news headline writer for Tech News Daily."),
        ("human", "Write 3 headline options for this story: {story}")
    ])
    | llm.with_config({"run_name": "headline_llm"})  # name shows up in events
    | StrOutputParser()
)

async def inspect_events():
    """Print every event with its type and relevant data."""
    
    story = "Cloudflare reported a 99.99% uptime SLA breach affecting European customers for 47 minutes on June 13, 2026. The cause was a BGP routing misconfiguration."
    
    print("=== Raw Event Stream ===\n")
    
    async for event in headline_chain.astream_events(
        {"story": story},
        version="v2"          # always specify version — v2 is current stable
    ):
        kind = event["event"]
        name = event.get("name", "unknown")
        
        # Skip verbose start/end events for this inspection
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"].content
            if chunk:  # filter empty chunks
                print(f"  TOKEN: '{chunk}'", end="", flush=True)
        
        elif kind == "on_tool_start":
            print(f"\n  TOOL START → {event['data']['input']}")
        
        elif kind == "on_tool_end":
            print(f"  TOOL END  → output length: {len(str(event['data']['output']))}")
        
        elif kind in ("on_chain_start", "on_chain_end"):
            print(f"\n  {kind.upper()}: {name}")
        
        elif kind == "on_chat_model_end":
            print(f"\n\n  MODEL DONE — total tokens used: {event['data']['output'].usage_metadata}")


asyncio.run(inspect_events())

Run it:

bash
python 26_event_streaming.py

You will see the chain events fire in sequence: chain starts, model starts, tokens stream in real time, model ends, chain ends. Now you have the raw material to build any UX on top of.

Why version="v2" and not "v1" or "v3"? v2 is the current stable version of astream_events. v1 is deprecated. v3 is in preview and the event schema may still change. Always specify the version explicitly — without it, LangChain defaults to v1 and emits a deprecation warning that will break your logs.

Tip — filter events by run_name: In a multi-agent graph, dozens of events fire. Use .with_config({"run_name": "my_node"}) on any chain, LLM, or tool, then filter events by event["name"] == "my_node" in your handler. Without naming, all events arrive with generic identifiers and it becomes impossible to know which agent produced which token.


Part 2: Live Terminal Dashboard for the Newsroom

Now build a real dashboard. This streams the multi-agent newsroom graph from Part 6, showing each agent's status in real time.

python
# create a file: 27_live_dashboard.py
import asyncio
import os
import sys
import time
from dotenv import load_dotenv
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_google_genai import ChatGoogleGenerativeAI

load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0, streaming=True)

# -------------------------------------------------------
# Newsroom state (same shape as Part 6)
# -------------------------------------------------------
class NewsroomState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    task: str
    research_findings: str
    draft_content: str
    next_agent: str


# -------------------------------------------------------
# Dashboard renderer — prints status without clearing the screen
# Compatible with any terminal (no ANSI tricks needed)
# -------------------------------------------------------
class NewsDashboard:
    def __init__(self):
        self.current_agent = None
        self.token_count = 0
        self.start_time = time.time()
        self.tool_calls = []
    
    def agent_started(self, agent_name: str):
        elapsed = time.time() - self.start_time
        print(f"\n[{elapsed:.1f}s] ▶ {agent_name.upper()} started", flush=True)
        self.current_agent = agent_name
    
    def agent_token(self, token: str):
        print(token, end="", flush=True)
        self.token_count += 1
    
    def agent_done(self, agent_name: str):
        elapsed = time.time() - self.start_time
        print(f"\n[{elapsed:.1f}s] ✓ {agent_name.upper()} done", flush=True)
    
    def tool_called(self, tool_name: str, input_summary: str):
        elapsed = time.time() - self.start_time
        print(f"\n[{elapsed:.1f}s] 🔧 Tool: {tool_name} | Input: {input_summary[:60]}", flush=True)
        self.tool_calls.append(tool_name)
    
    def summary(self):
        elapsed = time.time() - self.start_time
        print(f"\n{'='*60}")
        print(f"Pipeline complete in {elapsed:.1f}s")
        print(f"Total tokens streamed: {self.token_count}")
        print(f"Tool calls: {self.tool_calls or 'none'}")


dashboard = NewsDashboard()


# -------------------------------------------------------
# Specialist agents — each named for event filtering
# -------------------------------------------------------
def make_researcher(llm):
    from langchain_core.prompts import ChatPromptTemplate
    return (
        ChatPromptTemplate.from_messages([
            ("system", "You are the Tech News Daily researcher. Gather key facts."),
            ("human", "Research task: {task}\nProvide 4 numbered findings.")
        ])
        | llm.with_config({"run_name": "researcher_llm"})
    )

def make_writer(llm):
    from langchain_core.prompts import ChatPromptTemplate
    return (
        ChatPromptTemplate.from_messages([
            ("system", "You are the Tech News Daily writer. Write sharp, factual summaries."),
            ("human", "Task: {task}\n\nFindings:\n{findings}\n\nWrite a 2-paragraph summary.")
        ])
        | llm.with_config({"run_name": "writer_llm"})
    )

researcher_chain = make_researcher(llm)
writer_chain = make_writer(llm)


# -------------------------------------------------------
# Graph nodes — each is also named for event filtering
# -------------------------------------------------------
async def researcher_node(state: NewsroomState) -> dict:
    response = await researcher_chain.ainvoke(
        {"task": state["task"]},
        config={"run_name": "researcher_node"}
    )
    return {
        "research_findings": response.content,
        "messages": [AIMessage(content=response.content, name="researcher")]
    }

async def writer_node(state: NewsroomState) -> dict:
    response = await writer_chain.ainvoke(
        {"task": state["task"], "findings": state["research_findings"]},
        config={"run_name": "writer_node"}
    )
    return {
        "draft_content": response.content,
        "messages": [AIMessage(content=response.content, name="writer")]
    }

async def notifier_node(state: NewsroomState) -> dict:
    content = f"📰 PUBLISHED: {state['draft_content'][:200]}..."
    return {
        "messages": [AIMessage(content=content, name="notifier")]
    }

def supervisor_node(state: NewsroomState) -> dict:
    research_done = bool(state.get("research_findings", "").strip())
    writing_done = bool(state.get("draft_content", "").strip())
    notifier_ran = any(getattr(m, "name", "") == "notifier" for m in state.get("messages", []))
    
    if not research_done:
        return {"next_agent": "researcher"}
    elif not writing_done:
        return {"next_agent": "writer"}
    elif not notifier_ran:
        return {"next_agent": "notifier"}
    else:
        return {"next_agent": "FINISH"}

def route_to_agent(state: NewsroomState) -> str:
    n = state.get("next_agent", "FINISH")
    return END if n == "FINISH" else n


# Build the graph
graph = StateGraph(NewsroomState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
graph.add_node("notifier", notifier_node)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_to_agent,
    {"researcher": "researcher", "writer": "writer", "notifier": "notifier", END: END})
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("notifier", "supervisor")
newsroom_graph = graph.compile()


# -------------------------------------------------------
# The streaming runner — the key logic is here
# -------------------------------------------------------
async def run_with_dashboard(task: str):
    print(f"\n{'='*60}")
    print(f"Tech News Daily — Processing: {task[:70]}")
    print(f"{'='*60}")
    
    initial_state = {
        "task": task,
        "messages": [],
        "research_findings": "",
        "draft_content": "",
        "next_agent": "",
    }
    
    # Track which agent is "active" for event attribution
    active_agent = None
    
    async for event in newsroom_graph.astream_events(
        initial_state,
        version="v2",
        config={"recursion_limit": 20}
    ):
        kind = event["event"]
        name = event.get("name", "")
        
        # Detect which agent node is starting
        if kind == "on_chain_start" and name in ("researcher", "writer", "notifier", "supervisor"):
            dashboard.agent_started(name)
            active_agent = name
        
        # Stream tokens from the LLM — filter to researcher and writer
        elif kind == "on_chat_model_stream":
            if active_agent in ("researcher", "writer"):
                chunk = event["data"]["chunk"].content
                if chunk:
                    dashboard.agent_token(chunk)
        
        # Tool calls (if any are used in future)
        elif kind == "on_tool_start":
            tool_name = name
            tool_input = str(event["data"].get("input", ""))
            dashboard.tool_called(tool_name, tool_input)
        
        # Agent node completed
        elif kind == "on_chain_end" and name in ("researcher", "writer", "notifier"):
            dashboard.agent_done(name)
    
    dashboard.summary()


# -------------------------------------------------------
# Run it
# -------------------------------------------------------
story_task = "Stripe announced native stablecoin payment support for USD Coin (USDC) and Tether (USDT), targeting cross-border B2B payments. The feature launches first in 25 countries with settlement in under 2 minutes."

asyncio.run(run_with_dashboard(story_task))

Run it:

bash
python 27_live_dashboard.py

You will see each agent's name appear as it starts, tokens stream in real time during researcher and writer phases, and a summary of total time and tokens when the pipeline completes.

Why streaming=True on the LLM constructor? Without streaming=True, even astream_events() buffers the model's full response before emitting the on_chat_model_stream events — you get a burst all at once rather than token-by-token. Set streaming=True to get genuine per-token events. Note: this only affects the LLM; tool calls and node transitions are always real-time.

What is the performance cost of astream_events()? Negligible. The events are generated by the LangChain runtime itself, not by extra model calls. The total number of tokens consumed is identical to ainvoke(). The only overhead is the Python asyncio event dispatch loop, which adds microseconds. Use it freely.

Tip — filter events by tags for granular control: Add tags=["researcher"] to a chain's config: llm.with_config({"tags": ["researcher"]}). Then filter events by "researcher" in event["tags"]. This is cleaner than name matching when you have many nested chains.


Async vs Sync — When to Use Which

ScenarioUse
CLI scripts, one-off tasksasyncio.run(run_with_dashboard(...))
FastAPI endpointsasync def endpoint() + async for event in ...
Streamlit appsasyncio.run() in a non-async context
Jupyter notebooksawait run_with_dashboard(...) directly
LangServe / LangGraph PlatformBuilt-in SSE streaming — no manual event loop

Why does this post use async when earlier parts were synchronous? astream_events() only exists as an async API. The underlying reason is that streaming from multiple concurrent agents requires cooperative multitasking — one agent yields control while waiting for a token, another runs. Sync Python cannot do this; async can. Once you move to multi-agent streaming, async is the natural choice.


What Comes Next

The newsroom now gives real-time feedback. Editors see exactly which agent is working and what it is producing, without staring at a dead spinner.

The final capability gap: long-horizon tasks. A major investigative story takes days — gather sources on Monday, verify facts on Tuesday, draft on Wednesday. No single session handles this. In the next post, we build agents that plan, save their work to files, and resume across sessions.

Continue to Part 10: Deep Agents — Planning, Files & Long-Horizon Tasks →


FAQs

Q: How does astream_events() differ from llm.stream()?
A: llm.stream() only yields token chunks from a single chat model call. astream_events() is an execution-aware async stream that captures events across the entire graph call stack, including node transitions, tool starts/ends, and nested LLM runs.

Q: Why is version="v2" required when calling astream_events()?
A: Specifying version="v2" ensures you use the current stable event wire schema definition in LangChain. Omitting this parameter defaults the call to the deprecated v1 schema, which prints deprecation warnings and can break parsing.

Q: Does streaming add extra token costs or overhead to LLM requests?
A: No. Streaming events are generated internally by the LangChain framework wrapper and do not trigger extra LLM API requests. The token cost is identical to a standard ainvoke() call, with minimal async context-switching latency.