Amazon's S3 API has become the universal standard for object storage. Whether you're using AWS, Google Cloud, Hetzner, or a self-hosted solution like DanubeData, the same Python code works everywhere — thanks to the boto3 library.
This guide covers everything you need to work with S3-compatible storage in Python: from basic operations to presigned URLs, multipart uploads, error handling, and framework integrations with Django and Flask.
Why Use S3-Compatible Storage?
S3-compatible storage gives you the power of the S3 API without vendor lock-in. Here's why developers choose it:
| Benefit | Description |
|---|---|
| No Vendor Lock-in | Switch providers by changing one endpoint URL — your code stays the same |
| Cost Savings | European S3-compatible providers like DanubeData cost a fraction of AWS (€3.99/month includes 1TB) |
| GDPR Compliance | Store data in European data centers with full data sovereignty |
| Mature Ecosystem | boto3, rclone, Cyberduck, and hundreds of tools support S3-compatible APIs |
| Battle-Tested Protocol | The S3 API has been refined over 20 years — it's rock solid |
Installing boto3
The boto3 library is Amazon's official Python SDK for AWS services, including S3. It works with any S3-compatible endpoint.
Using pip
pip install boto3
Using Poetry
poetry add boto3
Using pipenv
pipenv install boto3
In a requirements.txt
boto3>=1.35.0
botocore>=1.35.0
Verify the installation:
python -c "import boto3; print(boto3.__version__)"
Configuring boto3 for S3-Compatible Endpoints
The key difference between using boto3 with AWS and an S3-compatible provider is the endpoint_url parameter. With DanubeData, your endpoint is https://s3.danubedata.ro.
Basic Configuration
import boto3
from botocore.config import Config


def _danube_config():
    """Signature and addressing settings required by S3-compatible endpoints."""
    return Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )


# Low-level client API for DanubeData
s3_client = boto3.client(
    's3',
    endpoint_url='https://s3.danubedata.ro',
    region_name='fsn1',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    config=_danube_config()
)

# Higher-level, object-oriented resource API
s3_resource = boto3.resource(
    's3',
    endpoint_url='https://s3.danubedata.ro',
    region_name='fsn1',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    config=_danube_config()
)
Using Environment Variables (Recommended)
Never hardcode credentials. Use environment variables instead:
# .env file or shell environment
# boto3 reads these two variables automatically:
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
# These two are custom names read explicitly by the application code below:
export S3_ENDPOINT_URL=https://s3.danubedata.ro
export S3_REGION=fsn1
import os

import boto3
from botocore.config import Config

# Credentials are picked up automatically from AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY; only the endpoint and region are passed explicitly.
_endpoint = os.environ['S3_ENDPOINT_URL']
_region = os.environ.get('S3_REGION', 'fsn1')

