Files
vontor-cz/backend/commerce/views.py
Brunobrno e86839f2da Add public refund creation endpoint and PDF generation
Introduces RefundPublicView for public refund creation via email and invoice/order ID, returning refund info and a base64-encoded PDF slip. Adds RefundCreatePublicSerializer for validation and creation, implements PDF generation in Refund model, and provides a customer-facing HTML template for the return slip. Updates URLs to expose the new endpoint.
2025-11-19 00:53:37 +01:00

494 lines
16 KiB
Python

from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
import base64
from .models import Refund
from .serializers import RefundCreatePublicSerializer
from django.db import transaction
from rest_framework import viewsets, mixins, status
from rest_framework.permissions import AllowAny, IsAdminUser, SAFE_METHODS
from rest_framework.decorators import action
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiExample
from rest_framework import filters, permissions
from .models import (
Order,
OrderItem,
Carrier,
Payment,
Product,
DiscountCode,
Category,
ProductImage,
Refund,
)
from .serializers import (
OrderReadSerializer,
OrderMiniSerializer,
OrderCreateSerializer,
OrderItemReadSerializer,
CarrierReadSerializer,
PaymentReadSerializer,
ProductSerializer,
CategorySerializer,
ProductImageSerializer,
DiscountCodeSerializer,
RefundSerializer,
)
@extend_schema_view(
    list=extend_schema(tags=["Orders"], summary="List Orders (public)"),
    retrieve=extend_schema(tags=["Orders"], summary="Retrieve Order (public)"),
)
class OrderViewSet(mixins.ListModelMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet):
    # Order API: list/retrieve/create plus per-order sub-resources (items,
    # carrier, payment) and two admin-only carrier state transitions.
    #
    # Documentation is kept in `#` comments on purpose: DRF and drf-spectacular
    # publish view/action docstrings as the endpoint description.
    #
    # NOTE(review): AllowAny makes every order (including customer contact
    # details) readable by anonymous clients - confirm this is intended.
    queryset = Order.objects.select_related("carrier", "payment").prefetch_related(
        "items__product", "discount"
    ).order_by("-created_at")
    permission_classes = [AllowAny]

    def get_serializer_class(self):
        # Pick the serializer by action; everything not listed explicitly
        # falls back to the read serializer.
        if self.action == "mini":
            return OrderMiniSerializer
        if self.action == "create":
            return OrderCreateSerializer
        return OrderReadSerializer

    @extend_schema(
        tags=["Orders"],
        summary="Create Order (public)",
        request=OrderCreateSerializer,
        responses={201: OrderReadSerializer},
        examples=[
            OpenApiExample(
                "Create order",
                value={
                    "first_name": "Jan",
                    "last_name": "Novak",
                    "email": "jan@example.com",
                    "phone": "+420123456789",
                    "address": "Ulice 1",
                    "city": "Praha",
                    "postal_code": "11000",
                    "country": "Czech Republic",
                    "note": "Prosím doručit odpoledne",
                    "items": [
                        {"product_id": 1, "quantity": 2},
                        {"product_id": 7, "quantity": 1},
                    ],
                    "carrier": {"shipping_method": "store"},
                    "payment": {"payment_method": "stripe"},
                    "discount_codes": ["WELCOME10"],
                },
            )
        ],
    )
    def create(self, request, *args, **kwargs):
        # Create an order together with its carrier, items, discounts and
        # payment. Responds 201 with the order in OrderReadSerializer shape,
        # 400 for invalid input or an invalid payment/shipping combination,
        # and 404 when a referenced product does not exist.
        serializer = OrderCreateSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        user = request.user

        carrier_payload = data["carrier"]
        shipping_method = carrier_payload["shipping_method"]
        payment_method = data["payment"]["payment_method"]

        # Validate the payment/shipping combination BEFORE writing anything.
        # Returning a Response from inside `transaction.atomic()` exits the
        # block normally and therefore commits, which would persist a
        # half-built order (no payment attached).
        is_store_pickup = shipping_method == Carrier.SHIPPING.STORE
        if payment_method == Payment.PAYMENT.SHOP and not is_store_pickup:
            return Response(
                {"payment": "Platba v obchodě je možná pouze pro osobní odběr."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if payment_method == Payment.PAYMENT.CASH_ON_DELIVERY and is_store_pickup:
            return Response(
                {"payment": "Dobírka není možná pro osobní odběr."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Resolve products up front for the same reason: a missing product
        # must yield a clean 404 without leaving partial rows behind
        # (`Model.objects.get` raises DoesNotExist, which is not a 404 by itself).
        resolved_items = []
        for item in data["items"]:
            try:
                product = Product.objects.get(pk=item["product_id"])
            except Product.DoesNotExist:
                return Response(
                    {"detail": f"Product {item['product_id']} not found."},
                    status=status.HTTP_404_NOT_FOUND,
                )
            resolved_items.append((product, int(item["quantity"])))

        # VERY IMPORTANT: create everything inside one transaction so that
        # incomplete orders are never persisted.
        with transaction.atomic():
            # Base order (customer details only for now)
            order = Order.objects.create(
                user=user if getattr(user, "is_authenticated", False) else None,
                first_name=data["first_name"],
                last_name=data["last_name"],
                email=data["email"],
                phone=data.get("phone", ""),
                address=data["address"],
                city=data["city"],
                postal_code=data["postal_code"],
                country=data.get("country", "Czech Republic"),
                note=data.get("note", ""),
            )

            # Carrier
            carrier = Carrier.objects.create(
                shipping_method=shipping_method,
                weight=carrier_payload.get("weight"),
            )
            order.carrier = carrier
            order.save(update_fields=["carrier", "updated_at"])  # recalc later after items

            # Items
            OrderItem.objects.bulk_create(
                [
                    OrderItem(order=order, product=product, quantity=qty)
                    for product, qty in resolved_items
                ]
            )

            # Discount codes (optional); unknown codes are silently ignored
            codes = data.get("discount_codes") or []
            if codes:
                discounts = list(DiscountCode.objects.filter(code__in=codes))
                order.discount.add(*discounts)

            # Full save now that items/discounts/carrier are linked
            # (presumably triggers the total recalculation in Order.save).
            order.save()

            # Create payment WITHOUT triggering Payment.save (which expects
            # the reverse link to the order first): bulk_create bypasses it.
            # NOTE(review): relies on the database returning the primary key
            # from bulk_create - confirm for the deployed backend.
            payment = Payment(payment_method=payment_method)
            Payment.objects.bulk_create([payment])
            order.payment = payment
            order.save(update_fields=["payment", "updated_at"])

            # If Stripe, create StripePayment now and attach
            if payment_method == Payment.PAYMENT.STRIPE:
                from thirdparty.stripe.models import StripePayment

                stripe_obj = StripePayment.objects.create(amount=order.total_price)
                payment.stripe = stripe_obj
                payment.save(update_fields=["stripe"])

        # `self.get_serializer` would resolve to OrderCreateSerializer for the
        # "create" action; the documented 201 body is the read representation.
        out = OrderReadSerializer(order, context=self.get_serializer_context())
        return Response(out.data, status=status.HTTP_201_CREATED)

    # -- List mini orders -- (public) --
    @action(detail=False, methods=["get"], url_path="detail")
    @extend_schema(
        tags=["Orders"],
        summary="List mini orders (public)",
        responses={200: OrderMiniSerializer(many=True)},
    )
    def mini(self, request, *args, **kwargs):
        # Compact order listing; paginated when a paginator is configured.
        qs = self.get_queryset()
        page = self.paginate_queryset(qs)
        if page is not None:
            ser = OrderMiniSerializer(page, many=True)
            return self.get_paginated_response(ser.data)
        ser = OrderMiniSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order items -- (public) --
    @action(detail=True, methods=["get"], url_path="items")
    @extend_schema(
        tags=["Orders"],
        summary="List order items (public)",
        responses={200: OrderItemReadSerializer(many=True)},
    )
    def items(self, request, pk=None):
        order = self.get_object()
        qs = order.items.select_related("product").all()
        ser = OrderItemReadSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order carrier -- (public) --
    @action(detail=True, methods=["get"], url_path="carrier")
    @extend_schema(
        tags=["Orders"],
        summary="Get order carrier (public)",
        responses={200: CarrierReadSerializer},
    )
    def carrier_detail(self, request, pk=None):
        order = self.get_object()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Get order payment -- (public) --
    @action(detail=True, methods=["get"], url_path="payment")
    @extend_schema(
        tags=["Orders"],
        summary="Get order payment (public)",
        responses={200: PaymentReadSerializer},
    )
    def payment_detail(self, request, pk=None):
        order = self.get_object()
        ser = PaymentReadSerializer(order.payment)
        return Response(ser.data)

    # -- Mark carrier ready to pickup(store) (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/ready-to-pickup",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["Orders"],
        summary="Mark carrier ready to pickup (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_ready_to_pickup(self, request, pk=None):
        order = self.get_object()
        if not order.carrier:
            return Response({"detail": "Carrier not set."}, status=status.HTTP_400_BAD_REQUEST)
        order.carrier.ready_to_pickup()
        # Reload so the response reflects whatever the transition persisted.
        order.carrier.refresh_from_db()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Start ordering shipping (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/start-ordering-shipping",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["Orders"],
        summary="Start ordering shipping (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_start_ordering_shipping(self, request, pk=None):
        order = self.get_object()
        if not order.carrier:
            return Response({"detail": "Carrier not set."}, status=status.HTTP_400_BAD_REQUEST)
        order.carrier.start_ordering_shipping()
        # Reload so the response reflects whatever the transition persisted.
        order.carrier.refresh_from_db()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)
# -- Invoice PDF for Order --
class OrderInvoice(viewsets.ViewSet):
    # Serves the stored invoice PDF of a single order.

    @action(detail=True, methods=["get"], url_path="generate-invoice")
    @extend_schema(
        tags=["Orders"],
        summary="Get Invoice PDF for Order (public)",
        responses={200: "PDF File"},
    )
    def get(self, request, pk=None, order_id=None):
        # Return the invoice PDF for the order identified by `pk` (router
        # default) or `order_id` (explicit URL kwarg); 404 when the order,
        # its invoice, or the PDF file is missing.
        #
        # The handler must accept `self` and `request` first: DRF always calls
        # it as a bound method with the request as first positional argument.
        from django.http import FileResponse

        lookup = pk if pk is not None else order_id
        try:
            order = Order.objects.get(pk=lookup)
        except (Order.DoesNotExist, ValueError, TypeError):
            return Response({"detail": "Order not found."}, status=status.HTTP_404_NOT_FOUND)

        # A missing reverse one-to-one raises an AttributeError subclass, so
        # getattr with a default covers "no invoice yet" as well.
        invoice = getattr(order, "invoice", None)
        pdf_file = getattr(invoice, "pdf_file", None)
        if not pdf_file:
            return Response({"detail": "Invoice not found."}, status=status.HTTP_404_NOT_FOUND)

        # DRF's Response would try to run the file object through a renderer;
        # FileResponse streams the raw bytes instead.
        # NOTE(review): assumes `invoice.pdf_file` is a Django FileField - confirm.
        return FileResponse(pdf_file.open("rb"), content_type="application/pdf")
# ---------- Permissions helpers ----------
class AdminWriteOnlyOrReadOnly(permissions.BasePermission):
    """Allow safe (read-only) methods for everyone; require admin for writes.

    Subclasses ``BasePermission`` rather than ``AllowAny.__class__``: the
    latter is the permission *metaclass*, so the resulting class could not be
    instantiated by DRF as a permission.
    """

    def has_permission(self, request, view):
        """Return True for SAFE_METHODS, otherwise defer to ``IsAdminUser``."""
        if request.method in SAFE_METHODS:
            return True
        return IsAdminUser().has_permission(request, view)
class AdminOnlyForPatchOtherwisePublic(permissions.BasePermission):
    """Allow safe methods and POST for everyone; require admin otherwise.

    PATCH and every other unsafe method (PUT, DELETE, ...) are admin-only.
    Subclasses ``BasePermission`` rather than ``AllowAny.__class__``: the
    latter is the permission *metaclass*, so the resulting class could not be
    instantiated by DRF as a permission.
    """

    def has_permission(self, request, view):
        """Return True for SAFE_METHODS/POST, otherwise defer to ``IsAdminUser``."""
        if request.method in SAFE_METHODS or request.method == "POST":
            return True
        # PATCH and all remaining unsafe methods share the same admin check.
        return IsAdminUser().has_permission(request, view)
# ---------- Public/admin viewsets ----------
@extend_schema_view(
    list=extend_schema(tags=["Products"], summary="List products (public)"),
    retrieve=extend_schema(tags=["Products"], summary="Retrieve product (public)"),
    create=extend_schema(tags=["Products"], summary="Create product (auth required)"),
    update=extend_schema(tags=["Products"], summary="Replace product (auth required)"),
    partial_update=extend_schema(tags=["Products"], summary="Update product (auth required)"),
    destroy=extend_schema(tags=["Products"], summary="Delete product (auth required)"),
)
class ProductViewSet(viewsets.ModelViewSet):
    # Full CRUD for products: anonymous clients may read, any authenticated
    # user may write. Documented with comments because DRF publishes view
    # docstrings as the endpoint description.
    serializer_class = ProductSerializer
    queryset = Product.objects.all()
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # ?search= matches name/code; ?ordering= accepts the fields below and
    # defaults to ascending price.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["name", "code"]
    ordering = ["price"]
    ordering_fields = ["price", "name", "created_at"]
@extend_schema_view(
    list=extend_schema(tags=["Categories"], summary="List categories (public)"),
    retrieve=extend_schema(tags=["Categories"], summary="Retrieve category (public)"),
    create=extend_schema(tags=["Categories"], summary="Create category (auth required)"),
    update=extend_schema(tags=["Categories"], summary="Replace category (auth required)"),
    partial_update=extend_schema(tags=["Categories"], summary="Update category (auth required)"),
    destroy=extend_schema(tags=["Categories"], summary="Delete category (auth required)"),
)
class CategoryViewSet(viewsets.ModelViewSet):
    # Full CRUD for categories: anonymous clients may read, any authenticated
    # user may write. Documented with comments because DRF publishes view
    # docstrings as the endpoint description.
    serializer_class = CategorySerializer
    queryset = Category.objects.all()
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # ?search= matches name/description; results default to name order.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["name", "description"]
    ordering = ["name"]
    ordering_fields = ["name", "id"]
@extend_schema_view(
    list=extend_schema(tags=["Product Images"], summary="List product images (public)"),
    retrieve=extend_schema(tags=["Product Images"], summary="Retrieve product image (public)"),
    create=extend_schema(tags=["Product Images"], summary="Create product image (auth required)"),
    update=extend_schema(tags=["Product Images"], summary="Replace product image (auth required)"),
    partial_update=extend_schema(tags=["Product Images"], summary="Update product image (auth required)"),
    destroy=extend_schema(tags=["Product Images"], summary="Delete product image (auth required)"),
)
class ProductImageViewSet(viewsets.ModelViewSet):
    # Full CRUD for product images: anonymous clients may read, any
    # authenticated user may write. Documented with comments because DRF
    # publishes view docstrings as the endpoint description.
    serializer_class = ProductImageSerializer
    queryset = ProductImage.objects.all()
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # ?search= matches the alt text or the owning product's name; newest
    # (highest id) first by default.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["alt_text", "product__name"]
    ordering = ["-id"]
    ordering_fields = ["id", "product__name"]
@extend_schema_view(
    list=extend_schema(tags=["Discount Codes"], summary="List discount codes (public)"),
    retrieve=extend_schema(tags=["Discount Codes"], summary="Retrieve discount code (public)"),
    create=extend_schema(tags=["Discount Codes"], summary="Create discount code (auth required)"),
    update=extend_schema(tags=["Discount Codes"], summary="Replace discount code (auth required)"),
    partial_update=extend_schema(tags=["Discount Codes"], summary="Update discount code (auth required)"),
    destroy=extend_schema(tags=["Discount Codes"], summary="Delete discount code (auth required)"),
)
class DiscountCodeViewSet(viewsets.ModelViewSet):
    # Full CRUD for discount codes: anonymous clients may read, any
    # authenticated user may write. Documented with comments because DRF
    # publishes view docstrings as the endpoint description.
    # NOTE(review): every code is listable without authentication and any
    # logged-in user can create codes - confirm this is intended.
    serializer_class = DiscountCodeSerializer
    queryset = DiscountCode.objects.all()
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # ?search= matches code/description; most recent validity start first.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = ["code", "description"]
    ordering = ["-valid_from"]
    ordering_fields = ["percent", "amount", "valid_from", "valid_to"]
# -- Refund (Admin only) --
@extend_schema_view(
    list=extend_schema(tags=["Refunds"], summary="List refunds (admin)"),
    retrieve=extend_schema(tags=["Refunds"], summary="Retrieve refund (admin)"),
    create=extend_schema(tags=["Refunds"], summary="Create refund (admin)"),
    update=extend_schema(tags=["Refunds"], summary="Replace refund (admin)"),
    partial_update=extend_schema(tags=["Refunds"], summary="Update refund (admin)"),
    destroy=extend_schema(tags=["Refunds"], summary="Delete refund (admin)"),
)
class RefundViewSet(viewsets.ModelViewSet):
    # Admin-only CRUD for refunds, newest first. The related order is joined
    # in the same query. Documented with comments because DRF publishes view
    # docstrings as the endpoint description.
    permission_classes = [IsAdminUser]
    serializer_class = RefundSerializer
    queryset = Refund.objects.select_related("order").order_by("-created_at")
class RefundPublicView(APIView):
    """Public endpoint to create and fetch refund objects.
    POST: Create a refund given email and invoice_number or order_id.
    Returns JSON with refund info, order items, and a base64 PDF payload.
    GET: Return a refund object by id (query param `id`).
    """
    # NOTE(review): GET is unauthenticated and returns the order e-mail and
    # items for any refund id - confirm that exposing this by id alone is
    # acceptable.
    permission_classes = [AllowAny]

    @staticmethod
    def _order_items_payload(order):
        """Return the order's line items as plain dicts (name, SKU, quantity)."""
        return [
            {
                "product_name": getattr(line.product, "name", "Item"),
                "sku": getattr(line.product, "code", None),
                "quantity": line.quantity,
            }
            for line in order.items.select_related("product").all()
        ]

    @classmethod
    def _refund_payload(cls, refund):
        """Build the ``refund``/``order`` response body shared by GET and POST."""
        order = refund.order
        return {
            "refund": {
                "id": refund.id,
                "order_id": order.id,
                "reason_choice": refund.reason_choice,
                "reason_text": refund.reason_text,
                "verified": refund.verified,
                "created_at": refund.created_at,
            },
            "order": {
                "id": order.id,
                "email": order.email,
                "created_at": order.created_at,
                "items": cls._order_items_payload(order),
            },
        }

    def get(self, request):
        # Look up a refund by the `id` query parameter.
        # 400 when the parameter is missing or malformed, 404 when no refund
        # matches, 200 with the refund/order payload otherwise.
        from django.core.exceptions import ValidationError

        rid = request.query_params.get("id")
        if not rid:
            return Response({"detail": "Missing 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)
        try:
            refund = Refund.objects.select_related("order").get(id=rid)
        except Refund.DoesNotExist:
            return Response({"detail": "Refund not found."}, status=status.HTTP_404_NOT_FOUND)
        except (ValueError, TypeError, ValidationError):
            # The ORM rejects values that cannot be coerced to the key type
            # (e.g. "abc" for an integer id); report that as a client error
            # instead of letting it surface as a server error.
            return Response({"detail": "Invalid 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)
        return Response(self._refund_payload(refund))

    def post(self, request):
        # Create a refund from the public payload and return it together with
        # the customer return slip as a base64-encoded PDF.
        # 400 with the serializer errors on invalid input, 201 on success.
        serializer = RefundCreatePublicSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        refund = serializer.save()

        resp = self._refund_payload(refund)

        # Generate PDF bytes using the model helper; base64 keeps the binary
        # document transportable inside the JSON body.
        pdf_bytes = refund.generate_refund_pdf_for_customer()
        resp["pdf"] = {
            "filename": f"refund_{refund.id}.pdf",
            "content_type": "application/pdf",
            "base64": base64.b64encode(pdf_bytes).decode("ascii"),
        }
        return Response(resp, status=status.HTTP_201_CREATED)