Refactored commerce models to support refunds, invoices, and improved carrier/payment logic. Added new serializers and viewsets for products, categories, images, discount codes, and refunds. Introduced Stripe client integration and removed legacy Stripe admin/model code. Updated Dockerfile for PDF generation dependencies. Removed obsolete migration files and updated configuration app initialization. Added invoice template and tasks for order cleanup.
425 lines
18 KiB
Python
425 lines
18 KiB
Python
from django.contrib.auth import get_user_model, authenticate
|
||
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
|
||
from django.utils.encoding import force_bytes, force_str
|
||
|
||
from .serializers import *
|
||
from .permissions import *
|
||
from .models import CustomUser
|
||
from .tokens import *
|
||
from .tasks import send_password_reset_email_task, send_email_verification_task, send_email_clerk_accepted_task # FIXME: send_email_clerk_accepted_task neexistuje !!!
|
||
from django.conf import settings
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
from .filters import UserFilter
|
||
|
||
from rest_framework import generics, permissions, status, viewsets
|
||
from rest_framework.response import Response
|
||
from rest_framework_simplejwt.tokens import RefreshToken
|
||
from rest_framework.views import APIView
|
||
from rest_framework.viewsets import ModelViewSet
|
||
from rest_framework.permissions import IsAuthenticated, AllowAny
|
||
|
||
from rest_framework_simplejwt.tokens import RefreshToken
|
||
from rest_framework_simplejwt.exceptions import TokenError, AuthenticationFailed
|
||
from django_filters.rest_framework import DjangoFilterBackend
|
||
|
||
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiExample, OpenApiParameter
|
||
|
||
|
||
User = get_user_model()
|
||
|
||
#general user view API
|
||
|
||
|
||
from rest_framework_simplejwt.views import TokenObtainPairView
|
||
|
||
#---------------------------------------------TOKENY------------------------------------------------
|
||
|
||
# Custom Token obtaining view
|
||
@extend_schema(
|
||
tags=["Authentication"],
|
||
summary="Obtain JWT access and refresh tokens (cookie-based)",
|
||
description="Authenticate user and obtain JWT access and refresh tokens. You can use either email or username.",
|
||
request=CustomTokenObtainPairSerializer,
|
||
responses={
|
||
200: OpenApiResponse(response=CustomTokenObtainPairSerializer, description="Tokens returned successfully."),
|
||
401: OpenApiResponse(description="Invalid credentials or inactive user."),
|
||
},
|
||
)
|
||
class CookieTokenObtainPairView(TokenObtainPairView):
|
||
serializer_class = CustomTokenObtainPairSerializer
|
||
|
||
|
||
def post(self, request, *args, **kwargs):
|
||
response = super().post(request, *args, **kwargs)
|
||
|
||
# Získáme tokeny z odpovědi
|
||
access = response.data.get("access")
|
||
refresh = response.data.get("refresh")
|
||
|
||
if not access or not refresh:
|
||
return response # Např. při chybě přihlášení
|
||
|
||
jwt_settings = settings.SIMPLE_JWT
|
||
|
||
# Access token cookie
|
||
response.set_cookie(
|
||
key=jwt_settings.get("AUTH_COOKIE", "access_token"),
|
||
value=access,
|
||
httponly=jwt_settings.get("AUTH_COOKIE_HTTP_ONLY", True),
|
||
secure=jwt_settings.get("AUTH_COOKIE_SECURE", not settings.DEBUG),
|
||
samesite=jwt_settings.get("AUTH_COOKIE_SAMESITE", "Lax"),
|
||
path=jwt_settings.get("AUTH_COOKIE_PATH", "/"),
|
||
max_age=5 * 60, # 5 minut
|
||
)
|
||
|
||
# Refresh token cookie
|
||
response.set_cookie(
|
||
key="refresh_token",
|
||
value=refresh,
|
||
httponly=True,
|
||
secure=not settings.DEBUG,
|
||
samesite="Lax",
|
||
path="/",
|
||
max_age=7 * 24 * 60 * 60, # 7 dní
|
||
)
|
||
|
||
return response
|
||
|
||
def validate(self, attrs):
|
||
username = attrs.get("username")
|
||
password = attrs.get("password")
|
||
|
||
# Přihlaš uživatele ručně
|
||
user = authenticate(request=self.context.get('request'), username=username, password=password)
|
||
|
||
if not user:
|
||
raise AuthenticationFailed("Špatné uživatelské jméno nebo heslo.")
|
||
|
||
if not user.is_active:
|
||
raise AuthenticationFailed("Uživatel je deaktivován.")
|
||
|
||
# Nastav validní uživatele (přebere další logiku ze SimpleJWT)
|
||
self.user = user
|
||
|
||
# Vrátí access a refresh token jako obvykle
|
||
return super().validate(attrs)
|
||
|
||
@extend_schema(
|
||
tags=["Authentication"],
|
||
summary="Refresh JWT token using cookie",
|
||
description="Refresh JWT access and refresh tokens using the refresh token stored in cookie.",
|
||
responses={
|
||
200: OpenApiResponse(description="Tokens refreshed successfully."),
|
||
400: OpenApiResponse(description="Refresh token cookie not found."),
|
||
401: OpenApiResponse(description="Invalid refresh token."),
|
||
},
|
||
)
|
||
class CookieTokenRefreshView(APIView):
|
||
def post(self, request):
|
||
refresh_token = request.COOKIES.get('refresh_token')
|
||
if not refresh_token:
|
||
return Response({"detail": "Refresh token cookie not found."}, status=status.HTTP_400_BAD_REQUEST)
|
||
|
||
try:
|
||
refresh = RefreshToken(refresh_token)
|
||
access_token = str(refresh.access_token)
|
||
new_refresh_token = str(refresh) # volitelně nový refresh token
|
||
|
||
response = Response({
|
||
"access": access_token,
|
||
"refresh": new_refresh_token,
|
||
})
|
||
|
||
# Nastav nové HttpOnly cookies
|
||
# Access token cookie (např. 5 minut platnost)
|
||
response.set_cookie(
|
||
"access_token",
|
||
access_token,
|
||
httponly=True,
|
||
secure=not settings.DEBUG,
|
||
samesite="Lax",
|
||
max_age=5 * 60,
|
||
path="/",
|
||
)
|
||
|
||
# Refresh token cookie (delší platnost, např. 7 dní)
|
||
response.set_cookie(
|
||
"refresh_token",
|
||
new_refresh_token,
|
||
httponly=True,
|
||
secure=not settings.DEBUG,
|
||
samesite="Lax",
|
||
max_age=7 * 24 * 60 * 60,
|
||
path="/",
|
||
)
|
||
|
||
return response
|
||
|
||
except TokenError:
|
||
return Response({"detail": "Invalid refresh token."}, status=status.HTTP_401_UNAUTHORIZED)
|
||
|
||
#---------------------------------------------LOGOUT------------------------------------------------
|
||
|
||
@extend_schema(
|
||
tags=["Authentication"],
|
||
summary="Logout user (delete access and refresh token cookies)",
|
||
description="Logs out the user by deleting access and refresh token cookies.",
|
||
responses={
|
||
200: OpenApiResponse(description="Logout successful."),
|
||
},
|
||
)
|
||
class LogoutView(APIView):
|
||
permission_classes = [AllowAny]
|
||
|
||
def post(self, request):
|
||
response = Response({"detail": "Logout successful"}, status=status.HTTP_200_OK)
|
||
|
||
# Smazání cookies
|
||
response.delete_cookie("access_token", path="/")
|
||
response.delete_cookie("refresh_token", path="/")
|
||
|
||
return response
|
||
|
||
#--------------------------------------------------------------------------------------------------------------
|
||
|
||
@extend_schema(
|
||
tags=["User"],
|
||
summary="List, retrieve, update, and delete users.",
|
||
description="Displays all users with filtering and ordering options. Requires authentication and appropriate role.",
|
||
responses={
|
||
200: OpenApiResponse(response=CustomUserSerializer, description="User(s) retrieved successfully."),
|
||
403: OpenApiResponse(description="Permission denied."),
|
||
},
|
||
)
|
||
class UserView(viewsets.ModelViewSet):
|
||
queryset = User.objects.all()
|
||
serializer_class = CustomUserSerializer
|
||
filter_backends = [DjangoFilterBackend]
|
||
filterset_class = UserFilter
|
||
|
||
# Require authentication and role permission
|
||
permission_classes = [IsAuthenticated]
|
||
|
||
class Meta:
|
||
model = CustomUser
|
||
extra_kwargs = {
|
||
"email": {"help_text": "Unikátní e-mailová adresa uživatele."},
|
||
"phone_number": {"help_text": "Telefonní číslo ve formátu +420123456789."},
|
||
"role": {"help_text": "Role uživatele určující jeho oprávnění v systému."},
|
||
"account_type": {"help_text": "Typ účtu – firma nebo fyzická osoba."},
|
||
"email_verified": {"help_text": "Určuje, zda je e-mail ověřen."},
|
||
"create_time": {"help_text": "Datum a čas registrace uživatele (pouze pro čtení).", "read_only": True},
|
||
"var_symbol": {"help_text": "Variabilní symbol pro platby, pokud je vyžadován."},
|
||
"bank_account": {"help_text": "Číslo bankovního účtu uživatele."},
|
||
"ICO": {"help_text": "IČO firmy, pokud se jedná o firemní účet."},
|
||
"RC": {"help_text": "Rodné číslo pro fyzické osoby."},
|
||
"city": {"help_text": "Město trvalého pobytu / sídla."},
|
||
"street": {"help_text": "Ulice a číslo popisné."},
|
||
"PSC": {"help_text": "PSČ místa pobytu / sídla."},
|
||
"GDPR": {"help_text": "Souhlas se zpracováním osobních údajů."},
|
||
"is_active": {"help_text": "Stav aktivace uživatele."},
|
||
}
|
||
|
||
def get_permissions(self):
|
||
# Only admin can list or create users
|
||
if self.action in ['list', 'create']:
|
||
return [OnlyRolesAllowed("admin")()]
|
||
|
||
# Only admin or the user themselves can update or delete
|
||
elif self.action in ['update', 'partial_update', 'destroy']:
|
||
user = getattr(self, 'request', None) and getattr(self.request, 'user', None)
|
||
# Admins can modify any user
|
||
if user and getattr(user, 'is_authenticated', False) and getattr(user, 'role', None) == 'admin':
|
||
return [OnlyRolesAllowed("admin")()]
|
||
|
||
# Users can modify their own record
|
||
if user and getattr(user, 'is_authenticated', False) and self.kwargs.get('pk') and str(getattr(user, 'id', '')) == self.kwargs['pk']:
|
||
return [IsAuthenticated()]
|
||
|
||
# Fallback - deny access (prevents AttributeError for AnonymousUser)
|
||
return [OnlyRolesAllowed("admin")()]
|
||
|
||
# Any authenticated user can retrieve (view) any user's profile
|
||
elif self.action == 'retrieve':
|
||
return [IsAuthenticated()]
|
||
|
||
return super().get_permissions()
|
||
|
||
|
||
|
||
# Get current user data
|
||
@extend_schema(
|
||
tags=["User"],
|
||
summary="Get current authenticated user",
|
||
description="Returns details of the currently authenticated user based on JWT token or session.",
|
||
responses={
|
||
200: OpenApiResponse(response=CustomUserSerializer, description="Current user details."),
|
||
401: OpenApiResponse(description="Unauthorized, user is not authenticated."),
|
||
}
|
||
)
|
||
class CurrentUserView(APIView):
|
||
permission_classes = [IsAuthenticated]
|
||
|
||
def get(self, request):
|
||
serializer = CustomUserSerializer(request.user)
|
||
return Response(serializer.data)
|
||
|
||
|
||
#------------------------------------------------REGISTRACE--------------------------------------------------------------
|
||
|
||
#1. registration API
|
||
@extend_schema(
|
||
tags=["User Registration"],
|
||
summary="Register a new user (company or individual)",
|
||
description="Register a new user (company or individual). The user will receive an email with a verification link.",
|
||
request=UserRegistrationSerializer,
|
||
responses={
|
||
201: OpenApiResponse(response=UserRegistrationSerializer, description="User registered successfully."),
|
||
400: OpenApiResponse(description="Invalid registration data."),
|
||
},
|
||
)
|
||
class UserRegistrationViewSet(ModelViewSet):
|
||
queryset = CustomUser.objects.all()
|
||
serializer_class = UserRegistrationSerializer
|
||
http_method_names = ['post']
|
||
|
||
def create(self, request, *args, **kwargs):
|
||
serializer = self.get_serializer(data=request.data)
|
||
serializer.is_valid(raise_exception=True)
|
||
user = serializer.save()
|
||
|
||
try:
|
||
send_email_verification_task.delay(user.id) # posílaní emailu pro potvrzení registrace - CELERY TASK
|
||
except Exception as e:
|
||
logger.error(f"Celery not available, using fallback. Error: {e}")
|
||
send_email_verification_task(user.id) # posílaní emailu pro potvrzení registrace
|
||
|
||
|
||
|
||
headers = self.get_success_headers(serializer.data)
|
||
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
|
||
|
||
#2. confirming email
|
||
@extend_schema(
|
||
tags=["User Registration"],
|
||
summary="Verify user email via link",
|
||
description="Verify user email using the link with uid and token.",
|
||
parameters=[
|
||
OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
|
||
OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="User token from email link."),
|
||
],
|
||
responses={
|
||
200: OpenApiResponse(description="Email successfully verified."),
|
||
400: OpenApiResponse(description="Invalid or expired token."),
|
||
},
|
||
)
|
||
class EmailVerificationView(APIView):
|
||
def get(self, request, uidb64, token):
|
||
try:
|
||
uid = force_str(urlsafe_base64_decode(uidb64))
|
||
user = User.objects.get(pk=uid)
|
||
except (User.DoesNotExist, ValueError, TypeError):
|
||
return Response({"error": "Neplatný odkaz."}, status=400)
|
||
|
||
if account_activation_token.check_token(user, token):
|
||
user.email_verified = True
|
||
user.save()
|
||
|
||
return Response({"detail": "E-mail byl úspěšně ověřen. Účet čeká na schválení."})
|
||
else:
|
||
return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)
|
||
|
||
#3. seller activation API (var_symbol)
|
||
@extend_schema(
|
||
tags=["User Registration"],
|
||
summary="Activate user and set variable symbol (admin/cityClerk only)",
|
||
description="Activate user and set variable symbol. Only accessible by admin or cityClerk.",
|
||
request=UserActivationSerializer,
|
||
responses={
|
||
200: OpenApiResponse(response=UserActivationSerializer, description="User activated successfully."),
|
||
400: OpenApiResponse(description="Invalid activation data."),
|
||
404: OpenApiResponse(description="User not found."),
|
||
},
|
||
)
|
||
class UserActivationViewSet(APIView):
|
||
permission_classes = [OnlyRolesAllowed('cityClerk', 'admin')]
|
||
|
||
def patch(self, request, *args, **kwargs):
|
||
serializer = UserActivationSerializer(data=request.data)
|
||
serializer.is_valid(raise_exception=True)
|
||
user = serializer.save()
|
||
|
||
try:
|
||
send_email_clerk_accepted_task.delay(user.id) # posílaní emailu pro informování uživatele o dokončení registrace, uředník doplnil variabilní symbol - CELERY TASK
|
||
except Exception as e:
|
||
logger.error(f"Celery not available, using fallback. Error: {e}")
|
||
send_email_clerk_accepted_task(user.id) # posílaní emailu pro informování uživatele o dokončení registrace, uředník doplnil variabilní symbol
|
||
|
||
return Response(serializer.to_representation(user), status=status.HTTP_200_OK)
|
||
|
||
#-------------------------------------------------END REGISTRACE-------------------------------------------------------------
|
||
|
||
#1. PasswordReset + send Email
|
||
@extend_schema(
|
||
tags=["User password reset"],
|
||
summary="Request password reset (send email)",
|
||
description="Request password reset by providing registered email. An email with instructions will be sent.",
|
||
request=PasswordResetRequestSerializer,
|
||
responses={
|
||
200: OpenApiResponse(description="Email with instructions sent."),
|
||
400: OpenApiResponse(description="Invalid email or request data."),
|
||
},
|
||
)
|
||
class PasswordResetRequestView(APIView):
|
||
def post(self, request):
|
||
serializer = PasswordResetRequestSerializer(data=request.data)
|
||
if serializer.is_valid():
|
||
try:
|
||
user = User.objects.get(email=serializer.validated_data['email'])
|
||
except User.DoesNotExist:
|
||
# Always return 200 even if user doesn't exist to avoid user enumeration
|
||
return Response({"detail": "E-mail s odkazem byl odeslán."})
|
||
try:
|
||
send_password_reset_email_task.delay(user.id) # posílaní emailu pro obnovení hesla - CELERY TASK
|
||
except Exception as e:
|
||
logger.error(f"Celery not available, using fallback. Error: {e}")
|
||
send_password_reset_email_task(user.id) # posílaní emailu pro obnovení hesla registrace
|
||
|
||
return Response({"detail": "E-mail s odkazem byl odeslán."})
|
||
|
||
return Response(serializer.errors, status=400)
|
||
|
||
#2. Confirming reset
|
||
@extend_schema(
|
||
tags=["User password reset"],
|
||
summary="Confirm password reset via token",
|
||
description="Confirm password reset using token from email.",
|
||
request=PasswordResetConfirmSerializer,
|
||
parameters=[
|
||
OpenApiParameter(name='uidb64', type=str, location=OpenApiParameter.PATH, description="User ID encoded in base64 from email link."),
|
||
OpenApiParameter(name='token', type=str, location=OpenApiParameter.PATH, description="Password reset token from email link."),
|
||
],
|
||
responses={
|
||
200: OpenApiResponse(description="Password changed successfully."),
|
||
400: OpenApiResponse(description="Invalid token or request data."),
|
||
},
|
||
)
|
||
class PasswordResetConfirmView(APIView):
|
||
def post(self, request, uidb64, token):
|
||
try:
|
||
uid = force_str(urlsafe_base64_decode(uidb64))
|
||
user = User.objects.get(pk=uid)
|
||
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
|
||
return Response({"error": "Neplatný odkaz."}, status=400)
|
||
|
||
if not password_reset_token.check_token(user, token):
|
||
return Response({"error": "Token je neplatný nebo expirovaný."}, status=400)
|
||
|
||
serializer = PasswordResetConfirmSerializer(data=request.data)
|
||
if serializer.is_valid():
|
||
user.set_password(serializer.validated_data['password'])
|
||
user.save()
|
||
return Response({"detail": "Heslo bylo úspěšně změněno."})
|
||
return Response(serializer.errors, status=400) |