This commit is contained in:
2025-10-01 18:37:59 +02:00
parent 85b035fd27
commit 696d0e61f1
46 changed files with 1750 additions and 0 deletions

421
backend/account/views.py Normal file
View File

@@ -0,0 +1,421 @@
from django.contrib.auth import get_user_model, authenticate
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str
from .serializers import *
from .permissions import *
from .models import CustomUser
from .tokens import *
from .tasks import send_password_reset_email_task
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
from .filters import UserFilter
from rest_framework import generics, permissions, status, viewsets
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter
User = get_user_model()
#general user view API
from rest_framework_simplejwt.views import TokenObtainPairView
#---------------------------------------------TOKENY------------------------------------------------
# Custom Token obtaining view
@extend_schema(
tags=["Authentication"],
summary="Obtain JWT access and refresh tokens (cookie-based)",
description="Authenticate user and obtain JWT access and refresh tokens. You can use either email or username.",
request=CustomTokenObtainPairSerializer,
responses={
200: OpenApiResponse(response=CustomTokenObtainPairSerializer, description="Tokens returned successfully."),
401: OpenApiResponse(description="Invalid credentials or inactive user."),
},
)
class CookieTokenObtainPairView(TokenObtainPairView):
serializer_class = CustomTokenObtainPairSerializer
def post(self, request, *args, **kwargs):
response = super().post(request, *args, **kwargs)
# Získáme tokeny z odpovědi
access = response.data.get("access")
refresh = response.data.get("refresh")
if not access or not refresh:
return response # Např. při chybě přihlášení
jwt_settings = settings.SIMPLE_JWT
# Access token cookie
response.set_cookie(
key=jwt_settings.get("AUTH_COOKIE", "access_token"),
value=access,
httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
max_age=5 * 60, # 5 minut
)
# Refresh token cookie
response.set_cookie(
key="refresh_token",
value=refresh,
httponly=True,
secure=not settings.DEBUG,
samesite="Lax",
path="/",
max_age=7 * 24 * 60 * 60, # 7 dní
)
return response
def validate(self, attrs):
username = attrs.get("username")
password = attrs.get("password")
# Přihlaš uživatele ručně
user = authenticate(request=self.context.get('request'), username=username, password=password)
if not user:
raise AuthenticationFailed("Špatné uživatelské jméno nebo heslo.")
if not user.is_active:
raise AuthenticationFailed("Uživatel je deaktivován.")
# Nastav validní uživatele (přebere další logiku ze SimpleJWT)
self.user = user
# Vrátí access a refresh token jako obvykle
return super().validate(attrs)
@extend_schema(
tags=["Authentication"],
summary="Refresh JWT token using cookie",
description="Refresh JWT access and refresh tokens using the refresh token stored in cookie.",
responses={
200: OpenApiResponse(description="Tokens refreshed successfully."),
400: OpenApiResponse(description="Refresh token cookie not found."),
401: OpenApiResponse(description="Invalid refresh token."),
},
)
class CookieTokenRefreshView(APIView):
def post(self, request):
refresh_token = request.COOKIES.get('refresh_token')
if not refresh_token:
return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)
try:
refresh = RefreshToken(refresh_token)
access_token = str(refresh.access_token)
new_refresh_token = str(refresh) # volitelně nový refresh token
response = Response({
"access": access_token,
"refresh": new_refresh_token,
})
# Nastav nové HttpOnly cookies
# Access token cookie (např. 5 minut platnost)
response.set_cookie(
"access_token",
access_token,
httponly=True,
secure=not settings.DEBUG,
samesite="Lax",
max_age=5 * 60,
path="/",
)
# Refresh token cookie (delší platnost, např. 7 dní)
response.set_cookie(
"refresh_token",
new_refresh_token,
httponly=True,
secure=not settings.DEBUG,
samesite="Lax",
max_age=7 * 24 * 60 * 60,
path="/",
)
return response
except TokenError:
return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)
#---------------------------------------------LOGIN/LOGOUT------------------------------------------------
@extend_schema(
tags=["Authentication"],
summary="Logout user (delete access and refresh token cookies)",
description="Logs out the user by deleting access and refresh token cookies.",
responses={
200: OpenApiResponse(description="Logout successful."),
},
)
class LogoutView(APIView):
permission_classes = [AllowAny]
def post(self, request):
response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)
# Smazání cookies
response.delete_cookie("access_token", path="/")
response.delete_cookie("refresh_token", path="/")
return response
#--------------------------------------------------------------------------------------------------------------
@extend_schema(
tags=["User"],
summary="List, retrieve, update, and delete users.",
description="Displays all users with filtering and ordering options. Requires authentication and appropriate role.",
responses={
200: OpenApiResponse(response=CustomUserSerializer, description="User(s) retrieved successfully."),
403: OpenApiResponse(description="Permission denied."),
},
)
class UserView(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = CustomUserSerializer
filter_backends = [DjangoFilterBackend]
filterset_class = UserFilter
# Require authentication and role permission
permission_classes = [IsAuthenticated]
class Meta:
model = CustomUser
extra_kwargs = {
"email": {"help_text": "Unikátní e-mailová adresa uživatele."},
"phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
"role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
"account_type": {"help_text": "Typ účtu firma nebo fyzická osoba."},
"email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
"create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
"var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
"bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
"ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
"RC": {"help_text": "Rodné číslo pro fyzické osoby."},
"city": {"help_text": "Město trvalého pobytu / sídla."},
"street": {"help_text": "Ulice a číslo popisné."},
"PSC": {"help_text": "PSČ místa pobytu / sídla."},
"GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
"is_active": {"help_text": "Stav aktivace uživatele."},
}
def get_permissions(self):
# Only admin can list or create users
if self.action in ['list', 'create']:
return [OnlyRolesAllowed("admin")()]
# Only admin or the user themselves can update or delete
elif self.action in ['update', 'partial_update', 'destroy']:
if self.request.user.role == 'admin':
return [OnlyRolesAllowed("admin")()]
elif self.kwargs.get('pk') and str(self.request.user.id) == self.kwargs['pk']:
return [IsAuthenticated()]
else:
# fallback - deny access
return [OnlyRolesAllowed("admin")()]
# Any authenticated user can retrieve (view) any user's profile
elif self.action == 'retrieve':
return [IsAuthenticated()]
return super().get_permissions()
# Get current user data
@extend_schema(
tags=["User"],
summary="Get current authenticated user",
description="Returns details of the currently authenticated user based on JWT token or session.",
responses={
200: OpenApiResponse(response=CustomUserSerializer, description="Current user details."),
401: OpenApiResponse(description="Unauthorized, user is not authenticated."),
}
)
class CurrentUserView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
serializer = CustomUserSerializer(request.user)
return Response(serializer.data)
#------------------------------------------------REGISTRACE--------------------------------------------------------------
#1. registration API
@extend_schema(
tags=["User Registration"],
summary="Register a new user (company or individual)",
description="Register a new user (company or individual). The user will receive an email with a verification link.",
request=UserRegistrationSerializer,
responses={
201: OpenApiResponse(response=UserRegistrationSerializer, description="User registered successfully."),
400: OpenApiResponse(description="Invalid registration data."),
},
)
class UserRegistrationViewSet(ModelViewSet):
queryset = CustomUser.objects.all()
serializer_class = UserRegistrationSerializer
http_method_names = ['post']
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = serializer.save()
try:
send_email_verification_task.delay(user.id) # posílaní emailu pro potvrzení registrace - CELERY TASK
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_email_verification_task(user.id) # posílaní emailu pro potvrzení registrace
headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
#2. confirming email
@extend_schema(
tags=["User Registration"],
summary="Verify user email via link",
description="Verify user email using the link with uid and token.",
parameters=[
OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="User token from email link."),
],
responses={
200: OpenApiResponse(description="Email successfully verified."),
400: OpenApiResponse(description="Invalid or expired token."),
},
)
class EmailVerificationView(APIView):
def get(self, request, uidb64, token):
try:
uid = force_str(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except (User.DoesNotExist, ValueError, TypeError):
return Response({"error": "Neplatný odkaz."}, status=400)
if account_activation_token.check_token(user, token):
user.email_verified = True
user.save()
return Response({"detail": "E-mail byl úspěšně ověřen. Účet čeká na schválení."})
else:
return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)
#3. seller activation API (var_symbol)
@extend_schema(
tags=["User Registration"],
summary="Activate user and set variable symbol (admin/cityClerk only)",
description="Activate user and set variable symbol. Only accessible by admin or cityClerk.",
request=UserActivationSerializer,
responses={
200: OpenApiResponse(response=UserActivationSerializer, description="User activated successfully."),
400: OpenApiResponse(description="Invalid activation data."),
404: OpenApiResponse(description="User not found."),
},
)
class UserActivationViewSet(APIView):
permission_classes = [OnlyRolesAllowed('cityClerk', 'admin')]
def patch(self, request, *args, **kwargs):
serializer = UserActivationSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = serializer.save()
try:
send_email_clerk_accepted_task.delay(user.id) # posílaní emailu pro informování uživatele o dokončení registrace, uředník doplnil variabilní symbol - CELERY TASK
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_email_clerk_accepted_task(user.id) # posílaní emailu pro informování uživatele o dokončení registrace, uředník doplnil variabilní symbol
return Response(serializer.to_representation(user), status=status.HTTP_200_OK)
#-------------------------------------------------END REGISTRACE-------------------------------------------------------------
#1. PasswordReset + send Email
@extend_schema(
tags=["User password reset"],
summary="Request password reset (send email)",
description="Request password reset by providing registered email. An email with instructions will be sent.",
request=PasswordResetRequestSerializer,
responses={
200: OpenApiResponse(description="Email with instructions sent."),
400: OpenApiResponse(description="Invalid email or request data."),
},
)
class PasswordResetRequestView(APIView):
def post(self, request):
serializer = PasswordResetRequestSerializer(data=request.data)
if serializer.is_valid():
try:
user = User.objects.get(email=serializer.validated_data['email'])
except User.DoesNotExist:
# Always return 200 even if user doesn't exist to avoid user enumeration
return Response({"detail": "E-mail s odkazem byl odeslán."})
try:
send_password_reset_email_task.delay(user.id) # posílaní emailu pro obnovení hesla - CELERY TASK
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_password_reset_email_task(user.id) # posílaní emailu pro obnovení hesla registrace
return Response({"detail": "E-mail s odkazem byl odeslán."})
return Response(serializer.errors, status=400)
#2. Confirming reset
@extend_schema(
tags=["User password reset"],
summary="Confirm password reset via token",
description="Confirm password reset using token from email.",
request=PasswordResetConfirmSerializer,
parameters=[
OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="Password reset token from email link."),
],
responses={
200: OpenApiResponse(description="Password changed successfully."),
400: OpenApiResponse(description="Invalid token or request data."),
},
)
class PasswordResetConfirmView(APIView):
def post(self, request, uidb64, token):
try:
uid = force_str(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
return Response({"error": "Neplatný odkaz."}, status=400)
if not password_reset_token.check_token(user, token):
return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)
serializer = PasswordResetConfirmSerializer(data=request.data)
if serializer.is_valid():
user.set_password(serializer.validated_data['password'])
user.save()
return Response({"detail": "Heslo bylo úspěšně změněno."})
return Response(serializer.errors, status=400)