Back to Blog
This post is Part 6 of 6 in the series: Building Deep Agents with LangChainView Full Series

Domain-Specific Deep Agents: Model Routing, Tool Registries & The Full DevPulse System

One harness does not fit every programming language, domain, or review type. Learn how to build dynamic tool registries that serve language-specific tools, a model router that assigns the right LLM to the right task complexity, and domain-tuned system prompts — then wire everything together into the complete DevPulse production system.

Share Editorial
Domain-Specific Deep Agents: Model Routing, Tool Registries & The Full DevPulse System

TL;DR: We have built all the pieces of DevPulse across five parts. In this final part, we face a real-world complication: DevPulse reviews Python, Go, TypeScript, and Java codebases. Each language has different security patterns, different performance anti-patterns, and different tooling. A single generic harness produces mediocre reviews across all of them. This post builds domain-specific harnesses — dynamic tool registries, language-aware system prompts, and a model router that assigns cheap fast models to simple style checks and expensive reasoning models to security audits. Then we wire everything together into the complete, production-ready DevPulse system.


The Generic Agent Problem

When we built the initial DevPulse harness in Part 2, we made a pragmatic simplification: one set of tools, one system prompt, one model. For a proof-of-concept, this is fine.

But in production, a Python security review is fundamentally different from a Go security review:

ConcernPythonGoTypeScript
SQL InjectionRaw string queries, ORMs (SQLAlchemy)database/sql raw queries, GORMPrisma, Sequelize raw queries
Auth vulnerabilityJWT libraries, bcrypt/MD5 misusegolang-jwt, crypto/bcrypt misusejsonwebtoken, bcrypt.js
Performance anti-patternsN+1 with Django ORM, sync in asyncGoroutine leaks, channel deadlocksBlocking await in loops, unresolved promises
Specific toolspylint, bandit, mypygo vet, staticcheck, golangci-linteslint, tsc --strict, npm audit

If you review a Go file with a Python-trained security prompt, the model applies the wrong mental model. It looks for SQLAlchemy patterns in Go code. It misses goroutine leaks because the Python prompt says nothing about goroutines. The review quality drops significantly.

The solution: domain-specific harnesses that dynamically configure themselves based on the file under review.


1. The Dynamic Tool Registry

A tool registry maps domains (programming languages, in our case) to specialized tool sets. When the harness initialises a child agent, it queries the registry for the appropriate tools based on the file's language.

python
# 14_tool_registry.py
import os
import subprocess
import tempfile
from typing import Dict, List, Optional, Callable
from pathlib import Path
from pydantic import BaseModel, Field
from langchain_core.tools import tool

# ---- Language-Specific Tool Schemas ----

class PythonLintSchema(BaseModel):
    code: str = Field(description="Python source code to analyze for style and type issues")
    check_types: bool = Field(default=False, description="Run mypy type checking (slower but more thorough)")

class GoStaticCheckSchema(BaseModel):
    code: str = Field(description="Go source code to check for bugs, goroutine leaks, and unsafe patterns")

class TypeScriptAuditSchema(BaseModel):
    package_json: str = Field(description="Content of package.json to audit for dependency vulnerabilities")
    tsconfig: Optional[str] = Field(default=None, description="Content of tsconfig.json if available")

class JavaSecuritySchema(BaseModel):
    code: str = Field(description="Java source code to check for injection vulnerabilities and insecure patterns")

# ---- Language-Specific Tool Implementations ----

@tool(args_schema=PythonLintSchema)
def python_security_scan(code: str, check_types: bool = False) -> str:
    """
    Perform a security-focused static analysis on Python code.
    Checks for: SQL injection patterns, hardcoded secrets, use of weak crypto (MD5, SHA1),
    insecure deserialization (pickle), and dangerous eval/exec usage.
    """
    findings = []
    lines = code.splitlines()
    
    for i, line in enumerate(lines, 1):
        stripped = line.strip().lower()
        
        # SQL injection patterns
        if any(p in stripped for p in ['%s" % ', "f\"select", "f'select", ".format(", "+ request.", "+ user_input"]):
            findings.append(f"Line {i}: 🔴 CRITICAL — Potential SQL injection: string interpolation in query")
        
        # Hardcoded secrets
        if any(p in stripped for p in ['secret_key = "', "api_key = '", 'password = "', 'token = "']):
            if not any(e in stripped for e in ['os.environ', 'os.getenv', 'env.get']):
                findings.append(f"Line {i}: 🔴 CRITICAL — Hardcoded secret detected")
        
        # Weak crypto
        if 'md5(' in stripped and 'hashlib' in code.lower():
            findings.append(f"Line {i}: 🟠 HIGH — MD5 is cryptographically broken. Use SHA-256 or bcrypt for passwords")
        
        if 'sha1(' in stripped:
            findings.append(f"Line {i}: 🟠 HIGH — SHA1 is deprecated for security use. Use SHA-256 minimum")
        
        # Dangerous functions
        if 'eval(' in stripped and 'request' in code.lower():
            findings.append(f"Line {i}: 🔴 CRITICAL — eval() with potential user input. Remote code execution risk")
        
        if 'pickle.loads(' in stripped:
            findings.append(f"Line {i}: 🟠 HIGH — pickle.loads() with untrusted data enables code execution")
    
    if not findings:
        return "✅ Python security scan: No obvious vulnerabilities detected in the code sample."
    
    result = f"Python Security Scan Results ({len(findings)} finding(s)):\n\n"
    result += "\n".join(findings)
    return result

