Add comprehensive analytics and VAT rate management

Introduced a full-featured analytics module for e-commerce business intelligence, including sales, product, customer, shipping, and review analytics, with API endpoints for dashboard and custom reports. Added VAT rate management: new VATRate model, admin interface, serializer, and API endpoints, and integrated VAT logic into Product and pricing calculations. Refactored configuration and admin code to support VAT rates, improved email notification tasks, and updated related serializers, views, and URLs for unified configuration and VAT management.
This commit is contained in:
2026-01-19 02:13:47 +01:00
parent e78baf746c
commit 2a26edac80
9 changed files with 1055 additions and 133 deletions

View File

@@ -0,0 +1,506 @@
"""
E-commerce Analytics Module
Provides comprehensive business intelligence for the e-commerce platform.
All analytics functions return data structures suitable for frontend charts/graphs.
"""
from django.db.models import Sum, Count, Avg, Q, F
from django.utils import timezone
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Any, Optional
from django.db.models.functions import TruncDate, TruncMonth, TruncWeek
from .models import Order, Product, OrderItem, Payment, Carrier, Review, Cart, CartItem
from configuration.models import SiteConfiguration
class SalesAnalytics:
"""Sales and revenue analytics"""
@staticmethod
def revenue_overview(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
period: str = "daily"
) -> Dict[str, Any]:
"""
Get revenue overview with configurable date range and period
Args:
start_date: Start date for analysis (default: last 30 days)
end_date: End date for analysis (default: today)
period: "daily", "weekly", "monthly" (default: daily)
Returns:
Dict with total_revenue, order_count, avg_order_value, and time_series data
"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
# Base queryset for completed orders
orders = Order.objects.filter(
status=Order.OrderStatus.COMPLETED,
created_at__range=(start_date, end_date)
)
# Aggregate totals
totals = orders.aggregate(
total_revenue=Sum('total_price'),
order_count=Count('id'),
avg_order_value=Avg('total_price')
)
# Time series data based on period
trunc_function = {
'daily': TruncDate,
'weekly': TruncWeek,
'monthly': TruncMonth,
}.get(period, TruncDate)
time_series = (
orders
.annotate(period=trunc_function('created_at'))
.values('period')
.annotate(
revenue=Sum('total_price'),
orders=Count('id')
)
.order_by('period')
)
return {
'total_revenue': totals['total_revenue'] or Decimal('0'),
'order_count': totals['order_count'] or 0,
'avg_order_value': totals['avg_order_value'] or Decimal('0'),
'time_series': list(time_series),
'period': period,
'date_range': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
}
}
@staticmethod
def payment_methods_breakdown(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""Get breakdown of payment methods usage"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
payment_stats = (
Payment.objects
.filter(order__created_at__range=(start_date, end_date))
.values('payment_method')
.annotate(
count=Count('id'),
revenue=Sum('order__total_price')
)
.order_by('-revenue')
)
return [
{
'method': item['payment_method'],
'method_display': dict(Payment.PAYMENT.choices).get(item['payment_method'], item['payment_method']),
'count': item['count'],
'revenue': item['revenue'] or Decimal('0'),
'percentage': 0 # Will be calculated in the view
}
for item in payment_stats
]
class ProductAnalytics:
"""Product performance analytics"""
@staticmethod
def top_selling_products(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Get top selling products by quantity and revenue"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
top_products = (
OrderItem.objects
.filter(order__created_at__range=(start_date, end_date))
.select_related('product')
.values('product__id', 'product__name', 'product__price')
.annotate(
total_quantity=Sum('quantity'),
total_revenue=Sum(F('quantity') * F('product__price')),
order_count=Count('order', distinct=True)
)
.order_by('-total_revenue')[:limit]
)
return [
{
'product_id': item['product__id'],
'product_name': item['product__name'],
'unit_price': item['product__price'],
'total_quantity': item['total_quantity'],
'total_revenue': item['total_revenue'],
'order_count': item['order_count'],
'avg_quantity_per_order': round(item['total_quantity'] / item['order_count'], 2)
}
for item in top_products
]
@staticmethod
def category_performance(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""Get category performance breakdown"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
category_stats = (
OrderItem.objects
.filter(order__created_at__range=(start_date, end_date))
.select_related('product__category')
.values('product__category__id', 'product__category__name')
.annotate(
total_quantity=Sum('quantity'),
total_revenue=Sum(F('quantity') * F('product__price')),
product_count=Count('product', distinct=True),
order_count=Count('order', distinct=True)
)
.order_by('-total_revenue')
)
return [
{
'category_id': item['product__category__id'],
'category_name': item['product__category__name'],
'total_quantity': item['total_quantity'],
'total_revenue': item['total_revenue'],
'product_count': item['product_count'],
'order_count': item['order_count']
}
for item in category_stats
]
@staticmethod
def inventory_analysis() -> Dict[str, Any]:
"""Get inventory status and low stock alerts"""
total_products = Product.objects.filter(is_active=True).count()
out_of_stock = Product.objects.filter(is_active=True, stock=0).count()
low_stock = Product.objects.filter(
is_active=True,
stock__gt=0,
stock__lte=10 # Consider configurable threshold
).count()
low_stock_products = (
Product.objects
.filter(is_active=True, stock__lte=10)
.select_related('category')
.values('id', 'name', 'stock', 'category__name')
.order_by('stock')[:20]
)
return {
'total_products': total_products,
'out_of_stock_count': out_of_stock,
'low_stock_count': low_stock,
'in_stock_count': total_products - out_of_stock,
'low_stock_products': list(low_stock_products),
'stock_distribution': {
'out_of_stock': out_of_stock,
'low_stock': low_stock,
'in_stock': total_products - out_of_stock - low_stock
}
}
class CustomerAnalytics:
"""Customer behavior and demographics analytics"""
@staticmethod
def customer_overview(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get customer acquisition and behavior overview"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
# New vs returning customers
period_orders = Order.objects.filter(created_at__range=(start_date, end_date))
# First-time customers (users with their first order in this period)
first_time_customers = period_orders.filter(
user__orders__created_at__lt=start_date
).values('user').distinct().count()
# Returning customers
total_customers = period_orders.values('user').distinct().count()
returning_customers = total_customers - first_time_customers
# Customer lifetime value (simplified)
customer_stats = (
Order.objects
.filter(user__isnull=False)
.values('user')
.annotate(
total_orders=Count('id'),
total_spent=Sum('total_price'),
avg_order_value=Avg('total_price')
)
)
avg_customer_ltv = customer_stats.aggregate(
avg_ltv=Avg('total_spent')
)['avg_ltv'] or Decimal('0')
return {
'total_customers': total_customers,
'new_customers': first_time_customers,
'returning_customers': returning_customers,
'avg_customer_lifetime_value': avg_customer_ltv,
'date_range': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
}
}
@staticmethod
def cart_abandonment_analysis() -> Dict[str, Any]:
"""Analyze cart abandonment rates"""
# Active carts (updated in last 7 days)
week_ago = timezone.now() - timedelta(days=7)
active_carts = Cart.objects.filter(updated_at__gte=week_ago)
# Completed orders from carts
completed_orders = Order.objects.filter(
user__cart__in=active_carts,
created_at__gte=week_ago
).count()
total_carts = active_carts.count()
abandoned_carts = max(0, total_carts - completed_orders)
abandonment_rate = (abandoned_carts / total_carts * 100) if total_carts > 0 else 0
return {
'total_active_carts': total_carts,
'completed_orders': completed_orders,
'abandoned_carts': abandoned_carts,
'abandonment_rate': round(abandonment_rate, 2),
'analysis_period': '7 days'
}
class ShippingAnalytics:
"""Shipping and logistics analytics"""
@staticmethod
def shipping_methods_breakdown(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""Get breakdown of shipping methods usage"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
shipping_stats = (
Carrier.objects
.filter(order__created_at__range=(start_date, end_date))
.values('shipping_method', 'state')
.annotate(
count=Count('id'),
total_shipping_cost=Sum('shipping_price')
)
.order_by('-count')
)
return [
{
'shipping_method': item['shipping_method'],
'method_display': dict(Carrier.SHIPPING.choices).get(item['shipping_method'], item['shipping_method']),
'state': item['state'],
'state_display': dict(Carrier.STATE.choices).get(item['state'], item['state']),
'count': item['count'],
'total_cost': item['total_shipping_cost'] or Decimal('0')
}
for item in shipping_stats
]
@staticmethod
def deutsche_post_analytics() -> Dict[str, Any]:
"""Get Deutsche Post shipping analytics and pricing info"""
try:
# Import Deutsche Post models
from thirdparty.deutschepost.models import DeutschePostOrder
# Get Deutsche Post orders statistics
dp_orders = DeutschePostOrder.objects.all()
total_dp_orders = dp_orders.count()
# Get configuration for pricing
config = SiteConfiguration.get_solo()
dp_default_price = config.deutschepost_shipping_price
# Status breakdown (if available in the model)
# Note: This depends on actual DeutschePostOrder model structure
return {
'total_deutsche_post_orders': total_dp_orders,
'default_shipping_price': dp_default_price,
'api_configured': bool(config.deutschepost_client_id and config.deutschepost_client_secret),
'api_endpoint': config.deutschepost_api_url,
'analysis_note': 'Detailed Deutsche Post analytics require API integration'
}
except ImportError:
return {
'error': 'Deutsche Post module not available',
'total_deutsche_post_orders': 0,
'default_shipping_price': Decimal('0')
}
class ReviewAnalytics:
"""Product review and rating analytics"""
@staticmethod
def review_overview(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get review statistics and sentiment overview"""
if not start_date:
start_date = timezone.now() - timedelta(days=30)
if not end_date:
end_date = timezone.now()
reviews = Review.objects.filter(created_at__range=(start_date, end_date))
rating_distribution = (
reviews
.values('rating')
.annotate(count=Count('id'))
.order_by('rating')
)
avg_rating = reviews.aggregate(avg=Avg('rating'))['avg'] or 0
total_reviews = reviews.count()
# Top rated products
top_rated_products = (
Review.objects
.filter(created_at__range=(start_date, end_date))
.select_related('product')
.values('product__id', 'product__name')
.annotate(
avg_rating=Avg('rating'),
review_count=Count('id')
)
.filter(review_count__gte=3) # At least 3 reviews
.order_by('-avg_rating')[:10]
)
return {
'total_reviews': total_reviews,
'average_rating': round(avg_rating, 2),
'rating_distribution': [
{
'rating': item['rating'],
'count': item['count'],
'percentage': round(item['count'] / total_reviews * 100, 1) if total_reviews > 0 else 0
}
for item in rating_distribution
],
'top_rated_products': list(top_rated_products),
'date_range': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
}
}
class AnalyticsAggregator:
"""Main analytics aggregator for dashboard views"""
@staticmethod
def dashboard_overview(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get comprehensive dashboard data"""
return {
'sales': SalesAnalytics.revenue_overview(start_date, end_date),
'products': {
'top_selling': ProductAnalytics.top_selling_products(start_date, end_date, limit=5),
'inventory': ProductAnalytics.inventory_analysis()
},
'customers': CustomerAnalytics.customer_overview(start_date, end_date),
'shipping': {
'methods': ShippingAnalytics.shipping_methods_breakdown(start_date, end_date),
'deutsche_post': ShippingAnalytics.deutsche_post_analytics()
},
'reviews': ReviewAnalytics.review_overview(start_date, end_date),
'generated_at': timezone.now().isoformat()
}
def get_predefined_date_ranges() -> Dict[str, Dict[str, datetime]]:
"""Get predefined date ranges for easy frontend integration"""
now = timezone.now()
return {
'today': {
'start': now.replace(hour=0, minute=0, second=0, microsecond=0),
'end': now
},
'yesterday': {
'start': (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0),
'end': (now - timedelta(days=1)).replace(hour=23, minute=59, second=59)
},
'last_7_days': {
'start': now - timedelta(days=7),
'end': now
},
'last_30_days': {
'start': now - timedelta(days=30),
'end': now
},
'last_90_days': {
'start': now - timedelta(days=90),
'end': now
},
'this_month': {
'start': now.replace(day=1, hour=0, minute=0, second=0, microsecond=0),
'end': now
},
'last_month': {
'start': (now.replace(day=1) - timedelta(days=1)).replace(day=1, hour=0, minute=0, second=0, microsecond=0),
'end': (now.replace(day=1) - timedelta(days=1)).replace(hour=23, minute=59, second=59)
},
'this_year': {
'start': now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0),
'end': now
},
'last_year': {
'start': (now.replace(month=1, day=1) - timedelta(days=365)).replace(hour=0, minute=0, second=0, microsecond=0),
'end': (now.replace(month=1, day=1) - timedelta(days=1)).replace(hour=23, minute=59, second=59)
}
}