"""Account API views.

Contains cookie-based JWT login/refresh/logout, user CRUD, the three-step
registration flow (register -> verify e-mail -> clerk activation) and the
two-step password reset flow.
"""

# NOTE: the import order below intentionally mirrors the original file. The
# project-local star imports can shadow (or be shadowed by) other names, so
# reordering them could silently change which object a name resolves to.
from django.contrib.auth import get_user_model, authenticate
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str

from .serializers import *
from .permissions import *
from .models import CustomUser
from .tokens import *
from .tasks import send_password_reset_email_task

from django.conf import settings

import logging

from .filters import UserFilter

from rest_framework import generics, permissions, status, viewsets
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed

from django_filters.rest_framework import DjangoFilterBackend

from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter

from rest_framework_simplejwt.views import TokenObtainPairView

# Aliased so it cannot collide with a DRF ``ValidationError`` that a star
# import above might have brought into this namespace.
from django.core.exceptions import ValidationError as DjangoValidationError

logger = logging.getLogger(__name__)

User = get_user_model()

# NOTE(review): ``send_email_verification_task`` and
# ``send_email_clerk_accepted_task`` are used below but are not imported
# explicitly; presumably one of the star imports provides them. Confirm, and
# prefer an explicit ``from .tasks import ...`` once verified.

# Cookie lifetimes in seconds.
_ACCESS_COOKIE_MAX_AGE = 5 * 60  # 5 minutes
_REFRESH_COOKIE_MAX_AGE = 7 * 24 * 60 * 60  # 7 days
_REFRESH_COOKIE_NAME = "refresh_token"


def _jwt_settings():
    """Return the ``SIMPLE_JWT`` settings dict, or an empty dict if unset."""
    return getattr(settings, "SIMPLE_JWT", {})


def _access_cookie_name():
    """Return the configured access-token cookie name (default ``access_token``)."""
    return _jwt_settings().get("AUTH_COOKIE", "access_token")


def _set_auth_cookies(response, access, refresh):
    """Attach the access and refresh token cookies to *response*.

    The access cookie honours the ``AUTH_COOKIE*`` keys of ``SIMPLE_JWT`` so
    that login, refresh and logout all agree on the same cookie; the refresh
    cookie uses fixed attributes.

    Returns the same response object for convenience.
    """
    jwt_settings = _jwt_settings()
    secure_default = not settings.DEBUG

    response.set_cookie(
        key=_access_cookie_name(),
        value=access,
        httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
        secure=jwt_settings.get("AUTH_COOKIE_SECURE", secure_default),
        samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
        path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
        max_age=_ACCESS_COOKIE_MAX_AGE,
    )
    response.set_cookie(
        key=_REFRESH_COOKIE_NAME,
        value=refresh,
        httponly=True,
        secure=secure_default,
        samesite="Lax",
        path="/",
        max_age=_REFRESH_COOKIE_MAX_AGE,
    )
    return response


def _get_user_from_uidb64(uidb64):
    """Resolve a base64-encoded primary key from an e-mail link to a user.

    Returns the user, or ``None`` when the value cannot be decoded or no such
    user exists. The exception list mirrors Django's own
    ``PasswordResetConfirmView.get_user``.
    """
    try:
        uid = force_str(urlsafe_base64_decode(uidb64))
        return User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist, DjangoValidationError):
        return None


def _dispatch_task(task, *args):
    """Queue *task* via Celery, running it synchronously if queuing fails."""
    try:
        task.delay(*args)
    except Exception as e:
        # Deliberate best-effort boundary: any failure to enqueue falls back
        # to an in-process call so the e-mail is still sent.
        logger.error(f"Celery not available, using fallback. Error: {e}")
        task(*args)


# --------------------------------------------- TOKENS ------------------------------------------------

