# Account API views: cookie-based JWT authentication, user management,
# registration / e-mail verification, and password reset.
#
# View classes are documented with "#" comments instead of class docstrings
# on purpose: DRF exposes a view's class docstring as its description
# (OPTIONS metadata / browsable API), so adding docstrings would change
# runtime output.

import logging

from django.contrib.auth import get_user_model, authenticate
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str
from django.conf import settings
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import ensure_csrf_cookie

# NOTE(review): wildcard imports hide where names such as
# CustomTokenObtainPairSerializer, OnlyRolesAllowed, account_activation_token
# or the send_*_task functions come from. They are kept (in their original
# order, because later imports may shadow earlier ones) since the exported
# names cannot be determined from this file alone.
from .serializers import *
from .permissions import *
from .tasks import *
from .models import CustomUser
from .tokens import *
from .filters import UserFilter

from rest_framework import generics, permissions, status, viewsets
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed

from django_filters.rest_framework import DjangoFilterBackend

from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter

from rest_framework_simplejwt.views import TokenObtainPairView

User = get_user_model()

# general user view API

logger = logging.getLogger(__name__)

# Roles that may manage other users' accounts.
_ELEVATED_ROLES = ("cityClerk", "admin")


# ----------------------------------- HELPERS -----------------------------------

def _auth_cookie_names():
    """Return ``(access_cookie_name, refresh_cookie_name)`` from SIMPLE_JWT.

    Falls back to ``"access_token"`` / ``"refresh_token"`` when the
    ``AUTH_COOKIE`` / ``AUTH_COOKIE_REFRESH`` keys are not configured.
    """
    jwt_settings = settings.SIMPLE_JWT
    return (
        jwt_settings.get("AUTH_COOKIE", "access_token"),
        jwt_settings.get("AUTH_COOKIE_REFRESH", "refresh_token"),
    )


def _auth_cookie_path():
    """Return the cookie path configured in SIMPLE_JWT (default ``"/"``)."""
    return settings.SIMPLE_JWT.get("AUTH_COOKIE_PATH", "/")


def _set_auth_cookies(response, access_token, refresh_token):
    """Attach the access and refresh JWTs to *response* as cookies.

    Cookie attributes (HttpOnly, Secure, SameSite, path) are read from
    ``settings.SIMPLE_JWT`` with the same defaults for both cookies; each
    cookie's ``max_age`` mirrors the lifetime of the token it carries.

    Args:
        response: The DRF/Django response to mutate.
        access_token: Encoded access token string.
        refresh_token: Encoded refresh token string.

    Returns:
        The same *response* object, for call chaining.
    """
    jwt_settings = settings.SIMPLE_JWT
    access_name, refresh_name = _auth_cookie_names()
    shared_attrs = {
        "httponly": jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
        "secure": jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
        "samesite": jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
        "path": _auth_cookie_path(),
    }

    # Access token cookie
    response.set_cookie(
        key=access_name,
        value=access_token,
        max_age=int(settings.ACCESS_TOKEN_LIFETIME.total_seconds()),
        **shared_attrs,
    )
    # Refresh token cookie
    response.set_cookie(
        key=refresh_name,
        value=refresh_token,
        max_age=int(settings.REFRESH_TOKEN_LIFETIME.total_seconds()),
        **shared_attrs,
    )
    return response


def _dispatch_task(task, *args):
    """Queue *task* through Celery, running it inline if queuing fails.

    The broad ``except`` is deliberate: any failure of ``task.delay`` is
    treated as "Celery not available" and the task is executed synchronously
    so the e-mail is still sent.
    """
    try:
        task.delay(*args)
    except Exception as exc:
        logger.error("Celery not available, using fallback. Error: %s", exc)
        task(*args)


def _elevated_only():
    """Return a fresh permission list that admits only clerks and admins."""
    return [OnlyRolesAllowed(*_ELEVATED_ROLES)()]


# ------------------------------------ TOKENS ------------------------------------

