Files
vontor-cz/backend/commerce/views.py
Brunobrno b279ac36d5 Add shopping cart and product review features
Introduces Cart and CartItem models, admin, serializers, and API endpoints for shopping cart management for both authenticated and anonymous users. Adds Review model, serializers, and endpoints for product reviews, including public creation and retrieval. Updates ProductImage ordering, enhances order save logic with notification, and improves product and order endpoints with new actions and filters. Includes related migrations for commerce, configuration, social chat, and Deutsche Post integration.
2026-01-17 02:38:02 +01:00

740 lines
24 KiB
Python

from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
import base64
from .models import Refund, Review
from .serializers import RefundCreatePublicSerializer, ReviewSerializerPublic
from django.db import transaction
from rest_framework import viewsets, mixins, status
from rest_framework.permissions import AllowAny, IsAdminUser, SAFE_METHODS
from rest_framework.decorators import action
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiExample
from rest_framework import filters, permissions
from django_filters.rest_framework import DjangoFilterBackend
from .models import (
Order,
OrderItem,
Carrier,
Payment,
Product,
DiscountCode,
Category,
ProductImage,
Refund,
Cart,
CartItem,
)
from .serializers import (
OrderReadSerializer,
OrderMiniSerializer,
OrderCreateSerializer,
OrderItemReadSerializer,
CarrierReadSerializer,
PaymentReadSerializer,
ProductSerializer,
CategorySerializer,
ProductImageSerializer,
DiscountCodeSerializer,
RefundSerializer,
CartSerializer,
CartItemSerializer,
CartItemCreateSerializer,
)
# FIXME: adapt this view to the new order serializer
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List Orders (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve Order (public)"),
)
class OrderViewSet(mixins.ListModelMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet):
    """List, retrieve and create orders, plus carrier, payment and invoice actions."""

    # NOTE(review): the class-level permission is AllowAny and the ownership
    # check in `retrieve` only runs for authenticated non-staff users, so
    # anonymous clients can read every order through list/retrieve and the
    # detail actions (items, carrier, payment, invoice). Confirm this exposure
    # is intended.
    queryset = Order.objects.select_related("carrier", "payment").prefetch_related(
        "items__product", "discount"
    ).order_by("-created_at")
    permission_classes = [AllowAny]

    def get_serializer_class(self):
        """Return the serializer matching the current action (read serializer by default)."""
        if self.action == "mini":
            return OrderMiniSerializer
        if self.action == "create":
            return OrderCreateSerializer
        return OrderReadSerializer

    @extend_schema(
        tags=["commerce", "public"],
        summary="Create Order (public)",
        request=OrderCreateSerializer,
        responses={201: OrderReadSerializer},
        examples=[
            OpenApiExample(
                "Create order",
                value={
                    "first_name": "Jan",
                    "last_name": "Novak",
                    "email": "jan@example.com",
                    "phone": "+420123456789",
                    "address": "Ulice 1",
                    "city": "Praha",
                    "postal_code": "11000",
                    "country": "Czech Republic",
                    "note": "Prosím doručit odpoledne",
                    "items": [
                        {"product_id": 1, "quantity": 2},
                        {"product_id": 7, "quantity": 1},
                    ],
                    "carrier": {"shipping_method": "store"},
                    "payment": {"payment_method": "stripe"},
                    "discount_codes": ["WELCOME10"],
                },
            )
        ],
    )
    def create(self, request, *args, **kwargs):
        """Validate the payload, create the order and return its read representation."""
        serializer = OrderCreateSerializer(data=request.data, context={"request": request})
        serializer.is_valid(raise_exception=True)
        order = serializer.save()
        out = OrderReadSerializer(order)
        return Response(out.data, status=status.HTTP_201_CREATED)

    # -- List mini orders -- (public) --
    @action(detail=False, methods=["get"], url_path="detail")
    @extend_schema(
        tags=["commerce", "public"],
        summary="List mini orders (public)",
        responses={200: OrderMiniSerializer(many=True)},
    )
    def mini(self, request, *args, **kwargs):
        """Return all orders in the compact representation, paginated when configured."""
        qs = self.get_queryset()
        page = self.paginate_queryset(qs)
        if page is not None:
            ser = OrderMiniSerializer(page, many=True)
            return self.get_paginated_response(ser.data)
        ser = OrderMiniSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order items -- (public) --
    @action(detail=True, methods=["get"], url_path="items")
    @extend_schema(
        tags=["commerce", "public"],
        summary="List order items (public)",
        responses={200: OrderItemReadSerializer(many=True)},
    )
    def items(self, request, pk=None):
        """Return the items of a single order."""
        order = self.get_object()
        qs = order.items.select_related("product").all()
        ser = OrderItemReadSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order carrier -- (public) --
    @action(detail=True, methods=["get"], url_path="carrier")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get order carrier (public)",
        responses={200: CarrierReadSerializer},
    )
    def carrier_detail(self, request, pk=None):
        """Return the carrier attached to the order."""
        order = self.get_object()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Get order payment -- (public) --
    @action(detail=True, methods=["get"], url_path="payment")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get order payment (public)",
        responses={200: PaymentReadSerializer},
    )
    def payment_detail(self, request, pk=None):
        """Return the payment attached to the order."""
        order = self.get_object()
        ser = PaymentReadSerializer(order.payment)
        return Response(ser.data)

    # -- Mark carrier ready to pickup(store) (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/ready-to-pickup",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["commerce"],
        summary="Mark carrier ready to pickup (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_ready_to_pickup(self, request, pk=None):
        """Call ``ready_to_pickup()`` on the order's carrier and return the refreshed carrier."""
        order = self.get_object()
        if not order.carrier:
            return Response({"detail": "Carrier not set."}, status=status.HTTP_400_BAD_REQUEST)
        order.carrier.ready_to_pickup()
        # Reload so the response reflects whatever the model method persisted.
        order.carrier.refresh_from_db()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Start ordering shipping (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/start-ordering-shipping",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["commerce"],
        summary="Start ordering shipping (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_start_ordering_shipping(self, request, pk=None):
        """Call ``start_ordering_shipping()`` on the order's carrier and return the refreshed carrier."""
        order = self.get_object()
        if not order.carrier:
            return Response({"detail": "Carrier not set."}, status=status.HTTP_400_BAD_REQUEST)
        order.carrier.start_ordering_shipping()
        # Reload so the response reflects whatever the model method persisted.
        order.carrier.refresh_from_db()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Get user's own orders (authenticated) --
    @action(detail=False, methods=['get'], permission_classes=[permissions.IsAuthenticated])
    @extend_schema(
        tags=["commerce"],
        summary="Get authenticated user's orders",
        responses={200: OrderMiniSerializer(many=True)},
    )
    def my_orders(self, request):
        """Return the requesting user's orders, newest first."""
        orders = Order.objects.filter(user=request.user).order_by('-created_at')
        page = self.paginate_queryset(orders)
        # Compare against None: an empty page is falsy but must still be
        # returned inside the paginated envelope.
        if page is not None:
            serializer = OrderMiniSerializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = OrderMiniSerializer(orders, many=True)
        return Response(serializer.data)

    # -- Cancel order (authenticated) --
    @action(detail=True, methods=['post'], permission_classes=[permissions.IsAuthenticated])
    @extend_schema(
        tags=["commerce"],
        summary="Cancel order (authenticated user)",
        responses={200: {"type": "object", "properties": {"status": {"type": "string"}}}},
    )
    def cancel(self, request, pk=None):
        """Cancel the caller's own order and put the ordered quantities back in stock.

        Responds 403 when the order belongs to someone else and 400 when the
        order is already cancelled or its carrier has left the PREPARING state.
        """
        order = self.get_object()
        # Check if user owns order
        if order.user != request.user:
            return Response({'detail': 'Not authorized'}, status=status.HTTP_403_FORBIDDEN)
        # Cancelling twice would restore the stock twice.
        if order.status == Order.OrderStatus.CANCELLED:
            return Response(
                {'detail': 'Order already cancelled'},
                status=status.HTTP_400_BAD_REQUEST,
            )
        # Can only cancel if not shipped
        if order.carrier and order.carrier.state != Carrier.STATE.PREPARING:
            return Response(
                {'detail': 'Cannot cancel shipped order'},
                status=status.HTTP_400_BAD_REQUEST,
            )
        # Status change and stock restoration succeed or fail together.
        with transaction.atomic():
            order.status = Order.OrderStatus.CANCELLED
            order.save()
            # Restore stock
            for item in order.items.all():
                if item.product is None:
                    # Nothing to restock if the product no longer exists.
                    continue
                item.product.stock += item.quantity
                item.product.save()
        return Response({'status': 'cancelled'})

    # -- Verify payment (public) --
    @action(detail=True, methods=['post'], url_path='payment/verify')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Verify payment status (public)",
        responses={200: {"type": "object", "properties": {
            "order_id": {"type": "integer"},
            "payment_status": {"type": "string"}
        }}},
    )
    def verify_payment(self, request, pk=None):
        """Return the Stripe payment status of the order, or ``unknown`` when unavailable."""
        order = self.get_object()
        if order.payment and order.payment.payment_method == Payment.PAYMENT.STRIPE:
            if order.payment.stripe:
                return Response({
                    'order_id': order.id,
                    'payment_status': order.payment.stripe.status
                })
        return Response({
            'order_id': order.id,
            'payment_status': 'unknown'
        })

    # -- Download invoice (public) --
    @action(detail=True, methods=['get'], url_path='invoice')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Download invoice PDF (public)",
        responses={200: "PDF File"},
    )
    def download_invoice(self, request, pk=None):
        """Stream the order's invoice PDF as an attachment, or 404 if there is none."""
        from django.http import FileResponse
        order = self.get_object()
        # getattr with a default also covers a reverse one-to-one relation,
        # whose "does not exist" error is an AttributeError subclass.
        invoice = getattr(order, "invoice", None)
        if not invoice or not invoice.pdf_file:
            return Response({'detail': 'Invoice not available'}, status=status.HTTP_404_NOT_FOUND)
        return FileResponse(
            invoice.pdf_file.open('rb'),
            content_type='application/pdf',
            as_attachment=True,
            filename=f'invoice_{invoice.invoice_number}.pdf'
        )

    def retrieve(self, request, *args, **kwargs):
        """Override retrieve to filter by user if authenticated and not admin"""
        order = self.get_object()
        user = request.user
        # If user is authenticated and not admin, check if they own the order
        if user.is_authenticated and not user.is_staff:
            if order.user and order.user != user:
                return Response({'detail': 'Not found.'}, status=status.HTTP_404_NOT_FOUND)
            # Also check by email for anonymous orders
            elif not order.user and order.email != user.email:
                return Response({'detail': 'Not found.'}, status=status.HTTP_404_NOT_FOUND)
        serializer = self.get_serializer(order)
        return Response(serializer.data)