# Login view: obtains a JWT pair and additionally stores both tokens in cookies.
@extend_schema(
    tags=["Authentication"],
    summary="Obtain JWT access and refresh tokens (cookie-based)",
    description="Authenticate user and obtain JWT access and refresh tokens. You can use either email or username.",
    request=CustomTokenObtainPairSerializer,
    responses={
        200: OpenApiResponse(response=CustomTokenObtainPairSerializer, description="Tokens returned successfully."),
        401: OpenApiResponse(description="Invalid credentials or inactive user."),
    },
)
class CookieTokenObtainPairView(TokenObtainPairView):
    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Issue tokens via the parent view and mirror them into cookies."""
        response = super().post(request, *args, **kwargs)

        # Read the tokens from the parent's response payload.
        access = response.data.get("access")
        refresh = response.data.get("refresh")
        if not access or not refresh:
            # E.g. a failed login: pass the response through untouched.
            return response

        return _set_auth_cookies(response, access, refresh)

    # NOTE(review): ``validate(attrs)`` is a serializer hook; on a view there
    # is no ``self.context`` and nothing in this file calls it. It looks like
    # it was meant for ``CustomTokenObtainPairSerializer`` - confirm and move
    # or remove. Kept unchanged to avoid altering the class surface.
    def validate(self, attrs):
        username = attrs.get("username")
        password = attrs.get("password")

        # Authenticate the user manually.
        user = authenticate(request=self.context.get('request'), username=username, password=password)

        if not user:
            raise AuthenticationFailed("Špatné uživatelské jméno nebo heslo.")

        if not user.is_active:
            raise AuthenticationFailed("Uživatel je deaktivován.")

        # Store the validated user (the rest of the SimpleJWT logic reads it).
        self.user = user

        # Return the access and refresh tokens as usual.
        return super().validate(attrs)


# Refresh view: reads the refresh token from its cookie and re-issues cookies.
@extend_schema(
    tags=["Authentication"],
    summary="Refresh JWT token using cookie",
    description="Refresh JWT access and refresh tokens using the refresh token stored in cookie.",
    responses={
        200: OpenApiResponse(description="Tokens refreshed successfully."),
        400: OpenApiResponse(description="Refresh token cookie not found."),
        401: OpenApiResponse(description="Invalid refresh token."),
    },
)
class CookieTokenRefreshView(APIView):
    def post(self, request):
        """Return a new access token derived from the refresh-token cookie."""
        refresh_token = request.COOKIES.get(_REFRESH_COOKIE_NAME)
        if not refresh_token:
            return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)

        try:
            refresh = RefreshToken(refresh_token)
            access_token = str(refresh.access_token)
            # NOTE(review): this re-serialises the same refresh token; it is
            # not rotated here.
            new_refresh_token = str(refresh)
        except TokenError:
            return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)

        response = Response({
            "access": access_token,
            "refresh": new_refresh_token,
        })
        # Use the same cookie names/attributes as the login view so the
        # refreshed access token replaces the one set at login.
        return _set_auth_cookies(response, access_token, new_refresh_token)


# --------------------------------------------- LOGOUT ------------------------------------------------

# Logout view: clears both token cookies; callable without authentication.
@extend_schema(
    tags=["Authentication"],
    summary="Logout user (delete access and refresh token cookies)",
    description="Logs out the user by deleting access and refresh token cookies.",
    responses={
        200: OpenApiResponse(description="Logout successful."),
    },
)
class LogoutView(APIView):
    permission_classes = [AllowAny]

    def post(self, request):
        """Delete the access and refresh token cookies."""
        response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)

        # Delete the cookies under the same name/path they were set with.
        response.delete_cookie(_access_cookie_name(), path=_jwt_settings().get("AUTH_COOKIE_PATH", "/"))
        response.delete_cookie(_REFRESH_COOKIE_NAME, path="/")

        return response


# --------------------------------------------------------------------------------------------------------------

# User CRUD viewset with per-action permissions (see ``get_permissions``).
@extend_schema(
    tags=["User"],
    summary="List, retrieve, update, and delete users.",
    description="Displays all users with filtering and ordering options. Requires authentication and appropriate role.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer, description="User(s) retrieved successfully."),
        403: OpenApiResponse(description="Permission denied."),
    },
)
class UserView(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = CustomUserSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_class = UserFilter

    # Require authentication and role permission
    permission_classes = [IsAuthenticated]

    # NOTE(review): an inner ``Meta`` with ``extra_kwargs`` is a serializer
    # convention; presumably these help texts were intended for
    # ``CustomUserSerializer``. Kept as-is - confirm and relocate.
    class Meta:
        model = CustomUser
        extra_kwargs = {
            "email": {"help_text": "Unikátní e-mailová adresa uživatele."},
            "phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
            "role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
            "account_type": {"help_text": "Typ účtu – firma nebo fyzická osoba."},
            "email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
            "create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
            "var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
            "bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
            "ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
            "RC": {"help_text": "Rodné číslo pro fyzické osoby."},
            "city": {"help_text": "Město trvalého pobytu / sídla."},
            "street": {"help_text": "Ulice a číslo popisné."},
            "PSC": {"help_text": "PSČ místa pobytu / sídla."},
            "GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
            "is_active": {"help_text": "Stav aktivace uživatele."},
        }

    def get_permissions(self):
        """Return the permission instances for the current action.

        * ``list`` / ``create``: admin only.
        * ``update`` / ``partial_update`` / ``destroy``: admin, or the user
          whose own record is addressed by ``pk``; everyone else falls back
          to the admin-only check (i.e. is denied).
        * ``retrieve``: any authenticated user.
        * anything else: the class-level ``permission_classes``.
        """
        # Only admin can list or create users
        if self.action in ['list', 'create']:
            return [OnlyRolesAllowed("admin")()]

        # Only admin or the user themselves can update or delete
        if self.action in ['update', 'partial_update', 'destroy']:
            user = getattr(self, 'request', None) and getattr(self.request, 'user', None)
            authenticated = bool(user) and getattr(user, 'is_authenticated', False)

            # Admins can modify any user
            if authenticated and getattr(user, 'role', None) == 'admin':
                return [OnlyRolesAllowed("admin")()]

            # Users can modify their own record
            pk = self.kwargs.get('pk')
            if authenticated and pk and str(getattr(user, 'id', '')) == pk:
                return [IsAuthenticated()]

            # Fallback - deny access (prevents AttributeError for AnonymousUser)
            return [OnlyRolesAllowed("admin")()]

        # Any authenticated user can retrieve (view) any user's profile
        if self.action == 'retrieve':
            return [IsAuthenticated()]

        return super().get_permissions()


# Returns the serialized record of the currently authenticated user.
@extend_schema(
    tags=["User"],
    summary="Get current authenticated user",
    description="Returns details of the currently authenticated user based on JWT token or session.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer, description="Current user details."),
        401: OpenApiResponse(description="Unauthorized, user is not authenticated."),
    }
)
class CurrentUserView(APIView):
    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Serialize ``request.user`` with ``CustomUserSerializer``."""
        serializer = CustomUserSerializer(request.user)
        return Response(serializer.data)


