Files
vontor-cz/backend/account/views.py
Brunobrno 775709bd08 Migrate to global currency system in commerce app
Removed per-product currency in favor of a global site currency managed via SiteConfiguration. Updated models, views, templates, and Stripe integration to use the global currency. Added migration, management command for migration, and API endpoint for currency info. Improved permissions and filtering for orders, reviews, and carts. Expanded supported currencies in configuration.
2026-01-24 21:51:56 +01:00

421 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.contrib.auth import get_user_model, authenticate
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str
from .serializers import *
from .permissions import *
from .models import CustomUser
from .tokens import *
from .tasks import send_password_reset_email_task, send_email_verification_task
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
from .filters import UserFilter
from rest_framework import generics, permissions, status, viewsets
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter
User = get_user_model()
#general user view API
from rest_framework_simplejwt.views import TokenObtainPairView
#---------------------------------------------TOKENS------------------------------------------------
# Custom Token obtaining view
@extend_schema(
    tags=["account", "public"],
    summary="Obtain JWT access and refresh tokens (cookie-based)",
    description="Authenticate user and obtain JWT access and refresh tokens. You can use either email or username.",
    request=CustomTokenObtainPairSerializer,
    responses={
        200: OpenApiResponse(response=CustomTokenObtainPairSerializer, description="Tokens returned successfully."),
        401: OpenApiResponse(description="Invalid credentials or inactive user."),
    },
)
class CookieTokenObtainPairView(TokenObtainPairView):
    """Issue a JWT pair and mirror both tokens into cookies.

    The actual authentication is delegated to ``TokenObtainPairView.post``
    using ``CustomTokenObtainPairSerializer``; this view only post-processes
    the response.

    A ``validate(self, attrs)`` method used to live on this class. It was
    serializer logic (it read ``self.context`` and called
    ``super().validate``), was not called from anywhere in this view, and has
    been removed. Credential and ``is_active`` checks belong in the serializer.
    """

    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Authenticate and attach ``access``/``refresh`` tokens as cookies.

        Returns the parent view's response. When that response does not carry
        both tokens (e.g. a failed login) it is returned untouched.
        """
        response = super().post(request, *args, **kwargs)

        # Pull the tokens out of the parent response body.
        access = response.data.get("access")
        refresh = response.data.get("refresh")
        if not access or not refresh:
            return response  # e.g. login error - nothing to store

        jwt_settings = settings.SIMPLE_JWT

        # Access token cookie: name and flags come from SIMPLE_JWT, with
        # defaults applied when a key is absent.
        response.set_cookie(
            key=jwt_settings.get("AUTH_COOKIE", "access_token"),
            value=access,
            httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
            secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
            samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
            path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
            max_age=5 * 60,  # 5 minutes
        )

        # Refresh token cookie: fixed name and flags.
        response.set_cookie(
            key="refresh_token",
            value=refresh,
            httponly=True,
            secure=not settings.DEBUG,
            samesite="Lax",
            path="/",
            max_age=7 * 24 * 60 * 60,  # 7 days
        )
        return response
@extend_schema(
    tags=["account", "public"],
    summary="Refresh JWT token using cookie",
    description="Refresh JWT access and refresh tokens using the refresh token stored in cookie.",
    responses={
        200: OpenApiResponse(description="Tokens refreshed successfully."),
        400: OpenApiResponse(description="Refresh token cookie not found."),
        401: OpenApiResponse(description="Invalid refresh token."),
    },
)
class CookieTokenRefreshView(APIView):
    """Mint a new access token from the ``refresh_token`` cookie."""

    def post(self, request):
        """Return fresh tokens in the body and re-set both cookies.

        Responses:
            200 - ``{"access": ..., "refresh": ...}`` plus updated cookies.
            400 - the ``refresh_token`` cookie is missing or empty.
            401 - ``RefreshToken`` rejected the cookie value (``TokenError``).
        """
        refresh_token = request.COOKIES.get("refresh_token")
        if not refresh_token:
            return Response(
                {"detail": "Refresh token cookie not found."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            refresh = RefreshToken(refresh_token)
            access_token = str(refresh.access_token)
            # Re-serialises the token that was just validated; no new refresh
            # token is created in this view.
            new_refresh_token = str(refresh)
        except TokenError:
            return Response(
                {"detail": "Invalid refresh token."},
                status=status.HTTP_401_UNAUTHORIZED,
            )

        response = Response({
            "access": access_token,
            "refresh": new_refresh_token,
        })

        # Use the same SIMPLE_JWT cookie options (and the same defaults) as
        # CookieTokenObtainPairView, so that a customised cookie name/flags is
        # honoured on refresh as well as on login.
        jwt_settings = getattr(settings, "SIMPLE_JWT", {})

        # Access token cookie (5 minutes).
        response.set_cookie(
            jwt_settings.get("AUTH_COOKIE", "access_token"),
            access_token,
            httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
            secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
            samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
            max_age=5 * 60,
            path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
        )

        # Refresh token cookie (7 days).
        response.set_cookie(
            "refresh_token",
            new_refresh_token,
            httponly=True,
            secure=not settings.DEBUG,
            samesite="Lax",
            max_age=7 * 24 * 60 * 60,
            path="/",
        )
        return response
#---------------------------------------------LOGOUT------------------------------------------------
@extend_schema(
    tags=["account", "public"],
    summary="Logout user (delete access and refresh token cookies)",
    description="Logs out the user by deleting access and refresh token cookies.",
    responses={
        200: OpenApiResponse(description="Logout successful."),
    },
)
class LogoutView(APIView):
    """Log the client out by clearing the JWT cookies.

    Only the cookies are removed; this view does not revoke the tokens
    themselves.
    """

    permission_classes = [AllowAny]

    def post(self, request):
        """Return 200 with instructions to delete both token cookies."""
        response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)

        # The access cookie is created under the name/path configured in
        # SIMPLE_JWT (see CookieTokenObtainPairView); delete it under the same
        # name/path, otherwise a customised cookie would survive logout.
        jwt_settings = getattr(settings, "SIMPLE_JWT", {})
        response.delete_cookie(
            jwt_settings.get("AUTH_COOKIE", "access_token"),
            path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
        )
        response.delete_cookie("refresh_token", path="/")
        return response
#--------------------------------------------------------------------------------------------------------------
@extend_schema(
    tags=["account"],
    summary="List, retrieve, update, and delete users.",
    description="Displays all users with filtering and ordering options. Requires authentication and appropriate role.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer, description="User(s) retrieved successfully."),
        403: OpenApiResponse(description="Permission denied."),
    },
)
class UserView(viewsets.ModelViewSet):
    """User CRUD endpoints whose permissions depend on role and ownership.

    Access rules implemented by ``get_permissions``:
      * ``list`` / ``create``: admin role only.
      * ``update`` / ``partial_update`` / ``destroy``: admin, or the user
        whose id equals the URL ``pk``.
      * ``retrieve``: admin, or the user whose id equals the URL ``pk``.
      * any other action: the class-level ``permission_classes``.
    """

    serializer_class = CustomUserSerializer
    queryset = User.objects.all()

    filterset_class = UserFilter
    filter_backends = [DjangoFilterBackend]

    # Baseline for actions that get_permissions() does not special-case.
    permission_classes = [IsAuthenticated]

    class Meta:
        # NOTE(review): ``model`` / ``extra_kwargs`` are serializer-style Meta
        # options; confirm whether anything actually reads this nested class
        # on a viewset before relying on these help texts.
        model = CustomUser
        extra_kwargs = {
            "email": {"help_text": "Unikátní e-mailová adresa uživatele."},
            "phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
            "role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
            "account_type": {"help_text": "Typ účtu firma nebo fyzická osoba."},
            "email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
            "create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
            "var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
            "bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
            "ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
            "RC": {"help_text": "Rodné číslo pro fyzické osoby."},
            "city": {"help_text": "Město trvalého pobytu / sídla."},
            "street": {"help_text": "Ulice a číslo popisné."},
            "PSC": {"help_text": "PSČ místa pobytu / sídla."},
            "GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
            "is_active": {"help_text": "Stav aktivace uživatele."},
        }

    def _requester(self):
        """Return ``request.user`` if a request is attached, else a falsy value."""
        request = getattr(self, "request", None)
        return request and getattr(request, "user", None)

    @staticmethod
    def _is_authenticated(user):
        """True when *user* is present and reports ``is_authenticated``."""
        return bool(user and getattr(user, "is_authenticated", False))

    def _is_admin(self, user):
        """True for an authenticated user whose ``role`` is ``'admin'``."""
        return self._is_authenticated(user) and getattr(user, "role", None) == "admin"

    def _targets_self(self, user):
        """True when the URL ``pk`` is the authenticated user's own id."""
        if not self._is_authenticated(user):
            return False
        target_pk = self.kwargs.get("pk")
        # The URL kwarg is compared as a string, hence str() on the user id.
        return bool(target_pk) and str(getattr(user, "id", "")) == target_pk

    # NOTE(review): get_permissions is not an endpoint; confirm this schema
    # decorator has any effect on the generated documentation.
    @extend_schema(
        tags=["account"],
        summary="Get permissions based on user role and action.",
        description="Determines permissions for various actions based on user role and ownership.",
        responses={
            200: OpenApiResponse(description="Permissions determined successfully."),
            403: OpenApiResponse(description="Permission denied."),
        },
    )
    def get_permissions(self):
        """Pick the permission instances for the current ``self.action``."""
        action = self.action

        # Collection-level operations are reserved for admins.
        if action in ("list", "create"):
            return [OnlyRolesAllowed("admin")()]

        # Writes: a non-admin acting on their own record only needs to be
        # authenticated; everyone else goes through the admin-role check
        # (which also rejects anonymous users).
        if action in ("update", "partial_update", "destroy"):
            requester = self._requester()
            if not self._is_admin(requester) and self._targets_self(requester):
                return [IsAuthenticated()]
            return [OnlyRolesAllowed("admin")()]

        # Reads of a single profile: admins and the profile owner pass with
        # plain authentication; anyone else is held to the admin-role check.
        if action == "retrieve":
            requester = self._requester()
            if self._is_admin(requester) or self._targets_self(requester):
                return [IsAuthenticated()]
            return [OnlyRolesAllowed("admin")()]

        return super().get_permissions()