# ---------- Permissions helpers ----------
class AdminWriteOnlyOrReadOnly(permissions.BasePermission):
    """Allow safe (read-only) methods to everyone; require an admin for anything else."""

    # Inherit from BasePermission: `AllowAny.__class__` is the permission
    # metaclass, and a class derived from it cannot be instantiated by DRF.
    def has_permission(self, request, view):
        """Return True for SAFE_METHODS, otherwise defer to IsAdminUser."""
        if request.method in SAFE_METHODS:
            return True
        return IsAdminUser().has_permission(request, view)
class AdminOnlyForPatchOtherwisePublic(permissions.BasePermission):
    """Allow safe methods and POST to everyone; require an admin for other methods."""

    # Inherit from BasePermission: `AllowAny.__class__` is the permission
    # metaclass, and a class derived from it cannot be instantiated by DRF.
    # NOTE(review): POST is open to anonymous clients here, while
    # ReviewPostPublicView requires authentication — confirm which is intended.
    def has_permission(self, request, view):
        """Return True for SAFE_METHODS and POST, otherwise defer to IsAdminUser."""
        if request.method in SAFE_METHODS or request.method == "POST":
            return True
        # PATCH and every other unsafe method are admin-only.
        return IsAdminUser().has_permission(request, view)
# ---------- Public/admin viewsets ----------
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List products (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve product (public)"),
    create=extend_schema(tags=["commerce"], summary="Create product (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update product (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace product (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete product (auth required)"),
)
class ProductViewSet(viewsets.ModelViewSet):
    # Product CRUD: reads are open to everyone, writes need an authenticated
    # user. Supports filtering by category/is_active, text search and ordering.
    queryset = Product.objects.all()
    serializer_class = ProductSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    filter_backends = [
        DjangoFilterBackend,
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    filterset_fields = ["category", "is_active"]
    search_fields = ["name", "code", "description"]
    ordering_fields = ["price", "name", "created_at"]
    ordering = ["price"]

    @action(detail=True, methods=["get"], url_path="availability")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Check product availability (public)",
        responses={
            200: {
                "type": "object",
                "properties": {
                    "product_id": {"type": "integer"},
                    "available": {"type": "boolean"},
                    "stock": {"type": "integer"},
                    "is_active": {"type": "boolean"},
                },
            }
        },
    )
    def check_availability(self, request, pk=None):
        # Report the product's availability flag, stock level and active flag.
        target = self.get_object()
        payload = {
            "product_id": target.id,
            "available": target.available,
            "stock": target.stock,
            "is_active": target.is_active,
        }
        return Response(payload)

    @action(detail=True, methods=["get"], url_path="rating")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get product rating summary (public)",
        responses={
            200: {
                "type": "object",
                "properties": {
                    "product_id": {"type": "integer"},
                    "average_rating": {"type": "number"},
                    "review_count": {"type": "integer"},
                },
            }
        },
    )
    def rating_summary(self, request, pk=None):
        # Aggregate the product's reviews into an average rating and a count.
        from django.db.models import Avg, Count

        target = self.get_object()
        aggregated = target.reviews.aggregate(average=Avg("rating"), count=Count("id"))
        mean = aggregated["average"]
        # No reviews yields a None average; report 0 in that case.
        average_rating = round(mean, 2) if mean else 0
        return Response(
            {
                "product_id": target.id,
                "average_rating": average_rating,
                "review_count": aggregated["count"],
            }
        )
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List categories (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve category (public)"),
    create=extend_schema(tags=["commerce"], summary="Create category (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update category (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace category (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete category (auth required)"),
)
class CategoryViewSet(viewsets.ModelViewSet):
    # Category CRUD: public reads, authenticated writes; searchable by
    # name/description and ordered by name unless the client asks otherwise.
    queryset = Category.objects.all()
    serializer_class = CategorySerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["name", "description"]
    ordering_fields = ["name", "id"]
    ordering = ["name"]
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List product images (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve product image (public)"),
    create=extend_schema(tags=["commerce"], summary="Create product image (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update product image (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace product image (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete product image (auth required)"),
)
class ProductImageViewSet(viewsets.ModelViewSet):
    # Product image CRUD: public reads, authenticated writes; searchable by
    # alt text or product name, newest (highest id) first by default.
    queryset = ProductImage.objects.all()
    serializer_class = ProductImageSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["alt_text", "product__name"]
    ordering_fields = ["id", "product__name"]
    ordering = ["-id"]
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List discount codes (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve discount code (public)"),
    create=extend_schema(tags=["commerce"], summary="Create discount code (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update discount code (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace discount code (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete discount code (auth required)"),
)
class DiscountCodeViewSet(viewsets.ModelViewSet):
    # Discount code CRUD: public reads, authenticated writes; searchable by
    # code/description, most recent `valid_from` first by default.
    # NOTE(review): every discount code is listable without authentication —
    # confirm that exposing the codes publicly is intended.
    queryset = DiscountCode.objects.all()
    serializer_class = DiscountCodeSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["code", "description"]
    ordering_fields = ["percent", "amount", "valid_from", "valid_to"]
    ordering = ["-valid_from"]