# ------------------------------------------------ REGISTRATION --------------------------------------------------------------

# 1. Registration API (POST only); sends the verification e-mail after saving.
@extend_schema(
    tags=["User Registration"],
    summary="Register a new user (company or individual)",
    description="Register a new user (company or individual). The user will receive an email with a verification link.",
    request=UserRegistrationSerializer,
    responses={
        201: OpenApiResponse(response=UserRegistrationSerializer, description="User registered successfully."),
        400: OpenApiResponse(description="Invalid registration data."),
    },
)
class UserRegistrationViewSet(ModelViewSet):
    queryset = CustomUser.objects.all()
    serializer_class = UserRegistrationSerializer
    http_method_names = ['post']

    def create(self, request, *args, **kwargs):
        """Validate and save the new user, then send the verification e-mail."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = serializer.save()

        # Registration-confirmation e-mail (Celery task with sync fallback).
        _dispatch_task(send_email_verification_task, user.id)

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)


# 2. E-mail confirmation via the link (uid + token) sent to the user.
@extend_schema(
    tags=["User Registration"],
    summary="Verify user email via link",
    description="Verify user email using the link with uid and token.",
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="User token from email link."),
    ],
    responses={
        200: OpenApiResponse(description="Email successfully verified."),
        400: OpenApiResponse(description="Invalid or expired token."),
    },
)
class EmailVerificationView(APIView):
    def get(self, request, uidb64, token):
        """Mark the user's e-mail as verified when the token checks out."""
        user = _get_user_from_uidb64(uidb64)
        if user is None:
            return Response({"error": "Neplatný odkaz."}, status=400)

        if not account_activation_token.check_token(user, token):
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        user.email_verified = True
        user.save()
        return Response({"detail": "E-mail byl úspěšně ověřen. Účet čeká na schválení."})


