Files
vontor-cz/backend/commerce/views.py
2026-01-25 23:19:56 +01:00

1081 lines
38 KiB
Python

from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
import base64
from datetime import datetime
from django.utils.dateparse import parse_datetime
from .models import Refund, Review
from .serializers import RefundCreatePublicSerializer, ReviewSerializerPublic
from .analytics import (
SalesAnalytics, ProductAnalytics, CustomerAnalytics,
ShippingAnalytics, ReviewAnalytics, AnalyticsAggregator,
get_predefined_date_ranges
)
from django.db import transaction, models
from rest_framework import viewsets, mixins, status
from rest_framework.permissions import AllowAny, IsAdminUser, SAFE_METHODS
from rest_framework.decorators import action
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiExample
from rest_framework import filters, permissions
from django_filters.rest_framework import DjangoFilterBackend
from account.permissions import AdminWriteOnlyOrReadOnly, AdminOnlyForPatchOtherwisePublic
from .models import (
Order,
OrderItem,
Carrier,
Payment,
Product,
DiscountCode,
Category,
ProductImage,
Refund,
Cart,
CartItem,
Wishlist,
)
from .serializers import (
OrderReadSerializer,
OrderMiniSerializer,
OrderCreateSerializer,
OrderItemReadSerializer,
CarrierReadSerializer,
PaymentReadSerializer,
ProductSerializer,
CategorySerializer,
ProductImageSerializer,
DiscountCodeSerializer,
RefundSerializer,
CartSerializer,
CartItemSerializer,
CartItemCreateSerializer,
WishlistSerializer,
WishlistProductActionSerializer,
)
# FIXME: adapt this view to the new order serializer
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List Orders (public - anonymous orders, authenticated - user orders, admin - all orders)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve Order (public - anonymous orders, authenticated - user orders, admin - all orders)"),
    create=extend_schema(tags=["commerce", "public"], summary="Create Order (public)"),
)
class OrderViewSet(mixins.ListModelMixin, mixins.RetrieveModelMixin, mixins.CreateModelMixin, viewsets.GenericViewSet):
    """List, retrieve and create orders, plus per-order helper actions.

    Which orders a caller can reach is decided by ``get_queryset``; every
    detail action goes through ``get_object`` and therefore inherits that
    filtering.
    """

    queryset = Order.objects.select_related("carrier", "payment").prefetch_related(
        "items__product", "discount"
    ).order_by("-created_at")
    permission_classes = [permissions.AllowAny]

    def get_permissions(self):
        """Allow public order access (for anonymous orders) and creation, require auth for user orders"""
        if self.action in ['create', 'list', 'retrieve', 'mini', 'items', 'carrier_detail', 'payment_detail', 'verify_payment', 'download_invoice']:
            return [permissions.AllowAny()]
        # Remaining actions fall back to the permission classes in effect
        # for the current action (e.g. the ones passed to @action).
        return super().get_permissions()

    def get_queryset(self):
        """Filter orders by user - admins see all, authenticated users see own orders, anonymous users see anonymous orders only"""
        queryset = super().get_queryset()
        user = self.request.user

        # Admin users can see all orders
        # NOTE(review): "admin" here means role == 'admin', while the admin
        # actions below use IsAdminUser — confirm both always agree.
        if user.is_authenticated and getattr(user, 'role', None) == 'admin':
            return queryset

        # Authenticated users see only their own orders (both user-linked and email-matched anonymous orders)
        if user.is_authenticated:
            return queryset.filter(
                models.Q(user=user) |
                models.Q(user__isnull=True, email=user.email)
            )

        # Anonymous users can only see anonymous orders (no user linked)
        # NOTE(review): this exposes every user-less order to any anonymous
        # caller — confirm that is intended.
        return queryset.filter(user__isnull=True)

    def get_serializer_class(self):
        """Pick the serializer matching the current action."""
        if self.action == "mini":
            return OrderMiniSerializer
        if self.action in ["list", "retrieve"]:
            return OrderReadSerializer
        if self.action == "create":
            return OrderCreateSerializer
        return OrderReadSerializer

    # Order creation is now handled by CreateModelMixin with proper serializer

    # -- List mini orders -- (public) --
    @action(detail=False, methods=["get"], url_path="detail")
    @extend_schema(
        tags=["commerce", "public"],
        summary="List mini orders (public - anonymous orders, authenticated - user orders)",
        responses={200: OrderMiniSerializer(many=True)},
    )
    def mini(self, request, *args, **kwargs):
        """Return the visible orders in their compact representation."""
        qs = self.get_queryset()  # Already filtered by user/anonymous status
        page = self.paginate_queryset(qs)
        if page is not None:
            ser = OrderMiniSerializer(page, many=True)
            return self.get_paginated_response(ser.data)
        ser = OrderMiniSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order items -- (public) --
    @action(detail=True, methods=["get"], url_path="items")
    @extend_schema(
        tags=["commerce", "public"],
        summary="List order items (public - anonymous orders, authenticated - user orders)",
        responses={200: OrderItemReadSerializer(many=True)},
    )
    def items(self, request, pk=None):
        """Return the items of one order."""
        order = self.get_object()  # get_object respects get_queryset filtering
        qs = order.items.select_related("product").all()
        ser = OrderItemReadSerializer(qs, many=True)
        return Response(ser.data)

    # -- Get order carrier -- (public) --
    @action(detail=True, methods=["get"], url_path="carrier")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get order carrier (public - anonymous orders, authenticated - user orders)",
        responses={200: CarrierReadSerializer},
    )
    def carrier_detail(self, request, pk=None):
        """Return the carrier attached to one order."""
        order = self.get_object()  # get_object respects get_queryset filtering
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Get order payment -- (public) --
    @action(detail=True, methods=["get"], url_path="payment")
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get order payment (public - anonymous orders, authenticated - user orders)",
        responses={200: PaymentReadSerializer},
    )
    def payment_detail(self, request, pk=None):
        """Return the payment attached to one order."""
        order = self.get_object()  # get_object respects get_queryset filtering
        ser = PaymentReadSerializer(order.payment)
        return Response(ser.data)

    # -- Mark carrier ready to pickup(store) (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/ready-to-pickup",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["commerce"],
        summary="Mark carrier ready to pickup (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_ready_to_pickup(self, request, pk=None):
        """Call ``ready_to_pickup()`` on the order's carrier and return it."""
        order = self.get_object()
        if not order.carrier:
            return Response({"detail": "Carrier not set."}, status=400)
        order.carrier.ready_to_pickup()
        # Reload so the response reflects whatever the call persisted.
        order.carrier.refresh_from_db()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Start ordering shipping (admin) --
    @action(
        detail=True,
        methods=["patch"],
        url_path="carrier/start-ordering-shipping",
        permission_classes=[IsAdminUser],
    )
    @extend_schema(
        tags=["commerce"],
        summary="Start ordering shipping (admin)",
        request=None,
        responses={200: CarrierReadSerializer},
    )
    def carrier_start_ordering_shipping(self, request, pk=None):
        """Call ``start_ordering_shipping()`` on the order's carrier and return it."""
        order = self.get_object()
        if not order.carrier:
            return Response({"detail": "Carrier not set."}, status=400)
        order.carrier.start_ordering_shipping()
        # Reload so the response reflects whatever the call persisted.
        order.carrier.refresh_from_db()
        ser = CarrierReadSerializer(order.carrier)
        return Response(ser.data)

    # -- Cancel order (authenticated) --
    @action(detail=True, methods=['post'], permission_classes=[permissions.IsAuthenticated])
    @extend_schema(
        tags=["commerce"],
        summary="Cancel order (authenticated user)",
        responses={200: {"type": "object", "properties": {"status": {"type": "string"}}}},
    )
    def cancel(self, request, pk=None):
        """Cancel an order that has not shipped and give its stock back.

        Responds with 400 when the order is already cancelled or when its
        carrier has left the PREPARING state; otherwise marks the order as
        cancelled and adds every item's quantity back to its product stock.
        """
        order = self.get_object()  # get_object respects get_queryset filtering

        # Without this guard a repeated call would restore the stock again.
        if order.status == Order.OrderStatus.CANCELLED:
            return Response({'detail': 'Order is already cancelled'}, status=400)

        # Can only cancel if not shipped
        if order.carrier and order.carrier.state != Carrier.STATE.PREPARING:
            return Response({'detail': 'Cannot cancel shipped order'}, status=400)

        # Status change and stock restoration succeed or fail together.
        with transaction.atomic():
            order.status = Order.OrderStatus.CANCELLED
            order.save()

            # Restore stock
            for item in order.items.all():
                if item.product_id is None:
                    # Item no longer points at a product; nothing to restore.
                    continue
                # Lock the row so concurrent stock changes are not overwritten
                # by this read-modify-write.
                product = Product.objects.select_for_update().get(pk=item.product_id)
                product.stock += item.quantity
                product.save()

        return Response({'status': 'cancelled'})

    # -- Verify payment (public) --
    @action(detail=True, methods=['post'], url_path='payment/verify')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Verify payment status (public)",
        responses={200: {"type": "object", "properties": {
            "order_id": {"type": "integer"},
            "payment_status": {"type": "string"}
        }}},
    )
    def verify_payment(self, request, pk=None):
        """Report the Stripe status of the order's payment, or ``'unknown'``."""
        order = self.get_object()
        if order.payment and order.payment.payment_method == Payment.PAYMENT.STRIPE:
            if order.payment.stripe:
                return Response({
                    'order_id': order.id,
                    'payment_status': order.payment.stripe.status
                })
        # Non-Stripe payments and missing Stripe records both end up here.
        return Response({
            'order_id': order.id,
            'payment_status': 'unknown'
        })

    # -- Download invoice (public) --
    @action(detail=True, methods=['get'], url_path='invoice')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Download invoice PDF (public)",
        responses={200: "PDF File"},
    )
    def download_invoice(self, request, pk=None):
        """Send the order's invoice PDF as an attachment, or 404 if there is none."""
        from django.http import FileResponse

        order = self.get_object()
        if not order.invoice or not order.invoice.pdf_file:
            return Response({'detail': 'Invoice not available'}, status=404)
        return FileResponse(
            order.invoice.pdf_file.open('rb'),
            content_type='application/pdf',
            as_attachment=True,
            filename=f'invoice_{order.invoice.invoice_number}.pdf'
        )

    # retrieve method removed - get_queryset handles filtering automatically
