Sdks
Python SDK
Official Python SDK for daimon.email
Installation
pip install daimon-email
Info
The Python SDK is automatically generated from our OpenAPI spec using Fern. It includes type hints and is updated with every API release.
Quick Start
Initialize the Client
from daimon_email import DaimonClient
client = DaimonClient(
api_key=os.environ['DAIMON_API_KEY'] # Never hardcode your API key
)
Warning
For free tier (no API key yet), omit the api_key parameter. You'll receive an API key when creating your first inbox.
Create Your First Inbox
# No API key required for first inbox
response = client.inboxes.create(
username='my-agent',
client_id='deployment-prod-1' # Idempotency key
)
inbox = response['result']
print('Email address:', inbox['address'])
# my-agent@daimon.email
print('API key:', inbox['account_api_key'])
# dm_free_abc123...
# Save this API key for future requests
os.environ['DAIMON_API_KEY'] = inbox['account_api_key']
List Messages
messages = client.inboxes.messages.list(inbox['id'])
for message in messages['result']:
print('From:', message['from'])
print('Subject:', message['subject'])
print('Body:', message['body'])
print('---')
Send an Email
try:
sent = client.inboxes.send(inbox['id'], {
'to': 'customer@example.com',
'subject': 'Welcome to our service',
'body': 'Thanks for signing up!',
'client_id': 'welcome-email-user-123' # Idempotency
})
print('Message sent:', sent['result']['id'])
except DaimonError as e:
if e.code == 'SEND_REQUIRES_PAID':
# Free tier can't send - guide agent to upgrade
print('Upgrade needed:', e.upgrade_context['operator_action_url'])
Core Features
Inboxes
inbox = client.inboxes.create(
username='support-bot',
client_id='deploy-123-inbox',
metadata={
'purpose': 'customer support',
'deployment': 'production'
}
)
inbox = client.inboxes.get('inbox_abc123')
print(inbox['result']['address'])
inboxes = client.inboxes.list(limit=50, offset=0)
for inbox in inboxes['result']:
print(inbox['address'])
updated = client.inboxes.update('inbox_abc123', {
'metadata': {
'status': 'active',
'last_checked': datetime.now().isoformat()
}
})
client.inboxes.delete('inbox_abc123')
# Inbox and all messages deleted immediately
Messages
messages = client.inboxes.messages.list(
'inbox_abc123',
limit=20,
unread_only=True
)
message = client.inboxes.messages.get('inbox_abc123', 'msg_xyz789')
print('From:', message['result']['from'])
print('Subject:', message['result']['subject'])
print('Plain text:', message['result']['body'])
print('HTML:', message['result']['body_html'])
# Auto-detected CTA links (e.g., "Verify Email")
for cta in message['result']['cta_links']:
print(f"CTA: {cta['text']} -> {cta['url']}")
sent = client.inboxes.send('inbox_abc123', {
'to': 'user@example.com',
'subject': 'Password Reset',
'body': 'Click here to reset: https://app.example.com/reset/token123',
'client_id': 'reset-user-456'
})
reply = client.inboxes.messages.reply('inbox_abc123', 'msg_xyz789', {
'body': 'Thanks for your message! We will get back to you soon.',
'client_id': 'reply-msg-xyz789'
})
# Automatically includes In-Reply-To and References headers
forwarded = client.inboxes.messages.forward('inbox_abc123', 'msg_xyz789', {
'to': 'team@example.com',
'note': 'Please review this support request.',
'client_id': 'forward-msg-xyz789'
})
Threads
threads = client.threads.list('inbox_abc123', limit=20)
for thread in threads['result']:
print('Subject:', thread['subject'])
print('Message count:', thread['message_count'])
print('Participants:', thread['participants'])
thread = client.threads.get('thread_abc123')
# All messages in the thread, sorted chronologically
for message in thread['result']['messages']:
print(f"{message['from']}: {message['subject']}")
# Mark as read, add labels, etc.
client.threads.update('thread_abc123', {
'read': True,
'labels': ['support', 'urgent']
})
Webhooks
webhook = client.webhooks.create({
'url': 'https://your-server.com/webhooks/daimon',
'events': ['message.received', 'message.sent'],
'inbox_id': 'inbox_abc123', # Optional: filter to specific inbox
'client_id': 'webhook-prod-messages'
})
print('Webhook secret:', webhook['result']['secret'])
# Save this to verify webhook signatures
webhooks = client.webhooks.list()
# Send a test event to verify your endpoint
client.webhooks.test('webhook_abc123')
client.webhooks.delete('webhook_abc123')
Account & Capabilities
account = client.account.get()
print('Tier:', account['result']['tier'])
print('Email:', account['result']['email'])
caps = client.account.capabilities()
print('Can send:', 'send_messages' in caps['result']['can'])
print('Send quota:', caps['result']['limits']['sends_per_day'])
print('Used today:', caps['result']['usage']['sends_today'])
# Generate magic upgrade link for operator
link = client.account.get_upgrade_link({
'desired_tier': 'developer',
'return_url': 'https://your-app.com/settings'
})
print('Upgrade URL:', link['result']['url'])
# Send this to your operator
# Trigger operator notification
client.account.notify_operator({
'message': 'I need sending capabilities to complete my task.',
'action_required': 'upgrade_to_developer',
'context': {
'reason': 'send_limit_exceeded',
'current_tier': 'free'
}
})
Advanced Usage
Webhook Signature Verification
Verify incoming webhooks are authentic:
import hmac
import hashlib
def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Return True when *signature* is the HMAC-SHA256 hex digest of *payload*.

    Args:
        payload: Raw request body bytes, exactly as received.
        signature: Hex digest supplied by the sender. ``None`` or an empty
            string (e.g. the signature header was absent) fails verification
            instead of raising.
        secret: Webhook secret used as the HMAC key.

    Returns:
        True if the signature matches the expected digest, False otherwise.
    """
    # A missing header arrives as None; hmac.compare_digest would raise
    # TypeError on it, turning an unauthenticated request into a server error.
    if not signature:
        return False
    expected_signature = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # attacker-controlled header values must not be able to trigger that.
    return hmac.compare_digest(signature.encode(), expected_signature.encode())
# Flask example
from flask import Flask, request
app = Flask(__name__)
@app.route('/webhooks/daimon', methods=['POST'])
def handle_webhook():
signature = request.headers.get('X-Daimon-Signature')
payload = request.get_data()
if not verify_webhook_signature(payload, signature, os.environ['WEBHOOK_SECRET']):
return 'Invalid signature', 401
event = request.json
if event['event'] == 'message.received':
print('New message:', event['data']['subject'])
# Process message
return '', 200
Error Handling
from daimon_email import DaimonError
try:
client.inboxes.send(inbox_id, message)
except DaimonError as e:
# Known API error
print('API error:', e.code)
print('Message:', e.message)
if hasattr(e, 'upgrade_context') and e.upgrade_context:
# Tier upgrade required
print('Upgrade URL:', e.upgrade_context['operator_action_url'])
if hasattr(e, 'retry_after') and e.retry_after:
# Rate limited
print('Retry after:', e.retry_after, 'seconds')
except Exception as e:
# Network error, timeout, etc.
print('Unexpected error:', e)
Retry Logic with Exponential Backoff
import time
def send_with_retry(client, inbox_id, message, max_retries=3):
attempt = 0
while attempt < max_retries:
try:
return client.inboxes.send(inbox_id, message)
except DaimonError as e:
if e.status == 429:
# Rate limited - use provided retry_after
delay = e.retry_after
print(f'Rate limited. Waiting {delay}s...')
time.sleep(delay)
attempt += 1
else:
# Other error - raise
raise
raise Exception('Max retries exceeded')
Polling with Adaptive Intervals
import time
class MessagePoller:
    """Poll an inbox for unread messages, slowing down while it is idle.

    The wait between polls starts at ``min_interval`` seconds, grows by a
    factor of 1.5 after every empty poll (capped at ``max_interval``), and
    drops back to ``min_interval`` as soon as a poll returns messages.

    Args:
        client: Object exposing ``inboxes.messages.list(inbox_id,
            unread_only=True)``. When omitted, ``start`` falls back to a
            module-level name ``client``, so ``MessagePoller()`` keeps
            working in scripts that define one.
        min_interval: Shortest wait between polls, in seconds.
        max_interval: Longest wait between polls, in seconds.
    """

    def __init__(self, client=None, min_interval=5, max_interval=60):
        self.client = client
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.interval = min_interval  # Start at the fastest rate

    def start(self, inbox_id, on_message):
        """Poll ``inbox_id`` forever, calling ``on_message`` for each message.

        This loop never returns on its own; it ends only when the client
        call or ``on_message`` raises.
        """
        # Resolve the client once; the bare name is the module-level fallback.
        api = self.client if self.client is not None else client
        while True:
            messages = api.inboxes.messages.list(
                inbox_id,
                unread_only=True,
            )
            if messages['result']:
                # New messages - reset to fast polling
                self.interval = self.min_interval
                for msg in messages['result']:
                    on_message(msg)
            else:
                # No messages - slow down
                self.interval = min(self.interval * 1.5, self.max_interval)
            time.sleep(self.interval)
# Usage
poller = MessagePoller()
poller.start('inbox_abc123', lambda message: print('New message:', message['subject']))
Framework Examples
FastAPI
from fastapi import FastAPI, Request
from daimon_email import AsyncDaimonClient
import os
app = FastAPI()
# The webhook handler awaits client calls, so it needs the async client;
# results returned by the synchronous DaimonClient are not awaitable.
client = AsyncDaimonClient(api_key=os.environ['DAIMON_API_KEY'])
@app.post('/webhooks/daimon')
async def handle_webhook(request: Request):
event = await request.json()
if event['event'] == 'message.received':
message = event['data']
print('Received:', message['subject'])
# Auto-reply
await client.inboxes.messages.reply(
message['inbox_id'],
message['id'],
{
'body': 'Thank you for your message. We will respond within 24 hours.',
'client_id': f"auto-reply-{message['id']}"
}
)
return {'success': True}
Django
# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from daimon_email import DaimonClient
import os
import json
client = DaimonClient(api_key=os.environ['DAIMON_API_KEY'])
@csrf_exempt
def daimon_webhook(request):
if request.method != 'POST':
return JsonResponse({'error': 'Method not allowed'}, status=405)
event = json.loads(request.body)
if event['event'] == 'message.received':
message = event['data']
# Handle message
return JsonResponse({'success': True})
Flask
from flask import Flask, request, jsonify
from daimon_email import DaimonClient
import os
app = Flask(__name__)
client = DaimonClient(api_key=os.environ['DAIMON_API_KEY'])
@app.route('/webhooks/daimon', methods=['POST'])
def handle_webhook():
event = request.json
if event['event'] == 'message.received':
message = event['data']
# Handle message
return jsonify({'success': True})
if __name__ == '__main__':
app.run(port=3000)
Async Support
The SDK supports async/await for non-blocking I/O:
import asyncio
from daimon_email import AsyncDaimonClient
async def main():
client = AsyncDaimonClient(api_key=os.environ['DAIMON_API_KEY'])
# All methods are async
inbox = await client.inboxes.get('inbox_abc123')
messages = await client.inboxes.messages.list(inbox['result']['id'])
# Concurrent requests
inboxes, webhooks = await asyncio.gather(
client.inboxes.list(),
client.webhooks.list()
)
asyncio.run(main())
Type Hints
The SDK includes type hints for better IDE support:
from typing import Dict, List
from daimon_email import DaimonClient
client: DaimonClient = DaimonClient(api_key='...')
inbox: Dict = client.inboxes.get('inbox_abc123')
messages: List[Dict] = client.inboxes.messages.list('inbox_abc123')['result']
Resources
- PyPI Package: pypi.org/project/daimon-email
- GitHub Repo: github.com/daimon-email/daimon-python
- Type Stubs: Included in package
- Generated via: Fern from OpenAPI spec