# Custom token-obtaining view: delegates to SimpleJWT and additionally stores
# the issued tokens in cookies.
@extend_schema(
    tags=["api"],
    summary="Obtain JWT access and refresh tokens (cookie-based)",
    request=CustomTokenObtainPairSerializer,
    description="Authentication - získaš Access a Refresh token... lze do vložit E-mail nebo username",
)
@method_decorator(ensure_csrf_cookie, name="dispatch")
class CookieTokenObtainPairView(TokenObtainPairView):
    permission_classes = [AllowAny]
    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Issue tokens via the parent view and mirror them into cookies.

        When the parent response carries no ``access``/``refresh`` pair
        (e.g. failed login) it is returned untouched.
        """
        response = super().post(request, *args, **kwargs)

        # Pull the tokens out of the parent response.
        access = response.data.get("access")
        refresh = response.data.get("refresh")
        if not access or not refresh:
            return response  # e.g. on a login error

        return _set_auth_cookies(response, access, refresh)

    # NOTE(review): DRF does not call ``validate`` on a view, and a view has
    # no ``self.context``; this looks like serializer logic that presumably
    # belongs on CustomTokenObtainPairSerializer. Kept unchanged so the class
    # interface is preserved - confirm and relocate/remove.
    def validate(self, attrs):
        username = attrs.get("username")
        password = attrs.get("password")

        # Authenticate the user manually.
        user = authenticate(request=self.context.get('request'), username=username, password=password)

        if not user:
            raise AuthenticationFailed("Špatné uživatelské jméno nebo heslo.")

        if not user.is_active:
            raise AuthenticationFailed("Uživatel je deaktivován.")

        # Store the validated user (the remaining logic comes from SimpleJWT).
        self.user = user

        # Return the access and refresh tokens as usual.
        return super().validate(attrs)


# Issues a new access token from the refresh token found in the cookie (or,
# failing that, in the request body) and re-sets both cookies.
@extend_schema(
    tags=["api"],
    summary="Refresh JWT token using cookie",
    description="Refresh JWT token",
)
@method_decorator(ensure_csrf_cookie, name="dispatch")
class CookieTokenRefreshView(APIView):
    permission_classes = [AllowAny]

    def post(self, request):
        """Return a fresh access token for a valid refresh token.

        Responds 400 when no refresh token is supplied and 401 when the
        supplied token fails validation.
        """
        # Read the cookie under the same name it was written with.
        _, refresh_cookie_name = _auth_cookie_names()
        refresh_token = request.COOKIES.get(refresh_cookie_name) or request.data.get('refresh')
        if not refresh_token:
            return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)

        try:
            refresh = RefreshToken(refresh_token)
            access_token = str(refresh.access_token)
            new_refresh_token = str(refresh)  # optionally a new refresh token
        except TokenError:
            logger.error("Invalid refresh token used.")
            return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)

        response = Response({
            "access": access_token,
            "refresh": new_refresh_token,
        })
        # The access cookie lifetime follows ACCESS_TOKEN_LIFETIME, exactly as
        # in CookieTokenObtainPairView (it was previously hard-coded to 5 s).
        return _set_auth_cookies(response, access_token, new_refresh_token)


# --------------------------------- LOGIN/LOGOUT ---------------------------------

# Logs the user out by deleting both JWT cookies.
@extend_schema(
    tags=["api"],
    summary="Logout user (delete access and refresh token cookies)",
    description="Odhlásí uživatele – smaže access a refresh token cookies",
)
class LogoutView(APIView):
    permission_classes = [AllowAny]

    def post(self, request):
        """Delete the access and refresh token cookies and confirm logout."""
        response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)

        # Delete the cookies using the same names/path they were set with;
        # a cookie is only removed when name and path match.
        access_name, refresh_name = _auth_cookie_names()
        cookie_path = _auth_cookie_path()
        response.delete_cookie(access_name, path=cookie_path)
        response.delete_cookie(refresh_name, path=cookie_path)

        return response


# ---------------------------------------------------------------------------------

# CRUD over users with filtering. Clerks/admins may act on any user; other
# users may only read or modify their own record.
@extend_schema(
    tags=["User"],
    responses={200: CustomUserSerializer},
    description="Zobrazí všechny uživatele s možností filtrování a řazení.",
)
class UserView(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = CustomUserSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_class = UserFilter

    # Require authentication and role permission
    permission_classes = [IsAuthenticated]

    # NOTE(review): ``Meta.extra_kwargs`` is a serializer convention; nothing
    # in this file reads it from the viewset. Presumably it was meant for
    # CustomUserSerializer - confirm. Kept unchanged.
    class Meta:
        model = CustomUser
        extra_kwargs = {
            "email": {"help_text": "Unikátní e-mailová adresa uživatele."},
            "phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
            "role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
            "account_type": {"help_text": "Typ účtu – firma nebo fyzická osoba."},
            "email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
            "create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
            "var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
            "bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
            "ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
            "RC": {"help_text": "Rodné číslo pro fyzické osoby."},
            "city": {"help_text": "Město trvalého pobytu / sídla."},
            "street": {"help_text": "Ulice a číslo popisné."},
            "PSC": {"help_text": "PSČ místa pobytu / sídla."},
            "GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
            "is_active": {"help_text": "Stav aktivace uživatele."},
        }

    def get_permissions(self):
        """Return permission *instances* appropriate for the current action.

        * ``list`` / ``create``: clerks and admins only.
        * ``retrieve`` / ``update`` / ``partial_update`` / ``destroy``:
          clerks and admins for any user; other users only when the URL
          ``pk`` is their own id; otherwise the clerk/admin permission is
          returned as the fallback.
        * any other action: the class-level ``permission_classes``.
        """
        if self.action in ('list', 'create'):  # GET / POST /api/account/users/
            return _elevated_only()

        if self.action in ('retrieve', 'update', 'partial_update', 'destroy'):
            # GET / PUT / PATCH / DELETE /api/account/users/{id}
            user = self.request.user
            # getattr: an anonymous user has no ``role`` attribute; without
            # the default this would raise instead of denying access.
            if getattr(user, "role", None) in _ELEVATED_ROLES:
                return _elevated_only()

            pk = self.kwargs.get('pk')
            if pk and str(getattr(user, "id", None)) == pk:
                # Must be an instance: DRF calls has_permission() on each
                # returned object.
                return [IsAuthenticated()]

            # Fallback - not elevated and not the owner.
            return _elevated_only()

        return super().get_permissions()


# Get current user data
@extend_schema(
    tags=["User"],
    summary="Get current authenticated user",
    description="Vrátí detail aktuálně přihlášeného uživatele podle JWT tokenu nebo session.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer),
        401: OpenApiResponse(description="Unauthorized, uživatel není přihlášen"),
    }
)
class CurrentUserView(APIView):
    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Return the serialized ``request.user``."""
        return Response(CustomUserSerializer(request.user).data)