# ---------- Public/admin viewsets ----------
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List products (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve product (public)"),
    create=extend_schema(tags=["commerce"], summary="Create product (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update product (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace product (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete product (auth required)"),
)
class ProductViewSet(viewsets.ModelViewSet):
    """Product CRUD with filtering, search and ordering, plus availability and rating lookups."""

    queryset = Product.objects.all()
    serializer_class = ProductSerializer
    # NOTE(review): IsAuthenticatedOrReadOnly lets any logged-in user create,
    # change and delete products — confirm this should not be admin-only.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    filter_backends = [
        DjangoFilterBackend,
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    filterset_fields = [
        'category',
        'is_active',
        'stock',
        'price',
        'created_at',
        'updated_at',
        'limited_to',
    ]
    search_fields = ["name", "code", "description"]
    ordering_fields = ["price", "name", "created_at", "updated_at", "stock"]
    ordering = ["price"]

    @action(detail=True, methods=['get'], url_path='availability')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Check product availability (public)",
        responses={200: {"type": "object", "properties": {
            "product_id": {"type": "integer"},
            "available": {"type": "boolean"},
            "stock": {"type": "integer"},
            "is_active": {"type": "boolean"}
        }}},
    )
    def check_availability(self, request, pk=None):
        """Return the product's id, availability flag, stock and active flag."""
        item = self.get_object()
        payload = {
            'product_id': item.id,
            'available': item.available,
            'stock': item.stock,
            'is_active': item.is_active,
        }
        return Response(payload)

    @action(detail=True, methods=['get'], url_path='rating')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get product rating summary (public)",
        responses={200: {"type": "object", "properties": {
            "product_id": {"type": "integer"},
            "average_rating": {"type": "number"},
            "review_count": {"type": "integer"}
        }}},
    )
    def rating_summary(self, request, pk=None):
        """Return the average rating (two decimals, 0 when absent) and the review count."""
        from django.db.models import Avg, Count

        item = self.get_object()
        totals = item.reviews.aggregate(average=Avg('rating'), count=Count('id'))

        mean_rating = totals['average']
        if mean_rating:
            mean_rating = round(mean_rating, 2)
        else:
            # No reviews: the aggregate yields no usable average.
            mean_rating = 0

        return Response({
            'product_id': item.id,
            'average_rating': mean_rating,
            'review_count': totals['count'],
        })
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List categories (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve category (public)"),
    create=extend_schema(tags=["commerce"], summary="Create category (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update category (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace category (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete category (auth required)"),
)
class CategoryViewSet(viewsets.ModelViewSet):
    """Category CRUD: readable by anyone, writable by authenticated users."""

    queryset = Category.objects.all()
    serializer_class = CategorySerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Search and ordering only; no field filtering on this endpoint.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = [
        "name",
        "description",
    ]
    ordering_fields = [
        "name",
        "id",
    ]
    ordering = ["name"]
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List product images (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve product image (public)"),
    create=extend_schema(tags=["commerce"], summary="Create product image (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update product image (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace product image (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete product image (auth required)"),
)
class ProductImageViewSet(viewsets.ModelViewSet):
    """Product image CRUD: readable by anyone, writable by authenticated users."""

    queryset = ProductImage.objects.all()
    serializer_class = ProductImageSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Search and ordering only; no field filtering on this endpoint.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = [
        "alt_text",
        "product__name",
    ]
    ordering_fields = [
        "id",
        "product__name",
    ]
    # Highest id first by default.
    ordering = ["-id"]
