Deep Agents: Planning, Context Files & Long-Horizon Tasks — Part 10
Build production-ready long-horizon planning agents in LangGraph that write structured task plans to files, persist state across sessions, spawn subagent children, and resume exactly where they left off after crashes.

TL;DR: Long-horizon tasks like multi-day investigations cannot run in a single chat session. This post covers deep agents: planning complex sub-tasks, offloading context to files, and resuming exactly where they left off.
When Sessions Aren't Enough
Every agent we have built across Parts 1–9 assumes the task fits in one session. The user asks, the agent responds, the conversation ends. That covers 95% of agentic tasks.
The remaining 5% are the hardest and most valuable: investigative research, due diligence, competitive analysis, multi-day report generation. These tasks require:
- Planning — decompose a complex goal into ordered sub-tasks before starting
- Persistence — save intermediate results somewhere that survives a session end
- Resumability — pick up exactly where you left off, without re-running completed steps
- Context offloading — avoid packing large intermediate results into the LLM's context window
These properties define what the LangChain ecosystem calls a Deep Agent. The pattern is architectural — not a specific library or API — and it runs entirely with the tools you already have.
How is this different from Part 6's multi-agent supervisor? The Part 6 supervisor runs everything in a single graph invocation. If you interrupt it (Ctrl+C, server restart, network failure), the work is lost. A deep agent externalises its state: the plan is a file, each step's output is a file, and the checkpoint tells the graph which steps are done. You can interrupt and resume with zero re-work.
Setup
source langchain-env/bin/activate
pip install langgraph langchain-google-genai langchain-core pydantic python-dotenvWe will write files to a local ./newsroom_workspace/ directory. No external services needed.
Part 1: The Planner-Executor Pattern
The critical insight: planning and executing are different cognitive modes. An agent that tries to do both simultaneously produces worse plans and worse execution than one that separates them.
The planner runs once and produces a structured task list. The executor processes the list step by step, never needing to re-plan.
# create a file: 28_planner_executor.py
import os
import json
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import List
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0)
# -------------------------------------------------------
# Step 1: Define the plan structure (structured output from Part 7)
# -------------------------------------------------------
class ResearchTask(BaseModel):
"""A single step in the research plan."""
task_id: str = Field(description="Unique ID like 'task_001'")
title: str = Field(description="Short description of what this step does")
instructions: str = Field(description="Specific instructions for the executor")
depends_on: List[str] = Field(
default=[],
description="List of task_ids that must complete before this step"
)
estimated_minutes: int = Field(description="Estimated time in minutes")
class ResearchPlan(BaseModel):
"""A structured plan for a multi-step investigative research project."""
story_title: str
thesis: str = Field(description="The core angle/thesis of the investigation")
tasks: List[ResearchTask]
total_estimated_hours: float
# -------------------------------------------------------
# Step 2: The Planner — produces the structured plan
# -------------------------------------------------------
planner_chain = (
ChatPromptTemplate.from_messages([
("system", """You are the investigative desk editor at Tech News Daily.
Your job is to create research plans for investigative stories.
Each plan must:
- Break the investigation into 4-6 distinct, sequential tasks
- Each task should be completable independently by a researcher
- Include clear instructions for each step
- Estimate realistic time per step"""),
("human", """Create a research plan for this investigative story:
Story brief: {story_brief}
The plan will be executed over multiple sessions by an AI research team.
Each task's output will be saved to a file for use by subsequent tasks.""")
])
| llm.with_structured_output(ResearchPlan)
)
# -------------------------------------------------------
# Step 3: Generate a plan for a real Tech News Daily story
# -------------------------------------------------------
story_brief = """
Investigation: "The Hidden Cost of AI Infrastructure — How Hyperscaler GPU Buildouts Are
Straining Power Grids Across Three Continents"
We want to investigate whether the rapid buildout of AI data centres by AWS, Google,
Microsoft, and Meta is causing measurable strain on local power infrastructure,
water usage, and real estate markets in key regions (Virginia, Iowa, Dublin, Singapore).
"""
print("Generating research plan...\n")
plan: ResearchPlan = planner_chain.invoke({"story_brief": story_brief})
print(f"Story: {plan.story_title}")
print(f"Thesis: {plan.thesis}")
print(f"Estimated total: {plan.total_estimated_hours}h")
print(f"\nTasks ({len(plan.tasks)}):")
for task in plan.tasks:
deps = f" [after: {', '.join(task.depends_on)}]" if task.depends_on else ""
print(f" {task.task_id}: {task.title} (~{task.estimated_minutes}min){deps}")
# -------------------------------------------------------
# Step 4: Save the plan to a file — this is the handoff to the executor
# -------------------------------------------------------
os.makedirs("./newsroom_workspace", exist_ok=True)
plan_path = "./newsroom_workspace/investigation_plan.json"
with open(plan_path, "w") as f:
json.dump(plan.model_dump(), f, indent=2)
print(f"\n✓ Plan saved to: {plan_path}")
print(" The executor will read this file and process tasks in order.")Run it:
python 28_planner_executor.pyInspect ./newsroom_workspace/investigation_plan.json — this is your agent's durable, shareable, version-controllable task list.
Why save the plan as JSON and not keep it in LangGraph state? LangGraph state (even with
MemorySaver) is process-local. If your machine restarts, the state is gone. A JSON file on disk persists across restarts, deployments, and even machine migrations. For long-horizon tasks (days, not minutes), file persistence is the right choice. Think of the plan file as your agent's equivalent of a project management board — it exists independently of any running process.
Why separate
depends_onin the plan? Some research tasks can run in parallel (e.g., researching Virginia's power grid while simultaneously researching Dublin's).depends_onmakes those dependencies explicit. An executor can use this to parallelise independent tasks usingasyncio.gather(). Even if you run tasks sequentially now, the dependency map makes it safe to parallelise later without rewriting the planner.
Part 2: Context Files — Saving Large Results to Disk
The executor runs each task and saves its findings to a dedicated file. This is context offloading: instead of accumulating all findings in the LLM's conversation window, each step writes to disk and the next step reads only what it needs.
# create a file: 29_context_files.py
import os
import json
from datetime import datetime
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0)
WORKSPACE = "./newsroom_workspace"
os.makedirs(WORKSPACE, exist_ok=True)
# -------------------------------------------------------
# File-based task executor
# Each task: reads dependencies from files → runs LLM → writes output to file
# -------------------------------------------------------
def get_task_output_path(task_id: str) -> str:
return os.path.join(WORKSPACE, f"{task_id}_output.md")
def task_is_complete(task_id: str) -> bool:
"""Check if a task has already been completed (output file exists)."""
return os.path.exists(get_task_output_path(task_id))
def load_dependency_context(depends_on: list[str]) -> str:
"""Load the output of completed dependency tasks as context."""
context_parts = []
for dep_id in depends_on:
path = get_task_output_path(dep_id)
if os.path.exists(path):
with open(path) as f:
content = f.read()
context_parts.append(f"=== Output from {dep_id} ===\n{content}")
else:
context_parts.append(f"=== {dep_id}: NOT YET COMPLETE ===")
return "\n\n".join(context_parts) if context_parts else "No prior context."
def execute_task(task: dict) -> str:
"""
Execute a single research task:
1. Load dependency context from files
2. Run the LLM with task instructions + dependency context
3. Save output to a file
4. Return the output
"""
task_id = task["task_id"]
# Skip if already done (resumability)
if task_is_complete(task_id):
output_path = get_task_output_path(task_id)
with open(output_path) as f:
existing = f.read()
print(f" ⏭ {task_id} already complete — skipping")
return existing
# Load context from dependencies (key: only load what's needed)
dep_context = load_dependency_context(task.get("depends_on", []))
# Build the prompt with only the relevant context
# This is context isolation in action: each task sees only its dependencies,
# not the entire conversation history
research_prompt = ChatPromptTemplate.from_messages([
("system", """You are a senior investigative researcher at Tech News Daily.
Execute the assigned research task thoroughly and factually.
If you cannot verify a specific claim, mark it [NEEDS VERIFICATION].
Structure your output clearly with headings and bullet points."""),
("human", """Research Task: {task_title}
Specific instructions:
{instructions}
Context from prior research steps:
{dependency_context}
Produce a detailed research output. This will be saved to a file and used by subsequent tasks.""")
])
chain = research_prompt | llm
print(f" ▶ Executing {task_id}: {task['title']}")
response = chain.invoke({
"task_title": task["title"],
"instructions": task["instructions"],
"dependency_context": dep_context
})
output = response.content
# Save to file with metadata header
output_path = get_task_output_path(task_id)
with open(output_path, "w") as f:
f.write(f"# {task['title']}\n")
f.write(f"Task ID: {task_id}\n")
f.write(f"Completed: {datetime.utcnow().isoformat()}Z\n")
f.write(f"---\n\n")
f.write(output)
print(f" ✓ {task_id} complete → saved to {output_path}")
return output
# -------------------------------------------------------
# Load the plan and execute the first two tasks
# (demonstrating partial execution + resumability)
# -------------------------------------------------------
plan_path = os.path.join(WORKSPACE, "investigation_plan.json")
if not os.path.exists(plan_path):
print("Run 28_planner_executor.py first to generate the plan.")
exit(1)
with open(plan_path) as f:
plan = json.load(f)
print(f"\nInvestigation: {plan['story_title']}")
print(f"Tasks to execute: {len(plan['tasks'])}\n")
# Execute tasks in order, respecting dependencies
# (For parallel tasks, you would check depends_on and use asyncio.gather)
for task in plan["tasks"][:2]: # execute first 2 tasks as a demo
print(f"\nTask: {task['task_id']} — {task['title']}")
execute_task(task)
print(f"\n\nWorkspace contents:")
for f in os.listdir(WORKSPACE):
size = os.path.getsize(os.path.join(WORKSPACE, f))
print(f" {f} ({size} bytes)")Run it:
python 29_context_files.pyRun it a second time — the completed tasks are skipped instantly. This is the core of resumability.
Why write output as Markdown files instead of storing in LangGraph state? Three reasons: (1) Files persist across process restarts. (2) Files can be opened by humans to inspect intermediate results — invaluable for debugging a long investigation. (3) Files decouple task outputs from the agent's active context window. If task_003 produces 10,000 words of research, those 10,000 words never enter the LLM's context unless a later task explicitly reads that file. Each task only loads the context it actually needs.
Tip — add a checksum to each output file: In production, write a
md5hash of the output alongside the file. Before using a dependency's output, verify its checksum. This detects accidental file corruption or manual edits that could cause incorrect downstream results.
Part 3: Resumable Long-Horizon Tasks with LangGraph
Now wire the planner and executor into a LangGraph graph that can pause, persist state, and resume across sessions.
# create a file: 30_resumable_tasks.py
import os
import json
import asyncio
from dotenv import load_dotenv
from typing import Annotated, List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-3.5-flash", temperature=0)
WORKSPACE = "./newsroom_workspace"
# -------------------------------------------------------
# State for the long-horizon investigation graph
# -------------------------------------------------------
class InvestigationState(TypedDict):
story_brief: str
plan: dict # the full plan dict
completed_tasks: List[str] # task_ids that are done
current_task_id: str # which task is running
final_report: str # assembled at the end
status: str # "planning" | "executing" | "complete"
# -------------------------------------------------------
# Node: Load or generate the plan
# -------------------------------------------------------
def planning_node(state: InvestigationState) -> dict:
plan_path = os.path.join(WORKSPACE, "investigation_plan.json")
if os.path.exists(plan_path):
print("\n[Planning] Existing plan found — loading")
with open(plan_path) as f:
plan = json.load(f)
else:
print("\n[Planning] Generating new plan...")
# In production: call the planner_chain from Part 1
# Here we create a minimal plan for demonstration
plan = {
"story_title": "AI Infrastructure Power Grid Impact",
"thesis": "Hyperscaler AI buildouts are straining power infrastructure",
"tasks": [
{
"task_id": "task_001",
"title": "Power consumption baseline",
"instructions": "Research current power consumption of major AI data centres. Focus on AWS us-east-1, Google Council Bluffs Iowa, and Microsoft Dublin.",
"depends_on": [],
"estimated_minutes": 30
},
{
"task_id": "task_002",
"title": "Grid impact analysis",
"instructions": "Based on power consumption data, analyse reported grid strain incidents. Look for utility company statements, regulatory filings, and news reports from 2024-2026.",
"depends_on": ["task_001"],
"estimated_minutes": 45
},
{
"task_id": "task_003",
"title": "Executive summary",
"instructions": "Synthesise all research into a 500-word executive summary suitable for a Tech News Daily front-page investigation.",
"depends_on": ["task_001", "task_002"],
"estimated_minutes": 20
}
],
"total_estimated_hours": 1.58
}
os.makedirs(WORKSPACE, exist_ok=True)
with open(plan_path, "w") as f:
json.dump(plan, f, indent=2)
return {"plan": plan, "status": "executing", "completed_tasks": []}
# -------------------------------------------------------
# Node: Execute the next pending task
# -------------------------------------------------------
def executor_node(state: InvestigationState) -> dict:
plan = state["plan"]
completed = set(state.get("completed_tasks", []))
# Find the next task whose dependencies are all satisfied
next_task = None
for task in plan["tasks"]:
if task["task_id"] in completed:
continue
deps_met = all(d in completed for d in task.get("depends_on", []))
if deps_met:
next_task = task
break
if next_task is None:
print("\n[Executor] All tasks complete")
return {"status": "assembling"}
task_id = next_task["task_id"]
task_output_path = os.path.join(WORKSPACE, f"{task_id}_output.md")
# Resumability: skip if already done
if os.path.exists(task_output_path):
print(f"\n[Executor] {task_id} already complete — skipping")
new_completed = list(completed) + [task_id]
return {
"completed_tasks": new_completed,
"current_task_id": task_id
}
# Load dependency context (context isolation — only load what's needed)
dep_context = ""
for dep_id in next_task.get("depends_on", []):
dep_path = os.path.join(WORKSPACE, f"{dep_id}_output.md")
if os.path.exists(dep_path):
with open(dep_path) as f:
dep_context += f"\n\n=== {dep_id} findings ===\n{f.read()}"
# Execute
print(f"\n[Executor] Running {task_id}: {next_task['title']}")
research_chain = (
ChatPromptTemplate.from_messages([
("system", "You are an investigative researcher at Tech News Daily. Be thorough and factual. Mark unverified claims as [NEEDS VERIFICATION]."),
("human", "Task: {title}\n\nInstructions: {instructions}\n\nPrior context:\n{context}")
])
| llm
)
response = research_chain.invoke({
"title": next_task["title"],
"instructions": next_task["instructions"],
"context": dep_context or "No prior context — this is the first task."
})
# Save to file
from datetime import datetime
with open(task_output_path, "w") as f:
f.write(f"# {next_task['title']}\nCompleted: {datetime.utcnow().isoformat()}Z\n---\n\n")
f.write(response.content)
print(f"[Executor] ✓ {task_id} saved to {task_output_path}")
new_completed = list(completed) + [task_id]
return {
"completed_tasks": new_completed,
"current_task_id": task_id
}
# -------------------------------------------------------
# Node: Assemble the final report from all task outputs
# -------------------------------------------------------
def assembler_node(state: InvestigationState) -> dict:
print("\n[Assembler] Compiling final report...")
plan = state["plan"]
all_findings = ""
for task in plan["tasks"]:
path = os.path.join(WORKSPACE, f"{task['task_id']}_output.md")
if os.path.exists(path):
with open(path) as f:
all_findings += f"\n\n{f.read()}"
report_chain = (
ChatPromptTemplate.from_messages([
("system", "You are the editor-in-chief of Tech News Daily."),
("human", f"Story thesis: {plan['thesis']}\n\nResearch findings:\n{all_findings}\n\nWrite a 400-word final investigative article suitable for publication.")
])
| llm
)
report = report_chain.invoke({})
report_path = os.path.join(WORKSPACE, "final_report.md")
with open(report_path, "w") as f:
f.write(f"# {plan['story_title']}\n\n")
f.write(report.content)
print(f"[Assembler] ✓ Final report saved to {report_path}")
return {"final_report": report.content, "status": "complete"}
# -------------------------------------------------------
# Routing functions
# -------------------------------------------------------
def route_after_executor(state: InvestigationState) -> str:
if state.get("status") == "assembling":
return "assembler"
plan = state["plan"]
completed = set(state.get("completed_tasks", []))
all_done = all(t["task_id"] in completed for t in plan["tasks"])
return "assembler" if all_done else "executor"
# -------------------------------------------------------
# Build the graph
# -------------------------------------------------------
investigation = StateGraph(InvestigationState)
investigation.add_node("planner", planning_node)
investigation.add_node("executor", executor_node)
investigation.add_node("assembler", assembler_node)
investigation.add_edge(START, "planner")
investigation.add_edge("planner", "executor")
investigation.add_conditional_edges("executor", route_after_executor,
{"executor": "executor", "assembler": "assembler"})
investigation.add_edge("assembler", END)
# MemorySaver stores the graph state — in production use SqliteSaver
checkpointer = MemorySaver()
investigation_graph = investigation.compile(checkpointer=checkpointer)
# -------------------------------------------------------
# Session 1: Start the investigation (may be interrupted)
# -------------------------------------------------------
thread_config = {"configurable": {"thread_id": "ai-power-investigation-v1"}}
initial_state = {
"story_brief": "Investigate the impact of AI data centre buildouts on power grids",
"plan": {},
"completed_tasks": [],
"current_task_id": "",
"final_report": "",
"status": "planning"
}
print("=" * 60)
print("Session 1: Starting investigation")
print("=" * 60)
final = investigation_graph.invoke(initial_state, thread_config, {"recursion_limit": 30})
print(f"\n{'=' * 60}")
print("Investigation complete!")
print(f"Status: {final['status']}")
print(f"Tasks completed: {final['completed_tasks']}")
print(f"\nFinal report preview:")
print(final["final_report"][:500] + "...")
# -------------------------------------------------------
# Session 2: Demonstrate resumability
# Run again — all tasks are skipped because their output files exist
# -------------------------------------------------------
print(f"\n{'=' * 60}")
print("Session 2: Resuming (simulating restart)")
print("=" * 60)
# Reset completed_tasks to force re-evaluation — graph will find the files and skip
resume_state = {**initial_state, "status": "planning"}
final2 = investigation_graph.invoke(resume_state, thread_config, {"recursion_limit": 30})
print(f"Resume complete. Tasks executed from scratch: 0 (all skipped via file check)")Run it:
python 30_resumable_tasks.pyRun it twice. The second run skips all completed tasks immediately and assembles the final report from the existing files.
Why check files for task completion instead of LangGraph's checkpointer? The checkpointer stores which nodes have executed. But if the checkpointer is in-memory (
MemorySaver) and the process restarts, the checkpoint is lost. File-based completion checks are truly durable. In production, use both:SqliteSaveras the checkpointer for graph state, and file existence as the task completion gate. They are complementary, not alternatives.
When should you use
SqliteSaverinstead ofMemorySaver? As soon as your task spans more than one process invocation.MemorySaveris lost on restart.SqliteSaverpersists to a local database file.PostgresSaverworks across multiple machines. For the newsroom investigation that runs Monday through Wednesday, use at minimumSqliteSaver. The code change is one line:from langgraph.checkpoint.sqlite import SqliteSaverandcheckpointer = SqliteSaver.from_conn_string("./investigation.db").
Series Wrap-Up: The Complete Stack
You have now built every layer of a production-grade LangChain system. Here is the full capability map across all 10 parts:
| Part | Topic | What you can build |
|---|---|---|
| Part 1 | Agents & ReAct | Tool-calling agents with streaming |
| Part 2 | LangGraph | Stateful graphs with memory and checkpointing |
| Part 3 | RAG | Private knowledge retrieval |
| Part 4 | MCP | Decentralised tool microservices |
| Part 5 | Production | Safety guardrails, failover, quality evaluation |
| Part 6 | Multi-Agent | Supervisor routing with shared state |
| Part 7 | Structured Output | Typed Pydantic objects from every agent |
| Part 8 | Context Engineering | Trim, summarise, and layer your context window |
| Part 9 | Event Streaming | Real-time dashboards with astream_events() |
| Part 10 (this post) | Deep Agents | Planning, file-based context, resumable tasks |
The code files from all ten posts (01_ through 30_) form a single coherent codebase. Each post's files build on the previous ones. Run any file independently to see its component work, or run them in order to build the full Tech News Daily investigative platform.
FAQs
Q: What defines a deep agent in LangChain and LangGraph?
A: A deep agent is an architectural pattern designed to handle long-horizon, high-context workloads. It is defined by its ability to generate an explicit execution plan, save intermediate results to persistent files or databases, and resume execution after process crashes or system restarts.
Q: How does context offloading prevent token bloat?
A: Instead of carrying the entire history of intermediate calculations and tool outputs in the chat memory, the agent writes detailed outputs to workspace files and only keeps light metadata or summaries in the LLM's active context.
Q: Why should I combine checkpointers like SqliteSaver with file-based checks?
A: SqliteSaver persists the LangGraph thread state. However, performing file-existence checks acts as a secondary, durable gate: even if the in-memory graph state is cleared, the agent can check the filesystem to verify which tasks are already complete and skip redundant work.