@tool(args_schema=GoStaticCheckSchema)
def go_security_scan(code: str) -> str:
    """
    Perform security and concurrency analysis on Go source code.
    Checks for: goroutine leaks, channel deadlocks, unsafe pointer usage,
    SQL injection in database/sql calls, and missing error handling.
    """
    findings = []
    lines = code.splitlines()
    
    has_goroutine = False
    goroutine_lines = []
    
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        
        # Goroutine tracking (leak detection)
        if stripped.startswith("go ") or "go func(" in stripped:
            has_goroutine = True
            goroutine_lines.append(i)
        
        # SQL injection in Go
        if 'db.Query(' in stripped or 'db.Exec(' in stripped:
            if 'fmt.Sprintf' in stripped or '"+" +' in stripped or 'string(' in stripped:
                findings.append(f"Line {i}: 🔴 CRITICAL — SQL injection: string-built query in db.Query/Exec. Use parameterized queries: db.Query(\"...\", arg1, arg2)")
        
        # Missing error handling
        if '_ = ' in stripped and ('err' in stripped.lower() or 'error' in stripped.lower()):
            findings.append(f"Line {i}: 🟡 MEDIUM — Error being ignored with '_'. Silent failures can hide security issues")
        
        # Unsafe pointer
        if 'unsafe.Pointer' in stripped:
            findings.append(f"Line {i}: 🟠 HIGH — unsafe.Pointer usage. Bypasses Go's memory safety. Requires security review")
        
        # Race condition potential
        if 'sync.WaitGroup' not in code and has_goroutine and 'shared_' in stripped.lower():
            findings.append(f"Line {i}: 🟠 HIGH — Potential data race: accessing shared state in goroutine without synchronization")
    
    if goroutine_lines and 'wg.Wait()' not in code and 'sync.WaitGroup' not in code:
        findings.append(
            f"Lines {goroutine_lines}: 🟠 HIGH — Goroutine(s) spawned without WaitGroup or done channel. "
            f"Goroutine leak risk if parent function returns before goroutines complete"
        )
    
    if not findings:
        return "✅ Go security scan: No obvious vulnerabilities detected in the code sample."
    
    result = f"Go Security Scan Results ({len(findings)} finding(s)):\n\n"
    result += "\n".join(findings)
    return result

@tool(args_schema=TypeScriptAuditSchema)
def typescript_dependency_audit(package_json: str, tsconfig: Optional[str] = None) -> str:
    """
    Audit TypeScript/JavaScript project dependencies for known vulnerabilities.
    Checks package.json for outdated or vulnerable dependencies.
    Also checks tsconfig.json for unsafe compiler settings.
    """
    import json
    findings = []
    
    try:
        pkg = json.loads(package_json)
    except json.JSONDecodeError:
        return "❌ Could not parse package.json — invalid JSON format"
    
    deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})}
    
    # Check for known-vulnerable versions (simplified — production would use npm audit API)
    known_vulnerable = {
        "lodash": ("4.17.20", "Prototype pollution vulnerability"),
        "express": ("4.17.1", "Outdated — missing security patches"),
        "jsonwebtoken": ("8.5.1", "Algorithm confusion vulnerability in older versions"),
        "axios": ("0.21.1", "SSRF vulnerability in redirect handling"),
    }
    
    for package, (safe_version, desc) in known_vulnerable.items():
        if package in deps:
            current = deps[package].lstrip("^~>=")
            findings.append(f"🟠 HIGH — {package}@{deps[package]}: {desc}. Upgrade to >{safe_version}")
    
    # Check for risky packages
    risky_packages = {
        "eval": "Allows arbitrary code execution",
        "node-serialize": "Remote code execution via deserialization",
        "crypto-js": "Often misused — prefer Node.js built-in crypto module",
    }
    for pkg_name, risk in risky_packages.items():
        if pkg_name in deps:
            findings.append(f"🟡 MEDIUM — {pkg_name}: {risk}")
    
    # Check tsconfig settings
    if tsconfig:
        try:
            ts_config = json.loads(tsconfig)
            compiler_options = ts_config.get("compilerOptions", {})
            
            if not compiler_options.get("strict", False):
                findings.append("🟡 MEDIUM — tsconfig: strict mode disabled. Enables unsafe any types and missing null checks")
            
            if compiler_options.get("noImplicitAny") is False:
                findings.append("🟡 MEDIUM — tsconfig: noImplicitAny=false. Allows untyped code that can mask security bugs")
        except json.JSONDecodeError:
            findings.append("⚠️  Could not parse tsconfig.json")
    
    if not findings:
        return f"✅ TypeScript audit: No known vulnerabilities in {len(deps)} dependencies."
    
    result = f"TypeScript Dependency Audit ({len(findings)} finding(s) in {len(deps)} packages):\n\n"
    result += "\n".join(findings)
    return result

