Files
vontor-cz/backend/commerce/views.py
Brunobrno 98426f8b05 Add product review model, serializer, and API endpoint
Introduces a Review model for product reviews, including rating and comment fields. Adds a public serializer and a ModelViewSet for reviews with search and ordering capabilities. Also updates the frontend API client to use the correct token refresh endpoint and improves FormData handling.
2026-01-14 00:10:46 +01:00

429 lines
14 KiB
Python

from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
import base64
from .models import Refund, Review
from .serializers import RefundCreatePublicSerializer, ReviewSerializerPublic
from django.db import transaction
from rest_framework import viewsets, mixins, status
from rest_framework.permissions import AllowAny, IsAdminUser, SAFE_METHODS
from rest_framework.decorators import action
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiExample
from rest_framework import filters, permissions
from .models import (
Order,
OrderItem,
Carrier,
Payment,
Product,
DiscountCode,
Category,
ProductImage,
Refund,
)
from .serializers import (
OrderReadSerializer,
OrderMiniSerializer,
OrderCreateSerializer,
OrderItemReadSerializer,
CarrierReadSerializer,
PaymentReadSerializer,
ProductSerializer,
CategorySerializer,
ProductImageSerializer,
DiscountCodeSerializer,
RefundSerializer,
)
# FIXME: adapt this view to the new order serializer
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List Orders (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve Order (public)"),
)
class OrderViewSet(mixins.ListModelMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet):
    """Order endpoints: list, retrieve, create, plus per-order sub-resources.

    Read/create endpoints use ``AllowAny``; the two carrier state transitions
    override the permission to ``IsAdminUser`` on their ``@action``.
    """

    queryset = (
        Order.objects.select_related("carrier", "payment")
        .prefetch_related("items__product", "discount")
        .order_by("-created_at")
    )
    # NOTE(review): AllowAny lets anonymous clients list and retrieve every
    # order. Confirm this exposure is intended before shipping.
    permission_classes = [AllowAny]

    def get_serializer_class(self):
        """Return the serializer for the current action.

        ``mini`` uses the compact serializer, ``create`` the write serializer,
        and every other action the read serializer.
        """
        if self.action == "mini":
            return OrderMiniSerializer
        if self.action == "create":
            return OrderCreateSerializer
        return OrderReadSerializer

    @extend_schema(
        tags=["commerce", "public"],
        summary="Create Order (public)",
        request=OrderCreateSerializer,
        responses={201: OrderReadSerializer},
        examples=[
            OpenApiExample(
                "Create order",
                value={
                    "first_name": "Jan",
                    "last_name": "Novak",
                    "email": "jan@example.com",
                    "phone": "+420123456789",
                    "address": "Ulice 1",
                    "city": "Praha",
                    "postal_code": "11000",
                    "country": "Czech Republic",
                    "note": "Prosím doručit odpoledne",
                    "items": [
                        {"product_id": 1, "quantity": 2},
                        {"product_id": 7, "quantity": 1},
                    ],
                    "carrier": {"shipping_method": "store"},
                    "payment": {"payment_method": "stripe"},
                    "discount_codes": ["WELCOME10"],
                },
            )
        ],
    )
    def create(self, request, *args, **kwargs):
        """Validate the payload, persist the order and return it (HTTP 201).

        Validation errors are raised as DRF exceptions (HTTP 400).
        """
        serializer = OrderCreateSerializer(data=request.data, context={"request": request})
        serializer.is_valid(raise_exception=True)
        # The payload carries nested items/carrier/payment, so the save
        # presumably writes several rows; keep them all-or-nothing.
        with transaction.atomic():
            order = serializer.save()
        out = OrderReadSerializer(order)
        return Response(out.data, status=status.HTTP_201_CREATED)

    # -- List mini orders -- (public) --
    @action(detail=False, methods=["get"], url_path="detail")
    @extend_schema(
        tags=["commerce", "public"],
        summary="List mini orders (public)",
        responses={200: OrderMiniSerializer(many=True)},
    )
    def mini(self, request, *args, **kwargs):
        """List orders with the compact serializer, paginated when a paginator is configured."""
        qs = self.get_queryset()
        page = self.paginate_queryset(qs)
        if page is not None:
            ser = OrderMiniSerializer(page, many=True)
            return self.get_paginated_response(ser.data)
        ser = OrderMiniSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order items -- (public) --
    @action(detail=True, methods=["get"], url_path="items")
    @extend_schema(
        tags=["commerce", "public"],
        summary="List order items (public)",
        responses={200: OrderItemReadSerializer(many=True)},
    )
    def items(self, request, pk=None):
        """Return the items of one order."""
        order = self.get_object()
        qs = order.items.select_related("product").all()
        ser = OrderItemReadSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order carrier -- (public) --
    @action(detail=True, methods=["get"], url_path="carrier")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get order carrier (public)",
        responses={200: CarrierReadSerializer},
    )
    def carrier_detail(self, request, pk=None):
        """Return the carrier attached to one order."""
        order = self.get_object()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Get order payment -- (public) --
    @action(detail=True, methods=["get"], url_path="payment")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get order payment (public)",
        responses={200: PaymentReadSerializer},
    )
    def payment_detail(self, request, pk=None):
        """Return the payment attached to one order."""
        order = self.get_object()
        ser = PaymentReadSerializer(order.payment)
        return Response(ser.data)

    def _apply_carrier_transition(self, transition):
        """Run ``transition(carrier)`` on the order's carrier and return it serialized.

        Responds with HTTP 400 when the order has no carrier. The carrier is
        reloaded from the database after the transition so the response shows
        the persisted state.
        """
        order = self.get_object()
        carrier = order.carrier
        if not carrier:
            return Response({"detail": "Carrier not set."}, status=status.HTTP_400_BAD_REQUEST)
        transition(carrier)
        carrier.refresh_from_db()
        return Response(CarrierReadSerializer(carrier).data)

    # -- Mark carrier ready to pickup(store) (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/ready-to-pickup",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["commerce"],
        summary="Mark carrier ready to pickup (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_ready_to_pickup(self, request, pk=None):
        """Call ``ready_to_pickup()`` on the order's carrier (admin only)."""
        return self._apply_carrier_transition(lambda carrier: carrier.ready_to_pickup())

    # -- Start ordering shipping (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/start-ordering-shipping",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["commerce"],
        summary="Start ordering shipping (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_start_ordering_shipping(self, request, pk=None):
        """Call ``start_ordering_shipping()`` on the order's carrier (admin only)."""
        return self._apply_carrier_transition(lambda carrier: carrier.start_ordering_shipping())