# --------------------------------- REGISTRATION ---------------------------------

# 1. registration API
@extend_schema(
    tags=["User Registration"],
    summary="Register a new user (company or individual)",
    request=UserRegistrationSerializer,
    responses={201: UserRegistrationSerializer},
    description="1. Registrace nového uživatele(firmy). Uživateli přijde email s odkazem na ověření.",
)
class UserRegistrationViewSet(ModelViewSet):
    queryset = CustomUser.objects.all()
    serializer_class = UserRegistrationSerializer
    http_method_names = ['post']

    def create(self, request, *args, **kwargs):
        """Create the user and send the e-mail verification message."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = serializer.save()

        # Registration confirmation e-mail (Celery task, inline fallback).
        _dispatch_task(send_email_verification_task, user.id)

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)


# 2. confirming email
@extend_schema(
    tags=["User Registration"],
    summary="Verify user email via link",
    responses={
        200: OpenApiResponse(description="Email úspěšně ověřen."),
        400: OpenApiResponse(description="Chybný nebo expirovaný token.")
    },
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location='path', description="Token z E-mailu"),
        OpenApiParameter(name='token', type=str, location='path', description="Token uživatele"),
    ],
    description="2. Ověření emailu pomocí odkazu s uid a tokenem. (stačí jenom převzít a poslat)",
)
class EmailVerificationView(APIView):
    # NOTE(review): no permission_classes here, so the project default
    # applies - confirm that default allows unauthenticated access, since
    # the link is presumably opened by users who are not logged in.

    def get(self, request, uidb64, token):
        """Mark the user's e-mail as verified when the link token is valid.

        Responds 400 for an undecodable/unknown ``uidb64`` or an invalid
        token.
        """
        try:
            uid = force_str(urlsafe_base64_decode(uidb64))
            user = User.objects.get(pk=uid)
        # OverflowError included for parity with PasswordResetConfirmView.
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=400)

        if not account_activation_token.check_token(user, token):
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        user.email_verified = True
        user.save()
        return Response({"detail": "E-mail byl úspěšně ověřen. Účet čeká na schválení."})


# 3. seller activation API (var_symbol)
@extend_schema(
    tags=["User Registration"],
    summary="Activate user and set variable symbol (admin/cityClerk only)",
    request=UserActivationSerializer,
    responses={200: UserActivationSerializer},
    description="3. Aktivace uživatele a zadání variabilního symbolu (pouze pro adminy a úředníky).",
)
class UserActivationViewSet(APIView):
    permission_classes = [OnlyRolesAllowed('cityClerk', 'admin')]

    def patch(self, request, *args, **kwargs):
        """Apply the activation serializer and notify the user by e-mail."""
        serializer = UserActivationSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = serializer.save()

        # E-mail informing the user that registration is complete (the clerk
        # filled in the variable symbol) - Celery task, inline fallback.
        _dispatch_task(send_email_clerk_accepted_task, user.id)

        return Response(serializer.to_representation(user), status=status.HTTP_200_OK)


# ------------------------------- END REGISTRATION -------------------------------

# 1. PasswordReset + send Email
@extend_schema(
    tags=["User password reset"],
    summary="Request password reset (send email)",
    request=PasswordResetRequestSerializer,
    responses={
        200: OpenApiResponse(description="Odeslán email s instrukcemi."),
        400: OpenApiResponse(description="Neplatný email.")
    },
    description="1(a). Požadavek na reset hesla - uživatel zadá svůj email."
)
class PasswordResetRequestView(APIView):
    def post(self, request):
        """Send a password-reset e-mail if the address belongs to a user.

        The success response is identical whether or not the user exists.
        """
        serializer = PasswordResetRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        try:
            user = User.objects.get(email=serializer.validated_data['email'])
        except User.DoesNotExist:
            # Always return 200 even if user doesn't exist to avoid user enumeration
            return Response({"detail": "E-mail s odkazem byl odeslán."})

        # Password-reset e-mail (Celery task, inline fallback).
        _dispatch_task(send_password_reset_email_task, user.id)

        return Response({"detail": "E-mail s odkazem byl odeslán."})


# 2. Confirming reset
@extend_schema(
    tags=["User password reset"],
    summary="Confirm password reset via token",
    request=PasswordResetConfirmSerializer,
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH),
    ],
    responses={
        200: OpenApiResponse(description="Heslo bylo změněno."),
        400: OpenApiResponse(description="Chybný token nebo data.")
    },
    description="1(a). Potvrzení resetu hesla pomocí tokenu z emailu."
)
class PasswordResetConfirmView(APIView):
    def post(self, request, uidb64, token):
        """Set a new password when the uid/token pair from the e-mail is valid.

        Responds 400 for an undecodable/unknown ``uidb64``, an invalid token,
        or invalid payload data.
        """
        try:
            uid = force_str(urlsafe_base64_decode(uidb64))
            user = User.objects.get(pk=uid)
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=400)

        if not password_reset_token.check_token(user, token):
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        serializer = PasswordResetConfirmSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        user.set_password(serializer.validated_data['password'])
        user.save()
        return Response({"detail": "Heslo bylo úspěšně změněno."})