@extend_schema_view(
    list=extend_schema(tags=["commerce", "public"], summary="List discount codes (public)"),
    retrieve=extend_schema(tags=["commerce", "public"], summary="Retrieve discount code (public)"),
    create=extend_schema(tags=["commerce"], summary="Create discount code (auth required)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update discount code (auth required)"),
    update=extend_schema(tags=["commerce"], summary="Replace discount code (auth required)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete discount code (auth required)"),
)
class DiscountCodeViewSet(viewsets.ModelViewSet):
    """Discount code CRUD: readable by anyone, writable by authenticated users."""

    queryset = DiscountCode.objects.all()
    serializer_class = DiscountCodeSerializer
    # NOTE(review): with IsAuthenticatedOrReadOnly every discount code can be
    # listed anonymously and edited by any logged-in user — confirm intended.
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]

    # Search and ordering only; no field filtering on this endpoint.
    filter_backends = [
        filters.SearchFilter,
        filters.OrderingFilter,
    ]
    search_fields = [
        "code",
        "description",
    ]
    ordering_fields = [
        "percent",
        "amount",
        "valid_from",
        "valid_to",
    ]
    ordering = ["-valid_from"]
# -- Refund (Admin only) --
@extend_schema_view(
    list=extend_schema(tags=["commerce"], summary="List refunds (admin)"),
    retrieve=extend_schema(tags=["commerce"], summary="Retrieve refund (admin)"),
    create=extend_schema(tags=["commerce"], summary="Create refund (admin)"),
    partial_update=extend_schema(tags=["commerce"], summary="Update refund (admin)"),
    update=extend_schema(tags=["commerce"], summary="Replace refund (admin)"),
    destroy=extend_schema(tags=["commerce"], summary="Delete refund (admin)"),
)
class RefundViewSet(viewsets.ModelViewSet):
    """Refund CRUD restricted by ``IsAdminUser``, newest refunds first."""

    # The related order is joined in the same query as each refund.
    queryset = (
        Refund.objects
        .select_related("order")
        .order_by("-created_at")
    )
    serializer_class = RefundSerializer
    permission_classes = [IsAdminUser]
