LangChain's Open Agent Platform enables developers to build production-ready AI agents. This guide demonstrates how to connect these agents to Postgres databases using Arcade's Model Context Protocol implementation, providing secure database access without managing credentials or building custom integrations.
Prerequisites
- Python 3.8+
- Arcade API key
- Postgres database with connection credentials
- LangChain and LangGraph installed
Installation
Install required packages:
pip install langchain-arcade langchain-openai langgraph arcadepy
Set environment variables:
export ARCADE_API_KEY="your_arcade_api_key"
export OPENAI_API_KEY="your_openai_api_key"
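The tool code later in this guide also reads `POSTGRES_*` variables. A minimal preflight sketch (variable names as used throughout this guide) can catch a missing or misspelled variable before the agent starts:

```python
import os

REQUIRED_VARS = [
    "ARCADE_API_KEY",
    "OPENAI_API_KEY",
    "POSTGRES_HOST",
    "POSTGRES_PORT",
    "POSTGRES_DATABASE",
    "POSTGRES_USER",
    "POSTGRES_PASSWORD",
]

def missing_env_vars(required=None):
    """Return the names of required variables that are unset or empty."""
    return [name for name in (required or REQUIRED_VARS) if not os.getenv(name)]

# Warn rather than crash, so importing this module has no side effects
for name in missing_env_vars():
    print(f"warning: {name} is not set")
```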
Architecture Overview
The integration consists of three layers:
LangChain Agent Layer: Handles agent orchestration and LLM interactions
Arcade Engine: Manages tool registration, execution, and MCP server connections
Postgres MCP Server: Exposes database operations as standardized tools
Arcade's native MCP server support enables connection to external MCP servers via HTTP transport, allowing agents to query Postgres through standardized interfaces.
Building a Custom Postgres Tool
Create a Postgres query tool using Arcade's Tool SDK:
from arcade_tdk import ToolContext, tool
from typing import Dict, Any, Annotated
import psycopg2
from psycopg2.extras import RealDictCursor
import os

@tool
def query_postgres(
    context: ToolContext,
    sql_query: Annotated[str, "SQL SELECT statement to execute"],
) -> Dict[str, Any]:
    """Execute read-only SQL queries against Postgres database.

    Returns query results as structured data.
    """
    # Validate read-only operation
    query_upper = sql_query.strip().upper()
    if not query_upper.startswith('SELECT'):
        return {
            "success": False,
            "error": "Only SELECT queries allowed"
        }

    # Connect to database (initialize conn so the finally block is safe
    # even if psycopg2.connect raises)
    conn = None
    try:
        conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            port=os.getenv("POSTGRES_PORT"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD")
        )
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(sql_query)
            results = cursor.fetchall()
            return {
                "success": True,
                "rows": [dict(row) for row in results],
                "row_count": len(results)
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn:
            conn.close()
@tool
def get_schema(
    context: ToolContext,
    table_name: Annotated[str, "Table name to inspect"] = None
) -> Dict[str, Any]:
    """Get database schema information for tables and columns."""
    conn = None
    try:
        conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            port=os.getenv("POSTGRES_PORT"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD")
        )
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            if table_name:
                cursor.execute("""
                    SELECT column_name, data_type, is_nullable
                    FROM information_schema.columns
                    WHERE table_name = %s
                    ORDER BY ordinal_position
                """, (table_name,))
            else:
                cursor.execute("""
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                    ORDER BY table_name
                """)
            results = cursor.fetchall()
            return {
                "success": True,
                "schema": [dict(row) for row in results]
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn:
            conn.close()
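The raw dictionaries that `get_schema` returns can be verbose for an LLM to read. A hypothetical helper (not part of the toolkit above) that renders the column metadata as a compact aligned text table before it goes into the prompt:

```python
def format_schema(rows):
    """Render information_schema rows (list of dicts) as an aligned text table."""
    if not rows:
        return "(no columns found)"
    headers = list(rows[0].keys())
    # Width of each column: the wider of the header and its longest value
    widths = [
        max(len(h), *(len(str(r[h])) for r in rows))
        for h in headers
    ]
    lines = [
        "  ".join(h.ljust(w) for h, w in zip(headers, widths)),
        "  ".join("-" * w for w in widths),
    ]
    for r in rows:
        lines.append("  ".join(str(r[h]).ljust(w) for h, w in zip(headers, widths)))
    return "\n".join(lines)
```

Feeding `format_schema(result["schema"])` to the model instead of the raw list keeps token usage down and makes column names easier for the agent to quote correctly.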
Integrating with LangChain
Basic LangGraph Agent Implementation
Create a LangChain agent with Postgres tools from Arcade:
import os

from langchain_arcade import ArcadeToolManager
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver

# Initialize Arcade and get tools
arcade_api_key = os.getenv("ARCADE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")

# Create tool manager
manager = ArcadeToolManager(
    api_key=arcade_api_key,
    user_id="user@example.com"
)

# Get Postgres tools from your deployed toolkit
tools = manager.get_tools(toolkits=["postgres"])

# Initialize LLM
model = ChatOpenAI(
    model="gpt-4o",
    api_key=openai_api_key
)

# Bind tools to model
bound_model = model.bind_tools(tools)

# Create memory checkpointer
memory = MemorySaver()

# Create ReAct agent
graph = create_react_agent(
    model=bound_model,
    tools=tools,
    checkpointer=memory
)

# Run agent
config = {
    "configurable": {
        "thread_id": "postgres-query-thread",
        "user_id": "user@example.com"
    }
}

user_input = {
    "messages": [
        {
            "role": "user",
            "content": "Query the customers table and show the top 10 by total purchases"
        }
    ]
}

for chunk in graph.stream(user_input, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
Handling Authorization Interrupts
LangChain agents may need to handle user authorization for authenticated tools:
from langgraph.errors import NodeInterrupt

try:
    for chunk in graph.stream(user_input, config, stream_mode="values"):
        chunk["messages"][-1].pretty_print()
except NodeInterrupt as exc:
    print(f"Authorization required: {exc}")
    print("Visit the authorization URL, then re-run the agent")
Deploying Postgres Tools to Arcade
Create Worker Configuration
Create worker.toml in your project directory:
[worker]
name = "postgres-worker"
description = "Postgres database query tools"
[worker.toolkits]
postgres = { path = "./postgres_tools" }
Deploy to Arcade Cloud
Use the Arcade CLI to deploy:
# Login to Arcade
arcade login
# Deploy toolkit
arcade deploy
# Verify deployment
arcade worker list
Your tools are now available at api.arcade.dev for all agents.
Connect Deployed Tools in LangChain
from langchain_arcade import ArcadeToolManager

# Connect to deployed tools
manager = ArcadeToolManager(
    api_key=os.getenv("ARCADE_API_KEY"),
    user_id="user@example.com"
)

# Get deployed Postgres toolkit
tools = manager.get_tools(toolkits=["postgres-worker"])

# Use in agent as shown above
Connecting External MCP Servers
Arcade's HTTP MCP transport allows connection to external Postgres MCP servers.
Configure Engine for External MCP Server
Add MCP server configuration to engine.yaml:
workers:
  - id: "external-postgres-mcp"
    enabled: true
    http:
      uri: "http://your-postgres-mcp-server:8000/mcp"
      secret: ${env:WORKER_SECRET}
      timeout: 30s
Start the Arcade Engine:
arcade-engine -c engine.yaml
The external MCP server's tools are now available to your LangChain agents through Arcade.
Production Configuration
Connection Pooling
Implement connection pooling for production workloads:
from psycopg2 import pool
import os

class DatabasePool:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.pool = pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=20,
                host=os.getenv("POSTGRES_HOST"),
                port=os.getenv("POSTGRES_PORT"),
                database=os.getenv("POSTGRES_DATABASE"),
                user=os.getenv("POSTGRES_USER"),
                password=os.getenv("POSTGRES_PASSWORD")
            )
        return cls._instance

    def get_conn(self):
        return self.pool.getconn()

    def release_conn(self, conn):
        self.pool.putconn(conn)

# Use in tool
db_pool = DatabasePool()

@tool
def query_with_pool(context: ToolContext, sql_query: str) -> Dict[str, Any]:
    """Execute query using connection pool."""
    conn = db_pool.get_conn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(sql_query)
            results = cursor.fetchall()
            return {
                "success": True,
                "rows": [dict(row) for row in results]
            }
    finally:
        db_pool.release_conn(conn)
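The acquire/release pairing in the finally block is easy to forget once several tools share the pool. A small context manager (a sketch, assuming the `get_conn`/`release_conn` interface of the `DatabasePool` above) makes the return automatic even when the query raises:

```python
from contextlib import contextmanager

@contextmanager
def pooled_connection(db_pool):
    """Borrow a connection from the pool and always return it."""
    conn = db_pool.get_conn()
    try:
        yield conn
    finally:
        db_pool.release_conn(conn)
```

A tool body then becomes `with pooled_connection(db_pool) as conn: ...` with no explicit cleanup.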
Query Timeout Protection
Prevent long-running queries:
@tool
def query_with_timeout(
    context: ToolContext,
    sql_query: str,
    timeout_seconds: int = 30
) -> Dict[str, Any]:
    """Execute query with timeout protection."""
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST"),
        port=os.getenv("POSTGRES_PORT"),
        database=os.getenv("POSTGRES_DATABASE"),
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(f"SET statement_timeout = {timeout_seconds * 1000}")
            cursor.execute(sql_query)
            results = cursor.fetchall()
            return {
                "success": True,
                "rows": [dict(row) for row in results]
            }
    except psycopg2.errors.QueryCanceled:
        return {
            "success": False,
            "error": f"Query timeout after {timeout_seconds} seconds"
        }
    finally:
        conn.close()
Read-Only Database User
Create a read-only Postgres user for agent queries:
-- Create read-only role
CREATE ROLE agent_readonly LOGIN PASSWORD 'secure_password';
-- Grant connect privilege
GRANT CONNECT ON DATABASE your_database TO agent_readonly;
-- Grant schema usage
GRANT USAGE ON SCHEMA public TO agent_readonly;
-- Grant SELECT on all tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO agent_readonly;
-- Grant SELECT on future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO agent_readonly;
Multi-User Database Access
Implement per-user database access using Arcade's authorization system:
from langchain_arcade import ArcadeToolManager

def create_user_agent(user_id: str):
    """Create agent with user-specific database access."""
    manager = ArcadeToolManager(
        api_key=os.getenv("ARCADE_API_KEY"),
        user_id=user_id  # Each user gets isolated access
    )
    tools = manager.get_tools(toolkits=["postgres"])

    model = ChatOpenAI(model="gpt-4o")
    bound_model = model.bind_tools(tools)
    memory = MemorySaver()

    return create_react_agent(
        model=bound_model,
        tools=tools,
        checkpointer=memory
    )

# Create agent for specific user
user_agent = create_user_agent("analyst@company.com")

# Run with user context
config = {
    "configurable": {
        "thread_id": "user-specific-thread",
        "user_id": "analyst@company.com"
    }
}

result = user_agent.stream(user_input, config)
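Building a fresh agent on every request repeats tool discovery and model setup. A hypothetical memoization wrapper (the factory argument stands in for `create_user_agent` above) builds each user's agent once and reuses it:

```python
from functools import lru_cache

def make_agent_cache(factory, maxsize=128):
    """Wrap an agent factory so each user_id builds its agent only once."""
    @lru_cache(maxsize=maxsize)
    def agent_for(user_id: str):
        return factory(user_id)
    return agent_for
```

For example, `agent_for = make_agent_cache(create_user_agent)` and then `agent_for("analyst@company.com")` on each request. Note that `lru_cache` evicts least-recently-used entries once `maxsize` distinct users have been seen.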
Query Validation and Safety
SQL Validation Function
import re

def validate_sql(query: str) -> tuple[bool, str]:
    """Validate SQL query for safety.

    Returns:
        (is_valid, error_message)
    """
    query_upper = query.strip().upper()

    # Check allowed operations
    if not query_upper.startswith(('SELECT', 'WITH')):
        return False, "Only SELECT and WITH queries allowed"

    # Block dangerous operations; match whole words so identifiers such
    # as updated_at do not trip the UPDATE check
    forbidden = [
        'INSERT', 'UPDATE', 'DELETE', 'DROP',
        'ALTER', 'CREATE', 'TRUNCATE', 'GRANT',
        'REVOKE', 'EXECUTE', 'CALL'
    ]
    for keyword in forbidden:
        if re.search(rf"\b{keyword}\b", query_upper):
            return False, f"Forbidden keyword: {keyword}"

    # Block comments that might hide malicious SQL
    if '--' in query or '/*' in query:
        return False, "Comments not allowed in queries"

    return True, ""

# Use in tool
@tool
def safe_query(context: ToolContext, sql_query: str) -> Dict[str, Any]:
    """Execute validated SQL query."""
    is_valid, error = validate_sql(sql_query)
    if not is_valid:
        return {"success": False, "error": error}

    # Execute query
    # ... rest of implementation
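One subtlety worth noting for keyword blocklists like the one above: a plain substring check flags any identifier that merely contains a keyword (`updated_at` contains `UPDATE`), while a word-boundary regex matches only the keyword itself. A minimal self-contained comparison of the two approaches:

```python
import re

FORBIDDEN = ['INSERT', 'UPDATE', 'DELETE', 'DROP']

def contains_forbidden_substring(query: str) -> bool:
    """Naive check: flags keywords even inside identifiers."""
    q = query.upper()
    return any(kw in q for kw in FORBIDDEN)

def contains_forbidden_word(query: str) -> bool:
    """Word-boundary check: flags only standalone keywords."""
    q = query.upper()
    return any(re.search(rf"\b{kw}\b", q) for kw in FORBIDDEN)
```

Keep in mind that keyword filtering is defense in depth, not a substitute for the read-only database role described later in this guide.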
Result Size Limiting
Prevent memory issues with large result sets:
@tool
def query_limited(
    context: ToolContext,
    sql_query: str,
    max_rows: int = 1000
) -> Dict[str, Any]:
    """Execute query with result size limit."""
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST"),
        # ... connection params
    )
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Add LIMIT to query if not present
            if 'LIMIT' not in sql_query.upper():
                sql_query = f"{sql_query} LIMIT {max_rows}"
            cursor.execute(sql_query)
            results = cursor.fetchmany(max_rows)
            return {
                "success": True,
                "rows": [dict(row) for row in results],
                "truncated": len(results) == max_rows
            }
    finally:
        conn.close()
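The `'LIMIT' not in sql_query.upper()` check is a heuristic: it skips the cap when the query already contains a large LIMIT, and appending breaks queries that end with a semicolon. A hypothetical alternative that always enforces the cap is to wrap the statement in a subselect (valid for the SELECT queries these tools allow):

```python
def cap_rows(sql_query: str, max_rows: int = 1000) -> str:
    """Enforce a row cap by wrapping the query in a subselect,
    regardless of any LIMIT clause already inside it."""
    inner = sql_query.strip().rstrip(";")
    return f"SELECT * FROM ({inner}) AS capped LIMIT {max_rows}"
```

Combined with `fetchmany(max_rows)` as above, this bounds both what Postgres returns and what the tool holds in memory.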
Agent Configuration Examples
Business Intelligence Agent
from langchain_arcade import ArcadeToolManager
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

manager = ArcadeToolManager(
    api_key=os.getenv("ARCADE_API_KEY"),
    user_id="bi_analyst@company.com"
)
tools = manager.get_tools(toolkits=["postgres"])
model = ChatOpenAI(model="gpt-4o")

agent = create_react_agent(
    model=model.bind_tools(tools),
    tools=tools,
    state_modifier="""You are a business intelligence analyst.

When answering questions:
1. Query the database to get relevant data
2. Analyze results for patterns and insights
3. Provide clear, actionable recommendations

Available tables: customers, orders, products, transactions
Always check schema before querying unfamiliar tables."""
)

# Execute analysis
config = {"configurable": {"thread_id": "bi-analysis"}}
query = {"messages": [{"role": "user", "content": "Analyze sales trends for Q4"}]}

for event in agent.stream(query, config):
    print(event)
Data Quality Monitor
agent = create_react_agent(
    model=model.bind_tools(tools),
    tools=tools,
    state_modifier="""You monitor data quality in Postgres.

Check for:
- NULL values in critical columns
- Duplicate records
- Referential integrity violations
- Stale data (old timestamps)

Report issues with specific examples and row counts."""
)

# Run quality check
query = {
    "messages": [{
        "role": "user",
        "content": "Check data quality for customer and order tables"
    }]
}

for event in agent.stream(query, config):
    print(event)
Monitoring and Logging
Track agent database activity:
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@tool
def monitored_query(
    context: ToolContext,
    sql_query: str
) -> Dict[str, Any]:
    """Execute query with logging."""
    start_time = datetime.now()
    user_id = context.user_id

    logger.info(f"Query started - User: {user_id}")
    logger.info(f"SQL: {sql_query[:200]}")

    try:
        # execute_query is a helper that runs the SQL and returns the same
        # success/rows/row_count dict as query_postgres above
        result = execute_query(sql_query)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"Query completed - Duration: {duration}s, Rows: {result.get('row_count', 0)}")
        return result
    except Exception as e:
        logger.error(f"Query failed - User: {user_id}, Error: {str(e)}")
        raise
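The start/finish/error pattern above repeats for every monitored tool. A generic decorator (a sketch, independent of the Arcade SDK) factors the timing and logging out so each tool body stays focused on its query:

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def log_timing(func):
    """Log the duration and outcome of any callable, preserving its result."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            logger.info("%s completed in %.3fs", func.__name__,
                        time.perf_counter() - start)
            return result
        except Exception:
            logger.error("%s failed after %.3fs", func.__name__,
                         time.perf_counter() - start)
            raise
    return wrapper
```

Stacking it under the `@tool` decorator (e.g. `@tool` above `@log_timing`) would give every database tool the same audit trail.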
Troubleshooting
Connection Issues
Problem: Agent cannot connect to Postgres
Solutions:
- Verify POSTGRES_HOST, POSTGRES_PORT, and POSTGRES_DATABASE environment variables
- Check network connectivity and firewall rules
- Confirm Postgres accepts connections from the agent host
- Test the connection using the psql command-line tool
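When psql is not available on the agent host, a quick reachability check using only the Python standard library can narrow the problem to networking versus authentication (host and port values are whatever you exported earlier):

```python
import os
import socket

def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example: check the configured Postgres endpoint
# can_reach(os.getenv("POSTGRES_HOST", "localhost"),
#           int(os.getenv("POSTGRES_PORT", "5432")))
```

A True result with a failing agent points at credentials or `pg_hba.conf`; a False result points at DNS, firewalls, or the listener configuration.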
Authorization Failures
Problem: Tools require authorization
Solutions:
- Verify ARCADE_API_KEY is set correctly
- Ensure user_id is passed in the agent config
- Check tool registration in the Arcade Dashboard
- Review authorization documentation
Query Execution Errors
Problem: SQL queries fail
Solutions:
- Validate SQL syntax before execution
- Check table and column names exist using schema tool
- Verify database user has SELECT permissions
- Review Postgres logs for detailed error messages
- Ensure query timeout is set appropriately
Performance Problems
Problem: Slow query execution
Solutions:
- Implement connection pooling
- Add appropriate database indexes
- Limit result set sizes
- Use query timeouts
- Monitor query execution plans
Additional Resources
- Arcade Documentation
- MCP Overview
- Tool Development Guide
- LangChain Integration
- Arcade GitHub
- API Reference
Next Steps
After implementing Postgres queries in LangChain agents:
- Build custom tools for your specific database schema
- Implement multi-agent workflows combining database and API access
- Deploy tools to Arcade Cloud for production
- Explore additional MCP servers in the Arcade ecosystem
- Add evaluation suites to test agent query accuracy
LangChain's Open Agent Platform combined with Arcade's MCP implementation provides production-ready database access for AI agents while maintaining security and performance requirements.