123 lines
4.3 KiB
Python
123 lines
4.3 KiB
Python
import os
|
|
import math
|
|
import hashlib
|
|
import base64
|
|
from io import BytesIO
|
|
from PIL import Image
|
|
from django.db import models
|
|
from django.core.files.base import ContentFile
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_fernet():
|
|
"""
|
|
Returns a Fernet (or MultiFernet) instance for encryption/decryption.
|
|
|
|
Settings:
|
|
FIELD_ENCRYPTION_KEY — single key (str or bytes), or a list of keys for rotation.
|
|
First key in a list is used for new encrypts; all are tried
|
|
for decrypts, so old values remain readable after a key change.
|
|
Falls back to a SECRET_KEY-derived key when not set (development only).
|
|
|
|
Key rotation workflow:
|
|
1. Add new key as the FIRST item in FIELD_ENCRYPTION_KEY list.
|
|
2. Keep old key(s) as remaining items — old values still decrypt fine.
|
|
3. Optionally run a migration command to re-encrypt all rows with the new key,
|
|
then remove old keys from the list.
|
|
"""
|
|
from cryptography.fernet import Fernet, MultiFernet
|
|
from django.conf import settings
|
|
|
|
raw_setting = getattr(settings, 'FIELD_ENCRYPTION_KEY', None)
|
|
|
|
if isinstance(raw_setting, (str, bytes)):
|
|
# Single key
|
|
key = raw_setting.encode() if isinstance(raw_setting, str) else raw_setting
|
|
return Fernet(key)
|
|
|
|
# List of keys — first is active, rest are for decrypting old values
|
|
fernet_keys = [
|
|
Fernet(k.encode() if isinstance(k, str) else k)
|
|
for k in raw_setting
|
|
]
|
|
return MultiFernet(fernet_keys)
|
|
|
|
|
|
class WebPImageField(models.ImageField):
|
|
"""Converts uploaded images to WebP on save. Drop-in replacement for ImageField."""
|
|
|
|
def pre_save(self, model_instance, add):
|
|
uploaded_file = getattr(model_instance, self.attname)
|
|
|
|
if uploaded_file and not uploaded_file._committed:
|
|
self._convert_to_webp(uploaded_file)
|
|
|
|
return super().pre_save(model_instance, add)
|
|
|
|
def _convert_to_webp(self, uploaded_file):
|
|
try:
|
|
uploaded_file.open()
|
|
image = Image.open(uploaded_file)
|
|
|
|
if image.format == 'WEBP':
|
|
image.close()
|
|
return
|
|
|
|
# Preserve transparency for palette/alpha modes
|
|
if image.mode == 'P':
|
|
image = image.convert('RGBA')
|
|
|
|
elif image.mode not in ('RGBA', 'LA'):
|
|
image = image.convert('RGB')
|
|
|
|
webp_buffer = BytesIO()
|
|
|
|
image.save(webp_buffer, format='WEBP', quality=100, optimize=True)
|
|
image.close()
|
|
|
|
webp_filename = os.path.splitext(uploaded_file.name)[0] + '.webp'
|
|
uploaded_file.save(webp_filename, ContentFile(webp_buffer.getvalue()), save=False)
|
|
|
|
except Exception as error:
|
|
logger.error(f"WebP conversion failed: {error}")
|
|
|
|
|
|
class EncryptedCharField(models.CharField):
|
|
"""
|
|
Transparently encrypts values at rest using Fernet (AES-128-CBC + HMAC-SHA256).
|
|
Drop-in replacement for CharField — max_length is the plaintext limit.
|
|
Set FIELD_ENCRYPTION_KEY in settings to a URL-safe base64 32-byte key
|
|
(generate: from cryptography.fernet import Fernet; Fernet.generate_key()).
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
# DB column must fit the Fernet token: plaintext + 73 bytes overhead, base64-encoded
|
|
plaintext_max_length = self.max_length or 200
|
|
self._db_column_max_length = math.ceil((plaintext_max_length + 73) / 3) * 4
|
|
|
|
def db_type(self, connection):
|
|
# Silently use the larger column size; migrations still show plaintext max_length
|
|
return connection.data_types['CharField'] % {'max_length': self._db_column_max_length}
|
|
|
|
def from_db_value(self, value, expression, connection):
|
|
if not value:
|
|
return value
|
|
try:
|
|
return _get_fernet().decrypt(value.encode()).decode()
|
|
except Exception:
|
|
logger.warning("EncryptedCharField: decryption failed, returning raw value.")
|
|
return value
|
|
|
|
def get_prep_value(self, value):
|
|
if value is None or value == '':
|
|
return super().get_prep_value(value)
|
|
return _get_fernet().encrypt(value.encode()).decode()
|
|
|
|
def to_python(self, value):
|
|
if value is None:
|
|
return value
|
|
return str(value)
|