class RefundPublicView(APIView):
    """Public endpoint to create and fetch refund objects.

    POST: Create a refund given email and invoice_number or order_id.
    Returns JSON with refund info, order items, and a base64 PDF payload.

    GET: Return a refund object by id (query param `id`).
    """

    # NOTE(review): GET returns any refund, including the order's email, to
    # whoever supplies its id — confirm this exposure is acceptable.
    permission_classes = [AllowAny]

    @staticmethod
    def _order_items_payload(order):
        """Return the order's items as plain dicts (name, sku, quantity)."""
        return [
            {
                # getattr with a default keeps this working when the item
                # has no product attached.
                "product_name": getattr(it.product, "name", "Item"),
                "sku": getattr(it.product, "code", None),
                "quantity": it.quantity,
            }
            for it in order.items.select_related('product').all()
        ]

    @classmethod
    def _refund_payload(cls, refund):
        """Build the ``refund`` / ``order`` response body shared by GET and POST."""
        order = refund.order
        return {
            "refund": {
                "id": refund.id,
                "order_id": order.id,
                "reason_choice": refund.reason_choice,
                "reason_text": refund.reason_text,
                "verified": refund.verified,
                "created_at": refund.created_at,
            },
            "order": {
                "id": order.id,
                "email": order.email,
                "created_at": order.created_at,
                "items": cls._order_items_payload(order),
            },
        }

    def get(self, request):
        """Return one refund and its order.

        Responds with 400 when `id` is missing or malformed and with 404
        when no refund has that id.
        """
        from django.core.exceptions import ValidationError

        rid = request.query_params.get("id")
        if not rid:
            return Response({"detail": "Missing 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)

        try:
            refund = Refund.objects.select_related("order").get(id=rid)
        except Refund.DoesNotExist:
            return Response({"detail": "Refund not found."}, status=status.HTTP_404_NOT_FOUND)
        except (ValueError, TypeError, ValidationError):
            # The value could not be converted to the primary-key type; this
            # used to surface as an unhandled server error.
            return Response({"detail": "Invalid 'id' query parameter."}, status=status.HTTP_400_BAD_REQUEST)

        return Response(self._refund_payload(refund))

    def post(self, request):
        """Create a refund and return it together with a base64-encoded PDF."""
        serializer = RefundCreatePublicSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        refund = serializer.save()

        # Generate PDF bytes using model helper
        pdf_bytes = refund.generate_refund_pdf_for_customer()
        pdf_b64 = base64.b64encode(pdf_bytes).decode('ascii')

        resp = self._refund_payload(refund)
        resp["pdf"] = {
            "filename": f"refund_{refund.id}.pdf",
            "content_type": "application/pdf",
            "base64": pdf_b64,
        }
        return Response(resp, status=status.HTTP_201_CREATED)
class ReviewPostPublicView(APIView):
    """Endpoint for authenticated users to create a review for a product."""

    permission_classes = [permissions.IsAuthenticated]

    @extend_schema(
        tags=["commerce", "public"],
        summary="Create a product review (public)",
        request=ReviewSerializerPublic,
        responses={201: ReviewSerializerPublic},
    )
    def post(self, request):
        """Validate the payload, save the review and return it with 201.

        Validation failures are returned as the serializer's error dict
        with status 400.
        """
        # Hand the request to the serializer so it can reach the current
        # user; the view already requires authentication.
        context = {"request": request}

        serializer = ReviewSerializerPublic(data=request.data, context=context)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        review = serializer.save()
        out = ReviewSerializerPublic(review, context=context)
        return Response(out.data, status=status.HTTP_201_CREATED)
# readonly public reviews, admin can patch
class ReviewPublicViewSet(viewsets.ModelViewSet):
    """Product reviews with search and ordering, plus a per-product listing."""

    queryset = Review.objects.all()
    serializer_class = ReviewSerializerPublic
    # NOTE(review): access rules beyond get_queryset depend on
    # AdminOnlyForPatchOtherwisePublic, which is defined in account.permissions.
    permission_classes = [AdminOnlyForPatchOtherwisePublic]
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["product__name", "user__username", "comment"]
    ordering_fields = ["rating", "created_at"]
    ordering = ["-created_at"]

    def get_queryset(self):
        """Filter reviews - admins see all, users see all reviews but can only modify their own"""
        queryset = super().get_queryset()
        user = self.request.user

        # Admin users can see and modify all reviews
        if user.is_authenticated and getattr(user, 'role', None) == 'admin':
            return queryset

        # For modification actions, users can only modify their own reviews
        if self.action in ['update', 'partial_update', 'destroy']:
            if user.is_authenticated:
                return queryset.filter(user=user)
            return queryset.none()

        # For viewing, everyone can see all reviews (they're public)
        return queryset

    @action(detail=False, methods=['get'], url_path='product/(?P<product_id>[^/.]+)')
    @extend_schema(
        tags=["commerce", "public"],
        summary="Get reviews for specific product (public)",
        responses={200: ReviewSerializerPublic(many=True)},
    )
    def by_product(self, request, product_id=None):
        """Return the reviews of one product, paginated when pagination is on."""
        reviews = self.get_queryset().filter(product_id=product_id)

        page = self.paginate_queryset(reviews)
        # Compare against None: an empty page is falsy but must still get the
        # paginated envelope, otherwise the response shape changes when a
        # product has no reviews.
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)

        serializer = self.get_serializer(reviews, many=True)
        return Response(serializer.data)
# ---------- CART MANAGEMENT ----------
@extend_schema_view(
    list=extend_schema(tags=["commerce", "cart"], summary="Get current cart (public)"),
    retrieve=extend_schema(tags=["commerce", "cart"], summary="Get cart by ID (public)"),
)
class CartViewSet(viewsets.GenericViewSet):
    """
    Shopping cart management for authenticated and anonymous users.
    Anonymous users use session_key, authenticated users use their user account.
    """

    queryset = Cart.objects.prefetch_related('items__product')
    serializer_class = CartSerializer
    permission_classes = [AllowAny]

    def get_queryset(self):
        """Filter carts by user/session - users only see their own cart"""
        queryset = super().get_queryset()
        user = self.request.user

        # Admin users can see all carts
        if user.is_authenticated and getattr(user, 'role', None) == 'admin':
            return queryset

        # Authenticated users see only their cart
        if user.is_authenticated:
            return queryset.filter(user=user)

        # Anonymous users see only their session cart
        session_key = self.request.session.session_key
        if session_key:
            return queryset.filter(session_key=session_key, user__isnull=True)
        return queryset.none()

    def get_or_create_cart(self, request):
        """Get or create cart for current user/session"""
        if request.user.is_authenticated:
            cart, _ = Cart.objects.get_or_create(user=request.user)
        else:
            # Use session for anonymous users
            session_key = request.session.session_key
            if not session_key:
                # No session yet: create one so the cart has a key to hang on.
                request.session.create()
                session_key = request.session.session_key
            # NOTE(review): unlike get_queryset, this lookup does not require
            # user to be null — confirm a session key can never match a cart
            # that already belongs to a user.
            cart, _ = Cart.objects.get_or_create(session_key=session_key)
        return cart

    @staticmethod
    def _find_cart_item(cart, item_id):
        """Return the item with this id inside ``cart``, or None.

        A malformed id counts as "not found" instead of raising.
        """
        try:
            return CartItem.objects.get(pk=item_id, cart=cart)
        except (CartItem.DoesNotExist, ValueError, TypeError):
            return None

    @action(detail=False, methods=['get'], url_path='current')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Get current user's cart",
        responses={200: CartSerializer},
    )
    def current(self, request):
        """Get the current user's cart"""
        cart = self.get_or_create_cart(request)
        serializer = CartSerializer(cart)
        return Response(serializer.data)

    @action(detail=False, methods=['post'], url_path='add')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Add item to cart",
        request=CartItemCreateSerializer,
        responses={200: CartSerializer},
    )
    def add_item(self, request):
        """Add a product to the cart or update quantity if already exists"""
        serializer = CartItemCreateSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        product_id = serializer.validated_data['product_id']
        quantity = serializer.validated_data['quantity']

        # Resolve the product before touching the cart so an unknown id
        # answers 404 instead of an unhandled DoesNotExist.
        try:
            product = Product.objects.get(pk=product_id)
        except Product.DoesNotExist:
            return Response({'detail': 'Product not found'}, status=404)

        cart = self.get_or_create_cart(request)

        # Check if item already in cart
        cart_item, created = CartItem.objects.get_or_create(
            cart=cart,
            product=product,
            defaults={'quantity': quantity}
        )
        if not created:
            # Update quantity if item already exists
            cart_item.quantity += quantity
            cart_item.save()

        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)

    @action(detail=False, methods=['patch'], url_path='items/(?P<item_id>[^/.]+)')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Update cart item quantity",
        request={"type": "object", "properties": {"quantity": {"type": "integer"}}},
        responses={200: CartSerializer},
    )
    def update_item(self, request, item_id=None):
        """Update quantity of a cart item"""
        cart = self.get_or_create_cart(request)

        cart_item = self._find_cart_item(cart, item_id)
        if cart_item is None:
            return Response({'detail': 'Cart item not found'}, status=404)

        quantity = request.data.get('quantity')
        if quantity is None:
            return Response({'detail': 'Quantity is required'}, status=400)

        try:
            quantity = int(quantity)
        except (TypeError, ValueError):
            # TypeError covers JSON values such as lists or objects.
            return Response({'detail': 'Invalid quantity'}, status=400)
        if quantity < 1:
            return Response({'detail': 'Quantity must be at least 1'}, status=400)

        cart_item.quantity = quantity
        cart_item.save()
        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)

    # Registered as the DELETE handler of the update_item route. Two separate
    # @action declarations with the same url_path produce two identical URL
    # patterns, and only the first one is ever matched.
    @update_item.mapping.delete
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Remove item from cart",
        responses={200: CartSerializer},
    )
    def remove_item(self, request, item_id=None):
        """Remove an item from the cart"""
        cart = self.get_or_create_cart(request)

        cart_item = self._find_cart_item(cart, item_id)
        if cart_item is None:
            return Response({'detail': 'Cart item not found'}, status=404)
        cart_item.delete()

        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)

    @action(detail=False, methods=['post'], url_path='clear')
    @extend_schema(
        tags=["commerce", "cart"],
        summary="Clear all items from cart",
        responses={200: CartSerializer},
    )
    def clear(self, request):
        """Remove all items from the cart"""
        cart = self.get_or_create_cart(request)
        cart.items.all().delete()
        cart.refresh_from_db()
        return Response(CartSerializer(cart).data)