# -- Refund (Admin only) --
@extend_schema_view(
    list=extend_schema(tags=["commerce"], summary="List refunds (admin)"),
    retrieve=extend_schema(tags=["commerce"], summary="Retrieve refund (admin)"),
    create=extend_schema(tags=["commerce"], summary="Create refund (admin)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update refund (admin)"),
    update=extend_schema(tags=["commerce"], summary="Replace refund (admin)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete refund (admin)"),
)
class RefundViewSet(viewsets.ModelViewSet):
    # Admin-only refund CRUD; each refund is loaded together with its order,
    # newest first.
    queryset = (
        Refund.objects
        .select_related("order")
        .all()
        .order_by("-created_at")
    )
    serializer_class = RefundSerializer
    permission_classes = [IsAdminUser]
class RefundPublicView(APIView):
    """Public endpoint to create and fetch refund objects.

    POST: Create a refund given email and invoice_number or order_id.
    Returns JSON with refund info, order items, and a base64 PDF payload.
    GET: Return a refund object by id (query param `id`).
    """

    # NOTE(review): GET returns the order e-mail for any refund id without
    # authentication — confirm that this exposure is intended.
    permission_classes = [AllowAny]

    @staticmethod
    def _serialize_items(order):
        """Return the order's items as plain dicts (name, sku, quantity)."""
        return [
            {
                # getattr defaults cover items whose product is missing.
                "product_name": getattr(item.product, "name", "Item"),
                "sku": getattr(item.product, "code", None),
                "quantity": item.quantity,
            }
            for item in order.items.select_related('product').all()
        ]

    @classmethod
    def _build_payload(cls, refund):
        """Return the refund/order part of the response shared by GET and POST."""
        order = refund.order
        return {
            "refund": {
                "id": refund.id,
                "order_id": order.id,
                "reason_choice": refund.reason_choice,
                "reason_text": refund.reason_text,
                "verified": refund.verified,
                "created_at": refund.created_at,
            },
            "order": {
                "id": order.id,
                "email": order.email,
                "created_at": order.created_at,
                "items": cls._serialize_items(order),
            },
        }

    def get(self, request):
        """Return the refund identified by the `id` query parameter.

        Responds 400 when `id` is missing or malformed and 404 when no refund
        with that id exists.
        """
        from django.core.exceptions import ValidationError

        rid = request.query_params.get("id")
        if not rid:
            return Response({"detail": "Missing 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)
        try:
            refund = Refund.objects.select_related("order").get(id=rid)
        except Refund.DoesNotExist:
            return Response({"detail": "Refund not found."}, status=status.HTTP_404_NOT_FOUND)
        except (ValueError, TypeError, ValidationError):
            # A value that cannot be coerced to the primary-key type is a
            # client error, not a server error.
            return Response({"detail": "Invalid 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)
        return Response(self._build_payload(refund))

    def post(self, request):
        """Create a refund and return it with the order items and a base64-encoded PDF."""
        serializer = RefundCreatePublicSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        refund = serializer.save()
        resp = self._build_payload(refund)
        # Generate PDF bytes using model helper
        pdf_bytes = refund.generate_refund_pdf_for_customer()
        resp["pdf"] = {
            "filename": f"refund_{refund.id}.pdf",
            "content_type": "application/pdf",
            "base64": base64.b64encode(pdf_bytes).decode('ascii'),
        }
        return Response(resp, status=status.HTTP_201_CREATED)
class ReviewPostPublicView(APIView):
    """Endpoint for authenticated users to create a review for a product."""

    permission_classes = [permissions.IsAuthenticated]

    @extend_schema(
        tags=["commerce", "public"],
        summary="Create a product review (public)",
        request=ReviewSerializerPublic,
        responses={201: ReviewSerializerPublic},
    )
    def post(self, request):
        """Validate and save the review; respond 201 with the stored review or 400 with errors."""
        # Pass the request so the serializer can read the current user, the
        # same way OrderViewSet.create builds its serializer.
        serializer = ReviewSerializerPublic(data=request.data, context={"request": request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        review = serializer.save()
        out = ReviewSerializerPublic(review, context={"request": request})
        return Response(out.data, status=status.HTTP_201_CREATED)
# readonly public reviews, admin can patch
class ReviewPublicViewSet(viewsets.ModelViewSet):
    """Product reviews: reads and POST are open, other methods are admin-only."""

    queryset = Review.objects.all()
    serializer_class = ReviewSerializerPublic
    permission_classes = [AdminOnlyForPatchOtherwisePublic]
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["product__name", "user__username", "comment"]
    ordering_fields = ["rating", "created_at"]
    ordering = ["-created_at"]

    @action(detail=False, methods=['get'], url_path='product/(?P<product_id>[^/.]+)')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get reviews for specific product (public)",
        responses={200: ReviewSerializerPublic(many=True)},
    )
    def by_product(self, request, product_id=None):
        """Return the reviews of one product, paginated when pagination is configured."""
        reviews = self.get_queryset().filter(product_id=product_id)
        page = self.paginate_queryset(reviews)
        # Compare against None: an empty page is falsy but must still be
        # returned inside the paginated envelope.
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(reviews, many=True)
        return Response(serializer.data)
# ---------- CART MANAGEMENT ----------
@extend_schema_view(
    list=extend_schema(tags=["commerce", "cart"], summary="Get current cart (public)"),
    retrieve=extend_schema(tags=["commerce", "cart"], summary="Get cart by ID (public)"),
)
class CartViewSet(viewsets.GenericViewSet):
    """
    Shopping cart management for authenticated and anonymous users.
    Anonymous users use session_key, authenticated users use their user account.
    """

    queryset = Cart.objects.prefetch_related('items__product')
    serializer_class = CartSerializer
    permission_classes = [AllowAny]

    def get_or_create_cart(self, request):
        """Get or create cart for current user/session"""
        if request.user.is_authenticated:
            cart, _ = Cart.objects.get_or_create(user=request.user)
        else:
            # Use session for anonymous users
            session_key = request.session.session_key
            if not session_key:
                # A fresh anonymous visitor has no session yet; create one so
                # the cart can be keyed on it.
                request.session.create()
                session_key = request.session.session_key
            cart, _ = Cart.objects.get_or_create(session_key=session_key)
        return cart

    @staticmethod
    def _get_cart_item(cart, item_id):
        """Return the item with ``item_id`` in ``cart``, or None if absent or the id is malformed."""
        try:
            return CartItem.objects.get(pk=item_id, cart=cart)
        except (CartItem.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: the URL segment is not a valid primary key.
            return None

    @action(detail=False, methods=['get'], url_path='current')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Get current user's cart",
        responses={200: CartSerializer},
    )
    def current(self, request):
        """Get the current user's cart"""
        cart = self.get_or_create_cart(request)
        serializer = CartSerializer(cart)
        return Response(serializer.data)

    @action(detail=False, methods=['post'], url_path='add')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Add item to cart",
        request=CartItemCreateSerializer,
        responses={200: CartSerializer},
    )
    def add_item(self, request):
        """Add a product to the cart or update quantity if already exists"""
        serializer = CartItemCreateSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        product_id = serializer.validated_data['product_id']
        quantity = serializer.validated_data['quantity']
        # Resolve the product first so an unknown id yields a 404 instead of
        # an unhandled DoesNotExist.
        try:
            product = Product.objects.get(pk=product_id)
        except Product.DoesNotExist:
            return Response({'detail': 'Product not found'}, status=status.HTTP_404_NOT_FOUND)
        cart = self.get_or_create_cart(request)
        # Check if item already in cart
        cart_item, created = CartItem.objects.get_or_create(
            cart=cart,
            product=product,
            defaults={'quantity': quantity}
        )
        if not created:
            # Update quantity if item already exists
            cart_item.quantity += quantity
            cart_item.save()
        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)

    @action(detail=False, methods=['patch'], url_path='items/(?P<item_id>[^/.]+)')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Update cart item quantity",
        request={"type": "object", "properties": {"quantity": {"type": "integer"}}},
        responses={200: CartSerializer},
    )
    def update_item(self, request, item_id=None):
        """Update quantity of a cart item"""
        cart = self.get_or_create_cart(request)
        cart_item = self._get_cart_item(cart, item_id)
        if cart_item is None:
            return Response({'detail': 'Cart item not found'}, status=status.HTTP_404_NOT_FOUND)
        raw_quantity = request.data.get('quantity')
        if raw_quantity is None:
            return Response({'detail': 'Quantity is required'}, status=status.HTTP_400_BAD_REQUEST)
        try:
            quantity = int(raw_quantity)
        except (TypeError, ValueError):
            # TypeError covers JSON lists/objects, ValueError non-numeric text.
            return Response({'detail': 'Invalid quantity'}, status=status.HTTP_400_BAD_REQUEST)
        if quantity < 1:
            return Response({'detail': 'Quantity must be at least 1'}, status=status.HTTP_400_BAD_REQUEST)
        cart_item.quantity = quantity
        cart_item.save()
        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)

    # DELETE shares the URL of `update_item`, so it is registered on that
    # action's method mapping. Two separate @action declarations with the same
    # url_path produce two routes with the same pattern, and only the first
    # one is ever matched.
    @update_item.mapping.delete
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Remove item from cart",
        responses={200: CartSerializer},
    )
    def remove_item(self, request, item_id=None):
        """Remove an item from the cart"""
        cart = self.get_or_create_cart(request)
        cart_item = self._get_cart_item(cart, item_id)
        if cart_item is None:
            return Response({'detail': 'Cart item not found'}, status=status.HTTP_404_NOT_FOUND)
        cart_item.delete()
        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)

    @action(detail=False, methods=['post'], url_path='clear')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Clear all items from cart",
        responses={200: CartSerializer},
    )
    def clear(self, request):
        """Remove all items from the cart"""
        cart = self.get_or_create_cart(request)
        cart.items.all().delete()
        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)