409 lines
17 KiB
Python
409 lines
17 KiB
Python
from django.contrib.auth import get_user_model, authenticate
|
||
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
|
||
from django.utils.encoding import force_bytes, force_str
|
||
from django.conf import settings
|
||
from django.utils.decorators import method_decorator
|
||
from django.views.decorators.csrf import ensure_csrf_cookie
|
||
|
||
from .serializers import *
|
||
from .permissions import *
|
||
from .tasks import *
|
||
from .models import CustomUser
|
||
from .tokens import *
|
||
from .filters import UserFilter
|
||
|
||
from rest_framework import generics, permissions, status, viewsets
|
||
from rest_framework.response import Response
|
||
from rest_framework_simplejwt.tokens import RefreshToken
|
||
from rest_framework.views import APIView
|
||
from rest_framework.viewsets import ModelViewSet
|
||
from rest_framework.permissions import IsAuthenticated, AllowAny
|
||
|
||
from rest_framework_simplejwt.tokens import RefreshToken
|
||
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed
|
||
from django_filters.rest_framework import DjangoFilterBackend
|
||
|
||
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter
|
||
|
||
|
||
User = get_user_model()
|
||
|
||
#general user view API
|
||
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
from rest_framework_simplejwt.views import TokenObtainPairView
|
||
|
||
#---------------------------------------------TOKENS------------------------------------------------
|
||
|
||
# Custom Token obtaining view
|
||
@extend_schema(
    tags=["api"],
    summary="Obtain JWT access and refresh tokens (cookie-based)",
    request=CustomTokenObtainPairSerializer,
    description="Authentication - získaš Access a Refresh token... lze do <username> vložit E-mail nebo username"
)
@method_decorator(ensure_csrf_cookie, name="dispatch")
class CookieTokenObtainPairView(TokenObtainPairView):
    """Issue a JWT access/refresh pair and mirror both tokens into cookies.

    The parent view produces the token pair; this subclass copies the tokens
    from the response body into two cookies whose names and flags are read
    from ``settings.SIMPLE_JWT`` (with defaults when a key is missing).
    """

    permission_classes = [AllowAny]
    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Run the standard token-obtain flow, then attach the token cookies.

        Returns the parent's response unchanged when it carries no
        ``access``/``refresh`` pair (e.g. a failed login).
        """
        response = super().post(request, *args, **kwargs)

        access = response.data.get("access")
        refresh = response.data.get("refresh")
        if not (access and refresh):
            # Nothing to store, e.g. the login failed.
            return response

        jwt_settings = settings.SIMPLE_JWT

        # (settings key for the cookie name, default name, token, lifetime)
        cookie_specs = (
            ("AUTH_COOKIE", "access_token", access, settings.ACCESS_TOKEN_LIFETIME),
            ("AUTH_COOKIE_REFRESH", "refresh_token", refresh, settings.REFRESH_TOKEN_LIFETIME),
        )
        for name_setting, default_name, token, lifetime in cookie_specs:
            response.set_cookie(
                key=jwt_settings.get(name_setting, default_name),
                value=token,
                httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
                secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
                samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
                path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
                max_age=int(lifetime.total_seconds()),
            )

        return response

    # NOTE(review): this method reads ``self.context`` and delegates to
    # ``super().validate``, which looks like serializer API rather than view
    # API -- presumably it was meant to live on the token serializer. Nothing
    # in this class calls it; confirm before relying on or removing it.
    def validate(self, attrs):
        """Authenticate the supplied credentials and delegate to the parent.

        Raises ``AuthenticationFailed`` when the credentials do not match a
        user or the matched user is inactive.
        """
        username = attrs.get("username")
        password = attrs.get("password")

        # Authenticate the user manually.
        matched_user = authenticate(
            request=self.context.get('request'),
            username=username,
            password=password,
        )

        if not matched_user:
            raise AuthenticationFailed("Špatné uživatelské jméno nebo heslo.")

        if not matched_user.is_active:
            raise AuthenticationFailed("Uživatel je deaktivován.")

        # Expose the authenticated user to the parent's logic.
        self.user = matched_user

        return super().validate(attrs)
|
||
|
||
@extend_schema(
    tags=["api"],
    summary="Refresh JWT token using cookie",
    description="Refresh JWT token"
)
@method_decorator(ensure_csrf_cookie, name="dispatch")
class CookieTokenRefreshView(APIView):
    """Issue a new access token from the refresh token and re-set both cookies.

    The refresh token is taken from the refresh cookie (name configured via
    ``SIMPLE_JWT["AUTH_COOKIE_REFRESH"]``, default ``refresh_token``) or, as a
    fallback, from the ``refresh`` field of the request body.
    """

    permission_classes = [AllowAny]

    def post(self, request):
        """Return a fresh access token and store both tokens in cookies.

        Responses:
            200 -- body contains ``access`` and ``refresh``; cookies are set.
            400 -- no refresh token was supplied.
            401 -- the supplied refresh token was rejected (``TokenError``).
        """
        jwt_settings = settings.SIMPLE_JWT

        # Read the cookie under the same configurable name it is written with
        # below; a hard-coded name would miss it whenever AUTH_COOKIE_REFRESH
        # is customised.
        refresh_cookie_name = jwt_settings.get("AUTH_COOKIE_REFRESH", "refresh_token")
        refresh_token = request.COOKIES.get(refresh_cookie_name) or request.data.get('refresh')
        if not refresh_token:
            return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)

        # Only the token handling can raise TokenError, so keep the try small.
        try:
            refresh = RefreshToken(refresh_token)
            access_token = str(refresh.access_token)
            # Re-serialises the token that was just parsed; no rotation is
            # performed in this view.
            new_refresh_token = str(refresh)
        except TokenError:
            logger.error("Invalid refresh token used.")
            return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)

        response = Response({
            "access": access_token,
            "refresh": new_refresh_token,
        })

        # Flags shared by both cookies.
        cookie_options = {
            "httponly": jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
            "secure": jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
            "samesite": jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
            "path": jwt_settings.get("AUTH_COOKIE_PATH", "/"),
        }

        # Access token cookie. Its lifetime matches the one used by
        # CookieTokenObtainPairView; the previous hard-coded 5 seconds made
        # the refreshed cookie expire almost immediately.
        response.set_cookie(
            key=jwt_settings.get("AUTH_COOKIE", "access_token"),
            value=access_token,
            max_age=int(settings.ACCESS_TOKEN_LIFETIME.total_seconds()),
            **cookie_options,
        )

        # Refresh token cookie.
        response.set_cookie(
            key=refresh_cookie_name,
            value=new_refresh_token,
            max_age=int(settings.REFRESH_TOKEN_LIFETIME.total_seconds()),
            **cookie_options,
        )

        return response
|
||
|
||
#---------------------------------------------LOGIN/LOGOUT------------------------------------------------
|
||
|
||
@extend_schema(
    tags=["api"],
    summary="Logout user (delete access and refresh token cookies)",
    description="Odhlásí uživatele – smaže access a refresh token cookies"
)
class LogoutView(APIView):
    """Log the user out by deleting the access and refresh token cookies."""

    permission_classes = [AllowAny]

    def post(self, request):
        """Return 200 with both token cookies marked for deletion.

        Cookie names and path are read from ``settings.SIMPLE_JWT`` with the
        same keys and defaults the token views use when setting them, so the
        deletion targets the cookies that were actually issued.
        """
        response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)

        # Fall back to an empty mapping so the defaults below still apply
        # when SIMPLE_JWT is not configured.
        jwt_settings = getattr(settings, "SIMPLE_JWT", {})
        cookie_path = jwt_settings.get("AUTH_COOKIE_PATH", "/")

        # Delete the cookies.
        response.delete_cookie(jwt_settings.get("AUTH_COOKIE", "access_token"), path=cookie_path)
        response.delete_cookie(jwt_settings.get("AUTH_COOKIE_REFRESH", "refresh_token"), path=cookie_path)

        return response
|
||
|
||
#--------------------------------------------------------------------------------------------------------------
|
||
|
||
@extend_schema(
    tags=["User"],
    responses={200: CustomUserSerializer},
    description="Zobrazí všechny uživatele s možností filtrování a řazení.",
)
class UserView(viewsets.ModelViewSet):
    """CRUD endpoints for users with role-based access.

    * ``list`` / ``create`` -- roles ``cityClerk`` and ``admin`` only.
    * ``retrieve`` / ``update`` / ``partial_update`` / ``destroy`` -- those
      roles, or the authenticated user acting on their own record.
    """

    queryset = User.objects.all()
    serializer_class = CustomUserSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_class = UserFilter

    # Default for any action not handled explicitly in get_permissions().
    permission_classes = [IsAuthenticated]

    # NOTE(review): ``Meta.extra_kwargs`` is normally a serializer option;
    # nothing in this view reads it -- confirm whether it belongs on
    # CustomUserSerializer instead. Kept unchanged here.
    class Meta:
        model = CustomUser
        extra_kwargs = {
            "email": {"help_text": "Unikátní e-mailová adresa uživatele."},
            "phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
            "role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
            "account_type": {"help_text": "Typ účtu – firma nebo fyzická osoba."},
            "email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
            "create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
            "var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
            "bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
            "ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
            "RC": {"help_text": "Rodné číslo pro fyzické osoby."},
            "city": {"help_text": "Město trvalého pobytu / sídla."},
            "street": {"help_text": "Ulice a číslo popisné."},
            "PSC": {"help_text": "PSČ místa pobytu / sídla."},
            "GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
            "is_active": {"help_text": "Stav aktivace uživatele."},
        }

    def _privileged_only(self):
        """Return the permission list restricted to cityClerk/admin roles."""
        return [OnlyRolesAllowed("cityClerk", "admin")()]

    def get_permissions(self):
        """Return permission *instances* appropriate for the current action.

        DRF calls ``has_permission`` on every returned item, so each entry
        must be an instance, never a bare class.
        """
        if self.action in ('list', 'create'):  # GET / POST /api/account/users/
            return self._privileged_only()

        # GET / PUT / PATCH / DELETE /api/account/users/{id}
        if self.action in ('retrieve', 'update', 'partial_update', 'destroy'):
            user = self.request.user

            # Anonymous users have no ``role``; let IsAuthenticated reject
            # them instead of failing on the attribute lookup.
            if not getattr(user, "is_authenticated", False):
                return [IsAuthenticated()]

            if getattr(user, "role", None) in ('cityClerk', 'admin'):
                return self._privileged_only()

            # A regular user may only act on their own record.
            pk = self.kwargs.get('pk')
            if pk and str(user.id) == str(pk):
                return [IsAuthenticated()]

            # Fallback: deny by requiring a privileged role.
            return self._privileged_only()

        return super().get_permissions()
|
||
|
||
|
||
|
||
# Get current user data
|
||
@extend_schema(
    tags=["User"],
    summary="Get current authenticated user",
    description="Vrátí detail aktuálně přihlášeného uživatele podle JWT tokenu nebo session.",
    responses={
        200: OpenApiResponse(response=CustomUserSerializer),
        401: OpenApiResponse(description="Unauthorized, uživatel není přihlášen"),
    }
)
class CurrentUserView(APIView):
    """Return the serialized record of the user making the request."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Serialize ``request.user`` with ``CustomUserSerializer``."""
        return Response(CustomUserSerializer(request.user).data)
