How to Build an MCP-Authenticated KYC Intake Agent with Arcade's Google Docs Toolkit

Arcade.dev Team
OCTOBER 16, 2025

Know Your Customer (KYC) processes require secure document collection, verification, and storage. AI agents that handle KYC workflows face a critical challenge: they must reach customer documents spread across many separate Google Workspace accounts while keeping strict security boundaries between those accounts. This guide shows how to build a KYC intake agent using Arcade.dev's Model Context Protocol (MCP) authentication and Google Docs toolkit.

Prerequisites and Setup Requirements

Before implementing your KYC agent, ensure your environment meets these requirements:

  • Active Arcade.dev account with API key
  • Google Cloud Console project with OAuth 2.0 credentials configured
  • Python 3.8+ or Node.js 16+ development environment
  • Google Workspace API access enabled for your domain
  • Basic knowledge of OAuth flows and document processing

Initial Environment Configuration

Start by installing the Arcade Python client and configuring your authentication credentials:

# Install core Arcade components
pip install arcadepy arcade_tdk arcade-ai

# Set environment variables
export ARCADE_API_KEY="your_arcade_api_key"
export GOOGLE_CLIENT_ID="your_google_client_id"
export GOOGLE_CLIENT_SECRET="your_google_client_secret"
export GOOGLE_WORKSPACE_DOMAIN="your_domain.com"

KYC Document Workflow Architecture

The Authentication Challenge in KYC Processes

Traditional KYC systems struggle with three key problems that Arcade.dev is designed to address:

  • Document Access Fragmentation: Customers store KYC documents across personal and business Google accounts
  • Compliance Requirements: Financial regulations demand granular audit trails for every document access
  • Security Boundaries: AI agents must never have direct access to OAuth tokens or other customer credentials

Arcade's platform addresses these challenges through managed OAuth flows and tool-calling isolation. The platform acts as an authentication broker, enabling your KYC agent to request documents from customers without handling credentials directly.
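
To make the broker role concrete, here is a toy in-memory model of it. It assumes nothing about Arcade's internals: `ToyAuthBroker` and its methods are hypothetical illustrations, not Arcade APIs. The agent only ever calls `run_tool` with a user ID, while the token store stays private to the broker and only tool output flows back.

```python
from typing import Any, Callable, Dict


class ToyAuthBroker:
    """Toy stand-in for an auth broker: tokens never leave this object."""

    def __init__(self) -> None:
        self._tokens: Dict[str, str] = {}  # user_id -> OAuth token (private)
        self._tools: Dict[str, Callable[[str, Dict[str, Any]], Any]] = {}

    def grant(self, user_id: str, token: str) -> None:
        # In a real system this happens at the end of the OAuth callback
        self._tokens[user_id] = token

    def register_tool(self, name: str, fn: Callable[[str, Dict[str, Any]], Any]) -> None:
        # Tool implementations receive the token; the agent never does
        self._tools[name] = fn

    def run_tool(self, name: str, user_id: str, tool_input: Dict[str, Any]) -> Dict[str, Any]:
        token = self._tokens.get(user_id)
        if token is None:
            return {"status": "authorization_required"}
        # Only the tool's output is returned to the calling agent
        return {"status": "ok", "output": self._tools[name](token, tool_input)}
```

The design point is the narrow interface: because the agent's only capability is "run tool X for user Y", a prompt-injected model has no path to read or exfiltrate a token.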

Building the Core KYC Agent

Document Collection Pattern

This Python class implements the fundamental pattern for collecting KYC documents from multiple customer accounts:

import os
from datetime import datetime
from typing import Any, Dict, List

from arcadepy import AsyncArcade


class KYCIntakeAgent:
    def __init__(self):
        # AsyncArcade is the asyncio variant of the client, so its calls can be awaited
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.required_documents = [
            "government_id",
            "proof_of_address",
            "bank_statement",
            "incorporation_docs"
        ]
        self.customer_sessions: Dict[str, Any] = {}

    async def initiate_kyc_intake(self, customer_id: str, email: str) -> Dict:
        """Start KYC document collection for a customer"""

        # Check whether this customer has already granted Google access
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id
        )

        if auth_response.status != "completed":
            # Send the customer to the consent URL; call this method again once they
            # finish, or block with: await self.arcade.auth.wait_for_completion(auth_response)
            return {
                "status": "authorization_required",
                "auth_url": auth_response.url,
                "message": "Please authorize access to your Google Drive documents"
            }

        # Authorization is in place: initialize the document collection session
        self.customer_sessions[customer_id] = {
            "email": email,
            "status": "collecting",
            "documents_collected": [],
            "timestamp": datetime.now()
        }

        return {"status": "ready", "customer_id": customer_id}

    async def search_kyc_documents(self, customer_id: str, document_type: str) -> List[Dict]:
        """Search customer's Drive for specific KYC document types"""

        search_patterns = {
            "government_id": ["passport", "driver license", "national id"],
            "proof_of_address": ["utility bill", "bank statement address", "lease"],
            "bank_statement": ["bank statement", "financial statement"],
            "incorporation_docs": ["articles incorporation", "certificate incorporation"]
        }

        # Keyed by file ID so a file matched by several patterns is returned once
        results: Dict[str, Dict] = {}
        for pattern in search_patterns.get(document_type, []):
            response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.SearchFiles",
                input={"query": f"name contains '{pattern}'"},
                user_id=customer_id
            )
            # The tool's return value lives on response.output.value
            value = response.output.value if response.output else None
            for file in (value or {}).get("files", []):
                results[file["id"]] = file

        return list(results.values())
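
The search loop issues one request per pattern. Drive's query language supports `or`, and single quotes and backslashes inside a quoted literal must be backslash-escaped, so the patterns can be combined into one safe query. A minimal sketch, assuming the search tool passes its `query` string through to Drive's `q` parameter unchanged (not confirmed here); `escape_drive_literal` and `build_name_query` are hypothetical helpers.

```python
from typing import Iterable


def escape_drive_literal(value: str) -> str:
    """Escape a value for use inside a single-quoted Drive query literal."""
    # Escape backslashes first so the escapes added for quotes are not doubled
    return value.replace("\\", "\\\\").replace("'", "\\'")


def build_name_query(patterns: Iterable[str]) -> str:
    """Combine patterns into one `name contains ... or ...` Drive query."""
    clauses = [f"name contains '{escape_drive_literal(p)}'" for p in patterns]
    if not clauses:
        raise ValueError("at least one pattern is required")
    # Drive search includes trashed files unless they are excluded explicitly
    return f"({' or '.join(clauses)}) and trashed = false"
```

Escaping also matters for security: a file-name pattern containing a quote would otherwise break out of the literal and alter the query.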

Document Verification Workflow

Extract document content for verification, handling each file according to its type and catching tool errors:

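from typing import Any, Dict, List

from arcadepy import AsyncArcade


class KYCDocumentProcessor:
    def __init__(self, arcade_client: AsyncArcade):
        self.arcade = arcade_client
        # Populate with your own verification rules; left empty here
        self.verification_rules: Dict[str, Any] = {}

    async def extract_document_content(self, customer_id: str, document_id: str) -> Dict:
        """Extract and parse document content for verification"""

        try:
            # Fetch document metadata
            metadata_response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.GetFileMetadata",
                input={"file_id": document_id},
                user_id=customer_id
            )
            metadata = metadata_response.output.value or {}
            mime_type = metadata.get("mimeType")

            # Native Google Docs: read the structured document body
            if mime_type == "application/vnd.google-apps.document":
                content_response = await self.arcade.tools.execute(
                    tool_name="GoogleDocs.GetDocumentById",
                    input={"document_id": document_id},
                    user_id=customer_id
                )

                return {
                    "document_id": document_id,
                    "content": self.parse_document_content(content_response.output.value or {}),
                    "metadata": metadata,
                    "extraction_status": "success"
                }

            # Uploaded PDFs and images are binary files. Drive's export only converts
            # Google Workspace formats, so these must be downloaded and sent to OCR.
            if mime_type in ["application/pdf", "image/png", "image/jpeg"]:
                return {
                    "document_id": document_id,
                    # Drive's download link for binary files, if the metadata includes it
                    "download_url": metadata.get("webContentLink"),
                    "requires_ocr": True,
                    "metadata": metadata,
                    "extraction_status": "pending_ocr"
                }

            # Any other file type is out of scope for automated extraction
            return {
                "document_id": document_id,
                "metadata": metadata,
                "extraction_status": "unsupported_type"
            }

        except Exception as e:
            return {
                "document_id": document_id,
                "error": str(e),
                "extraction_status": "failed"
            }

    def parse_document_content(self, doc_content: Dict) -> Dict:
        """Parse the Google Docs API document structure (body.content elements)"""

        extracted_data = {
            "text_content": "",
            "tables": [],
            "headers": []
        }

        for element in doc_content.get("body", {}).get("content", []):
            if "paragraph" in element:
                paragraph = element["paragraph"]
                text = self.extract_paragraph_text(paragraph)
                extracted_data["text_content"] += text
                # Headings carry a TITLE or HEADING_n named style in the Docs API
                style = paragraph.get("paragraphStyle", {}).get("namedStyleType", "")
                if style == "TITLE" or style.startswith("HEADING"):
                    extracted_data["headers"].append(text.strip())

            elif "table" in element:
                extracted_data["tables"].append(self.extract_table_data(element["table"]))

        return extracted_data

    @staticmethod
    def extract_paragraph_text(paragraph: Dict) -> str:
        """Concatenate a paragraph's text runs (the last run ends with a newline)"""
        return "".join(
            el.get("textRun", {}).get("content", "") for el in paragraph.get("elements", [])
        )

    def extract_table_data(self, table: Dict) -> List[List[str]]:
        """Return a table as rows of cell strings"""
        rows = []
        for row in table.get("tableRows", []):
            cells = []
            for cell in row.get("tableCells", []):
                cell_text = "".join(
                    self.extract_paragraph_text(item["paragraph"])
                    for item in cell.get("content", []) if "paragraph" in item
                )
                cells.append(cell_text.strip())
            rows.append(cells)
        return rows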
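
Extraction leaves the verification rules open. One common check is that a proof-of-address document is recent. The sketch below assumes a 90-day window and ISO-formatted dates (`YYYY-MM-DD`) in the extracted text; both are illustrative assumptions to replace with your own compliance policy, and `find_dates` and `check_proof_of_address_recency` are hypothetical helpers.

```python
import re
from datetime import date, timedelta
from typing import Dict, List

# Matches ISO dates such as 2025-09-30 (the only format this sketch handles)
ISO_DATE = re.compile(r"\b(\d{4})-(\d{2})-(\d{2})\b")


def find_dates(text: str) -> List[date]:
    """Return every valid ISO-formatted date found in the text."""
    found = []
    for y, m, d in ISO_DATE.findall(text):
        try:
            found.append(date(int(y), int(m), int(d)))
        except ValueError:
            continue  # skip impossible dates such as 2025-02-30
    return found


def check_proof_of_address_recency(text: str, today: date, max_age_days: int = 90) -> Dict[str, str]:
    """Pass if the newest non-future date in the document is within the window."""
    dates = [d for d in find_dates(text) if d <= today]
    if not dates:
        # No usable date: route to a human rather than auto-rejecting
        return {"result": "manual_review", "reason": "no date found"}
    newest = max(dates)
    if today - newest <= timedelta(days=max_age_days):
        return {"result": "pass", "document_date": newest.isoformat()}
    return {"result": "fail", "document_date": newest.isoformat()}
```

Passing `today` explicitly keeps the rule deterministic and easy to test; in production you would pass `date.today()`.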

Implementing Multi-Customer Authentication

Concurrent Customer Session Management

Handle multiple KYC intakes simultaneously with proper session isolation:

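import asyncio
import os
from typing import Any, Dict, List, Set

from arcadepy import AsyncArcade


class MultiCustomerKYCManager:
    def __init__(self, max_concurrent: int = 10):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.active_sessions: Dict[str, Any] = {}
        # Caps how many intakes run at the same time
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.completed_verifications: Set[str] = set()

    async def process_batch_kyc(self, customer_list: List[Dict]) -> Dict:
        """Process multiple KYC intakes concurrently"""

        tasks = [
            asyncio.create_task(self.process_single_kyc(customer["id"], customer["email"]))
            for customer in customer_list
        ]

        # return_exceptions=True keeps one failing customer from cancelling the batch
        results = await asyncio.gather(*tasks, return_exceptions=True)

        def count(status: str) -> int:
            return len([r for r in results if isinstance(r, dict) and r.get("status") == status])

        return {
            "total_processed": len(results),
            "successful": count("completed"),
            "pending_authorization": count("pending_authorization"),
            "failed": len([r for r in results if isinstance(r, Exception)]),
            "details": results
        }

    async def process_single_kyc(self, customer_id: str, email: str) -> Dict:
        """Process individual KYC with authentication handling"""

        async with self.semaphore:
            # Initialize customer authentication
            auth_status = await self.authenticate_customer(customer_id)

            if not auth_status["authenticated"]:
                return {
                    "customer_id": customer_id,
                    "status": "pending_authorization",
                    "auth_url": auth_status.get("auth_url")
                }

            # Collect required documents
            documents = await self.collect_all_documents(customer_id)

            # Verify document compliance
            verification = await self.verify_documents(customer_id, documents)

            self.completed_verifications.add(customer_id)

            return {
                "customer_id": customer_id,
                "status": "completed",
                "verification_result": verification
            }

    async def authenticate_customer(self, customer_id: str) -> Dict:
        """Report whether the customer has granted Google access"""
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id
        )
        return {"authenticated": auth_response.status == "completed", "auth_url": auth_response.url}

    async def collect_all_documents(self, customer_id: str) -> List[Dict]:
        # Application-specific: e.g. KYCIntakeAgent.search_kyc_documents per document type
        raise NotImplementedError

    async def verify_documents(self, customer_id: str, documents: List[Dict]) -> Dict:
        # Application-specific: apply your verification rules to the extracted content
        raise NotImplementedError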
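
Bounding concurrency with `asyncio.Semaphore` is easy to check in isolation. The self-contained sketch below replaces the Arcade calls with a hypothetical `fake_intake` coroutine and records the peak number of intakes in flight, so the cap can be verified directly.

```python
import asyncio
from typing import Dict, List


async def run_bounded(customer_ids: List[str], max_concurrent: int) -> Dict[str, int]:
    """Run one intake per customer with at most `max_concurrent` in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_intake(customer_id: str) -> str:
        # Hypothetical stand-in for authorize + collect + verify for one customer
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)
            in_flight -= 1
            return customer_id

    results = await asyncio.gather(*(fake_intake(c) for c in customer_ids), return_exceptions=True)
    failed = sum(isinstance(r, Exception) for r in results)
    return {"processed": len(results), "failed": failed, "peak_concurrency": peak}
```

All tasks are created up front, but only `max_concurrent` of them get past `async with semaphore` at once; the rest wait without consuming API quota.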

OAuth Flow Management for KYC

Track each customer's authorization request and confirm that it completed. Arcade hosts the OAuth redirect and token exchange, so your service never receives an authorization code; it asks Arcade for the outcome instead:

import os
from datetime import datetime
from typing import Any, Dict

from arcadepy import AsyncArcade


class KYCAuthenticationHandler:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.pending_authorizations: Dict[str, Any] = {}

    async def handle_customer_authorization(self, customer_id: str) -> Dict:
        """Start (or reuse) the OAuth flow for document access"""

        # Check existing authorization status
        existing_auth = self.pending_authorizations.get(customer_id)

        if existing_auth and existing_auth["status"] == "completed":
            return {"status": "already_authorized", "customer_id": customer_id}

        # Request Google Docs/Drive authorization; Arcade reports "completed"
        # right away if the customer granted these scopes earlier
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id
        )

        if auth_response.status == "completed":
            self.pending_authorizations[customer_id] = {"status": "completed"}
            return {"status": "already_authorized", "customer_id": customer_id}

        # Store pending authorization
        self.pending_authorizations[customer_id] = {
            "auth_id": auth_response.id,
            "auth_url": auth_response.url,
            "status": "pending",
            "requested_at": datetime.now()
        }

        return {
            "status": "authorization_required",
            "auth_url": auth_response.url,
            "customer_id": customer_id
        }

    async def complete_authorization(self, customer_id: str, wait_seconds: int = 30) -> Dict:
        """Wait for the customer to finish the OAuth consent screen"""

        pending = self.pending_authorizations.get(customer_id)

        if pending and pending["status"] == "completed":
            return {"status": "success", "customer_id": customer_id}

        if not pending or "auth_id" not in pending:
            return {"status": "error", "message": "No pending authorization found"}

        # Long-poll Arcade for the outcome of this authorization request
        result = await self.arcade.auth.status(id=pending["auth_id"], wait=wait_seconds)

        if result.status == "completed":
            pending["status"] = "completed"
            return {"status": "success", "customer_id": customer_id}

        # Still "pending" after the wait, or "failed"
        return {"status": result.status, "customer_id": customer_id}

Security and Compliance Features

Audit Trail Implementation

Track every document access for regulatory compliance:

import hashlib
import hmac
import json
from collections import Counter
from datetime import datetime
from typing import Dict, List


class KYCAuditLogger:
    def __init__(self, encryption_key: str):
        # Used as an HMAC signing key (nothing is encrypted); load it from a secrets manager
        self.key = encryption_key.encode()
        self.audit_entries: List[Dict] = []

    async def log_document_access(self, event_data: Dict) -> None:
        """Create a tamper-evident audit log entry"""

        entry = {
            "timestamp": datetime.now().isoformat(),
            "customer_id": self.hash_customer_id(event_data["customer_id"]),
            "document_id": event_data["document_id"],
            "document_type": event_data.get("document_type"),
            "action": event_data["action"],
            "ip_address": event_data.get("ip_address"),
            "agent_version": event_data.get("agent_version"),
            # Chain to the previous entry so deletions and reordering are detectable
            "prev_hash": self.audit_entries[-1]["hash"] if self.audit_entries else ""
        }

        # Keyed signature (HMAC-SHA256) over the canonical JSON of the entry
        entry_json = json.dumps(entry, sort_keys=True)
        entry["hash"] = hmac.new(self.key, entry_json.encode(), hashlib.sha256).hexdigest()

        self.audit_entries.append(entry)

        # Persist to secure storage
        await self.persist_audit_entry(entry)

    async def persist_audit_entry(self, entry: Dict) -> None:
        """Placeholder: write to append-only (WORM) storage; entries stay in memory only"""
        pass

    def hash_customer_id(self, customer_id: str) -> str:
        """Pseudonymize the customer ID with a keyed hash (first 16 hex characters)"""
        return hmac.new(self.key, customer_id.encode(), hashlib.sha256).hexdigest()[:16]

    async def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> Dict:
        """Generate KYC compliance report for regulators"""

        filtered_entries = [
            e for e in self.audit_entries
            if start_date <= datetime.fromisoformat(e["timestamp"]) <= end_date
        ]

        return {
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "total_access_events": len(filtered_entries),
            "unique_customers": len({e["customer_id"] for e in filtered_entries}),
            "document_types_accessed": dict(Counter(e["document_type"] for e in filtered_entries)),
            "audit_entries": filtered_entries
        }
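
Signing each entry on its own detects edits to that entry but not deletions or reordering. Chaining each signature to the previous entry's hash closes most of that gap; truncating the newest entries is caught only if the latest hash is also stored somewhere else. A self-contained sketch using HMAC-SHA256; `sign_entry`, `append_entry`, and `verify_chain` are hypothetical helpers.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, List

GENESIS = "0" * 64  # stand-in "previous hash" for the first entry


def sign_entry(entry: Dict[str, Any], prev_hash: str, key: bytes) -> str:
    """HMAC the canonical JSON of the entry together with the previous hash."""
    body = {k: v for k, v in entry.items() if k != "hash"}
    payload = json.dumps(body, sort_keys=True) + prev_hash
    return hmac.new(key, payload.encode(), hashlib.sha256).hexdigest()


def append_entry(log: List[Dict[str, Any]], entry: Dict[str, Any], key: bytes) -> None:
    prev = log[-1]["hash"] if log else GENESIS
    log.append({**entry, "hash": sign_entry(entry, prev, key)})


def verify_chain(log: List[Dict[str, Any]], key: bytes) -> bool:
    """Return False if any entry was altered, removed mid-chain, or reordered."""
    prev = GENESIS
    for entry in log:
        expected = sign_entry(entry, prev, key)
        # Constant-time comparison avoids leaking how many characters matched
        if not hmac.compare_digest(expected, entry["hash"]):
            return False
        prev = entry["hash"]
    return True
```

Running `verify_chain` before generating each compliance report gives regulators evidence that the log was not edited after the fact.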

Token Security and Isolation

Implement secure token handling without exposure to the AI model:

import logging
import os
from typing import Dict

from arcadepy import AsyncArcade

logger = logging.getLogger("kyc.access")


class SecureKYCTokenManager:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        # Tokens are never stored locally - Arcade manages them

    async def execute_with_customer_context(self, customer_id: str, operation: str, params: Dict) -> Dict:
        """Execute Google Docs operations with isolated customer credentials"""

        tool_name = f"GoogleDocs.{operation}"

        # Confirm the customer's grant covers this tool's scopes; if it does not
        # (missing or insufficient scopes), Arcade returns a new consent URL
        auth = await self.arcade.tools.authorize(tool_name=tool_name, user_id=customer_id)
        if auth.status != "completed":
            return {"success": False, "requires_reauth": True, "auth_url": auth.url}

        # Arcade injects the customer's token server-side and refreshes it when it expires
        response = await self.arcade.tools.execute(
            tool_name=tool_name,
            input=params,
            user_id=customer_id
        )

        if not response.success:
            error = response.output.error if response.output else None
            return {"success": False, "error": str(error)}

        # Log the access event; no token ever passes through this process
        self.log_secure_access(customer_id, operation)

        return {"success": True, "data": response.output.value}

    def log_secure_access(self, customer_id: str, operation: str) -> None:
        logger.info("document access customer=%s operation=%s", customer_id, operation)
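
Even with tokens held server-side, a defensive redaction pass before anything reaches logs or the model's context is cheap insurance against a credential leaking through an error payload. A minimal sketch; the field names in `SENSITIVE_KEYS` are assumptions to adapt to the payloads you actually see, and `redact` is a hypothetical helper.

```python
from typing import Any

# Assumed field names that should never be logged or shown to the model
SENSITIVE_KEYS = {"access_token", "refresh_token", "id_token", "authorization", "client_secret"}


def redact(value: Any) -> Any:
    """Return a copy of nested dicts/lists with sensitive fields masked."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value
```

The function builds a new structure instead of mutating its input, so the original payload can still be used for the actual request.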

Production Deployment Configuration

Kubernetes Deployment for Scale

Run the Arcade Engine container for your KYC agent with multiple replicas for availability, resource limits, and a liveness probe:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kyc-intake-agent
  namespace: production
spec:
  replicas: 5
  selector:
    matchLabels:
      app: kyc-agent
  template:
    metadata:
      labels:
        app: kyc-agent
    spec:
      containers:
        - name: arcade-kyc-engine
          # Pin a specific release tag in production rather than :latest
          image: ghcr.io/arcadeai/engine:latest
          env:
            - name: ARCADE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: arcade-secrets
                  key: api-key
            - name: GOOGLE_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-id
            - name: GOOGLE_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-secret
            - name: KYC_COMPLIANCE_MODE
              value: "strict"
          resources:
            requests:
              memory: "1Gi"
              cpu: "1000m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 9099
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: v1
kind: Service
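metadata:
  name: kyc-agent-service
  # Must match the Deployment's namespace, or the selector finds no pods
  namespace: production
spec:
  selector:
    app: kyc-agent
  ports:
    # The container serves plain HTTP on 9099; terminate TLS at the load balancer or an Ingress
    - protocol: TCP
      port: 443
      targetPort: 9099
  type: LoadBalancer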

Performance Optimization

Implement caching and batch processing for high-volume KYC operations:

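import asyncio
import os
from datetime import datetime
from typing import Any, Dict, List

from arcadepy import AsyncArcade


class OptimizedKYCProcessor:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.document_cache: Dict[str, Dict] = {}
        self.cache_ttl_seconds = 3600
        self.batch_size = 20

    # No @lru_cache here: on an async method it would cache the coroutine object,
    # which cannot be awaited twice. The dict below is the cache instead.
    async def get_cached_document_metadata(self, customer_id: str, document_id: str) -> Dict:
        """Cache document metadata to reduce API calls"""

        cache_key = f"{customer_id}:{document_id}"

        cached = self.document_cache.get(cache_key)
        # total_seconds() counts whole days too; .seconds would wrap after 24 hours
        if cached and (datetime.now() - cached["timestamp"]).total_seconds() < self.cache_ttl_seconds:
            return cached["data"]

        # Fetch fresh metadata
        response = await self.arcade.tools.execute(
            tool_name="GoogleDrive.GetFileMetadata",
            input={"file_id": document_id},
            user_id=customer_id
        )
        metadata = response.output.value or {}

        self.document_cache[cache_key] = {
            "data": metadata,
            "timestamp": datetime.now()
        }

        return metadata

    async def batch_process_documents(self, customer_id: str, document_ids: List[str]) -> List[Any]:
        """Process multiple documents in optimized batches"""

        results: List[Any] = []

        for i in range(0, len(document_ids), self.batch_size):
            batch = document_ids[i:i + self.batch_size]

            batch_tasks = [
                self.process_document(customer_id, doc_id)
                for doc_id in batch
            ]

            # return_exceptions=True so one bad document does not discard the batch
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            results.extend(batch_results)

            # Rate limiting pause between batches
            await asyncio.sleep(0.5)

        return results

    async def process_document(self, customer_id: str, document_id: str) -> Dict:
        # Placeholder per-document step; swap in extraction and verification
        return await self.get_cached_document_metadata(customer_id, document_id)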
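
`functools.lru_cache` cannot safely wrap an `async def` method: it caches the coroutine object, and awaiting the same coroutine a second time raises `RuntimeError`. A small time-based cache keyed by `(customer_id, document_id)` avoids that and bounds memory. This is a sketch: `TTLCache` here is a hypothetical class (unrelated to the `cachetools` class of the same name), and `clock` is injectable so expiry can be tested without sleeping.

```python
import time
from typing import Any, Callable, Dict, Hashable, Optional, Tuple


class TTLCache:
    """Tiny time-based cache; entries expire `ttl` seconds after insertion."""

    def __init__(self, ttl: float, max_entries: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.max_entries = max_entries
        self.clock = clock
        self._data: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Optional[Any]:
        item = self._data.get(key)
        if item is None:
            return None
        stored_at, value = item
        if self.clock() - stored_at >= self.ttl:
            del self._data[key]  # expired: drop it so the caller refetches
            return None
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._data.pop(key, None)  # re-inserting moves the key to the newest position
        if len(self._data) >= self.max_entries:
            # Evict the oldest insertion (dicts preserve insertion order)
            self._data.pop(next(iter(self._data)))
        self._data[key] = (self.clock(), value)
```

`time.monotonic` is the default clock because, unlike `datetime.now()`, it cannot jump backwards when the system clock is adjusted.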

Monitoring and Error Handling

Comprehensive Error Recovery

Handle authentication failures and document access issues gracefully:

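import asyncio
import random
from typing import Dict


class KYCErrorHandler:
    def __init__(self):
        # Keys match the error types returned by classify_error()
        self.retry_limits = {
            "authorization_required": 3,
            "insufficient_scope": 3,
            "rate_limit_exceeded": 5,
            "document_not_found": 2
        }
        self.retry_counts: Dict[str, int] = {}

    def classify_error(self, error: Exception) -> str:
        """Map an exception to an error type (keyword matching is a simplification)"""
        message = str(error).lower()
        if "scope" in message:
            return "insufficient_scope"
        if "rate limit" in message or "429" in message:
            return "rate_limit_exceeded"
        if "not found" in message or "404" in message:
            return "document_not_found"
        if "authoriz" in message or "expired" in message or "401" in message:
            return "authorization_required"
        return "unknown"

    async def handle_kyc_error(self, error: Exception, context: Dict) -> Dict:
        """Error handling with per-customer retry limits and recovery strategies"""

        error_type = self.classify_error(error)
        customer_id = context.get("customer_id")

        # Track retry attempts
        retry_key = f"{customer_id}:{error_type}"
        self.retry_counts[retry_key] = self.retry_counts.get(retry_key, 0) + 1

        if self.retry_counts[retry_key] > self.retry_limits.get(error_type, 3):
            return {
                "status": "failed",
                "error": "Maximum retry attempts exceeded",
                "escalate_to": "manual_review"
            }

        # Error-specific recovery strategies
        recovery_strategies = {
            "authorization_required": self.request_reauthorization,
            "insufficient_scope": self.request_reauthorization,
            "document_not_found": self.flag_missing_document,
            "rate_limit_exceeded": self.implement_backoff
        }

        strategy = recovery_strategies.get(error_type)

        if strategy:
            return await strategy(customer_id, context)

        return {"status": "unrecoverable_error", "error": str(error)}

    async def implement_backoff(self, customer_id: str, context: Dict) -> Dict:
        """Exponential backoff with jitter for rate limiting, capped at 60 seconds"""

        retry_count = self.retry_counts.get(f"{customer_id}:rate_limit_exceeded", 1)
        wait_time = min(2 ** retry_count, 60) + random.uniform(0, 1)

        await asyncio.sleep(wait_time)

        return {"status": "retry", "wait_time": wait_time}

    async def request_reauthorization(self, customer_id: str, context: Dict) -> Dict:
        # Arcade refreshes tokens itself, so a persistent auth error means the customer must consent again
        return {"status": "reauthorization_required", "customer_id": customer_id}

    async def flag_missing_document(self, customer_id: str, context: Dict) -> Dict:
        # Ask the customer to upload or share the document instead of retrying blindly
        return {
            "status": "document_requested",
            "customer_id": customer_id,
            "document_type": context.get("document_type")
        }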

Testing Your KYC Agent

Integration Testing Framework

Create comprehensive tests for your KYC intake workflow:

import pytest
from unittest.mock import AsyncMock, Mock

# from your_module import KYCIntakeAgent  # adjust to where the class lives


class TestKYCIntakeAgent:
    @pytest.fixture
    def kyc_agent(self, monkeypatch):
        """Initialize test agent with a mock Arcade client"""
        # A dummy key lets the real client construct without any network access
        monkeypatch.setenv("ARCADE_API_KEY", "test-key")
        agent = KYCIntakeAgent()
        # AsyncMock makes every client method awaitable
        agent.arcade = AsyncMock()
        return agent

    @pytest.mark.asyncio  # requires the pytest-asyncio plugin
    async def test_customer_authorization_flow(self, kyc_agent):
        """Test complete authorization workflow"""

        # Mock authorization response
        kyc_agent.arcade.tools.authorize.return_value = Mock(
            status="pending",
            url="https://accounts.google.com/oauth/authorize?..."
        )

        result = await kyc_agent.initiate_kyc_intake("test_customer_123", "customer@example.com")

        assert result["status"] == "authorization_required"
        assert "auth_url" in result

    @pytest.mark.asyncio
    async def test_document_collection(self, kyc_agent):
        """Test document search and retrieval"""

        # Mock document search results; the same files come back for every pattern
        kyc_agent.arcade.tools.execute.return_value = Mock(
            output=Mock(value={"files": [
                {"id": "doc123", "name": "passport.pdf", "mimeType": "application/pdf"},
                {"id": "doc456", "name": "bank_statement.pdf", "mimeType": "application/pdf"}
            ]})
        )

        documents = await kyc_agent.search_kyc_documents("test_customer_123", "government_id")

        # Duplicates across the three government_id patterns are collapsed by file ID
        assert len(documents) == 2
        assert documents[0]["name"] == "passport.pdf"

Conclusion

Building an MCP-authenticated KYC intake agent with Arcade's Google Docs toolkit turns document collection from a manual, error-prone process into an automated workflow in which the agent never handles customer credentials. Arcade's managed OAuth and tool-calling architecture help financial institutions meet their compliance requirements while keeping the customer experience simple.

Key takeaways for production deployment:

  • Authentication First: Arcade's managed OAuth keeps raw credentials out of your agent and out of the model's context
  • Compliance Support: Audit logging of every document access, combined with per-tool OAuth scopes, supports regulatory audit requirements
  • Scale Ready: Bounded per-process concurrency, batching, and multiple replicas let intake volume grow without exhausting API rate limits
  • Security by Design: Token isolation and pseudonymized customer identifiers limit exposure of sensitive customer data

For additional implementation details and toolkit documentation, explore the Arcade documentation and toolkit reference. To see example implementations and contribute to the community, visit the Arcade GitHub repository.