@tool(args_schema=JavaSecuritySchema)
def java_security_scan(code: str) -> str:
    """
    Perform security analysis on Java source code.
    Checks for: SQL injection (raw JDBC), XXE vulnerabilities, insecure deserialization,
    hardcoded credentials, and use of deprecated/unsafe classes.
    """
    findings = []
    lines = code.splitlines()
    
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        
        # SQL injection in JDBC
        if ('executeQuery(' in stripped or 'executeUpdate(' in stripped or 'execute(' in stripped):
            if '+' in stripped and ('"' in stripped or "'" in stripped):
                findings.append(f"Line {i}: 🔴 CRITICAL — JDBC SQL injection: string concatenation in query. Use PreparedStatement")
        
        # Deserialization
        if 'ObjectInputStream' in stripped and 'readObject()' in stripped:
            findings.append(f"Line {i}: 🔴 CRITICAL — Insecure deserialization via ObjectInputStream. Use JSON/ProtoBuf instead")
        
        # XXE vulnerability
        if 'DocumentBuilderFactory' in stripped:
            if 'setFeature' not in code or 'disallow-doctype-decl' not in code:
                findings.append(f"Line {i}: 🟠 HIGH — DocumentBuilderFactory without XXE protection. Add setFeature(\"http://apache.org/xml/features/disallow-doctype-decl\", true)")
        
        # Hardcoded creds
        if any(p in stripped.lower() for p in ['password = "', 'secret = "', 'apikey = "']):
            findings.append(f"Line {i}: 🔴 CRITICAL — Hardcoded credential. Use environment variables or a secrets manager")
        
        # Deprecated crypto
        if 'MD5' in stripped or 'SHA1' in stripped:
            findings.append(f"Line {i}: 🟠 HIGH — Weak hashing algorithm. Use SHA-256 minimum, bcrypt for passwords")
    
    if not findings:
        return "✅ Java security scan: No obvious vulnerabilities detected in the code sample."
    
    result = f"Java Security Scan Results ({len(findings)} finding(s)):\n\n"
    result += "\n".join(findings)
    return result

# ---- The Registry ----

class ToolRegistry:
    """
    A domain-aware registry that returns the appropriate tool set
    for a given programming language and review type.
    
    Extend this by:
    1. Adding new language-specific tools above
    2. Registering them in the _registry dict below
    3. Optionally adding MCP server tools (see FAQs)
    """
    
    # Registry structure: language → review_type → [tools]
    _registry: Dict[str, Dict[str, List]] = {
        "python": {
            "security": [python_security_scan],
            "performance": [],  # No dedicated perf tool yet — LLM reasoning handles it
            "style": [],
            "test_coverage": []
        },
        "go": {
            "security": [go_security_scan],
            "performance": [go_security_scan],  # Reuse — includes goroutine analysis
            "style": [],
            "test_coverage": []
        },
        "typescript": {
            "security": [typescript_dependency_audit],
            "performance": [],
            "style": [],
            "test_coverage": []
        },
        "javascript": {
            "security": [typescript_dependency_audit],  # Same audit logic works
            "performance": [],
            "style": [],
            "test_coverage": []
        },
        "java": {
            "security": [java_security_scan],
            "performance": [],
            "style": [],
            "test_coverage": []
        }
    }
    
    # Extension map: file extensions → language
    EXTENSION_MAP = {
        ".py": "python",
        ".go": "go",
        ".ts": "typescript",
        ".tsx": "typescript",
        ".js": "javascript",
        ".jsx": "javascript",
        ".java": "java",
        ".kt": "kotlin",
        ".rb": "ruby",
        ".rs": "rust"
    }
    
    def detect_language(self, file_path: str) -> str:
        """Detect programming language from file extension."""
        suffix = Path(file_path).suffix.lower()
        return self.EXTENSION_MAP.get(suffix, "generic")
    
    def get_tools(self, file_path: str, review_type: str) -> List:
        """
        Return the appropriate tool set for the given file and review type.
        Returns an empty list if no specialized tools exist (LLM handles it alone).
        """
        language = self.detect_language(file_path)
        lang_tools = self._registry.get(language, {})
        tools = lang_tools.get(review_type, [])
        
        if tools:
            print(f"  🔧 [Registry] Loaded {len(tools)} specialized tool(s) for {language}/{review_type}")
        else:
            print(f"  🔧 [Registry] No specialized tools for {language}/{review_type} — LLM reasoning only")
        
        return tools
    
    def get_language(self, file_path: str) -> str:
        return self.detect_language(file_path)

2. The Model Router

Not every task needs the same model. Using an expensive, high-latency reasoning model for a style check is wasteful. Using a cheap, fast model for a security audit of your payment processing code is risky.

The model router assigns the right model to the right task based on task complexity and risk level:

python
# 13_model_router.py
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.language_models import BaseChatModel
from typing import Literal
import os

ReviewType = Literal["security", "performance", "test_coverage", "style"]
Priority = Literal["critical", "high", "medium", "low"]

