Files
vontor-cz/backend/thirdparty/downloader/serializers.py
2026-04-20 00:04:15 +02:00

111 lines
4.3 KiB
Python

from rest_framework import serializers
from django.db.models import Count, Avg, Sum, Q
from django.db.models.functions import TruncDay, TruncHour
class DownloaderRecordSerializer(serializers.Serializer):
"""Single download history entry."""
id = serializers.IntegerField()
url = serializers.URLField()
title = serializers.CharField()
platform = serializers.CharField()
format = serializers.CharField()
video_quality = serializers.IntegerField(allow_null=True)
is_audio_only = serializers.BooleanField()
length_of_media = serializers.IntegerField(allow_null=True)
file_size = serializers.IntegerField(allow_null=True)
processing_time = serializers.FloatField(allow_null=True)
success = serializers.BooleanField()
error_message = serializers.CharField()
download_time = serializers.DateTimeField()
class PlatformCountSerializer(serializers.Serializer):
platform = serializers.CharField()
count = serializers.IntegerField()
class QualityCountSerializer(serializers.Serializer):
video_quality = serializers.IntegerField(allow_null=True)
count = serializers.IntegerField()
class TimeseriesPointSerializer(serializers.Serializer):
period = serializers.DateTimeField()
count = serializers.IntegerField()
class TopUrlSerializer(serializers.Serializer):
url = serializers.URLField()
title = serializers.CharField()
count = serializers.IntegerField()
class DownloaderStatsSerializer(serializers.Serializer):
# Totals
total_downloads = serializers.IntegerField()
successful_downloads = serializers.IntegerField()
failed_downloads = serializers.IntegerField()
success_rate = serializers.FloatField(help_text="Percentage 0-100")
# Media metrics
avg_length_of_media = serializers.FloatField(allow_null=True)
total_length_of_media = serializers.IntegerField(allow_null=True)
avg_file_size = serializers.FloatField(allow_null=True)
total_file_size = serializers.IntegerField(allow_null=True)
avg_processing_time = serializers.FloatField(allow_null=True)
# Format / quality breakdown
most_common_format = serializers.CharField(allow_null=True)
audio_only_count = serializers.IntegerField()
video_count = serializers.IntegerField()
downloads_by_platform = PlatformCountSerializer(many=True)
downloads_by_quality = QualityCountSerializer(many=True)
# Top content
most_downloaded_urls = TopUrlSerializer(many=True)
# Timeseries
downloads_per_day = TimeseriesPointSerializer(many=True)
downloads_per_hour = TimeseriesPointSerializer(many=True)
def to_representation(self, qs):
agg = qs.aggregate(
total_downloads=Count('id'),
successful_downloads=Count('id', filter=Q(success=True)),
failed_downloads=Count('id', filter=Q(success=False)),
avg_length_of_media=Avg('length_of_media'),
total_length_of_media=Sum('length_of_media'),
avg_file_size=Avg('file_size'),
total_file_size=Sum('file_size'),
avg_processing_time=Avg('processing_time'),
audio_only_count=Count('id', filter=Q(is_audio_only=True)),
video_count=Count('id', filter=Q(is_audio_only=False)),
)
total = agg['total_downloads'] or 0
agg['success_rate'] = round(agg['successful_downloads'] / total * 100, 2) if total else 0.0
most_common = qs.values('format').annotate(count=Count('id')).order_by('-count').first()
agg['most_common_format'] = most_common['format'] if most_common else None
agg['downloads_by_platform'] = list(
qs.values('platform').annotate(count=Count('id')).order_by('-count')
)
agg['downloads_by_quality'] = list(
qs.values('video_quality').annotate(count=Count('id')).order_by('-count')
)
agg['most_downloaded_urls'] = list(
qs.values('url', 'title').annotate(count=Count('id')).order_by('-count')[:10]
)
agg['downloads_per_day'] = list(
qs.annotate(period=TruncDay('download_time'))
.values('period').annotate(count=Count('id')).order_by('period')
)
agg['downloads_per_hour'] = list(
qs.annotate(period=TruncHour('download_time'))
.values('period').annotate(count=Count('id')).order_by('period')
)
return super().to_representation(agg)