|
||
|
||
|
||
#------------------------------------------------REGISTRATION--------------------------------------------------------------
|
||
|
||
#1. registration API
|
||
@extend_schema(
    tags=["User Registration"],
    summary="Register a new user (company or individual)",
    request=UserRegistrationSerializer,
    responses={201: UserRegistrationSerializer},
    description="1. Registrace nového uživatele(firmy). Uživateli přijde email s odkazem na ověření.",
)
class UserRegistrationViewSet(ModelViewSet):
    """POST-only endpoint that creates a user and triggers the verification e-mail.

    NOTE(review): no ``permission_classes`` are declared, so the project's
    default permissions apply -- confirm they allow anonymous registration.
    """

    queryset = CustomUser.objects.all()
    serializer_class = UserRegistrationSerializer
    http_method_names = ['post']

    def create(self, request, *args, **kwargs):
        """Validate the payload, save the user and send the verification e-mail.

        The e-mail task is queued with ``.delay``; if queuing raises, the
        error is logged and the task is called directly in-process.
        """
        registration = self.get_serializer(data=request.data)
        registration.is_valid(raise_exception=True)
        new_user = registration.save()

        try:
            # Queue the registration-confirmation e-mail (Celery task).
            send_email_verification_task.delay(new_user.id)
        except Exception as e:
            logger.error(f"Celery not available, using fallback. Error: {e}")
            # Fallback: send the confirmation e-mail synchronously.
            send_email_verification_task(new_user.id)

        return Response(
            registration.data,
            status=status.HTTP_201_CREATED,
            headers=self.get_success_headers(registration.data),
        )
