Know Your Customer (KYC) processes require secure document collection, verification, and storage. AI agents that handle KYC workflows face a critical challenge: accessing customer documents across multiple Google Workspace accounts while maintaining strict security boundaries. This guide demonstrates how to build a production-ready KYC intake agent using Arcade.dev's Model Context Protocol (MCP) authentication and Google Docs toolkit.
Prerequisites and Setup Requirements
Before implementing your KYC agent, ensure your environment meets these requirements:
- Active Arcade.dev account with API key
- Google Cloud Console project with OAuth 2.0 credentials configured
- Python 3.8+ or Node.js 16+ development environment
- Google Workspace API access enabled for your domain
- Basic knowledge of OAuth flows and document processing
Initial Environment Configuration
Start by installing the Arcade Python client and configuring your authentication credentials:
# Install core Arcade components
pip install arcadepy arcade_tdk arcade-ai
# Set environment variables
export ARCADE_API_KEY="your_arcade_api_key"
export GOOGLE_CLIENT_ID="your_google_client_id"
export GOOGLE_CLIENT_SECRET="your_google_client_secret"
export GOOGLE_WORKSPACE_DOMAIN="your_domain.com"
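A startup check can catch missing configuration before the agent makes its first call. The sketch below assumes the four variable names exported above; `load_kyc_settings` and `KYCSettings` are hypothetical names introduced for illustration and are not part of Arcade's SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = (
    "ARCADE_API_KEY",
    "GOOGLE_CLIENT_ID",
    "GOOGLE_CLIENT_SECRET",
    "GOOGLE_WORKSPACE_DOMAIN",
)


@dataclass(frozen=True)
class KYCSettings:
    """Hypothetical settings container; not part of the Arcade SDK."""
    arcade_api_key: str
    google_client_id: str
    google_client_secret: str
    google_workspace_domain: str


def load_kyc_settings(env: Mapping[str, str] = os.environ) -> KYCSettings:
    """Read the required variables and fail fast, naming every missing one."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return KYCSettings(
        arcade_api_key=env["ARCADE_API_KEY"],
        google_client_id=env["GOOGLE_CLIENT_ID"],
        google_client_secret=env["GOOGLE_CLIENT_SECRET"],
        google_workspace_domain=env["GOOGLE_WORKSPACE_DOMAIN"],
    )
```

Calling `load_kyc_settings()` once at process start turns a misconfigured deployment into an immediate, readable failure rather than an authentication error deep inside a customer session.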
KYC Document Workflow Architecture
The Authentication Challenge in KYC Processes
Traditional KYC systems struggle with three key problems that Arcade.dev solves:
- Document Access Fragmentation: Customers store KYC documents across personal and business Google accounts
- Compliance Requirements: Financial regulations demand granular audit trails for every document access
- Security Boundaries: AI agents must never have direct access to authentication tokens or raw document data
Arcade's platform addresses these challenges through managed OAuth flows and tool-calling isolation. The platform acts as an authentication broker, enabling your KYC agent to request documents from customers without handling credentials directly.
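The broker idea can be illustrated with a small in-memory stand-in. This is a sketch of the pattern only, not Arcade's implementation: `CredentialBroker`, `list_documents`, and the token value are hypothetical. The point it shows is that the agent passes a `user_id` and receives tool output, while the token stays inside the broker.

```python
from typing import Any, Callable, Dict

# A tool receives (access_token, args) and returns a result dict.
ToolFn = Callable[[str, Dict[str, Any]], Dict[str, Any]]


class CredentialBroker:
    """In-memory stand-in for a managed auth service (illustrative only)."""

    def __init__(self) -> None:
        self._tokens: Dict[str, str] = {}
        self._tools: Dict[str, ToolFn] = {}

    def register_tool(self, name: str, fn: ToolFn) -> None:
        self._tools[name] = fn

    def store_token(self, user_id: str, token: str) -> None:
        # In a real deployment this would happen inside the OAuth callback.
        self._tokens[user_id] = token

    def execute(self, tool_name: str, user_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
        token = self._tokens.get(user_id)
        if token is None:
            # The caller learns that consent is needed, never why or with which secret.
            return {"status": "authorization_required"}
        result = self._tools[tool_name](token, args)
        return {"status": "ok", "output": result}


def list_documents(access_token: str, args: Dict[str, Any]) -> Dict[str, Any]:
    # A real tool would call the Google API with access_token here.
    return {"files": [f"{args['query']}.pdf"]}


broker = CredentialBroker()
broker.register_tool("list_documents", list_documents)
broker.store_token("customer-1", "secret-token")

authorized = broker.execute("list_documents", "customer-1", {"query": "passport"})
unauthorized = broker.execute("list_documents", "customer-2", {"query": "passport"})
```

Because the agent only ever sees the return value of `execute`, a prompt-injected or misbehaving model has no token to leak.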
Building the Core KYC Agent
Document Collection Pattern
This Python class implements the fundamental pattern for collecting KYC documents from multiple customer accounts:
import os
from datetime import datetime
from typing import Any, Dict, List

from arcadepy import AsyncArcade


class KYCIntakeAgent:
    def __init__(self):
        # AsyncArcade is the awaitable client, required for the await calls below
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.required_documents = [
            "government_id",
            "proof_of_address",
            "bank_statement",
            "incorporation_docs"
        ]
        self.customer_sessions: Dict[str, Any] = {}

    async def initiate_kyc_intake(self, customer_id: str, email: str) -> Dict:
        """Start KYC document collection for a customer"""
        # Check whether the customer has already authorized document access
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id
        )
        if auth_response.status != "completed":
            # Return the consent URL; the customer must approve access before intake continues
            return {
                "status": "authorization_required",
                "auth_url": auth_response.url,
                "message": "Please authorize access to your Google Drive documents"
            }
        # Authorization is complete; initialize the document collection session
        self.customer_sessions[customer_id] = {
            "email": email,
            "status": "collecting",
            "documents_collected": [],
            "timestamp": datetime.now()
        }
        return {"status": "ready", "customer_id": customer_id}

    async def search_kyc_documents(self, customer_id: str, document_type: str) -> List[Dict]:
        """Search customer's Drive for specific KYC document types"""
        search_patterns = {
            "government_id": ["passport", "driver license", "national id"],
            "proof_of_address": ["utility bill", "bank statement address", "lease"],
            "bank_statement": ["bank statement", "financial statement"],
            "incorporation_docs": ["articles incorporation", "certificate incorporation"]
        }
        results = []
        seen_ids = set()
        for pattern in search_patterns.get(document_type, []):
            response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.SearchFiles",
                input={"query": f"name contains '{pattern}'"},
                user_id=customer_id
            )
            # The same file can match several patterns, so keep each file ID once
            for file in response.output.get("files", []):
                if file.get("id") not in seen_ids:
                    seen_ids.add(file.get("id"))
                    results.append(file)
        return results
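The search above issues one request per pattern and interpolates each pattern directly into the query. A possible refinement, sketched below, combines the patterns into a single query and escapes quotes first. It assumes the tool accepts Google Drive API search syntax in its `query` input, as the code above implies; `escape_drive_term`, `build_name_query`, and `dedupe_files` are hypothetical helper names.

```python
from typing import Dict, Iterable, List


def escape_drive_term(term: str) -> str:
    """Escape backslashes and single quotes for a Drive search query string."""
    return term.replace("\\", "\\\\").replace("'", "\\'")


def build_name_query(patterns: Iterable[str]) -> str:
    """Combine name patterns into one OR query that skips trashed files."""
    clauses = [f"name contains '{escape_drive_term(p)}'" for p in patterns]
    if not clauses:
        raise ValueError("at least one search pattern is required")
    return "(" + " or ".join(clauses) + ") and trashed = false"


def dedupe_files(files: Iterable[Dict]) -> List[Dict]:
    """Keep the first occurrence of each file ID, preserving order."""
    seen = set()
    unique = []
    for file in files:
        if file["id"] not in seen:
            seen.add(file["id"])
            unique.append(file)
    return unique


query = build_name_query(["passport", "driver's license"])
```

Escaping matters once patterns come from configuration or user input rather than a hard-coded dictionary, since an unescaped quote would otherwise end the string literal early.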
Document Verification Workflow
Implement document extraction and verification with proper error handling:
from typing import Dict

from arcadepy import AsyncArcade


class KYCDocumentProcessor:
    # load_verification_rules, extract_paragraph_text, and extract_table_data
    # are helper methods that this excerpt does not show
    def __init__(self, arcade_client: AsyncArcade):
        self.arcade = arcade_client
        self.verification_rules = self.load_verification_rules()

    async def extract_document_content(self, customer_id: str, document_id: str) -> Dict:
        """Extract and parse document content for verification"""
        try:
            # Fetch document metadata
            metadata_response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.GetFileMetadata",
                input={"file_id": document_id},
                user_id=customer_id
            )
            mime_type = metadata_response.output.get("mimeType")
            # Extract document content based on type
            if mime_type == "application/vnd.google-apps.document":
                content_response = await self.arcade.tools.execute(
                    tool_name="GoogleDocs.GetDocument",
                    input={"document_id": document_id},
                    user_id=customer_id
                )
                return {
                    "document_id": document_id,
                    "content": self.parse_document_content(content_response.output),
                    "metadata": metadata_response.output,
                    "extraction_status": "success"
                }
            # Handle PDFs and images
            elif mime_type in ["application/pdf", "image/png", "image/jpeg"]:
                # Export for OCR processing
                export_response = await self.arcade.tools.execute(
                    tool_name="GoogleDrive.ExportFile",
                    input={
                        "file_id": document_id,
                        "mime_type": "application/pdf"
                    },
                    user_id=customer_id
                )
                return {
                    "document_id": document_id,
                    "export_url": export_response.output.get("export_link"),
                    "requires_ocr": True,
                    "metadata": metadata_response.output
                }
            # Any other MIME type is reported explicitly instead of returning None
            return {
                "document_id": document_id,
                "error": f"Unsupported MIME type: {mime_type}",
                "extraction_status": "failed"
            }
        except Exception as e:
            return {
                "document_id": document_id,
                "error": str(e),
                "extraction_status": "failed"
            }

    def parse_document_content(self, doc_content: Dict) -> Dict:
        """Parse Google Docs content structure"""
        extracted_data = {
            "text_content": "",
            "tables": [],
            "headers": []
        }
        # Extract text elements
        for element in doc_content.get("body", {}).get("content", []):
            if "paragraph" in element:
                text = self.extract_paragraph_text(element["paragraph"])
                extracted_data["text_content"] += text + "\n"
            elif "table" in element:
                table_data = self.extract_table_data(element["table"])
                extracted_data["tables"].append(table_data)
        return extracted_data
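The processor calls `extract_paragraph_text` and `extract_table_data` without defining them. The sketch below shows one way they might look, assuming the tool returns the raw Google Docs API document structure (`body.content` with `paragraph.elements[].textRun.content` and `table.tableRows[].tableCells[]`). The function bodies, `is_heading`, `parse_body`, and the sample document are illustrative, not taken from the original guide.

```python
from typing import Any, Dict, List


def extract_paragraph_text(paragraph: Dict[str, Any]) -> str:
    """Concatenate the text runs of one paragraph."""
    return "".join(
        element.get("textRun", {}).get("content", "")
        for element in paragraph.get("elements", [])
    )


def extract_table_data(table: Dict[str, Any]) -> List[List[str]]:
    """Return the table as rows of stripped cell text."""
    rows = []
    for row in table.get("tableRows", []):
        cells = []
        for cell in row.get("tableCells", []):
            text = "".join(
                extract_paragraph_text(block["paragraph"])
                for block in cell.get("content", [])
                if "paragraph" in block
            )
            cells.append(text.strip())
        rows.append(cells)
    return rows


def is_heading(paragraph: Dict[str, Any]) -> bool:
    style = paragraph.get("paragraphStyle", {}).get("namedStyleType", "")
    return style.startswith("HEADING_")


def parse_body(document: Dict[str, Any]) -> Dict[str, Any]:
    parsed: Dict[str, Any] = {"text_content": "", "headers": [], "tables": []}
    for element in document.get("body", {}).get("content", []):
        if "paragraph" in element:
            text = extract_paragraph_text(element["paragraph"])
            parsed["text_content"] += text
            if is_heading(element["paragraph"]):
                parsed["headers"].append(text.strip())
        elif "table" in element:
            parsed["tables"].append(extract_table_data(element["table"]))
    return parsed


sample = {"body": {"content": [
    {"paragraph": {"paragraphStyle": {"namedStyleType": "HEADING_1"},
                   "elements": [{"textRun": {"content": "Proof of Address\n"}}]}},
    {"paragraph": {"elements": [{"textRun": {"content": "Name: "}},
                                {"textRun": {"content": "Jane Doe\n"}}]}},
    {"table": {"tableRows": [{"tableCells": [
        {"content": [{"paragraph": {"elements": [{"textRun": {"content": "City\n"}}]}}]},
        {"content": [{"paragraph": {"elements": [{"textRun": {"content": "Lisbon\n"}}]}}]},
    ]}]}},
]}}
parsed = parse_body(sample)
```

Google Docs paragraphs already end with a newline in their final text run, so this sketch does not append another one when joining paragraphs.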
Implementing Multi-Customer Authentication
Concurrent Customer Session Management
Handle multiple KYC intakes simultaneously with proper session isolation:
import asyncio
import os
from typing import Any, Dict, List, Set

from arcadepy import AsyncArcade


class MultiCustomerKYCManager:
    def __init__(self, max_concurrent: int = 10):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.active_sessions: Dict[str, Any] = {}
        # A semaphore caps how many intakes run at once
        self.processing_slots = asyncio.Semaphore(max_concurrent)
        self.completed_verifications: Set[str] = set()

    async def process_batch_kyc(self, customer_list: List[Dict]) -> Dict:
        """Process multiple KYC intakes concurrently"""
        tasks = []
        for customer in customer_list:
            task = asyncio.create_task(self.process_single_kyc(
                customer["id"],
                customer["email"]
            ))
            tasks.append(task)
        # return_exceptions=True keeps one failed intake from cancelling the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            "total_processed": len(results),
            "successful": len([r for r in results if not isinstance(r, Exception)]),
            "failed": len([r for r in results if isinstance(r, Exception)]),
            "details": results
        }

    async def process_single_kyc(self, customer_id: str, email: str) -> Dict:
        """Process individual KYC with authentication handling"""
        # The slot is released automatically, even if a step below raises
        async with self.processing_slots:
            # Initialize customer authentication
            auth_status = await self.authenticate_customer(customer_id)
            if not auth_status["authenticated"]:
                return {
                    "customer_id": customer_id,
                    "status": "pending_authorization",
                    "auth_url": auth_status.get("auth_url")
                }
            # Collect required documents
            documents = await self.collect_all_documents(customer_id)
            # Verify document compliance
            verification = await self.verify_documents(customer_id, documents)
            self.completed_verifications.add(customer_id)
            return {
                "customer_id": customer_id,
                "status": "completed",
                "verification_result": verification
            }
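The concurrency cap and the per-customer failure isolation can be checked in isolation. The following sketch uses `asyncio.Semaphore` with `asyncio.gather(..., return_exceptions=True)`; `fake_intake` and the customer IDs are hypothetical stand-ins for a real intake, and the semaphore is one way to express the limit rather than the guide's exact mechanism.

```python
import asyncio
from typing import Dict, List


async def fake_intake(customer_id: str) -> Dict[str, str]:
    """Stand-in for a real KYC intake; fails for one hypothetical ID."""
    await asyncio.sleep(0.01)
    if customer_id == "bad":
        raise ValueError("document access denied")
    return {"customer_id": customer_id, "status": "completed"}


async def run_bounded(customer_ids: List[str], max_concurrent: int = 2) -> Dict[str, int]:
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def guarded(customer_id: str) -> Dict[str, str]:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_intake(customer_id)
            finally:
                in_flight -= 1

    # Exceptions are returned as values so one failure does not cancel the batch.
    results = await asyncio.gather(
        *(guarded(cid) for cid in customer_ids), return_exceptions=True
    )
    failed = sum(isinstance(r, Exception) for r in results)
    return {"total": len(results), "failed": failed, "peak_concurrency": peak}


summary = asyncio.run(run_bounded(["a", "b", "bad", "c", "d"]))
```

Tracking `peak_concurrency` this way is a convenient test hook: it confirms the limit is respected without relying on timing.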
OAuth Flow Management for KYC
Track pending authorizations and complete the OAuth callback for each customer:
import os
from datetime import datetime
from typing import Any, Dict

from arcadepy import AsyncArcade


class KYCAuthenticationHandler:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.pending_authorizations: Dict[str, Any] = {}

    async def handle_customer_authorization(self, customer_id: str, callback_url: str) -> Dict:
        """Manage OAuth flow for document access"""
        # Check existing authorization status
        existing_auth = self.pending_authorizations.get(customer_id)
        if existing_auth and existing_auth["status"] == "completed":
            return {"status": "already_authorized", "customer_id": customer_id}
        # Request Google Docs/Drive authorization
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id,
            redirect_uri=callback_url
        )
        # Store pending authorization
        self.pending_authorizations[customer_id] = {
            "auth_id": auth_response.id,
            "auth_url": auth_response.url,
            "status": "pending",
            "requested_at": datetime.now()
        }
        return {
            "status": "authorization_required",
            "auth_url": auth_response.url,
            "customer_id": customer_id
        }

    async def complete_authorization(self, customer_id: str, auth_code: str) -> Dict:
        """Complete OAuth callback and validate access"""
        pending = self.pending_authorizations.get(customer_id)
        if not pending:
            return {"status": "error", "message": "No pending authorization found"}
        # Complete OAuth flow
        completion_result = await self.arcade.auth.complete_oauth(
            auth_id=pending["auth_id"],
            auth_code=auth_code,
            user_id=customer_id
        )
        if not completion_result.success:
            return {"status": "failed", "error": completion_result.error}
        # Verify document access permissions
        test_access = await self.verify_document_access(customer_id)
        if not test_access["has_access"]:
            # OAuth succeeded, so report the access problem rather than an empty OAuth error
            return {
                "status": "failed",
                "error": "Authorization succeeded but document access could not be verified"
            }
        pending["status"] = "completed"
        return {"status": "success", "customer_id": customer_id}
Security and Compliance Features
Audit Trail Implementation
Track every document access for regulatory compliance:
import hashlib
import hmac
import json
from datetime import datetime
from typing import Dict, List


class KYCAuditLogger:
    def __init__(self, encryption_key: str):
        # Used as an HMAC signing key for audit entries and customer IDs
        self.encryption_key = encryption_key
        self.audit_entries: List[Dict] = []

    async def log_document_access(self, event_data: Dict) -> None:
        """Create tamper-evident audit log entry"""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "customer_id": self.hash_customer_id(event_data["customer_id"]),
            "document_id": event_data["document_id"],
            "action": event_data["action"],
            "ip_address": event_data.get("ip_address"),
            "agent_version": event_data.get("agent_version"),
            "hash": ""
        }
        # Sign the entry with HMAC-SHA256; hashing entry + key directly would
        # be exposed to length-extension attacks
        entry_json = json.dumps(entry, sort_keys=True)
        entry["hash"] = hmac.new(
            self.encryption_key.encode(),
            entry_json.encode(),
            hashlib.sha256
        ).hexdigest()
        self.audit_entries.append(entry)
        # Persist to secure storage
        await self.persist_audit_entry(entry)

    def hash_customer_id(self, customer_id: str) -> str:
        """Hash customer ID for privacy compliance"""
        return hmac.new(
            self.encryption_key.encode(),
            customer_id.encode(),
            hashlib.sha256
        ).hexdigest()[:16]

    async def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> Dict:
        """Generate KYC compliance report for regulators"""
        filtered_entries = [
            e for e in self.audit_entries
            if start_date <= datetime.fromisoformat(e["timestamp"]) <= end_date
        ]
        return {
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "total_verifications": len(filtered_entries),
            "unique_customers": len(set(e["customer_id"] for e in filtered_entries)),
            "document_types_accessed": self.categorize_documents(filtered_entries),
            "audit_entries": filtered_entries
        }
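An audit log is only useful to a regulator if tampering can be detected later. The sketch below chains entries with HMAC-SHA256 so that editing or deleting any earlier record invalidates every record after it. It is one possible design, not the guide's own method; `sign_entry`, `append_entry`, `verify_log`, and the sample key are hypothetical.

```python
import hashlib
import hmac
import json
from typing import Dict, List


def sign_entry(entry: Dict, prev_hash: str, key: bytes) -> str:
    """HMAC over the entry plus the previous record's hash."""
    payload = json.dumps({"entry": entry, "prev": prev_hash}, sort_keys=True)
    return hmac.new(key, payload.encode(), hashlib.sha256).hexdigest()


def append_entry(log: List[Dict], entry: Dict, key: bytes) -> None:
    prev_hash = log[-1]["hash"] if log else ""
    log.append({
        "entry": entry,
        "prev": prev_hash,
        "hash": sign_entry(entry, prev_hash, key),
    })


def verify_log(log: List[Dict], key: bytes) -> bool:
    """Recompute every signature in order; any edit breaks the chain."""
    prev_hash = ""
    for record in log:
        expected = sign_entry(record["entry"], prev_hash, key)
        if record["prev"] != prev_hash or not hmac.compare_digest(expected, record["hash"]):
            return False
        prev_hash = record["hash"]
    return True


key = b"demo-signing-key"  # illustrative only; load a real key from a secret store
log: List[Dict] = []
append_entry(log, {"document_id": "doc123", "action": "read"}, key)
append_entry(log, {"document_id": "doc456", "action": "export"}, key)
intact = verify_log(log, key)

log[0]["entry"]["action"] = "none"  # simulate tampering with the first record
tampered_detected = not verify_log(log, key)
```

`hmac.compare_digest` is used for the comparison so that verification time does not leak how many leading characters of a forged signature were correct.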
Token Security and Isolation
Implement secure token handling without exposure to the AI model:
import os
from typing import Dict

from arcadepy import AsyncArcade


class SecureKYCTokenManager:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        # Tokens are never stored locally - Arcade manages them

    async def execute_with_customer_context(self, customer_id: str, operation: str, params: Dict) -> Dict:
        """Execute Google Docs operations with isolated customer credentials"""
        try:
            # Arcade handles token retrieval and refresh automatically
            response = await self.arcade.tools.execute(
                tool_name=f"GoogleDocs.{operation}",
                input=params,
                user_id=customer_id
            )
            # Log access without exposing tokens
            await self.log_secure_access(customer_id, operation)
            return {"success": True, "data": response.output}
        except Exception as e:
            if "token_expired" in str(e):
                # Arcade handles refresh automatically
                return await self.retry_with_refresh(customer_id, operation, params)
            elif "insufficient_scope" in str(e):
                # Request additional permissions
                return {
                    "success": False,
                    "requires_reauth": True,
                    "additional_scopes": self.extract_required_scopes(e)
                }
            # Anything else is unexpected, so let it propagate to the caller
            raise
Production Deployment Configuration
Kubernetes Deployment for Scale
Deploy your KYC agent with multiple replicas behind a load balancer for high availability:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kyc-intake-agent
  namespace: production
spec:
  replicas: 5
  selector:
    matchLabels:
      app: kyc-agent
  template:
    metadata:
      labels:
        app: kyc-agent
    spec:
      containers:
        - name: arcade-kyc-engine
          image: ghcr.io/arcadeai/engine:latest
          env:
            - name: ARCADE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: arcade-secrets
                  key: api-key
            - name: GOOGLE_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-id
            - name: GOOGLE_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-secret
            - name: KYC_COMPLIANCE_MODE
              value: "strict"
          resources:
            requests:
              memory: "1Gi"
              cpu: "1000m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 9099
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: kyc-agent-service
spec:
  selector:
    app: kyc-agent
  ports:
    - protocol: TCP
      port: 443
      targetPort: 9099
  type: LoadBalancer
Performance Optimization
Implement caching and batch processing for high-volume KYC operations:
import asyncio
import os
from datetime import datetime
from typing import Dict, List

from arcadepy import AsyncArcade


class OptimizedKYCProcessor:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.document_cache = {}
        self.batch_size = 20

    # functools.lru_cache is not used here: on an async method it would cache
    # the coroutine object, which cannot be awaited a second time
    async def get_cached_document_metadata(self, customer_id: str, document_id: str) -> Dict:
        """Cache document metadata to reduce API calls"""
        cache_key = f"{customer_id}:{document_id}"
        if cache_key in self.document_cache:
            cached = self.document_cache[cache_key]
            # total_seconds() covers the whole interval; .seconds ignores full days
            if (datetime.now() - cached["timestamp"]).total_seconds() < 3600:
                return cached["data"]
        # Fetch fresh metadata
        response = await self.arcade.tools.execute(
            tool_name="GoogleDrive.GetFileMetadata",
            input={"file_id": document_id},
            user_id=customer_id
        )
        self.document_cache[cache_key] = {
            "data": response.output,
            "timestamp": datetime.now()
        }
        return response.output

    async def batch_process_documents(self, customer_id: str, document_ids: List[str]) -> List[Dict]:
        """Process multiple documents in optimized batches"""
        results = []
        for i in range(0, len(document_ids), self.batch_size):
            batch = document_ids[i:i + self.batch_size]
            batch_tasks = [
                self.process_document(customer_id, doc_id)
                for doc_id in batch
            ]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)
            # Rate limiting pause between batches
            await asyncio.sleep(0.5)
        return results
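Time-based caching for coroutines is easy to get subtly wrong, so it helps to isolate it in a small component that can be tested with a fake clock. The sketch below is one way to do that; `AsyncTTLCache`, `get_or_fetch`, and `fetch_metadata` are hypothetical names, and the fetch function stands in for a real metadata call.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class AsyncTTLCache:
    """Cache awaited results (not coroutine objects) for a fixed time window."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    async def get_or_fetch(self, key: str, fetch: Callable[[], Awaitable[Any]]) -> Any:
        hit = self._entries.get(key)
        if hit is not None and self._clock() - hit[0] < self._ttl:
            return hit[1]
        value = await fetch()
        self._entries[key] = (self._clock(), value)
        return value


async def demo() -> int:
    fake_now = [0.0]
    calls = []

    async def fetch_metadata() -> Dict[str, str]:
        calls.append(1)
        return {"mimeType": "application/pdf"}

    cache = AsyncTTLCache(ttl_seconds=3600, clock=lambda: fake_now[0])
    await cache.get_or_fetch("c1:doc1", fetch_metadata)  # miss, fetches
    await cache.get_or_fetch("c1:doc1", fetch_metadata)  # hit, no fetch
    fake_now[0] = 3601.0
    await cache.get_or_fetch("c1:doc1", fetch_metadata)  # expired, fetches again
    return len(calls)


fetch_count = asyncio.run(demo())
```

Including the customer ID in the cache key, as the `c1:doc1` key does here, keeps one customer's metadata from ever being served to another.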
Monitoring and Error Handling
Comprehensive Error Recovery
Handle authentication failures and document access issues gracefully:
import asyncio
import random
from typing import Dict


class KYCErrorHandler:
    def __init__(self):
        # Keys match the error types returned by classify_error
        self.retry_limits = {
            "authorization_required": 3,
            "rate_limit_exceeded": 5,
            "document_not_found": 2
        }
        self.retry_counts = {}

    async def handle_kyc_error(self, error: Exception, context: Dict) -> Dict:
        """Intelligent error handling with recovery strategies"""
        error_type = self.classify_error(error)
        customer_id = context.get("customer_id")
        # Track retry attempts
        retry_key = f"{customer_id}:{error_type}"
        self.retry_counts[retry_key] = self.retry_counts.get(retry_key, 0) + 1
        if self.retry_counts[retry_key] > self.retry_limits.get(error_type, 3):
            return {
                "status": "failed",
                "error": "Maximum retry attempts exceeded",
                "escalate_to": "manual_review"
            }
        # Error-specific recovery strategies
        recovery_strategies = {
            "token_expired": self.refresh_customer_token,
            "insufficient_scope": self.request_additional_scopes,
            "document_not_found": self.search_alternative_documents,
            "rate_limit_exceeded": self.implement_backoff,
            "authorization_required": self.reinitiate_oauth
        }
        strategy = recovery_strategies.get(error_type)
        if strategy:
            return await strategy(customer_id, context)
        return {"status": "unrecoverable_error", "error": str(error)}

    async def implement_backoff(self, customer_id: str, context: Dict) -> Dict:
        """Exponential backoff for rate limiting"""
        # Read the same key that handle_kyc_error increments
        retry_count = self.retry_counts.get(f"{customer_id}:rate_limit_exceeded", 1)
        wait_time = (2 ** retry_count) + random.uniform(0, 1)
        await asyncio.sleep(wait_time)
        return {"status": "retry", "wait_time": wait_time}
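The backoff arithmetic is easier to reason about when separated from the sleeping. The sketch below computes a capped exponential schedule with injectable jitter so it can be tested deterministically. It starts at `base` for the first retry and adds a ceiling, which differs slightly from the `2 ** retry_count` formula above; `backoff_delay` and `backoff_schedule` are hypothetical helpers.

```python
import random
from typing import Callable, List


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: Callable[[], float] = random.random,
) -> float:
    """Delay before retry number `attempt` (1-based).

    The exponential part doubles each attempt and is capped at `cap`;
    jitter in [0, 1) is added so clients do not retry in lockstep.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(cap, base * (2 ** (attempt - 1))) + jitter()


def backoff_schedule(max_attempts: int, **kwargs) -> List[float]:
    """Delays for attempts 1..max_attempts."""
    return [backoff_delay(n, **kwargs) for n in range(1, max_attempts + 1)]


# With jitter disabled the schedule is fully deterministic.
schedule = backoff_schedule(5, jitter=lambda: 0.0)
capped = backoff_delay(10, jitter=lambda: 0.0)
```

Without a cap, the tenth retry would wait 512 seconds; bounding the delay keeps a rate-limited intake from stalling for minutes before escalating to manual review.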
Testing Your KYC Agent
Integration Testing Framework
Create comprehensive tests for your KYC intake workflow:
import pytest
from unittest.mock import AsyncMock, MagicMock, Mock, patch


# The async tests below require the pytest-asyncio plugin
class TestKYCIntakeAgent:
    @pytest.fixture
    def kyc_agent(self):
        """Initialize test agent with mock Arcade client"""
        # Supply a dummy key so the constructor does not depend on the real environment
        with patch.dict("os.environ", {"ARCADE_API_KEY": "test_key"}):
            agent = KYCIntakeAgent()
        # Replace the client; AsyncMock makes the tool calls awaitable
        agent.arcade = MagicMock()
        agent.arcade.tools.authorize = AsyncMock()
        agent.arcade.tools.execute = AsyncMock()
        return agent

    @pytest.mark.asyncio
    async def test_customer_authorization_flow(self, kyc_agent):
        """Test complete authorization workflow"""
        # Mock authorization response
        kyc_agent.arcade.tools.authorize.return_value = Mock(
            status="pending",
            url="https://accounts.google.com/oauth/authorize?..."
        )
        result = await kyc_agent.initiate_kyc_intake("test_customer_123", "customer@example.com")
        assert result["status"] == "authorization_required"
        assert "auth_url" in result

    @pytest.mark.asyncio
    async def test_document_collection(self, kyc_agent):
        """Test document search and retrieval"""
        # Mock document search results
        kyc_agent.arcade.tools.execute.return_value = Mock(
            output={"files": [
                {"id": "doc123", "name": "passport.pdf", "mimeType": "application/pdf"},
                {"id": "doc456", "name": "bank_statement.pdf", "mimeType": "application/pdf"}
            ]}
        )
        documents = await kyc_agent.search_kyc_documents("test_customer_123", "government_id")
        # Every search pattern returns the same mocked files, so compare unique IDs
        assert {doc["id"] for doc in documents} == {"doc123", "doc456"}
        assert documents[0]["name"] == "passport.pdf"
Conclusion
Building an MCP-authenticated KYC intake agent with Arcade's Google Docs toolkit transforms document collection from a manual, error-prone process into an automated, secure workflow. The platform's OAuth management, tool-calling architecture, and enterprise-grade security features enable financial institutions to meet compliance requirements while providing seamless customer experiences.
Key takeaways for production deployment:
- Authentication First: Arcade's managed OAuth ensures your AI agent never handles raw credentials
- Compliance Built-In: Audit logging and granular permissions support regulatory requirements
- Scale Ready: Multi-customer session management processes KYC intakes concurrently under a configurable limit
- Security by Design: Token isolation and encryption protect sensitive customer data
For additional implementation details and toolkit documentation, explore the Arcade documentation and toolkit reference. To see example implementations and contribute to the community, visit the Arcade GitHub repository.