Files
e-trznice/backend/account/views.py
2025-10-02 00:54:34 +02:00

409 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.contrib.auth import get_user_model, authenticate
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str
from django.conf import settings
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import ensure_csrf_cookie
from .serializers import *
from .permissions import *
from .tasks import *
from .models import CustomUser
from .tokens import *
from .filters import UserFilter
from rest_framework import generics, permissions, status, viewsets
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter
# User model class as returned by Django's get_user_model().
User = get_user_model()
# General user view API
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
from rest_framework_simplejwt.views import TokenObtainPairView
#---------------------------------------------TOKENS------------------------------------------------
# Custom Token obtaining view
@extend_schema(
    tags=["api"],
    summary="Obtain JWT access and refresh tokens (cookie-based)",
    request=CustomTokenObtainPairSerializer,
    description="Authentication - získaš Access a Refresh token... lze do <username> vložit E-mail nebo username"
)
@method_decorator(ensure_csrf_cookie, name="dispatch")
class CookieTokenObtainPairView(TokenObtainPairView):
    # Login endpoint: delegates token creation to the parent view and, when
    # both tokens are present in the response body, also stores them as cookies.
    permission_classes = [AllowAny]
    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Issue the token pair and mirror it into cookies on success."""
        response = super().post(request, *args, **kwargs)

        # Pull the tokens out of the parent's response body.
        access = response.data.get("access")
        refresh = response.data.get("refresh")
        if not (access and refresh):
            # E.g. a failed login: hand the parent's response back untouched.
            return response

        jwt_settings = settings.SIMPLE_JWT
        # Attributes shared by both cookies.
        shared_attrs = {
            "httponly": jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
            "secure": jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
            "samesite": jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
            "path": jwt_settings.get("AUTH_COOKIE_PATH", "/"),
        }
        # (setting holding the cookie name, fallback name, token, lifetime)
        cookie_plan = (
            ("AUTH_COOKIE", "access_token", access, settings.ACCESS_TOKEN_LIFETIME),
            ("AUTH_COOKIE_REFRESH", "refresh_token", refresh, settings.REFRESH_TOKEN_LIFETIME),
        )
        for name_setting, fallback_name, token, lifetime in cookie_plan:
            response.set_cookie(
                key=jwt_settings.get(name_setting, fallback_name),
                value=token,
                max_age=int(lifetime.total_seconds()),
                **shared_attrs,
            )
        return response

    def validate(self, attrs):
        """Authenticate the given credentials manually, then defer to the parent."""
        # NOTE(review): this reads self.context and calls super().validate(),
        # which looks like serializer logic; nothing in this block calls it.
        # Confirm whether it belongs on CustomTokenObtainPairSerializer instead.
        credentials = {
            "username": attrs.get("username"),
            "password": attrs.get("password"),
        }
        # Log the user in manually.
        user = authenticate(request=self.context.get('request'), **credentials)

        if not user:
            raise AuthenticationFailed("Špatné uživatelské jméno nebo heslo.")
        if not user.is_active:
            raise AuthenticationFailed("Uživatel je deaktivován.")

        # Remember the validated user for the remaining parent logic.
        self.user = user
        # Return whatever the parent's validate() produces.
        return super().validate(attrs)
@extend_schema(
    tags=["api"],
    summary="Refresh JWT token using cookie",
    description="Refresh JWT token"
)
@method_decorator(ensure_csrf_cookie, name="dispatch")
class CookieTokenRefreshView(APIView):
    # Issues a new access token from a refresh token taken from the refresh
    # cookie or, failing that, from the "refresh" field of the request body.
    permission_classes = [AllowAny]

    def post(self, request):
        """Return a fresh access token and re-set both auth cookies.

        Responds with 400 when no refresh token is supplied and with 401 when
        the supplied token is rejected (TokenError).
        """
        jwt_settings = settings.SIMPLE_JWT
        # Read the cookie under the same configurable name it is written with.
        refresh_cookie_name = jwt_settings.get("AUTH_COOKIE_REFRESH", "refresh_token")
        refresh_token = request.COOKIES.get(refresh_cookie_name) or request.data.get('refresh')
        if not refresh_token:
            return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)

        # Only the token handling can raise TokenError; keep the try narrow.
        try:
            refresh = RefreshToken(refresh_token)
            access_token = str(refresh.access_token)
            new_refresh_token = str(refresh)  # string form of the same refresh token object
        except TokenError:
            logger.error("Invalid refresh token used.")
            return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)

        response = Response({
            "access": access_token,
            "refresh": new_refresh_token,
        })

        # Attributes shared by both cookies.
        shared_attrs = {
            "httponly": jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
            "secure": jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
            "samesite": jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
            "path": jwt_settings.get("AUTH_COOKIE_PATH", "/"),
        }
        # Access token cookie: same lifetime as the login view uses, instead of
        # the previous hard-coded 5 seconds.
        response.set_cookie(
            key=jwt_settings.get("AUTH_COOKIE", "access_token"),
            value=access_token,
            max_age=int(settings.ACCESS_TOKEN_LIFETIME.total_seconds()),
            **shared_attrs,
        )
        # Refresh token cookie
        response.set_cookie(
            key=refresh_cookie_name,
            value=new_refresh_token,
            max_age=int(settings.REFRESH_TOKEN_LIFETIME.total_seconds()),
            **shared_attrs,
        )
        return response
#---------------------------------------------LOGIN/LOGOUT------------------------------------------------
@extend_schema(
    tags=["api"],
    summary="Logout user (delete access and refresh token cookies)",
    description="Odhlásí uživatele smaže access a refresh token cookies"
)
class LogoutView(APIView):
    # Logs the client out by deleting the access and refresh token cookies.
    permission_classes = [AllowAny]

    def post(self, request):
        """Delete both auth cookies and confirm the logout with HTTP 200."""
        response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)

        # Delete the cookies under the same configurable names and path the
        # token views use when setting them; the fallbacks match the previous
        # hard-coded values.
        jwt_settings = getattr(settings, "SIMPLE_JWT", {})
        cookie_path = jwt_settings.get("AUTH_COOKIE_PATH", "/")
        cookie_names = (
            jwt_settings.get("AUTH_COOKIE", "access_token"),
            jwt_settings.get("AUTH_COOKIE_REFRESH", "refresh_token"),
        )
        for cookie_name in cookie_names:
            response.delete_cookie(cookie_name, path=cookie_path)
        return response
#--------------------------------------------------------------------------------------------------------------
@extend_schema(
    tags=["User"],
    responses={200: CustomUserSerializer},
    description="Zobrazí všechny uživatele s možností filtrování a řazení.",
)
class UserView(viewsets.ModelViewSet):
    # CRUD endpoints for users, filterable through UserFilter.
    # Permissions are chosen per action in get_permissions().
    queryset = User.objects.all()
    serializer_class = CustomUserSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_class = UserFilter

    # Default for actions that get_permissions() does not handle explicitly.
    permission_classes = [IsAuthenticated]

    # NOTE(review): nothing in this block reads this nested Meta; it looks like
    # serializer-style field documentation. Confirm whether it belongs on
    # CustomUserSerializer instead.
    class Meta:
        model = CustomUser
        extra_kwargs = {
            "email": {"help_text": "Unikátní e-mailová adresa uživatele."},
            "phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
            "role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
            "account_type": {"help_text": "Typ účtu firma nebo fyzická osoba."},
            "email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
            "create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
            "var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
            "bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
            "ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
            "RC": {"help_text": "Rodné číslo pro fyzické osoby."},
            "city": {"help_text": "Město trvalého pobytu / sídla."},
            "street": {"help_text": "Ulice a číslo popisné."},
            "PSC": {"help_text": "PSČ místa pobytu / sídla."},
            "GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
            "is_active": {"help_text": "Stav aktivace uživatele."},
        }

    def get_permissions(self):
        """Return permission instances for the current action.

        * list / create: only the "cityClerk" and "admin" roles.
        * retrieve / update / partial_update / destroy: those roles, or the
          user whose id matches the ``pk`` in the URL.
        * anything else: the class-level ``permission_classes``.
        """
        privileged_roles = ("cityClerk", "admin")
        # OnlyRolesAllowed(...) yields a permission class; the trailing call
        # instantiates it, because this method must return instances.
        privileged_only = [OnlyRolesAllowed(*privileged_roles)()]

        if self.action in ('list', 'create'):  # GET / POST /api/account/users/
            return privileged_only

        # GET / PUT / PATCH / DELETE /api/account/users/{id}
        if self.action in ('retrieve', 'update', 'partial_update', 'destroy'):
            user = getattr(self.request, "user", None)
            # An anonymous user has no ``role``; read it defensively so the
            # request ends in a permission denial instead of an AttributeError.
            if getattr(user, "role", None) in privileged_roles:
                return privileged_only

            pk = self.kwargs.get('pk')
            if pk and str(getattr(user, "id", None)) == pk:
                # Users may act on their own record. This must be an instance:
                # the update/delete branch used to return the bare class.
                return [IsAuthenticated()]

            # Fallback: deny everyone who is neither privileged nor the owner.
            return privileged_only

        return super().get_permissions()
# Get current user data
@extend_schema(
    tags=["User"],
    summary="Get current authenticated user",
    description="Vrátí detail aktuálně přihlášeného uživatele podle JWT tokenu nebo session.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer),
        401: OpenApiResponse(description="Unauthorized, uživatel není přihlášen"),
    }
)
class CurrentUserView(APIView):
    # Returns the serialized record of the user making the request.
    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Serialize ``request.user`` with CustomUserSerializer and return it."""
        payload = CustomUserSerializer(request.user).data
        return Response(payload)