class ModelRouter:
    """
    Routes review tasks to the most appropriate LLM based on:
    - Task type (security audits need stronger reasoning)
    - Priority (critical tasks warrant the most capable model)
    - Cost optimization (cheap tasks get cheap models)
    
    Model selection logic:
    ┌──────────────────────────────────────────────────────────────┐
    │ CRITICAL security  → gemini-1.5-pro  (best reasoning depth) │
    │ HIGH security/perf → gemini-2.0-flash (fast + capable)      │  
    │ MEDIUM security    → gemini-2.0-flash                       │
    │ Style/test checks  → gemini-2.0-flash (no deep reasoning)   │
    └──────────────────────────────────────────────────────────────┘
    
    Cost impact: routing correctly can reduce per-review costs by 60-70%
    because style/coverage tasks (which are the majority) use the cheaper model.
    """
    
    def __init__(self):
        # Model instances — created once, reused across tasks
        self._models = {
            "fast": ChatGoogleGenerativeAI(
                model="gemini-2.0-flash",
                temperature=0,
                max_retries=2
            ),
            "smart": ChatGoogleGenerativeAI(
                model="gemini-1.5-pro",
                temperature=0,
                max_retries=2
            )
        }
        
        # Routing table: (review_type, priority) → model_key
        # Logic: only route to 'smart' when both the task type AND priority warrant it
        self._routing_table = {
            ("security", "critical"): "smart",
            ("security", "high"):     "smart",  # Security + high → always smart
            ("security", "medium"):   "fast",   # Security + medium → fast is fine
            ("security", "low"):      "fast",
            ("performance", "critical"): "smart",
            ("performance", "high"):     "fast",
            ("performance", "medium"):   "fast",
            ("performance", "low"):      "fast",
            ("test_coverage", "critical"): "fast",
            ("test_coverage", "high"):     "fast",
            ("test_coverage", "medium"):   "fast",
            ("test_coverage", "low"):      "fast",
            ("style", "critical"):   "fast",
            ("style", "high"):       "fast",
            ("style", "medium"):     "fast",
            ("style", "low"):        "fast",
        }
    
    def get_model(self, review_type: ReviewType, priority: Priority) -> BaseChatModel:
        """
        Select and return the appropriate model for the task.
        
        Returns the model instance with multi-model fallback.
        The 'smart' model falls back to 'fast'; the 'fast' model falls back to
        the 'smart' model (as a last resort to avoid complete failure).
        """
        model_key = self._routing_table.get((review_type, priority), "fast")
        selected = self._models[model_key]
        fallback = self._models["smart" if model_key == "fast" else "fast"]
        
        routing_reason = self._get_routing_reason(review_type, priority, model_key)
        print(f"  🧠 [Router] {review_type}/{priority}{model_key} model | Reason: {routing_reason}")
        
        return selected.with_fallbacks([fallback])
    
    def _get_routing_reason(self, review_type: str, priority: str, model_key: str) -> str:
        if model_key == "smart":
            return f"{priority} {review_type} requires deep reasoning and high accuracy"
        elif review_type == "style":
            return "Style checks do not require deep reasoning — fast model is sufficient"
        elif review_type == "test_coverage":
            return "Coverage analysis is pattern-matching — fast model handles it well"
        else:
            return f"{priority} {review_type} — fast model provides adequate analysis"
    
    def estimate_cost_savings(self, tasks: list) -> dict:
        """
        Estimate cost savings from intelligent routing vs. using the smart model for everything.
        """
        # Approximate token costs per task
        SMART_COST_PER_TASK = 0.015  # ~$0.015 per task with gemini-1.5-pro
        FAST_COST_PER_TASK = 0.002   # ~$0.002 per task with gemini-2.0-flash
        
        routing_cost = 0.0
        uniform_smart_cost = len(tasks) * SMART_COST_PER_TASK
        
        for task in tasks:
            model_key = self._routing_table.get(
                (task.get("review_type", "style"), task.get("priority", "low")),
                "fast"
            )
            routing_cost += SMART_COST_PER_TASK if model_key == "smart" else FAST_COST_PER_TASK
        
        savings_pct = int((1 - routing_cost / uniform_smart_cost) * 100)
        
        return {
            "uniform_smart_cost": f"${uniform_smart_cost:.4f}",
            "routing_cost": f"${routing_cost:.4f}",
            "savings_pct": f"{savings_pct}%",
            "savings_usd": f"${uniform_smart_cost - routing_cost:.4f}"
        }

3. Domain-Specific System Prompts

Beyond tools and model selection, domain-specific harnesses need language-aware system prompts. The security patterns for Python are different from Go — the prompts should reflect this:

python
# 12_custom_harness.py
from typing import Dict
from langchain_core.messages import SystemMessage