s3_client = boto3.client(
    's3',
    endpoint_url=_endpoint,
    region_name=_region,
    config=Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
Using a Session (Multiple Profiles)
# A named profile keeps credentials for several providers on one machine;
# see the ~/.aws/credentials snippet below.
session = boto3.Session(profile_name='danubedata')

s3_client = session.client(
    's3',
    endpoint_url='https://s3.danubedata.ro',
    config=Config(signature_version='s3v4', s3={'addressing_style': 'path'})
)
Configure the profile in ~/.aws/credentials:
# ~/.aws/credentials — profile referenced by boto3.Session(profile_name='danubedata')
[danubedata]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY
region = fsn1
Basic Operations
Create a Bucket
def create_bucket(client, bucket_name):
    """Create *bucket_name*, tolerating the case where we already own it.

    Failures are reported on stdout rather than raised.
    """
    try:
        client.create_bucket(Bucket=bucket_name)
    except client.exceptions.BucketAlreadyOwnedByYou:
        print(f"Bucket '{bucket_name}' already exists and is owned by you.")
    except Exception as e:
        print(f"Error creating bucket: {e}")
    else:
        print(f"Bucket '{bucket_name}' created successfully.")
create_bucket(s3_client, 'my-app-uploads')
List All Buckets
def list_buckets(client):
    """Print every bucket owned by this account and return the raw list."""
    buckets = client.list_buckets()['Buckets']
    print("Your buckets:")
    for b in buckets:
        print(f" - {b['Name']} (created: {b['CreationDate']})")
    return buckets
list_buckets(s3_client)
Upload a File
def upload_file(client, file_path, bucket, object_key=None):
    """Upload a file to S3.

    Args:
        client: boto3 S3 client.
        file_path: Path to the local file.
        bucket: Destination bucket name.
        object_key: Key to store the object under; defaults to the file's
            base name.
    """
    import mimetypes

    if object_key is None:
        object_key = os.path.basename(file_path)
    # Guess the real content type from the filename so browsers can render
    # the file inline instead of forcing a download (see "Common Mistakes"
    # below); fall back to a generic binary type.
    content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
    try:
        client.upload_file(
            Filename=file_path,
            Bucket=bucket,
            Key=object_key,
            ExtraArgs={
                'ContentType': content_type,
                'Metadata': {
                    'uploaded-by': 'my-python-app',
                    'environment': 'production'
                }
            }
        )
        print(f"Uploaded {file_path} to s3://{bucket}/{object_key}")
    except Exception as e:
        print(f"Upload failed: {e}")
upload_file(s3_client, '/path/to/report.pdf', 'my-app-uploads', 'reports/2026/q1-report.pdf')
Upload from a String or Bytes
import json

# Upload JSON data directly from a Python dict — no temp file needed;
# put_object accepts str or bytes as the Body.
data = {"users": 1500, "revenue": 45000, "month": "March 2026"}
s3_client.put_object(
    Bucket='my-app-uploads',
    Key='analytics/march-2026.json',
    Body=json.dumps(data, indent=2),
    ContentType='application/json'
)

# Upload binary data read from disk.
with open('image.png', 'rb') as f:
    s3_client.put_object(
        Bucket='my-app-uploads',
        Key='images/logo.png',
        Body=f.read(),
        ContentType='image/png'
    )
Download a File
def download_file(client, bucket, object_key, download_path):
    """Fetch s3://bucket/object_key and write it to *download_path*.

    Errors are printed rather than raised.
    """
    try:
        client.download_file(Bucket=bucket, Key=object_key, Filename=download_path)
    except client.exceptions.NoSuchKey:
        print(f"Object '{object_key}' does not exist in bucket '{bucket}'.")
    except Exception as e:
        print(f"Download failed: {e}")
    else:
        print(f"Downloaded s3://{bucket}/{object_key} to {download_path}")
download_file(s3_client, 'my-app-uploads', 'reports/2026/q1-report.pdf', '/tmp/q1-report.pdf')
Download to Memory
import io

# get_object returns a streaming body; read() pulls the whole payload into
# memory (fine for small objects like this JSON file).
response = s3_client.get_object(Bucket='my-app-uploads', Key='analytics/march-2026.json')
content = response['Body'].read().decode('utf-8')
data = json.loads(content)
print(f"Users: {data['users']}, Revenue: €{data['revenue']}")
List Objects in a Bucket
def list_objects(client, bucket, prefix=''):
    """List objects in a bucket with optional prefix filter.

    Uses the list_objects_v2 paginator so buckets with more than 1000
    objects are fully enumerated. Prints one line per object plus totals.
    """
    paginator = client.get_paginator('list_objects_v2')
    total_size = 0
    total_count = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent on empty pages, hence the .get() default.
        for obj in page.get('Contents', []):
            size_mb = obj['Size'] / (1024 * 1024)
            print(f" {obj['Key']} ({size_mb:.2f} MB, modified: {obj['LastModified']})")
            total_size += obj['Size']
            total_count += 1
    # Fix: the original had a literal newline inside the f-string (a
    # syntax error); the intended escape is \n.
    print(f"\nTotal: {total_count} objects, {total_size / (1024**3):.2f} GB")
list_objects(s3_client, 'my-app-uploads', prefix='reports/')
Delete an Object
def delete_object(client, bucket, object_key):
    """Delete a single object from S3.

    NOTE(review): on AWS S3, delete_object does not raise for a missing
    key — confirm your provider behaves the same if you rely on that.
    """
    client.delete_object(Bucket=bucket, Key=object_key)
    print(f"Deleted s3://{bucket}/{object_key}")

delete_object(s3_client, 'my-app-uploads', 'reports/2026/old-report.pdf')
Delete Multiple Objects
def delete_objects(client, bucket, keys):
    """Batch-delete up to 1000 keys in a single API round trip.

    Per-key failures are printed; nothing is raised.
    """
    payload = {'Objects': [{'Key': k} for k in keys], 'Quiet': True}
    response = client.delete_objects(Bucket=bucket, Delete=payload)
    failures = response.get('Errors', [])
    if not failures:
        print(f"Successfully deleted {len(keys)} objects.")
        return
    for failure in failures:
        print(f"Failed to delete {failure['Key']}: {failure['Message']}")
delete_objects(s3_client, 'my-app-uploads', [
'temp/file1.txt',
'temp/file2.txt',
'temp/file3.txt'
])
Copy an Object
# Copy within the same bucket — a server-side copy, no download/re-upload.
s3_client.copy_object(
    Bucket='my-app-uploads',
    Key='backups/report-backup.pdf',
    CopySource={'Bucket': 'my-app-uploads', 'Key': 'reports/2026/q1-report.pdf'}
)

# Copy between buckets (Bucket/Key name the destination).
s3_client.copy_object(
    Bucket='my-archive-bucket',
    Key='2026/q1-report.pdf',
    CopySource={'Bucket': 'my-app-uploads', 'Key': 'reports/2026/q1-report.pdf'}
)
Presigned URLs for Temporary Access
Presigned URLs let you grant temporary access to private objects without sharing your credentials. This is essential for serving user uploads, generating download links, or allowing direct browser uploads.
Generate a Download URL
def generate_download_url(client, bucket, object_key, expiration=3600):
    """Return a time-limited GET URL for a private object.

    Args:
        expiration: URL validity in seconds (default: 1 hour)
    """
    params = {'Bucket': bucket, 'Key': object_key}
    return client.generate_presigned_url(
        ClientMethod='get_object',
        Params=params,
        ExpiresIn=expiration,
    )
# URL valid for 24 hours
download_url = generate_download_url(
s3_client, 'my-app-uploads', 'reports/2026/q1-report.pdf',
expiration=86400
)
print(f"Download link (valid 24h): {download_url}")
Generate an Upload URL (Browser Direct Upload)
def generate_upload_url(client, bucket, object_key, content_type, expiration=3600):
    """Return a time-limited PUT URL so a browser can upload directly.

    The Content-Type is part of the signature, so the client must send the
    exact same header with its PUT request.
    """
    params = {
        'Bucket': bucket,
        'Key': object_key,
        'ContentType': content_type,
    }
    return client.generate_presigned_url(
        ClientMethod='put_object',
        Params=params,
        ExpiresIn=expiration,
    )
# Generate URL for a user avatar upload
upload_url = generate_upload_url(
s3_client, 'my-app-uploads',
'avatars/user-12345.jpg',
'image/jpeg',
expiration=600 # 10 minutes
)
print(f"Upload URL: {upload_url}")
Use this URL from JavaScript in the browser:
// Frontend JavaScript: PUT the file bytes straight to the presigned URL.
// The Content-Type header must match the type the URL was signed with.
const response = await fetch(uploadUrl, {
  method: 'PUT',
  body: file,
  headers: {
    'Content-Type': 'image/jpeg'
  }
});
Presigned POST (Form-Based Upload)
def generate_presigned_post(client, bucket, key_prefix, max_size_mb=10, expiration=3600):
    """Generate presigned POST data for HTML form uploads.

    The conditions restrict uploads to images under *key_prefix* within
    the size limit. Returns a dict with 'url' and 'fields' for the form.
    """
    conditions = [
        ['content-length-range', 1, max_size_mb * 1024 * 1024],
        ['starts-with', '$key', key_prefix],
        ['starts-with', '$Content-Type', 'image/']
    ]
    post_data = client.generate_presigned_post(
        Bucket=bucket,
        # ${filename} is a literal S3 placeholder (not a Python expression):
        # S3 substitutes the name of the file the browser submits.
        Key=f"{key_prefix}/${{filename}}",
        Conditions=conditions,
        ExpiresIn=expiration
    )
    return post_data
post = generate_presigned_post(s3_client, 'my-app-uploads', 'uploads/photos')
print(f"POST URL: {post['url']}")
print(f"Fields: {post['fields']}")
Multipart Uploads for Large Files
For files larger than 100 MB, multipart uploads are strongly recommended. They split the file into parts, upload them in parallel, and can resume from where they left off if a part fails.
Using TransferConfig (Recommended)
from boto3.s3.transfer import TransferConfig

# Multipart tuning shared by the upload helpers below.
transfer_config = TransferConfig(
    multipart_threshold=100 * 1024 * 1024,  # 100 MB: switch to multipart above this
    multipart_chunksize=25 * 1024 * 1024,   # 25 MB per part
    max_concurrency=10,                     # 10 parallel uploads
    use_threads=True                        # Enable threading
)
def upload_large_file(client, file_path, bucket, key):
    """Upload a large file with multipart upload and progress tracking.

    Relies on the module-level ``transfer_config`` for multipart settings.
    """
    file_size = os.path.getsize(file_path)

    class ProgressCallback:
        """Accumulates bytes transferred and prints percent complete."""

        def __init__(self, total):
            self.total = total
            self.uploaded = 0

        def __call__(self, bytes_transferred):
            self.uploaded += bytes_transferred
            pct = (self.uploaded / self.total) * 100
            # Fix: the original had a literal newline inside the f-string
            # (a syntax error); \r rewrites the same console line, which is
            # why end='' is passed.
            print(f"\rProgress: {pct:.1f}% ({self.uploaded / (1024**2):.1f} MB / {self.total / (1024**2):.1f} MB)", end='')

    client.upload_file(
        Filename=file_path,
        Bucket=bucket,
        Key=key,
        Config=transfer_config,
        Callback=ProgressCallback(file_size)
    )
    # \n finishes the progress line before the completion message.
    print(f"\nUpload complete: s3://{bucket}/{key}")
upload_large_file(s3_client, '/data/database-dump.sql.gz', 'my-backups', 'db/2026-03-19.sql.gz')
Manual Multipart Upload (Advanced)
def manual_multipart_upload(client, file_path, bucket, key, part_size=25*1024*1024):
    """Manual multipart upload with retry logic for each part.

    Args:
        client: boto3 S3 client.
        file_path: Local file to upload.
        bucket: Destination bucket.
        key: Destination object key.
        part_size: Bytes per part (25 MB default).

    Raises:
        Exception: re-raised after aborting the upload, so no orphaned
            parts are left consuming (billable) storage.
    """
    # Initiate multipart upload; the returned UploadId ties the parts together.
    mpu = client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu['UploadId']
    parts = []
    try:
        with open(file_path, 'rb') as f:
            part_number = 1
            while True:
                data = f.read(part_size)
                if not data:
                    break
                # Upload each part with retry (up to 3 attempts).
                for attempt in range(3):
                    try:
                        response = client.upload_part(
                            Bucket=bucket,
                            Key=key,
                            PartNumber=part_number,
                            UploadId=upload_id,
                            Body=data
                        )
                        # S3 needs each part's ETag to assemble the object.
                        parts.append({
                            'PartNumber': part_number,
                            'ETag': response['ETag']
                        })
                        print(f"Part {part_number} uploaded ({len(data) / (1024**2):.1f} MB)")
                        break
                    except Exception as e:
                        # Third consecutive failure is final: propagate to
                        # the outer handler, which aborts the upload.
                        if attempt == 2:
                            raise
                        print(f"Part {part_number} failed (attempt {attempt+1}): {e}")
                part_number += 1
        # Complete multipart upload: the server stitches the parts in order.
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
        print(f"Multipart upload complete: s3://{bucket}/{key}")
    except Exception as e:
        # Abort on failure to clean up incomplete parts
        client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        print(f"Upload aborted: {e}")
        raise

manual_multipart_upload(s3_client, '/data/large-video.mp4', 'media-bucket', 'videos/intro.mp4')
Error Handling
Robust error handling is critical for production applications. boto3 raises specific exceptions you can catch and handle gracefully.
from botocore.exceptions import (
    ClientError,
    EndpointConnectionError,
    ConnectionClosedError,
    NoCredentialsError,
    ParamValidationError
)

def safe_get_object(client, bucket, key):
    """Download an object with comprehensive error handling.

    Returns:
        The object's bytes, or None on any handled failure (the failure
        is printed rather than raised).
    """
    try:
        response = client.get_object(Bucket=bucket, Key=key)
        return response['Body'].read()
    except ClientError as e:
        # ClientError wraps all service-side errors; dispatch on the code.
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchKey':
            print(f"Object '{key}' not found in bucket '{bucket}'.")
        elif error_code == 'NoSuchBucket':
            print(f"Bucket '{bucket}' does not exist.")
        elif error_code == 'AccessDenied':
            print(f"Access denied to s3://{bucket}/{key}. Check your credentials and permissions.")
        elif error_code == 'InvalidAccessKeyId':
            print("Invalid access key. Please verify your credentials.")
        elif error_code == 'SignatureDoesNotMatch':
            print("Secret key mismatch. Please verify your credentials.")
        elif error_code == 'RequestTimeTooSkewed':
            # SigV4 requests embed a timestamp the server validates.
            print("Server clock is too far out of sync. Check your system time.")
        else:
            print(f"S3 error ({error_code}): {e.response['Error']['Message']}")
        return None
    except EndpointConnectionError:
        print("Cannot connect to S3 endpoint. Check your endpoint_url and network.")
        return None
    except ConnectionClosedError:
        print("Connection was closed unexpectedly. Retry the request.")
        return None
    except NoCredentialsError:
        print("No AWS credentials found. Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.")
        return None
    except ParamValidationError as e:
        print(f"Invalid parameters: {e}")
        return None
Retry Logic with Exponential Backoff
import time
import random

def retry_s3_operation(operation, max_retries=3, base_delay=1):
    """Run *operation*, retrying transient S3 failures with backoff + jitter.

    Non-retryable (4xx-style) errors and the final failed attempt are
    re-raised to the caller.
    """
    for attempt in range(max_retries):
        try:
            return operation()
        except ClientError as e:
            code = e.response['Error']['Code']
            # Client-side errors will never succeed on retry — surface now.
            if code in ('NoSuchKey', 'NoSuchBucket', 'AccessDenied'):
                raise
            # Out of attempts: give up and propagate.
            if attempt == max_retries - 1:
                raise
            # Exponential backoff plus jitter to avoid thundering herds.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s...")
            time.sleep(delay)
# Usage
data = retry_s3_operation(
lambda: s3_client.get_object(Bucket='my-bucket', Key='data.json')['Body'].read()
)
Working with Bucket Policies
Bucket policies control access to your buckets and objects. They're written as JSON documents.
Set a Public Read Policy
import json

# Allow anonymous GET on every object in the bucket — public assets only!
public_read_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PublicReadGetObject",
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::my-public-assets/*"
        }
    ]
}

# Policies are attached as JSON strings, not dicts.
s3_client.put_bucket_policy(
    Bucket='my-public-assets',
    Policy=json.dumps(public_read_policy)
)
print("Public read policy applied.")
Restrict Access by IP Address
# Deny every S3 action unless the request originates from the listed CIDRs.
ip_restricted_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowFromOfficeOnly",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:*",
            # Both ARNs matter: one matches bucket-level actions, the
            # other object-level actions.
            "Resource": [
                "arn:aws:s3:::sensitive-data",
                "arn:aws:s3:::sensitive-data/*"
            ],
            "Condition": {
                "NotIpAddress": {
                    "aws:SourceIp": [
                        "203.0.113.0/24",
                        "198.51.100.42/32"
                    ]
                }
            }
        }
    ]
}

s3_client.put_bucket_policy(
    Bucket='sensitive-data',
    Policy=json.dumps(ip_restricted_policy)
)
Get and Delete Policies
# Get current bucket policy — raises NoSuchBucketPolicy when none is set.
try:
    response = s3_client.get_bucket_policy(Bucket='my-bucket')
    policy = json.loads(response['Policy'])
    print(json.dumps(policy, indent=2))
except ClientError as e:
    if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
        print("No bucket policy set.")

# Delete bucket policy
s3_client.delete_bucket_policy(Bucket='my-bucket')
Bucket Versioning and Lifecycle Rules
Enable Versioning
# Versioning keeps every overwrite/delete as a recoverable object version.
s3_client.put_bucket_versioning(
    Bucket='my-versioned-bucket',
    VersioningConfiguration={'Status': 'Enabled'}
)

# List object versions
response = s3_client.list_object_versions(Bucket='my-versioned-bucket', Prefix='config.json')
for version in response.get('Versions', []):
    print(f" Version: {version['VersionId']} | Modified: {version['LastModified']} | Size: {version['Size']}")
Set Lifecycle Rules
# Lifecycle rules let the server expire objects automatically — no cron job.
s3_client.put_bucket_lifecycle_configuration(
    Bucket='my-app-uploads',
    LifecycleConfiguration={
        'Rules': [
            {
                # Anything under temp/ disappears after a week.
                'ID': 'delete-temp-files',
                'Filter': {'Prefix': 'temp/'},
                'Status': 'Enabled',
                'Expiration': {'Days': 7}
            },
            {
                # Keep logs for 90 days.
                'ID': 'delete-old-logs',
                'Filter': {'Prefix': 'logs/'},
                'Status': 'Enabled',
                'Expiration': {'Days': 90}
            }
        ]
    }
)
CORS Configuration
If you're uploading files directly from the browser, you need CORS configured on your bucket:
# CORS lets browsers on the listed origins call the bucket directly —
# required for presigned-URL uploads from frontend JavaScript.
s3_client.put_bucket_cors(
    Bucket='my-app-uploads',
    CORSConfiguration={
        'CORSRules': [
            {
                'AllowedOrigins': ['https://myapp.com', 'https://www.myapp.com'],
                'AllowedMethods': ['GET', 'PUT', 'POST', 'DELETE'],
                'AllowedHeaders': ['*'],
                'ExposeHeaders': ['ETag', 'x-amz-request-id'],
                'MaxAgeSeconds': 3600  # Cache preflight responses for 1 hour
            }
        ]
    }
)
Django Integration with django-storages
The django-storages library provides seamless S3 integration for Django's file handling system.
Installation
pip install django-storages boto3
Settings Configuration
# settings.py
INSTALLED_APPS = [
    # ... your apps
    'storages',
]

# S3-compatible storage settings for DanubeData
AWS_ACCESS_KEY_ID = os.environ['S3_ACCESS_KEY']
AWS_SECRET_ACCESS_KEY = os.environ['S3_SECRET_KEY']
AWS_STORAGE_BUCKET_NAME = 'my-django-app'
AWS_S3_ENDPOINT_URL = 'https://s3.danubedata.ro'
AWS_S3_REGION_NAME = 'fsn1'
AWS_S3_SIGNATURE_VERSION = 's3v4'
AWS_S3_ADDRESSING_STYLE = 'path'

# Customize file URLs
AWS_S3_CUSTOM_DOMAIN = None    # Use default endpoint
AWS_QUERYSTRING_AUTH = True    # Use presigned URLs for private files
AWS_QUERYSTRING_EXPIRE = 3600  # URL expiry in seconds

# Optional: Prevent overwriting files with same name
AWS_S3_FILE_OVERWRITE = False
AWS_DEFAULT_ACL = None

# Use S3 for media files
# NOTE(review): Django 4.2+ prefers the STORAGES dict over
# DEFAULT_FILE_STORAGE — confirm against your Django version.
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'

# Use S3 for static files too (optional)
# STATICFILES_STORAGE = 'storages.backends.s3boto3.S3StaticStorage'
Custom Storage Backends
# storage_backends.py
from storages.backends.s3boto3 import S3Boto3Storage

class PublicMediaStorage(S3Boto3Storage):
    # Objects live under media/, get a public-read ACL, and are served
    # with plain (unsigned) URLs since anyone may fetch them.
    location = 'media'
    default_acl = 'public-read'
    file_overwrite = False
    querystring_auth = False

class PrivateMediaStorage(S3Boto3Storage):
    # Private objects: every generated URL is presigned and expires.
    location = 'private'
    default_acl = 'private'
    file_overwrite = False
    querystring_auth = True
    querystring_expire = 3600  # 1 hour
Using in Models
from django.db import models
from .storage_backends import PublicMediaStorage, PrivateMediaStorage

class UserProfile(models.Model):
    # Public avatar: served via plain, cacheable URLs.
    avatar = models.ImageField(
        storage=PublicMediaStorage(),
        upload_to='avatars/',
        blank=True
    )
    # Sensitive document: accessible only through expiring presigned URLs.
    id_document = models.FileField(
        storage=PrivateMediaStorage(),
        upload_to='documents/',
        blank=True
    )

    def get_document_url(self):
        """Get a presigned URL for the private document."""
        if self.id_document:
            return self.id_document.url  # Automatically generates presigned URL
        return None
Direct Upload from Django Views
from django.conf import settings  # fix: settings was referenced but never imported
from django.http import JsonResponse
from django.views.decorators.http import require_POST
import boto3
from botocore.config import Config
import uuid

@require_POST
def get_upload_url(request):
    """Generate a presigned URL for direct browser upload.

    Expects POST fields 'filename' and (optionally) 'content_type';
    returns JSON with the signed 'url' and the object 'key'.
    """
    filename = request.POST.get('filename')
    content_type = request.POST.get('content_type', 'application/octet-stream')
    # Credentials are read from the AWS_* environment/settings by boto3.
    s3 = boto3.client(
        's3',
        endpoint_url=settings.AWS_S3_ENDPOINT_URL,
        region_name=settings.AWS_S3_REGION_NAME,
        config=Config(signature_version='s3v4', s3={'addressing_style': 'path'})
    )
    # Namespace each upload under a random UUID so concurrent uploads with
    # the same filename never collide.
    key = f"uploads/{uuid.uuid4()}/{filename}"
    url = s3.generate_presigned_url(
        'put_object',
        Params={
            'Bucket': settings.AWS_STORAGE_BUCKET_NAME,
            'Key': key,
            'ContentType': content_type
        },
        ExpiresIn=600
    )
    return JsonResponse({'url': url, 'key': key})
Flask Integration
Flask doesn't have a built-in storage abstraction like Django, but integrating boto3 directly is straightforward.
Flask Application Setup
from flask import Flask, request, jsonify
import boto3
from botocore.config import Config
import os
import uuid

app = Flask(__name__)

# Initialize S3 client — credentials come from the AWS_* environment
# variables, endpoint/region from custom S3_* variables.
s3_client = boto3.client(
    's3',
    endpoint_url=os.environ.get('S3_ENDPOINT_URL', 'https://s3.danubedata.ro'),
    region_name=os.environ.get('S3_REGION', 'fsn1'),
    config=Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
BUCKET_NAME = os.environ.get('S3_BUCKET', 'my-flask-app')

@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle file upload via Flask and store in S3.

    Returns 400 for a missing/empty file field, 500 on upload failure.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    # Generate unique key so identical filenames never collide.
    ext = os.path.splitext(file.filename)[1]
    key = f"uploads/{uuid.uuid4()}{ext}"
    try:
        # upload_fileobj streams the werkzeug file object straight to S3,
        # without buffering it on local disk first.
        s3_client.upload_fileobj(
            file,
            BUCKET_NAME,
            key,
            ExtraArgs={'ContentType': file.content_type}
        )
        return jsonify({'key': key, 'message': 'Upload successful'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/download/<path:key>')
def download_file(key):
    """Generate a presigned download URL (valid 1 hour)."""
    try:
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': BUCKET_NAME, 'Key': key},
            ExpiresIn=3600
        )
        return jsonify({'url': url})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/files')
def list_files():
    """List all uploaded files (paginated, so >1000 objects work)."""
    paginator = s3_client.get_paginator('list_objects_v2')
    files = []
    for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix='uploads/'):
        for obj in page.get('Contents', []):
            files.append({
                'key': obj['Key'],
                'size': obj['Size'],
                'modified': obj['LastModified'].isoformat()
            })
    return jsonify({'files': files})

if __name__ == '__main__':
    # debug=True is for local development only — never in production.
    app.run(debug=True)
Flask with Flask-Uploads Pattern
class S3Storage:
    """Reusable S3 storage helper for Flask applications.

    Follows the Flask extension pattern: construct bare, then bind to an
    app with init_app() (or pass the app to the constructor).
    """

    def __init__(self, app=None):
        self.client = None
        self.bucket = None
        if app:
            self.init_app(app)

    def init_app(self, app):
        """Read S3_* keys from the app config and build the client."""
        self.bucket = app.config['S3_BUCKET']
        self.client = boto3.client(
            's3',
            endpoint_url=app.config['S3_ENDPOINT_URL'],
            region_name=app.config.get('S3_REGION', 'fsn1'),
            config=Config(signature_version='s3v4', s3={'addressing_style': 'path'})
        )

    def upload(self, file_obj, key, content_type=None):
        """Store *file_obj* under *key*; returns the key."""
        extra = {'ContentType': content_type or 'application/octet-stream'}
        self.client.upload_fileobj(file_obj, self.bucket, key, ExtraArgs=extra)
        return key

    def download(self, key):
        """Return the object's full contents as bytes."""
        return self.client.get_object(Bucket=self.bucket, Key=key)['Body'].read()

    def get_url(self, key, expires=3600):
        """Presigned GET URL valid for *expires* seconds."""
        params = {'Bucket': self.bucket, 'Key': key}
        return self.client.generate_presigned_url(
            'get_object', Params=params, ExpiresIn=expires
        )

    def delete(self, key):
        """Remove the object from the bucket."""
        self.client.delete_object(Bucket=self.bucket, Key=key)
# Usage: create once at import time, bind to the app during startup.
storage = S3Storage()
storage.init_app(app)
Performance Tips
1. Use Threading for Bulk Operations
from concurrent.futures import ThreadPoolExecutor, as_completed

def bulk_upload(client, bucket, files, max_workers=10):
    """Upload (file_path, key) pairs concurrently using a thread pool.

    Returns one {'key', 'status', ...} dict per file; failures carry an
    'error' entry instead of raising.
    """
    outcomes = []

    def _push(path, key):
        # Runs on a worker thread; boto3 clients are thread-safe.
        client.upload_file(path, bucket, key)
        return key

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(_push, path, key): key for path, key in files}
        for done in as_completed(pending):
            key = pending[done]
            try:
                done.result()
            except Exception as exc:
                outcomes.append({'key': key, 'status': 'failed', 'error': str(exc)})
                print(f"Failed: {key} - {exc}")
            else:
                outcomes.append({'key': key, 'status': 'success'})
                print(f"Uploaded: {key}")
    return outcomes
# Upload 100 files concurrently
files = [(f'/data/file_{i}.csv', f'data/file_{i}.csv') for i in range(100)]
bulk_upload(s3_client, 'my-bucket', files, max_workers=20)
2. Connection Pooling
from botocore.config import Config

# Increase connection pool size for high-throughput applications;
# 'adaptive' retry mode also throttles client-side when the server
# signals overload.
config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'},
    signature_version='s3v4',
    s3={'addressing_style': 'path'}
)

s3_client = boto3.client(
    's3',
    endpoint_url='https://s3.danubedata.ro',
    region_name='fsn1',
    config=config
)
3. Stream Large Files Without Loading Into Memory
def stream_download(client, bucket, key, chunk_size=8192):
    """Stream a large object to /tmp without holding it all in memory.

    Reads *chunk_size* bytes at a time from the response body and writes
    each chunk straight to disk.
    """
    body = client.get_object(Bucket=bucket, Key=key)['Body']
    target = f'/tmp/{os.path.basename(key)}'
    with open(target, 'wb') as out:
        # Walrus loop: read until the stream is exhausted.
        while chunk := body.read(chunk_size):
            out.write(chunk)
# Process a large CSV line by line
# NOTE(review): read() still loads the whole object into memory before
# splitting — for true streaming, wrap response['Body'] in io.TextIOWrapper.
import csv
import io

response = s3_client.get_object(Bucket='data-bucket', Key='large-dataset.csv')
lines = response['Body'].read().decode('utf-8').splitlines()
reader = csv.DictReader(lines)
for row in reader:
    process_row(row)  # Your processing logic
4. Use Transfer Acceleration Config
from boto3.s3.transfer import TransferConfig

# Optimized config for different scenarios
fast_config = TransferConfig(
    multipart_threshold=50 * 1024 * 1024,  # 50 MB
    multipart_chunksize=25 * 1024 * 1024,  # 25 MB chunks
    max_concurrency=20,                    # 20 parallel parts
    num_download_attempts=5,               # Retry failed parts
    use_threads=True
)

# For many small files — multipart overhead isn't worth it below ~1 GB here.
small_files_config = TransferConfig(
    multipart_threshold=1024 * 1024 * 1024,  # Effectively disable multipart
    max_concurrency=1,
    use_threads=False
)
Common Mistakes and Fixes
| Mistake | Symptom | Fix |
|---|---|---|
| Missing `endpoint_url` | Requests go to AWS instead of your provider | Always set `endpoint_url` for non-AWS endpoints |
| Wrong signature version | `SignatureDoesNotMatch` errors | Use `signature_version='s3v4'` in `Config` |
| Virtual-hosted addressing | DNS resolution errors or 404s | Set `s3={'addressing_style': 'path'}` |
| Hardcoded credentials | Credentials leaked in source control | Use environment variables or AWS credential files |
| No pagination on `list_objects` | Only first 1000 objects returned | Use `get_paginator('list_objects_v2')` |
| Uploading without `ContentType` | Files download instead of displaying in browser | Set `ContentType` in `ExtraArgs` |
| Not handling `NoSuchKey` | Unhandled `ClientError` crashes | Always catch `ClientError` and check the error code |
| Single-threaded bulk uploads | Very slow for many files | Use `ThreadPoolExecutor` or increase `max_concurrency` |
| Wrong region | `AuthorizationHeaderMalformed` errors | Match the region your provider expects (e.g. `fsn1` for DanubeData) |
| No CORS for browser uploads | Browser blocks presigned URL PUT requests | Configure CORS on the bucket with allowed origins and methods |
Full Working Example
Here's a complete, production-ready script that demonstrates all the key operations:
#!/usr/bin/env python3
"""
S3-Compatible Storage Manager for DanubeData
Demonstrates all key boto3 operations with proper error handling.
"""
import os
import sys
import json
import hashlib
import mimetypes
from datetime import datetime
from pathlib import Path
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
class S3Manager:
    """Production-ready S3-compatible storage manager."""

    def __init__(self, endpoint_url=None, region=None, bucket=None):
        """Build a client from arguments, falling back to S3_* env vars."""
        self.endpoint_url = endpoint_url or os.environ.get(
            'S3_ENDPOINT_URL', 'https://s3.danubedata.ro'
        )
        self.region = region or os.environ.get('S3_REGION', 'fsn1')
        self.bucket = bucket or os.environ.get('S3_BUCKET', 'my-app')
        self.client = boto3.client(
            's3',
            endpoint_url=self.endpoint_url,
            region_name=self.region,
            config=Config(
                signature_version='s3v4',
                s3={'addressing_style': 'path'},
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                max_pool_connections=25
            )
        )
        # Multipart kicks in above 100 MB: 25 MB parts, 10 threads.
        self.transfer_config = TransferConfig(
            multipart_threshold=100 * 1024 * 1024,
            multipart_chunksize=25 * 1024 * 1024,
            max_concurrency=10,
            use_threads=True
        )

    def upload(self, file_path, key=None, content_type=None):
        """Upload a file with automatic content type detection.

        Returns the object key; raises FileNotFoundError for a missing
        local file and re-raises ClientError on S3 failure.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        key = key or file_path.name
        content_type = content_type or mimetypes.guess_type(str(file_path))[0] or 'application/octet-stream'
        extra_args = {
            'ContentType': content_type,
            'Metadata': {
                'original-name': file_path.name,
                # NOTE(review): utcnow() is naive and deprecated in 3.12+;
                # datetime.now(timezone.utc) would be better, but changes
                # the stored timestamp format — confirm before switching.
                'uploaded-at': datetime.utcnow().isoformat(),
                'checksum': self._file_md5(file_path)
            }
        }
        try:
            self.client.upload_file(
                str(file_path), self.bucket, key,
                Config=self.transfer_config,
                ExtraArgs=extra_args
            )
            size = file_path.stat().st_size
            print(f"Uploaded: s3://{self.bucket}/{key} ({size / 1024:.1f} KB)")
            return key
        except ClientError as e:
            print(f"Upload failed: {e.response['Error']['Message']}")
            raise

    def download(self, key, destination=None):
        """Download *key* to *destination* (default: /tmp/<basename>)."""
        destination = destination or Path('/tmp') / Path(key).name
        try:
            self.client.download_file(
                self.bucket, key, str(destination),
                Config=self.transfer_config
            )
            print(f"Downloaded: s3://{self.bucket}/{key} -> {destination}")
            return destination
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                print(f"Object not found: {key}")
            else:
                print(f"Download failed: {e.response['Error']['Message']}")
            raise

    def list_objects(self, prefix='', max_keys=None):
        """List objects under *prefix*, stopping early at *max_keys*."""
        paginator = self.client.get_paginator('list_objects_v2')
        objects = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                objects.append({
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'modified': obj['LastModified'].isoformat()
                })
                if max_keys and len(objects) >= max_keys:
                    return objects
        return objects

    def get_presigned_url(self, key, expires=3600, method='get_object'):
        """Generate a presigned URL (GET by default)."""
        return self.client.generate_presigned_url(
            method,
            Params={'Bucket': self.bucket, 'Key': key},
            ExpiresIn=expires
        )

    def delete(self, key):
        """Delete an object."""
        self.client.delete_object(Bucket=self.bucket, Key=key)
        print(f"Deleted: s3://{self.bucket}/{key}")

    def sync_directory(self, local_dir, prefix=''):
        """Sync a local directory to S3, skipping objects with equal size."""
        local_dir = Path(local_dir)
        uploaded = 0
        skipped = 0
        for file_path in local_dir.rglob('*'):
            if not file_path.is_file():
                continue
            # as_posix() keeps S3 keys forward-slashed on every OS
            # (str(Path) would produce backslashes on Windows).
            relative = file_path.relative_to(local_dir).as_posix()
            key = f"{prefix}/{relative}" if prefix else relative
            # Cheap change detection: same size as remote => skip.
            try:
                head = self.client.head_object(Bucket=self.bucket, Key=key)
                if head['ContentLength'] == file_path.stat().st_size:
                    skipped += 1
                    continue
            except ClientError:
                pass  # object missing remotely — upload it
            self.upload(file_path, key)
            uploaded += 1
        # Fix: the original had a literal newline inside the f-string
        # (a syntax error); the intended escape is \n.
        print(f"\nSync complete: {uploaded} uploaded, {skipped} skipped")

    @staticmethod
    def _file_md5(file_path):
        """MD5 of a file read in 8 KB chunks (integrity tag, not security)."""
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                md5.update(chunk)
        return md5.hexdigest()
def main():
    """Example usage of S3Manager: upload, list, presign, and sync."""
    manager = S3Manager(
        endpoint_url='https://s3.danubedata.ro',
        region='fsn1',
        bucket='my-app-data'
    )
    # Upload a file
    manager.upload('report.pdf', 'reports/2026/march-report.pdf')
    # List all reports
    objects = manager.list_objects(prefix='reports/')
    for obj in objects:
        print(f" {obj['key']} - {obj['size']} bytes")
    # Generate a download link (valid 24 hours)
    url = manager.get_presigned_url('reports/2026/march-report.pdf', expires=86400)
    print(f"Download URL (24h): {url}")
    # Sync a local directory
    manager.sync_directory('./static-assets', prefix='assets')

if __name__ == '__main__':
    main()
Ready to Get Started?
DanubeData Object Storage gives you S3-compatible storage with European data sovereignty at €3.99/month — including 1TB of storage and 1TB of egress traffic. No surprise bills, no hidden fees.
- S3-compatible API at s3.danubedata.ro
- Works with boto3, Django Storages, Flask, and every S3 tool
- European data centers (Falkenstein, Germany) for GDPR compliance
- Create access keys and start uploading in under 60 seconds