|
||
|
||
#2. confirming email
|
||
@extend_schema(
    tags=["User Registration"],
    summary="Verify user email via link",
    responses={
        200: OpenApiResponse(description="Email úspěšně ověřen."),
        400: OpenApiResponse(description="Chybný nebo expirovaný token.")
    },
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location='path', description="Token z E-mailu"),
        OpenApiParameter(name='token', type=str, location='path', description="Token uživatele"),
    ],
    description="2. Ověření emailu pomocí odkazu s uid a tokenem. (stačí jenom převzít a poslat)",
)
class EmailVerificationView(APIView):
    """Mark a user's e-mail as verified from the link sent after registration."""

    def get(self, request, uidb64, token):
        """Verify the e-mail of the user identified by ``uidb64``.

        Args:
            uidb64: URL-safe base64 encoding of the user's primary key.
            token: value checked with ``account_activation_token``.

        Responses:
            200 -- token accepted; ``email_verified`` is set and saved.
            400 -- the uid cannot be decoded or resolved, or the token is
                   rejected.
        """
        try:
            uid = force_str(urlsafe_base64_decode(uidb64))
            user = User.objects.get(pk=uid)
        # OverflowError is included to match PasswordResetConfirmView: an
        # oversized decoded uid should yield a 400, not an unhandled error.
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=400)

        if not account_activation_token.check_token(user, token):
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        user.email_verified = True
        user.save()

        return Response({"detail": "E-mail byl úspěšně ověřen. Účet čeká na schválení."})
