"""
E-commerce Analytics Module

Provides comprehensive business intelligence for the e-commerce platform.
All analytics functions return data structures suitable for frontend
charts/graphs.
"""

from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

from django.db.models import Avg, Count, F, Q, Sum
from django.db.models.functions import TruncDate, TruncMonth, TruncWeek
from django.utils import timezone

from configuration.models import SiteConfiguration

from .models import Cart, CartItem, Carrier, Order, OrderItem, Payment, Product, Review

# Number of days covered when a caller does not supply a start date.
_DEFAULT_LOOKBACK_DAYS = 30

# Stock level at or below which a product is reported as "low stock".
_DEFAULT_LOW_STOCK_THRESHOLD = 10

# Maps the public ``period`` names onto the ORM truncation function used to
# bucket the revenue time series.
_PERIOD_TRUNCATORS = {
    'daily': TruncDate,
    'weekly': TruncWeek,
    'monthly': TruncMonth,
}


def _resolve_date_range(
    start_date: Optional[datetime],
    end_date: Optional[datetime],
) -> Tuple[datetime, datetime]:
    """Fill in a missing start and/or end date with the module defaults.

    A missing end date becomes "now"; a missing start date becomes
    "now minus 30 days" (independent of the end date supplied).
    """
    now = timezone.now()
    resolved_start = start_date or (now - timedelta(days=_DEFAULT_LOOKBACK_DAYS))
    resolved_end = end_date or now
    return resolved_start, resolved_end


def _start_of_day(moment: datetime) -> datetime:
    """Return ``moment`` with its time-of-day set to 00:00:00.000000."""
    return moment.replace(hour=0, minute=0, second=0, microsecond=0)


def _end_of_day(moment: datetime) -> datetime:
    """Return ``moment`` with its time-of-day set to 23:59:59.999999."""
    return moment.replace(hour=23, minute=59, second=59, microsecond=999999)


class SalesAnalytics:
    """Sales and revenue analytics"""

    @staticmethod
    def revenue_overview(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        period: str = "daily"
    ) -> Dict[str, Any]:
        """
        Get revenue overview with configurable date range and period

        Only orders whose status is ``Order.OrderStatus.COMPLETED`` are counted.

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)
            period: "daily", "weekly", "monthly" (default: daily). Any other
                value falls back to "daily".

        Returns:
            Dict with total_revenue, order_count, avg_order_value, the
            time_series rows (period, revenue, orders), the period actually
            used, and the ISO-formatted date_range.
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        # Report the granularity that is really applied: an unknown value
        # would otherwise be echoed back while the data is bucketed daily.
        if period not in _PERIOD_TRUNCATORS:
            period = 'daily'
        truncate = _PERIOD_TRUNCATORS[period]

        completed = Order.objects.filter(
            status=Order.OrderStatus.COMPLETED,
            created_at__range=(start_date, end_date),
        )

        totals = completed.aggregate(
            total_revenue=Sum('total_price'),
            order_count=Count('id'),
            avg_order_value=Avg('total_price'),
        )

        time_series = (
            completed
            .annotate(period=truncate('created_at'))
            .values('period')
            .annotate(revenue=Sum('total_price'), orders=Count('id'))
            .order_by('period')
        )

        return {
            # Sum/Avg yield None on an empty queryset; normalise to zero.
            'total_revenue': totals['total_revenue'] or Decimal('0'),
            'order_count': totals['order_count'] or 0,
            'avg_order_value': totals['avg_order_value'] or Decimal('0'),
            'time_series': list(time_series),
            'period': period,
            'date_range': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat(),
            },
        }

    @staticmethod
    def payment_methods_breakdown(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """Get breakdown of payment methods usage

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)

        Returns:
            One dict per payment method, ordered by descending revenue, with
            method, method_display, count, revenue and a ``percentage``
            placeholder of 0 (left for the view to fill in).
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        payment_stats = (
            Payment.objects
            .filter(order__created_at__range=(start_date, end_date))
            .values('payment_method')
            .annotate(count=Count('id'), revenue=Sum('order__total_price'))
            .order_by('-revenue')
        )

        # Build the value -> label mapping once instead of once per row.
        method_labels = dict(Payment.PAYMENT.choices)

        return [
            {
                'method': row['payment_method'],
                'method_display': method_labels.get(
                    row['payment_method'], row['payment_method']
                ),
                'count': row['count'],
                'revenue': row['revenue'] or Decimal('0'),
                'percentage': 0  # Will be calculated in the view
            }
            for row in payment_stats
        ]


