Going to Production: Reliability, Observability & Resumability
Moving DevPulse from a working prototype to a production system. Learn how to implement persistent SQLite checkpointing so agents survive crashes, add LangSmith distributed tracing to debug multi-agent pipelines, inject human-in-the-loop approval gates for high-risk actions, and build cost safeguards that prevent runaway API spend.

TL;DR: A prototype runs on your laptop with a fast internet connection, a fresh Python environment, and no interruptions. Production runs in a CI/CD pipeline at 2am, on a server with a flaky connection to the GitHub API, hitting rate limits on the 17th tool call of a 23-file review. This post is about the gap between those two realities. We add persistent checkpointing (resume from any crash), LangSmith distributed tracing (see exactly what happened inside every agent), human-in-the-loop approval gates (stop before destructive actions), and cost safeguards (never spend more than you intend to).
What Actually Breaks in Production
Let's be honest about the failure modes that prototype code doesn't handle.
Failure Mode 1: The Mid-Run Crash
DevPulse is in the middle of reviewing a 23-file PR. It has completed 16 of 23 file reviews, written findings to the workspace, and is now in the middle of calling the GitHub API to fetch the diff for file 17. The server restarts because of an unrelated OS patch.
With the current implementation:
The Python process dies. When DevPulse is restarted, it reads plan.json and sees that the tasks for files 1-16 are "completed" and the tasks for files 17-23 are "pending". It picks up from file 17. ✅
This is good — our workspace file pattern from Part 1 already handles basic resumability. But what about mid-graph failures in LangGraph? If the agent crashes inside a complex multi-step graph execution (not just between tasks), the state may be lost.
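The between-task resume described above can be sketched in a few lines. The shape of plan.json used here (a top-level `tasks` list whose entries carry `id` and `status`) is an assumption for illustration, and the helper names are hypothetical; Part 1 defines the real schema.

```python
import json
import tempfile
from pathlib import Path


def load_pending_tasks(plan_path: Path) -> list[dict]:
    """Return the tasks that still need work, in plan order."""
    plan = json.loads(plan_path.read_text())
    return [t for t in plan["tasks"] if t["status"] != "completed"]


def mark_completed(plan_path: Path, task_id: str) -> None:
    """Flip one task to 'completed' and write the plan back via a temp file."""
    plan = json.loads(plan_path.read_text())
    for task in plan["tasks"]:
        if task["id"] == task_id:
            task["status"] = "completed"
    tmp_path = plan_path.with_suffix(".tmp")
    tmp_path.write_text(json.dumps(plan, indent=2))
    tmp_path.replace(plan_path)  # rename over the old plan in one step


# Demo: a 23-file plan where files 1-16 finished before the crash.
workspace = Path(tempfile.mkdtemp())
plan_file = workspace / "plan.json"
plan_file.write_text(json.dumps({"tasks": [
    {"id": f"file_{i}", "status": "completed" if i <= 16 else "pending"}
    for i in range(1, 24)
]}))

pending_ids = [t["id"] for t in load_pending_tasks(plan_file)]
print(pending_ids[0], len(pending_ids))
```

Writing to a temporary file and renaming it means a crash during the write leaves the previous plan intact rather than a half-written one.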
Failure Mode 2: The Black Box
A developer reports: "DevPulse posted a wrong finding on line 47 of tokens.py. It said we have a hardcoded secret, but that's actually a test fixture."
You need to debug this. What tool call did the agent make? What did the GitHub API actually return? What was the model's internal reasoning? What was in the message history at that point?
Without tracing, you cannot answer any of these questions. You are debugging a black box.
Failure Mode 3: The Destructive Action
DevPulse's Jira integration is ready. The agent can automatically file bug reports. But what happens when it gets confused and files 50 duplicate tickets, each with priority: CRITICAL, flooding your backlog?
Or worse — what if the agent is extended to auto-close PRs for critical security issues, and it closes the wrong one?
These are high-stakes, irreversible actions. They need a human approval gate.
Failure Mode 4: The Runaway Cost
An agent gets stuck in a loop. Maybe the model keeps calling the same tool expecting a different result (the classic "insanity" loop). Maybe a bug in the state update logic causes the agent to repeatedly re-review the same file.
Without cost safeguards, a 10-minute looping agent can easily rack up $50-100 in API costs. This has happened to teams building production agents.
Solution 1: Persistent Checkpointing with LangGraph
LangGraph's checkpointing system is designed for exactly this problem. A checkpointer automatically saves the complete graph state after every node execution. If the process crashes, it can resume from the last checkpoint.
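To make "save after every node, resume from the last save" concrete, here is a toy version that uses only the standard library. It is a sketch of the idea, not how LangGraph's SqliteSaver is implemented; the table layout and every function name below are made up for illustration.

```python
import json
import sqlite3
from typing import Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "thread_id TEXT, step INTEGER, state TEXT, "
        "PRIMARY KEY (thread_id, step))"
    )
    return conn


def save_checkpoint(conn, thread_id: str, step: int, state: dict) -> None:
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
        (thread_id, step, json.dumps(state)),
    )
    conn.commit()


def load_latest(conn, thread_id: str):
    """Return (last finished step, state), or (-1, {}) for a new thread."""
    row = conn.execute(
        "SELECT step, state FROM checkpoints WHERE thread_id = ? "
        "ORDER BY step DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return (row[0], json.loads(row[1])) if row else (-1, {})


def run_pipeline(conn, thread_id: str, nodes, crash_before: Optional[int] = None) -> dict:
    """Run nodes in order, checkpointing after each; skip nodes already done."""
    last_step, state = load_latest(conn, thread_id)
    for step in range(last_step + 1, len(nodes)):
        if step == crash_before:
            raise RuntimeError(f"simulated crash before node {step}")
        state = nodes[step](state)
        save_checkpoint(conn, thread_id, step, state)
    return state


nodes = [
    lambda s: {**s, "initialized": True},
    lambda s: {**s, "tasks": ["a.py", "b.py"]},
    lambda s: {**s, "reviewed": list(s["tasks"])},
]

conn = open_store()
try:
    run_pipeline(conn, "pr_847", nodes, crash_before=2)  # first attempt dies
except RuntimeError:
    pass

resumed_from, _ = load_latest(conn, "pr_847")
final_state = run_pipeline(conn, "pr_847", nodes)  # second attempt resumes
print(resumed_from, final_state)
```

The second call re-runs only the node that never finished, which is the same retry-from-node-start behavior the LangGraph version relies on.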
Why LangGraph Checkpointing vs. Our File Workspace
Both patterns solve resumability, but at different levels:
| Layer | Our File Workspace | LangGraph Checkpointer |
|---|---|---|
| Granularity | Per-task (between files) | Per-node (within a file's review) |
| What's saved | Plan status, findings files | Complete graph state, messages, custom vars |
| Resume point | Start of next pending task | Exact node where crash occurred |
| Use case | Cross-run persistence | Mid-graph crash recovery |
For production DevPulse, we want both: the workspace for cross-run persistence, and LangGraph checkpointing for within-graph resilience.
# 09_production_checkpointing.py
import sqlite3
from typing import TypedDict, List, Annotated, Optional
from pathlib import Path
from datetime import datetime
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from dotenv import load_dotenv
import operator

load_dotenv()

# ---- Graph State ----
# Annotated[List[...], operator.add] is the LangGraph pattern for appended lists.
# Instead of replacing messages, new messages are APPENDED to the existing list.
# This is critical: without this annotation, a node returning {"messages": [...]}
# would REPLACE the entire message history, not add to it.
class DevPulseGraphState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    pr_number: int
    current_task: Optional[dict]
    completed_tasks: Annotated[List[str], operator.add]
    failed_tasks: Annotated[List[str], operator.add]
    total_tokens_used: int
    max_token_budget: int
    run_id: str

# ---- Graph Nodes ----
def initialize_run(state: DevPulseGraphState) -> DevPulseGraphState:
    """
    Node 1: Initialize the review run.
    Sets up the initial state and marks the run as started.
    """
    print(f"\n🚀 [Node: initialize_run] Starting PR #{state['pr_number']} review")
    print(f" Run ID: {state['run_id']}")
    print(f" Token Budget: {state['max_token_budget']:,}")
    return {
        "messages": [SystemMessage(content=(
            f"DevPulse review started for PR #{state['pr_number']} "
            f"at {datetime.utcnow().isoformat()}Z"
        ))]
    }


def fetch_pr_tasks(state: DevPulseGraphState) -> DevPulseGraphState:
    """
    Node 2: Fetch the review tasks from the workspace plan.
    In production, this reads from the workspace plan.json created in Part 1.
    """
    # Mock task list for demonstration
    tasks = [
        {"id": "task_auth_login", "file_path": "src/auth/login.py", "review_type": "security", "priority": "critical"},
        {"id": "task_auth_tokens", "file_path": "src/auth/tokens.py", "review_type": "security", "priority": "high"},
        {"id": "task_db_repo", "file_path": "src/db/user_repository.py", "review_type": "performance", "priority": "high"},
    ]
    print(f"📋 [Node: fetch_pr_tasks] Loaded {len(tasks)} tasks")
    return {
        "messages": [AIMessage(content=f"Loaded {len(tasks)} review tasks from workspace plan.")],
        "current_task": tasks[0] if tasks else None
    }

def execute_review(state: DevPulseGraphState) -> DevPulseGraphState:
    """
    Node 3: Execute the current review task.
    This is where the child agent (from Part 3) would be invoked.
    The node updates both the message history and the completed_tasks list.
    IMPORTANT: If this node crashes mid-execution, the checkpointer
    has saved state at the END of the previous node. When we resume,
    we resume from the START of this node, meaning the task is retried
    from the beginning, not from the middle of the LLM call.
    """
    task = state["current_task"]
    if not task:
        return {"messages": [AIMessage(content="No current task. Review may be complete.")]}
    # NOTE: this demo never advances current_task, so each pass through the loop
    # re-reviews the first mock task. In production, set current_task to the next
    # pending task from the workspace plan before looping.
    print(f"🔍 [Node: execute_review] Reviewing: {task['file_path']} ({task['review_type']})")
    # Token budget check (cost safeguard)
    if state["total_tokens_used"] >= state["max_token_budget"]:
        print("💰 [Node: execute_review] Token budget exhausted. Stopping.")
        return {
            "messages": [AIMessage(content="⚠️ Token budget exhausted. Stopping review.")],
            "failed_tasks": [task["id"]]
        }
    # Simulate the review (in production, call run_child_agent from Part 3)
    simulated_tokens = 2500
    simulated_finding = (
        f"Reviewed {task['file_path']}: "
        f"Found {'SQL injection vulnerability' if 'auth' in task['file_path'] else 'N+1 query pattern'}. "
        f"Severity: {task['priority']}. Comment posted to PR."
    )
    print(f" ✅ Review complete. Findings: {simulated_finding[:80]}...")
    return {
        "messages": [AIMessage(content=simulated_finding)],
        "completed_tasks": [task["id"]],
        "total_tokens_used": state["total_tokens_used"] + simulated_tokens
    }

def check_completion(state: DevPulseGraphState) -> str:
    """
    Conditional edge: decide whether to continue reviewing or finish.
    Returns the name of the next node to execute.
    """
    # In production: check workspace plan for remaining pending tasks
    total_tasks = 3  # Mock total from our task list
    completed = len(state.get("completed_tasks", []))
    failed = len(state.get("failed_tasks", []))
    if completed + failed >= total_tasks:
        print(f"\n✅ [Conditional] All tasks done. Completed: {completed}, Failed: {failed}")
        return "aggregate"
    if state["total_tokens_used"] >= state["max_token_budget"]:
        print("\n💰 [Conditional] Budget exhausted. Moving to aggregate.")
        return "aggregate"
    print(f" [Conditional] {completed}/{total_tasks} done. Continuing...")
    return "execute"  # Loop back to execute next task


def aggregate_results(state: DevPulseGraphState) -> DevPulseGraphState:
    """
    Node 4: Aggregate all findings and post the final review comment.
    """
    completed = state.get("completed_tasks", [])
    failed = state.get("failed_tasks", [])
    tokens_used = state.get("total_tokens_used", 0)
    summary = (
        f"PR #{state['pr_number']} review complete. "
        f"Tasks: {len(completed)} completed, {len(failed)} failed. "
        f"Tokens used: {tokens_used:,}/{state['max_token_budget']:,}. "
        f"[Mock] Final review comment posted to GitHub."
    )
    print(f"\n📊 [Node: aggregate_results] {summary}")
    return {
        "messages": [AIMessage(content=summary)]
    }

# ---- Graph Construction ----
def build_devpulse_graph(db_path: str = "devpulse_checkpoints.db"):
    """
    Build the DevPulse review graph with persistent SQLite checkpointing.
    The graph structure:
    START → initialize_run → fetch_pr_tasks → execute_review ↻ → aggregate_results → END
    The ↻ indicates a conditional loop: execute_review can route back to itself
    or forward to aggregate_results based on completion status.
    Why SQLite for checkpointing?
    - No database server to run (SQLite ships with Python's standard library;
      SqliteSaver itself comes from the langgraph-checkpoint-sqlite package)
    - Works locally and on any server with a persistent filesystem
    - For production Kubernetes deployments, switch to PostgresSaver to use
      a managed database that survives pod restarts
    """
    builder = StateGraph(DevPulseGraphState)
    # Add all nodes
    builder.add_node("initialize", initialize_run)
    builder.add_node("fetch_tasks", fetch_pr_tasks)
    builder.add_node("execute", execute_review)
    builder.add_node("aggregate", aggregate_results)
    # Static edges
    builder.add_edge(START, "initialize")
    builder.add_edge("initialize", "fetch_tasks")
    builder.add_edge("fetch_tasks", "execute")
    # Conditional loop edge: this is where the agent decides to continue or stop
    builder.add_conditional_edges(
        "execute",
        check_completion,
        {
            "execute": "execute",      # Loop: review next task
            "aggregate": "aggregate"   # Done: aggregate and post
        }
    )
    builder.add_edge("aggregate", END)
    # Attach the SQLite checkpointer.
    # check_same_thread=False lets the saver use the connection from other threads.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    checkpointer = SqliteSaver(conn)
    # compile() creates a runnable graph with checkpointing enabled
    return builder.compile(checkpointer=checkpointer)

# ---- Execution with Resumability ----
def run_with_resumability(pr_number: int, run_id: Optional[str] = None, budget: int = 50_000):
    """
    Run the DevPulse graph with full resumability support.
    The key is the `thread_id` in the config. Every run of the same PR
    uses the same thread_id. LangGraph stores all checkpoints under this ID.
    If the process crashes and you call this function again with the same
    thread_id, LangGraph automatically resumes from the last checkpoint.
    No code changes are needed.
    Args:
        pr_number: The GitHub PR number to review
        run_id: Unique identifier for this run (defaults to pr_{pr_number})
        budget: Maximum tokens to use across all agent calls
    """
    if run_id is None:
        run_id = f"pr_{pr_number}"
    graph = build_devpulse_graph()
    # The thread_id is the resumability key
    # Same thread_id = same checkpoint stream = automatic resume on restart
    config = {
        "configurable": {
            "thread_id": run_id
        }
    }
    # Check if there's an existing checkpoint (resuming a previous run)
    existing_state = graph.get_state(config)
    if existing_state.values:
        completed = existing_state.values.get("completed_tasks", [])
        print(f"\n♻️ Resuming existing run '{run_id}'")
        print(f" Previously completed tasks: {completed}")
        # Resume from checkpoint (pass None as input; LangGraph uses checkpoint state)
        for event in graph.stream(None, config=config):
            print(f" Event: {list(event.keys())}")
    else:
        print(f"\n🆕 Starting new run '{run_id}'")
        # Initial state for a new run
        initial_state: DevPulseGraphState = {
            "messages": [],
            "pr_number": pr_number,
            "current_task": None,
            "completed_tasks": [],
            "failed_tasks": [],
            "total_tokens_used": 0,
            "max_token_budget": budget,
            "run_id": run_id
        }
        # Stream execution: each event is a node completion
        for event in graph.stream(initial_state, config=config):
            node_name = list(event.keys())[0]
            print(f" ✓ Node '{node_name}' completed")
    # Final state
    final_state = graph.get_state(config)
    print("\n📊 Final State:")
    print(f" Completed tasks: {final_state.values.get('completed_tasks', [])}")
    print(f" Failed tasks: {final_state.values.get('failed_tasks', [])}")
    print(f" Tokens used: {final_state.values.get('total_tokens_used', 0):,}")

if __name__ == "__main__":
    print("=== DevPulse Production Run with Checkpointing ===")
    run_with_resumability(pr_number=847, budget=50_000)
    print("\n\n=== Simulating Resume (same thread_id) ===")
    # In production, this would be called after a crash: same thread_id, picks up where it left off
    run_with_resumability(pr_number=847, budget=50_000)

Solution 2: LangSmith Observability
LangSmith is LangChain's hosted tracing platform. When enabled, it automatically captures the complete execution trace of every LLM call, tool invocation, and graph node — without any code changes to your agent logic.
Why You Cannot Debug Without Tracing
Imagine this: DevPulse reviewed src/auth/login.py and posted a review comment saying "no issues found" — but the file clearly has a SQL injection. You need to know:
- What diff content did get_file_diff actually return to the agent?
- Did the child agent's system prompt correctly say "check for SQL injection"?
- What was the model's reasoning (its internal chain-of-thought)?
- Did the structured output parsing fail silently?
Without LangSmith, the answer to all of these is "I don't know". With LangSmith, you can open the UI and see the exact input/output at every step.
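What a trace records is easier to see with a toy recorder. The sketch below is plain Python and is not LangSmith's API; `traced`, `TRACE_LOG`, and the stand-in body of `get_file_diff` are hypothetical. It captures the same three things a trace viewer shows for each step: input, output, and timing.

```python
import functools
import time

TRACE_LOG: list[dict] = []  # in a real system this would go to a tracing backend


def traced(kind: str):
    """Decorator that records the inputs, output or error, and duration of a call."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            record = {
                "name": fn.__name__,
                "kind": kind,
                "inputs": {"args": args, "kwargs": kwargs},
            }
            try:
                record["output"] = fn(*args, **kwargs)
                return record["output"]
            except Exception as exc:
                record["error"] = repr(exc)
                raise
            finally:
                # Runs on success and on failure, so failed steps are recorded too.
                record["seconds"] = time.perf_counter() - start
                TRACE_LOG.append(record)
        return wrapper
    return decorator


@traced("tool")
def get_file_diff(pr_number: int, file_path: str) -> str:
    # Stand-in for the real GitHub call, for illustration only.
    return f"@@ diff for {file_path} in PR #{pr_number} @@"


get_file_diff(847, file_path="src/auth/tokens.py")
print(TRACE_LOG[-1]["name"], TRACE_LOG[-1]["inputs"])
```

With records like these, "what did the tool actually return?" becomes a lookup instead of a guess. LangSmith does the equivalent for LLM calls, tools, and graph nodes without a hand-written decorator.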
Configuring LangSmith
# 10_langsmith_tracing.py
import os
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage

load_dotenv()


def configure_langsmith(project_name: str = "DevPulse-Production", enable: bool = True) -> None:
    """
    Configure LangSmith tracing for the current process.
    Call this ONCE at startup, before any LangChain calls.
    All subsequent LangChain/LangGraph calls will be automatically traced.
    Args:
        project_name: The LangSmith project to log traces to
        enable: Set False to disable tracing (e.g., in test environments)
    Environment variables required (in .env):
        LANGSMITH_API_KEY: Your LangSmith API key from https://smith.langchain.com
    """
    if not enable:
        os.environ["LANGCHAIN_TRACING_V2"] = "false"
        print("📊 [LangSmith] Tracing disabled")
        return
    langsmith_key = os.getenv("LANGSMITH_API_KEY")
    if not langsmith_key:
        print("⚠️ [LangSmith] LANGSMITH_API_KEY not set. Tracing will not be active.")
        print(" Get your API key at: https://smith.langchain.com")
        return
    # These environment variables activate automatic tracing
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
    os.environ["LANGCHAIN_API_KEY"] = langsmith_key
    os.environ["LANGCHAIN_PROJECT"] = project_name
    print(f"📊 [LangSmith] Tracing enabled → Project: '{project_name}'")
    print(" View traces at: https://smith.langchain.com")

def add_run_metadata(pr_number: int, run_id: str, reviewer_id: str = "devpulse-bot") -> dict:
    """
    Create run metadata to attach to every LangSmith trace in this review.
    This metadata appears in the LangSmith UI and makes filtering much easier:
    - Filter all traces for a specific PR
    - See which runs were triggered by which reviewer
    - Track latency and cost per PR number
    Usage:
        config = {"metadata": add_run_metadata(847, "run_20260617_001")}
        chain.invoke(input, config=config)
    """
    return {
        "pr_number": str(pr_number),
        "run_id": run_id,
        "reviewer": reviewer_id,
        "environment": os.getenv("ENVIRONMENT", "development"),
        "devpulse_version": "1.0.0"
    }

def demonstrate_tracing():
    """
    Show how LangSmith captures a complete review chain trace.
    When you run this, open LangSmith and you'll see the full trace.
    """
    configure_langsmith(project_name="DevPulse-Demo", enable=True)
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    pr_number = 847
    run_id = "demo_run_001"
    metadata = add_run_metadata(pr_number, run_id)
    config = {"metadata": metadata, "tags": ["security_review", "demo"]}
    # This call will appear in LangSmith with the metadata attached
    response = llm.invoke(
        [
            SystemMessage(content="You are a security reviewer. Analyze the code diff for vulnerabilities."),
            HumanMessage(content="""
Review this diff for SQL injection:
```diff
- query = "SELECT * FROM users WHERE id = " + user_id
+ query = f"SELECT * FROM users WHERE id = '{user_id}'"
```
""")
        ],
        config=config
    )
    print(f"\n📊 Review result:\n{response.content}")
    print("\n✅ Open LangSmith to see the full trace with metadata:")
    print(f" Project: DevPulse-Demo | PR: #{pr_number} | Run: {run_id}")

What You See in LangSmith
When DevPulse runs a full PR review with LangSmith enabled, the trace tree looks like this:
PR #847 Review (LangGraph Run) [Total: 47.2s | Tokens: 28,431 | Cost: $0.0021]
│
├── initialize_run (Node) [0.1s]
│
├── fetch_pr_tasks (Node) [0.2s]
│
├── execute_review: src/auth/login.py (Node) [14.1s]
│ ├── get_file_diff (Tool) [0.3s]
│ │ └── Input: {pr_number: 847, file_path: "src/auth/login.py"}
│ │ └── Output: "@@ -10,15 +10,23 @@\n..."
│ ├── ChatGoogleGenerativeAI (LLM) [13.5s | 2,847 tokens]
│ │ ├── Input: [SystemMessage, HumanMessage, ToolMessage]
│ │ └── Output: AIMessage + tool_calls=[post_review_comment]
│ └── post_review_comment (Tool) [0.3s]
│ └── Input: {pr_number: 847, severity: "error", body: "SQL injection found..."}
│
├── execute_review: src/auth/tokens.py (Node) [11.8s]
│ └── ... (same pattern)
│
└── aggregate_results (Node) [2.4s]
    └── Final review comment posted

Every LLM call, tool execution, and node transition is captured. If something goes wrong at any step, you can see the exact input, output, and timing.
Solution 3: Human-in-the-Loop Approval Gates
Some actions should never be taken automatically:
- Filing a Jira ticket marked CRITICAL (might create duplicate tickets)
- Auto-closing or blocking a PR (might block urgent releases)
- Sending Slack notifications to a team (can cause alert fatigue)
LangGraph provides interrupt-before hooks to pause graph execution at designated nodes and wait for human approval before proceeding.
# 11_hitl_approval.py
import sqlite3
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
import json


class ApprovalWorkflowState(TypedDict):
    """State for a workflow that requires human approval before destructive actions."""
    pr_number: int
    proposed_jira_tickets: list[dict]  # Tickets the agent wants to create
    approved_tickets: list[dict]       # Tickets approved by human
    rejected_tickets: list[dict]       # Tickets rejected by human
    approval_requested: bool
    approval_granted: Optional[bool]
    final_report: str

def analyze_for_critical_issues(state: ApprovalWorkflowState) -> ApprovalWorkflowState:
    """
    Node 1: Analyze the PR and identify critical issues that require Jira tickets.
    This is the analysis phase: no external actions are taken yet.
    """
    print(f"\n🔍 [Node: analyze] Analyzing PR #{state['pr_number']} for critical issues...")
    # In production: run the full subagent review from Part 3
    # For demo: return mock critical findings
    proposed_tickets = [
        {
            "title": "SQL Injection vulnerability in login_user()",
            "file": "src/auth/login.py",
            "line": 13,
            "priority": "CRITICAL",
            "description": "Raw SQL query with f-string interpolation. Immediate fix required.",
            "suggested_fix": "Use parameterized queries: cursor.execute('... WHERE id = ?', (user_id,))"
        },
        {
            "title": "Hardcoded JWT secret key detected",
            "file": "src/auth/tokens.py",
            "line": 6,
            "priority": "HIGH",
            "description": "JWT_SECRET falls back to hardcoded string when env var is missing.",
            "suggested_fix": "Remove the fallback: SECRET_KEY = os.environ['JWT_SECRET'] # will raise KeyError if not set"
        }
    ]
    print(f" Found {len(proposed_tickets)} issues requiring Jira tickets:")
    for ticket in proposed_tickets:
        print(f" {'🔴' if ticket['priority'] == 'CRITICAL' else '🟠'} [{ticket['priority']}] {ticket['title']}")
    return {
        "proposed_jira_tickets": proposed_tickets,
        "approval_requested": True
    }

def request_human_approval(state: ApprovalWorkflowState) -> ApprovalWorkflowState:
    """
    Node 2: Format the approval request for the human reviewer.
    Execution PAUSES after this node (interrupt_before=['create_tickets']).
    The human sees this output and can approve/reject before execution continues.
    """
    tickets = state["proposed_jira_tickets"]
    print("\n📢 [Node: request_approval] Preparing approval request for human review...")
    # Format a clear summary for the human
    approval_summary = {
        "action_required": "JIRA_TICKET_CREATION",
        "pr_number": state["pr_number"],
        "proposed_tickets": tickets,
        "instructions": (
            "DevPulse has identified the above issues and proposes to create Jira tickets. "
            "Review each ticket and approve or reject. "
            "To approve: app.update_state(config, {'approval_granted': True}, as_node='request_approval') "
            "To reject: app.update_state(config, {'approval_granted': False}, as_node='request_approval')"
        )
    }
    print(json.dumps(approval_summary, indent=2))
    return {}  # State already set from previous node; just pass through

def create_approved_tickets(state: ApprovalWorkflowState) -> ApprovalWorkflowState:
    """
    Node 3: Create the Jira tickets, BUT ONLY after human approval.
    This node runs after the interrupt_before pause.
    """
    if not state.get("approval_granted"):
        print("\n❌ [Node: create_tickets] Approval was denied. No tickets will be created.")
        return {
            "rejected_tickets": state["proposed_jira_tickets"],
            "approved_tickets": [],
            "final_report": "Ticket creation cancelled by reviewer."
        }
    print("\n✅ [Node: create_tickets] Approval granted. Creating Jira tickets...")
    created_tickets = []
    for ticket in state["proposed_jira_tickets"]:
        # In production: call create_jira_ticket from Part 2
        ticket_id = f"DP-{abs(hash(ticket['title'])) % 9000 + 1000}"
        print(f" Created: {ticket_id}: {ticket['title']}")
        created_tickets.append({**ticket, "jira_id": ticket_id, "status": "created"})
    return {
        "approved_tickets": created_tickets,
        "rejected_tickets": [],
        "final_report": f"Created {len(created_tickets)} Jira ticket(s): {[t['jira_id'] for t in created_tickets]}"
    }

def build_approval_workflow():
    """
    Build the human-in-the-loop approval graph.
    The key: interrupt_before=["create_tickets"] tells LangGraph to
    PAUSE execution before the "create_tickets" node and wait for
    app.stream(None, config) to be called again after human input.
    """
    builder = StateGraph(ApprovalWorkflowState)
    builder.add_node("analyze", analyze_for_critical_issues)
    builder.add_node("request_approval", request_human_approval)
    builder.add_node("create_tickets", create_approved_tickets)
    builder.add_edge(START, "analyze")
    builder.add_edge("analyze", "request_approval")
    builder.add_edge("request_approval", "create_tickets")
    builder.add_edge("create_tickets", END)
    # SQLite checkpointer: required for interrupt_before to work
    # (the state must be persisted at the interrupt point).
    # SqliteSaver.from_conn_string() is a context manager in recent LangGraph
    # releases, so pass a connection to the constructor instead, as in
    # 09_production_checkpointing.py. An in-memory database is enough for the demo.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    memory = SqliteSaver(conn)
    return builder.compile(
        checkpointer=memory,
        interrupt_before=["create_tickets"]  # PAUSE here and wait for human input
    )

def run_approval_workflow_demo():
    """
    Demonstrates the complete HITL workflow:
    1. Run until the interrupt point
    2. Inspect proposed actions
    3. Provide approval (or rejection)
    4. Resume execution
    """
    app = build_approval_workflow()
    config = {"configurable": {"thread_id": "pr_847_approval_demo"}}
    initial_state: ApprovalWorkflowState = {
        "pr_number": 847,
        "proposed_jira_tickets": [],
        "approved_tickets": [],
        "rejected_tickets": [],
        "approval_requested": False,
        "approval_granted": None,
        "final_report": ""
    }
    print("=== Phase 1: Running analysis until interrupt point ===")
    # Run until interrupt_before=["create_tickets"]; the graph pauses there
    for event in app.stream(initial_state, config=config):
        if "__interrupt__" in event:
            print("\n⏸️ GRAPH PAUSED: Awaiting human approval")
            break
        node = list(event.keys())[0]
        print(f" ✓ Node '{node}' completed")
    # Inspect current state (what the human sees)
    snapshot = app.get_state(config)
    proposed = snapshot.values.get("proposed_jira_tickets", [])
    print(f"\n=== Phase 2: Human review of {len(proposed)} proposed ticket(s) ===")
    for ticket in proposed:
        priority_icon = "🔴" if ticket["priority"] == "CRITICAL" else "🟠"
        print(f"\n {priority_icon} [{ticket['priority']}] {ticket['title']}")
        print(f" File: {ticket['file']} (line {ticket['line']})")
        print(f" Fix: {ticket['suggested_fix']}")
    # Simulate human decision (in production: actual web UI interaction)
    human_decision = True  # Would come from UI/API in production
    print(f"\n Human decision: {'APPROVED ✅' if human_decision else 'REJECTED ❌'}")
    print("\n=== Phase 3: Resuming with human approval ===")
    # Update state with human decision BEFORE resuming
    app.update_state(config, {"approval_granted": human_decision}, as_node="request_approval")
    # Resume from the interrupt point
    for event in app.stream(None, config=config):
        node = list(event.keys())[0]
        if node != "__end__":
            print(f" ✓ Node '{node}' completed")
    # Final state
    final = app.get_state(config)
    print(f"\n📋 Final Report: {final.values.get('final_report', 'No report')}")


if __name__ == "__main__":
    run_approval_workflow_demo()

Solution 4: Cost Safeguards
Even with a well-designed agent, runaway costs are a real risk. Here is a layered defense:
# 12_cost_safeguards.py
import os
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta

logger = logging.getLogger("devpulse.cost")


@dataclass
class CostTracker:
    """
    Tracks API usage costs for a DevPulse review run.
    Pricing reference (approximate, as of 2026):
    - gemini-2.0-flash: $0.075 per 1M input tokens, $0.30 per 1M output tokens
    - gemini-1.5-pro: $1.25 per 1M input tokens, $5.00 per 1M output tokens
    """
    # Per-million-token pricing (unannotated, so it is a class attribute, not a dataclass field)
    PRICING = {
        "gemini-2.0-flash": {"input": 0.075, "output": 0.30},
        "gemini-1.5-pro": {"input": 1.25, "output": 5.00},
        "gemini-1.5-flash": {"input": 0.075, "output": 0.30},
    }
    max_budget_usd: float = 2.00  # Default: $2 per PR review
    current_cost_usd: float = field(default=0.0)
    input_tokens: int = field(default=0)
    output_tokens: int = field(default=0)
    tool_calls: int = field(default=0)
    start_time: datetime = field(default_factory=datetime.utcnow)

    def record_llm_call(self, model: str, input_tokens: int, output_tokens: int) -> None:
        """Record tokens used by an LLM call and update cost estimate."""
        # Unknown models fall back to gemini-2.0-flash pricing
        pricing = self.PRICING.get(model, self.PRICING["gemini-2.0-flash"])
        call_cost = (
            (input_tokens / 1_000_000) * pricing["input"] +
            (output_tokens / 1_000_000) * pricing["output"]
        )
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        self.current_cost_usd += call_cost
        logger.info(
            f"LLM call | model={model} | in={input_tokens} out={output_tokens} | "
            f"call_cost=${call_cost:.4f} | total=${self.current_cost_usd:.4f}"
        )

    def record_tool_call(self) -> None:
        self.tool_calls += 1

    def is_over_budget(self) -> bool:
        return self.current_cost_usd >= self.max_budget_usd

    def budget_remaining_usd(self) -> float:
        return max(0.0, self.max_budget_usd - self.current_cost_usd)

    def get_report(self) -> dict:
        elapsed = (datetime.utcnow() - self.start_time).total_seconds()
        return {
            "total_cost_usd": f"${self.current_cost_usd:.4f}",
            "budget_usd": f"${self.max_budget_usd:.2f}",
            "budget_remaining_usd": f"${self.budget_remaining_usd():.4f}",
            "budget_used_pct": f"{self.current_cost_usd / self.max_budget_usd * 100:.1f}%",
            "input_tokens": f"{self.input_tokens:,}",
            "output_tokens": f"{self.output_tokens:,}",
            "tool_calls": self.tool_calls,
            "elapsed_seconds": f"{elapsed:.1f}s",
            "over_budget": self.is_over_budget()
        }

class IterationGuard:
    """
    Hard cap on reasoning iterations to prevent infinite agent loops.
    When an agent gets into a loop (e.g., calling the same tool repeatedly
    expecting a different result), the iteration guard stops execution
    before costs compound. Log the loop state for debugging.
    """
    def __init__(self, max_iterations: int = 15):
        self.max_iterations = max_iterations
        self.current_iteration = 0
        self._tool_call_history: list = []

    def tick(self) -> None:
        """Call at the start of each iteration."""
        self.current_iteration += 1

    def record_tool_call(self, tool_name: str, args: dict) -> None:
        """Record a tool call to detect repetitive loops."""
        self._tool_call_history.append({
            "tool": tool_name,
            "args": str(args),
            "iteration": self.current_iteration
        })

    def should_stop(self) -> tuple[bool, str]:
        """
        Check whether the agent should stop.
        Returns (should_stop: bool, reason: str)
        """
        if self.current_iteration >= self.max_iterations:
            return True, f"Max iterations ({self.max_iterations}) reached"
        # Detect repetitive loops: same tool called with same args 3+ times
        if len(self._tool_call_history) >= 3:
            recent = self._tool_call_history[-3:]
            if (len(set(c["tool"] for c in recent)) == 1 and
                    len(set(c["args"] for c in recent)) == 1):
                tool = recent[0]["tool"]
                return True, f"Infinite loop detected: '{tool}' called 3x with identical args"
        return False, ""

    def assert_should_continue(self) -> None:
        """Raise an exception if the guard says to stop. Use in agent loops."""
        should_stop, reason = self.should_stop()
        if should_stop:
            raise RuntimeError(f"IterationGuard: {reason}")

# ---- Integrated Cost-Safe Execution Example ----
def execute_with_cost_safety(
    review_task: dict,
    max_budget_usd: float = 2.00,
    max_iterations: int = 10
) -> dict:
    """
    Execute a review task with full cost and iteration safeguards.
    """
    cost_tracker = CostTracker(max_budget_usd=max_budget_usd)
    iteration_guard = IterationGuard(max_iterations=max_iterations)
    print(f"\n💰 Starting cost-safe review with ${max_budget_usd:.2f} budget")
    print(f"🔄 Max iterations: {max_iterations}")
    try:
        for iteration in range(max_iterations + 1):
            iteration_guard.tick()
            # Check iteration guard
            should_stop, reason = iteration_guard.should_stop()
            if should_stop:
                logger.warning(f"Stopping review: {reason}")
                return {
                    "status": "stopped",
                    "reason": reason,
                    "cost_report": cost_tracker.get_report()
                }
            # Check cost budget
            if cost_tracker.is_over_budget():
                msg = f"Budget exhausted: ${cost_tracker.current_cost_usd:.4f} >= ${max_budget_usd:.2f}"
                logger.warning(msg)
                return {
                    "status": "budget_exhausted",
                    "reason": msg,
                    "cost_report": cost_tracker.get_report()
                }
            # Simulate an LLM call (in production: actual LLM invocation)
            print(f"\n Iteration {iteration + 1}: Making LLM call...")
            simulated_input_tokens = 2400 + (iteration * 500)
            simulated_output_tokens = 350 + (iteration * 50)
            cost_tracker.record_llm_call(
                model="gemini-2.0-flash",
                input_tokens=simulated_input_tokens,
                output_tokens=simulated_output_tokens
            )
            print(f" Cost so far: ${cost_tracker.current_cost_usd:.4f} / ${max_budget_usd:.2f}")
            # Simulate finding no tool calls on iteration 3 (agent finished)
            if iteration == 2:
                print("\n ✅ Agent completed review.")
                return {
                    "status": "completed",
                    "cost_report": cost_tracker.get_report()
                }
    except RuntimeError as e:
        return {"status": "error", "reason": str(e), "cost_report": cost_tracker.get_report()}

if __name__ == "__main__":
    # Test cost safeguards
    result = execute_with_cost_safety(
        review_task={"file_path": "src/auth/login.py", "review_type": "security"},
        # The three simulated calls cost about $0.0010 in total, so the budget must be
        # below that for the safeguard to trigger before the simulated review completes.
        max_budget_usd=0.0005,
        max_iterations=10
    )
    print("\n📊 Execution Result:")
    print(f" Status: {result['status']}")
    print(f" Cost Report: {result['cost_report']}")

Production Deployment Checklist
Before running DevPulse in production, verify each of these:
Reliability:
- LangGraph SQLite/Postgres checkpointer configured and tested
- thread_id strategy implemented (one per PR run ID)
- Resume path tested: kill process mid-run, restart, verify continuation
Observability:
- LangSmith configured with project name and metadata
- Run metadata includes PR number, run ID, environment
- Alert set up for runs taking >10 minutes (detect stuck agents)
Safety:
- HITL approval gates on all destructive tools (Jira, Slack, PR close)
- CostTracker integrated with per-run budget set at deployment time
- IterationGuard configured with sensible max (10-15 for most tasks)
Resilience:
- All tools wrapped with retry logic (already handled by .with_fallbacks() from Part 2)
- Failed tasks marked in workspace plan for retry on next run
- Dead letter queue or alert for tasks that fail repeatedly
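The last two Resilience items can be sketched together. The `attempts` field, the `dead_letter` status, and the three-attempt limit below are assumptions for illustration; they are not part of the workspace plan format from Part 1.

```python
MAX_ATTEMPTS = 3  # assumed limit before a task is parked for a human to look at


def record_failure(task: dict, error: str) -> dict:
    """Return an updated copy of the task after one failed attempt."""
    attempts = task.get("attempts", 0) + 1
    # Below the limit the task stays pending so the next run retries it.
    status = "dead_letter" if attempts >= MAX_ATTEMPTS else "pending"
    return {**task, "attempts": attempts, "status": status, "last_error": error}


def tasks_needing_alert(plan: list[dict]) -> list[str]:
    """IDs of tasks that exhausted their retries and should raise an alert."""
    return [t["id"] for t in plan if t["status"] == "dead_letter"]


task = {"id": "task_auth_tokens", "status": "pending"}
for _ in range(3):
    task = record_failure(task, "GitHub API rate limit")

print(task["status"], tasks_needing_alert([task]))
```

Keeping the attempt count in the plan itself means the retry limit survives process restarts, in the same way the task statuses do.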
FAQs
Q: Should I use SQLite or Postgres for the checkpointer?
A: SQLite is fine for a single-server deployment where the database file is on persistent storage. For containerized deployments (Docker, Kubernetes), use PostgresSaver — containers have ephemeral filesystems, and a SQLite file inside a container will be lost when the container restarts. Point PostgresSaver at a managed database (AWS RDS, GCP Cloud SQL) to guarantee persistence across container recycling.
Q: Does LangSmith store the actual code that DevPulse reviews?
A: By default, yes — LangSmith captures the full input/output of every LLM call, which includes the code diff content. For proprietary codebases, you have options: (1) Deploy LangSmith self-hosted on your own infrastructure so no data leaves your network. (2) Use the hide_inputs/hide_outputs configuration to strip sensitive data before it is sent to LangSmith. (3) Use metadata-only logging for compliance scenarios.
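For option (2), the redaction logic itself can be ordinary Python. The sketch below is a hypothetical helper, not part of LangSmith: the regex and the `redact_payload` name are assumptions, and the pattern only catches simple `name = value` secrets. Recent versions of the langsmith SDK accept a callable for hide_inputs/hide_outputs on the client; check the version you have installed before wiring a function like this in.

```python
import re

# Matches e.g.  JWT_SECRET = "abc"  or  password=abc  (illustrative, not exhaustive)
SECRET_PATTERN = re.compile(
    r"(api[_-]?key|secret|token|password)(\s*[:=]\s*)(['\"]?)[^\s'\"]+\3",
    flags=re.IGNORECASE,
)


def redact_text(text: str) -> str:
    """Replace the value of anything that looks like a secret assignment."""
    return SECRET_PATTERN.sub(r"\1\2\3[REDACTED]\3", text)


def redact_payload(payload):
    """Recursively redact every string inside nested dicts and lists."""
    if isinstance(payload, str):
        return redact_text(payload)
    if isinstance(payload, dict):
        return {key: redact_payload(value) for key, value in payload.items()}
    if isinstance(payload, list):
        return [redact_payload(value) for value in payload]
    return payload  # numbers, None, etc. pass through unchanged


sample = {"messages": ['+ JWT_SECRET = "hunter2"', "password=abc123"], "pr_number": 847}
redacted = redact_payload(sample)
print(redacted)
```

Pattern-based redaction is best-effort. For code that must never leave your network, self-hosting or metadata-only logging is the safer choice.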
Q: What's the difference between interrupt_before and interrupt_after?
A: interrupt_before=["node_name"] pauses BEFORE the node executes — you can inspect what the agent intends to do and decide whether to let it proceed. interrupt_after=["node_name"] pauses AFTER the node executes — you can inspect what the agent did and either approve the result or roll back. For human approval gates on destructive actions, interrupt_before is almost always the correct choice.
Q: How do you set the right budget? $2 seems arbitrary.
A: It depends on your PR size and model. As a reference: with gemini-2.0-flash at $0.075/1M input tokens, a 23-file PR review with an average of 3,000 tokens per file review and 5 reasoning turns each uses approximately 23 × 3,000 × 5 = 345,000 input tokens, or about $0.026 in input cost (output tokens add a little on top). A $2.00 budget is roughly a 77x buffer, which is generous enough for legitimate reviews of large PRs while still catching loops.
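The same estimate as a few lines of Python, so you can plug in your own numbers. It uses the input-token price quoted in this post and, like the prose estimate, ignores output tokens; the variable names are just for illustration.

```python
INPUT_PRICE_PER_M = 0.075  # USD per 1M input tokens (gemini-2.0-flash, as quoted in this post)
BUDGET_USD = 2.00

files = 23
tokens_per_review = 3_000
reasoning_turns = 5

input_tokens = files * tokens_per_review * reasoning_turns
estimated_cost = input_tokens / 1_000_000 * INPUT_PRICE_PER_M
buffer_multiple = BUDGET_USD / estimated_cost

print(f"{input_tokens:,} input tokens -> ${estimated_cost:.4f}")
print(f"${BUDGET_USD:.2f} budget is about {buffer_multiple:.0f}x the estimate")
```

If the multiple drops into the low single digits for your typical PR, raise the budget. If it sits in the hundreds, you can tighten it and catch loops sooner.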
Continue to Part 6: Domain-Specific Harnesses & Model Routing →