# -- Invoice PDF for Order --
class OrderInvoice(viewsets.ViewSet):
    """Serve the invoice PDF that belongs to an order."""

    @action(detail=True, methods=["get"], url_path="generate-invoice")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get Invoice PDF for Order (public)",
        responses={200: "PDF File"},
    )
    def get(self, request, pk=None):
        """Return the order's invoice as ``application/pdf``.

        DRF invokes detail actions as ``(self, request, pk=...)``; ``pk`` is
        the order's primary key taken from the URL.

        Responds with HTTP 404 when the order does not exist, the pk is
        malformed, or the order has no invoice PDF.
        """
        from django.http import FileResponse, HttpResponse

        try:
            order = Order.objects.get(pk=pk)
        except (Order.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: pk could not be coerced to the key type.
            return Response({"detail": "Order not found."}, status=status.HTTP_404_NOT_FOUND)

        # getattr with a default also covers a missing reverse one-to-one
        # relation, whose "does not exist" error subclasses AttributeError.
        invoice = getattr(order, "invoice", None)
        pdf_file = getattr(invoice, "pdf_file", None)
        if not pdf_file:
            return Response({"detail": "Invoice not found."}, status=status.HTTP_404_NOT_FOUND)

        # A DRF Response would try to render the file through a renderer;
        # return a plain Django response carrying the raw bytes instead.
        if isinstance(pdf_file, (bytes, bytearray, memoryview)):
            return HttpResponse(bytes(pdf_file), content_type="application/pdf")
        # assumes pdf_file is a Django FileField value — TODO confirm
        return FileResponse(pdf_file.open("rb"), content_type="application/pdf")
# ---------- Permissions helpers ----------
class AdminWriteOnlyOrReadOnly(permissions.BasePermission):
    """Allow safe (read-only) methods for everyone; all other methods need an admin.

    Subclasses ``BasePermission`` rather than ``AllowAny.__class__``: the
    latter is the permission *metaclass*, so a class derived from it cannot be
    instantiated the way DRF instantiates permission classes.
    """

    def has_permission(self, request, view):
        """Return True for SAFE_METHODS, otherwise defer to ``IsAdminUser``."""
        if request.method in SAFE_METHODS:
            return True
        return IsAdminUser().has_permission(request, view)
class AdminOnlyForPatchOtherwisePublic(permissions.BasePermission):
    """Allow safe methods and POST for everyone; every other method needs an admin.

    Subclasses ``BasePermission`` rather than ``AllowAny.__class__``: the
    latter is the permission *metaclass*, so a class derived from it cannot be
    instantiated the way DRF instantiates permission classes.
    """

    def has_permission(self, request, view):
        """Return True for SAFE_METHODS and POST, otherwise defer to ``IsAdminUser``."""
        if request.method in SAFE_METHODS or request.method == "POST":
            return True
        # PATCH and every other unsafe method are restricted to admins.
        return IsAdminUser().has_permission(request, view)
# ---------- Public/admin viewsets ----------
@extend_schema_view(
    list=extend_schema(
        tags=["commerce", "public"],
        summary="List products (public)",
    ),
    retrieve=extend_schema(
        tags=["commerce", "public"],
        summary="Retrieve product (public)",
    ),
    create=extend_schema(tags=["commerce"], summary="Create product (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update product (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace product (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete product (auth required)"),
)
class ProductViewSet(viewsets.ModelViewSet):
    """Full CRUD API for products with search and ordering support."""

    serializer_class = ProductSerializer
    queryset = Product.objects.all()

    # Reads are open to anyone; writes require an authenticated user.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Text search over name/code; sortable by the listed fields, price first by default.
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["name", "code"]
    ordering_fields = ["price", "name", "created_at"]
    ordering = ["price"]
@extend_schema_view(
    list=extend_schema(
        tags=["commerce", "public"],
        summary="List categories (public)",
    ),
    retrieve=extend_schema(
        tags=["commerce", "public"],
        summary="Retrieve category (public)",
    ),
    create=extend_schema(tags=["commerce"], summary="Create category (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update category (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace category (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete category (auth required)"),
)
class CategoryViewSet(viewsets.ModelViewSet):
    """Full CRUD API for product categories with search and ordering support."""

    serializer_class = CategorySerializer
    queryset = Category.objects.all()

    # Reads are open to anyone; writes require an authenticated user.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Text search over name/description; sorted by name unless the client overrides it.
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["name", "description"]
    ordering_fields = ["name", "id"]
    ordering = ["name"]
@extend_schema_view(
    list=extend_schema(
        tags=["commerce", "public"],
        summary="List product images (public)",
    ),
    retrieve=extend_schema(
        tags=["commerce", "public"],
        summary="Retrieve product image (public)",
    ),
    create=extend_schema(tags=["commerce"], summary="Create product image (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update product image (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace product image (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete product image (auth required)"),
)
class ProductImageViewSet(viewsets.ModelViewSet):
    """Full CRUD API for product images with search and ordering support."""

    serializer_class = ProductImageSerializer
    queryset = ProductImage.objects.all()

    # Reads are open to anyone; writes require an authenticated user.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Text search over alt text and the owning product's name; newest id first by default.
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["alt_text", "product__name"]
    ordering_fields = ["id", "product__name"]
    ordering = ["-id"]
@extend_schema_view(
    list=extend_schema(
        tags=["commerce", "public"],
        summary="List discount codes (public)",
    ),
    retrieve=extend_schema(
        tags=["commerce", "public"],
        summary="Retrieve discount code (public)",
    ),
    create=extend_schema(tags=["commerce"], summary="Create discount code (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update discount code (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace discount code (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete discount code (auth required)"),
)
class DiscountCodeViewSet(viewsets.ModelViewSet):
    """Full CRUD API for discount codes with search and ordering support."""

    serializer_class = DiscountCodeSerializer
    queryset = DiscountCode.objects.all()

    # Reads are open to anyone; writes require an authenticated user.
    # NOTE(review): anonymous clients can list every discount code through
    # this endpoint — confirm that is intended.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Text search over code/description; most recent valid_from first by default.
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["code", "description"]
    ordering_fields = ["percent", "amount", "valid_from", "valid_to"]
    ordering = ["-valid_from"]
# -- Refund (Admin only) --
@extend_schema_view(
    list=extend_schema(tags=["commerce"], summary="List refunds (admin)"),
    retrieve=extend_schema(tags=["commerce"], summary="Retrieve refund (admin)"),
    create=extend_schema(tags=["commerce"], summary="Create refund (admin)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update refund (admin)"),
    update=extend_schema(tags=["commerce"], summary="Replace refund (admin)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete refund (admin)"),
)
class RefundViewSet(viewsets.ModelViewSet):
    """Admin-only CRUD API for refunds, newest first."""

    permission_classes = [IsAdminUser]
    serializer_class = RefundSerializer
    # The related order is joined in the same query as the refund.
    queryset = (
        Refund.objects
        .select_related("order")
        .order_by("-created_at")
    )
class RefundPublicView(APIView):
    """Public endpoint to create and fetch refund objects.

    POST: Create a refund given email and invoice_number or order_id.
    Returns JSON with refund info, order items, and a base64 PDF payload.
    GET: Return a refund object by id (query param `id`).
    """

    # NOTE(review): GET is unauthenticated and keyed only by refund id, and the
    # response includes the order's email — confirm this exposure is intended.
    permission_classes = [AllowAny]

    @staticmethod
    def _build_payload(refund):
        """Return the ``{"refund": ..., "order": ...}`` dict shared by GET and POST."""
        order = refund.order
        items = [
            {
                # getattr defaults cover an item whose product is missing.
                "product_name": getattr(item.product, "name", "Item"),
                "sku": getattr(item.product, "code", None),
                "quantity": item.quantity,
            }
            for item in order.items.select_related("product").all()
        ]
        return {
            "refund": {
                "id": refund.id,
                "order_id": order.id,
                "reason_choice": refund.reason_choice,
                "reason_text": refund.reason_text,
                "verified": refund.verified,
                "created_at": refund.created_at,
            },
            "order": {
                "id": order.id,
                "email": order.email,
                "created_at": order.created_at,
                "items": items,
            },
        }

    def get(self, request):
        """Return the refund identified by the ``id`` query parameter.

        Responds with HTTP 400 when ``id`` is missing or malformed and
        HTTP 404 when no refund has that id.
        """
        from django.core.exceptions import ValidationError

        refund_id = request.query_params.get("id")
        if not refund_id:
            return Response({"detail": "Missing 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)
        try:
            refund = Refund.objects.select_related("order").get(id=refund_id)
        except Refund.DoesNotExist:
            return Response({"detail": "Refund not found."}, status=status.HTTP_404_NOT_FOUND)
        except (ValueError, TypeError, ValidationError):
            # The value cannot be coerced to the primary-key type (e.g.
            # "abc" for an integer key); without this the lookup would 500.
            return Response({"detail": "Invalid 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)
        return Response(self._build_payload(refund))

    def post(self, request):
        """Create a refund and return it with a base64-encoded PDF (HTTP 201).

        Responds with HTTP 400 and the serializer errors when validation fails.
        """
        serializer = RefundCreatePublicSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        refund = serializer.save()

        payload = self._build_payload(refund)

        # Generate PDF bytes using model helper; base64 keeps them JSON-safe.
        pdf_bytes = refund.generate_refund_pdf_for_customer()
        payload["pdf"] = {
            "filename": f"refund_{refund.id}.pdf",
            "content_type": "application/pdf",
            "base64": base64.b64encode(pdf_bytes).decode("ascii"),
        }
        return Response(payload, status=status.HTTP_201_CREATED)
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List reviews (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve review (public)"),
    create=extend_schema(tags=["commerce"], summary="Create review (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update review (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace review (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete review (auth required)"),
)
class ReviewPublicViewSet(viewsets.ModelViewSet):
    """Full CRUD API for product reviews with search and ordering support."""

    queryset = Review.objects.all()
    serializer_class = ReviewSerializerPublic
    # Reads are open to anyone; writes require an authenticated user.
    # NOTE(review): nothing in this view limits update/delete to the review's
    # author — confirm the serializer or model enforces ownership.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]
    # Text search over product name, author username and comment; newest first by default.
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["product__name", "user__username", "comment"]
    ordering_fields = ["rating", "created_at"]
    ordering = ["-created_at"]