Compliance audits require organized, timestamped evidence across SOC 2, ISO 27001, HIPAA, and PCI DSS frameworks. Organizations waste weeks manually searching Google Drive for policies, training records, incident reports, and system documentation. This guide shows you how to build an automated compliance evidence collector using Arcade's Google Drive toolkit.
Prerequisites
Before starting, ensure you have:
- Active Arcade.dev account with API key
- Google Cloud Console project with OAuth 2.0 credentials configured
- Python 3.8 or higher installed
- Basic knowledge of async/await patterns in Python
- Access to Google Drive containing compliance documentation
Why Manual Evidence Collection Fails
Manual compliance evidence collection creates several critical failures:
- Missing documents that cause audit delays or failures
- Version control issues where auditors receive outdated files
- Inconsistent evidence across different compliance frameworks
- Time pressure during active audits leading to rushed submissions
- Human error in document categorization and labeling
Organizations pursuing multiple frameworks face exponential complexity. A company maintaining SOC 2, ISO 27001, and HIPAA compliance must produce hundreds of evidence documents, each mapped to specific control requirements. Manual collection for this scenario takes 6-8 weeks of full-time work before each audit.
Installing Arcade SDK
Install the Arcade Python SDK:
pip install arcadepy
Get your API key from the Arcade dashboard. Set it as an environment variable:
export ARCADE_API_KEY="your_api_key_here"
Initialize the client:
from arcadepy import Arcade
client = Arcade() # Reads ARCADE_API_KEY from environment
Configuring Google OAuth Provider
Navigate to Google Cloud Console and complete these steps:
- Create a new project or select existing project
- Enable Google Drive API
- Create OAuth 2.0 credentials (Web application)
- Note Client ID and Client Secret
Add the provider in Arcade dashboard:
- Go to OAuth → Providers
- Click "Add OAuth Provider"
- Select "Google" from Included Providers tab
- Enter unique provider ID (e.g., "compliance-drive-provider")
- Paste Client ID and Client Secret
- Copy generated Redirect URL
- Add Redirect URL to Google app's Authorized redirect URIs
For full documentation, see Google auth provider setup.
Setting Up Authentication
Implementing Organization Authentication
Each compliance team needs isolated Google Drive access. Create an authentication handler:
from arcadepy import Arcade
from datetime import datetime
class ComplianceCollector:
    def __init__(self):
        self.client = Arcade()
        self.auth_cache = {}

    async def authenticate_org(self, org_id: str) -> dict:
        """Authenticate organization's Google Drive access"""
        auth_response = await self.client.auth.start(
            user_id=org_id,
            provider="google",
            scopes=[
                "https://www.googleapis.com/auth/drive.readonly",
                "https://www.googleapis.com/auth/drive.metadata.readonly"
            ]
        )
        if auth_response.status != "completed":
            return {
                "requires_auth": True,
                "url": auth_response.url,
                "org": org_id
            }
        await self.client.auth.wait_for_completion(auth_response)
        self.auth_cache[org_id] = {
            "authenticated_at": datetime.now(),
            "scopes": ["drive.readonly", "drive.metadata.readonly"]
        }
        return {"authenticated": True, "org": org_id}
The drive.readonly scope grants read-only access to file contents. The drive.metadata.readonly scope provides access to file metadata, modification dates, and folder structures without content access. Together these maintain least-privilege security for compliance use cases.
Learn more about Arcade's authentication architecture.
Handling OAuth Callbacks
When authentication is incomplete, users must visit the authorization URL. After completion, verify the authentication:
    async def verify_authentication(self, org_id: str) -> bool:
        """Verify organization authentication status"""
        if org_id in self.auth_cache:
            auth_time = self.auth_cache[org_id]["authenticated_at"]
            # Check if auth is less than 24 hours old
            age_hours = (datetime.now() - auth_time).total_seconds() / 3600
            return age_hours < 24
        return False
Arcade handles token refresh automatically. Your application code never accesses raw OAuth tokens.
Building Evidence Search Functions
Searching by File Name Pattern
Compliance documents follow naming conventions. Search for files matching specific patterns:
    async def search_by_name(
        self,
        org_id: str,
        pattern: str,
        file_types: list = None
    ) -> list:
        """Search Drive files by name pattern"""
        # Build query
        query_parts = [f"name contains '{pattern}'"]
        if file_types:
            mime_filters = []
            for ftype in file_types:
                if ftype == "pdf":
                    mime_filters.append("mimeType='application/pdf'")
                elif ftype == "doc":
                    mime_filters.append("mimeType='application/vnd.google-apps.document'")
                elif ftype == "sheet":
                    mime_filters.append("mimeType='application/vnd.google-apps.spreadsheet'")
            if mime_filters:
                query_parts.append(f"({' or '.join(mime_filters)})")
        query = " and ".join(query_parts)
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": query,
                "order_by": "modifiedTime desc",
                "page_size": 50
            },
            user_id=org_id
        )
        return result.output.get("files", []) if result.output else []
The GoogleDrive.SearchFiles tool accepts Google Drive API query syntax. See available query operators in Google Drive reference documentation.
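As a concrete illustration of that syntax, here are a few example query strings. The file names, dates, and folder ID below are made up for illustration; the field names and operators follow the Drive API query grammar.

```python
# Illustrative Google Drive query strings (Drive API query syntax).
# The search terms and IDs are placeholders, not required conventions.

# PDFs whose name mentions "policy"
name_query = "name contains 'policy' and mimeType='application/pdf'"

# Documents mentioning "incident response", modified in 2024 or later
content_query = (
    "fullText contains 'incident response' "
    "and modifiedTime > '2024-01-01T00:00:00'"
)

# Non-trashed files inside a specific folder (FOLDER_ID is a placeholder)
folder_query = "'FOLDER_ID' in parents and trashed = false"

print(name_query)
print(content_query)
print(folder_query)
```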
Searching by Content
For policies and procedures, search document content:
    async def search_by_content(
        self,
        org_id: str,
        search_term: str,
        modified_after: str = None
    ) -> list:
        """Search files by content text"""
        query = f"fullText contains '{search_term}'"
        if modified_after:
            query += f" and modifiedTime > '{modified_after}'"
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": query,
                "order_by": "modifiedTime desc",
                "page_size": 30
            },
            user_id=org_id
        )
        return result.output.get("files", []) if result.output else []
The fullText operator searches inside document content, not just file names. This finds evidence even when file names don't match search patterns.
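One practical caveat: interpolating a search term directly into the query breaks if the term contains a single quote. A small escaping helper, an addition to the code above rather than part of the Arcade SDK, guards against that. Per the Drive API query syntax, backslashes and single quotes inside quoted values are escaped with a backslash.

```python
def escape_drive_query_value(value: str) -> str:
    """Escape a value for use inside single quotes in a Drive query.

    The Drive query syntax escapes backslashes and single quotes
    with a backslash; backslashes are handled first to avoid
    double-escaping the quote's escape character.
    """
    return value.replace("\\", "\\\\").replace("'", "\\'")

term = escape_drive_query_value("vendor's risk policy")
query = f"fullText contains '{term}'"
print(query)  # fullText contains 'vendor\'s risk policy'
```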
Finding Folder Structures
Compliance evidence often lives in organized folder hierarchies:
    async def find_folders(self, org_id: str, folder_name: str) -> list:
        """Find folders matching name pattern"""
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": f"name contains '{folder_name}' and mimeType='application/vnd.google-apps.folder'",
                "order_by": "name"
            },
            user_id=org_id
        )
        return result.output.get("files", []) if result.output else []
Listing Files in Specific Folders
After identifying folders, list their contents:
    async def list_folder_contents(
        self,
        org_id: str,
        folder_id: str
    ) -> list:
        """List all files in specific folder"""
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": f"'{folder_id}' in parents",
                "order_by": "name"
            },
            user_id=org_id
        )
        return result.output.get("files", []) if result.output else []
Mapping Framework Requirements to Evidence
SOC 2 Evidence Requirements
SOC 2 Type 2 audits require specific evidence categories:
    async def collect_soc2_evidence(self, org_id: str) -> dict:
        """Collect evidence for SOC 2 compliance"""
        requirements = {
            "policies": [
                "security policy",
                "acceptable use policy",
                "access control policy",
                "incident response policy"
            ],
            "procedures": [
                "backup procedure",
                "change management",
                "vulnerability management",
                "security monitoring"
            ],
            "training": [
                "security awareness training",
                "onboarding training",
                "annual security training"
            ],
            "logs": [
                "access log",
                "audit log",
                "change log",
                "incident log"
            ],
            "assessments": [
                "risk assessment",
                "vendor assessment",
                "penetration test",
                "vulnerability scan"
            ]
        }
        collected = {}
        for category, terms in requirements.items():
            category_docs = []
            for term in terms:
                docs = await self.search_by_content(org_id, term)
                for doc in docs:
                    category_docs.append({
                        "id": doc["id"],
                        "name": doc["name"],
                        "modified": doc["modifiedTime"],
                        "link": doc["webViewLink"],
                        "requirement": term
                    })
            collected[category] = category_docs
        return collected
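Because one document can match several search terms, the category lists above can contain duplicates. A small helper, an addition to the guide's code, deduplicates by Drive file ID while keeping the first match:

```python
def dedupe_by_id(docs: list) -> list:
    """Drop duplicate entries sharing a Drive file ID, keeping the first."""
    seen = set()
    unique = []
    for doc in docs:
        if doc["id"] not in seen:
            seen.add(doc["id"])
            unique.append(doc)
    return unique

# Example: the same file matched two different requirement terms
docs = [
    {"id": "f1", "requirement": "security policy"},
    {"id": "f2", "requirement": "access control policy"},
    {"id": "f1", "requirement": "incident response policy"},
]
print(len(dedupe_by_id(docs)))  # 2
```

Apply it per category (`collected[category] = dedupe_by_id(category_docs)`) if auditors should see each file only once.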
ISO 27001 Evidence Requirements
ISO 27001 requires different evidence types:
    async def collect_iso27001_evidence(self, org_id: str) -> dict:
        """Collect evidence for ISO 27001 compliance"""
        requirements = {
            "isms_documentation": [
                "information security policy",
                "scope statement",
                "risk assessment methodology",
                "statement of applicability"
            ],
            "asset_management": [
                "asset inventory",
                "asset classification",
                "acceptable use",
                "return of assets"
            ],
            "access_control": [
                "access control policy",
                "user registration",
                "password management",
                "access review"
            ],
            "operations": [
                "operational procedures",
                "change management",
                "capacity management",
                "backup policy"
            ],
            "business_continuity": [
                "business continuity plan",
                "disaster recovery plan",
                "continuity testing",
                "backup restoration"
            ]
        }
        collected = {}
        for category, terms in requirements.items():
            category_docs = []
            for term in terms:
                docs = await self.search_by_content(org_id, term)
                category_docs.extend([{
                    "id": d["id"],
                    "name": d["name"],
                    "modified": d["modifiedTime"],
                    "link": d["webViewLink"],
                    "requirement": term
                } for d in docs])
            collected[category] = category_docs
        return collected
HIPAA Evidence Requirements
Healthcare organizations need HIPAA-specific evidence:
    async def collect_hipaa_evidence(self, org_id: str) -> dict:
        """Collect evidence for HIPAA compliance"""
        requirements = {
            "administrative": [
                "security management process",
                "risk analysis",
                "workforce security",
                "information access management",
                "security awareness training"
            ],
            "physical": [
                "facility access controls",
                "workstation security",
                "device and media controls"
            ],
            "technical": [
                "access control",
                "audit controls",
                "integrity controls",
                "transmission security",
                "encryption"
            ],
            "privacy": [
                "privacy policy",
                "breach notification",
                "patient rights",
                "minimum necessary"
            ]
        }
        collected = {}
        for category, terms in requirements.items():
            category_docs = []
            for term in terms:
                docs = await self.search_by_content(org_id, term)
                category_docs.extend([{
                    "id": d["id"],
                    "name": d["name"],
                    "modified": d["modifiedTime"],
                    "link": d["webViewLink"],
                    "requirement": term
                } for d in docs])
            collected[category] = category_docs
        return collected
Version Control and Evidence Freshness
Getting Latest Document Versions
Auditors require current document versions:
    async def get_latest_version(
        self,
        org_id: str,
        doc_name: str
    ) -> dict:
        """Get most recent version of document"""
        results = await self.search_by_name(org_id, doc_name)
        if not results:
            return {"error": f"No documents found matching '{doc_name}'"}
        # Sort by modification time
        sorted_results = sorted(
            results,
            key=lambda x: x["modifiedTime"],
            reverse=True
        )
        return {
            "latest": sorted_results[0],
            "all_versions": sorted_results,
            "version_count": len(sorted_results)
        }
Filtering by Date Range
Collect evidence modified within specific timeframes:
    async def get_evidence_by_date_range(
        self,
        org_id: str,
        search_term: str,
        start_date: str,
        end_date: str
    ) -> list:
        """Get evidence modified within date range"""
        query = (
            f"fullText contains '{search_term}' "
            f"and modifiedTime >= '{start_date}' "
            f"and modifiedTime <= '{end_date}'"
        )
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": query,
                "order_by": "modifiedTime desc"
            },
            user_id=org_id
        )
        return result.output.get("files", []) if result.output else []
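Drive timestamp comparisons expect RFC 3339-style strings such as '2024-01-01T00:00:00'. A small helper, added here for convenience, builds the start and end values for a typical twelve-month audit period:

```python
from datetime import datetime, timedelta, timezone

def rfc3339(dt: datetime) -> str:
    """Format a datetime as the timestamp string used in Drive queries.

    Normalizes to UTC first; Drive's documented examples use this
    'YYYY-MM-DDTHH:MM:SS' shape.
    """
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")

# Example audit period: the last 12 months, ending now
end = datetime.now(timezone.utc)
start = end - timedelta(days=365)
print(rfc3339(start), rfc3339(end))
```

These strings can be passed directly as `start_date` and `end_date` above.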
Validating Evidence Completeness
Checking Required Evidence
Verify all required evidence exists before audit submission:
    async def validate_framework_evidence(
        self,
        org_id: str,
        framework: str
    ) -> dict:
        """Validate evidence completeness for framework"""
        if framework == "soc2":
            evidence = await self.collect_soc2_evidence(org_id)
        elif framework == "iso27001":
            evidence = await self.collect_iso27001_evidence(org_id)
        elif framework == "hipaa":
            evidence = await self.collect_hipaa_evidence(org_id)
        else:
            return {"error": f"Unknown framework: {framework}"}
        validation = {
            "framework": framework,
            "org": org_id,
            "complete": True,
            "missing": [],
            "incomplete": []
        }
        for category, docs in evidence.items():
            if not docs:
                validation["complete"] = False
                validation["missing"].append(category)
            elif len(docs) < 3:  # Expect at least 3 docs per category
                validation["incomplete"].append({
                    "category": category,
                    "found": len(docs),
                    "expected_minimum": 3
                })
        return validation
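The validation dict is machine-friendly; for compliance teams, a small formatter (an addition to the guide's code) turns it into a readable gap summary:

```python
def format_gap_summary(validation: dict) -> str:
    """Render a validation result as a short human-readable summary."""
    lines = [f"{validation['framework']} evidence check for {validation['org']}"]
    for category in validation["missing"]:
        lines.append(f"MISSING: no documents found for '{category}'")
    for item in validation["incomplete"]:
        lines.append(
            f"THIN: '{item['category']}' has {item['found']} docs "
            f"(expected at least {item['expected_minimum']})"
        )
    if len(lines) == 1:
        lines.append("All categories have sufficient evidence.")
    return "\n".join(lines)

# Example input shaped like validate_framework_evidence's output
sample = {
    "framework": "soc2",
    "org": "acme",
    "complete": False,
    "missing": ["logs"],
    "incomplete": [{"category": "training", "found": 2, "expected_minimum": 3}],
}
print(format_gap_summary(sample))
```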
Generating Evidence Reports
Create structured reports for audit submission:
    async def generate_evidence_report(
        self,
        org_id: str,
        framework: str
    ) -> dict:
        """Generate comprehensive evidence report"""
        if framework == "soc2":
            evidence = await self.collect_soc2_evidence(org_id)
        elif framework == "iso27001":
            evidence = await self.collect_iso27001_evidence(org_id)
        elif framework == "hipaa":
            evidence = await self.collect_hipaa_evidence(org_id)
        else:
            return {"error": f"Unknown framework: {framework}"}
        report = {
            "organization": org_id,
            "framework": framework,
            "generated_at": datetime.now().isoformat(),
            "summary": {
                "total_categories": len(evidence),
                "total_documents": sum(len(docs) for docs in evidence.values()),
                "categories": {}
            },
            "evidence_by_category": {}
        }
        for category, docs in evidence.items():
            report["summary"]["categories"][category] = len(docs)
            report["evidence_by_category"][category] = [
                {
                    "name": doc["name"],
                    "modified": doc["modified"],
                    "link": doc["link"],
                    "requirement": doc["requirement"]
                }
                for doc in docs
            ]
        return report
Implementing Continuous Monitoring
Scheduled Evidence Collection
Set up periodic evidence collection:
import asyncio
import json

    async def run_continuous_monitoring(
        self,
        org_id: str,
        framework: str,
        interval_hours: int = 24
    ):
        """Run continuous evidence monitoring"""
        while True:
            try:
                # Verify authentication
                if not await self.verify_authentication(org_id):
                    print(f"Re-authentication required for {org_id}")
                    auth_result = await self.authenticate_org(org_id)
                    if auth_result.get("requires_auth"):
                        print(f"Visit: {auth_result['url']}")
                        await asyncio.sleep(3600)  # Wait 1 hour for auth
                        continue
                # Collect evidence
                evidence = await self.generate_evidence_report(org_id, framework)
                # Validate completeness
                validation = await self.validate_framework_evidence(org_id, framework)
                # Save report
                filename = f"{org_id}_{framework}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                with open(filename, 'w') as f:
                    json.dump({
                        "evidence": evidence,
                        "validation": validation
                    }, f, indent=2)
                print(f"Collected evidence for {org_id}: {evidence['summary']['total_documents']} documents")
                if not validation["complete"]:
                    print(f"Missing categories: {validation['missing']}")
                # Wait for next collection
                await asyncio.sleep(interval_hours * 3600)
            except Exception as e:
                print(f"Error during monitoring: {str(e)}")
                await asyncio.sleep(300)  # Retry after 5 minutes
Change Detection
Track modifications to evidence documents:
    async def detect_changes_since(
        self,
        org_id: str,
        since_date: str
    ) -> dict:
        """Detect evidence changes since date"""
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": f"modifiedTime > '{since_date}'",
                "order_by": "modifiedTime desc"
            },
            user_id=org_id
        )
        if not result.output or "files" not in result.output:
            return {"changes": [], "count": 0}
        changes = []
        for file in result.output["files"]:
            changes.append({
                "name": file["name"],
                "id": file["id"],
                "modified": file["modifiedTime"],
                "modifier": file.get("lastModifyingUser", {}).get("displayName", "Unknown"),
                "link": file["webViewLink"]
            })
        return {
            "changes": changes,
            "count": len(changes),
            "since": since_date
        }
Error Handling
Handling Authentication Errors
Implement robust authentication error handling:
    async def safe_execute(
        self,
        org_id: str,
        tool_name: str,
        input_params: dict,
        max_retries: int = 2
    ):
        """Execute tool with automatic authentication retry"""
        for attempt in range(max_retries):
            try:
                result = await self.client.tools.execute(
                    tool_name=tool_name,
                    input=input_params,
                    user_id=org_id
                )
                return result
            except Exception as e:
                error_msg = str(e).lower()
                if "authorization" in error_msg and attempt < max_retries - 1:
                    # Re-authenticate
                    auth_result = await self.authenticate_org(org_id)
                    if not auth_result.get("authenticated"):
                        raise Exception(f"Authentication failed for {org_id}")
                    continue
                else:
                    raise
Rate Limit Handling
Google Drive API imposes rate limits. Handle them with exponential backoff:
    async def execute_with_backoff(
        self,
        org_id: str,
        tool_name: str,
        input_params: dict
    ):
        """Execute with exponential backoff for rate limits"""
        max_retries = 5
        base_delay = 1
        for attempt in range(max_retries):
            try:
                return await self.client.tools.execute(
                    tool_name=tool_name,
                    input=input_params,
                    user_id=org_id
                )
            except Exception as e:
                if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    print(f"Rate limit hit. Waiting {delay}s before retry {attempt + 1}/{max_retries}")
                    await asyncio.sleep(delay)
                else:
                    raise
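To see what that retry schedule looks like, the delay computation can be isolated as a pure function. The cap and jitter options below are additions worth considering for production, not part of the handler above:

```python
import random

def backoff_delays(max_retries: int = 5, base: float = 1.0,
                   cap: float = 30.0, jitter: bool = False) -> list:
    """Compute the sleep schedule for the retry loop above.

    One delay per retry (the final attempt raises instead of
    sleeping). The cap bounds the wait; "full jitter" randomizes
    each delay to avoid synchronized retry storms across workers.
    """
    delays = []
    for attempt in range(max_retries - 1):
        delay = min(base * (2 ** attempt), cap)
        if jitter:
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays

print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0]
```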
Production Implementation
Complete Evidence Collector
Here's a consolidated implementation, ready to adapt for production:
from arcadepy import Arcade
from datetime import datetime
import asyncio
import json
import os

class ProductionEvidenceCollector:
    def __init__(self, output_dir: str = "./evidence"):
        self.client = Arcade()
        self.output_dir = output_dir
        self.auth_cache = {}
        os.makedirs(output_dir, exist_ok=True)

    async def authenticate_org(self, org_id: str) -> dict:
        auth_response = await self.client.auth.start(
            user_id=org_id,
            provider="google",
            scopes=[
                "https://www.googleapis.com/auth/drive.readonly",
                "https://www.googleapis.com/auth/drive.metadata.readonly"
            ]
        )
        if auth_response.status != "completed":
            return {"requires_auth": True, "url": auth_response.url}
        await self.client.auth.wait_for_completion(auth_response)
        self.auth_cache[org_id] = datetime.now()
        return {"authenticated": True}

    async def search_evidence(
        self,
        org_id: str,
        search_term: str
    ) -> list:
        result = await self.client.tools.execute(
            tool_name="GoogleDrive.SearchFiles",
            input={
                "query": f"fullText contains '{search_term}'",
                "order_by": "modifiedTime desc",
                "page_size": 20
            },
            user_id=org_id
        )
        return result.output.get("files", []) if result.output else []

    async def collect_soc2(self, org_id: str) -> dict:
        requirements = {
            "policies": ["security policy", "access control", "incident response"],
            "procedures": ["backup", "change management", "monitoring"],
            "training": ["security awareness", "onboarding"],
            "logs": ["access log", "audit log", "incident log"]
        }
        collected = {}
        for category, terms in requirements.items():
            docs = []
            for term in terms:
                results = await self.search_evidence(org_id, term)
                docs.extend([{
                    "name": r["name"],
                    "id": r["id"],
                    "modified": r["modifiedTime"],
                    "link": r["webViewLink"],
                    "matched": term
                } for r in results])
            collected[category] = docs
        return collected

    async def run_collection(self, org_id: str, framework: str):
        print(f"Starting collection: {org_id} - {framework}")
        # Authenticate
        if org_id not in self.auth_cache:
            auth = await self.authenticate_org(org_id)
            if auth.get("requires_auth"):
                print(f"Authorization required: {auth['url']}")
                return
        # Collect evidence
        if framework.lower() == "soc2":
            evidence = await self.collect_soc2(org_id)
        else:
            print(f"Framework {framework} not implemented")
            return
        # Generate report
        report = {
            "org": org_id,
            "framework": framework,
            "collected_at": datetime.now().isoformat(),
            "total_docs": sum(len(docs) for docs in evidence.values()),
            "evidence": evidence
        }
        # Save to file
        filename = f"{org_id}_{framework}_{datetime.now().strftime('%Y%m%d')}.json"
        filepath = os.path.join(self.output_dir, filename)
        with open(filepath, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"Collected {report['total_docs']} documents")
        print(f"Report saved: {filepath}")
        return report

async def main():
    collector = ProductionEvidenceCollector()
    await collector.run_collection("your_org_id", "soc2")

if __name__ == "__main__":
    asyncio.run(main())
Deployment Configuration
Docker Deployment
Create a Dockerfile for containerized deployment:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY collector.py .
ENV ARCADE_API_KEY=""
ENV OUTPUT_DIR="/data/evidence"
ENV COLLECTION_INTERVAL="24"
VOLUME ["/data/evidence"]
CMD ["python", "collector.py"]
Environment Variables
Use environment-based configuration:
import os

class Config:
    ARCADE_API_KEY = os.getenv("ARCADE_API_KEY")
    OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./evidence")
    COLLECTION_INTERVAL = int(os.getenv("COLLECTION_INTERVAL", "24"))

    @classmethod
    def validate(cls):
        if not cls.ARCADE_API_KEY:
            raise ValueError("ARCADE_API_KEY environment variable required")
        return True
Security Best Practices
Token Management
Never log or expose OAuth tokens. Arcade manages token storage, rotation, and refresh:
# Correct: Let Arcade handle tokens
result = await self.client.tools.execute(
    tool_name="GoogleDrive.SearchFiles",
    input={"query": "compliance"},
    user_id=org_id
)

# Never do this:
# token = self.client.get_token(org_id)  # Don't access tokens
# print(token)  # Never log tokens
Audit Logging
Log all evidence collection activity:
    async def log_activity(
        self,
        org_id: str,
        action: str,
        details: dict
    ):
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "org": org_id,
            "action": action,
            "details": details
        }
        # Write to audit log
        log_file = f"audit_{datetime.now().strftime('%Y%m')}.log"
        with open(log_file, 'a') as f:
            f.write(json.dumps(log_entry) + "\n")
Least Privilege Scopes
Request only necessary scopes. For evidence collection, use read-only access:
# Good: Read-only scopes
scopes = [
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/drive.metadata.readonly"
]

# Avoid: Excessive permissions
# scopes = ["https://www.googleapis.com/auth/drive"]  # Too broad
Additional Resources
- Arcade Documentation
- Google Drive Toolkit Reference
- Authentication Guide
- Tool Development
- API Reference
- GitHub Repository
Next Steps
After implementing the evidence collector, consider these enhancements:
Multi-Framework Support: Add PCI DSS and FedRAMP evidence mapping by extending the framework-specific search functions.
Evidence Quality Scoring: Implement automated scoring that evaluates document freshness, completeness, and relevance to control requirements.
Automated Remediation: When gaps are detected, generate specific recommendations for compliance teams about missing documentation.
Notification System: Integrate webhook notifications to alert compliance teams when evidence collection completes or gaps are found.
Custom Tool Development: Use the Arcade Tool SDK to build custom evidence processing tools for your specific compliance needs.