# Get current user data
@extend_schema(
    tags=["account"],
    summary="Get current authenticated user",
    description="Returns details of the currently authenticated user based on JWT token or session.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer, description="Current user details."),
        401: OpenApiResponse(description="Unauthorized, user is not authenticated."),
    }
)
class CurrentUserView(APIView):
    """Expose the authenticated requester's own user record."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Serialize ``request.user`` with ``CustomUserSerializer`` and return it."""
        return Response(CustomUserSerializer(request.user).data)
#------------------------------------------------REGISTRATION--------------------------------------------------------------
#1. registration API
@extend_schema(
    tags=["account", "public"],
    summary="Register a new user (company or individual)",
    description="Register a new user (company or individual). The user will receive an email with a verification link.",
    request=UserRegistrationSerializer,
    responses={
        201: OpenApiResponse(response=UserRegistrationSerializer, description="User registered successfully."),
        400: OpenApiResponse(description="Invalid registration data."),
    },
)
class UserRegistrationViewSet(ModelViewSet):
    """POST-only endpoint that creates a user and triggers e-mail verification."""

    http_method_names = ['post']
    serializer_class = UserRegistrationSerializer
    queryset = CustomUser.objects.all()

    def create(self, request, *args, **kwargs):
        """Validate the payload, persist the user and send the verification mail.

        Invalid data raises from ``is_valid(raise_exception=True)``. On
        success the serialized user is returned with status 201.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        new_user = serializer.save()

        # Preferred path: enqueue the verification e-mail as a Celery task.
        try:
            send_email_verification_task.delay(new_user.id)
        except Exception as e:
            # Queueing failed: log it and call the task function directly.
            logger.error(f"Celery not available, using fallback. Error: {e}")
            send_email_verification_task(new_user.id)

        body = serializer.data
        return Response(
            body,
            status=status.HTTP_201_CREATED,
            headers=self.get_success_headers(body),
        )
#2. confirming email
@extend_schema(
    tags=["account", "public"],
    summary="Verify user email via link",
    description="Verify user email using the link with uid and token.",
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="User token from email link."),
    ],
    responses={
        200: OpenApiResponse(description="Email successfully verified."),
        400: OpenApiResponse(description="Invalid or expired token."),
    },
)
class EmailVerificationView(APIView):
    """Confirm a user's e-mail address from the ``uidb64``/``token`` link."""

    def get(self, request, uidb64, token):
        """Mark the user as verified and active when the token checks out.

        Args:
            uidb64: URL-safe base64 encoding of the user's primary key.
            token: value checked with ``account_activation_token``.

        Returns:
            200 with a confirmation message on success; 400 when the uid
            cannot be decoded / resolved to a user, or when the token is
            rejected.
        """
        try:
            uid = force_str(urlsafe_base64_decode(uidb64))
            user = User.objects.get(pk=uid)
        # OverflowError is included for parity with PasswordResetConfirmView:
        # an oversized numeric uid must yield the same 400, not a server error.
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=400)

        if not account_activation_token.check_token(user, token):
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        user.email_verified = True
        user.is_active = True  # the account is activated once the e-mail is verified
        user.save()
        return Response({"detail": "E-mail byl úspěšně ověřen. Účet je aktivován."})