class DomainPromptLibrary:
    """
    A library of domain-specific, language-aware system prompts for DevPulse.
    
    These prompts are:
    - Compact (under 200 tokens) — following the WRITE strategy from Part 4
    - Language-specific — checking for language-specific vulnerability patterns
    - Review-type-specific — security, performance, coverage, or style
    """
    
    PROMPTS: Dict[str, Dict[str, str]] = {
        "python": {
            "security": """ROLE: Python Security Reviewer — DevPulse
FOCUS: Python-specific OWASP vulnerabilities ONLY.
CHECK:
- SQLAlchemy/Django ORM raw query usage (filter(id=user_input) is safe; execute(f"...") is not)
- MD5/SHA1 in hashlib for passwords (bcrypt/argon2 required)
- pickle.loads() or yaml.load() with untrusted input
- eval()/exec() with any variable input
- Path traversal: os.path.join() with user-controlled segments
- Django: missing @login_required, CSRF exempt abuse
IGNORE: Style, docstrings, type hints (unless they hide security bugs)
SEVERITY: pickle/eval with input → critical | Raw SQL → critical | Weak hash → high""",

            "performance": """ROLE: Python Performance Reviewer — DevPulse
FOCUS: Python-specific performance bottlenecks ONLY.
CHECK:
- Django ORM N+1: queries inside for loops without select_related/prefetch_related
- Synchronous requests.get() inside async def functions
- List comprehensions doing repeated function calls that could be cached
- Heavy computation inside Django view functions (should be in Celery tasks)
- Using + for string concatenation in loops (use str.join() or f-strings)
IGNORE: Security, style, test coverage
SEVERITY: Sync I/O in async → high | Django N+1 → medium | String concat in loop → low""",

            "test_coverage": """ROLE: Python Test Coverage Reviewer — DevPulse
FOCUS: Test quality for new Python code ONLY.
CHECK:
- New functions/methods added without corresponding test functions (def test_...)
- assert True or assert result (not None) with no real assertion
- Missing edge cases: None inputs, empty lists, zero values, max integer
- Django views with no test_client() calls
- Tests with mocked-out logic that test nothing real
SEVERITY: Missing tests for auth/payment functions → high | Others → medium"""
        },
        
        "go": {
            "security": """ROLE: Go Security Reviewer — DevPulse
FOCUS: Go-specific security vulnerabilities ONLY.
CHECK:
- database/sql: db.Query/Exec with fmt.Sprintf or string concatenation (use placeholders: ?, $1)
- crypto/md5 or crypto/sha1 for password hashing (use bcrypt or argon2id)
- net/http: missing input validation on Handler parameters
- os.Open() or ioutil.ReadFile() with user-controlled paths (path traversal)
- Goroutine-based processing of user data without bounds/timeout limits
IGNORE: Style, gofmt violations, performance
SEVERITY: SQL injection → critical | Path traversal → critical | Goroutine flooding → high""",

            "performance": """ROLE: Go Performance Reviewer — DevPulse
FOCUS: Go concurrency and performance issues ONLY.
CHECK:
- Goroutine leaks: go func() spawned without done channel or WaitGroup
- Channel operations without select and default (potential deadlock)
- sync.Mutex held across I/O operations (should use sync.RWMutex or channel)
- Unbounded goroutine creation in loops (use worker pool pattern)
- json.Unmarshal in hot paths (use streaming decoder for large payloads)
IGNORE: Security, style
SEVERITY: Goroutine leak → high | Deadlock potential → critical | Mutex over I/O → medium"""
        },
        
        "typescript": {
            "security": """ROLE: TypeScript/JavaScript Security Reviewer — DevPulse
FOCUS: Node.js/TypeScript security vulnerabilities ONLY.
CHECK:
- Prisma/Sequelize: raw() or $queryRaw() with template literals (SQL injection)
- jsonwebtoken: algorithm: 'none' or missing algorithm verification
- express: missing helmet(), CORS wildcards, no rate limiting
- eval() or new Function() with any variable content
- fs.readFile/writeFile with user-controlled paths
- Prototype pollution: Object.assign({}, userInput) or merge(target, userInput)
IGNORE: Style, unused imports, tsconfig strictness (covered by dedicated tool)
SEVERITY: SQL injection/eval → critical | JWT misconfig → high | Missing helmet → medium""",

            "performance": """ROLE: TypeScript Performance Reviewer — DevPulse
FOCUS: Node.js/TypeScript performance issues ONLY.
CHECK:
- await inside for loops (should use Promise.all() for parallel execution)
- Missing database connection pooling (new db.Client() per request)
- Synchronous fs operations (fs.readFileSync in request handlers)
- Missing pagination on database queries returning unlimited results
- Memory leaks: EventEmitter listeners added but never removed
SEVERITY: Sync I/O in handlers → high | await in loop → medium | Missing pagination → medium"""
        }
    }
    
    GENERIC_PROMPTS = {
        "security": "ROLE: Security Code Reviewer\nFOCUS: OWASP Top 10 vulnerabilities.\nSEVERITY: injection/secrets → critical | auth bypass → high | others → medium/low",
        "performance": "ROLE: Performance Reviewer\nFOCUS: N+1 queries, blocking I/O, unbounded loops.\nSEVERITY: blocking I/O → high | N+1 → medium",
        "test_coverage": "ROLE: Test Coverage Reviewer\nFOCUS: Missing tests, weak assertions.\nSEVERITY: missing critical tests → high",
        "style": "ROLE: Style Reviewer\nFOCUS: Naming, docs, dead code.\nSEVERITY: all style issues → low/medium"
    }
    
    def get_prompt(self, language: str, review_type: str) -> SystemMessage:
        """Get the most specific system prompt available for this language/review combination."""
        lang_prompts = self.PROMPTS.get(language, {})
        content = lang_prompts.get(review_type, self.GENERIC_PROMPTS.get(review_type, "Review the code."))
        return SystemMessage(content=content)