|
||
|
||
#3. seller activation API (var_symbol)
|
||
@extend_schema(
    tags=["User Registration"],
    summary="Activate user and set variable symbol (admin/cityClerk only)",
    request=UserActivationSerializer,
    responses={200: UserActivationSerializer},
    description="3. Aktivace uživatele a zadání variabilního symbolu (pouze pro adminy a úředníky).",
)
class UserActivationViewSet(APIView):
    """PATCH endpoint, restricted to cityClerk/admin, that runs the activation serializer."""

    permission_classes = [OnlyRolesAllowed('cityClerk', 'admin')]

    def patch(self, request, *args, **kwargs):
        """Validate and save the activation payload, then notify the user.

        The notification task is queued with ``.delay``; if queuing raises,
        the error is logged and the task is called directly in-process.
        """
        activation = UserActivationSerializer(data=request.data)
        activation.is_valid(raise_exception=True)
        activated_user = activation.save()

        try:
            # Queue the e-mail telling the user that registration is complete
            # and the clerk has filled in the variable symbol (Celery task).
            send_email_clerk_accepted_task.delay(activated_user.id)
        except Exception as e:
            logger.error(f"Celery not available, using fallback. Error: {e}")
            # Fallback: send the same notification synchronously.
            send_email_clerk_accepted_task(activated_user.id)

        payload = activation.to_representation(activated_user)
        return Response(payload, status=status.HTTP_200_OK)
|
||
|
||
#-------------------------------------------------END REGISTRATION-------------------------------------------------------------
|
||
|
||
#1. PasswordReset + send Email
|
||
@extend_schema(
    tags=["User password reset"],
    summary="Request password reset (send email)",
    request=PasswordResetRequestSerializer,
    responses={
        200: OpenApiResponse(description="Odeslán email s instrukcemi."),
        400: OpenApiResponse(description="Neplatný email.")
    },
    description="1(a). Požadavek na reset hesla - uživatel zadá svůj email."
)
class PasswordResetRequestView(APIView):
    """Accept an e-mail address and trigger the password-reset e-mail."""

    def post(self, request):
        """Send a reset e-mail to the user with the submitted address.

        Returns 400 with the serializer errors for an invalid payload.
        Otherwise returns the same success body whether or not a user with
        that e-mail exists.
        """
        serializer = PasswordResetRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        try:
            account = User.objects.get(email=serializer.validated_data['email'])
        except User.DoesNotExist:
            # Same answer as on success, to avoid user enumeration.
            return Response({"detail": "E-mail s odkazem byl odeslán."})

        try:
            # Queue the password-reset e-mail (Celery task).
            send_password_reset_email_task.delay(account.id)
        except Exception as e:
            logger.error(f"Celery not available, using fallback. Error: {e}")
            # Fallback: send the password-reset e-mail synchronously.
            send_password_reset_email_task(account.id)

        return Response({"detail": "E-mail s odkazem byl odeslán."})
|
||
|
||
#2. Confirming reset
|
||
@extend_schema(
    tags=["User password reset"],
    summary="Confirm password reset via token",
    request=PasswordResetConfirmSerializer,
    parameters=[
        OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH),
        OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH),
    ],
    responses={
        200: OpenApiResponse(description="Heslo bylo změněno."),
        400: OpenApiResponse(description="Chybný token nebo data.")
    },
    description="1(a). Potvrzení resetu hesla pomocí tokenu z emailu."
)
class PasswordResetConfirmView(APIView):
    """Set a new password for the user identified by a reset link."""

    def post(self, request, uidb64, token):
        """Change the password when the link and the payload are both valid.

        Args:
            uidb64: URL-safe base64 encoding of the user's primary key.
            token: value checked with ``password_reset_token``.

        Responses:
            200 -- password updated and saved.
            400 -- the uid cannot be decoded or resolved, the token is
                   rejected, or the payload fails validation.
        """
        try:
            decoded_pk = force_str(urlsafe_base64_decode(uidb64))
            account = User.objects.get(pk=decoded_pk)
        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
            return Response({"error": "Neplatný odkaz."}, status=400)

        token_ok = password_reset_token.check_token(account, token)
        if not token_ok:
            return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)

        payload = PasswordResetConfirmSerializer(data=request.data)
        if not payload.is_valid():
            return Response(payload.errors, status=400)

        account.set_password(payload.validated_data['password'])
        account.save()
        return Response({"detail": "Heslo bylo úspěšně změněno."})