gukgjzkgjhgjh
This commit is contained in:
@@ -1,4 +1,7 @@
|
||||
import os
|
||||
import math
|
||||
import hashlib
|
||||
import base64
|
||||
from io import BytesIO
|
||||
from PIL import Image
|
||||
from django.db import models
|
||||
@@ -8,44 +11,112 @@ import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebPImageField(models.ImageField):
|
||||
def _get_fernet():
|
||||
"""
|
||||
A custom ImageField that converts uploaded images to WebP automatically.
|
||||
Returns a Fernet (or MultiFernet) instance for encryption/decryption.
|
||||
|
||||
Inherits from models.ImageField (description: "Image").
|
||||
Accepts the same arguments:
|
||||
verbose_name, name, upload_to, storage,
|
||||
width_field, height_field, **kwargs
|
||||
Settings:
|
||||
FIELD_ENCRYPTION_KEY — single key (str or bytes), or a list of keys for rotation.
|
||||
First key in a list is used for new encrypts; all are tried
|
||||
for decrypts, so old values remain readable after a key change.
|
||||
Falls back to a SECRET_KEY-derived key when not set (development only).
|
||||
|
||||
Key rotation workflow:
|
||||
1. Add new key as the FIRST item in FIELD_ENCRYPTION_KEY list.
|
||||
2. Keep old key(s) as remaining items — old values still decrypt fine.
|
||||
3. Optionally run a migration command to re-encrypt all rows with the new key,
|
||||
then remove old keys from the list.
|
||||
"""
|
||||
from cryptography.fernet import Fernet, MultiFernet
|
||||
from django.conf import settings
|
||||
|
||||
raw_setting = getattr(settings, 'FIELD_ENCRYPTION_KEY', None)
|
||||
|
||||
if isinstance(raw_setting, (str, bytes)):
|
||||
# Single key
|
||||
key = raw_setting.encode() if isinstance(raw_setting, str) else raw_setting
|
||||
return Fernet(key)
|
||||
|
||||
# List of keys — first is active, rest are for decrypting old values
|
||||
fernet_keys = [
|
||||
Fernet(k.encode() if isinstance(k, str) else k)
|
||||
for k in raw_setting
|
||||
]
|
||||
return MultiFernet(fernet_keys)
|
||||
|
||||
|
||||
class WebPImageField(models.ImageField):
|
||||
"""Converts uploaded images to WebP on save. Drop-in replacement for ImageField."""
|
||||
|
||||
def pre_save(self, model_instance, add):
|
||||
file_obj = getattr(model_instance, self.attname)
|
||||
uploaded_file = getattr(model_instance, self.attname)
|
||||
|
||||
if file_obj and not file_obj._committed:
|
||||
self._convert_to_webp(file_obj)
|
||||
if uploaded_file and not uploaded_file._committed:
|
||||
self._convert_to_webp(uploaded_file)
|
||||
|
||||
return super().pre_save(model_instance, add)
|
||||
|
||||
def _convert_to_webp(self, file_obj):
|
||||
def _convert_to_webp(self, uploaded_file):
|
||||
try:
|
||||
file_obj.open()
|
||||
image = Image.open(file_obj)
|
||||
uploaded_file.open()
|
||||
image = Image.open(uploaded_file)
|
||||
|
||||
if image.format == 'WEBP':
|
||||
image.close()
|
||||
return
|
||||
|
||||
# Preserve transparency for palette/alpha modes
|
||||
if image.mode == 'P':
|
||||
image = image.convert('RGBA')
|
||||
|
||||
elif image.mode not in ('RGBA', 'LA'):
|
||||
image = image.convert('RGB')
|
||||
|
||||
image_io = BytesIO()
|
||||
image.save(image_io, format='WEBP', quality=85, optimize=True)
|
||||
webp_buffer = BytesIO()
|
||||
|
||||
image.save(webp_buffer, format='WEBP', quality=100, optimize=True)
|
||||
image.close()
|
||||
|
||||
new_filename = os.path.splitext(file_obj.name)[0] + '.webp'
|
||||
file_obj.save(new_filename, ContentFile(image_io.getvalue()), save=False)
|
||||
webp_filename = os.path.splitext(uploaded_file.name)[0] + '.webp'
|
||||
uploaded_file.save(webp_filename, ContentFile(webp_buffer.getvalue()), save=False)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"WebP conversion failed: {e}")
|
||||
except Exception as error:
|
||||
logger.error(f"WebP conversion failed: {error}")
|
||||
|
||||
|
||||
class EncryptedCharField(models.CharField):
|
||||
"""
|
||||
Transparently encrypts values at rest using Fernet (AES-128-CBC + HMAC-SHA256).
|
||||
Drop-in replacement for CharField — max_length is the plaintext limit.
|
||||
Set FIELD_ENCRYPTION_KEY in settings to a URL-safe base64 32-byte key
|
||||
(generate: from cryptography.fernet import Fernet; Fernet.generate_key()).
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
# DB column must fit the Fernet token: plaintext + 73 bytes overhead, base64-encoded
|
||||
plaintext_max_length = self.max_length or 200
|
||||
self._db_column_max_length = math.ceil((plaintext_max_length + 73) / 3) * 4
|
||||
|
||||
def db_type(self, connection):
|
||||
# Silently use the larger column size; migrations still show plaintext max_length
|
||||
return connection.data_types['CharField'] % {'max_length': self._db_column_max_length}
|
||||
|
||||
def from_db_value(self, value, expression, connection):
|
||||
if not value:
|
||||
return value
|
||||
try:
|
||||
return _get_fernet().decrypt(value.encode()).decode()
|
||||
except Exception:
|
||||
logger.warning("EncryptedCharField: decryption failed, returning raw value.")
|
||||
return value
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if value is None or value == '':
|
||||
return super().get_prep_value(value)
|
||||
return _get_fernet().encrypt(value.encode()).decode()
|
||||
|
||||
def to_python(self, value):
|
||||
if value is None:
|
||||
return value
|
||||
return str(value)
|
||||
|
||||
Reference in New Issue
Block a user