Know Your Customer (KYC) processes require secure document collection, verification, and storage. AI agents that handle KYC workflows face a critical challenge: they must access customer documents across multiple Google Workspace accounts while maintaining strict security boundaries. This guide demonstrates how to build a production-ready KYC intake agent using Arcade.dev's Model Context Protocol (MCP) authentication and Google Docs toolkit.
Prerequisites and Setup Requirements
Before implementing your KYC agent, ensure your environment meets these requirements:
- Active Arcade.dev account with API key
- Google Cloud Console project with OAuth 2.0 credentials configured
- Python 3.8+ or Node.js 16+ development environment
- Google Workspace API access enabled for your domain
- Basic knowledge of OAuth flows and document processing
Initial Environment Configuration
Start by installing the Arcade Python client and configuring your authentication credentials:
# Install core Arcade components
pip install arcadepy arcade_tdk arcade-ai
# Set environment variables
export ARCADE_API_KEY="your_arcade_api_key"
export GOOGLE_CLIENT_ID="your_google_client_id"
export GOOGLE_CLIENT_SECRET="your_google_client_secret"
export GOOGLE_WORKSPACE_DOMAIN="your_domain.com"
KYC Document Workflow Architecture
The Authentication Challenge in KYC Processes
Traditional KYC systems struggle with three key problems that Arcade.dev solves:
- Document Access Fragmentation: Customers store KYC documents across personal and business Google accounts
- Compliance Requirements: Financial regulations demand granular audit trails for every document access
- Security Boundaries: AI agents must never have direct access to authentication tokens or raw document data
Arcade's platform addresses these challenges through managed OAuth flows and tool-calling isolation. The platform acts as an authentication broker, enabling your KYC agent to request documents from customers without handling credentials directly.
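To make the broker idea concrete, here is a minimal, self-contained sketch. `TokenBroker`, `run_tool`, and `fake_search` are hypothetical stand-ins and not Arcade's API; they only illustrate that the agent passes a `user_id` and never receives a token.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class TokenBroker:
    """Hypothetical stand-in for an authentication broker (not Arcade's API)."""
    _tokens: Dict[str, str] = field(default_factory=dict)  # user_id -> OAuth token

    def store_token(self, user_id: str, token: str) -> None:
        # In a real broker this happens inside the OAuth callback, not in agent code.
        self._tokens[user_id] = token

    def run_tool(self, user_id: str, tool: Callable[[str, dict], dict], params: dict) -> dict:
        """Run a tool on the user's behalf; the caller never receives the token."""
        token = self._tokens.get(user_id)
        if token is None:
            return {"status": "authorization_required"}
        return {"status": "ok", "output": tool(token, params)}


def fake_search(token: str, params: dict) -> dict:
    # Stand-in for a document search; a real tool would call Google with the token.
    return {"files": [f"{params['query']}.pdf"]}


broker = TokenBroker()
print(broker.run_tool("customer-1", fake_search, {"query": "passport"}))
broker.store_token("customer-1", "SECRET123")
print(broker.run_tool("customer-1", fake_search, {"query": "passport"}))
```

The agent side only ever sees the `status` and `output` fields, which is the property the managed OAuth flow is meant to provide.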
Building the Core KYC Agent
Document Collection Pattern
This Python class implements the fundamental pattern for collecting KYC documents from multiple customer accounts:
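import os
from datetime import datetime
from typing import Any, Dict, List
# AsyncArcade is the awaitable client; the methods below use await
from arcadepy import AsyncArcade
class KYCIntakeAgent:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))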
        self.required_documents = [
            "government_id",
            "proof_of_address",
            "bank_statement",
            "incorporation_docs"
        ]
        self.customer_sessions: Dict[str, Any] = {}
    async def initiate_kyc_intake(self, customer_id: str, email: str) -> Dict:
        """Start KYC document collection for a customer"""
        # Check if Google Drive access is authorized
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id
        )
        if auth_response.status != "completed":
            return {
                "status": "authorization_required",
                "auth_url": auth_response.url,
                "message": "Please authorize access to your Google Drive documents"
            }
        # Authorization is already complete here, so this returns immediately;
        # it is kept as a safeguard before any tool call
        await self.arcade.auth.wait_for_completion(auth_response)
        # Initialize document collection session
        self.customer_sessions[customer_id] = {
            "email": email,
            "status": "collecting",
            "documents_collected": [],
            "timestamp": datetime.now()
        }
        return {"status": "ready", "customer_id": customer_id}
    async def search_kyc_documents(self, customer_id: str, document_type: str) -> List[Dict]:
        """Search customer's Drive for specific KYC document types"""
        search_patterns = {
            "government_id": ["passport", "driver license", "national id"],
            "proof_of_address": ["utility bill", "bank statement address", "lease"],
            "bank_statement": ["bank statement", "financial statement"],
            "incorporation_docs": ["articles incorporation", "certificate incorporation"]
        }
        results = []
        for pattern in search_patterns.get(document_type, []):
            response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.SearchFiles",
                input={"query": f"name contains '{pattern}'"},
                user_id=customer_id
            )
            results.extend(response.output.get("files", []))
        return results
Document Verification Workflow
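The processor in this section calls two helpers, `extract_paragraph_text` and `extract_table_data`, that the guide does not define. The sketch below shows one possible shape for them, assuming the `GoogleDocs.GetDocument` output follows the Google Docs API document structure (paragraphs made of `elements` with `textRun.content`, tables made of `tableRows` and `tableCells`). That assumption should be checked against the actual tool output. The helpers are written as plain functions here; in the class they would be methods.

```python
from typing import Any, Dict, List


def extract_paragraph_text(paragraph: Dict[str, Any]) -> str:
    """Join the text runs of one Docs paragraph into a plain string."""
    runs = paragraph.get("elements", [])
    return "".join(run.get("textRun", {}).get("content", "") for run in runs)


def extract_table_data(table: Dict[str, Any]) -> List[List[str]]:
    """Return the table as rows of cell strings."""
    rows = []
    for row in table.get("tableRows", []):
        cells = []
        for cell in row.get("tableCells", []):
            # Each cell holds structural elements; keep only its paragraphs.
            text = "".join(
                extract_paragraph_text(el["paragraph"])
                for el in cell.get("content", [])
                if "paragraph" in el
            )
            cells.append(text.strip())
        rows.append(cells)
    return rows


sample_paragraph = {
    "elements": [{"textRun": {"content": "Jane "}}, {"textRun": {"content": "Doe\n"}}]
}
sample_table = {
    "tableRows": [
        {"tableCells": [
            {"content": [{"paragraph": {"elements": [{"textRun": {"content": "Name\n"}}]}}]},
            {"content": [{"paragraph": {"elements": [{"textRun": {"content": "Jane Doe\n"}}]}}]},
        ]}
    ]
}
print(repr(extract_paragraph_text(sample_paragraph)))
print(extract_table_data(sample_table))
```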
Implement document extraction and verification with proper error handling:
from typing import Dict
from arcadepy import AsyncArcade
class KYCDocumentProcessor:
    def __init__(self, arcade_client: AsyncArcade):
        self.arcade = arcade_client
        self.verification_rules = self.load_verification_rules()
    async def extract_document_content(self, customer_id: str, document_id: str) -> Dict:
        """Extract and parse document content for verification"""
        try:
            # Fetch document metadata
            metadata_response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.GetFileMetadata",
                input={"file_id": document_id},
                user_id=customer_id
            )
            # Extract document content based on type
            if metadata_response.output.get("mimeType") == "application/vnd.google-apps.document":
                content_response = await self.arcade.tools.execute(
                    tool_name="GoogleDocs.GetDocument",
                    input={"document_id": document_id},
                    user_id=customer_id
                )
                return {
                    "document_id": document_id,
                    "content": self.parse_document_content(content_response.output),
                    "metadata": metadata_response.output,
                    "extraction_status": "success"
                }
            # Handle PDFs and images
            elif metadata_response.output.get("mimeType") in ["application/pdf", "image/png", "image/jpeg"]:
                # Export for OCR processing
                export_response = await self.arcade.tools.execute(
                    tool_name="GoogleDrive.ExportFile",
                    input={
                        "file_id": document_id,
                        "mime_type": "application/pdf"
                    },
                    user_id=customer_id
                )
                return {
                    "document_id": document_id,
                    "export_url": export_response.output.get("export_link"),
                    "requires_ocr": True,
                    "metadata": metadata_response.output
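                }
            # Any other MIME type is not handled by this processor
            return {
                "document_id": document_id,
                "metadata": metadata_response.output,
                "extraction_status": "unsupported_type"
            }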
        except Exception as e:
            return {
                "document_id": document_id,
                "error": str(e),
                "extraction_status": "failed"
            }
    def parse_document_content(self, doc_content: Dict) -> Dict:
        """Parse Google Docs content structure"""
        extracted_data = {
            "text_content": "",
            "tables": [],
            "headers": []
        }
        # Extract text elements
        for element in doc_content.get("body", {}).get("content", []):
            if "paragraph" in element:
                text = self.extract_paragraph_text(element["paragraph"])
                extracted_data["text_content"] += text + "\n"
            elif "table" in element:
                table_data = self.extract_table_data(element["table"])
                extracted_data["tables"].append(table_data)
        return extracted_data
Implementing Multi-Customer Authentication
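Processing many customers at once needs a cap on how many intakes run in parallel. A minimal sketch of that idea with `asyncio.Semaphore` follows; `run_bounded` and `demo_worker` are hypothetical names used only for illustration, and the worker stands in for the real authorization and collection steps.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    customer_ids: List[str],
    worker: Callable[[str], Awaitable[dict]],
    max_concurrent: int = 10,
) -> List[object]:
    """Run worker(customer_id) for every customer, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(customer_id: str) -> dict:
        async with semaphore:
            return await worker(customer_id)

    # return_exceptions=True keeps one failed intake from cancelling the rest.
    return await asyncio.gather(*(guarded(c) for c in customer_ids), return_exceptions=True)


async def demo_worker(customer_id: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for authorization and document collection
    if customer_id == "bad":
        raise ValueError("simulated failure")
    return {"customer_id": customer_id, "status": "completed"}


results = asyncio.run(run_bounded(["a", "bad", "b"], demo_worker, max_concurrent=2))
print(results)
```

Results come back in input order, with the exception object in the slot of the failed customer, so callers can report per-customer outcomes.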
Concurrent Customer Session Management
Handle multiple KYC intakes simultaneously with proper session isolation:
import asyncio
import os
from asyncio import Queue, create_task
from typing import Any, Dict, List, Set
from arcadepy import AsyncArcade
class MultiCustomerKYCManager:
    def __init__(self, max_concurrent: int = 10):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.active_sessions: Dict[str, Any] = {}
        self.processing_queue = Queue(maxsize=max_concurrent)
        self.completed_verifications: Set[str] = set()
    async def process_batch_kyc(self, customer_list: List[Dict]) -> Dict:
        """Process multiple KYC intakes concurrently"""
        tasks = []
        for customer in customer_list:
            task = create_task(self.process_single_kyc(
                customer["id"],
                customer["email"]
            ))
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            "total_processed": len(results),
            "successful": len([r for r in results if not isinstance(r, Exception)]),
            "failed": len([r for r in results if isinstance(r, Exception)]),
            "details": results
        }
    async def process_single_kyc(self, customer_id: str, email: str) -> Dict:
        """Process individual KYC with authentication handling"""
        await self.processing_queue.put(customer_id)
        try:
            # Initialize customer authentication
            auth_status = await self.authenticate_customer(customer_id)
            if not auth_status["authenticated"]:
                return {
                    "customer_id": customer_id,
                    "status": "pending_authorization",
                    "auth_url": auth_status.get("auth_url")
                }
            # Collect required documents
            documents = await self.collect_all_documents(customer_id)
            # Verify document compliance
            verification = await self.verify_documents(customer_id, documents)
            self.completed_verifications.add(customer_id)
            return {
                "customer_id": customer_id,
                "status": "completed",
                "verification_result": verification
            }
        finally:
            await self.processing_queue.get()
OAuth Flow Management for KYC
Track pending authorizations and validate document access once the OAuth callback completes:
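import os
from datetime import datetime
from typing import Any, Dict
from arcadepy import AsyncArcade
class KYCAuthenticationHandler:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))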
        self.pending_authorizations: Dict[str, Any] = {}
    async def handle_customer_authorization(self, customer_id: str, callback_url: str) -> Dict:
        """Manage OAuth flow for document access"""
        # Check existing authorization status
        existing_auth = self.pending_authorizations.get(customer_id)
        if existing_auth and existing_auth["status"] == "completed":
            return {"status": "already_authorized", "customer_id": customer_id}
        # Request Google Docs/Drive authorization
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id,
            redirect_uri=callback_url
        )
        # Store pending authorization
        self.pending_authorizations[customer_id] = {
            "auth_id": auth_response.id,
            "auth_url": auth_response.url,
            "status": "pending",
            "requested_at": datetime.now()
        }
        return {
            "status": "authorization_required",
            "auth_url": auth_response.url,
            "customer_id": customer_id
        }
    async def complete_authorization(self, customer_id: str, auth_code: str) -> Dict:
        """Complete OAuth callback and validate access"""
        pending = self.pending_authorizations.get(customer_id)
        if not pending:
            return {"status": "error", "message": "No pending authorization found"}
        # Complete OAuth flow
        completion_result = await self.arcade.auth.complete_oauth(
            auth_id=pending["auth_id"],
            auth_code=auth_code,
            user_id=customer_id
        )
        if completion_result.success:
            # Verify document access permissions
            test_access = await self.verify_document_access(customer_id)
            if test_access["has_access"]:
                self.pending_authorizations[customer_id]["status"] = "completed"
                return {"status": "success", "customer_id": customer_id}
        return {"status": "failed", "error": completion_result.error}
Security and Compliance Features
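Audit trails for regulators are most useful when later changes to an entry can be detected. One common way to get that property, sketched here under stated assumptions, is to sign each entry with HMAC-SHA256 and chain it to the previous entry's hash. The names `sign_entry`, `append_entry`, and `verify_log` are hypothetical, and the key would come from a secret store rather than a literal.

```python
import hashlib
import hmac
import json
from typing import Dict, List

GENESIS = "0" * 64  # previous-hash value used for the first entry


def sign_entry(entry: Dict, prev_hash: str, key: bytes) -> str:
    """HMAC-SHA256 over the entry body plus the previous entry's hash."""
    payload = json.dumps(entry, sort_keys=True) + prev_hash
    return hmac.new(key, payload.encode(), hashlib.sha256).hexdigest()


def append_entry(log: List[Dict], entry: Dict, key: bytes) -> None:
    """Append an entry (which must not contain a 'hash' field) to the chain."""
    prev_hash = log[-1]["hash"] if log else GENESIS
    log.append({**entry, "hash": sign_entry(entry, prev_hash, key)})


def verify_log(log: List[Dict], key: bytes) -> bool:
    """Recompute every hash in order; an edited or reordered entry fails."""
    prev_hash = GENESIS
    for record in log:
        body = {k: v for k, v in record.items() if k != "hash"}
        if not hmac.compare_digest(record["hash"], sign_entry(body, prev_hash, key)):
            return False
        prev_hash = record["hash"]
    return True


key = b"demo-key"  # illustration only
log: List[Dict] = []
append_entry(log, {"customer_id": "c1", "action": "read"}, key)
append_entry(log, {"customer_id": "c2", "action": "read"}, key)
print(verify_log(log, key))   # True
log[0]["action"] = "delete"   # simulate tampering
print(verify_log(log, key))   # False
```

A chain like this does not detect entries truncated from the end of the log, so the latest hash should also be stored somewhere the log writer cannot modify.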
Audit Trail Implementation
Track every document access for regulatory compliance:
import hashlib
import json
from datetime import datetime
from typing import Dict, List
class KYCAuditLogger:
    def __init__(self, encryption_key: str):
        self.encryption_key = encryption_key
        self.audit_entries: List[Dict] = []
    async def log_document_access(self, event_data: Dict) -> None:
        """Create a tamper-evident audit log entry"""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "customer_id": self.hash_customer_id(event_data["customer_id"]),
            "document_id": event_data["document_id"],
            "action": event_data["action"],
            "ip_address": event_data.get("ip_address"),
            "agent_version": event_data.get("agent_version"),
            "hash": ""
        }
        # Create cryptographic hash of entry
        entry_json = json.dumps(entry, sort_keys=True)
        entry["hash"] = hashlib.sha256(
            (entry_json + self.encryption_key).encode()
        ).hexdigest()
        self.audit_entries.append(entry)
        # Persist to secure storage
        await self.persist_audit_entry(entry)
    def hash_customer_id(self, customer_id: str) -> str:
        """Hash customer ID for privacy compliance"""
        return hashlib.sha256(
            (customer_id + self.encryption_key).encode()
        ).hexdigest()[:16]
    async def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> Dict:
        """Generate KYC compliance report for regulators"""
        filtered_entries = [
            e for e in self.audit_entries
            if start_date <= datetime.fromisoformat(e["timestamp"]) <= end_date
        ]
        return {
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "total_verifications": len(filtered_entries),
            "unique_customers": len(set(e["customer_id"] for e in filtered_entries)),
            "document_types_accessed": self.categorize_documents(filtered_entries),
            "audit_entries": filtered_entries
        }
Token Security and Isolation
Implement secure token handling without exposure to the AI model:
import os
from typing import Dict
from arcadepy import AsyncArcade
class SecureKYCTokenManager:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        # Tokens are never stored locally - Arcade manages them
    async def execute_with_customer_context(self, customer_id: str, operation: str, params: Dict) -> Dict:
        """Execute Google Docs operations with isolated customer credentials"""
        try:
            # Arcade handles token retrieval and refresh automatically
            response = await self.arcade.tools.execute(
                tool_name=f"GoogleDocs.{operation}",
                input=params,
                user_id=customer_id
            )
            # Log access without exposing tokens
            await self.log_secure_access(customer_id, operation)
            return {"success": True, "data": response.output}
        except Exception as e:
            if "token_expired" in str(e):
                # Arcade handles refresh automatically
                return await self.retry_with_refresh(customer_id, operation, params)
            elif "insufficient_scope" in str(e):
                # Request additional permissions
                return {
                    "success": False,
                    "requires_reauth": True,
                    "additional_scopes": self.extract_required_scopes(e)
                }
            raise
Production Deployment Configuration
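Whatever the deployment target, it helps to fail fast when a required secret is missing rather than at the first customer request. The sketch below validates the environment variables this guide uses (`ARCADE_API_KEY`, `GOOGLE_CLIENT_ID`, `GOOGLE_CLIENT_SECRET`, and the optional `KYC_COMPLIANCE_MODE`); the `load_settings` function itself is a hypothetical helper, not part of any Arcade package.

```python
import os
from typing import Dict, Mapping

REQUIRED_VARS = ["ARCADE_API_KEY", "GOOGLE_CLIENT_ID", "GOOGLE_CLIENT_SECRET"]


def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return validated settings, or raise if a required variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED_VARS}
    # Default mirrors the "strict" value used in this guide's deployment example.
    settings["KYC_COMPLIANCE_MODE"] = env.get("KYC_COMPLIANCE_MODE", "strict")
    return settings


demo_env = {
    "ARCADE_API_KEY": "example-key",
    "GOOGLE_CLIENT_ID": "example-id",
    "GOOGLE_CLIENT_SECRET": "example-secret",
}
print(load_settings(demo_env)["KYC_COMPLIANCE_MODE"])
```

Calling `load_settings()` with no argument at process start-up reads the real environment, so a misconfigured pod crashes immediately and visibly.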
Kubernetes Deployment for Scale
Deploy your KYC agent with multiple replicas for high availability:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kyc-intake-agent
  namespace: production
spec:
  replicas: 5
  selector:
    matchLabels:
      app: kyc-agent
  template:
    metadata:
      labels:
        app: kyc-agent
    spec:
      containers:
        - name: arcade-kyc-engine
          image: ghcr.io/arcadeai/engine:latest
          env:
            - name: ARCADE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: arcade-secrets
                  key: api-key
            - name: GOOGLE_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-id
            - name: GOOGLE_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-secret
            - name: KYC_COMPLIANCE_MODE
              value: "strict"
          resources:
            requests:
              memory: "1Gi"
              cpu: "1000m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 9099
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: kyc-agent-service
  namespace: production
spec:
  selector:
    app: kyc-agent
  ports:
    - protocol: TCP
      port: 443
      targetPort: 9099
  type: LoadBalancer
Performance Optimization
Implement caching and batch processing for high-volume KYC operations:
import asyncio
import os
from datetime import datetime
from typing import Dict, List
from arcadepy import AsyncArcade
class OptimizedKYCProcessor:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.document_cache = {}
        self.batch_size = 20
    # No functools.lru_cache here: on an async method it would cache the
    # coroutine object, which cannot be awaited a second time
    async def get_cached_document_metadata(self, customer_id: str, document_id: str) -> Dict:
        """Cache document metadata for up to an hour to reduce API calls"""
        cache_key = f"{customer_id}:{document_id}"
        if cache_key in self.document_cache:
            cached = self.document_cache[cache_key]
            # total_seconds() covers the whole interval; .seconds ignores whole days
            if (datetime.now() - cached["timestamp"]).total_seconds() < 3600:
                return cached["data"]
        # Fetch fresh metadata
        response = await self.arcade.tools.execute(
            tool_name="GoogleDrive.GetFileMetadata",
            input={"file_id": document_id},
            user_id=customer_id
        )
        self.document_cache[cache_key] = {
            "data": response.output,
            "timestamp": datetime.now()
        }
        return response.output
    async def batch_process_documents(self, customer_id: str, document_ids: List[str]) -> List[Dict]:
        """Process multiple documents in optimized batches"""
        results = []
        for i in range(0, len(document_ids), self.batch_size):
            batch = document_ids[i:i + self.batch_size]
            batch_tasks = [
                self.process_document(customer_id, doc_id)
                for doc_id in batch
            ]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)
            # Rate limiting pause between batches
            await asyncio.sleep(0.5)
        return results
Monitoring and Error Handling
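Recovery logic depends on first turning an exception into a named error type. The handler in this section calls a `classify_error` method that the guide does not show; the sketch below is one hypothetical way to write it, matching substrings of the error message. The substrings are assumptions for illustration, and structured error codes from the client library are preferable where available.

```python
from typing import List, Tuple

# (substring to look for, error type) pairs, checked in order.
ERROR_PATTERNS: List[Tuple[str, str]] = [
    ("token_expired", "token_expired"),
    ("insufficient_scope", "insufficient_scope"),
    ("rate limit", "rate_limit_exceeded"),
    ("429", "rate_limit_exceeded"),
    ("not found", "document_not_found"),
    ("authorization required", "authorization_required"),
]


def classify_error(error: Exception) -> str:
    """Map an exception to one of the error types used by the recovery strategies."""
    message = str(error).lower()
    for needle, error_type in ERROR_PATTERNS:
        if needle in message:
            return error_type
    return "unknown"


print(classify_error(RuntimeError("HTTP 429: Rate limit reached")))
print(classify_error(ValueError("something else went wrong")))
```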
Comprehensive Error Recovery
Handle authentication failures and document access issues gracefully:
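import asyncio
import random
from typing import Dict
class KYCErrorHandler:
    def __init__(self):
        # Keys match the error types used in recovery_strategies below
        self.retry_limits = {
            "authorization_required": 3,
            "rate_limit_exceeded": 5,
            "document_not_found": 2
        }
        self.retry_counts = {}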
    async def handle_kyc_error(self, error: Exception, context: Dict) -> Dict:
        """Intelligent error handling with recovery strategies"""
        error_type = self.classify_error(error)
        customer_id = context.get("customer_id")
        # Track retry attempts
        retry_key = f"{customer_id}:{error_type}"
        self.retry_counts[retry_key] = self.retry_counts.get(retry_key, 0) + 1
        if self.retry_counts[retry_key] > self.retry_limits.get(error_type, 3):
            return {
                "status": "failed",
                "error": "Maximum retry attempts exceeded",
                "escalate_to": "manual_review"
            }
        # Error-specific recovery strategies
        recovery_strategies = {
            "token_expired": self.refresh_customer_token,
            "insufficient_scope": self.request_additional_scopes,
            "document_not_found": self.search_alternative_documents,
            "rate_limit_exceeded": self.implement_backoff,
            "authorization_required": self.reinitiate_oauth
        }
        strategy = recovery_strategies.get(error_type)
        if strategy:
            return await strategy(customer_id, context)
        return {"status": "unrecoverable_error", "error": str(error)}
    async def implement_backoff(self, customer_id: str, context: Dict) -> Dict:
        """Exponential backoff with jitter for rate limiting"""
        # Use the same key that handle_kyc_error increments for this error type
        retry_count = self.retry_counts.get(f"{customer_id}:rate_limit_exceeded", 1)
        wait_time = (2 ** retry_count) + random.uniform(0, 1)
        await asyncio.sleep(wait_time)
        return {"status": "retry", "wait_time": wait_time}
Testing Your KYC Agent
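Agent methods that `await` client calls need mocks whose calls are themselves awaitable. The standard library's `unittest.mock.AsyncMock` (Python 3.8+) provides that. A minimal sketch follows; `needs_authorization` is a hypothetical stand-in for agent code, and the mocked `client` replaces the Arcade client so no network access is involved.

```python
import asyncio
from unittest.mock import AsyncMock, Mock


async def needs_authorization(client, customer_id: str) -> bool:
    """Tiny stand-in for agent code that awaits an authorization call."""
    response = await client.tools.authorize(
        tool_name="GoogleDocs.SearchAndRetrieveDocuments", user_id=customer_id
    )
    return response.status != "completed"


client = Mock()
# AsyncMock makes the call awaitable; a plain Mock return value would fail under await.
client.tools.authorize = AsyncMock(return_value=Mock(status="pending"))

pending = asyncio.run(needs_authorization(client, "test_customer_123"))
print(pending)
client.tools.authorize.assert_awaited_once()
```

The same pattern applies to `tools.execute`: assign an `AsyncMock` and set its `return_value` to an object shaped like the response the code under test reads.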
Integration Testing Framework
Create comprehensive tests for your KYC intake workflow:
import os
import pytest
from unittest.mock import AsyncMock, Mock, patch
# The async tests below require the pytest-asyncio plugin
class TestKYCIntakeAgent:
    @pytest.fixture
    def kyc_agent(self):
        """Initialize test agent with a mocked Arcade client"""
        # A dummy key lets the client be constructed without real credentials
        with patch.dict(os.environ, {"ARCADE_API_KEY": "test-key"}):
            agent = KYCIntakeAgent()
        # Replace the client; AsyncMock makes the awaited tool calls awaitable
        agent.arcade = Mock()
        agent.arcade.tools.authorize = AsyncMock()
        agent.arcade.tools.execute = AsyncMock()
        return agent
    @pytest.mark.asyncio
    async def test_customer_authorization_flow(self, kyc_agent):
        """Test the authorization-required branch of the intake flow"""
        # Mock authorization response
        kyc_agent.arcade.tools.authorize.return_value = Mock(
            status="pending",
            url="https://accounts.google.com/oauth/authorize?..."
        )
        result = await kyc_agent.initiate_kyc_intake("test_customer_123", "customer@example.com")
        assert result["status"] == "authorization_required"
        assert "auth_url" in result
    @pytest.mark.asyncio
    async def test_document_collection(self, kyc_agent):
        """Test document search and retrieval"""
        # Mock document search results
        kyc_agent.arcade.tools.execute.return_value = Mock(
            output={"files": [
                {"id": "doc123", "name": "passport.pdf", "mimeType": "application/pdf"},
                {"id": "doc456", "name": "bank_statement.pdf", "mimeType": "application/pdf"}
            ]}
        )
        documents = await kyc_agent.search_kyc_documents("test_customer_123", "government_id")
        # The mock answers every search pattern, so the same files can appear more than once
        assert {doc["id"] for doc in documents} == {"doc123", "doc456"}
        assert documents[0]["name"] == "passport.pdf"
Conclusion
Building an MCP-authenticated KYC intake agent with Arcade's Google Docs toolkit transforms document collection from a manual, error-prone process into an automated, secure workflow. The platform's OAuth management, tool-calling architecture, and enterprise-grade security features enable financial institutions to meet compliance requirements while providing seamless customer experiences.
Key takeaways for production deployment:
- Authentication First: Arcade's managed OAuth ensures your AI agent never handles raw credentials
- Compliance Built-In: Audit logging and granular permissions support regulatory audit requirements
- Scale Ready: Multi-customer session management runs KYC intakes concurrently, capped by a configurable limit
- Security by Design: Token isolation and hashed customer identifiers protect sensitive customer data
For additional implementation details and toolkit documentation, explore the Arcade documentation and toolkit reference. To see example implementations and contribute to the community, visit the Arcade GitHub repository.