First Experience
I still remember my first time using AWS S3. I thought uploading a simple file to cloud storage would be easy, but just configuring authentication took me quite a while. Today, let me walk you through various practical experiences in AWS S3 development step by step.
Did you know? S3 has become the de facto standard for cloud object storage; AWS has said publicly that it stores hundreds of trillions of objects for millions of customers. Scale like that is a strong hint that mastering S3 development has become an essential skill for Python developers.
Environment Setup
Before we start coding, we need to set up the development environment. Here's a pitfall I encountered: don't use the root account's Access Key directly. I suggest creating a user with minimal permissions in the IAM console specifically for development and testing.
import boto3
import os

# Read credentials from environment variables rather than hard-coding them
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# Low-level S3 client bound to a specific region
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name='ap-northeast-1'
)
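Before going further, it's worth confirming that the credentials actually work. Here's a minimal sanity check, assuming the IAM user is allowed to call s3:ListAllMyBuckets (otherwise a head_bucket on a bucket you own works just as well):

# Cheap read-only call: fails fast if the keys or permissions are wrong
try:
    response = s3_client.list_buckets()
    print('Reachable buckets:', [b['Name'] for b in response['Buckets']])
except Exception as e:
    print(f'Credential check failed: {e}')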
Core Operations
When it comes to file uploads, many developers reach for the upload_file method directly. That's fine for small objects, but with large files (in my case, anything past roughly 100MB over a shaky connection) you can run into timeouts and failed transfers. I hit exactly this in a real project and solved it by driving the multipart upload API myself, which gives explicit control over part size and error handling.
def upload_large_file(file_path, bucket, object_name):
    # Start a multipart upload and remember its UploadId
    mpu = s3_client.create_multipart_upload(
        Bucket=bucket,
        Key=object_name
    )
    # 5MB is the minimum part size S3 allows (except for the last part)
    chunk_size = 5 * 1024 * 1024
    parts = []
    try:
        with open(file_path, 'rb') as f:
            part_number = 0
            while True:
                data = f.read(chunk_size)
                if not data:
                    break
                part_number += 1
                # Upload this part and keep the real ETag S3 returns;
                # the completion call needs these ETags to assemble the object
                response = s3_client.upload_part(
                    Bucket=bucket,
                    Key=object_name,
                    PartNumber=part_number,
                    UploadId=mpu['UploadId'],
                    Body=data
                )
                parts.append({
                    'PartNumber': part_number,
                    'ETag': response['ETag']
                })
        # Tell S3 to stitch the uploaded parts into the final object
        s3_client.complete_multipart_upload(
            Bucket=bucket,
            Key=object_name,
            UploadId=mpu['UploadId'],
            MultipartUpload={'Parts': parts}
        )
    except Exception:
        # Abort on error so S3 doesn't keep (and bill for) orphaned parts
        s3_client.abort_multipart_upload(
            Bucket=bucket,
            Key=object_name,
            UploadId=mpu['UploadId']
        )
        raise
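That said, you don't always have to manage parts by hand. boto3's managed transfer layer can do the same job: upload_file accepts a TransferConfig that controls when multipart kicks in and how large the parts are. A minimal sketch, with illustrative threshold and chunk values and placeholder file and bucket names:

from boto3.s3.transfer import TransferConfig

# Managed transfer: boto3 switches to multipart automatically above the threshold
transfer_config = TransferConfig(
    multipart_threshold=100 * 1024 * 1024,  # go multipart above ~100MB
    multipart_chunksize=16 * 1024 * 1024,   # 16MB parts
    max_concurrency=4                       # upload parts in parallel
)
s3_client.upload_file('backup.tar.gz', 'my-bucket', 'backups/backup.tar.gz',
                      Config=transfer_config)

The manual version above is still worth knowing when you need fine-grained control over each part; for most cases, the managed transfer is less code to maintain.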
Performance Optimization
Speaking of performance optimization, I ran into an interesting case in a recent project. When uploading a large number of small files, simply calling upload_file in a loop performs poorly, because each call waits out a full round trip before the next one starts. After some testing, I found that multithreading improves throughput significantly.
from concurrent.futures import ThreadPoolExecutor
import threading

class UploadTracker:
    """Thread-safe counter for reporting upload progress."""
    def __init__(self, total=0):
        self.lock = threading.Lock()
        self.total = total
        self.completed = 0

    def increment(self):
        with self.lock:
            self.completed += 1
            return f'Progress: {self.completed}/{self.total}'

def batch_upload(file_list, bucket):
    tracker = UploadTracker(total=len(file_list))

    def upload_single_file(file_path):
        try:
            object_name = os.path.basename(file_path)
            s3_client.upload_file(file_path, bucket, object_name)
            return tracker.increment()
        except Exception as e:
            return f'Failed to upload {file_path}: {e}'

    # boto3 clients are generally thread-safe, so the workers can share s3_client
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(upload_single_file, file_list))
    return results
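A quick usage sketch, with a purely hypothetical directory of report files standing in for whatever you actually need to upload:

import glob

# Hypothetical batch of small local files
files = glob.glob('./reports/*.csv')
for result in batch_upload(files, 'my-report-bucket'):
    print(result)

If you raise max_workers much higher, keep an eye on your bandwidth and on S3 request rates; more threads isn't always faster.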
Security Protection
In cloud development, security is always paramount. I've seen too many projects discover security vulnerabilities after going live. For example, some developers set buckets to public access, which is very dangerous. Let's look at how to properly set bucket access permissions:
import json

def create_secure_bucket(bucket_name):
    # Create the bucket in the client's region (new buckets are private by default)
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={
            'LocationConstraint': 'ap-northeast-1'
        }
    )
    # Bucket policy: deny any request that does not use HTTPS
    bucket_policy = {
        'Version': '2012-10-17',
        'Statement': [{
            'Sid': 'DenyInsecureTransport',
            'Effect': 'Deny',
            'Principal': '*',
            'Action': 's3:*',
            'Resource': [
                f'arn:aws:s3:::{bucket_name}',
                f'arn:aws:s3:::{bucket_name}/*'
            ],
            'Condition': {
                'Bool': {
                    'aws:SecureTransport': 'false'
                }
            }
        }]
    }
    s3_client.put_bucket_policy(
        Bucket=bucket_name,
        Policy=json.dumps(bucket_policy)
    )
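One more thing worth noting: the policy above enforces HTTPS, but it doesn't stop someone from later attaching a public policy or public ACLs to the bucket. S3's Block Public Access settings cover that. A short sketch using the same client, which you could call right after create_secure_bucket:

def block_public_access(bucket_name):
    # Turn on all four Block Public Access switches for this bucket
    s3_client.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            'BlockPublicAcls': True,
            'IgnorePublicAcls': True,
            'BlockPublicPolicy': True,
            'RestrictPublicBuckets': True
        }
    )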
Cost Control
When it comes to cloud services, we must address cost control. In one project, I encountered cost overruns due to forgetting to delete temporary files. So I wrote a script to automatically clean up expired files:
from datetime import datetime, timedelta, timezone

def cleanup_expired_objects(bucket, days_threshold=7):
    # Objects last modified before this UTC timestamp are considered expired
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_threshold)
    # Paginate through every object in the bucket
    paginator = s3_client.get_paginator('list_objects_v2')
    deleted_count = 0
    saved_space = 0
    for page in paginator.paginate(Bucket=bucket):
        if 'Contents' not in page:
            continue
        # Keep only the objects older than the cutoff
        expired_objects = [
            obj for obj in page['Contents']
            if obj['LastModified'] < cutoff_date
        ]
        if not expired_objects:
            continue
        # Batch delete (each page holds at most 1,000 keys, the delete_objects limit)
        objects_to_delete = {'Objects': [{'Key': obj['Key']} for obj in expired_objects]}
        s3_client.delete_objects(Bucket=bucket, Delete=objects_to_delete)
        # Track how much was removed
        deleted_count += len(expired_objects)
        saved_space += sum(obj['Size'] for obj in expired_objects)
    return {
        'deleted_count': deleted_count,
        'saved_space_mb': saved_space / (1024 * 1024)
    }
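A script like this works, but S3 can also expire objects on its own with a lifecycle rule, which is harder to forget and costs nothing to run. A sketch, where the 'tmp/' prefix is just an example of where temporary files might live:

def add_expiration_rule(bucket_name, prefix='tmp/', days=7):
    # Let S3 itself expire objects under the given prefix after N days
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration={
            'Rules': [{
                'ID': f'expire-{prefix.strip("/")}-after-{days}-days',
                'Filter': {'Prefix': prefix},
                'Status': 'Enabled',
                'Expiration': {'Days': days}
            }]
        }
    )

Once the rule is in place, S3 deletes matching objects automatically after the configured number of days.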
Experience Summary
Through this period of practice, I've summarized several important lessons:
- Always use the principle of least privilege when configuring IAM users
- Use multipart uploads for large files and concurrent uploads for batches of small files
- Regularly clean up expired files to avoid cost overruns
- Manage all sensitive information through environment variables
Do you find these experiences helpful? Feel free to share your problems and solutions when using AWS S3 in the comments. If you want to learn more about cloud development, let me know, and we can discuss other topics next time.
Remember, in the cloud development field, maintaining enthusiasm for learning is important. Technology keeps advancing, and we need to keep up with the times. What do you think?