# 3. Seller activation API (var_symbol), restricted to cityClerk/admin roles.
@extend_schema(
    tags=["User Registration"],
    summary="Activate user and set variable symbol (admin/cityClerk only)",
    description="Activate user and set variable symbol. Only accessible by admin or cityClerk.",
    request=UserActivationSerializer,
    responses={
        200: OpenApiResponse(response=UserActivationSerializer, description="User activated successfully."),
        400: OpenApiResponse(description="Invalid activation data."),
        404: OpenApiResponse(description="User not found."),
    },
)
class UserActivationViewSet(APIView):
    permission_classes = [OnlyRolesAllowed('cityClerk', 'admin')]

    def patch(self, request, *args, **kwargs):
        """Apply the activation serializer and notify the user by e-mail."""
        serializer = UserActivationSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = serializer.save()

        # Inform the user that registration is complete (the clerk filled in
        # the variable symbol) - Celery task with sync fallback.
        _dispatch_task(send_email_clerk_accepted_task, user.id)

        return Response(serializer.to_representation(user), status=status.HTTP_200_OK)


# ------------------------------------------------- END REGISTRATION -------------------------------------------------------------

# 1. Password reset request: sends the reset e-mail.
@extend_schema(
    tags=["User password reset"],
    summary="Request password reset (send email)",
    description="Request password reset by providing registered email. An email with instructions will be sent.",
    request=PasswordResetRequestSerializer,
    responses={
        200: OpenApiResponse(description="Email with instructions sent."),
        400: OpenApiResponse(description="Invalid email or request data."),
    },
)
class PasswordResetRequestView(APIView):
    def post(self, request):
        """Send a reset e-mail if the address belongs to a user.

        Responds identically whether or not the user exists.
        """
        serializer = PasswordResetRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        try:
            user = User.objects.get(email=serializer.validated_data['email'])
        except User.DoesNotExist:
            # Always return 200 even if user doesn't exist to avoid user enumeration
            return Response({"detail": "E-mail s odkazem byl odeslán."})

        # Password-reset e-mail (Celery task with sync fallback).
        _dispatch_task(send_password_reset_email_task, user.id)

        return Response({"detail": "E-mail s odkazem byl odeslán."})


# 2. Password reset confirmation using the uid + token from the e-mail link.
@extend_schema(
    tags=["User password reset"],
    summary="Confirm password reset via token",
    description="Confirm password reset using token from email.",
    request=PasswordResetConfirmSerializer,
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="Password reset token from email link."),
    ],
    responses={
        200: OpenApiResponse(description="Password changed successfully."),
        400: OpenApiResponse(description="Invalid token or request data."),
    },
)
class PasswordResetConfirmView(APIView):
    def post(self, request, uidb64, token):
        """Set a new password when the link's uid and token are valid."""
        user = _get_user_from_uidb64(uidb64)
        if user is None:
            return Response({"error": "Neplatný odkaz."}, status=400)

        if not password_reset_token.check_token(user, token):
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        serializer = PasswordResetConfirmSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        user.set_password(serializer.validated_data['password'])
        user.save()
        return Response({"detail": "Heslo bylo úspěšně změněno."})