Python agents often need database access to analyze data and generate insights. This guide shows how to build Python agents that query PostgreSQL databases through Arcade's Model Context Protocol (MCP) implementation, enabling secure database operations without exposing credentials to language models.
Prerequisites
Before you begin, make sure you have:
- Python 3.8+
- PostgreSQL database with connection details
- Arcade.dev account with API key
- Basic SQL and Python knowledge
Architecture Overview
Arcade's MCP implementation separates database credentials from agent logic. The platform manages:
- Credential isolation: Connection strings remain server-side
- Tool discovery: Agents locate and invoke database operations through standardized MCP interfaces
- User context: Per-user database permissions and connection management
Install Arcade SDK
pip install arcade-ai
Set your API key:
export ARCADE_API_KEY="your_arcade_api_key"
Get your API key from the Arcade dashboard.
Configure Postgres MCP Server
Using Arcade's Community Postgres MCP Server
Arcade provides a community Postgres MCP server. View all available MCP Servers here.
For local development, install the Arcade Engine:
# macOS
brew install arcade-engine
# Ubuntu/Debian
curl -fsSL https://get.arcade.dev | sh
Create your engine configuration:
# ~/.arcade/engine.yaml
api:
development: false
host: 0.0.0.0
port: 9099
workers:
- id: "postgres-worker"
enabled: true
http:
uri: "http://localhost:8002"
secret: "${env:WORKER_SECRET}"
Set environment variables:
export POSTGRES_HOST="localhost"
export POSTGRES_PORT="5432"
export POSTGRES_DATABASE="your_database"
export POSTGRES_USER="your_user"
export POSTGRES_PASSWORD="your_password"
export WORKER_SECRET="your-secure-secret"
export OPENAI_API_KEY="your_openai_key"
See the local installation guide for detailed setup.
Build Custom Postgres Tools
Create Toolkit Structure
arcade new postgres_agent_toolkit
cd postgres_agent_toolkit
Implement Query Tool
# postgres_agent_toolkit/tools/query.py
from typing import Annotated, Dict, Any
from arcade_tdk import tool, ToolContext
import psycopg2
from psycopg2.extras import RealDictCursor
import os
@tool
def execute_read_query(
    context: ToolContext,
    sql_query: Annotated[str, "SQL SELECT statement to execute"],
) -> Dict[str, Any]:
    """
    Execute read-only SQL queries against PostgreSQL.

    The statement is validated before it is sent to the database: it must
    start with SELECT, must not contain a data-modifying keyword as a whole
    word, and must be a single statement (one trailing ';' is tolerated).
    The session is also switched to read-only so the database itself rejects
    writes that slip past the textual checks.

    Returns:
        On success: {"success": True, "rows": [...], "count": <int>}.
        On failure: {"success": False, "error": <message>} (plus
        "error_code" when the error came from PostgreSQL).

    Raises:
        KeyError: if one of the POSTGRES_* environment variables is missing.

    Examples:
    - execute_read_query("SELECT * FROM users LIMIT 10")
    - execute_read_query("SELECT COUNT(*) FROM orders WHERE status='completed'")
    """
    # Local import keeps this block self-contained.
    import re

    # Validate query is read-only
    statement = sql_query.strip()
    if statement.endswith(';'):
        # A single trailing terminator is harmless; drop it so any ';' that
        # remains indicates stacked statements.
        statement = statement[:-1].rstrip()
    query_upper = statement.upper()

    if not re.match(r'SELECT\b', query_upper):
        return {"success": False, "error": "Only SELECT queries allowed"}

    forbidden = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'TRUNCATE']
    # Match whole words only: a plain substring test would reject legitimate
    # identifiers such as created_at or updated_at.
    if re.search(r'\b(?:' + '|'.join(forbidden) + r')\b', query_upper):
        return {"success": False, "error": "Query contains forbidden operations"}

    # Conservative: a ';' inside a string literal is rejected as well.
    if ';' in statement:
        return {"success": False, "error": "Multiple statements not allowed"}

    # Pass credentials as keyword arguments instead of formatting a URI, so
    # passwords containing '@', '/' or ':' do not corrupt the connection string.
    conn_params = {
        "host": os.environ['POSTGRES_HOST'],
        "port": os.environ['POSTGRES_PORT'],
        "dbname": os.environ['POSTGRES_DATABASE'],
        "user": os.environ['POSTGRES_USER'],
        "password": os.environ['POSTGRES_PASSWORD'],
    }

    conn = None
    try:
        conn = psycopg2.connect(**conn_params)
        # Defense in depth: the server refuses writes in a read-only session.
        conn.set_session(readonly=True)
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(statement)
            results = cursor.fetchall()
        return {
            "success": True,
            "rows": [dict(row) for row in results],
            "count": len(results)
        }
    except psycopg2.Error as e:
        return {
            "success": False,
            "error": str(e),
            "error_code": e.pgcode
        }
    finally:
        # conn stays None when connect() itself failed.
        if conn is not None:
            conn.close()
