from django.contrib.auth import get_user_model, authenticate, login as django_login, logout as django_logout from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode from django.utils.encoding import force_bytes, force_str from .serializers import * from .permissions import * from .models import CustomUser from .tokens import * from .tasks import ( send_password_reset_email_task, send_email_verification_task, send_email_change_verification_task, send_email_change_notification_task, ) from django.conf import settings import logging logger = logging.getLogger(__name__) from .filters import UserFilter from rest_framework import generics, permissions, status, viewsets from rest_framework.response import Response from rest_framework_simplejwt.tokens import RefreshToken from rest_framework.views import APIView from rest_framework.viewsets import ModelViewSet from rest_framework.permissions import IsAuthenticated, AllowAny from rest_framework_simplejwt.tokens import RefreshToken from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed, InvalidToken from django_filters.rest_framework import DjangoFilterBackend from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter from vontor_cz.turnstile import verify_turnstile User = get_user_model() #general user view API from rest_framework_simplejwt.views import TokenObtainPairView #---------------------------------------------TOKENY------------------------------------------------ # Custom Token obtaining view @extend_schema( tags=["account", "public"], summary="Obtain JWT access and refresh tokens (cookie-based)", description="Authenticate user and obtain JWT access and refresh tokens. You can use either email or username.", request=CustomTokenObtainPairSerializer, responses={ 200: OpenApiResponse(response=CustomTokenObtainPairSerializer, description="Tokens returned successfully."), 401: OpenApiResponse(description="Invalid credentials or inactive user."), }, ) class CookieTokenObtainPairView(TokenObtainPairView): serializer_class = CustomTokenObtainPairSerializer def post(self, request, *args, **kwargs): turnstile_token = request.data.get("turnstile_token", "") remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "")) if not verify_turnstile(turnstile_token, remote_ip): return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST) serializer = self.get_serializer(data=request.data) try: serializer.is_valid(raise_exception=True) except TokenError as e: raise InvalidToken(e.args[0]) access = serializer.validated_data.get("access") refresh = serializer.validated_data.get("refresh") # Create a Django session so AuthMiddlewareStack authenticates WebSocket connections django_login(request, serializer.user, backend='django.contrib.auth.backends.ModelBackend') response = Response(serializer.validated_data, status=status.HTTP_200_OK) jwt_settings = settings.SIMPLE_JWT # Access token cookie response.set_cookie( key=jwt_settings.get("AUTH_COOKIE", "access_token"), value=access, httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True), secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG), samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"), path=jwt_settings.get("AUTH_COOKIE_PATH", "/"), max_age=5 * 60, # 5 minut ) # Refresh token cookie response.set_cookie( key="refresh_token", value=refresh, httponly=True, secure=not settings.DEBUG, samesite="Lax", path="/", max_age=7 * 24 * 60 * 60, # 7 dní ) return response @extend_schema( tags=["account", "public"], summary="Refresh JWT token using cookie", description="Refresh JWT access and refresh tokens using the refresh token stored in cookie.", responses={ 200: OpenApiResponse(description="Tokens refreshed successfully."), 400: OpenApiResponse(description="Refresh token cookie not found."), 401: OpenApiResponse(description="Invalid refresh token."), }, ) class CookieTokenRefreshView(APIView): def post(self, request): refresh_token = request.COOKIES.get('refresh_token') if not refresh_token: return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST) try: refresh = RefreshToken(refresh_token) access_token = str(refresh.access_token) new_refresh_token = str(refresh) # volitelně nový refresh token response = Response({ "access": access_token, "refresh": new_refresh_token, }) # Nastav nové HttpOnly cookies # Access token cookie (např. 5 minut platnost) response.set_cookie( "access_token", access_token, httponly=True, secure=not settings.DEBUG, samesite="Lax", max_age=5 * 60, path="/", ) # Refresh token cookie (delší platnost, např. 7 dní) response.set_cookie( "refresh_token", new_refresh_token, httponly=True, secure=not settings.DEBUG, samesite="Lax", max_age=7 * 24 * 60 * 60, path="/", ) return response except TokenError: return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED) #---------------------------------------------LOGOUT------------------------------------------------ @extend_schema( tags=["account", "public"], summary="Logout user (delete access and refresh token cookies)", description="Logs out the user by deleting access and refresh token cookies.", responses={ 200: OpenApiResponse(description="Logout successful."), }, ) class LogoutView(APIView): permission_classes = [AllowAny] def post(self, request): django_logout(request) # destroy Django session (used for WebSocket auth) response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK) response.delete_cookie("access_token", path="/") response.delete_cookie("refresh_token", path="/") return response #-------------------------------------------------------------------------------------------------------------- @extend_schema( tags=["account"], summary="List, retrieve, update, and delete users.", description="Displays all users with filtering and ordering options. Requires authentication and appropriate role.", responses={ 200: OpenApiResponse(response=CustomUserSerializer, description="User(s) retrieved successfully."), 403: OpenApiResponse(description="Permission denied."), }, ) class UserView(viewsets.ModelViewSet): queryset = User.objects.all() serializer_class = CustomUserSerializer filter_backends = [DjangoFilterBackend] filterset_class = UserFilter # Require authentication and role permission permission_classes = [IsAuthenticated] class Meta: model = CustomUser extra_kwargs = { "email": {"help_text": "Unikátní e-mailová adresa uživatele."}, "phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."}, "role": {"help_text": "Role uživatele určující jeho oprávnění v systému."}, "account_type": {"help_text": "Typ účtu – firma nebo fyzická osoba."}, "email_verified": {"help_text": "Určuje, zda je e-mail ověřen."}, "create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True}, "var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."}, "bank_account": {"help_text": "Číslo bankovního účtu uživatele."}, "ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."}, "RC": {"help_text": "Rodné číslo pro fyzické osoby."}, "city": {"help_text": "Město trvalého pobytu / sídla."}, "street": {"help_text": "Ulice a číslo popisné."}, "PSC": {"help_text": "PSČ místa pobytu / sídla."}, "GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."}, "is_active": {"help_text": "Stav aktivace uživatele."}, } @extend_schema( tags=["account"], summary="Get permissions based on user role and action.", description="Determines permissions for various actions based on user role and ownership.", responses={ 200: OpenApiResponse(description="Permissions determined successfully."), 403: OpenApiResponse(description="Permission denied."), }, ) def get_permissions(self): if self.action == 'create': return [OnlyRolesAllowed("admin")()] if self.action in ['list', 'retrieve']: return [IsAuthenticated()] if self.action in ['update', 'partial_update', 'destroy']: user = getattr(self, 'request', None) and getattr(self.request, 'user', None) if user and getattr(user, 'is_authenticated', False) and getattr(user, 'role', None) == 'admin': return [OnlyRolesAllowed("admin")()] lookup = self.kwargs.get('pk', '') if user and getattr(user, 'is_authenticated', False) and lookup and ( str(getattr(user, 'id', '')) == lookup ): return [IsAuthenticated()] return [OnlyRolesAllowed("admin")()] return super().get_permissions() def get_serializer_class(self): user = getattr(self.request, 'user', None) is_admin = user and (getattr(user, 'role', None) == 'admin' or getattr(user, 'is_superuser', False)) if self.action in ['retrieve', 'list'] and not is_admin: return PublicUserSerializer return CustomUserSerializer # Get current user data @extend_schema( tags=["account"], summary="Get current authenticated user", description="Returns details of the currently authenticated user based on JWT token or session.", responses={ 200: OpenApiResponse(response=CustomUserSerializer, description="Current user details."), 401: OpenApiResponse(description="Unauthorized, user is not authenticated."), } ) class CurrentUserView(APIView): permission_classes = [IsAuthenticated] def get(self, request): serializer = CustomUserSerializer(request.user) return Response(serializer.data) @extend_schema( tags=["account"], summary="Change password for the authenticated user", request=ChangePasswordSerializer, responses={ 200: OpenApiResponse(description="Password changed successfully."), 400: OpenApiResponse(description="Invalid current password or validation error."), }, ) class ChangePasswordView(APIView): permission_classes = [IsAuthenticated] def post(self, request): serializer = ChangePasswordSerializer(data=request.data) serializer.is_valid(raise_exception=True) user = request.user if not user.check_password(serializer.validated_data['current_password']): return Response({"current_password": "Nesprávné heslo."}, status=status.HTTP_400_BAD_REQUEST) user.set_password(serializer.validated_data['new_password']) user.save() return Response({"detail": "Heslo bylo úspěšně změněno."}) #------------------------------------------------REGISTRACE-------------------------------------------------------------- #1. registration API @extend_schema( tags=["account", "public"], summary="Register a new user (company or individual)", description="Register a new user (company or individual). The user will receive an email with a verification link.", request=UserRegistrationSerializer, responses={ 201: OpenApiResponse(response=UserRegistrationSerializer, description="User registered successfully."), 400: OpenApiResponse(description="Invalid registration data."), }, ) class UserRegistrationViewSet(ModelViewSet): queryset = CustomUser.objects.all() serializer_class = UserRegistrationSerializer http_method_names = ['post'] def create(self, request, *args, **kwargs): turnstile_token = request.data.get("turnstile_token", "") remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "")) if not verify_turnstile(turnstile_token, remote_ip): return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST) serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = serializer.save() try: send_email_verification_task.delay(user.id) # posílaní emailu pro potvrzení registrace - CELERY TASK except Exception as e: logger.error(f"Celery not available, using fallback. Error: {e}") send_email_verification_task(user.id) # posílaní emailu pro potvrzení registrace headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) #2. confirming email @extend_schema( tags=["account", "public"], summary="Verify user email via link", description="Verify user email using the link with uid and token.", parameters=[ OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."), OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="User token from email link."), ], responses={ 200: OpenApiResponse(description="Email successfully verified."), 400: OpenApiResponse(description="Invalid or expired token."), }, ) class EmailVerificationView(APIView): def get(self, request, uidb64, token): try: uid = force_str(urlsafe_base64_decode(uidb64)) user = User.objects.get(pk=uid) except (User.DoesNotExist, ValueError, TypeError): return Response({"error": "Neplatný odkaz."}, status=400) if account_activation_token.check_token(user, token): user.email_verified = True user.is_active = True # Aktivace uživatele po ověření e-mailu user.save() return Response({"detail": "E-mail byl úspěšně ověřen. Účet je aktivován."}) else: return Response({"error": "Token je neplatný nebo expirovaný."}, status=400) #-------------------------------------------------END REGISTRACE------------------------------------------------------------- #1. PasswordReset + send Email @extend_schema( tags=["account", "public"], summary="Request password reset (send email)", description="Request password reset by providing registered email. An email with instructions will be sent.", request=PasswordResetRequestSerializer, responses={ 200: OpenApiResponse(description="Email with instructions sent."), 400: OpenApiResponse(description="Invalid email or request data."), }, ) class PasswordResetRequestView(APIView): def post(self, request): serializer = PasswordResetRequestSerializer(data=request.data) if serializer.is_valid(): try: user = User.objects.get(email=serializer.validated_data['email']) except User.DoesNotExist: # Always return 200 even if user doesn't exist to avoid user enumeration return Response({"detail": "E-mail s odkazem byl odeslán."}) try: send_password_reset_email_task.delay(user.id) # posílaní emailu pro obnovení hesla - CELERY TASK except Exception as e: logger.error(f"Celery not available, using fallback. Error: {e}") send_password_reset_email_task(user.id) # posílaní emailu pro obnovení hesla registrace return Response({"detail": "E-mail s odkazem byl odeslán."}) return Response(serializer.errors, status=400) #2. Confirming reset @extend_schema( tags=["account", "public"], summary="Confirm password reset via token", description="Confirm password reset using token from email.", request=PasswordResetConfirmSerializer, parameters=[ OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."), OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="Password reset token from email link."), ], responses={ 200: OpenApiResponse(description="Password changed successfully."), 400: OpenApiResponse(description="Invalid token or request data."), }, ) class PasswordResetConfirmView(APIView): def post(self, request, uidb64, token): try: uid = force_str(urlsafe_base64_decode(uidb64)) user = User.objects.get(pk=uid) except (TypeError, ValueError, OverflowError, User.DoesNotExist): return Response({"error": "Neplatný odkaz."}, status=400) if not password_reset_token.check_token(user, token): return Response({"error": "Token je neplatný nebo expirovaný."}, status=400) serializer = PasswordResetConfirmSerializer(data=request.data) if serializer.is_valid(): user.set_password(serializer.validated_data['password']) user.save() return Response({"detail": "Heslo bylo úspěšně změněno."}) return Response(serializer.errors, status=400) @extend_schema( tags=["account"], summary="Resend email verification", description="Resends the verification email to the currently authenticated user. Limited to once per minute.", responses={ 200: OpenApiResponse(description="Verification email sent."), 400: OpenApiResponse(description="Email is already verified."), 429: OpenApiResponse(description="Rate limited — seconds_remaining returned."), }, ) class ResendEmailVerificationView(APIView): permission_classes = [IsAuthenticated] COOLDOWN = 60 # seconds def post(self, request): from django.core.cache import cache import time user = request.user if user.email_verified: return Response({"detail": "E-mail je již ověřen."}, status=status.HTTP_400_BAD_REQUEST) cache_key = f"resend_verification_{user.id}" sent_at = cache.get(cache_key) if sent_at is not None: remaining = int(self.COOLDOWN - (time.time() - sent_at)) if remaining > 0: return Response( {"detail": "Příliš mnoho pokusů.", "seconds_remaining": remaining}, status=status.HTTP_429_TOO_MANY_REQUESTS, ) cache.set(cache_key, time.time(), timeout=self.COOLDOWN) try: send_email_verification_task.delay(user.id) except Exception as e: logger.error(f"Celery not available, using fallback. Error: {e}") send_email_verification_task(user.id) return Response({"detail": "Ověřovací e-mail byl odeslán."}) @extend_schema( tags=["account"], summary="Request email address change", description="Validates current password + Turnstile, stores pending_email, sends confirmation link to the new address and a notification to the old one.", responses={ 200: OpenApiResponse(description="Confirmation email sent to new address."), 400: OpenApiResponse(description="Wrong password, duplicate email, or failed CAPTCHA."), }, ) class ChangeEmailView(APIView): permission_classes = [IsAuthenticated] def post(self, request): turnstile_token = request.data.get("turnstile_token", "") remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "")) if not verify_turnstile(turnstile_token, remote_ip): return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST) serializer = ChangeEmailSerializer(data=request.data) serializer.is_valid(raise_exception=True) user = request.user if not user.check_password(serializer.validated_data["current_password"]): return Response({"current_password": "Nesprávné heslo."}, status=status.HTTP_400_BAD_REQUEST) new_email = serializer.validated_data["new_email"] old_email = user.email user.pending_email = new_email uid = urlsafe_base64_encode(force_bytes(user.pk)) token = user.generate_email_verification_token(save=False) user.save(update_fields=["pending_email", "email_verification_token", "email_verification_sent_at"]) verify_url = f"{settings.FRONTEND_URL}/account/confirm-email-change/{uid}/{token}/" try: send_email_change_verification_task.delay(user.id, new_email, verify_url) send_email_change_notification_task.delay(user.id, old_email, new_email) except Exception as e: logger.error(f"Celery not available, using fallback. Error: {e}") send_email_change_verification_task(user.id, new_email, verify_url) send_email_change_notification_task(user.id, old_email, new_email) return Response({"detail": "Ověřovací e-mail byl odeslán na novou adresu."}) @extend_schema( tags=["account"], summary="Confirm email address change via link", description="Verifies uid + token from the confirmation email, swaps email to pending_email.", parameters=[ OpenApiParameter(name="uidb64", type=str, location=OpenApiParameter.PATH), OpenApiParameter(name="token", type=str, location=OpenApiParameter.PATH), ], responses={ 200: OpenApiResponse(description="Email changed successfully."), 400: OpenApiResponse(description="Invalid or expired link."), }, ) class ConfirmEmailChangeView(APIView): def get(self, request, uidb64, token): ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "unknown")) try: uid = force_str(urlsafe_base64_decode(uidb64)) user = User.objects.get(pk=uid) except (User.DoesNotExist, ValueError, TypeError): return Response({"error": "Neplatný odkaz."}, status=status.HTTP_400_BAD_REQUEST) if not user.pending_email: return Response({"error": "Žádná čekající změna e-mailu."}, status=status.HTTP_400_BAD_REQUEST) if not user.verify_email_token(token): return Response({"error": "Token je neplatný nebo expirovaný."}, status=status.HTTP_400_BAD_REQUEST) old_email = user.email user.email = user.pending_email user.pending_email = None user.email_verified = True user.save(update_fields=["email", "pending_email", "email_verified"]) logger.info("Email changed for user %s: %s → %s (IP: %s)", user.pk, old_email, user.email, ip) return Response({"detail": "E-mail byl úspěšně změněn."}) @extend_schema( tags=["account"], summary="Change username", description="Updates the username. Requires Turnstile. Limited to once per 30 days.", responses={ 200: OpenApiResponse(description="Username changed."), 400: OpenApiResponse(description="Duplicate name, invalid characters, or failed CAPTCHA."), 429: OpenApiResponse(description="Rate limited — days_remaining returned."), }, ) class ChangeUsernameView(APIView): permission_classes = [IsAuthenticated] COOLDOWN_DAYS = 30 def post(self, request): from django.core.cache import cache import time turnstile_token = request.data.get("turnstile_token", "") remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "")) if not verify_turnstile(turnstile_token, remote_ip): return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST) serializer = ChangeUsernameSerializer(data=request.data) serializer.is_valid(raise_exception=True) user = request.user cache_key = f"username_change_{user.id}" changed_at = cache.get(cache_key) if changed_at is not None: remaining_days = int(self.COOLDOWN_DAYS - (time.time() - changed_at) / 86400) if remaining_days > 0: return Response( {"detail": "Příliš brzy.", "days_remaining": remaining_days}, status=status.HTTP_429_TOO_MANY_REQUESTS, ) user.username = serializer.validated_data["new_username"] user.save(update_fields=["username"]) cache.set(cache_key, time.time(), timeout=self.COOLDOWN_DAYS * 86400) return Response({"detail": "Uživatelské jméno bylo změněno.", "username": user.username}) @extend_schema( tags=["account"], summary="Delete own account", description="Soft-deletes the authenticated user's account after password confirmation. Clears auth cookies.", responses={ 204: OpenApiResponse(description="Account deleted."), 400: OpenApiResponse(description="Wrong password."), }, ) class DeleteAccountView(APIView): permission_classes = [IsAuthenticated] def post(self, request): user = request.user turnstile_token = request.data.get("turnstile_token", "") remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "")) if not verify_turnstile(turnstile_token, remote_ip): return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST) password = request.data.get("current_password", "") if not password or not user.check_password(password): return Response({"current_password": "Nesprávné heslo."}, status=status.HTTP_400_BAD_REQUEST) logger.info("Account deletion requested for user %s (id=%s)", user.username, user.pk) django_logout(request) user.delete() response = Response(status=status.HTTP_204_NO_CONTENT) response.delete_cookie("access_token", path="/") response.delete_cookie("refresh_token", path="/") return response