How to Build an MCP-Authenticated KYC Intake Agent with Arcade's Google Docs Toolkit

Arcade.dev Team
OCTOBER 16, 2025
8 MIN READ
TUTORIALS

Know Your Customer (KYC) processes require secure document collection, verification, and storage. AI agents that handle KYC workflows face a critical challenge: they must access customer documents across multiple Google Workspace accounts while maintaining strict security boundaries. This guide demonstrates how to build a KYC intake agent, with production deployment in mind, using Arcade.dev's Model Context Protocol (MCP) authentication and Google Docs toolkit.

Prerequisites and Setup Requirements

Before implementing your KYC agent, ensure your environment meets these requirements:

  • Active Arcade.dev account with API key
  • Google Cloud Console project with OAuth 2.0 credentials configured
  • Python 3.8+ or Node.js 16+ development environment
  • Google Workspace API access enabled for your domain
  • Basic knowledge of OAuth flows and document processing

Initial Environment Configuration

Start by installing the Arcade Python client and configuring your authentication credentials:

# Install core Arcade components
pip install arcadepy arcade_tdk arcade-ai

# Set environment variables
export ARCADE_API_KEY="your_arcade_api_key"
export GOOGLE_CLIENT_ID="your_google_client_id"
export GOOGLE_CLIENT_SECRET="your_google_client_secret"
export GOOGLE_WORKSPACE_DOMAIN="your_domain.com"
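
A small startup check can catch missing configuration before the agent makes its first call. The sketch below assumes only the four variable names exported above; the `load_settings` helper and `Settings` type are illustrative names, not part of any Arcade package.

```python
import os
from typing import Mapping, NamedTuple

# The four variables exported in the setup step above
REQUIRED_VARS = (
    "ARCADE_API_KEY",
    "GOOGLE_CLIENT_ID",
    "GOOGLE_CLIENT_SECRET",
    "GOOGLE_WORKSPACE_DOMAIN",
)

class Settings(NamedTuple):
    arcade_api_key: str
    google_client_id: str
    google_client_secret: str
    google_workspace_domain: str

def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read required variables, failing fast with a list of any that are missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return Settings(*(env[name] for name in REQUIRED_VARS))
```

Calling `load_settings()` once at process start surfaces every missing variable in a single error instead of failing on the first API call.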

KYC Document Workflow Architecture

The Authentication Challenge in KYC Processes

Traditional KYC systems struggle with three key problems that Arcade.dev solves:

  • Document Access Fragmentation: Customers store KYC documents across personal and business Google accounts
  • Compliance Requirements: Financial regulations demand granular audit trails for every document access
  • Security Boundaries: AI agents must never have direct access to authentication tokens or raw document data

Arcade's platform addresses these challenges through managed OAuth flows and tool-calling isolation. The platform acts as an authentication broker, enabling your KYC agent to request documents from customers without handling credentials directly.

Building the Core KYC Agent

Document Collection Pattern

This Python class implements the fundamental pattern for collecting KYC documents from multiple customer accounts:

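import os
from datetime import datetime
from typing import Any, Dict, List

# AsyncArcade is arcadepy's awaitable client; the synchronous Arcade client cannot be awaited
from arcadepy import AsyncArcade

class KYCIntakeAgent:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))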
        self.required_documents = [
            "government_id",
            "proof_of_address",
            "bank_statement",
            "incorporation_docs"
        ]
        self.customer_sessions: Dict[str, Any] = {}

    async def initiate_kyc_intake(self, customer_id: str, email: str) -> Dict:
        """Start KYC document collection for a customer"""

        # Ask Arcade whether this customer has already authorized Google Docs access
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id
        )

        if auth_response.status != "completed":
            # Hand the URL to the customer, then call this method again once they approve
            return {
                "status": "authorization_required",
                "auth_url": auth_response.url,
                "message": "Please authorize access to your Google Drive documents"
            }

        # Reaching this point means authorization is already complete, so no further wait is needed

        # Initialize document collection session
        self.customer_sessions[customer_id] = {
            "email": email,
            "status": "collecting",
            "documents_collected": [],
            "timestamp": datetime.now()
        }

        return {"status": "ready", "customer_id": customer_id}

    async def search_kyc_documents(self, customer_id: str, document_type: str) -> List[Dict]:
        """Search customer's Drive for specific KYC document types"""

        search_patterns = {
            "government_id": ["passport", "driver license", "national id"],
            "proof_of_address": ["utility bill", "bank statement address", "lease"],
            "bank_statement": ["bank statement", "financial statement"],
            "incorporation_docs": ["articles incorporation", "certificate incorporation"]
        }

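        results = []
        seen_ids = set()
        for pattern in search_patterns.get(document_type, []):
            response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.SearchFiles",
                input={"query": f"name contains '{pattern}'"},
                user_id=customer_id
            )
            # arcadepy exposes the tool payload as output.value; fall back to the
            # raw output when it is already a plain dict (for example, in tests)
            payload = getattr(response.output, "value", response.output) or {}
            for file in payload.get("files", []):
                # Several patterns can match the same file, so de-duplicate by ID
                if file.get("id") not in seen_ids:
                    seen_ids.add(file.get("id"))
                    results.append(file)

        return results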
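
The search above interpolates each pattern directly into a `name contains '...'` clause. Google Drive's query syntax expects single quotes and backslashes inside a value to be escaped with a backslash, so a pattern such as "driver's license" would otherwise break the query. The following is a minimal sketch of an escaping helper; the function names are hypothetical, and it assumes the search tool passes the query string through to Drive unchanged.

```python
from typing import List

def drive_name_query(pattern: str) -> str:
    """Build a `name contains '...'` clause with backslashes and quotes escaped."""
    # Escape backslashes first so the quote escapes added next are not doubled
    escaped = pattern.replace("\\", "\\\\").replace("'", "\\'")
    return f"name contains '{escaped}'"

def drive_any_name_query(patterns: List[str]) -> str:
    """OR several name clauses together so one search can cover a document type."""
    return " or ".join(drive_name_query(p) for p in patterns)
```

Combining the patterns for one document type into a single `or` query would also cut the number of tool calls per document type from several to one.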

Document Verification Workflow

Implement document extraction and verification with proper error handling:

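from typing import Dict

from arcadepy import AsyncArcade

class KYCDocumentProcessor:
    def __init__(self, arcade_client: AsyncArcade):
        self.arcade = arcade_client
        # load_verification_rules() is application-specific and not shown here
        self.verification_rules = self.load_verification_rules()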

    async def extract_document_content(self, customer_id: str, document_id: str) -> Dict:
        """Extract and parse document content for verification"""

        try:
            # Fetch document metadata
            metadata_response = await self.arcade.tools.execute(
                tool_name="GoogleDrive.GetFileMetadata",
                input={"file_id": document_id},
                user_id=customer_id
            )
            # arcadepy exposes the tool payload as output.value; fall back to raw output
            metadata = getattr(metadata_response.output, "value", metadata_response.output) or {}
            mime_type = metadata.get("mimeType")

            # Extract document content based on type
            if mime_type == "application/vnd.google-apps.document":
                content_response = await self.arcade.tools.execute(
                    tool_name="GoogleDocs.GetDocument",
                    input={"document_id": document_id},
                    user_id=customer_id
                )
                document = getattr(content_response.output, "value", content_response.output) or {}

                return {
                    "document_id": document_id,
                    "content": self.parse_document_content(document),
                    "metadata": metadata,
                    "extraction_status": "success"
                }

            # Handle PDFs and images
            if mime_type in ["application/pdf", "image/png", "image/jpeg"]:
                # Export for OCR processing
                export_response = await self.arcade.tools.execute(
                    tool_name="GoogleDrive.ExportFile",
                    input={
                        "file_id": document_id,
                        "mime_type": "application/pdf"
                    },
                    user_id=customer_id
                )
                export = getattr(export_response.output, "value", export_response.output) or {}

                return {
                    "document_id": document_id,
                    "export_url": export.get("export_link"),
                    "requires_ocr": True,
                    "metadata": metadata,
                    "extraction_status": "pending_ocr"
                }

            # Any other MIME type would otherwise fall through and return None
            return {
                "document_id": document_id,
                "metadata": metadata,
                "extraction_status": "unsupported_type"
            }

        except Exception as e:
            return {
                "document_id": document_id,
                "error": str(e),
                "extraction_status": "failed"
            }

    def parse_document_content(self, doc_content: Dict) -> Dict:
        """Parse Google Docs content structure"""

        extracted_data = {
            "text_content": "",
            "tables": [],
            "headers": []
        }

        # Extract text elements
        for element in doc_content.get("body", {}).get("content", []):
            if "paragraph" in element:
                text = self.extract_paragraph_text(element["paragraph"])
                extracted_data["text_content"] += text + "\n"

            elif "table" in element:
                table_data = self.extract_table_data(element["table"])
                extracted_data["tables"].append(table_data)

        return extracted_data
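
`parse_document_content` calls two helpers, `extract_paragraph_text` and `extract_table_data`, that the class does not define. The sketch below shows one way they might look as plain functions (add `self` to use them as methods). It assumes the tool output mirrors the Google Docs API document resource, where a paragraph holds `elements` with `textRun.content` and a table holds `tableRows` of `tableCells`, each with its own `content` list; if the toolkit returns a different shape, the key names need adjusting.

```python
from typing import Any, Dict, List

def extract_paragraph_text(paragraph: Dict[str, Any]) -> str:
    """Concatenate the text runs of one paragraph, dropping the trailing newline."""
    parts = [
        element["textRun"].get("content", "")
        for element in paragraph.get("elements", [])
        if "textRun" in element  # skip inline images, page breaks, and similar
    ]
    return "".join(parts).rstrip("\n")

def extract_table_data(table: Dict[str, Any]) -> List[List[str]]:
    """Return a table as a list of rows, each row a list of cell strings."""
    rows = []
    for row in table.get("tableRows", []):
        cells = []
        for cell in row.get("tableCells", []):
            # A cell holds its own list of structural elements, usually paragraphs
            texts = [
                extract_paragraph_text(item["paragraph"])
                for item in cell.get("content", [])
                if "paragraph" in item
            ]
            cells.append("\n".join(texts))
        rows.append(cells)
    return rows
```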

Implementing Multi-Customer Authentication

Concurrent Customer Session Management

Handle multiple KYC intakes simultaneously with proper session isolation:

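import asyncio
import os
from asyncio import Queue, create_task
from typing import Any, Dict, List, Set

from arcadepy import AsyncArcade

class MultiCustomerKYCManager:
    def __init__(self, max_concurrent: int = 10):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.active_sessions: Dict[str, Any] = {}
        # Bounded queue used as a concurrency gate: put() blocks once
        # max_concurrent intakes are in flight
        self.processing_queue = Queue(maxsize=max_concurrent)
        self.completed_verifications: Set[str] = set()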

    async def process_batch_kyc(self, customer_list: List[Dict]) -> Dict:
        """Process multiple KYC intakes concurrently"""

        tasks = []
        for customer in customer_list:
            task = create_task(self.process_single_kyc(
                customer["id"],
                customer["email"]
            ))
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            "total_processed": len(results),
            "successful": len([r for r in results if not isinstance(r, Exception)]),
            "failed": len([r for r in results if isinstance(r, Exception)]),
            "details": results
        }

    async def process_single_kyc(self, customer_id: str, email: str) -> Dict:
        """Process individual KYC with authentication handling"""

        await self.processing_queue.put(customer_id)

        try:
            # Initialize customer authentication
            auth_status = await self.authenticate_customer(customer_id)

            if not auth_status["authenticated"]:
                return {
                    "customer_id": customer_id,
                    "status": "pending_authorization",
                    "auth_url": auth_status.get("auth_url")
                }

            # Collect required documents
            documents = await self.collect_all_documents(customer_id)

            # Verify document compliance
            verification = await self.verify_documents(customer_id, documents)

            self.completed_verifications.add(customer_id)

            return {
                "customer_id": customer_id,
                "status": "completed",
                "verification_result": verification
            }

        finally:
            await self.processing_queue.get()
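
The manager above limits concurrency by filling and draining a bounded queue. An `asyncio.Semaphore` expresses the same limit more directly. The sketch below is a standalone illustration of that alternative, not the article's implementation; `run_bounded` and its `worker` parameter are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List

async def run_bounded(
    customer_ids: List[str],
    worker: Callable[[str], Awaitable[dict]],
    max_concurrent: int = 10,
) -> list:
    """Run worker(customer_id) for every customer, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(customer_id: str) -> dict:
        async with semaphore:  # the slot is released even if the worker raises
            return await worker(customer_id)

    # return_exceptions=True keeps one failed intake from cancelling the rest
    return await asyncio.gather(
        *(guarded(cid) for cid in customer_ids), return_exceptions=True
    )
```

Passing a method such as `process_single_kyc` (adapted to take only the customer ID) as `worker` would give the same batch behavior without the explicit `put`/`get` bookkeeping.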

OAuth Flow Management for KYC

Track each customer's authorization state and confirm document access once the OAuth flow completes:

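import os
from datetime import datetime
from typing import Any, Dict

from arcadepy import AsyncArcade

class KYCAuthenticationHandler:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))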
        self.pending_authorizations: Dict[str, Any] = {}

    async def handle_customer_authorization(self, customer_id: str, callback_url: str) -> Dict:
        """Manage OAuth flow for document access"""

        # Check existing authorization status
        existing_auth = self.pending_authorizations.get(customer_id)

        if existing_auth and existing_auth["status"] == "completed":
            return {"status": "already_authorized", "customer_id": customer_id}

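        # Request Google Docs/Drive authorization
        # Assumption: arcadepy names the post-authorization redirect parameter
        # next_uri (not redirect_uri); verify against your SDK version
        auth_response = await self.arcade.tools.authorize(
            tool_name="GoogleDocs.SearchAndRetrieveDocuments",
            user_id=customer_id,
            next_uri=callback_url
        )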

        # Store pending authorization
        self.pending_authorizations[customer_id] = {
            "auth_id": auth_response.id,
            "auth_url": auth_response.url,
            "status": "pending",
            "requested_at": datetime.now()
        }

        return {
            "status": "authorization_required",
            "auth_url": auth_response.url,
            "customer_id": customer_id
        }

    async def complete_authorization(self, customer_id: str, auth_code: str) -> Dict:
        """Confirm the OAuth flow finished and validate access"""

        pending = self.pending_authorizations.get(customer_id)

        if not pending:
            return {"status": "error", "message": "No pending authorization found"}

        # Arcade's own callback exchanges the authorization code, so auth_code is
        # not forwarded; this handler only asks Arcade for the resulting status.
        # Assumes arcadepy's auth.status(id=...) lookup; verify against your SDK version.
        auth_status = await self.arcade.auth.status(id=pending["auth_id"])

        if auth_status.status != "completed":
            return {"status": "failed", "error": f"Authorization is {auth_status.status}"}

        # Verify document access permissions (verify_document_access is not shown here)
        test_access = await self.verify_document_access(customer_id)

        if not test_access["has_access"]:
            return {"status": "failed", "error": "Authorized, but the document access check failed"}

        self.pending_authorizations[customer_id]["status"] = "completed"
        return {"status": "success", "customer_id": customer_id}

Security and Compliance Features

Audit Trail Implementation

Track every document access for regulatory compliance:

import hashlib
import hmac
import json
from datetime import datetime
from typing import Dict, List

class KYCAuditLogger:
    def __init__(self, encryption_key: str):
        # Used as an HMAC signing key; nothing is encrypted with it
        self.encryption_key = encryption_key
        self.audit_entries: List[Dict] = []

    async def log_document_access(self, event_data: Dict) -> None:
        """Create tamper-evident audit log entry"""

        entry = {
            "timestamp": datetime.now().isoformat(),
            "customer_id": self.hash_customer_id(event_data["customer_id"]),
            "document_id": event_data["document_id"],
            "action": event_data["action"],
            "ip_address": event_data.get("ip_address"),
            "agent_version": event_data.get("agent_version"),
            "hash": ""
        }

        # Sign the entry while "hash" is still empty so later edits are detectable
        entry_json = json.dumps(entry, sort_keys=True)
        entry["hash"] = self._hmac_hex(entry_json)

        self.audit_entries.append(entry)

        # Persist to secure storage (persist_audit_entry is storage-specific, not shown)
        await self.persist_audit_entry(entry)

    def hash_customer_id(self, customer_id: str) -> str:
        """Pseudonymize customer ID with a keyed hash for privacy compliance"""
        return self._hmac_hex(customer_id)[:16]

    def _hmac_hex(self, message: str) -> str:
        """HMAC-SHA256 of message under the logger's key, as hex"""
        return hmac.new(
            self.encryption_key.encode(), message.encode(), hashlib.sha256
        ).hexdigest()

    async def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> Dict:
        """Generate KYC compliance report for regulators"""

        filtered_entries = [
            e for e in self.audit_entries
            if start_date <= datetime.fromisoformat(e["timestamp"]) <= end_date
        ]

        return {
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "total_verifications": len(filtered_entries),
            "unique_customers": len(set(e["customer_id"] for e in filtered_entries)),
            "document_types_accessed": self.categorize_documents(filtered_entries),
            "audit_entries": filtered_entries
        }
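
A stored hash is only useful if something re-checks it. The sketch below shows how an auditor might verify an entry, assuming each entry was signed with HMAC-SHA256 over its sorted-key JSON with the `hash` field blanked; if the logger signs entries differently, the `sign_entry` function must mirror that scheme. Both function names are hypothetical.

```python
import hashlib
import hmac
import json
from typing import Any, Dict

def sign_entry(entry: Dict[str, Any], key: bytes) -> str:
    """HMAC-SHA256 over the entry's canonical JSON, computed with `hash` blanked."""
    unsigned = dict(entry, hash="")
    payload = json.dumps(unsigned, sort_keys=True).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_entry(entry: Dict[str, Any], key: bytes) -> bool:
    """Return True when the stored hash matches a freshly computed one."""
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(entry.get("hash", ""), sign_entry(entry, key))
```

Running `verify_entry` over every record before generating a compliance report flags entries that were altered after they were written. It does not detect deleted entries; chaining each hash to the previous entry would be needed for that.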

Token Security and Isolation

Implement secure token handling without exposure to the AI model:

import os
from typing import Dict

from arcadepy import AsyncArcade

class SecureKYCTokenManager:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        # Tokens are never stored locally - Arcade manages them

    async def execute_with_customer_context(self, customer_id: str, operation: str, params: Dict) -> Dict:
        """Execute Google Docs operations with isolated customer credentials"""

        try:
            # Arcade handles token retrieval and refresh automatically
            response = await self.arcade.tools.execute(
                tool_name=f"GoogleDocs.{operation}",
                input=params,
                user_id=customer_id
            )

            # Log access without exposing tokens
            await self.log_secure_access(customer_id, operation)

            return {"success": True, "data": response.output}

        except Exception as e:
            if "token_expired" in str(e):
                # Arcade handles refresh automatically
                return await self.retry_with_refresh(customer_id, operation, params)

            elif "insufficient_scope" in str(e):
                # Request additional permissions
                return {
                    "success": False,
                    "requires_reauth": True,
                    "additional_scopes": self.extract_required_scopes(e)
                }

            raise
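
Because `execute_with_customer_context` builds the tool name from a caller-supplied `operation` string, it may be worth restricting which operations the agent can reach. The following is a minimal allow-list sketch; the two operation names are the ones used elsewhere in this article, and `resolve_tool_name` is a hypothetical helper rather than an Arcade API.

```python
# Operations the KYC agent is expected to need; anything else is rejected
ALLOWED_OPERATIONS = frozenset({"SearchAndRetrieveDocuments", "GetDocument"})

def resolve_tool_name(operation: str, toolkit: str = "GoogleDocs") -> str:
    """Map an operation to a fully qualified tool name, refusing unknown ones."""
    if operation not in ALLOWED_OPERATIONS:
        raise ValueError(f"Operation not permitted for KYC intake: {operation!r}")
    return f"{toolkit}.{operation}"
```

Calling `resolve_tool_name(operation)` in place of the f-string keeps a model-generated or user-supplied operation name from invoking write or delete tools by accident.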

Production Deployment Configuration

Kubernetes Deployment for Scale

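Deploy your KYC agent with multiple replicas behind a load balancer. The manifest below fixes the replica count at five; pair it with a HorizontalPodAutoscaler if you need auto-scaling: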

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kyc-intake-agent
  namespace: production
spec:
  replicas: 5
  selector:
    matchLabels:
      app: kyc-agent
  template:
    metadata:
      labels:
        app: kyc-agent
    spec:
      containers:
        - name: arcade-kyc-engine
          image: ghcr.io/arcadeai/engine:latest
          env:
            - name: ARCADE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: arcade-secrets
                  key: api-key
            - name: GOOGLE_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-id
            - name: GOOGLE_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: google-oauth
                  key: client-secret
            - name: KYC_COMPLIANCE_MODE
              value: "strict"
          resources:
            requests:
              memory: "1Gi"
              cpu: "1000m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 9099
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: kyc-agent-service
  namespace: production
spec:
  selector:
    app: kyc-agent
  ports:
    - protocol: TCP
      port: 443
      targetPort: 9099
  type: LoadBalancer

Performance Optimization

Implement caching and batch processing for high-volume KYC operations:

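import asyncio
import os
from datetime import datetime
from typing import Dict, List

from arcadepy import AsyncArcade

class OptimizedKYCProcessor:
    def __init__(self):
        self.arcade = AsyncArcade(api_key=os.environ.get("ARCADE_API_KEY"))
        self.document_cache = {}
        self.cache_ttl_seconds = 3600
        self.batch_size = 20

    # No functools.lru_cache here: on an async method it would cache the coroutine
    # object, which can only be awaited once. The dict below is the cache.
    async def get_cached_document_metadata(self, customer_id: str, document_id: str) -> Dict:
        """Cache document metadata to reduce API calls"""

        cache_key = f"{customer_id}:{document_id}"

        if cache_key in self.document_cache:
            cached = self.document_cache[cache_key]
            # total_seconds() includes whole days; .seconds would wrap after 24 hours
            if (datetime.now() - cached["timestamp"]).total_seconds() < self.cache_ttl_seconds:
                return cached["data"]

        # Fetch fresh metadata
        response = await self.arcade.tools.execute(
            tool_name="GoogleDrive.GetFileMetadata",
            input={"file_id": document_id},
            user_id=customer_id
        )
        # arcadepy exposes the tool payload as output.value; fall back to raw output
        metadata = getattr(response.output, "value", response.output)

        self.document_cache[cache_key] = {
            "data": metadata,
            "timestamp": datetime.now()
        }

        return metadata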

    async def batch_process_documents(self, customer_id: str, document_ids: List[str]) -> List[Dict]:
        """Process multiple documents in optimized batches"""

        results = []

        for i in range(0, len(document_ids), self.batch_size):
            batch = document_ids[i:i + self.batch_size]

            batch_tasks = [
                self.process_document(customer_id, doc_id)
                for doc_id in batch
            ]

            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

            # Rate limiting pause between batches
            await asyncio.sleep(0.5)

        return results

Monitoring and Error Handling

Comprehensive Error Recovery

Handle authentication failures and document access issues gracefully:

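import asyncio
import random
from typing import Dict

class KYCErrorHandler:
    def __init__(self):
        # Keys must match the error types returned by classify_error();
        # types not listed here fall back to a limit of 3
        self.retry_limits = {
            "authorization_required": 3,
            "rate_limit_exceeded": 5,
            "document_not_found": 2
        }
        self.retry_counts = {}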

    async def handle_kyc_error(self, error: Exception, context: Dict) -> Dict:
        """Intelligent error handling with recovery strategies"""

        error_type = self.classify_error(error)
        customer_id = context.get("customer_id")

        # Track retry attempts
        retry_key = f"{customer_id}:{error_type}"
        self.retry_counts[retry_key] = self.retry_counts.get(retry_key, 0) + 1

        if self.retry_counts[retry_key] > self.retry_limits.get(error_type, 3):
            return {
                "status": "failed",
                "error": "Maximum retry attempts exceeded",
                "escalate_to": "manual_review"
            }

        # Error-specific recovery strategies
        recovery_strategies = {
            "token_expired": self.refresh_customer_token,
            "insufficient_scope": self.request_additional_scopes,
            "document_not_found": self.search_alternative_documents,
            "rate_limit_exceeded": self.implement_backoff,
            "authorization_required": self.reinitiate_oauth
        }

        strategy = recovery_strategies.get(error_type)

        if strategy:
            return await strategy(customer_id, context)

        return {"status": "unrecoverable_error", "error": str(error)}

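    async def implement_backoff(self, customer_id: str, context: Dict) -> Dict:
        """Exponential backoff with jitter for rate limiting"""

        # The key must match the retry_key built in handle_kyc_error
        retry_count = self.retry_counts.get(f"{customer_id}:rate_limit_exceeded", 1)
        # Cap the delay so repeated failures never stall a worker for minutes
        wait_time = min(60.0, (2 ** retry_count) + random.uniform(0, 1))

        await asyncio.sleep(wait_time)

        return {"status": "retry", "wait_time": wait_time}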
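
To make the backoff schedule concrete, the delay calculation can be pulled out into a pure function and inspected on its own. This is a sketch under the same formula as above (two to the power of the retry count, plus up to one second of jitter); the `base` and `cap` parameters and the 60-second default are assumptions, not values from Arcade.

```python
import random

def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential delay with up to one second of jitter, never above cap."""
    exponential = base * (2 ** max(retry_count, 0))
    return min(cap, exponential + random.uniform(0, 1))

# Print the delay range for the first few retries
for attempt in range(1, 6):
    low = min(60.0, 2 ** attempt)
    high = min(60.0, 2 ** attempt + 1)
    print(f"retry {attempt}: between {low:.0f}s and {high:.0f}s")
```

The jitter spreads out retries from many customers that hit the rate limit at the same moment, and the cap keeps a long failure streak from producing multi-minute sleeps.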

Testing Your KYC Agent

Integration Testing Framework

Create comprehensive tests for your KYC intake workflow:

import pytest
from unittest.mock import AsyncMock, MagicMock, Mock

# Requires the pytest-asyncio plugin; marks every test in this module as async
pytestmark = pytest.mark.asyncio

class TestKYCIntakeAgent:
    @pytest.fixture
    def kyc_agent(self, monkeypatch):
        """Initialize test agent with mock Arcade client"""
        # The constructor only needs a key to be present; no network call is made
        monkeypatch.setenv("ARCADE_API_KEY", "test-key")
        agent = KYCIntakeAgent()
        # Replace the real client; awaited methods must be AsyncMock
        agent.arcade = MagicMock()
        agent.arcade.tools.authorize = AsyncMock()
        agent.arcade.tools.execute = AsyncMock()
        return agent

    async def test_customer_authorization_flow(self, kyc_agent):
        """Test the pending-authorization branch of the workflow"""

        # Mock authorization response
        kyc_agent.arcade.tools.authorize.return_value = Mock(
            status="pending",
            url="https://accounts.google.com/oauth/authorize?..."
        )

        result = await kyc_agent.initiate_kyc_intake("test_customer_123", "customer@example.com")

        assert result["status"] == "authorization_required"
        assert "auth_url" in result

    async def test_document_collection(self, kyc_agent):
        """Test document search and retrieval"""

        files = [
            {"id": "doc123", "name": "passport.pdf", "mimeType": "application/pdf"},
            {"id": "doc456", "name": "bank_statement.pdf", "mimeType": "application/pdf"}
        ]
        # "government_id" runs three searches, one per pattern; only the first matches
        kyc_agent.arcade.tools.execute.side_effect = [
            Mock(output={"files": files}),
            Mock(output={"files": []}),
            Mock(output={"files": []})
        ]

        documents = await kyc_agent.search_kyc_documents("test_customer_123", "government_id")

        assert len(documents) == 2
        assert documents[0]["name"] == "passport.pdf"

Conclusion

Building an MCP-authenticated KYC intake agent with Arcade's Google Docs toolkit transforms document collection from a manual, error-prone process into an automated, secure workflow. The platform's OAuth management, tool-calling architecture, and enterprise-grade security features enable financial institutions to meet compliance requirements while providing seamless customer experiences.

Key takeaways for production deployment:

  • Authentication First: Arcade's managed OAuth means your AI agent never handles raw credentials
  • Compliance Built-In: Audit logging and granular permissions provide the access records that regulators expect
  • Scale Ready: Per-customer session management lets you run many KYC intakes concurrently, bounded by the limit you configure
  • Security by Design: Token isolation and keyed audit hashes help protect sensitive customer data

For additional implementation details and toolkit documentation, explore the Arcade documentation and toolkit reference. To see example implementations and contribute to the community, visit the Arcade GitHub repository.
