Files
vontor-cz/backend/account/views.py

642 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.contrib.auth import get_user_model, authenticate, login as django_login, logout as django_logout
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str
from .serializers import *
from .permissions import *
from .models import CustomUser
from .tokens import *
from .tasks import (
send_password_reset_email_task, send_email_verification_task,
send_email_change_verification_task, send_email_change_notification_task,
)
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
from .filters import UserFilter
from rest_framework import generics, permissions, status, viewsets
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed, InvalidToken
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter
from vontor_cz.turnstile import verify_turnstile
User = get_user_model()
#general user view API
from rest_framework_simplejwt.views import TokenObtainPairView
#---------------------------------------------TOKENY------------------------------------------------
# Custom Token obtaining view
@extend_schema(
tags=["account", "public"],
summary="Obtain JWT access and refresh tokens (cookie-based)",
description="Authenticate user and obtain JWT access and refresh tokens. You can use either email or username.",
request=CustomTokenObtainPairSerializer,
responses={
200: OpenApiResponse(response=CustomTokenObtainPairSerializer, description="Tokens returned successfully."),
401: OpenApiResponse(description="Invalid credentials or inactive user."),
},
)
class CookieTokenObtainPairView(TokenObtainPairView):
serializer_class = CustomTokenObtainPairSerializer
def post(self, request, *args, **kwargs):
turnstile_token = request.data.get("turnstile_token", "")
remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", ""))
if not verify_turnstile(turnstile_token, remote_ip):
return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST)
serializer = self.get_serializer(data=request.data)
try:
serializer.is_valid(raise_exception=True)
except TokenError as e:
raise InvalidToken(e.args[0])
access = serializer.validated_data.get("access")
refresh = serializer.validated_data.get("refresh")
# Create a Django session so AuthMiddlewareStack authenticates WebSocket connections
django_login(request, serializer.user, backend='django.contrib.auth.backends.ModelBackend')
response = Response(serializer.validated_data, status=status.HTTP_200_OK)
jwt_settings = settings.SIMPLE_JWT
# Access token cookie
response.set_cookie(
key=jwt_settings.get("AUTH_COOKIE", "access_token"),
value=access,
httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
max_age=5 * 60, # 5 minut
)
# Refresh token cookie
response.set_cookie(
key="refresh_token",
value=refresh,
httponly=True,
secure=not settings.DEBUG,
samesite="Lax",
path="/",
max_age=7 * 24 * 60 * 60, # 7 dní
)
return response
@extend_schema(
tags=["account", "public"],
summary="Refresh JWT token using cookie",
description="Refresh JWT access and refresh tokens using the refresh token stored in cookie.",
responses={
200: OpenApiResponse(description="Tokens refreshed successfully."),
400: OpenApiResponse(description="Refresh token cookie not found."),
401: OpenApiResponse(description="Invalid refresh token."),
},
)
class CookieTokenRefreshView(APIView):
def post(self, request):
refresh_token = request.COOKIES.get('refresh_token')
if not refresh_token:
return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)
try:
refresh = RefreshToken(refresh_token)
access_token = str(refresh.access_token)
new_refresh_token = str(refresh) # volitelně nový refresh token
response = Response({
"access": access_token,
"refresh": new_refresh_token,
})
# Nastav nové HttpOnly cookies
# Access token cookie (např. 5 minut platnost)
response.set_cookie(
"access_token",
access_token,
httponly=True,
secure=not settings.DEBUG,
samesite="Lax",
max_age=5 * 60,
path="/",
)
# Refresh token cookie (delší platnost, např. 7 dní)
response.set_cookie(
"refresh_token",
new_refresh_token,
httponly=True,
secure=not settings.DEBUG,
samesite="Lax",
max_age=7 * 24 * 60 * 60,
path="/",
)
return response
except TokenError:
return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)
#---------------------------------------------LOGOUT------------------------------------------------
@extend_schema(
tags=["account", "public"],
summary="Logout user (delete access and refresh token cookies)",
description="Logs out the user by deleting access and refresh token cookies.",
responses={
200: OpenApiResponse(description="Logout successful."),
},
)
class LogoutView(APIView):
permission_classes = [AllowAny]
def post(self, request):
django_logout(request) # destroy Django session (used for WebSocket auth)
response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)
response.delete_cookie("access_token", path="/")
response.delete_cookie("refresh_token", path="/")
return response
#--------------------------------------------------------------------------------------------------------------
@extend_schema(
tags=["account"],
summary="List, retrieve, update, and delete users.",
description="Displays all users with filtering and ordering options. Requires authentication and appropriate role.",
responses={
200: OpenApiResponse(response=CustomUserSerializer, description="User(s) retrieved successfully."),
403: OpenApiResponse(description="Permission denied."),
},
)
class UserView(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = CustomUserSerializer
filter_backends = [DjangoFilterBackend]
filterset_class = UserFilter
# Require authentication and role permission
permission_classes = [IsAuthenticated]
class Meta:
model = CustomUser
extra_kwargs = {
"email": {"help_text": "Unikátní e-mailová adresa uživatele."},
"phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
"role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
"account_type": {"help_text": "Typ účtu firma nebo fyzická osoba."},
"email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
"create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
"var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
"bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
"ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
"RC": {"help_text": "Rodné číslo pro fyzické osoby."},
"city": {"help_text": "Město trvalého pobytu / sídla."},
"street": {"help_text": "Ulice a číslo popisné."},
"PSC": {"help_text": "PSČ místa pobytu / sídla."},
"GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
"is_active": {"help_text": "Stav aktivace uživatele."},
}
@extend_schema(
tags=["account"],
summary="Get permissions based on user role and action.",
description="Determines permissions for various actions based on user role and ownership.",
responses={
200: OpenApiResponse(description="Permissions determined successfully."),
403: OpenApiResponse(description="Permission denied."),
},
)
def get_permissions(self):
if self.action == 'create':
return [OnlyRolesAllowed("admin")()]
if self.action in ['list', 'retrieve']:
return [IsAuthenticated()]
if self.action in ['update', 'partial_update', 'destroy']:
user = getattr(self, 'request', None) and getattr(self.request, 'user', None)
if user and getattr(user, 'is_authenticated', False) and getattr(user, 'role', None) == 'admin':
return [OnlyRolesAllowed("admin")()]
lookup = self.kwargs.get('pk', '')
if user and getattr(user, 'is_authenticated', False) and lookup and (
str(getattr(user, 'id', '')) == lookup
):
return [IsAuthenticated()]
return [OnlyRolesAllowed("admin")()]
return super().get_permissions()
def get_serializer_class(self):
user = getattr(self.request, 'user', None)
is_admin = user and (getattr(user, 'role', None) == 'admin' or getattr(user, 'is_superuser', False))
if self.action in ['retrieve', 'list'] and not is_admin:
return PublicUserSerializer
return CustomUserSerializer
# Get current user data
@extend_schema(
tags=["account"],
summary="Get current authenticated user",
description="Returns details of the currently authenticated user based on JWT token or session.",
responses={
200: OpenApiResponse(response=CustomUserSerializer, description="Current user details."),
401: OpenApiResponse(description="Unauthorized, user is not authenticated."),
}
)
class CurrentUserView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
serializer = CustomUserSerializer(request.user)
return Response(serializer.data)
@extend_schema(
tags=["account"],
summary="Change password for the authenticated user",
request=ChangePasswordSerializer,
responses={
200: OpenApiResponse(description="Password changed successfully."),
400: OpenApiResponse(description="Invalid current password or validation error."),
},
)
class ChangePasswordView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
serializer = ChangePasswordSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = request.user
if not user.check_password(serializer.validated_data['current_password']):
return Response({"current_password": "Nesprávné heslo."}, status=status.HTTP_400_BAD_REQUEST)
user.set_password(serializer.validated_data['new_password'])
user.save()
return Response({"detail": "Heslo bylo úspěšně změněno."})
#------------------------------------------------REGISTRACE--------------------------------------------------------------
#1. registration API
@extend_schema(
tags=["account", "public"],
summary="Register a new user (company or individual)",
description="Register a new user (company or individual). The user will receive an email with a verification link.",
request=UserRegistrationSerializer,
responses={
201: OpenApiResponse(response=UserRegistrationSerializer, description="User registered successfully."),
400: OpenApiResponse(description="Invalid registration data."),
},
)
class UserRegistrationViewSet(ModelViewSet):
queryset = CustomUser.objects.all()
serializer_class = UserRegistrationSerializer
http_method_names = ['post']
def create(self, request, *args, **kwargs):
turnstile_token = request.data.get("turnstile_token", "")
remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", ""))
if not verify_turnstile(turnstile_token, remote_ip):
return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST)
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = serializer.save()
try:
send_email_verification_task.delay(user.id) # posílaní emailu pro potvrzení registrace - CELERY TASK
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_email_verification_task(user.id) # posílaní emailu pro potvrzení registrace
headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
#2. confirming email
@extend_schema(
tags=["account", "public"],
summary="Verify user email via link",
description="Verify user email using the link with uid and token.",
parameters=[
OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="User token from email link."),
],
responses={
200: OpenApiResponse(description="Email successfully verified."),
400: OpenApiResponse(description="Invalid or expired token."),
},
)
class EmailVerificationView(APIView):
def get(self, request, uidb64, token):
try:
uid = force_str(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except (User.DoesNotExist, ValueError, TypeError):
return Response({"error": "Neplatný odkaz."}, status=400)
if account_activation_token.check_token(user, token):
user.email_verified = True
user.is_active = True # Aktivace uživatele po ověření e-mailu
user.save()
return Response({"detail": "E-mail byl úspěšně ověřen. Účet je aktivován."})
else:
return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)
#-------------------------------------------------END REGISTRACE-------------------------------------------------------------
#1. PasswordReset + send Email
@extend_schema(
tags=["account", "public"],
summary="Request password reset (send email)",
description="Request password reset by providing registered email. An email with instructions will be sent.",
request=PasswordResetRequestSerializer,
responses={
200: OpenApiResponse(description="Email with instructions sent."),
400: OpenApiResponse(description="Invalid email or request data."),
},
)
class PasswordResetRequestView(APIView):
def post(self, request):
serializer = PasswordResetRequestSerializer(data=request.data)
if serializer.is_valid():
try:
user = User.objects.get(email=serializer.validated_data['email'])
except User.DoesNotExist:
# Always return 200 even if user doesn't exist to avoid user enumeration
return Response({"detail": "E-mail s odkazem byl odeslán."})
try:
send_password_reset_email_task.delay(user.id) # posílaní emailu pro obnovení hesla - CELERY TASK
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_password_reset_email_task(user.id) # posílaní emailu pro obnovení hesla registrace
return Response({"detail": "E-mail s odkazem byl odeslán."})
return Response(serializer.errors, status=400)
#2. Confirming reset
@extend_schema(
tags=["account", "public"],
summary="Confirm password reset via token",
description="Confirm password reset using token from email.",
request=PasswordResetConfirmSerializer,
parameters=[
OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="Password reset token from email link."),
],
responses={
200: OpenApiResponse(description="Password changed successfully."),
400: OpenApiResponse(description="Invalid token or request data."),
},
)
class PasswordResetConfirmView(APIView):
def post(self, request, uidb64, token):
try:
uid = force_str(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
return Response({"error": "Neplatný odkaz."}, status=400)
if not password_reset_token.check_token(user, token):
return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)
serializer = PasswordResetConfirmSerializer(data=request.data)
if serializer.is_valid():
user.set_password(serializer.validated_data['password'])
user.save()
return Response({"detail": "Heslo bylo úspěšně změněno."})
return Response(serializer.errors, status=400)
@extend_schema(
tags=["account"],
summary="Resend email verification",
description="Resends the verification email to the currently authenticated user. Limited to once per minute.",
responses={
200: OpenApiResponse(description="Verification email sent."),
400: OpenApiResponse(description="Email is already verified."),
429: OpenApiResponse(description="Rate limited — seconds_remaining returned."),
},
)
class ResendEmailVerificationView(APIView):
permission_classes = [IsAuthenticated]
COOLDOWN = 60 # seconds
def post(self, request):
from django.core.cache import cache
import time
user = request.user
if user.email_verified:
return Response({"detail": "E-mail je již ověřen."}, status=status.HTTP_400_BAD_REQUEST)
cache_key = f"resend_verification_{user.id}"
sent_at = cache.get(cache_key)
if sent_at is not None:
remaining = int(self.COOLDOWN - (time.time() - sent_at))
if remaining > 0:
return Response(
{"detail": "Příliš mnoho pokusů.", "seconds_remaining": remaining},
status=status.HTTP_429_TOO_MANY_REQUESTS,
)
cache.set(cache_key, time.time(), timeout=self.COOLDOWN)
try:
send_email_verification_task.delay(user.id)
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_email_verification_task(user.id)
return Response({"detail": "Ověřovací e-mail byl odeslán."})
@extend_schema(
tags=["account"],
summary="Request email address change",
description="Validates current password + Turnstile, stores pending_email, sends confirmation link to the new address and a notification to the old one.",
responses={
200: OpenApiResponse(description="Confirmation email sent to new address."),
400: OpenApiResponse(description="Wrong password, duplicate email, or failed CAPTCHA."),
},
)
class ChangeEmailView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
turnstile_token = request.data.get("turnstile_token", "")
remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", ""))
if not verify_turnstile(turnstile_token, remote_ip):
return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST)
serializer = ChangeEmailSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = request.user
if not user.check_password(serializer.validated_data["current_password"]):
return Response({"current_password": "Nesprávné heslo."}, status=status.HTTP_400_BAD_REQUEST)
new_email = serializer.validated_data["new_email"]
old_email = user.email
user.pending_email = new_email
uid = urlsafe_base64_encode(force_bytes(user.pk))
token = user.generate_email_verification_token(save=False)
user.save(update_fields=["pending_email", "email_verification_token", "email_verification_sent_at"])
verify_url = f"{settings.FRONTEND_URL}/account/confirm-email-change/{uid}/{token}/"
try:
send_email_change_verification_task.delay(user.id, new_email, verify_url)
send_email_change_notification_task.delay(user.id, old_email, new_email)
except Exception as e:
logger.error(f"Celery not available, using fallback. Error: {e}")
send_email_change_verification_task(user.id, new_email, verify_url)
send_email_change_notification_task(user.id, old_email, new_email)
return Response({"detail": "Ověřovací e-mail byl odeslán na novou adresu."})
@extend_schema(
tags=["account"],
summary="Confirm email address change via link",
description="Verifies uid + token from the confirmation email, swaps email to pending_email.",
parameters=[
OpenApiParameter(name="uidb64", type=str, location=OpenApiParameter.PATH),
OpenApiParameter(name="token", type=str, location=OpenApiParameter.PATH),
],
responses={
200: OpenApiResponse(description="Email changed successfully."),
400: OpenApiResponse(description="Invalid or expired link."),
},
)
class ConfirmEmailChangeView(APIView):
def get(self, request, uidb64, token):
ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", "unknown"))
try:
uid = force_str(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except (User.DoesNotExist, ValueError, TypeError):
return Response({"error": "Neplatný odkaz."}, status=status.HTTP_400_BAD_REQUEST)
if not user.pending_email:
return Response({"error": "Žádná čekající změna e-mailu."}, status=status.HTTP_400_BAD_REQUEST)
if not user.verify_email_token(token):
return Response({"error": "Token je neplatný nebo expirovaný."}, status=status.HTTP_400_BAD_REQUEST)
old_email = user.email
user.email = user.pending_email
user.pending_email = None
user.email_verified = True
user.save(update_fields=["email", "pending_email", "email_verified"])
logger.info("Email changed for user %s: %s%s (IP: %s)", user.pk, old_email, user.email, ip)
return Response({"detail": "E-mail byl úspěšně změněn."})
@extend_schema(
tags=["account"],
summary="Change username",
description="Updates the username. Requires Turnstile. Limited to once per 30 days.",
responses={
200: OpenApiResponse(description="Username changed."),
400: OpenApiResponse(description="Duplicate name, invalid characters, or failed CAPTCHA."),
429: OpenApiResponse(description="Rate limited — days_remaining returned."),
},
)
class ChangeUsernameView(APIView):
permission_classes = [IsAuthenticated]
COOLDOWN_DAYS = 30
def post(self, request):
from django.core.cache import cache
import time
turnstile_token = request.data.get("turnstile_token", "")
remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", ""))
if not verify_turnstile(turnstile_token, remote_ip):
return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST)
serializer = ChangeUsernameSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = request.user
cache_key = f"username_change_{user.id}"
changed_at = cache.get(cache_key)
if changed_at is not None:
remaining_days = int(self.COOLDOWN_DAYS - (time.time() - changed_at) / 86400)
if remaining_days > 0:
return Response(
{"detail": "Příliš brzy.", "days_remaining": remaining_days},
status=status.HTTP_429_TOO_MANY_REQUESTS,
)
user.username = serializer.validated_data["new_username"]
user.save(update_fields=["username"])
cache.set(cache_key, time.time(), timeout=self.COOLDOWN_DAYS * 86400)
return Response({"detail": "Uživatelské jméno bylo změněno.", "username": user.username})
@extend_schema(
tags=["account"],
summary="Delete own account",
description="Soft-deletes the authenticated user's account after password confirmation. Clears auth cookies.",
responses={
204: OpenApiResponse(description="Account deleted."),
400: OpenApiResponse(description="Wrong password."),
},
)
class DeleteAccountView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
user = request.user
turnstile_token = request.data.get("turnstile_token", "")
remote_ip = request.META.get("HTTP_X_FORWARDED_FOR", request.META.get("REMOTE_ADDR", ""))
if not verify_turnstile(turnstile_token, remote_ip):
return Response({"detail": "Ověření CAPTCHA selhalo."}, status=status.HTTP_400_BAD_REQUEST)
password = request.data.get("current_password", "")
if not password or not user.check_password(password):
return Response({"current_password": "Nesprávné heslo."}, status=status.HTTP_400_BAD_REQUEST)
logger.info("Account deletion requested for user %s (id=%s)", user.username, user.pk)
django_logout(request)
user.delete()
response = Response(status=status.HTTP_204_NO_CONTENT)
response.delete_cookie("access_token", path="/")
response.delete_cookie("refresh_token", path="/")
return response