#------------------------------------------------REGISTRATION--------------------------------------------------------------
#1. registration API
@extend_schema(
    tags=["User Registration"],
    summary="Register a new user (company or individual)",
    request=UserRegistrationSerializer,
    responses={201: UserRegistrationSerializer},
    description="1. Registrace nového uživatele(firmy). Uživateli přijde email s odkazem na ověření.",
)
class UserRegistrationViewSet(ModelViewSet):
    # Registration step 1: POST-only endpoint that creates the user and
    # triggers the verification e-mail.
    queryset = CustomUser.objects.all()
    serializer_class = UserRegistrationSerializer
    http_method_names = ['post']

    def create(self, request, *args, **kwargs):
        """Validate and save the new user, then send the verification e-mail."""
        registration = self.get_serializer(data=request.data)
        registration.is_valid(raise_exception=True)
        new_user = registration.save()

        try:
            # Preferred path: queue the verification e-mail as a Celery task.
            send_email_verification_task.delay(new_user.id)
        except Exception as e:
            # Queueing failed: log it and run the task function directly.
            logger.error(f"Celery not available, using fallback. Error: {e}")
            send_email_verification_task(new_user.id)

        body = registration.data
        return Response(
            body,
            status=status.HTTP_201_CREATED,
            headers=self.get_success_headers(body),
        )
