Building an intelligent task synchronization system between Asana and Notion has traditionally required managing multiple OAuth flows, handling advanced API integrations, and maintaining authentication infrastructure. With Arcade's AI tool-calling platform and pre-built toolkits for both Asana and Notion, you can build a production-ready task sync agent that securely acts on behalf of users without exposing credentials to your AI.
This guide demonstrates building a Task Manager Sync Agent that automatically keeps Asana tasks and Notion pages in sync, handles bidirectional updates, and maintains consistency across both platforms. The Arcade Notion toolkit enables agents to interact with pages, databases, and workspaces, while the Arcade Asana toolkit provides tools for managing teams, projects, tasks, and workspaces.
Prerequisites and Setup
Before building your sync agent, ensure you have:
- An active Arcade.dev account with API key
- Python 3.8+ or Node.js 16+ installed
- Basic knowledge of async/await patterns
- Asana and Notion accounts for testing
Installing Arcade Client Libraries
Start by installing the Arcade Python client which provides type-safe access to all toolkits:
# Install the Arcade Python client
pip install arcadepy
# For LangGraph integration (optional)
pip install langchain-arcade
For JavaScript/TypeScript projects:
npm install @arcadeai/arcadejs
Setting Up Environment Variables
Create a .env file to store your credentials securely:
ARCADE_API_KEY=your_arcade_api_key_here
OPENAI_API_KEY=your_openai_api_key_here # If using AI orchestration
Overview of the Authentication Architecture
Arcade provides an authorization system that handles OAuth 2.0, API keys, and user tokens needed by AI agents to access external services through tools. This authentication layer is critical for the sync agent as it needs to access both Asana and Notion on behalf of users.
OAuth Flow Management
When you call a tool with client.tools.execute(), Arcade checks for authorization, routes the request to the tool's provider, and returns the tool's response. The sync agent leverages this to handle authentication for both platforms seamlessly:
from arcadepy import Arcade
class TaskSyncAuth:
def __init__(self):
self.client = Arcade() # Automatically finds ARCADE_API_KEY
async def authorize_asana(self, user_id: str):
"""Handle Asana OAuth authorization"""
auth_response = self.client.tools.authorize(
tool_name="Asana.ListTasks",
user_id=user_id
)
if auth_response.status != "completed":
print(f"Authorize Asana: {auth_response.url}")
# Wait for user to complete OAuth
self.client.auth.wait_for_completion(auth_response.id)
async def authorize_notion(self, user_id: str):
"""Handle Notion OAuth authorization"""
auth_response = self.client.tools.authorize(
tool_name="NotionToolkit.SearchByTitle",
user_id=user_id
)
if auth_response.status != "completed":
print(f"Authorize Notion: {auth_response.url}")
self.client.auth.wait_for_completion(auth_response.id)
Working with the Asana Toolkit
The Arcade Asana toolkit provides pre-built tools for managing teams, projects, workspaces, creating and updating tasks, retrieving task data, and managing task attachments.
Fetching Asana Tasks
The sync agent needs to retrieve tasks from Asana workspaces and projects. Here's how to implement task fetching with proper filtering:
class AsanaTaskManager:
def __init__(self, client: Arcade):
self.client = client
async def get_workspace_tasks(self, user_id: str, workspace_id: str = None):
"""Retrieve all incomplete tasks from Asana workspace"""
# First, get available workspaces if not specified
if not workspace_id:
workspaces_response = await self.client.tools.execute(
tool_name="Asana.ListWorkspaces",
input={"limit": 100},
user_id=user_id
)
workspaces = workspaces_response.output.value
if len(workspaces) == 1:
workspace_id = workspaces[0]['id']
else:
# Handle multiple workspaces
return {"error": "Multiple workspaces found", "workspaces": workspaces}
# Get tasks without completed ones
tasks_response = await self.client.tools.execute(
tool_name="Asana.GetTasksWithoutId",
input={
"workspace_id": workspace_id,
"completed": False,
"limit": 20,
"sort_by": "MODIFIED_AT",
"sort_order": "DESCENDING"
},
user_id=user_id
)
return tasks_response.output.value
async def create_asana_task(self, user_id: str, task_data: dict):
"""Create a new task in Asana"""
response = await self.client.tools.execute(
tool_name="Asana.CreateTask",
input={
"name": task_data['title'],
"description": task_data.get('description', ''),
"due_date": task_data.get('due_date'),
"project": task_data.get('project_id'),
"assignee_id": task_data.get('assignee_id'),
"tags": task_data.get('tags', [])
},
user_id=user_id
)
return response.output.value
Managing Task Updates
The sync agent must handle task updates efficiently, tracking changes and preventing infinite sync loops:
async def update_asana_task(self, user_id: str, task_id: str, updates: dict):
"""Update an existing Asana task"""
# Build update payload
update_payload = {"task_id": task_id}
# Map Notion properties to Asana fields
if 'title' in updates:
update_payload['name'] = updates['title']
if 'content' in updates:
update_payload['description'] = updates['content']
if 'completed' in updates:
update_payload['completed'] = updates['completed']
if 'due_date' in updates:
update_payload['due_date'] = updates['due_date']
response = await self.client.tools.execute(
tool_name="Asana.UpdateTask",
input=update_payload,
user_id=user_id
)
return response.output.value
Working with the Notion Toolkit
The Arcade Notion toolkit provides tools for getting page content, creating pages, searching for pages or databases by title, getting metadata, and retrieving workspace structure.
Setting Up Notion Integration
The Notion toolkit requires specific OAuth configuration. With the Arcade Cloud Platform, there's nothing to configure as users will see Arcade as the application requesting permission.
class NotionPageManager:
def __init__(self, client: Arcade):
self.client = client
async def search_notion_pages(self, user_id: str, query: str = None):
"""Search for Notion pages that match task criteria"""
response = await self.client.tools.execute(
tool_name="NotionToolkit.SearchByTitle",
input={
"query": query,
"select": "page", # Only search pages, not databases
"order_by": "descending",
"limit": 100
},
user_id=user_id
)
return response.output.value
async def create_notion_page(self, user_id: str, page_data: dict):
"""Create a new Notion page from Asana task"""
# Find or create parent page for tasks
parent_search = await self.client.tools.execute(
tool_name="NotionToolkit.SearchByTitle",
input={"query": "Tasks"},
user_id=user_id
)
parent_title = "Tasks" # Default parent page
if parent_search.output.value:
parent_title = parent_search.output.value[0]['title']
# Create the page with task content
response = await self.client.tools.execute(
tool_name="NotionToolkit.CreatePage",
input={
"parent_title": parent_title,
"title": page_data['title'],
"content": self._format_task_content(page_data)
},
user_id=user_id
)
return response.output.value
def _format_task_content(self, task_data: dict) -> str:
"""Format Asana task data as Notion markdown content"""
content = []
if task_data.get('description'):
content.append(f"## Description\n{task_data['description']}")
if task_data.get('due_date'):
content.append(f"**Due Date:** {task_data['due_date']}")
if task_data.get('assignee'):
content.append(f"**Assignee:** {task_data['assignee']}")
if task_data.get('tags'):
tags = ", ".join(task_data['tags'])
content.append(f"**Tags:** {tags}")
content.append(f"\n---\n*Synced from Asana Task ID: {task_data.get('id', 'Unknown')}*")
return "\n\n".join(content)
Retrieving and Updating Notion Pages
The sync agent needs to fetch existing Notion pages and update them based on Asana changes:
async def get_notion_page_content(self, user_id: str, page_title: str):
"""Get content of a specific Notion page"""
response = await self.client.tools.execute(
tool_name="NotionToolkit.GetPageContentByTitle",
input={"title": page_title},
user_id=user_id
)
return response.output.value
async def update_notion_page(self, user_id: str, page_id: str, updates: dict):
"""Update existing Notion page with Asana task changes"""
# Get current page metadata
metadata_response = await self.client.tools.execute(
tool_name="NotionToolkit.GetObjectMetadata",
input={"object_id": page_id},
user_id=user_id
)
current_title = metadata_response.output.value.get('title', '')
# Create new page with updated content (Notion doesn't support direct updates)
# Archive old page by renaming it
archived_title = f"[Archived] {current_title}"
# Create updated page
response = await self.client.tools.execute(
tool_name="NotionToolkit.CreatePage",
input={
"parent_title": "Tasks",
"title": updates.get('title', current_title),
"content": self._format_task_content(updates)
},
user_id=user_id
)
return response.output.value
Building the Bidirectional Sync Engine
The core sync engine orchestrates data flow between Asana and Notion, handling conflict resolution and maintaining sync state.
Sync State Management
To prevent infinite loops and track changes, implement a sync state manager:
import json
from datetime import datetime
from typing import Dict, Any
class SyncStateManager:
def __init__(self, storage_path: str = "sync_state.json"):
self.storage_path = storage_path
self.state = self._load_state()
def _load_state(self) -> Dict[str, Any]:
"""Load sync state from persistent storage"""
try:
with open(self.storage_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {"mappings": {}, "last_sync": {}}
def save_state(self):
"""Persist sync state to storage"""
with open(self.storage_path, 'w') as f:
json.dump(self.state, f, indent=2)
def get_mapping(self, asana_id: str) -> str:
"""Get Notion page ID for an Asana task"""
return self.state["mappings"].get(asana_id)
def set_mapping(self, asana_id: str, notion_id: str):
"""Store mapping between Asana task and Notion page"""
self.state["mappings"][asana_id] = notion_id
self.save_state()
def get_last_sync(self, item_id: str) -> str:
"""Get last sync timestamp for an item"""
return self.state["last_sync"].get(item_id)
def update_last_sync(self, item_id: str):
"""Update last sync timestamp"""
self.state["last_sync"][item_id] = datetime.utcnow().isoformat()
self.save_state()
Main Sync Logic
The main synchronization engine coordinates between both platforms:
class TaskManagerSyncAgent:
def __init__(self):
self.client = Arcade()
self.asana = AsanaTaskManager(self.client)
self.notion = NotionPageManager(self.client)
self.state = SyncStateManager()
async def initialize_user(self, user_id: str):
"""Initialize authentication for both platforms"""
print("Initializing Task Sync Agent...")
# Authorize Asana
auth_response = self.client.tools.authorize(
tool_name="Asana.ListWorkspaces",
user_id=user_id
)
if auth_response.status != "completed":
print(f"Please authorize Asana access: {auth_response.url}")
self.client.auth.wait_for_completion(auth_response.id)
# Authorize Notion
notion_auth = self.client.tools.authorize(
tool_name="NotionToolkit.SearchByTitle",
user_id=user_id
)
if notion_auth.status != "completed":
print(f"Please authorize Notion access: {notion_auth.url}")
self.client.auth.wait_for_completion(notion_auth.id)
print("Authentication complete for both platforms!")
async def sync_asana_to_notion(self, user_id: str, workspace_id: str = None):
"""Sync all Asana tasks to Notion pages"""
print("Starting Asana to Notion sync...")
# Get all incomplete tasks from Asana
tasks = await self.asana.get_workspace_tasks(user_id, workspace_id)
if isinstance(tasks, dict) and 'error' in tasks:
return tasks
synced_count = 0
for task in tasks:
task_id = task['id']
task_title = task['name']
# Check if task already has a Notion page
notion_id = self.state.get_mapping(task_id)
if notion_id:
# Update existing Notion page
print(f"Updating Notion page for task: {task_title}")
await self.notion.update_notion_page(
user_id=user_id,
page_id=notion_id,
updates={
'title': task_title,
'description': task.get('notes', ''),
'due_date': task.get('due_on'),
'completed': task.get('completed'),
'tags': [tag['name'] for tag in task.get('tags', [])],
'id': task_id
}
)
else:
# Create new Notion page
print(f"Creating Notion page for task: {task_title}")
notion_page = await self.notion.create_notion_page(
user_id=user_id,
page_data={
'title': task_title,
'description': task.get('notes', ''),
'due_date': task.get('due_on'),
'assignee': task.get('assignee', {}).get('name'),
'tags': [tag['name'] for tag in task.get('tags', [])],
'id': task_id
}
)
# Store mapping
if notion_page and 'id' in notion_page:
self.state.set_mapping(task_id, notion_page['id'])
self.state.update_last_sync(task_id)
synced_count += 1
print(f"Synced {synced_count} tasks from Asana to Notion")
return {"success": True, "synced": synced_count}
Implementing Bidirectional Sync
For true bidirectional synchronization, the agent needs to detect changes in Notion and reflect them back to Asana:
async def sync_notion_to_asana(self, user_id: str):
"""Sync Notion pages back to Asana tasks"""
print("Starting Notion to Asana sync...")
# Search for all task-related pages in Notion
notion_pages = await self.notion.search_notion_pages(
user_id=user_id,
query="Task"
)
synced_count = 0
for page in notion_pages:
page_title = page['title']
page_id = page['id']
# Check if this Notion page is mapped to an Asana task
asana_id = None
for aid, nid in self.state.state["mappings"].items():
if nid == page_id:
asana_id = aid
break
if asana_id:
# Update existing Asana task
print(f"Updating Asana task from Notion: {page_title}")
# Get full page content
content = await self.notion.get_notion_page_content(
user_id=user_id,
page_title=page_title
)
# Parse content and update Asana
await self.asana.update_asana_task(
user_id=user_id,
task_id=asana_id,
updates={
'title': page_title,
'content': content
}
)
else:
# Create new Asana task from Notion page
print(f"Creating Asana task from Notion: {page_title}")
content = await self.notion.get_notion_page_content(
user_id=user_id,
page_title=page_title
)
new_task = await self.asana.create_asana_task(
user_id=user_id,
task_data={
'title': page_title,
'description': content
}
)
if new_task and 'id' in new_task:
self.state.set_mapping(new_task['id'], page_id)
self.state.update_last_sync(page_id)
synced_count += 1
print(f"Synced {synced_count} pages from Notion to Asana")
return {"success": True, "synced": synced_count}
async def run_full_sync(self, user_id: str):
"""Run complete bidirectional sync"""
# Initialize user authentication
await self.initialize_user(user_id)
# Sync Asana to Notion
asana_result = await self.sync_asana_to_notion(user_id)
# Sync Notion to Asana
notion_result = await self.sync_notion_to_asana(user_id)
return {
"asana_to_notion": asana_result,
"notion_to_asana": notion_result,
"timestamp": datetime.utcnow().isoformat()
}
Advanced Features and Optimization
Implementing Intelligent Conflict Resolution
When dealing with bidirectional sync, conflicts are inevitable. Here's an advanced conflict resolution strategy:
class ConflictResolver:
def __init__(self):
self.resolution_strategy = "last_write_wins" # or "manual", "merge"
async def resolve_conflict(self, asana_data: dict, notion_data: dict) -> dict:
"""Resolve conflicts between Asana and Notion data"""
if self.resolution_strategy == "last_write_wins":
# Compare modification timestamps
asana_modified = asana_data.get('modified_at')
notion_modified = notion_data.get('last_edited_time')
if asana_modified > notion_modified:
return {"winner": "asana", "data": asana_data}
else:
return {"winner": "notion", "data": notion_data}
elif self.resolution_strategy == "merge":
# Merge non-conflicting fields
merged = {}
# Take newer title
if asana_data.get('name') != notion_data.get('title'):
merged['title'] = asana_data['name'] if asana_data.get('modified_at') > notion_data.get('last_edited_time') else notion_data['title']
else:
merged['title'] = asana_data.get('name')
# Merge description/content (append changes)
merged['content'] = f"{asana_data.get('notes', '')}\n\n{notion_data.get('content', '')}"
# Take most restrictive due date
if asana_data.get('due_on') and notion_data.get('due_date'):
merged['due_date'] = min(asana_data['due_on'], notion_data['due_date'])
else:
merged['due_date'] = asana_data.get('due_on') or notion_data.get('due_date')
return {"winner": "merged", "data": merged}
Adding Real-Time Sync with Webhooks
For production deployments, implement webhook listeners for real-time synchronization:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import asyncio
app = FastAPI()
sync_agent = TaskManagerSyncAgent()
@app.post("/webhooks/asana")
async def asana_webhook(request: Request):
"""Handle Asana webhook events"""
body = await request.json()
events = body.get('events', [])
for event in events:
if event['action'] in ['added', 'changed']:
task_id = event['resource']['gid']
user_id = event['user']['gid']
# Trigger sync for specific task
asyncio.create_task(
sync_agent.sync_single_task(user_id, task_id)
)
return JSONResponse({"status": "processed"})
@app.post("/webhooks/notion")
async def notion_webhook(request: Request):
"""Handle Notion webhook events"""
body = await request.json()
if body.get('type') == 'page_updated':
page_id = body['page']['id']
user_id = body['user']['id']
# Trigger reverse sync for specific page
asyncio.create_task(
sync_agent.sync_single_page(user_id, page_id)
)
return JSONResponse({"status": "processed"})
Integration with AI Orchestration Frameworks
The sync agent can be enhanced with AI capabilities using frameworks like LangGraph or LangChain. Arcade tools integrate seamlessly into LangGraph applications:
from langchain_arcade import ArcadeToolManager
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
class IntelligentSyncAgent:
def __init__(self, arcade_api_key: str):
# Initialize Arcade tool manager
self.tool_manager = ArcadeToolManager(api_key=arcade_api_key)
# Get both Asana and Notion tools
self.tools = self.tool_manager.get_tools(
toolkits=["asana", "notion"]
)
# Create AI model with tools
self.model = ChatOpenAI(model="gpt-4").bind_tools(self.tools)
async def intelligent_sync(self, user_id: str, natural_language_request: str):
"""Process sync requests in natural language"""
# Create agent with tools
agent = create_react_agent(
model=self.model,
tools=self.tools
)
# Process natural language request
config = {"configurable": {"user_id": user_id}}
result = await agent.invoke({
"messages": [
("user", natural_language_request)
]
}, config)
return result
Usage Example with Natural Language
intelligent_agent = IntelligentSyncAgent(arcade_api_key="your_key")
# Natural language sync requests
await intelligent_agent.intelligent_sync(
user_id="user@example.com",
natural_language_request="Find all Asana tasks due this week and create corresponding Notion pages with detailed project context"
)
await intelligent_agent.intelligent_sync(
user_id="user@example.com",
natural_language_request="Update all completed tasks in Notion to be marked as completed in Asana"
)
Production Deployment Considerations
Self-Hosted Deployment
For enterprise deployments, Arcade can be deployed using Arcade Deploy with a worker.toml configuration file:
# worker.toml for Task Sync Agent
[[worker]]
[worker.config]
id = "task-sync-worker"
secret = "your-secret-key"
[worker.local_source]
packages = ["./task-sync-agent"]
[worker.env]
ARCADE_API_KEY = "${env:ARCADE_API_KEY}"
DATABASE_URL = "${env:DATABASE_URL}"
[worker.schedule]
# Run sync every hour
cron = "0 * * * *"
Deploy the worker:
arcade deploy --config worker.toml
Monitoring and Observability
Implement comprehensive logging and monitoring for production deployments:
import logging
from datetime import datetime
class SyncMonitor:
def __init__(self):
self.logger = logging.getLogger("task_sync_agent")
self.metrics = {
"total_syncs": 0,
"successful_syncs": 0,
"failed_syncs": 0,
"auth_failures": 0
}
def log_sync_event(self, event_type: str, details: dict):
"""Log sync events for monitoring"""
self.logger.info(f"""
Sync Event: {event_type}
Timestamp: {datetime.utcnow().isoformat()}
User: {details.get('user_id')}
Platform: {details.get('platform')}
Items Synced: {details.get('count', 0)}
Status: {details.get('status')}
""")
# Update metrics
if event_type == "sync_completed":
self.metrics["total_syncs"] += 1
self.metrics["successful_syncs"] += 1
elif event_type == "sync_failed":
self.metrics["total_syncs"] += 1
self.metrics["failed_syncs"] += 1
def get_metrics(self) -> dict:
"""Return current sync metrics"""
return {
**self.metrics,
"success_rate": self.metrics["successful_syncs"] / max(self.metrics["total_syncs"], 1) * 100
}
Security Best Practices
User Data Isolation
Ensure complete isolation between different users' data:
class SecureSyncAgent(TaskManagerSyncAgent):
def __init__(self):
super().__init__()
self.user_contexts = {}
def get_user_context(self, user_id: str):
"""Get or create isolated context for user"""
if user_id not in self.user_contexts:
self.user_contexts[user_id] = {
"asana_auth": False,
"notion_auth": False,
"sync_state": SyncStateManager(f"sync_state_{user_id}.json")
}
return self.user_contexts[user_id]
async def sync_with_isolation(self, user_id: str):
"""Perform sync with complete user isolation"""
context = self.get_user_context(user_id)
# Ensure user-specific state is used
self.state = context["sync_state"]
# Perform sync
return await self.run_full_sync(user_id)
Rate Limiting and Error Handling
Implement robust error handling and rate limiting:
import time
from typing import Callable
class RateLimiter:
def __init__(self, max_calls: int = 100, time_window: int = 60):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
async def execute_with_limit(self, func: Callable, *args, **kwargs):
"""Execute function with rate limiting"""
current_time = time.time()
# Remove old calls outside the time window
self.calls = [t for t in self.calls if current_time - t < self.time_window]
# Check rate limit
if len(self.calls) >= self.max_calls:
wait_time = self.time_window - (current_time - self.calls[0])
print(f"Rate limit reached. Waiting {wait_time:.1f} seconds...")
await asyncio.sleep(wait_time)
# Execute function
self.calls.append(current_time)
try:
return await func(*args, **kwargs)
except Exception as e:
print(f"Error executing {func.__name__}: {e}")
# Implement exponential backoff for retries
await asyncio.sleep(2 ** len([c for c in self.calls[-5:] if c > current_time - 10]))
return await func(*args, **kwargs)
Testing and Validation
Create comprehensive tests for your sync agent:
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_asana_to_notion_sync():
"""Test syncing tasks from Asana to Notion"""
# Mock Arcade client
with patch('arcadepy.Arcade') as mock_arcade:
mock_client = AsyncMock()
mock_arcade.return_value = mock_client
# Setup mock responses
mock_client.tools.execute.return_value = AsyncMock(
output=AsyncMock(value=[
{
"id": "123",
"name": "Test Task",
"notes": "Test Description",
"due_on": "2024-12-31",
"completed": False
}
])
)
# Initialize agent
agent = TaskManagerSyncAgent()
# Run sync
result = await agent.sync_asana_to_notion("test_user")
# Verify results
assert result["success"] == True
assert result["synced"] == 1
# Verify API calls
assert mock_client.tools.execute.call_count >= 2
Conclusion
Building a Task Manager Sync Agent with Arcade's Asana and Notion toolkits eliminates the typical challenges of multi-platform integrations. By leveraging Arcade's authentication layer and pre-built toolkits, you can focus on building intelligent synchronization logic rather than managing OAuth flows and API integrations.
The sync agent demonstrated here provides:
- Secure Authentication: User credentials are managed by Arcade, never exposed to your application
- Bidirectional Sync: Changes in either platform are automatically reflected in the other
- Conflict Resolution: Intelligent handling of concurrent updates
- Production Ready: Built-in error handling, rate limiting, and monitoring capabilities
- AI Enhancement: Natural language processing for sync commands through LangGraph integration
To get started with your own Task Manager Sync Agent:
- Sign up for an Arcade.dev account
- Install the Arcade Python client
- Review the Asana toolkit documentation and Notion toolkit documentation
- Deploy your agent using the Arcade CLI
With Arcade's platform simplifying authentication processes and providing standardized tool interfaces, you can build sophisticated multi-platform sync agents that scale from prototype to production. The combination of Asana's powerful task management capabilities and Notion's flexible knowledge base creates a comprehensive productivity system that adapts to any workflow.