class ProductAnalytics:
    """Product performance analytics"""

    @staticmethod
    def top_selling_products(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Get top selling products by quantity and revenue

        Revenue is computed as ordered quantity times the product's price
        field, and rows are ranked by that revenue.

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)
            limit: Maximum number of products to return (default: 10)

        Returns:
            One dict per product with product_id, product_name, unit_price,
            total_quantity, total_revenue, order_count and
            avg_quantity_per_order.
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        top_products = (
            OrderItem.objects
            .filter(order__created_at__range=(start_date, end_date))
            .values('product__id', 'product__name', 'product__price')
            .annotate(
                total_quantity=Sum('quantity'),
                total_revenue=Sum(F('quantity') * F('product__price')),
                order_count=Count('order', distinct=True),
            )
            .order_by('-total_revenue')[:limit]
        )

        return [
            {
                'product_id': row['product__id'],
                'product_name': row['product__name'],
                'unit_price': row['product__price'],
                'total_quantity': row['total_quantity'],
                'total_revenue': row['total_revenue'],
                'order_count': row['order_count'],
                'avg_quantity_per_order': round(
                    row['total_quantity'] / row['order_count'], 2
                ),
            }
            for row in top_products
        ]

    @staticmethod
    def category_performance(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """Get category performance breakdown

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)

        Returns:
            One dict per category, ordered by descending revenue, with
            category_id, category_name, total_quantity, total_revenue,
            product_count and order_count.
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        category_stats = (
            OrderItem.objects
            .filter(order__created_at__range=(start_date, end_date))
            .values('product__category__id', 'product__category__name')
            .annotate(
                total_quantity=Sum('quantity'),
                total_revenue=Sum(F('quantity') * F('product__price')),
                product_count=Count('product', distinct=True),
                order_count=Count('order', distinct=True),
            )
            .order_by('-total_revenue')
        )

        return [
            {
                'category_id': row['product__category__id'],
                'category_name': row['product__category__name'],
                'total_quantity': row['total_quantity'],
                'total_revenue': row['total_revenue'],
                'product_count': row['product_count'],
                'order_count': row['order_count'],
            }
            for row in category_stats
        ]

    @staticmethod
    def inventory_analysis(
        low_stock_threshold: int = _DEFAULT_LOW_STOCK_THRESHOLD
    ) -> Dict[str, Any]:
        """Get inventory status and low stock alerts

        Only active products are considered.

        Args:
            low_stock_threshold: Stock level at or below which a product
                counts as low stock (default: 10).

        Returns:
            Dict with total_products, out_of_stock_count, low_stock_count,
            in_stock_count, up to 20 low_stock_products (lowest stock first,
            out-of-stock products included) and a stock_distribution summary.
        """
        active_products = Product.objects.filter(is_active=True)

        # One conditional aggregate instead of three separate COUNT queries.
        counts = active_products.aggregate(
            total=Count('id'),
            out_of_stock=Count('id', filter=Q(stock=0)),
            low_stock=Count(
                'id', filter=Q(stock__gt=0, stock__lte=low_stock_threshold)
            ),
        )
        total_products = counts['total']
        out_of_stock = counts['out_of_stock']
        low_stock = counts['low_stock']

        low_stock_products = (
            active_products
            .filter(stock__lte=low_stock_threshold)
            .values('id', 'name', 'stock', 'category__name')
            .order_by('stock')[:20]
        )

        return {
            'total_products': total_products,
            'out_of_stock_count': out_of_stock,
            'low_stock_count': low_stock,
            'in_stock_count': total_products - out_of_stock,
            'low_stock_products': list(low_stock_products),
            'stock_distribution': {
                'out_of_stock': out_of_stock,
                'low_stock': low_stock,
                'in_stock': total_products - out_of_stock - low_stock,
            },
        }


class CustomerAnalytics:
    """Customer behavior and demographics analytics"""

    @staticmethod
    def customer_overview(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get customer acquisition and behavior overview

        A customer is "returning" when they ordered inside the window and
        also have at least one order created before ``start_date``; every
        other customer who ordered inside the window is "new".

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)

        Returns:
            Dict with total_customers, new_customers, returning_customers,
            avg_customer_lifetime_value (average of per-user total spend over
            all orders) and the ISO-formatted date_range.
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        # Orders without a user are skipped so they are not counted as one
        # extra "customer" by the DISTINCT below.
        period_orders = Order.objects.filter(
            created_at__range=(start_date, end_date),
            user__isnull=False,
        )

        total_customers = period_orders.values('user').distinct().count()

        # Users active in the window who already had an order before it.
        returning_customers = (
            period_orders
            .filter(user__orders__created_at__lt=start_date)
            .values('user')
            .distinct()
            .count()
        )
        new_customers = total_customers - returning_customers

        # Customer lifetime value (simplified): mean of each user's total spend.
        spend_per_customer = (
            Order.objects
            .filter(user__isnull=False)
            .values('user')
            .annotate(total_spent=Sum('total_price'))
        )
        avg_customer_ltv = spend_per_customer.aggregate(
            avg_ltv=Avg('total_spent')
        )['avg_ltv'] or Decimal('0')

        return {
            'total_customers': total_customers,
            'new_customers': new_customers,
            'returning_customers': returning_customers,
            'avg_customer_lifetime_value': avg_customer_ltv,
            'date_range': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat(),
            },
        }

    @staticmethod
    def cart_abandonment_analysis(days: int = 7) -> Dict[str, Any]:
        """Analyze cart abandonment rates

        Args:
            days: Size of the look-back window in days (default: 7).

        Returns:
            Dict with total_active_carts, completed_orders, abandoned_carts,
            abandonment_rate (percentage, two decimals) and analysis_period.
        """
        window_start = timezone.now() - timedelta(days=days)

        # Active carts: updated inside the window.
        active_carts = Cart.objects.filter(updated_at__gte=window_start)
        total_carts = active_carts.count()

        # Orders placed inside the window by users owning an active cart.
        # NOTE(review): this counts orders, not carts, so a user with several
        # orders is counted more than once; the max() below only prevents a
        # negative result. Confirm whether distinct users are intended.
        completed_orders = Order.objects.filter(
            user__cart__in=active_carts,
            created_at__gte=window_start,
        ).count()

        abandoned_carts = max(0, total_carts - completed_orders)
        if total_carts > 0:
            abandonment_rate = abandoned_carts / total_carts * 100
        else:
            abandonment_rate = 0

        return {
            'total_active_carts': total_carts,
            'completed_orders': completed_orders,
            'abandoned_carts': abandoned_carts,
            'abandonment_rate': round(abandonment_rate, 2),
            'analysis_period': f'{days} days',
        }


class ShippingAnalytics:
    """Shipping and logistics analytics"""

    @staticmethod
    def shipping_methods_breakdown(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """Get breakdown of shipping methods usage

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)

        Returns:
            One dict per (shipping_method, state) pair, most frequent first,
            with the raw values, their display labels, count and total_cost.
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        shipping_stats = (
            Carrier.objects
            .filter(order__created_at__range=(start_date, end_date))
            .values('shipping_method', 'state')
            .annotate(
                count=Count('id'),
                total_shipping_cost=Sum('shipping_price'),
            )
            .order_by('-count')
        )

        # Build the value -> label mappings once instead of once per row.
        method_labels = dict(Carrier.SHIPPING.choices)
        state_labels = dict(Carrier.STATE.choices)

        return [
            {
                'shipping_method': row['shipping_method'],
                'method_display': method_labels.get(
                    row['shipping_method'], row['shipping_method']
                ),
                'state': row['state'],
                'state_display': state_labels.get(row['state'], row['state']),
                'count': row['count'],
                'total_cost': row['total_shipping_cost'] or Decimal('0'),
            }
            for row in shipping_stats
        ]

    @staticmethod
    def deutsche_post_analytics() -> Dict[str, Any]:
        """Get Deutsche Post shipping analytics and pricing info

        Returns:
            Dict with the Deutsche Post order count, the configured default
            shipping price and API configuration flags. When the optional
            Deutsche Post module cannot be imported, a dict with an ``error``
            message and zeroed figures is returned instead.
        """
        # Keep the try body to the optional import only, so unrelated
        # failures further down are not misreported as a missing module.
        try:
            from thirdparty.deutschepost.models import DeutschePostOrder
        except ImportError:
            return {
                'error': 'Deutsche Post module not available',
                'total_deutsche_post_orders': 0,
                'default_shipping_price': Decimal('0'),
            }

        total_dp_orders = DeutschePostOrder.objects.all().count()

        # Pricing and API credentials come from the site configuration.
        config = SiteConfiguration.get_solo()

        # Status breakdown (if available in the model)
        # Note: This depends on actual DeutschePostOrder model structure
        return {
            'total_deutsche_post_orders': total_dp_orders,
            'default_shipping_price': config.deutschepost_shipping_price,
            'api_configured': bool(
                config.deutschepost_client_id
                and config.deutschepost_client_secret
            ),
            'api_endpoint': config.deutschepost_api_url,
            'analysis_note': 'Detailed Deutsche Post analytics require API integration',
        }