Add Schema Discovery
# postgres_agent_toolkit/tools/schema.py
from typing import Annotated, Dict, List, Any
from arcade_tdk import tool, ToolContext
import psycopg2
import os
@tool
def get_table_list(
    context: ToolContext,
    schema_name: Annotated[str, "Schema to query"] = "public"
) -> Dict[str, Any]:
    """
    List all tables in specified schema.

    Returns:
        {"schema": <name>, "tables": [{"name", "size"}, ...], "total": <int>}
        on success, or {"error": <message>} when PostgreSQL reports an error.

    Raises:
        KeyError: if one of the POSTGRES_* environment variables is missing.

    Example: get_table_list("public")
    """
    # Keyword arguments avoid building a URI that breaks on passwords
    # containing '@', '/' or ':'.
    conn_params = {
        "host": os.environ['POSTGRES_HOST'],
        "port": os.environ['POSTGRES_PORT'],
        "dbname": os.environ['POSTGRES_DATABASE'],
        "user": os.environ['POSTGRES_USER'],
        "password": os.environ['POSTGRES_PASSWORD'],
    }
    # The regclass cast is schema-qualified: an unqualified table name is
    # resolved through search_path and would fail (or pick the wrong table)
    # for any schema other than the default one.
    query = """
    SELECT table_name,
           pg_size_pretty(pg_total_relation_size(
               (quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass
           )) as size
    FROM information_schema.tables
    WHERE table_schema = %s
    AND table_type = 'BASE TABLE'
    ORDER BY table_name;
    """
    conn = None
    try:
        conn = psycopg2.connect(**conn_params)
        with conn.cursor() as cursor:
            cursor.execute(query, (schema_name,))
            tables = [{"name": row[0], "size": row[1]} for row in cursor.fetchall()]
        return {
            "schema": schema_name,
            "tables": tables,
            "total": len(tables)
        }
    except psycopg2.Error as e:
        return {"error": str(e)}
    finally:
        # conn is still None when connect() failed; closing unconditionally
        # would raise and hide the original error.
        if conn is not None:
            conn.close()
@tool
def get_table_schema(
    context: ToolContext,
    table_name: Annotated[str, "Table name to describe"],
    schema_name: Annotated[str, "Schema name"] = "public"
) -> Dict[str, Any]:
    """
    Get column definitions for a table.

    Returns:
        {"table": "<schema>.<table>", "columns": [...]} on success, where each
        column has "name", "type", "max_length", "nullable" and "default";
        {"error": <message>} when PostgreSQL reports an error. The columns
        list is empty when no matching columns are found.

    Raises:
        KeyError: if one of the POSTGRES_* environment variables is missing.

    Example: get_table_schema("users", "public")
    """
    # Keyword arguments avoid building a URI that breaks on passwords
    # containing '@', '/' or ':'.
    conn_params = {
        "host": os.environ['POSTGRES_HOST'],
        "port": os.environ['POSTGRES_PORT'],
        "dbname": os.environ['POSTGRES_DATABASE'],
        "user": os.environ['POSTGRES_USER'],
        "password": os.environ['POSTGRES_PASSWORD'],
    }
    query = """
    SELECT
        column_name,
        data_type,
        character_maximum_length,
        is_nullable,
        column_default
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position;
    """
    conn = None
    try:
        conn = psycopg2.connect(**conn_params)
        with conn.cursor() as cursor:
            cursor.execute(query, (schema_name, table_name))
            columns = [
                {
                    "name": row[0],
                    "type": row[1],
                    "max_length": row[2],
                    "nullable": row[3],
                    "default": row[4]
                }
                for row in cursor.fetchall()
            ]
        return {
            "table": f"{schema_name}.{table_name}",
            "columns": columns
        }
    except psycopg2.Error as e:
        return {"error": str(e)}
    finally:
        # conn is still None when connect() failed; closing unconditionally
        # would raise and hide the original error.
        if conn is not None:
            conn.close()