# ---------- WISHLIST MANAGEMENT ----------
@extend_schema_view(
    list=extend_schema(tags=["commerce", "wishlist"], summary="Get current user's wishlist (authenticated)"),
    retrieve=extend_schema(tags=["commerce", "wishlist"], summary="Get wishlist by ID (authenticated)"),
)
class WishlistViewSet(viewsets.GenericViewSet):
    """
    User wishlist management - users can only access their own wishlist
    """

    queryset = Wishlist.objects.prefetch_related('products')
    serializer_class = WishlistSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Filter to current user's wishlist only"""
        return self.queryset.filter(user=self.request.user)

    def get_or_create_wishlist(self):
        """Get or create wishlist for current user"""
        wishlist, _ = Wishlist.objects.get_or_create(user=self.request.user)
        return wishlist

    @staticmethod
    def _find_product(product_id):
        """Return the product with this id, or None.

        A malformed id counts as "not found" instead of raising.
        """
        try:
            return Product.objects.get(pk=product_id)
        except (Product.DoesNotExist, ValueError, TypeError):
            return None

    @action(detail=False, methods=['get'], url_path='current')
    @extend_schema(
        tags=["commerce", "wishlist"],
        summary="Get current user's wishlist",
        responses={200: WishlistSerializer},
    )
    def current(self, request):
        """Get the current user's wishlist"""
        wishlist = self.get_or_create_wishlist()
        serializer = WishlistSerializer(wishlist)
        return Response(serializer.data)

    @action(detail=False, methods=['post'], url_path='add')
    @extend_schema(
        tags=["commerce", "wishlist"],
        summary="Add product to wishlist",
        request=WishlistProductActionSerializer,
        responses={200: WishlistSerializer},
    )
    def add_product(self, request):
        """Add a product to the user's wishlist"""
        serializer = WishlistProductActionSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        wishlist = self.get_or_create_wishlist()

        # Same 404 contract as remove_product; an unknown id used to raise
        # an unhandled DoesNotExist here.
        product = self._find_product(serializer.validated_data['product_id'])
        if product is None:
            return Response({'detail': 'Product not found'}, status=404)

        wishlist.add_product(product)
        wishlist.refresh_from_db()
        return Response(WishlistSerializer(wishlist).data)

    @action(detail=False, methods=['post'], url_path='remove')
    @extend_schema(
        tags=["commerce", "wishlist"],
        summary="Remove product from wishlist",
        request=WishlistProductActionSerializer,
        responses={200: WishlistSerializer},
    )
    def remove_product(self, request):
        """Remove a product from the user's wishlist"""
        serializer = WishlistProductActionSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        wishlist = self.get_or_create_wishlist()

        product = self._find_product(serializer.validated_data['product_id'])
        if product is None:
            return Response({'detail': 'Product not found'}, status=404)

        wishlist.remove_product(product)
        wishlist.refresh_from_db()
        return Response(WishlistSerializer(wishlist).data)

    @action(detail=False, methods=['post'], url_path='clear')
    @extend_schema(
        tags=["commerce", "wishlist"],
        summary="Clear all products from wishlist",
        responses={200: WishlistSerializer},
    )
    def clear(self, request):
        """Remove all products from the user's wishlist"""
        wishlist = self.get_or_create_wishlist()
        wishlist.products.clear()
        wishlist.refresh_from_db()
        return Response(WishlistSerializer(wishlist).data)

    @action(detail=False, methods=['get'], url_path='check/(?P<product_id>[^/.]+)')
    @extend_schema(
        tags=["commerce", "wishlist"],
        summary="Check if product is in wishlist",
        responses={200: {"type": "object", "properties": {"in_wishlist": {"type": "boolean"}}}},
    )
    def check_product(self, request, product_id=None):
        """Check if a product is in user's wishlist"""
        wishlist = self.get_or_create_wishlist()

        # product_id comes straight from the URL, so it may not be a valid key.
        product = self._find_product(product_id)
        if product is None:
            return Response({'detail': 'Product not found'}, status=404)

        return Response({'in_wishlist': wishlist.has_product(product)})