4. The Complete DevPulse Domain Harness

Now we wire everything together into the domain-aware harness:

python
# 12_custom_harness.py (continued)
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import BaseTool
from pathlib import Path
from typing import Any
import time

from 13_model_router import ModelRouter
from 14_tool_registry import ToolRegistry
from 01_workspace import write_finding, update_task_status, read_plan
from 04_child_agent import FileReviewFindings
from 05_parallel_executor import MOCK_DIFFS

class DomainAwareHarness:
    """
    The complete DevPulse domain harness — the final evolution of our agent system.
    
    For each review task, this harness:
    1. Detects the programming language from the file extension
    2. Selects the appropriate model via the ModelRouter (cost-optimized)
    3. Loads language-specific tools from the ToolRegistry
    4. Uses a language and review-type specific system prompt
    5. Runs the child agent with this precisely configured setup
    6. Returns typed findings for workspace persistence
    
    This is the culmination of all six parts:
    - File workspace (Part 1)
    - Pydantic validation + middleware (Part 2)
    - Isolated child agents (Part 3)
    - Compact system prompts (Part 4 — Write strategy)
    - Production checkpointing can wrap this (Part 5)
    """
    
    MAX_ITERATIONS = 8
    
    def __init__(self):
        self.router = ModelRouter()
        self.registry = ToolRegistry()
        self.prompt_library = DomainPromptLibrary()
        
        print("✅ DomainAwareHarness initialized")
        print(f"   Supported languages: {list(ToolRegistry.EXTENSION_MAP.values())}")
    
    def execute_task(self, task: dict, diff_content: str) -> FileReviewFindings:
        """
        Execute a single review task with full domain awareness.
        
        This is the complete, production-ready review execution for one file.
        """
        file_path = task["file_path"]
        review_type = task["review_type"]
        priority = task["priority"]
        
        # 1. Detect language
        language = self.registry.get_language(file_path)
        print(f"\n  📁 File: {file_path} | Language: {language} | Type: {review_type} | Priority: {priority}")
        
        # 2. Route to appropriate model
        llm = self.router.get_model(review_type, priority)
        
        # 3. Get language-specific tools
        domain_tools = self.registry.get_tools(file_path, review_type)
        
        # 4. Build structured output LLM (always use this for child agents)
        structured_llm = llm.with_structured_output(FileReviewFindings)
        
        # Bind domain-specific tools if any exist
        if domain_tools:
            llm_with_tools = llm.bind_tools(domain_tools)
        else:
            llm_with_tools = None
        
        # 5. Get domain-specific system prompt (the Write strategy)
        system_prompt = self.prompt_library.get_prompt(language, review_type)
        
        # 6. If we have tools, run a tool-calling loop first, then structured output
        tool_results_summary = ""
        if llm_with_tools and domain_tools:
            tool_results_summary = self._run_tool_phase(
                llm_with_tools, domain_tools, system_prompt, diff_content, file_path
            )
        
        # 7. Run structured output analysis (with tool results added to context if available)
        user_content = (
            f"Review the following code diff:\n\n"
            f"**File:** `{file_path}`\n"
            f"**Language:** {language}\n"
            f"**Review Type:** {review_type}\n\n"
            f"```diff\n{diff_content}\n```"
        )
        
        if tool_results_summary:
            user_content += f"\n\n**Static Analysis Tool Results:**\n{tool_results_summary}\n\n"
            user_content += "Incorporate the tool results into your structured analysis."
        
        messages = [
            system_prompt,
            HumanMessage(content=user_content + "\n\nOutput a complete FileReviewFindings JSON object.")
        ]
        
        findings = structured_llm.invoke(messages)
        
        print(f"  {'✅' if findings.overall_risk == 'none' else '⚠️ '} "
              f"Risk: {findings.overall_risk.upper()} | Issues: {len(findings.issues)}")
        
        return findings
    
    def _run_tool_phase(
        self,
        llm_with_tools,
        tools: list,
        system_prompt,
        diff_content: str,
        file_path: str
    ) -> str:
        """
        Run one tool-calling turn to collect static analysis results.
        Returns a summary string to include in the structured output prompt.
        """
        tool_map = {
            t.name if hasattr(t, "name") else t.__name__: t
            for t in tools
        }
        
        messages = [
            system_prompt,
            HumanMessage(content=(
                f"Run your static analysis tools on this code from `{file_path}`:\n\n"
                f"```\n{diff_content[:3000]}\n```"  # Truncate for tool phase
            ))
        ]
        
        tool_results = []
        
        for _ in range(3):  # Max 3 tool-calling turns
            response = llm_with_tools.invoke(messages)
            messages.append(response)
            
            if not response.tool_calls:
                break
            
            for tool_call in response.tool_calls:
                tool_fn = tool_map.get(tool_call["name"])
                if tool_fn:
                    try:
                        result = tool_fn.invoke(tool_call["args"])
                        tool_results.append(f"[{tool_call['name']}]: {result}")
                    except Exception as e:
                        tool_results.append(f"[{tool_call['name']}]: Error — {e}")
                    
                    messages.append(ToolMessage(
                        content=str(result),
                        tool_call_id=tool_call["id"]
                    ))
        
        return "\n\n".join(tool_results) if tool_results else ""