Package Configuration
# postgres_agent_toolkit/tools/__init__.py
from postgres_agent_toolkit.tools.query import execute_read_query
from postgres_agent_toolkit.tools.schema import get_table_list, get_table_schema
__all__ = ["execute_read_query", "get_table_list", "get_table_schema"]
# postgres_agent_toolkit/__init__.py
from postgres_agent_toolkit.tools import execute_read_query, get_table_list, get_table_schema
__all__ = ["execute_read_query", "get_table_list", "get_table_schema"]
Install locally:
pip install -e .
Create Python Agent with LangGraph
Install dependencies:
pip install langgraph langchain-openai arcadepy
Build the agent:
# postgres_agent.py
import os
import asyncio
from typing import Annotated, Literal
from arcadepy import Arcade
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class PostgresQueryAgent:
    """LangGraph agent that answers questions by calling Arcade-hosted Postgres tools.

    The graph alternates between an LLM node ("agent") and a tool-execution
    node ("tools") until the model replies without requesting a tool call.
    """

    def __init__(self, user_id: str):
        """Create the Arcade client and chat model used on behalf of ``user_id``.

        Raises:
            KeyError: if ARCADE_API_KEY is not set in the environment.
        """
        # Every Arcade call in this class is awaited, so the asynchronous
        # client is required; the synchronous client's results are not
        # awaitable.
        from arcadepy import AsyncArcade

        self.arcade = AsyncArcade(api_key=os.environ["ARCADE_API_KEY"])
        self.user_id = user_id
        # Keep the unbound model so build_graph() can bind tools repeatedly
        # without stacking bindings on an already-bound model.
        self._base_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.llm = self._base_llm
        # Compiled graph, built lazily on the first query() call.
        self._graph = None

    async def load_tools(self):
        """Load Postgres tools from Arcade"""
        response = await self.arcade.tools.list(
            toolkit="postgres_agent_toolkit",
            user_id=self.user_id
        )
        return response.items

    def create_tool_node(self, tools):
        """Create LangGraph tool execution node

        ``tools`` is accepted for interface compatibility; execution is
        delegated to Arcade by tool name, so the definitions are not used here.
        """
        from langchain_core.messages import ToolMessage

        async def execute_tools(state: MessagesState):
            last_message = state["messages"][-1]
            results = []
            for tool_call in last_message.tool_calls:
                result = await self.arcade.tools.execute(
                    tool_name=tool_call["name"],
                    input=tool_call["args"],
                    user_id=self.user_id
                )
                # Tool results must be real ToolMessage objects tied to the
                # originating call id; a dict without a role/type cannot be
                # merged into the message state.
                results.append(
                    ToolMessage(
                        content=str(result.output),
                        tool_call_id=tool_call["id"],
                    )
                )
            return {"messages": results}

        return execute_tools

    async def should_continue(self, state: MessagesState) -> Literal["tools", "end"]:
        """Route to tools or end"""
        last_message = state["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return "end"

    async def call_model(self, state: MessagesState):
        """Invoke LLM with tools"""
        messages = state["messages"]
        response = await self.llm.ainvoke(messages)
        return {"messages": [response]}

    async def build_graph(self):
        """Construct agent workflow"""
        tools = await self.load_tools()
        # Bind tools to LLM (always starting from the unbound model).
        # NOTE(review): confirm that the tool definitions returned by
        # load_tools() are in a format bind_tools() accepts; they may need
        # converting to OpenAI function format first.
        self.llm = self._base_llm.bind_tools(tools)
        # Build graph
        workflow = StateGraph(MessagesState)
        workflow.add_node("agent", self.call_model)
        workflow.add_node("tools", self.create_tool_node(tools))
        workflow.add_edge(START, "agent")
        workflow.add_conditional_edges(
            "agent",
            self.should_continue,
            {"tools": "tools", "end": END}
        )
        workflow.add_edge("tools", "agent")
        return workflow.compile()

    async def query(self, question: str) -> str:
        """Execute natural language query

        The graph is compiled once and reused, so repeated questions do not
        reload the tool list or re-bind the model.
        """
        if self._graph is None:
            self._graph = await self.build_graph()
        system_prompt = """You are a PostgreSQL database assistant. You help users query and analyze data.
When users ask questions:
1. First check available tables if needed
2. Examine table schemas to understand structure
3. Construct appropriate SQL queries
4. Execute queries and interpret results
5. Present findings clearly
Always validate queries are read-only before execution."""
        result = await self._graph.ainvoke({
            "messages": [
                SystemMessage(content=system_prompt),
                HumanMessage(content=question)
            ]
        })
        return result["messages"][-1].content
async def main():
    """Ask the agent a fixed set of example questions and print each answer."""
    demo_agent = PostgresQueryAgent(user_id="user@example.com")
    # Example queries
    prompts = (
        "What tables exist in the database?",
        "Show me the schema for the users table",
        "Get the first 5 rows from users table",
    )
    for prompt in prompts:
        print(f"\nQuestion: {prompt}")
        reply = await demo_agent.query(prompt)
        print(f"Answer: {reply}\n")


if __name__ == "__main__":
    asyncio.run(main())
Run the agent:
python postgres_agent.py
Run Local MCP Server
Start the Arcade Engine:
arcade-engine
In another terminal, serve your tools:
arcade serve --port 8002
Test with the CLI chat:
arcade chat
Full details in the Arcade CLI documentation.
Deploy to Production
Deploy with Arcade Cloud
Create deployment config:
# worker.toml
[[worker]]
[worker.config]
id = "postgres-worker"
secret = "your-secure-worker-secret"
[worker.local_source]
packages = ["./postgres_agent_toolkit"]
Deploy:
arcade deploy
View deployment status in the Arcade dashboard.
Complete deployment guide: Arcade Deploy
Self-Host with Docker
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y postgresql-client && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY postgres_agent_toolkit ./postgres_agent_toolkit
RUN pip install -e ./postgres_agent_toolkit
COPY postgres_agent.py .
CMD ["python", "postgres_agent.py"]
Build and run:
docker build -t postgres-mcp-agent .
docker run -e ARCADE_API_KEY=$ARCADE_API_KEY \
-e POSTGRES_HOST=$POSTGRES_HOST \
-e POSTGRES_PORT=$POSTGRES_PORT \
-e POSTGRES_DATABASE=$POSTGRES_DATABASE \
-e POSTGRES_USER=$POSTGRES_USER \
-e POSTGRES_PASSWORD=$POSTGRES_PASSWORD \
postgres-mcp-agent
Security Implementation
Database User Permissions
Create restricted database role:
-- Read-only analytics role
CREATE ROLE agent_readonly WITH LOGIN PASSWORD 'secure_password';
GRANT USAGE ON SCHEMA public TO agent_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO agent_readonly;
-- Prevent modifications
REVOKE INSERT, UPDATE, DELETE, TRUNCATE ON ALL TABLES IN SCHEMA public FROM agent_readonly;
Update connection environment:
export POSTGRES_USER="agent_readonly"
export POSTGRES_PASSWORD="secure_password"
Query Validation
Implement validation layers:
def validate_sql_query(query: str) -> tuple[bool, str]:
    """Validate SQL query safety

    Accepts exactly one SELECT statement. One trailing ';' is tolerated; any
    other ';' is treated as an attempt to stack statements. Forbidden keywords
    are matched as whole words so identifiers such as ``created_at`` or
    ``updated_at`` are not rejected. The check is textual and conservative: a
    keyword or ';' inside a string literal is rejected too.

    Returns:
        (True, "Query validated") when the query passes, otherwise
        (False, <reason>).
    """
    # Local import keeps this helper self-contained.
    import re

    statement = query.strip()
    if statement.endswith(';'):
        # Drop the single trailing terminator so a remaining ';' always means
        # more than one statement.
        statement = statement[:-1].rstrip()
    query_upper = statement.upper()

    # Only SELECT allowed
    if not re.match(r'SELECT\b', query_upper):
        return False, "Only SELECT statements permitted"

    # Block dangerous operations. INTO is included because
    # SELECT ... INTO creates a new table.
    forbidden = [
        'DROP', 'DELETE', 'UPDATE', 'INSERT',
        'ALTER', 'CREATE', 'TRUNCATE', 'GRANT',
        'REVOKE', 'EXEC', 'EXECUTE', 'INTO'
    ]
    for keyword in forbidden:
        if re.search(r'\b' + keyword + r'\b', query_upper):
            return False, f"Query contains forbidden keyword: {keyword}"

    # Prevent command injection via stacked statements
    if ';' in statement:
        return False, "Multiple statements not allowed"
    return True, "Query validated"
Reference: SQL Tools Security Guide
Connect MCP Clients
Claude Desktop Configuration
Add to ~/Library/Application Support/Claude/claude_desktop_config.json:
{
"mcpServers": {
"postgres": {
"command": "arcade",
"args": ["serve", "--mcp"],
"env": {
"ARCADE_API_KEY": "your_api_key",
"POSTGRES_HOST": "localhost",
"POSTGRES_PORT": "5432",
"POSTGRES_DATABASE": "your_db",
"POSTGRES_USER": "your_user",
"POSTGRES_PASSWORD": "your_password"
}
}
}
}
Restart Claude Desktop. Your Postgres tools appear in the MCP tools list.
VS Code and Cursor Integration
Configure VS Code MCP or Cursor in the same way.
Integration with Agent Frameworks
CrewAI Integration
from crewai_arcade import ArcadeToolManager
from crewai import Agent, Task, Crew
# Tool manager that acts on behalf of this user id by default.
manager = ArcadeToolManager(default_user_id="user@example.com")
# Fetch the tools of the custom Postgres toolkit through the manager.
tools = manager.get_tools(toolkit="postgres_agent_toolkit")
# Single agent that is given the Postgres tools.
analyst = Agent(
    role="Data Analyst",
    goal="Query database and provide insights",
    tools=tools,
    verbose=True
)
# The one task assigned to the analyst, with the output it should produce.
task = Task(
    description="Find top 10 customers by revenue",
    agent=analyst,
    expected_output="List of customers with revenue figures"
)
# Assemble the crew from the agent and its task, then run it;
# `result` holds whatever kickoff() returns.
crew = Crew(agents=[analyst], tasks=[task])
result = crew.kickoff()
Guide: CrewAI with Arcade
OpenAI Agents Integration
from agents import Agent, Runner
from arcadepy import AsyncArcade
from agents_arcade import get_arcade_tools
async def run_agent():
    """Run one question through an OpenAI Agents agent backed by Arcade tools.

    Returns the ``final_output`` attribute of the run result.
    """
    arcade_client = AsyncArcade()
    postgres_tools = await get_arcade_tools(
        arcade_client,
        toolkits=["postgres_agent_toolkit"],
    )
    db_agent = Agent(
        name="Database Agent",
        instructions="Query PostgreSQL and analyze data",
        model="gpt-4o-mini",
        tools=postgres_tools,
    )
    run = await Runner.run(
        starting_agent=db_agent,
        input="What are the top products by sales?",
        context={"user_id": "user@example.com"},
    )
    return run.final_output
Details: OpenAI Agents with Arcade
Performance Optimization
Connection Pooling
from psycopg2 import pool
class ConnectionPool:
    """Class-level holder for one shared psycopg2 ThreadedConnectionPool."""

    # The shared pool; None until initialize() (or the first get_conn()) runs.
    _pool = None

    @classmethod
    def initialize(cls, minconn=1, maxconn=10):
        """Create the shared pool if it does not exist yet.

        Calling this again after the pool exists is a no-op, so the first
        call's ``minconn``/``maxconn`` win.

        Raises:
            KeyError: if one of the POSTGRES_* environment variables is missing.
        """
        # Local import so this snippet does not depend on a module-level import.
        import os

        if cls._pool is None:
            cls._pool = pool.ThreadedConnectionPool(
                minconn=minconn,
                maxconn=maxconn,
                host=os.environ['POSTGRES_HOST'],
                port=os.environ['POSTGRES_PORT'],
                database=os.environ['POSTGRES_DATABASE'],
                user=os.environ['POSTGRES_USER'],
                password=os.environ['POSTGRES_PASSWORD']
            )

    @classmethod
    def get_conn(cls):
        """Borrow a connection, creating the pool with defaults on first use."""
        if cls._pool is None:
            # Without this, a caller that never ran initialize() would hit an
            # opaque AttributeError on None.
            cls.initialize()
        return cls._pool.getconn()

    @classmethod
    def put_conn(cls, conn):
        """Return a borrowed connection to the pool.

        Raises:
            RuntimeError: if the pool was never initialized, in which case
                ``conn`` cannot have come from it.
        """
        if cls._pool is None:
            raise RuntimeError(
                "ConnectionPool.put_conn() called before the pool was initialized"
            )
        cls._pool.putconn(conn)
Use in tools:
@tool
def execute_read_query(context: ToolContext, sql_query: str) -> Dict[str, Any]:
    """Execute query with pooled connection

    Returns {"rows": [...]} with one dict per result row. Database errors
    propagate to the caller; the connection is returned to the pool either way.

    NOTE(review): this variant sends sql_query to the database without the
    read-only validation; confirm callers validate it first.
    """
    # No-op when the pool already exists; guarantees get_conn() has a pool.
    ConnectionPool.initialize()
    conn = ConnectionPool.get_conn()
    try:
        # The context manager closes the cursor, and nothing is left unbound
        # for cleanup if conn.cursor() itself raises.
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(sql_query)
            return {"rows": [dict(r) for r in cursor.fetchall()]}
    finally:
        ConnectionPool.put_conn(conn)
Query Result Caching
from functools import lru_cache
import hashlib
# Upper bound on cached entries; the least recently used entry is evicted first.
_QUERY_CACHE_MAX_ENTRIES = 128
# Maps query hash -> result. A plain dict preserves insertion order, which is
# used here as the recency order (oldest first).
_query_cache = {}


def get_cached_query_result(query_hash: str):
    """Cache query results by hash

    Returns the result stored under ``query_hash``, or None on a cache miss.
    A hit marks the entry as most recently used. The stored object is returned
    as-is (not copied), so callers should not mutate it.
    """
    try:
        result = _query_cache.pop(query_hash)
    except KeyError:
        return None  # Cache miss
    # Re-insert so the entry moves to the most-recently-used end.
    _query_cache[query_hash] = result
    return result


def cache_query_result(query_hash: str, result: dict):
    """Store result in cache

    Overwrites any existing entry for ``query_hash`` and evicts the least
    recently used entries once the cache exceeds its size bound.
    """
    _query_cache.pop(query_hash, None)
    _query_cache[query_hash] = result
    while len(_query_cache) > _QUERY_CACHE_MAX_ENTRIES:
        # The first key in iteration order is the least recently used.
        del _query_cache[next(iter(_query_cache))]
Monitoring and Logging
Add observability:
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@tool
def execute_read_query(context: ToolContext, sql_query: str) -> Dict[str, Any]:
    """Execute query with logging

    Logs the first 100 characters of the query, then either the elapsed time
    and row count or the failure (with traceback). Exceptions are re-raised
    unchanged after being logged.
    """
    # Local import; a monotonic clock is used because wall-clock time can jump
    # and produce wrong or negative durations.
    import time

    started = time.perf_counter()
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("Executing query: %s", sql_query[:100])
    try:
        result = run_query(sql_query)
        duration = time.perf_counter() - started
        logger.info("Query completed in %ss, %d rows", duration, len(result['rows']))
        return result
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception("Query failed: %s", e)
        raise
Additional Resources
- Arcade Documentation
- API Reference
- Toolkit Creation Guide
- MCP Servers Registry
- GitHub Examples
- Arcade Blog
Next Steps
Extend your agent with:
- Multi-table JOIN query generation
- Aggregate analytics and reporting
- Data visualization integration
- Scheduled query execution
- Integration with Gmail or Slack for notifications
Your PostgreSQL query agent is now production-ready with MCP protocol support through Arcade's platform.