@extend_schema_view(
    list=extend_schema(tags=["commerce", "admin"], summary="List all wishlists (admin)"),
    retrieve=extend_schema(tags=["commerce", "admin"], summary="Get wishlist by ID (admin)"),
    update=extend_schema(tags=["commerce", "admin"], summary="Update wishlist (admin)"),
    partial_update=extend_schema(tags=["commerce", "admin"], summary="Partially update wishlist (admin)"),
    destroy=extend_schema(tags=["commerce", "admin"], summary="Delete wishlist (admin)"),
)
class AdminWishlistViewSet(viewsets.ModelViewSet):
    """
    Admin-only wishlist management - can view and modify any user's wishlist
    """

    queryset = Wishlist.objects.prefetch_related('products').select_related('user')
    serializer_class = WishlistSerializer
    # Every route here is documented as admin-only, so the whole viewset is
    # gated with IsAdminUser. The previous class,
    # AdminOnlyForPatchOtherwisePublic, by its name only guards PATCH.
    permission_classes = [IsAdminUser]
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ["user__email", "user__first_name", "user__last_name"]
    ordering_fields = ["created_at", "updated_at"]
    ordering = ["-updated_at"]
# ---------- ANALYTICS SYSTEM ----------
class AnalyticsView(APIView):
"""
Comprehensive analytics and reporting API for business intelligence.
Supports configurable date ranges and various analytics modules.
GET /analytics/ - Complete dashboard overview
GET /analytics/?type=sales - Sales analytics
GET /analytics/?type=products - Product analytics
GET /analytics/?type=customers - Customer analytics
GET /analytics/?type=shipping - Shipping analytics
GET /analytics/?type=reviews - Review analytics
GET /analytics/?type=inventory - Inventory analytics
GET /analytics/?type=ranges - Available date ranges
POST /analytics/ - Generate custom analytics reports
"""
permission_classes = [permissions.IsAdminUser]
def _parse_date_params(self, request):
    """Parse date parameters from request

    Reads ``start_date``, ``end_date`` and ``period`` (default
    ``'last_30_days'``) from the query string. A bound that is missing or
    cannot be parsed is filled from the predefined range named by
    ``period``; if ``period`` is not a known range the bound stays None.

    Returns:
        A ``(start_date, end_date, period)`` tuple.
    """
    def _parse_or_none(raw):
        # parse_datetime raises ValueError for a well-formed but impossible
        # date; treat that like any other unparseable value.
        if not raw:
            return None
        try:
            return parse_datetime(raw)
        except ValueError:
            return None

    start_date = _parse_or_none(request.query_params.get('start_date'))
    end_date = _parse_or_none(request.query_params.get('end_date'))
    period = request.query_params.get('period', 'last_30_days')

    # If no dates provided, use predefined period
    if not start_date or not end_date:
        predefined_ranges = get_predefined_date_ranges()
        if period in predefined_ranges:
            date_range = predefined_ranges[period]
            start_date = start_date or date_range['start']
            end_date = end_date or date_range['end']

    return start_date, end_date, period