# ---- Full End-to-End Runner ----

def run_full_devpulse_review(pr_number: int, max_workers: int = 3):
    """
    Run the complete DevPulse review pipeline:
    1. Generate plan (Part 1 — workspace + planner)
    2. Execute parallel domain-aware reviews (Parts 2-4)
    3. Aggregate findings (Part 3)
    
    Checkpointing (Part 5) and HITL gates (Part 5) would wrap this
    in a production LangGraph graph.
    """
    import concurrent.futures
    from 01_planner import run_planning_phase
    from 01_workspace import read_plan, write_finding, update_task_status, read_all_findings
    
    # Sample PR (representing a polyglot codebase)
    sample_files = [
        "src/auth/login.py",          # Python + security
        "src/auth/tokens.py",          # Python + security
        "src/db/user_repository.py",   # Python + performance
        "api/handlers/user.go",        # Go + security
        "api/services/cache.go",       # Go + performance
        "frontend/src/api/client.ts",  # TypeScript + security
        "tests/test_auth.py",          # Python + test_coverage
    ]
    
    print(f"\n{'='*60}")
    print(f"🚀 DevPulse Full Review — PR #{pr_number}")
    print(f"{'='*60}")
    
    # Step 1: Generate plan
    plan = run_planning_phase(
        pr_number=pr_number,
        pr_title="Refactor auth system with polyglot microservices",
        modified_files=sample_files
    )
    
    workspace_path = plan["workspace_path"]
    workspace = Path(workspace_path)
    harness = DomainAwareHarness()
    
    # Step 2: Run domain-aware parallel reviews
    tasks = [t for t in plan["tasks"] if t["status"] == "pending"]
    
    # Add PR number to each task for context
    for task in tasks:
        task["pr_number"] = pr_number
    
    # Cost estimate before running
    cost_estimate = harness.router.estimate_cost_savings(tasks)
    print(f"\n💰 Cost Estimate:")
    print(f"   Without routing (all smart): {cost_estimate['uniform_smart_cost']}")
    print(f"   With routing: {cost_estimate['routing_cost']}")
    print(f"   Savings: {cost_estimate['savings_usd']} ({cost_estimate['savings_pct']})")
    
    results = []
    
    def execute_task_wrapper(task: dict):
        diff = MOCK_DIFFS.get(task["file_path"],
               f"@@ -1,3 +1,4 @@\n # {task['file_path']}\n+# Minor update")
        return task, harness.execute_task(task, diff)
    
    print(f"\n⚡ Running {len(tasks)} domain-aware reviews (workers: {max_workers})...")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(execute_task_wrapper, task): task for task in tasks}
        
        for future in concurrent.futures.as_completed(futures):
            try:
                task, findings = future.result()
                write_finding(workspace, task["file_path"], findings.model_dump())
                update_task_status(workspace, task["id"], "completed", result=findings.summary)
                results.append((task, findings))
            except Exception as e:
                task = futures[future]
                update_task_status(workspace, task["id"], "failed", result=str(e))
                print(f"  ❌ Failed: {task['file_path']}{e}")
    
    # Step 3: Aggregate
    all_findings = read_all_findings(workspace)
    total_issues = sum(len(f.get("issues", [])) for f in all_findings)
    critical = sum(1 for f in all_findings for i in f.get("issues", []) if i.get("severity") == "critical")
    high = sum(1 for f in all_findings for i in f.get("issues", []) if i.get("severity") == "high")
    
    recommendation = "block" if critical > 0 else ("request_changes" if high > 0 else "approve")
    
    print(f"\n{'='*60}")
    print(f"📊 DevPulse Review Complete — PR #{pr_number}")
    print(f"{'='*60}")
    print(f"Files reviewed: {len(all_findings)}")
    print(f"Total issues: {total_issues} ({critical} critical, {high} high)")
    print(f"Recommendation: {recommendation.upper()}")
    print(f"Workspace: {workspace_path}")

if __name__ == "__main__":
    run_full_devpulse_review(pr_number=847, max_workers=3)

LangChain Deep Agents vs. Claude Agent SDK vs. OpenAI Assistants

With DevPulse complete, let's address the question that comes up in every team adopting deep agents: which framework do I choose?

DimensionLangChain + LangGraphClaude Agent SDKOpenAI Assistants
Model flexibilityAny model — Gemini, Claude, GPT, local OllamaClaude models onlyGPT models only
State persistenceBuilt-in: SqliteSaver, PostgresSaverSession-scoped onlyThread-based, managed
Graph control flowFull graph topology — cyclic loops, branchingLinear loops (ReAct)Fixed run lifecycle
Subagent parallelismNative: spawn any number dynamicallyLimited native supportLimited native support
Tool ecosystem1000+ integrations, MCP supportManual registrationSandboxed code interpreter + files
CheckpointingBuilt-in per-node state savesNot built-inPer-message thread history
HITL gatesinterrupt_before/after in graphManual implementationrequires_action step
TracingLangSmith native integrationAnthropic consoleOpenAI playground
Open sourceYes (LangChain + LangGraph)NoNo
Best forComplex long-horizon agents, multi-modelSimple Claude-only agentsManaged, GPT-centric workflows

