Claims intake processes in insurance, healthcare, and legal operations consume significant operational resources. Traditional automation often stalls because it depends on admin-level credentials or shared service accounts, which create security risks and compliance issues. This guide shows how to build a production-ready claims intake bot using Arcade's Gmail toolkit with delegated-user access patterns.
What You Need
- Arcade API key
- Python 3.8 or higher
- Gmail account for testing
- Development environment with async support
Install the Arcade Python client:
pip install arcadepy
Set your API key:
export ARCADE_API_KEY="your_arcade_api_key"
Delegated-User Access Pattern
Delegated-user access means each bot action operates with individual user credentials, not shared service accounts. This provides:
- Audit trails showing which user triggered each action
- Permission scoping based on user roles
- Elimination of over-privileged bot accounts
- Compliance with data access regulations
Arcade handles OAuth flows, token rotation, and permission scoping automatically.
Bot Architecture
The claims intake bot has three components:
- Email Monitor - Watches incoming Gmail for claims messages
- Claims Processor - Extracts structured data from email content
- Response Handler - Sends confirmations and updates tracking systems
Initialize the Bot
Set up the base client configuration:
from arcadepy import AsyncArcade


class ClaimsIntakeBot:
    def __init__(self):
        # The methods below await client calls, so use the async client.
        # It reads ARCADE_API_KEY from the environment.
        self.client = AsyncArcade()
        self.user_sessions = {}

    async def setup_user(self, user_id: str):
        """Initialize bot for a specific user"""
        auth_response = await self.client.tools.authorize(
            tool_name="Gmail.ListEmails",
            user_id=user_id
        )
        if auth_response.status != "completed":
            # Hand the authorization URL back to the caller instead of blocking
            return {
                "authorization_required": True,
                "url": auth_response.url,
                "message": "Complete Gmail authorization to enable claims processing"
            }
        await self.client.auth.wait_for_completion(auth_response)
        self.user_sessions[user_id] = {
            "authenticated": True,
            "permissions": ["read", "send"]
        }
        return {"status": "ready"}
Each user authorizes their own Gmail access. Arcade manages token persistence and refresh, so users authenticate once until tokens expire or are revoked.
Monitor Incoming Claims
Build email monitoring with Gmail.ListEmailsByHeader:
async def monitor_claims_inbox(self, user_id: str):
    """Monitor for new claims emails"""
    try:
        response = await self.client.tools.execute(
            tool_name="Gmail.ListEmailsByHeader",
            input={
                "subject": "claim",
                "limit": 50
            },
            user_id=user_id
        )
        emails = response.output.value
        new_claims = [
            email for email in emails
            if self._is_claim_submission(email)
        ]
        return new_claims
    except Exception as e:
        if hasattr(e, 'type') and e.type == "authorization_required":
            return await self.setup_user(user_id)
        raise

def _is_claim_submission(self, email):
    """Validate email matches claim submission criteria"""
    required_keywords = ["claim number", "policy", "incident"]
    email_text = email.get("body", "").lower()
    return any(keyword in email_text for keyword in required_keywords)
The Gmail.ListEmailsByHeader tool searches by subject, which is more efficient than retrieving all emails. The tool respects user Gmail permissions automatically.
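To see how the keyword filter behaves, the sketch below runs the same `any(...)` check as `_is_claim_submission` on two made-up messages, next to a stricter variant that requires at least two keywords. The function `is_claim_submission_strict`, the `min_hits` threshold, and the sample emails are assumptions for illustration and are not part of the Arcade toolkit.

```python
REQUIRED_KEYWORDS = ["claim number", "policy", "incident"]


def is_claim_submission(email: dict) -> bool:
    """Same rule as _is_claim_submission: a single keyword is enough."""
    text = email.get("body", "").lower()
    return any(keyword in text for keyword in REQUIRED_KEYWORDS)


def is_claim_submission_strict(email: dict, min_hits: int = 2) -> bool:
    """Hypothetical stricter rule: require at least min_hits keywords."""
    text = email.get("body", "").lower()
    hits = sum(keyword in text for keyword in REQUIRED_KEYWORDS)
    return hits >= min_hits


# Made-up sample messages
newsletter = {"body": "Our new policy on office parking starts Monday."}
submission = {
    "body": "Claim Number: CLM-1\nPolicy Number: POL-9\nIncident Date: 01/15/2025"
}

if __name__ == "__main__":
    for name, email in [("newsletter", newsletter), ("submission", submission)]:
        print(name, is_claim_submission(email), is_claim_submission_strict(email))
```

The newsletter passes the original filter because it mentions "policy", so a threshold like this may reduce false positives at the cost of missing terse submissions.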
Extract Claims Data
Parse structured claim information from email content:
import re
from datetime import datetime

async def extract_claim_data(self, email_content: dict):
    """Parse claim information from email body"""
    body = email_content.get("body", "")
    claim_data = {
        "claim_number": self._extract_claim_number(body),
        "policy_number": self._extract_policy_number(body),
        "incident_date": self._extract_date(body),
        "description": self._extract_description(body),
        "submitter": email_content.get("sender"),
        "received_at": datetime.now().isoformat()
    }
    if not all([claim_data["claim_number"], claim_data["policy_number"]]):
        return {"valid": False, "reason": "Missing required fields"}
    return {"valid": True, "data": claim_data}

def _extract_claim_number(self, text: str):
    """Extract claim number using pattern matching"""
    pattern = r'claim\s*(?:number|#)?\s*:?\s*([A-Z0-9-]+)'
    match = re.search(pattern, text, re.IGNORECASE)
    return match.group(1) if match else None

def _extract_policy_number(self, text: str):
    """Extract policy number"""
    pattern = r'policy\s*(?:number|#)?\s*:?\s*([A-Z0-9-]+)'
    match = re.search(pattern, text, re.IGNORECASE)
    return match.group(1) if match else None

def _extract_date(self, text: str):
    """Extract incident date"""
    pattern = r'incident\s*date\s*:?\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})'
    match = re.search(pattern, text, re.IGNORECASE)
    return match.group(1) if match else None

def _extract_description(self, text: str):
    """Extract incident description"""
    pattern = r'description\s*:?\s*(.+?)(?:\n\n|$)'
    match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else None
Adapt extraction logic to your specific claim formats. Add LLM-based extraction for unstructured submissions if needed.
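One adaptation that is often useful: `_extract_date` returns the date exactly as the submitter typed it, so downstream systems see mixed formats. The sketch below normalizes that string to ISO 8601. The helper name `normalize_incident_date` and the list of accepted formats are assumptions; in particular, month/day order is assumed to be US-style and should be changed if your submitters write day first.

```python
from datetime import datetime
from typing import Optional

# Assumed input formats, tried in order (US-style month/day first).
DATE_FORMATS = ("%m/%d/%Y", "%m-%d-%Y", "%m/%d/%y", "%m-%d-%y")


def normalize_incident_date(raw: Optional[str]) -> Optional[str]:
    """Return the date as YYYY-MM-DD, or None if no assumed format matches."""
    if not raw:
        return None
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt).date().isoformat()
        except ValueError:
            # This format did not fit; try the next one
            continue
    return None


if __name__ == "__main__":
    for sample in ["01/15/2025", "1-5-25", "13/45/2025", None]:
        print(repr(sample), "->", normalize_incident_date(sample))
```

Returning `None` for unparseable input keeps the decision about how to handle a bad date (reject, flag for review) in the calling code.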
Send Confirmation Emails
Send confirmations using Gmail.SendEmail:
async def send_claim_confirmation(
    self,
    user_id: str,
    recipient: str,
    claim_data: dict
):
    """Send confirmation email for processed claim"""
    subject = f"Claim Confirmation: {claim_data['claim_number']}"
    body = f"""
Thank you for submitting your claim.
Claim Details:
- Claim Number: {claim_data['claim_number']}
- Policy Number: {claim_data['policy_number']}
- Incident Date: {claim_data['incident_date']}
- Received: {claim_data['received_at']}
Your claim has been logged and is under review.
You will receive updates via email as processing progresses.
Reference Number: {claim_data['claim_number']}
"""
    try:
        response = await self.client.tools.execute(
            tool_name="Gmail.SendEmail",
            input={
                "subject": subject,
                "body": body,
                "recipient": recipient
            },
            user_id=user_id
        )
        return {
            "sent": True,
            "message_id": response.output.value
        }
    except Exception as e:
        return {
            "sent": False,
            "error": str(e)
        }
The bot sends emails from the authenticated user's account, maintaining proper sender attribution for compliance and audit trails.
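Because `extract_claim_data` only requires a claim number and a policy number, `incident_date` can be `None`, and the f-string template would then print the literal text "None" to the claimant. The sketch below shows one way to render the detail lines with a placeholder instead. The helper `format_claim_fields` and the "Not provided" wording are assumptions for illustration.

```python
def format_claim_fields(claim_data: dict, missing: str = "Not provided") -> str:
    """Render claim fields as bullet lines, substituting a placeholder for empty values."""
    labels = [
        ("Claim Number", "claim_number"),
        ("Policy Number", "policy_number"),
        ("Incident Date", "incident_date"),
        ("Received", "received_at"),
    ]
    lines = []
    for label, key in labels:
        value = claim_data.get(key)
        lines.append(f"- {label}: {value if value else missing}")
    return "\n".join(lines)


if __name__ == "__main__":
    # Made-up claim with no incident date
    sample = {
        "claim_number": "CLM-2025-001",
        "policy_number": "POL-123456",
        "incident_date": None,
        "received_at": "2025-01-16T09:30:00",
    }
    print(format_claim_fields(sample))
```

The returned string can be interpolated into the confirmation body in place of the four hard-coded detail lines.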
Manage Drafts for Review
For claims requiring review before sending, use Gmail.WriteDraftEmail:
async def create_claim_draft(
    self,
    user_id: str,
    recipient: str,
    claim_data: dict,
    requires_review: bool = True
):
    """Create draft for claims needing manual review"""
    subject = f"Additional Information Required: Claim {claim_data['claim_number']}"
    body = f"""
Dear {recipient},
We are processing your claim {claim_data['claim_number']}.
To proceed, please provide:
- [List required documents]
- [Additional information needed]
Reply to this email with the requested information.
"""
    if requires_review:
        response = await self.client.tools.execute(
            tool_name="Gmail.WriteDraftEmail",
            input={
                "subject": subject,
                "body": body,
                "recipient": recipient
            },
            user_id=user_id
        )
        return {
            "draft_created": True,
            "draft_id": response.output.value,
            "status": "pending_review"
        }
    else:
        return await self.send_claim_confirmation(
            user_id, recipient, claim_data
        )
Draft management enables human oversight for edge cases while automating routine confirmations.
Build the Processing Pipeline
Combine components into a complete claims processing pipeline:
async def process_incoming_claims(self, user_id: str):
    """Main processing loop for claims intake"""
    if user_id not in self.user_sessions:
        setup_result = await self.setup_user(user_id)
        if setup_result.get("authorization_required"):
            return setup_result
    new_claims = await self.monitor_claims_inbox(user_id)
    if not new_claims:
        return {"processed": 0, "message": "No new claims"}
    results = []
    for email in new_claims:
        extraction_result = await self.extract_claim_data(email)
        if not extraction_result["valid"]:
            await self._handle_invalid_claim(user_id, email)
            continue
        claim_data = extraction_result["data"]
        await self._store_claim(claim_data)
        confirmation = await self.send_claim_confirmation(
            user_id=user_id,
            recipient=email["sender"],
            claim_data=claim_data
        )
        results.append({
            "claim_number": claim_data["claim_number"],
            "confirmation_sent": confirmation["sent"]
        })
    return {
        "processed": len(results),
        "results": results
    }

async def _handle_invalid_claim(self, user_id: str, email: dict):
    """Send rejection email for invalid claims"""
    await self.client.tools.execute(
        tool_name="Gmail.SendEmail",
        input={
            "subject": "Claim Submission - Additional Information Required",
            "body": "Your claim submission is missing required information...",
            "recipient": email["sender"]
        },
        user_id=user_id
    )

async def _store_claim(self, claim_data: dict):
    """Store claim in your database or CRM"""
    pass
This pipeline handles monitoring, extraction, validation, storage, and confirmation.
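As written, each run queries the same subject search with no memory of earlier runs, so a message that is still among the results would be confirmed again on the next poll. A minimal sketch of one way to guard against that is to track processed message IDs. It assumes each email dict carries an `"id"` field, which the examples in this guide do not confirm, and the class `SeenMessageStore` is hypothetical; a real deployment would likely back it with a database.

```python
class SeenMessageStore:
    """In-memory record of processed message IDs (swap for persistent storage in practice)."""

    def __init__(self):
        self._seen = set()

    def filter_new(self, emails: list, id_key: str = "id") -> list:
        """Return emails not seen before and mark them as seen.

        Emails without an ID cannot be deduplicated, so they are passed through.
        """
        fresh = []
        for email in emails:
            message_id = email.get(id_key)
            if message_id is not None:
                if message_id in self._seen:
                    continue
                self._seen.add(message_id)
            fresh.append(email)
        return fresh


if __name__ == "__main__":
    store = SeenMessageStore()
    # Two polls with made-up IDs; "b" appears in both
    print(store.filter_new([{"id": "a"}, {"id": "b"}]))
    print(store.filter_new([{"id": "b"}, {"id": "c"}]))
```

This version marks a message as seen before it is processed. If a failed send should be retried on the next poll, mark the ID only after the confirmation succeeds.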
Scale to Multiple Users
Handle multiple claims processors with concurrent sessions:
import asyncio
from typing import Dict

from arcadepy import AsyncArcade


class MultiUserClaimsBot:
    def __init__(self):
        self.client = AsyncArcade()
        self.active_users: Dict[str, ClaimsIntakeBot] = {}

    async def register_user(self, user_id: str, user_email: str):
        """Register a claims processor"""
        bot = ClaimsIntakeBot()
        setup_result = await bot.setup_user(user_id)
        if setup_result.get("authorization_required"):
            return setup_result
        self.active_users[user_id] = bot
        return {"registered": True}

    async def process_all_users(self):
        """Process claims for all registered users"""
        tasks = [
            bot.process_incoming_claims(user_id)
            for user_id, bot in self.active_users.items()
        ]
        # return_exceptions=True keeps one user's failure from cancelling the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            "total_users": len(self.active_users),
            "results": results
        }
Each user maintains their own authentication context for proper attribution and permission boundaries.
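`asyncio.gather` starts every user's run at once, which can be more concurrency than you want. The sketch below shows one way to cap the number of runs in flight with `asyncio.Semaphore`. The helper `gather_bounded` and the stand-in job `fake_user_job` are assumptions for illustration; the stand-in only sleeps and does not call Arcade.

```python
import asyncio


async def gather_bounded(factories, limit: int):
    """Await zero-argument coroutine factories with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(
        *(run_one(factory) for factory in factories),
        return_exceptions=True,
    )


# Demo state: records how many stand-in jobs overlap.
stats = {"active": 0, "peak": 0}


async def fake_user_job(user_id: str):
    """Stand-in for bot.process_incoming_claims(user_id)."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)
    stats["active"] -= 1
    return {"user": user_id, "processed": 0}


def run_demo(user_ids, limit):
    # Bind each user_id at definition time so every factory keeps its own value
    factories = [lambda uid=uid: fake_user_job(uid) for uid in user_ids]
    return asyncio.run(gather_bounded(factories, limit))


if __name__ == "__main__":
    print(run_demo(["u1", "u2", "u3", "u4", "u5"], limit=2))
    print("peak concurrency:", stats["peak"])
```

Factories are used instead of ready-made coroutines so that a run is not created until a slot is free. In `process_all_users`, each factory would wrap `bot.process_incoming_claims(user_id)`.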
Implement Error Handling
Production bots need robust error handling for authentication failures and API limits:
import asyncio
from typing import Optional

async def execute_with_retry(
    self,
    tool_name: str,
    input_data: dict,
    user_id: str,
    max_retries: int = 3
) -> Optional[dict]:
    """Execute tool with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            response = await self.client.tools.execute(
                tool_name=tool_name,
                input=input_data,
                user_id=user_id
            )
            return response.output.value
        except Exception as e:
            error_type = getattr(e, 'type', str(type(e).__name__))
            if error_type == "authorization_required":
                auth_result = await self.setup_user(user_id)
                if auth_result.get("authorization_required"):
                    # The user still has to complete OAuth; retrying will not help
                    return None
                continue
            elif error_type == "rate_limit_exceeded":
                # Wait 1s, 2s, 4s, ... before the next attempt
                wait_time = 2 ** attempt
                await asyncio.sleep(wait_time)
                continue
            else:
                raise
    return None
This pattern handles transient failures without manual intervention. See Arcade's error handling documentation for more error types.
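When many users hit a rate limit at the same moment, fixed delays of 1, 2, and 4 seconds make them all retry together. A common refinement is to randomize each delay up to the exponential bound. The sketch below computes such a schedule; the function `backoff_delays` and its `base` and `cap` defaults are assumptions for illustration, not values recommended by Arcade.

```python
import random


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0, rng=None):
    """Return one randomized delay per attempt, each in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    return [
        rng.uniform(0, min(cap, base * 2 ** attempt))
        for attempt in range(max_retries)
    ]


if __name__ == "__main__":
    # A seeded generator makes the schedule reproducible for a demo
    for attempt, delay in enumerate(backoff_delays(5, rng=random.Random(0))):
        print(f"attempt {attempt}: sleep {delay:.2f}s")
```

Inside the retry loop, `await asyncio.sleep(delays[attempt])` would replace the fixed `2 ** attempt` wait.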
Deploy to Production
Environment Configuration
import os
from dataclasses import dataclass


@dataclass
class BotConfig:
    arcade_api_key: str
    processing_interval: int = 300
    max_concurrent_users: int = 50
    retry_attempts: int = 3

    @classmethod
    def from_env(cls):
        return cls(
            arcade_api_key=os.getenv("ARCADE_API_KEY"),
            processing_interval=int(os.getenv("PROCESSING_INTERVAL", "300")),
            max_concurrent_users=int(os.getenv("MAX_CONCURRENT_USERS", "50"))
        )
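`os.getenv` returns `None` when a variable is unset, so a missing API key would only surface later as a failed request. The sketch below is one way to fail at startup instead. The class name `ValidatedBotConfig` and the optional `env` parameter (a mapping passed in for testing) are assumptions for illustration.

```python
import os
from dataclasses import dataclass


@dataclass
class ValidatedBotConfig:
    arcade_api_key: str
    processing_interval: int = 300

    @classmethod
    def from_env(cls, env=None):
        """Build the config from a mapping (defaults to os.environ) and validate it."""
        env = os.environ if env is None else env
        api_key = env.get("ARCADE_API_KEY")
        if not api_key:
            raise ValueError("ARCADE_API_KEY is not set")
        interval = int(env.get("PROCESSING_INTERVAL", "300"))
        if interval <= 0:
            raise ValueError("PROCESSING_INTERVAL must be a positive number of seconds")
        return cls(arcade_api_key=api_key, processing_interval=interval)


if __name__ == "__main__":
    # A plain dict stands in for the real environment
    print(ValidatedBotConfig.from_env({"ARCADE_API_KEY": "demo-key"}))
```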
Scheduled Processing
Run the bot on a schedule using a task scheduler:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def main():
    config = BotConfig.from_env()
    bot = MultiUserClaimsBot()
    # load_active_users() is a placeholder for your own user lookup
    user_ids = await load_active_users()
    for user_id in user_ids:
        await bot.register_user(user_id, user_email=user_id)
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        bot.process_all_users,
        'interval',
        seconds=config.processing_interval
    )
    scheduler.start()
    try:
        # Keep the event loop alive so scheduled jobs can run
        await asyncio.Event().wait()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Monitoring and Logging
Track bot performance:
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class MonitoredClaimsBot(ClaimsIntakeBot):
    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger("claims_bot")
        self.metrics = {
            "processed": 0,
            "errors": 0,
            "auth_failures": 0
        }

    async def process_incoming_claims(self, user_id: str):
        start_time = datetime.now()
        try:
            result = await super().process_incoming_claims(user_id)
            # The result has no "processed" key when authorization is pending
            processed = result.get("processed", 0)
            self.metrics["processed"] += processed
            self.logger.info(
                f"Processed {processed} claims for user {user_id}"
            )
            return result
        except Exception as e:
            self.metrics["errors"] += 1
            self.logger.error(f"Error processing claims for {user_id}: {e}")
            raise
        finally:
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"Processing took {duration}s")
Advanced Features
Thread Management
Process entire email threads using Gmail.GetThread:
async def process_claim_thread(self, user_id: str, thread_id: str):
    """Process all messages in a claim thread"""
    response = await self.client.tools.execute(
        tool_name="Gmail.GetThread",
        input={"thread_id": thread_id},
        user_id=user_id
    )
    thread = response.output.value
    messages = thread.get("messages", [])
    claim_updates = []
    for message in messages:
        update = await self.extract_claim_data(message)
        if update["valid"]:
            claim_updates.append(update["data"])
    return claim_updates
Attachment Handling
Process claims with document attachments:
async def handle_claim_with_attachments(
    self,
    user_id: str,
    email: dict
):
    """Process claims with document attachments"""
    claim_data = await self.extract_claim_data(email)
    # Invalid results carry no "data" key, so return them unchanged
    if not claim_data["valid"]:
        return claim_data
    if email.get("attachments"):
        # _store_attachments is a placeholder for your own storage helper
        attachment_urls = await self._store_attachments(
            email["attachments"],
            claim_data["data"]["claim_number"]
        )
        claim_data["data"]["attachments"] = attachment_urls
    return claim_data
Integration with CRM Systems
Connect to claims management platforms by building custom Arcade tools:
async def sync_to_crm(self, claim_data: dict):
    """Sync processed claim to CRM system"""
    # Use custom Arcade tools or direct API calls
    # to update your claims management system
    pass
Test the Bot
Validate functionality before production:
import unittest


# IsolatedAsyncioTestCase awaits async test methods; a plain TestCase would
# only create the coroutines and report the tests as passed without running them.
class TestClaimsBot(unittest.IsolatedAsyncioTestCase):
    async def test_claim_extraction(self):
        bot = ClaimsIntakeBot()
        sample_email = {
            "body": """
            Claim Number: CLM-2025-001
            Policy Number: POL-123456
            Incident Date: 01/15/2025
            Description: Vehicle collision at intersection
            """,
            "sender": "john.doe@example.com"
        }
        result = await bot.extract_claim_data(sample_email)
        self.assertTrue(result["valid"])
        self.assertEqual(result["data"]["claim_number"], "CLM-2025-001")
        self.assertEqual(result["data"]["policy_number"], "POL-123456")

    async def test_invalid_claim(self):
        bot = ClaimsIntakeBot()
        invalid_email = {
            "body": "Random email content",
            "sender": "someone@example.com"
        }
        result = await bot.extract_claim_data(invalid_email)
        self.assertFalse(result["valid"])
Security Considerations
Token Management
Arcade manages token persistence and refresh automatically. Implement audit logging for compliance:
async def audit_log_access(self, user_id: str, action: str, claim_id: str):
    """Log all data access for compliance"""
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "action": action,
        "claim_id": claim_id,
        "ip_address": self._get_user_ip(user_id)
    }
    await self._store_audit_log(log_entry)
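The storage step is left to the reader above. As a minimal sketch of what it could look like, the example below appends each entry as one JSON object per line (JSON Lines) to any writable text stream. The helper names `build_audit_entry` and `write_audit_entry` are hypothetical, and the UTC timestamp is an assumption chosen so that entries from different hosts sort consistently.

```python
import io
import json
from datetime import datetime, timezone


def build_audit_entry(user_id: str, action: str, claim_id: str) -> dict:
    """Assemble one audit record with a timezone-aware UTC timestamp."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "action": action,
        "claim_id": claim_id,
    }


def write_audit_entry(stream, entry: dict) -> None:
    """Append the entry as a single JSON line to a writable text stream."""
    stream.write(json.dumps(entry, sort_keys=True) + "\n")


if __name__ == "__main__":
    # StringIO stands in for an append-mode file or a log shipper
    buffer = io.StringIO()
    write_audit_entry(buffer, build_audit_entry("user-1", "read_claim", "CLM-2025-001"))
    print(buffer.getvalue(), end="")
```

For a file on disk, open it with `open(path, "a", encoding="utf-8")` so existing entries are never overwritten.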
Permission Validation
Verify users have appropriate permissions:
async def validate_user_permissions(self, user_id: str):
    """Check if user has required permissions"""
    try:
        await self.client.tools.execute(
            tool_name="Gmail.ListEmails",
            input={"n_emails": 1},
            user_id=user_id
        )
        return True
    except Exception:
        return False
Integration Patterns
Webhook-Triggered Processing
Process claims immediately upon receipt:
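from fastapi import FastAPI, Request

app = FastAPI()
bot = ClaimsIntakeBot()


@app.post("/webhook/gmail")
async def gmail_webhook(request: Request):
    """Handle Gmail webhook notifications"""
    data = await request.json()
    user_id = data["user_id"]
    result = await bot.process_incoming_claims(user_id)
    # When the user has not authorized yet, the result has no "processed" key
    if result.get("authorization_required"):
        return {"status": "authorization_required", "url": result["url"]}
    return {"status": "processed", "count": result.get("processed", 0)}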
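The handler above expects a payload that already contains a `user_id`, which suggests a relay of your own in front of it. If the endpoint instead receives Gmail push notifications through a Cloud Pub/Sub push subscription, the body is an envelope whose `message.data` field is a base64url-encoded JSON object with `emailAddress` and `historyId`. The sketch below decodes that envelope. The function name `decode_gmail_push` is hypothetical, and mapping the email address to an Arcade `user_id` is left as an assumption about your own user directory.

```python
import base64
import json


def decode_gmail_push(envelope: dict) -> dict:
    """Decode the base64url `message.data` field of a Pub/Sub push envelope."""
    data = envelope["message"]["data"]
    # Restore any padding that was stripped from the encoded payload
    padded = data + "=" * (-len(data) % 4)
    return json.loads(base64.urlsafe_b64decode(padded).decode("utf-8"))


def make_sample_envelope(email_address: str, history_id: str) -> dict:
    """Build a made-up envelope for local testing (not real notification data)."""
    payload = json.dumps({"emailAddress": email_address, "historyId": history_id})
    encoded = base64.urlsafe_b64encode(payload.encode("utf-8")).decode("ascii")
    return {"message": {"data": encoded, "messageId": "1"}, "subscription": "demo"}


if __name__ == "__main__":
    envelope = make_sample_envelope("adjuster@example.com", "12345")
    print(decode_gmail_push(envelope))
```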
Multi-Tenant Deployment
Support multiple organizations with isolated processing:
class TenantClaimsBot:
    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.bot = ClaimsIntakeBot()
        self.tenant_config = self._load_tenant_config(tenant_id)

    def _load_tenant_config(self, tenant_id: str):
        """Load tenant-specific configuration"""
        return {
            "extraction_rules": self._get_extraction_rules(tenant_id),
            "workflow_settings": self._get_workflow_settings(tenant_id)
        }
Performance Optimization
Batch Processing
Process multiple emails concurrently:
async def batch_process_claims(self, user_id: str, emails: list):
    """Process multiple claims concurrently"""
    tasks = [
        self.extract_claim_data(email)
        for email in emails
    ]
    results = await asyncio.gather(*tasks)
    valid_claims = [
        result["data"]
        for result in results
        if result["valid"]
    ]
    return valid_claims
Session Caching
Cache user sessions to reduce authentication overhead:
from functools import lru_cache


class CachedClaimsBot(ClaimsIntakeBot):
    @lru_cache(maxsize=100)
    def _get_user_session(self, user_id: str):
        """Cache user session data"""
        return self.user_sessions.get(user_id)
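One caveat with `lru_cache` here: it keeps returning whatever it stored on the first call, including `None` for a user who had not authorized yet, until the entry is evicted or the cache is cleared. A minimal sketch of an alternative is a cache whose entries expire after a fixed time. The class `TTLSessionCache`, its default of 300 seconds, and the injectable `clock` parameter are assumptions for illustration.

```python
import time


class TTLSessionCache:
    """Session cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float = 300.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries = {}

    def set(self, user_id: str, session: dict) -> None:
        """Store the session along with its expiry time."""
        self._entries[user_id] = (self._clock() + self._ttl, session)

    def get(self, user_id: str):
        """Return the cached session, or None if it is missing or expired."""
        entry = self._entries.get(user_id)
        if entry is None:
            return None
        expires_at, session = entry
        if self._clock() >= expires_at:
            # Drop the stale entry so the caller re-checks authorization
            del self._entries[user_id]
            return None
        return session


if __name__ == "__main__":
    now = [0.0]
    cache = TTLSessionCache(ttl_seconds=10, clock=lambda: now[0])
    cache.set("user-1", {"authenticated": True})
    print(cache.get("user-1"))
    now[0] = 11.0  # advance the fake clock past the TTL
    print(cache.get("user-1"))
```

Missing users are never stored, so a user who authorizes later is picked up on the next lookup.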
Summary
This delegated-user claims intake bot uses Arcade's Gmail toolkit to enable secure, scalable automation with proper permission boundaries. The architecture supports:
- Individual user authentication with automatic token management
- Concurrent processing across multiple claims processors
- Audit-compliant operation with proper attribution
- Production-ready error handling and monitoring
The bot handles routine claim intake while preserving human oversight for edge cases. By eliminating the auth bottleneck, the bot reaches production without compromising security or compliance.
Extend this foundation by building custom tools for your claims management systems or integrating additional toolkits from the Arcade toolkit library.
For code examples and integration patterns, visit the Arcade GitHub repository and documentation.