# NOTE(review): `parameters` below is a list of raw dicts — confirm
# drf-spectacular renders them; its docs describe OpenApiParameter objects.
@extend_schema(
    tags=["commerce", "analytics"],
    summary="Get analytics data",
    description="Get various analytics based on type parameter",
    parameters=[
        {"name": "type", "in": "query", "schema": {"type": "string", "enum": ["dashboard", "sales", "products", "customers", "shipping", "reviews", "inventory", "ranges"]}, "description": "Type of analytics to retrieve"},
        {"name": "start_date", "in": "query", "schema": {"type": "string", "format": "date-time"}},
        {"name": "end_date", "in": "query", "schema": {"type": "string", "format": "date-time"}},
        {"name": "period", "in": "query", "schema": {"type": "string", "enum": ["today", "yesterday", "last_7_days", "last_30_days", "last_90_days", "this_month", "last_month", "this_year", "last_year"]}},
        {"name": "limit", "in": "query", "schema": {"type": "integer", "minimum": 1, "maximum": 100}, "description": "Limit for product analytics"}
    ]
)
def get(self, request):
    """Route the request to the analytics module named by ``type``.

    ``type`` defaults to ``'dashboard'``; an unknown value yields a 400.
    """
    requested = request.query_params.get('type', 'dashboard')
    start, end, period = self._parse_date_params(request)

    # Lambdas keep each handler lazy: only the selected one is looked up
    # and called.
    dispatch = {
        'dashboard': lambda: self._get_dashboard_analytics(start, end, period),
        'sales': lambda: self._get_sales_analytics(start, end, period),
        'products': lambda: self._get_product_analytics(request, start, end),
        'customers': lambda: self._get_customer_analytics(start, end),
        'shipping': lambda: self._get_shipping_analytics(start, end),
        'reviews': lambda: self._get_review_analytics(start, end),
        'inventory': lambda: self._get_inventory_analytics(),
        'ranges': lambda: self._get_date_ranges(),
    }

    handler = dispatch.get(requested)
    if handler is None:
        return Response({'error': 'Invalid analytics type'}, status=400)
    return handler()
def _get_dashboard_analytics(self, start_date, end_date, period):
    """Return the aggregated dashboard overview for the given date range.

    The effective ``start_date``/``end_date`` (ISO formatted, or ``None``
    when falsy) and ``period`` are echoed back under ``query_params``.
    """
    overview = AnalyticsAggregator.dashboard_overview(start_date, end_date)

    iso_start = start_date.isoformat() if start_date else None
    iso_end = end_date.isoformat() if end_date else None
    overview['query_params'] = {
        'start_date': iso_start,
        'end_date': iso_end,
        'period': period,
    }
    return Response(overview)
def _get_sales_analytics(self, start_date, end_date, period):
    """Return the revenue overview plus a payment-method breakdown.

    Every payment-method entry gains a ``percentage`` key: its share of the
    summed ``revenue`` across all entries, rounded to two decimals (0 when
    the total is not positive).
    """
    revenue_summary = SalesAnalytics.revenue_overview(start_date, end_date, period)
    method_rows = SalesAnalytics.payment_methods_breakdown(start_date, end_date)

    grand_total = sum(row['revenue'] for row in method_rows)
    for row in method_rows:
        if grand_total > 0:
            share = row['revenue'] / grand_total * 100
        else:
            share = 0
        row['percentage'] = round(share, 2)

    payload = {
        'sales': revenue_summary,
        'payment_methods': method_rows,
    }
    return Response(payload)
def _get_product_analytics(self, request, start_date, end_date):
    """Return top-selling products and per-category performance.

    The optional ``limit`` query parameter (default 20) caps the number of
    top-selling products. It must be an integer between 1 and 100, the
    range advertised in this endpoint's schema; anything else produces a
    400 response instead of an unhandled exception.
    """
    raw_limit = request.query_params.get('limit', 20)
    try:
        limit = int(raw_limit)
    except (TypeError, ValueError):
        # Non-numeric input from the query string: report it to the client.
        limit = None

    if limit is None or not 1 <= limit <= 100:
        return Response(
            {'error': 'limit must be an integer between 1 and 100'},
            status=400,
        )

    top_products = ProductAnalytics.top_selling_products(start_date, end_date, limit)
    category_performance = ProductAnalytics.category_performance(start_date, end_date)
    return Response({
        'top_products': top_products,
        'category_performance': category_performance,
    })
