Automating Twilio Recording Exports for Quality Purposes: Python Implementation Guidelines
Discover how to use Python to download recordings from Twilio so they can be transcribed for sentiment analysis, quality assurance, and audit purposes.
Call recordings are pivotal for business operations, compliance, and quality assurance. Twilio is a call management platform with excellent call recording capabilities, but organizations often need to download these recordings automatically and store them locally or in their preferred cloud storage. Downloading large numbers of recordings from Twilio can be challenging, however. In this article, we'll explore how to build an efficient Python solution for bulk-downloading Twilio recordings while handling pagination, parallel downloads, and queue filtering.
Use Cases
When working with call management systems like Twilio, we might need to:
- Download thousands of call recordings for quality assurance.
- Export call recordings while excluding specific queues.
- Process or download recordings within specific date ranges.
- Handle large download jobs efficiently without overwhelming system resources.
Solution Overview
Using Python, we will create a class that handles the bulk download of recordings with the following key features:
- Parallel downloads using ThreadPoolExecutor
- Pagination handling for large datasets
- Queue filtering capabilities
- Progress tracking with tqdm
- Error handling, with a simple retry pattern sketched later
Prerequisites
- Python 3.8+
- Twilio account with recordings
- Required Python packages: twilio, boto3, python-dotenv, requests, and tqdm (install them with pip install twilio boto3 python-dotenv requests tqdm)
Implementation
The complete Python class is shown below:
import os
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

import requests
from tqdm import tqdm
from twilio.rest import Client

class TwilioRecordingExporter:
    def __init__(self, account_sid, auth_token, output_dir="random_recordings"):
        """
        Initialize the exporter with Twilio credentials
        """
        self.client = Client(account_sid, auth_token)
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        # Task queue SIDs to exclude (placeholders; replace with your own)
        self.excluded_queue_sids = {
            'WQ65xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 1
            'WQ3xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 2
            'WQexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 3
            'WQ0xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'   # Example SID 4
        }
        self.max_workers = 10  # Number of parallel downloads

    def download_recording(self, recording):
        """
        Download a single recording as a WAV file
        """
        try:
            date_str = recording.date_created.strftime('%Y%m%d_')
            filename = f"{date_str}{recording.sid}.wav"
            filepath = self.output_dir / filename
            if filepath.exists():
                return filepath  # Already downloaded; skip
            # Appending .wav to the media URL requests the WAV rendition
            wav_url = f"{recording.media_url}.wav"
            response = requests.get(wav_url, auth=(self.account_sid, self.auth_token))
            if response.status_code == 200:
                filepath.write_bytes(response.content)
                return filepath
            print(f"\nFailed to download {recording.sid}: {response.status_code}")
            return None
        except Exception as e:
            print(f"\nError downloading recording {recording.sid}: {str(e)}")
            return None

    def download_batch(self, recordings):
        """
        Download a batch of recordings in parallel
        """
        successful_downloads = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_recording = {
                executor.submit(self.download_recording, recording): recording
                for recording in recordings
            }
            for future in as_completed(future_to_recording):
                filepath = future.result()
                if filepath:
                    successful_downloads.append(filepath)
        return successful_downloads

    def export_random_recordings(self, num_recordings=10000, days_back=180, batch_size=100):
        """
        Export random recordings while excluding specific queues
        """
        downloaded_files = []
        try:
            # Calculate date range
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=days_back)
            print(f"Fetching recordings from {start_date} to {end_date}")
            print("Excluded queue SIDs:", ", ".join(self.excluded_queue_sids))
            # Fetch recordings page by page
            all_recordings = []
            page = self.client.recordings.page(
                date_created_after=start_date,
                date_created_before=end_date,
                page_size=100  # Maximum page size
            )
            with tqdm(desc="Fetching recordings", unit="page") as pbar:
                while page is not None:
                    all_recordings.extend(list(page))
                    pbar.update(1)
                    # Fetch extra to account for excluded queues
                    if len(all_recordings) >= num_recordings * 2:
                        break
                    page = page.next_page()  # None when no pages remain
            print(f"\nFound {len(all_recordings)} recordings")
            # Shuffle so the sample is random rather than chronological
            random.shuffle(all_recordings)
            # Process in batches
            processed_count = 0
            with tqdm(total=num_recordings, desc="Downloading recordings") as pbar:
                for i in range(0, len(all_recordings), batch_size):
                    if processed_count >= num_recordings:
                        break
                    batch = all_recordings[i:i + batch_size]
                    # Filter out recordings associated with excluded queues
                    filtered_batch = [
                        recording for recording in batch
                        if not self.is_recording_in_excluded_queue(recording)
                    ]
                    downloaded_batch = self.download_batch(filtered_batch)
                    downloaded_files.extend(downloaded_batch)
                    new_count = min(len(downloaded_batch), num_recordings - processed_count)
                    processed_count += new_count
                    pbar.update(new_count)
        except Exception as e:
            print(f"\nError in export process: {str(e)}")
        return downloaded_files[:num_recordings]

    def is_recording_in_excluded_queue(self, recording):
        """
        Check whether the recording belongs to an excluded task queue.
        Note: the Recording resource does not carry a queue SID natively;
        this assumes a queue_sid attribute was attached during enrichment.
        """
        task_queue_sid = getattr(recording, 'queue_sid', None)
        return task_queue_sid in self.excluded_queue_sids

def main():
    # Your Twilio credentials (prefer environment variables in production;
    # see the security considerations below)
    ACCOUNT_SID = "AC738a9a46c65dxxxxxxxxxxxxxxxxx"
    AUTH_TOKEN = "xxxxxxxxxx9ae2e4572xxxxxxxxxxxx"
    try:
        start_time = time.time()
        # Create exporter instance
        exporter = TwilioRecordingExporter(ACCOUNT_SID, AUTH_TOKEN)
        # Download random recordings
        print("Starting random recording export...")
        downloaded_files = exporter.export_random_recordings(
            num_recordings=10000,
            days_back=180,
            batch_size=100
        )
        duration = time.time() - start_time
        print("\nExport complete:")
        print(f"- Downloaded: {len(downloaded_files)} files")
        print(f"- Location: {exporter.output_dir}")
        print(f"- Time taken: {duration:.2f} seconds")
    except Exception as e:
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()
Let's break down the code above into manageable components:
1. Basic Setup
First, we create a Python class to handle Twilio client initialization and configuration:
class TwilioRecordingExporter:
    def __init__(self, account_sid, auth_token, output_dir="random_recordings"):
        self.client = Client(account_sid, auth_token)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.max_workers = 10
2. Single Recording Download Implementation
The method below handles the download of an individual recording:
def download_recording(self, recording):
    try:
        date_str = recording.date_created.strftime('%Y%m%d_')
        filename = f"{date_str}{recording.sid}.wav"
        filepath = self.output_dir / filename
        if filepath.exists():
            return filepath
        wav_url = f"{recording.media_url}.wav"
        response = requests.get(wav_url,
                                auth=(self.account_sid, self.auth_token))
        if response.status_code == 200:
            filepath.write_bytes(response.content)
            return filepath
    except Exception as e:
        print(f"\nError downloading recording {recording.sid}: {str(e)}")
    return None
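One refinement worth considering: requests.get as used above buffers the whole file in memory before writing it to disk. For long recordings, the download step inside download_recording could instead use requests' stream mode and write the body in chunks. This is a minimal sketch; the chunk size is an arbitrary choice:
response = requests.get(wav_url, auth=(self.account_sid, self.auth_token), stream=True)
if response.status_code == 200:
    # Write the body in chunks instead of buffering the whole file
    with open(filepath, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    return filepath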
3. Parallel Downloads
The code below parallelizes downloads, which significantly improves performance when fetching a large number of recordings.
def download_batch(self, recordings):
    successful_downloads = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_recording = {
            executor.submit(self.download_recording, recording): recording
            for recording in recordings
        }
        for future in as_completed(future_to_recording):
            filepath = future.result()
            if filepath:
                successful_downloads.append(filepath)
    return successful_downloads
4. Queue Filtering
To keep queues that are not relevant for QA out of the sample, we filter recordings by their associated task queue SID. One caveat: Twilio's Recording resource does not itself expose a queue SID, so this check assumes a queue_sid attribute has been attached to each recording in a separate enrichment step:
def is_recording_in_excluded_queue(self, recording):
    # queue_sid is assumed to be attached beforehand; the Recording
    # resource does not carry it natively
    task_queue_sid = getattr(recording, 'queue_sid', None)
    return task_queue_sid in self.excluded_queue_sids
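A hypothetical enrichment step, assuming you have already built a mapping from call SIDs to TaskRouter queue SIDs (for example, from TaskRouter event exports), might look like this:
# Hypothetical lookup table mapping call SIDs to TaskRouter queue SIDs,
# built elsewhere (e.g., from TaskRouter event exports)
call_to_queue = {
    "CA1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx": "WQ65xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
}

for recording in all_recordings:
    # Attach the queue SID so is_recording_in_excluded_queue can see it
    recording.queue_sid = call_to_queue.get(recording.call_sid)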
Best Practices and Optimizations
Batch Processing
To manage resources efficiently, process recordings in fixed-size batches:
for i in range(0, len(all_recordings), batch_size):
    batch = all_recordings[i:i + batch_size]
Tracking Progress
Implement tqdm for visual progress tracking:
with tqdm(total=num_recordings, desc="Downloading recordings") as pbar:
    # Download process
    pbar.update(new_count)
Error Handling
Error handling can be implemented at multiple levels, such as:
- Download failures
- Batch processing errors
- API communication issues
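The class above logs failures but does not retry them. A minimal retry wrapper around download_recording, which could be added to the class, might look like this; the attempt count and backoff delays here are arbitrary choices:
def download_with_retries(self, recording, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        filepath = self.download_recording(recording)
        if filepath is not None:
            return filepath
        if attempt < max_attempts:
            time.sleep(2 ** attempt)  # back off: 2s, then 4s
    return None
Swapping this in for download_recording inside download_batch gives each file several chances before it is counted as failed.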
Resource Management
The degree of parallelism can be throttled via max_workers, as shown below:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
    # Parallel download logic
Example Usage
Use the exporter below to download 10000 recordings from the last 180 days, in batches of 100:
exporter = TwilioRecordingExporter(ACCOUNT_SID, AUTH_TOKEN)
downloaded_files = exporter.export_random_recordings(
    num_recordings=10000,
    days_back=180,
    batch_size=100
)
Considerations for Security
- File safety: Use pathlib for safe file operations:
filepath = Path(output_dir) / filename
- Credential management: Load credentials from environment variables instead of hardcoding them:
ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID')
AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN')
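Since python-dotenv is already in the dependency list, the same variables can also be kept in a local .env file and loaded at startup. This is a sketch assuming a .env file sits next to the script:
import os
from dotenv import load_dotenv

load_dotenv()  # reads TWILIO_ACCOUNT_SID / TWILIO_AUTH_TOKEN from .env
ACCOUNT_SID = os.environ["TWILIO_ACCOUNT_SID"]
AUTH_TOKEN = os.environ["TWILIO_AUTH_TOKEN"]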
- Further improvement considerations:
- Include call metadata (caller name, agent name, duration, and other fields) alongside each recording.
- Automate export to AWS S3 buckets (a sketch follows).
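boto3 is also in the dependency list; a minimal sketch of the S3 upload step might look like the following. The bucket name is a placeholder, and AWS credentials are assumed to come from boto3's standard credential chain:
import boto3

def upload_to_s3(filepaths, bucket_name="my-recordings-bucket"):
    # bucket_name is a placeholder; point this at your own bucket
    s3 = boto3.client("s3")
    for filepath in filepaths:
        # Use the local filename as the S3 object key
        s3.upload_file(str(filepath), bucket_name, filepath.name)

upload_to_s3(downloaded_files)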
Conclusion
Using the solution above, you can efficiently bulk-download recordings from Twilio while following best practices for error handling, performance, and resource management. The implementation can easily be extended to further use cases and scaled according to your needs.