class ReviewAnalytics:
    """Product review and rating analytics"""

    @staticmethod
    def review_overview(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get review statistics and sentiment overview

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)

        Returns:
            Dict with total_reviews, average_rating (two decimals),
            rating_distribution (rating, count, percentage), up to 10
            top_rated_products having at least 3 reviews in the window, and
            the ISO-formatted date_range.
        """
        start_date, end_date = _resolve_date_range(start_date, end_date)

        reviews = Review.objects.filter(created_at__range=(start_date, end_date))

        # Average and count in a single query.
        summary = reviews.aggregate(avg=Avg('rating'), total=Count('id'))
        avg_rating = summary['avg'] or 0
        total_reviews = summary['total']

        rating_distribution = (
            reviews
            .values('rating')
            .annotate(count=Count('id'))
            .order_by('rating')
        )

        # Top rated products; review_count breaks ties so the order is stable.
        top_rated_products = (
            reviews
            .values('product__id', 'product__name')
            .annotate(avg_rating=Avg('rating'), review_count=Count('id'))
            .filter(review_count__gte=3)  # At least 3 reviews
            .order_by('-avg_rating', '-review_count')[:10]
        )

        return {
            'total_reviews': total_reviews,
            'average_rating': round(avg_rating, 2),
            'rating_distribution': [
                {
                    'rating': row['rating'],
                    'count': row['count'],
                    'percentage': (
                        round(row['count'] / total_reviews * 100, 1)
                        if total_reviews > 0 else 0
                    ),
                }
                for row in rating_distribution
            ],
            'top_rated_products': list(top_rated_products),
            'date_range': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat(),
            },
        }


class AnalyticsAggregator:
    """Main analytics aggregator for dashboard views"""

    @staticmethod
    def dashboard_overview(
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get comprehensive dashboard data

        Args:
            start_date: Start date for analysis (default: last 30 days)
            end_date: End date for analysis (default: now)

        Returns:
            Dict with sales, products, customers, shipping and reviews
            sections plus a generated_at ISO timestamp.
        """
        # Resolve the defaults once so every section covers the same window.
        start_date, end_date = _resolve_date_range(start_date, end_date)

        return {
            'sales': SalesAnalytics.revenue_overview(start_date, end_date),
            'products': {
                'top_selling': ProductAnalytics.top_selling_products(
                    start_date, end_date, limit=5
                ),
                'inventory': ProductAnalytics.inventory_analysis(),
            },
            'customers': CustomerAnalytics.customer_overview(start_date, end_date),
            'shipping': {
                'methods': ShippingAnalytics.shipping_methods_breakdown(
                    start_date, end_date
                ),
                'deutsche_post': ShippingAnalytics.deutsche_post_analytics(),
            },
            'reviews': ReviewAnalytics.review_overview(start_date, end_date),
            'generated_at': timezone.now().isoformat(),
        }