def _get_customer_analytics(self, start_date, end_date):
    """Return the customer overview and the cart abandonment analysis.

    Only the customer overview receives the date range; the cart
    abandonment analysis is called without arguments.
    """
    payload = {
        'customer_overview': CustomerAnalytics.customer_overview(start_date, end_date),
        'cart_abandonment': CustomerAnalytics.cart_abandonment_analysis(),
    }
    return Response(payload)
def _get_shipping_analytics(self, start_date, end_date):
    """Return the shipping-method breakdown and Deutsche Post analytics.

    Only the shipping-method breakdown receives the date range; the
    Deutsche Post analytics call takes no arguments.
    """
    payload = {
        'shipping_methods': ShippingAnalytics.shipping_methods_breakdown(start_date, end_date),
        'deutsche_post': ShippingAnalytics.deutsche_post_analytics(),
    }
    return Response(payload)
def _get_review_analytics(self, start_date, end_date):
    """Return the review overview for the given date range as a response."""
    return Response(ReviewAnalytics.review_overview(start_date, end_date))
def _get_inventory_analytics(self):
    """Return the product inventory analysis as a response."""
    return Response(ProductAnalytics.inventory_analysis())
def _get_date_ranges(self):
    """Describe the predefined date ranges.

    The response lists the range names under ``available_ranges`` and, per
    range under ``ranges``, its ISO-formatted ``start`` and ``end`` plus a
    title-cased ``display_name`` derived from the range name.
    """
    predefined = get_predefined_date_ranges()

    described = {}
    for name, bounds in predefined.items():
        described[name] = {
            'start': bounds['start'].isoformat(),
            'end': bounds['end'].isoformat(),
            'display_name': name.replace('_', ' ').title(),
        }

    return Response({
        'available_ranges': list(predefined),
        'ranges': described,
    })
@extend_schema(
    tags=["commerce", "analytics"],
    summary="Generate custom analytics report",
    description="Generate custom analytics based on specified modules and parameters",
)
def post(self, request):
    """Generate a custom analytics report for the requested modules.

    Request body fields (all optional):
        modules: report sections to build; recognised names are ``sales``,
            ``products``, ``customers``, ``shipping`` and ``reviews``.
            Defaults to ``['sales', 'products']``.
        start_date / end_date: datetime strings parsed with Django's
            ``parse_datetime``. A missing or empty bound falls back to the
            predefined ``last_30_days`` range.
        period: forwarded to ``SalesAnalytics.revenue_overview``
            (default ``'daily'``).
        options: object; ``product_limit`` (default 20) caps the number of
            top-selling products when ``products`` is requested.

    Returns a 400 response when a supplied date cannot be parsed, when
    ``options`` is not an object, or when ``product_limit`` is not a
    positive integer. Otherwise returns the report together with the
    effective parameters.
    """
    body = request.data
    modules = body.get('modules', ['sales', 'products'])
    period = body.get('period', 'daily')

    # Treat a missing or null ``options`` as empty, reject anything non-dict.
    options = body.get('options') or {}
    if not isinstance(options, dict):
        return Response({'error': 'options must be an object'}, status=400)

    bounds = {}
    for field in ('start_date', 'end_date'):
        raw_value = body.get(field)
        if not raw_value:
            bounds[field] = None
            continue
        try:
            # parse_datetime returns None for malformed input and raises
            # ValueError for well-formed but impossible dates; non-string
            # input can raise TypeError.
            parsed = parse_datetime(raw_value)
        except (TypeError, ValueError):
            parsed = None
        if parsed is None:
            # A supplied-but-invalid date is a client error, not a reason
            # to silently switch to the default range.
            return Response({'error': f'Invalid {field}'}, status=400)
        bounds[field] = parsed
    start_date, end_date = bounds['start_date'], bounds['end_date']

    # If a bound was not provided, default it from the last 30 days.
    if not start_date or not end_date:
        fallback = get_predefined_date_ranges()['last_30_days']
        start_date = start_date or fallback['start']
        end_date = end_date or fallback['end']

    # Validate the product limit before running any analytics queries.
    limit = 20
    if 'products' in modules:
        try:
            limit = int(options.get('product_limit', 20))
        except (TypeError, ValueError):
            limit = 0
        if limit < 1:
            return Response(
                {'error': 'product_limit must be a positive integer'},
                status=400,
            )

    report_data = {}
    if 'sales' in modules:
        report_data['sales'] = {
            'revenue': SalesAnalytics.revenue_overview(start_date, end_date, period),
            'payment_methods': SalesAnalytics.payment_methods_breakdown(start_date, end_date),
        }
    if 'products' in modules:
        report_data['products'] = {
            'top_selling': ProductAnalytics.top_selling_products(start_date, end_date, limit),
            'categories': ProductAnalytics.category_performance(start_date, end_date),
        }
    if 'customers' in modules:
        report_data['customers'] = CustomerAnalytics.customer_overview(start_date, end_date)
    if 'shipping' in modules:
        report_data['shipping'] = {
            'methods': ShippingAnalytics.shipping_methods_breakdown(start_date, end_date),
            'deutsche_post': ShippingAnalytics.deutsche_post_analytics(),
        }
    if 'reviews' in modules:
        report_data['reviews'] = ReviewAnalytics.review_overview(start_date, end_date)

    return Response({
        'report': report_data,
        'parameters': {
            'modules': modules,
            'start_date': start_date.isoformat(),
            'end_date': end_date.isoformat(),
            'period': period,
            # NOTE(review): naive local timestamp; if the project runs with
            # USE_TZ enabled, django.utils.timezone.now() may be the better
            # fit -- confirm before changing the output format.
            'generated_at': datetime.now().isoformat(),
        },
    })