#-------------------------------------------------END REGISTRATION-------------------------------------------------------------
#1. PasswordReset + send Email
@extend_schema(
    tags=["account", "public"],
    summary="Request password reset (send email)",
    description="Request password reset by providing registered email. An email with instructions will be sent.",
    request=PasswordResetRequestSerializer,
    responses={
        200: OpenApiResponse(description="Email with instructions sent."),
        400: OpenApiResponse(description="Invalid email or request data."),
    },
)
class PasswordResetRequestView(APIView):
    """Start the password-reset flow for the e-mail address in the payload."""

    def post(self, request):
        """Send a reset e-mail if the address belongs to a user.

        Returns 400 with the serializer errors for an invalid payload.
        Otherwise always returns the same 200 body, whether or not a user
        with that e-mail exists, so the endpoint cannot be used to enumerate
        accounts.
        """
        serializer = PasswordResetRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        email = serializer.validated_data['email']
        try:
            account = User.objects.get(email=email)
        except User.DoesNotExist:
            # Unknown address: answer exactly as in the success case.
            return Response({"detail": "E-mail s odkazem byl odeslán."})

        # Preferred path: enqueue the reset e-mail as a Celery task.
        try:
            send_password_reset_email_task.delay(account.id)
        except Exception as e:
            # Queueing failed: log it and call the task function directly.
            logger.error(f"Celery not available, using fallback. Error: {e}")
            send_password_reset_email_task(account.id)

        return Response({"detail": "E-mail s odkazem byl odeslán."})