def get_predefined_date_ranges(
    now: Optional[datetime] = None
) -> Dict[str, Dict[str, datetime]]:
    """Get predefined date ranges for easy frontend integration

    Args:
        now: Reference moment the ranges are computed from
            (default: ``timezone.now()``).

    Returns:
        Dict mapping a range name (today, yesterday, last_7_days,
        last_30_days, last_90_days, this_month, last_month, this_year,
        last_year) to a dict with ``start`` and ``end`` datetimes.
    """
    if now is None:
        now = timezone.now()

    yesterday = now - timedelta(days=1)

    # Stepping back one day from the first of a month/year lands on the last
    # day of the previous month/year regardless of its length.
    last_day_of_prev_month = now.replace(day=1) - timedelta(days=1)
    last_day_of_prev_year = now.replace(month=1, day=1) - timedelta(days=1)

    return {
        'today': {
            'start': _start_of_day(now),
            'end': now,
        },
        'yesterday': {
            'start': _start_of_day(yesterday),
            'end': _end_of_day(yesterday),
        },
        'last_7_days': {
            'start': now - timedelta(days=7),
            'end': now,
        },
        'last_30_days': {
            'start': now - timedelta(days=30),
            'end': now,
        },
        'last_90_days': {
            'start': now - timedelta(days=90),
            'end': now,
        },
        'this_month': {
            'start': _start_of_day(now.replace(day=1)),
            'end': now,
        },
        'last_month': {
            'start': _start_of_day(last_day_of_prev_month.replace(day=1)),
            'end': _end_of_day(last_day_of_prev_month),
        },
        'this_year': {
            'start': _start_of_day(now.replace(month=1, day=1)),
            'end': now,
        },
        'last_year': {
            # Replace the year directly: subtracting 365 days from Jan 1
            # yields Jan 2 whenever the previous year is a leap year.
            'start': _start_of_day(
                now.replace(year=now.year - 1, month=1, day=1)
            ),
            'end': _end_of_day(last_day_of_prev_year),
        },
    }