#2. confirming email
@extend_schema(
    tags=["User Registration"],
    summary="Verify user email via link",
    responses={
        200: OpenApiResponse(description="Email úspěšně ověřen."),
        400: OpenApiResponse(description="Chybný nebo expirovaný token.")
    },
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location='path', description="Token z E-mailu"),
        OpenApiParameter(name='token', type=str, location='path', description="Token uživatele"),
    ],
    description="2. Ověření emailu pomocí odkazu s uid a tokenem. (stačí jenom převzít a poslat)",
)
class EmailVerificationView(APIView):
    # Registration step 2: marks the user's e-mail as verified when the
    # uid/token pair from the e-mailed link checks out.
    def get(self, request, uidb64, token):
        """Verify the e-mail address of the user encoded in ``uidb64``.

        Returns 400 when the uid cannot be decoded or resolved to a user, or
        when ``account_activation_token`` rejects the token; otherwise sets
        ``email_verified`` and returns a success message.
        """
        try:
            uid = force_str(urlsafe_base64_decode(uidb64))
            user = User.objects.get(pk=uid)
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            # OverflowError is caught here too, matching the password-reset
            # confirmation view, so a malformed uid yields 400 and not 500.
            return Response({"error": "Neplatný odkaz."}, status=status.HTTP_400_BAD_REQUEST)

        if not account_activation_token.check_token(user, token):
            return Response(
                {"error": "Token je neplatný nebo expirovaný."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        user.email_verified = True
        user.save()
        return Response({"detail": "E-mail byl úspěšně ověřen. Účet čeká na schválení."})
#3. seller activation API (var_symbol)
@extend_schema(
    tags=["User Registration"],
    summary="Activate user and set variable symbol (admin/cityClerk only)",
    request=UserActivationSerializer,
    responses={200: UserActivationSerializer},
    description="3. Aktivace uživatele a zadání variabilního symbolu (pouze pro adminy a úředníky).",
)
class UserActivationViewSet(APIView):
    # Registration step 3: restricted to the "cityClerk" and "admin" roles.
    permission_classes = [OnlyRolesAllowed('cityClerk', 'admin')]

    def patch(self, request, *args, **kwargs):
        """Apply UserActivationSerializer to the payload and notify the user."""
        activation = UserActivationSerializer(data=request.data)
        activation.is_valid(raise_exception=True)
        activated_user = activation.save()

        try:
            # Preferred path: queue the "clerk accepted" e-mail as a Celery task.
            send_email_clerk_accepted_task.delay(activated_user.id)
        except Exception as e:
            # Queueing failed: log it and run the task function directly.
            logger.error(f"Celery not available, using fallback. Error: {e}")
            send_email_clerk_accepted_task(activated_user.id)

        representation = activation.to_representation(activated_user)
        return Response(representation, status=status.HTTP_200_OK)
#-------------------------------------------------END REGISTRATION-------------------------------------------------------------
#1. PasswordReset + send Email
@extend_schema(
    tags=["User password reset"],
    summary="Request password reset (send email)",
    request=PasswordResetRequestSerializer,
    responses={
        200: OpenApiResponse(description="Odeslán email s instrukcemi."),
        400: OpenApiResponse(description="Neplatný email.")
    },
    description="1(a). Požadavek na reset hesla - uživatel zadá svůj email."
)
class PasswordResetRequestView(APIView):
    # Password reset step 1: accepts an e-mail address and triggers the reset
    # e-mail when a matching user exists.
    def post(self, request):
        """Validate the payload and send the reset e-mail if the user exists."""
        reset_request = PasswordResetRequestSerializer(data=request.data)
        if not reset_request.is_valid():
            return Response(reset_request.errors, status=400)

        try:
            user = User.objects.get(email=reset_request.validated_data['email'])
        except User.DoesNotExist:
            # Same 200 response as on success, to avoid user enumeration.
            return Response({"detail": "E-mail s odkazem byl odeslán."})

        try:
            # Preferred path: queue the reset e-mail as a Celery task.
            send_password_reset_email_task.delay(user.id)
        except Exception as e:
            # Queueing failed: log it and run the task function directly.
            logger.error(f"Celery not available, using fallback. Error: {e}")
            send_password_reset_email_task(user.id)

        return Response({"detail": "E-mail s odkazem byl odeslán."})
#2. Confirming reset
@extend_schema(
    tags=["User password reset"],
    summary="Confirm password reset via token",
    request=PasswordResetConfirmSerializer,
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH),
    ],
    responses={
        200: OpenApiResponse(description="Heslo bylo změněno."),
        400: OpenApiResponse(description="Chybný token nebo data.")
    },
    description="1(a). Potvrzení resetu hesla pomocí tokenu z emailu."
)
class PasswordResetConfirmView(APIView):
    # Password reset step 2: sets a new password when the uid/token pair from
    # the e-mailed link is accepted.
    def post(self, request, uidb64, token):
        """Set the new password for the user encoded in ``uidb64``.

        Returns 400 for an undecodable or unknown uid, a token rejected by
        ``password_reset_token``, or an invalid payload.
        """
        try:
            decoded_uid = force_str(urlsafe_base64_decode(uidb64))
            account = User.objects.get(pk=decoded_uid)
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=status.HTTP_400_BAD_REQUEST)

        token_ok = password_reset_token.check_token(account, token)
        if not token_ok:
            return Response(
                {"error": "Token je neplatný nebo expirovaný."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        confirmation = PasswordResetConfirmSerializer(data=request.data)
        if not confirmation.is_valid():
            return Response(confirmation.errors, status=status.HTTP_400_BAD_REQUEST)

        account.set_password(confirmation.validated_data['password'])
        account.save()
        return Response({"detail": "Heslo bylo úspěšně změněno."})