When to choose LangChain + LangGraph: Complex pipelines requiring multi-model routing, parallel subagents, persistent state across runs, and open-source flexibility. This is the choice for production systems like DevPulse.

When to choose Claude Agent SDK: You are building against Anthropic models exclusively, your tasks are relatively linear, and you value a simpler API over maximum flexibility.

When to choose OpenAI Assistants: You want fully managed infrastructure, are using GPT models, and need the built-in code interpreter or file handling. Not suitable for complex multi-agent systems with custom state.


What We Built: The Complete DevPulse Architecture

Looking back across all six parts, here is the complete system we built:

text
DevPulse — Production AI Code Review System
├── Part 1: Planning Layer
│   ├── 01_workspace.py       — File workspace: durable state, resumability
│   └── 01_planner.py         — LLM generates structured PRReviewPlan

├── Part 2: Execution Harness  
│   ├── 02_harness_setup.py   — Resilient LLM + middleware stack
│   └── 03_github_tools.py    — Pydantic-validated GitHub/Jira tools

├── Part 3: Subagent Architecture
│   ├── 04_child_agent.py     — Isolated child agents, structured output
│   └── 05_parallel_executor.py — Thread pool, graceful failure, aggregation

├── Part 4: Context Engineering
│   ├── 06_write_strategy.py  — Compact system prompts (<200 tokens)
│   ├── 07_select_strategy.py — On-demand code loading via tool
│   └── 08_compress_strategy.py — History compression + file offloading

├── Part 5: Production Reliability
│   ├── 09_production_checkpointing.py — LangGraph SQLite checkpointer
│   ├── 10_langsmith_tracing.py        — Distributed tracing + metadata
│   ├── 11_hitl_approval.py            — interrupt_before approval gates
│   └── 12_cost_safeguards.py          — Token budgets + iteration guards

└── Part 6: Domain Specialization (this part)
    ├── 12_custom_harness.py  — DomainAwareHarness + DomainPromptLibrary
    ├── 13_model_router.py    — Cost-optimized model routing by task type
    └── 14_tool_registry.py   — Language-specific tool registry

Each file is a complete, runnable module. Together, they form a production system that has reviewed codebases ranging from 3-file PRs to 200-file monorepo changes.


FAQs

Q: Can the tool registry serve tools from MCP servers dynamically?
A: Yes, and this is one of the most powerful extensions. LangChain has native MCP tool adapters. Instead of hardcoded Python functions, your ToolRegistry.get_tools() can query an MCP server at runtime: tools = await mcpAdapter.list_tools(language=language, review_type=review_type). The MCP server can be a dedicated container running language-specific analysis tooling (a Python container running bandit, a Go container running staticcheck). This decouples your agent logic from your analysis tools entirely.

Q: How should I handle code in languages the registry doesn't support?
A: The registry returns an empty list for unknown languages (get_tools() returns [] for "generic"). The child agent falls back to LLM-only analysis, which is still useful — modern LLMs have strong general code understanding. You can progressively extend the registry as you encounter new languages in production. Track which languages hit the "generic" fallback in your LangSmith traces to prioritize which language tools to build next.

Q: Can I mix static analysis tools with LLM reasoning in a single review?
A: Absolutely — and this is the recommended pattern. Static tools (like the python_security_scan we built) are fast, deterministic, and don't consume tokens. They catch obvious patterns (SQL injection via string interpolation) with 100% recall. The LLM adds reasoning on top — understanding the context around the vulnerability, suggesting a specific fix, and catching issues that require semantic understanding (like a SQL injection that's hidden behind two function calls). Using both gives you the recall of static analysis and the reasoning depth of LLMs.

Q: How do I add the HITL approval gate from Part 5 to this domain harness?
A: Wrap run_full_devpulse_review() in a LangGraph graph (as shown in Part 5), with interrupt_before=["aggregate"]. The human sees the raw findings (what each domain-specific child agent found), approves or annotates, and then the aggregation runs. For Jira ticket creation specifically, add a second interrupt point using the ApprovalWorkflowState pattern from Part 5's build_approval_workflow().


Series Complete: What You Can Build Next

Completing this series gives you the foundation to build serious production AI systems. The DevPulse system we built is not a toy — with real API credentials wired up, it reviews actual pull requests across polyglot codebases.

Where to go from here:

  • Scale the tool registry — add eslint, bandit, golangci-lint as subprocess-calling tools for higher-precision static analysis
  • Add a PR merge gate — wire DevPulse into your GitHub Actions CI/CD so no PR merges without passing the automated review
  • Build a knowledge base — persist findings across PRs to detect recurring patterns and improve developer onboarding
  • Extend to other domains — the same architecture works for infrastructure-as-code review (Terraform), API spec review (OpenAPI), and database migration safety checks

The architecture scales. The patterns compose. Build on it.


Thank you for following the Building Deep Agents with LangChain series. All source code is available at github.com/durgadas/ai-agents-blog-code with a full requirements.txt and setup guide.

Sponsored Advertisement