CrewAI Integration
Securely relay OTPs to your CrewAI agents. Enable your crew members to complete verification flows without direct access to SMS or email.
Note: The Python SDK is coming soon. This guide shows the integration pattern. The Python SDK will have a similar API.
Overview
Agent OTP helps your CrewAI agents receive verification codes securely:
- Crew member requests an OTP when it needs to complete a verification
- User approves which OTP to share
- OTP is encrypted and delivered to the agent
- OTP is auto-deleted after consumption
Python Example (Coming Soon)
from agent_otp import AgentOTPClient, generate_key_pair, export_public_key
from crewai import Agent, Task, Crew
from crewai_tools import BaseTool
otp = AgentOTPClient(api_key="ak_live_xxxx")
# Generate encryption keys
public_key, private_key = generate_key_pair()
class OTPSignUpTool(BaseTool):
name: str = "Sign Up with OTP"
description: str = "Signs up for a service and handles OTP verification."
def _run(self, service_name: str, email: str, service_url: str) -> str:
# Step 1: Start the sign-up process
self._start_signup(service_url, email)
# Step 2: Request OTP from Agent OTP
request = otp.request_otp(
reason=f"Sign up verification for {service_name}",
expected_sender=service_name,
filter={
"sources": ["email"],
"sender_pattern": f"*@{service_url.split('/')[2]}"
},
public_key=export_public_key(public_key),
wait_for_otp=True,
timeout=120
)
if request.status != "otp_received":
return f"Could not get OTP: {request.status}"
# Step 3: Consume the OTP
result = otp.consume_otp(request.id, private_key)
# Step 4: Complete verification
self._complete_verification(service_url, result.code)
return f"Successfully signed up for {service_name} with {email}"
def _start_signup(self, url: str, email: str):
# Your sign-up implementation
pass
def _complete_verification(self, url: str, code: str):
# Your verification implementation
pass
# Create agents with OTP tools
signup_tool = OTPSignUpTool()
signup_agent = Agent(
role="Account Specialist",
goal="Handle service registrations and verifications",
backstory="Expert at signing up for online services",
tools=[signup_tool],
verbose=True
)
# Create task
signup_task = Task(
description="Sign up for Acme Inc service at https://acme.com with user@example.com",
expected_output="Confirmation that signup was completed",
agent=signup_agent
)
# Create and run crew
crew = Crew(
agents=[signup_agent],
tasks=[signup_task],
verbose=True
)
result = crew.kickoff()OTP Tool Wrapper
Create a wrapper to add OTP capabilities to existing tools:
from agent_otp import AgentOTPClient, generate_key_pair, export_public_key
class OTPEnabledTool(BaseTool):
"""Base class for tools that need OTP verification."""
def __init__(self, otp_client: AgentOTPClient, **kwargs):
super().__init__(**kwargs)
self.otp = otp_client
self.public_key, self.private_key = generate_key_pair()
def request_and_consume_otp(
self,
reason: str,
expected_sender: str = None,
sources: list = None,
timeout: int = 120
) -> str:
"""Request and consume an OTP, returning the code."""
filter_opts = {}
if sources:
filter_opts["sources"] = sources
request = self.otp.request_otp(
reason=reason,
expected_sender=expected_sender,
filter=filter_opts if filter_opts else None,
public_key=export_public_key(self.public_key),
wait_for_otp=True,
timeout=timeout
)
if request.status != "otp_received":
raise Exception(f"Failed to get OTP: {request.status}")
result = self.otp.consume_otp(request.id, self.private_key)
return result.code
class EmailVerificationTool(OTPEnabledTool):
name: str = "Verify Email"
description: str = "Completes email verification for a service"
def _run(self, service_name: str, email: str) -> str:
# Trigger verification email (your implementation)
send_verification_email(service_name, email)
# Get the OTP
code = self.request_and_consume_otp(
reason=f"Email verification for {service_name}",
expected_sender=service_name,
sources=["email"]
)
# Submit the code (your implementation)
submit_verification_code(service_name, code)
return f"Email {email} verified for {service_name}"Crew-Level OTP Management
Manage OTPs at the crew level for coordinated operations:
from agent_otp import AgentOTPClient, generate_key_pair
class OTPManagedCrew(Crew):
"""A crew with centralized OTP management."""
def __init__(self, otp_client: AgentOTPClient, **kwargs):
super().__init__(**kwargs)
self.otp = otp_client
# Shared key pair for the crew
self.public_key, self.private_key = generate_key_pair()
def get_otp(self, reason: str, **kwargs) -> str:
"""Centralized OTP retrieval for any crew member."""
from agent_otp import export_public_key
request = self.otp.request_otp(
reason=reason,
public_key=export_public_key(self.public_key),
wait_for_otp=True,
**kwargs
)
if request.status == "otp_received":
result = self.otp.consume_otp(request.id, self.private_key)
return result.code
raise Exception(f"Failed to get OTP: {request.status}")
# Usage
otp_client = AgentOTPClient(api_key="ak_live_xxxx")
crew = OTPManagedCrew(
otp_client=otp_client,
agents=[signup_agent, verification_agent],
tasks=[signup_task, verify_task],
verbose=True
)
# Any agent in the crew can now use crew.get_otp()
result = crew.kickoff()Handling Multiple OTPs
For workflows that need multiple verifications:
class MultiServiceSignUpTool(OTPEnabledTool):
name: str = "Multi-Service Sign Up"
description: str = "Signs up for multiple services with OTP verification"
def _run(self, services: list) -> str:
results = []
for service in services:
try:
# Start signup
start_signup(service["url"], service["email"])
# Get OTP
code = self.request_and_consume_otp(
reason=f"Sign up for {service['name']}",
expected_sender=service["name"],
sources=["email"]
)
# Complete verification
complete_verification(service["url"], code)
results.append(f"{service['name']}: Success")
except Exception as e:
results.append(f"{service['name']}: Failed - {str(e)}")
return "\n".join(results)Error Handling
from agent_otp import (
OTPNotFoundError,
OTPExpiredError,
OTPAlreadyConsumedError,
OTPApprovalDeniedError,
RateLimitError,
)
class RobustOTPTool(OTPEnabledTool):
name: str = "Robust OTP Tool"
description: str = "Tool with comprehensive error handling"
def _run(self, service: str, action: str) -> str:
try:
code = self.request_and_consume_otp(
reason=f"{action} for {service}",
expected_sender=service
)
return f"Success! Code: {code}"
except OTPApprovalDeniedError:
return "User denied the OTP request"
except OTPExpiredError:
return "OTP request timed out - please try again"
except OTPAlreadyConsumedError:
return "OTP was already used"
except RateLimitError as e:
return f"Rate limited - try again in {e.retry_after}s"
except Exception as e:
return f"Unexpected error: {str(e)}"