#2. Confirming reset
@extend_schema(
    tags=["account", "public"],
    summary="Confirm password reset via token",
    description="Confirm password reset using token from email.",
    request=PasswordResetConfirmSerializer,
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="Password reset token from email link."),
    ],
    responses={
        200: OpenApiResponse(description="Password changed successfully."),
        400: OpenApiResponse(description="Invalid token or request data."),
    },
)
class PasswordResetConfirmView(APIView):
    """Finish the password-reset flow using the ``uidb64``/``token`` link."""

    def post(self, request, uidb64, token):
        """Set a new password once the link and the payload are both valid.

        Args:
            uidb64: URL-safe base64 encoding of the user's primary key.
            token: value checked with ``password_reset_token``.

        Returns:
            200 with a confirmation message after the password is saved;
            400 for an undecodable/unknown uid, a rejected token, or invalid
            serializer data (in that order).
        """
        # Step 1: resolve the user from the encoded id.
        try:
            decoded_pk = force_str(urlsafe_base64_decode(uidb64))
            account = User.objects.get(pk=decoded_pk)
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=400)

        # Step 2: the token must be valid for that user.
        token_ok = password_reset_token.check_token(account, token)
        if not token_ok:
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        # Step 3: validate the new password payload.
        serializer = PasswordResetConfirmSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        account.set_password(serializer.validated_data['password'])
        account.save()
        return Response({"detail": "Heslo bylo úspěšně změněno."})