How to Query Postgres from Open Agents SDK via Arcade (MCP)

Arcade.dev Team
OCTOBER 21, 2025
7 MIN READ
TUTORIALS

LangChain's Open Agent Platform enables developers to build production-ready AI agents. This guide demonstrates how to connect these agents to Postgres databases using Arcade's Model Context Protocol implementation, providing secure database access without managing credentials or building custom integrations.

Prerequisites

  • Python 3.8+
  • Arcade API key
  • Postgres database with connection credentials
  • LangChain and LangGraph installed

Installation

Install required packages:

pip install langchain-arcade langchain-openai langgraph arcadepy

Set environment variables:

export ARCADE_API_KEY="your_arcade_api_key"
export OPENAI_API_KEY="your_openai_api_key"

Architecture Overview

The integration consists of three layers:

LangChain Agent Layer: Handles agent orchestration and LLM interactions
Arcade Engine: Manages tool registration, execution, and MCP server connections
Postgres MCP Server: Exposes database operations as standardized tools

Arcade's native MCP server support enables connection to external MCP servers via HTTP transport, allowing agents to query Postgres through standardized interfaces.

Building a Custom Postgres Tool

Create a Postgres query tool using Arcade's Tool SDK:

from arcade_tdk import ToolContext, tool
from typing import Dict, Any, Annotated
import psycopg2
from psycopg2.extras import RealDictCursor
import os

@tool
def query_postgres(
    context: ToolContext,
    sql_query: Annotated[str, "SQL SELECT statement to execute"],
) -> Dict[str, Any]:
    """Execute read-only SQL queries against Postgres database.

    Returns query results as structured data.
    """

    # Validate read-only operation (allow CTEs as well as plain SELECTs)
    query_upper = sql_query.strip().upper()
    if not query_upper.startswith(('SELECT', 'WITH')):
        return {
            "success": False,
            "error": "Only SELECT and WITH queries allowed"
        }

    # Connect to database; initialize conn so the finally block
    # does not raise NameError if psycopg2.connect itself fails
    conn = None
    try:
        conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            port=os.getenv("POSTGRES_PORT"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD")
        )

        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(sql_query)
            results = cursor.fetchall()

            return {
                "success": True,
                "rows": [dict(row) for row in results],
                "row_count": len(results)
            }

    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn:
            conn.close()

@tool
def get_schema(
    context: ToolContext,
    table_name: Annotated[str, "Table name to inspect; empty lists all tables"] = ""
) -> Dict[str, Any]:
    """Get database schema information for tables and columns."""

    conn = None  # ensure the finally block can check the connection safely
    try:
        conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            port=os.getenv("POSTGRES_PORT"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD")
        )

        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            if table_name:
                cursor.execute("""
                    SELECT column_name, data_type, is_nullable
                    FROM information_schema.columns
                    WHERE table_name = %s
                    ORDER BY ordinal_position
                """, (table_name,))
            else:
                cursor.execute("""
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                    ORDER BY table_name
                """)

            results = cursor.fetchall()
            return {
                "success": True,
                "schema": [dict(row) for row in results]
            }

    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn:
            conn.close()
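Rows returned by these tools flow straight into the model's context window, so a single wide text column can crowd out everything else. A small illustrative helper (not part of Arcade's SDK) can trim oversized string values before the result dictionary is returned:

```python
from typing import Any, Dict, List

def compact_rows(rows: List[Dict[str, Any]], max_chars: int = 200) -> List[Dict[str, Any]]:
    """Truncate long string values so one wide cell cannot flood the prompt."""
    compacted = []
    for row in rows:
        compacted.append({
            key: (value[:max_chars] + "...")
            if isinstance(value, str) and len(value) > max_chars
            else value
            for key, value in row.items()
        })
    return compacted
```

Apply it to `[dict(row) for row in results]` inside the tool before building the `"rows"` field of the response.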

Integrating with LangChain

Basic LangGraph Agent Implementation

Create a LangChain agent with Postgres tools from Arcade:

import os
from langchain_arcade import ArcadeToolManager
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver

# Initialize Arcade and get tools
arcade_api_key = os.getenv("ARCADE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")

# Create tool manager
manager = ArcadeToolManager(
    api_key=arcade_api_key,
    user_id="user@example.com"
)

# Get Postgres tools from your deployed toolkit
tools = manager.get_tools(toolkits=["postgres"])

# Initialize LLM
model = ChatOpenAI(
    model="gpt-4o",
    api_key=openai_api_key
)

# Bind tools to model
bound_model = model.bind_tools(tools)

# Create memory checkpointer
memory = MemorySaver()

# Create ReAct agent
graph = create_react_agent(
    model=bound_model,
    tools=tools,
    checkpointer=memory
)

# Run agent
config = {
    "configurable": {
        "thread_id": "postgres-query-thread",
        "user_id": "user@example.com"
    }
}

user_input = {
    "messages": [
        {
            "role": "user",
            "content": "Query the customers table and show the top 10 by total purchases"
        }
    ]
}

for chunk in graph.stream(user_input, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

Handling Authorization Interrupts

LangChain agents may need to handle user authorization for authenticated tools:

from langgraph.errors import NodeInterrupt

try:
    for chunk in graph.stream(user_input, config, stream_mode="values"):
        chunk["messages"][-1].pretty_print()
except NodeInterrupt as exc:
    print(f"Authorization required: {exc}")
    print("Visit the authorization URL, then re-run the agent")

Deploying Postgres Tools to Arcade

Create Worker Configuration

Create worker.toml in your project directory:

[worker]
name = "postgres-worker"
description = "Postgres database query tools"

[worker.toolkits]
postgres = { path = "./postgres_tools" }

Deploy to Arcade Cloud

Use the Arcade CLI to deploy:

# Login to Arcade
arcade login

# Deploy toolkit
arcade deploy

# Verify deployment
arcade worker list

Your tools are now available at api.arcade.dev for all agents.

Connect Deployed Tools in LangChain

from langchain_arcade import ArcadeToolManager

# Connect to deployed tools
manager = ArcadeToolManager(
    api_key=os.getenv("ARCADE_API_KEY"),
    user_id="user@example.com"
)

# Get deployed Postgres toolkit (the toolkit key defined in worker.toml)
tools = manager.get_tools(toolkits=["postgres"])

# Use in agent as shown above

Connecting External MCP Servers

Arcade's HTTP MCP transport allows connection to external Postgres MCP servers.

Configure Engine for External MCP Server

Add MCP server configuration to engine.yaml:

workers:
  - id: "external-postgres-mcp"
    enabled: true
    http:
      uri: "http://your-postgres-mcp-server:8000/mcp"
      secret: ${env:WORKER_SECRET}
      timeout: 30s

Start the Arcade Engine:

arcade-engine -c engine.yaml

The external MCP server's tools are now available to your LangChain agents through Arcade.

Production Configuration

Connection Pooling

Implement connection pooling for production workloads:

from psycopg2 import pool
import os

class DatabasePool:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.pool = pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=20,
                host=os.getenv("POSTGRES_HOST"),
                port=os.getenv("POSTGRES_PORT"),
                database=os.getenv("POSTGRES_DATABASE"),
                user=os.getenv("POSTGRES_USER"),
                password=os.getenv("POSTGRES_PASSWORD")
            )
        return cls._instance

    def get_conn(self):
        return self.pool.getconn()

    def release_conn(self, conn):
        self.pool.putconn(conn)

# Use in tool
db_pool = DatabasePool()

@tool
def query_with_pool(context: ToolContext, sql_query: str) -> Dict[str, Any]:
    """Execute query using connection pool."""
    conn = db_pool.get_conn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(sql_query)
            results = cursor.fetchall()
            return {
                "success": True,
                "rows": [dict(row) for row in results]
            }
    finally:
        db_pool.release_conn(conn)
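To guarantee connections go back to the pool even when a query raises, the get/release pair can be wrapped in a context manager. This is a sketch that assumes any pool object exposing `get_conn`/`release_conn`, such as the `DatabasePool` class above:

```python
from contextlib import contextmanager

@contextmanager
def pooled_connection(db_pool):
    """Check out a connection and always return it to the pool, even on error."""
    conn = db_pool.get_conn()
    try:
        yield conn
    finally:
        db_pool.release_conn(conn)
```

With this in place, `with pooled_connection(db_pool) as conn:` replaces the manual try/finally in each tool body.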

Query Timeout Protection

Prevent long-running queries:

@tool
def query_with_timeout(
    context: ToolContext,
    sql_query: str,
    timeout_seconds: int = 30
) -> Dict[str, Any]:
    """Execute query with timeout protection."""

    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST"),
        port=os.getenv("POSTGRES_PORT"),
        database=os.getenv("POSTGRES_DATABASE"),
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )

    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # int() guards the interpolation, since SET cannot take bind parameters
            cursor.execute(f"SET statement_timeout = {int(timeout_seconds) * 1000}")
            cursor.execute(sql_query)
            results = cursor.fetchall()
            return {
                "success": True,
                "rows": [dict(row) for row in results]
            }
    except psycopg2.errors.QueryCanceled:
        return {
            "success": False,
            "error": f"Query timeout after {timeout_seconds} seconds"
        }
    finally:
        conn.close()

Read-Only Database User

Create a read-only Postgres user for agent queries:

-- Create read-only role
CREATE ROLE agent_readonly LOGIN PASSWORD 'secure_password';

-- Grant connect privilege
GRANT CONNECT ON DATABASE your_database TO agent_readonly;

-- Grant schema usage
GRANT USAGE ON SCHEMA public TO agent_readonly;

-- Grant SELECT on all tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO agent_readonly;

-- Grant SELECT on future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO agent_readonly;
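As defense in depth, the role can also be forced into read-only transactions at the session level, so even a write statement that slips past application-side validation is rejected by Postgres itself:

```sql
-- Force every session for this role to open read-only transactions
ALTER ROLE agent_readonly SET default_transaction_read_only = on;
```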

Multi-User Database Access

Implement per-user database access using Arcade's authorization system:

from langchain_arcade import ArcadeToolManager

def create_user_agent(user_id: str):
    """Create agent with user-specific database access."""

    manager = ArcadeToolManager(
        api_key=os.getenv("ARCADE_API_KEY"),
        user_id=user_id  # Each user gets isolated access
    )

    tools = manager.get_tools(toolkits=["postgres"])

    model = ChatOpenAI(model="gpt-4o")
    bound_model = model.bind_tools(tools)
    memory = MemorySaver()

    return create_react_agent(
        model=bound_model,
        tools=tools,
        checkpointer=memory
    )

# Create agent for specific user
user_agent = create_user_agent("analyst@company.com")

# Run with user context
config = {
    "configurable": {
        "thread_id": "user-specific-thread",
        "user_id": "analyst@company.com"
    }
}

result = user_agent.stream(user_input, config)

Query Validation and Safety

SQL Validation Function

def validate_sql(query: str) -> tuple[bool, str]:
    """Validate SQL query for safety.

    Returns:
        (is_valid, error_message)
    """
    query_upper = query.strip().upper()

    # Check allowed operations
    if not query_upper.startswith(('SELECT', 'WITH')):
        return False, "Only SELECT and WITH queries allowed"

    # Block dangerous operations
    forbidden = [
        'INSERT', 'UPDATE', 'DELETE', 'DROP',
        'ALTER', 'CREATE', 'TRUNCATE', 'GRANT',
        'REVOKE', 'EXECUTE', 'CALL'
    ]

    for keyword in forbidden:
        if keyword in query_upper:
            return False, f"Forbidden keyword: {keyword}"

    # Block comments that might hide malicious SQL
    if '--' in query or '/*' in query:
        return False, "Comments not allowed in queries"

    return True, ""

# Use in tool
@tool
def safe_query(context: ToolContext, sql_query: str) -> Dict[str, Any]:
    """Execute validated SQL query."""

    is_valid, error = validate_sql(sql_query)
    if not is_valid:
        return {"success": False, "error": error}

    # Execute query
    # ... rest of implementation
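One caveat with the substring scan above: `'CREATE' in query_upper` also matches harmless identifiers such as `created_at`. An illustrative refinement (not an Arcade API) uses word-boundary regexes so only whole keywords trigger rejection:

```python
import re

FORBIDDEN_KEYWORDS = [
    "INSERT", "UPDATE", "DELETE", "DROP", "ALTER",
    "CREATE", "TRUNCATE", "GRANT", "REVOKE", "EXECUTE", "CALL",
]

def validate_sql_strict(query: str) -> tuple[bool, str]:
    """Reject non-SELECT statements using whole-word keyword matching."""
    stripped = query.strip()
    if not re.match(r"(?is)^(SELECT|WITH)\b", stripped):
        return False, "Only SELECT and WITH queries allowed"
    for keyword in FORBIDDEN_KEYWORDS:
        # \b ensures 'CREATE' does not match the column name 'created_at'
        if re.search(rf"(?i)\b{keyword}\b", stripped):
            return False, f"Forbidden keyword: {keyword}"
    if "--" in stripped or "/*" in stripped or ";" in stripped:
        return False, "Comments and statement chaining not allowed"
    return True, ""
```

Keyword filtering remains a heuristic; the read-only database role described below is the real enforcement boundary.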

Result Size Limiting

Prevent memory issues with large result sets:

@tool
def query_limited(
    context: ToolContext,
    sql_query: str,
    max_rows: int = 1000
) -> Dict[str, Any]:
    """Execute query with result size limit."""

    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST"),
        # ... connection params
    )

    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Add LIMIT to query if not present
            if 'LIMIT' not in sql_query.upper():
                sql_query = f"{sql_query} LIMIT {max_rows}"

            cursor.execute(sql_query)
            results = cursor.fetchmany(max_rows)

            return {
                "success": True,
                "rows": [dict(row) for row in results],
                "truncated": len(results) == max_rows
            }
    finally:
        conn.close()
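The substring check for `LIMIT` above can misfire: a subquery's inner `LIMIT` suppresses the outer cap entirely. A sketch of a sturdier approach (a hypothetical helper, not part of Arcade) wraps the whole statement so an outer limit always applies:

```python
def enforce_limit(sql_query: str, max_rows: int = 1000) -> str:
    """Wrap a SELECT so an outer LIMIT applies regardless of inner LIMIT clauses."""
    inner = sql_query.strip().rstrip(";")
    return f"SELECT * FROM ({inner}) AS limited_subquery LIMIT {int(max_rows)}"
```

Pass `enforce_limit(sql_query, max_rows)` to `cursor.execute`, keeping `fetchmany(max_rows)` as a second guard on the client side.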

Agent Configuration Examples

Business Intelligence Agent

from langchain_arcade import ArcadeToolManager
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

manager = ArcadeToolManager(
    api_key=os.getenv("ARCADE_API_KEY"),
    user_id="bi_analyst@company.com"
)

tools = manager.get_tools(toolkits=["postgres"])

model = ChatOpenAI(model="gpt-4o")

agent = create_react_agent(
    model=model.bind_tools(tools),
    tools=tools,
    state_modifier="""You are a business intelligence analyst.

When answering questions:
1. Query the database to get relevant data
2. Analyze results for patterns and insights
3. Provide clear, actionable recommendations

Available tables: customers, orders, products, transactions

Always check schema before querying unfamiliar tables."""
)

# Execute analysis
config = {"configurable": {"thread_id": "bi-analysis"}}
query = {"messages": [{"role": "user", "content": "Analyze sales trends for Q4"}]}

for event in agent.stream(query, config):
    print(event)

Data Quality Monitor

agent = create_react_agent(
    model=model.bind_tools(tools),
    tools=tools,
    state_modifier="""You monitor data quality in Postgres.

Check for:
- NULL values in critical columns
- Duplicate records
- Referential integrity violations
- Stale data (old timestamps)

Report issues with specific examples and row counts."""
)

# Run quality check
query = {
    "messages": [{
        "role": "user",
        "content": "Check data quality for customer and order tables"
    }]
}

for event in agent.stream(query, config):
    print(event)

Monitoring and Logging

Track agent database activity:

import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@tool
def monitored_query(
    context: ToolContext,
    sql_query: str
) -> Dict[str, Any]:
    """Execute query with logging."""

    start_time = datetime.now()
    user_id = context.user_id

    logger.info(f"Query started - User: {user_id}")
    logger.info(f"SQL: {sql_query[:200]}")

    try:
        # execute_query is a placeholder for your query helper,
        # e.g. the connection-and-fetch logic from query_postgres above
        result = execute_query(sql_query)

        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"Query completed - Duration: {duration}s, Rows: {result.get('row_count', 0)}")

        return result
    except Exception as e:
        logger.error(f"Query failed - User: {user_id}, Error: {str(e)}")
        raise

Troubleshooting

Connection Issues

Problem: Agent cannot connect to Postgres

Solutions:

  • Verify POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DATABASE environment variables
  • Check network connectivity and firewall rules
  • Confirm Postgres accepts connections from agent host
  • Test connection using psql command line tool

Authorization Failures

Problem: Tools require authorization

Solutions:

  • Catch NodeInterrupt and surface the authorization URL to the user
  • Complete the authorization flow in the browser, then re-run the agent
  • Confirm the same user_id is set in both the tool manager and the run config

Query Execution Errors

Problem: SQL queries fail

Solutions:

  • Validate SQL syntax before execution
  • Check table and column names exist using schema tool
  • Verify database user has SELECT permissions
  • Review Postgres logs for detailed error messages
  • Ensure query timeout is set appropriately

Performance Problems

Problem: Slow query execution

Solutions:

  • Implement connection pooling
  • Add appropriate database indexes
  • Limit result set sizes
  • Use query timeouts
  • Monitor query execution plans

Next Steps

After implementing Postgres queries in LangChain agents, you can apply the same patterns to other Arcade toolkits and external MCP servers. LangChain's Open Agent Platform combined with Arcade's MCP implementation provides production-ready database access for AI agents while maintaining security and performance requirements.
