Files
vontor-cz/backend/thirdparty/downloader/views.py
David Bruno Vontor 1cec6be6d7 feat(api): generate API models and hooks for public shop configuration and commerce entities
- Added generated API hooks and models for public shop configuration, including listing and retrieving configurations.
- Introduced models for commerce categories, discount codes, orders, product images, and products with pagination and search parameters.
- Ensured all generated files are structured for easy integration with React Query.
2025-12-22 02:20:43 +01:00

467 lines
19 KiB
Python

# ---------------------- Inline serializers for documentation only ----------------------
# Using inline_serializer to avoid creating new files.
import yt_dlp
import tempfile
import os
import shutil
import mimetypes
from rest_framework import serializers
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, AllowAny
from drf_spectacular.utils import extend_schema, inline_serializer
from drf_spectacular.types import OpenApiTypes
from django.conf import settings
from django.http import StreamingHttpResponse
from django.utils.text import slugify
# NEW: aggregations and timeseries helpers
from django.db import models
from django.utils import timezone
from django.db.models.functions import TruncDay, TruncHour
from .models import DownloaderRecord
# Help text attached to the `ext` field of the download request schema.
# Lists the common container formats; any extension ffmpeg supports may be
# supplied by the caller.
FORMAT_HELP: str = (
    "Container format for the output file. Common formats: "
    "mp4 (H.264 + AAC, most compatible), "
    "mkv (flexible, lossless container), "
    "webm (VP9/AV1 + Opus), "
    "flv (legacy), mov (Apple-friendly), "
    "avi (older), ogg, m4a (audio only), mp3 (audio only). "
    "The extension will be validated by ffmpeg during conversion."
)
# Markdown shown in the OpenAPI schema for GET. Kept at module level so the
# text is independent of the class indentation.
_VIDEO_INFO_DESCRIPTION = """
Fetch detailed information about a video from supported platforms.
**Supported platforms:** YouTube, TikTok, Vimeo, Twitter, Instagram, Facebook, Reddit, and many more.
**Returns:**
- Video title, duration, and thumbnail
- Available video qualities/resolutions
- Available audio formats
**Usage:**
```
GET /api/downloader/download/?url=https://youtube.com/watch?v=VIDEO_ID
```
"""

# Markdown shown in the OpenAPI schema for POST.
_DOWNLOAD_DESCRIPTION = """
Download video with optional quality constraints and container format conversion.
**Quality Parameters (optional):**
- If not specified, yt-dlp will automatically select the best available quality.
- `video_quality`: Maximum video height in pixels (e.g., 1080, 720, 480).
- `audio_quality`: Maximum audio bitrate in kbps (e.g., 320, 192, 128).
**Format/Extension:**
- Any format supported by ffmpeg (mp4, mkv, webm, avi, mov, flv, m4a, mp3, etc.).
- Defaults to 'mp4' if not specified.
- The conversion is handled automatically by ffmpeg in the background.
**Advanced Options:**
- `subtitles`: Download subtitles (language codes like 'en,cs' or 'all')
- `embed_subtitles`: Embed subtitles into video file
- `embed_thumbnail`: Embed thumbnail as cover art
- `extract_audio`: Extract audio only (ignores video quality)
- `start_time`: Trim start (format: HH:MM:SS or seconds)
- `end_time`: Trim end (format: HH:MM:SS or seconds)
- `playlist_items`: Download specific playlist items (e.g., '1-5,8,10')
- `cookies`: Browser cookies for age-restricted content (Netscape format)
"""

# Text values accepted as "true" for boolean flags sent as form fields.
_TRUE_STRINGS = frozenset({"1", "true", "t", "yes", "y", "on"})

# Upper bound on the length of a user-supplied file extension.
_MAX_EXT_LENGTH = 10

# Requested extension -> (yt-dlp audio codec name, extension of the written file).
# "vorbis" audio is written to an ".ogg" file, hence the two-column mapping.
_AUDIO_TARGETS = {
    "mp3": ("mp3", "mp3"),
    "m4a": ("m4a", "m4a"),
    "opus": ("opus", "opus"),
    "vorbis": ("vorbis", "ogg"),
    "ogg": ("vorbis", "ogg"),
    "wav": ("wav", "wav"),
}
_DEFAULT_AUDIO_TARGET = ("mp3", "mp3")


def _as_bool(value):
    """Interpret a request flag that may be a real bool or form-encoded text.

    Form submissions deliver "false" as a non-empty (truthy) string, so plain
    truthiness is not a safe test.
    """
    if isinstance(value, str):
        return value.strip().lower() in _TRUE_STRINGS
    return bool(value)


def _parse_optional_int(raw):
    """Return ``int(raw)``, or None when *raw* is missing/falsy.

    Raises ValueError or TypeError when *raw* cannot be converted.
    """
    if not raw:
        return None
    return int(raw)


def _normalize_extension(raw):
    """Return a lower-cased, dot-less extension, or None when it is unusable.

    The value ends up in a filesystem path and in the Content-Disposition
    header, so only a short ASCII alphanumeric token is accepted.
    """
    if not isinstance(raw, str):
        return None
    ext = raw.strip().lstrip(".").lower()
    if not ext or len(ext) > _MAX_EXT_LENGTH:
        return None
    if not (ext.isascii() and ext.isalnum()):
        return None
    return ext


def _parse_timestamp(value):
    """Convert 'HH:MM:SS', 'MM:SS' or plain seconds into float seconds.

    Returns None when *value* is None or blank. Raises ValueError for anything
    that is not made of non-negative decimal fields.
    """
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    parts = text.split(":")
    if len(parts) > 3:
        raise ValueError(f"Invalid time value: {value!r}")
    seconds = 0.0
    for part in parts:
        # Digits with at most one decimal point; rejects signs, "nan", "inf", "1e3".
        if not (part.isascii() and part.replace(".", "", 1).isdigit()):
            raise ValueError(f"Invalid time value: {value!r}")
        seconds = seconds * 60 + float(part)
    return seconds


def _build_format_selector(video_quality, audio_quality, extract_audio):
    """Build the yt-dlp format selector for the requested quality caps."""
    audio = "ba" if audio_quality is None else f"ba[abr<={audio_quality}]"
    if extract_audio:
        # Audio-only request: do not download a video stream at all.
        return "ba/b" if audio_quality is None else f"{audio}/ba/b"
    if video_quality is None and audio_quality is None:
        # yt-dlp's own default: best video + best audio, else best single file.
        return "bv*+ba/b"
    video = "bv" if video_quality is None else f"bv[height<={video_quality}]"
    return f"{video}+{audio}/b"


def _stream_and_cleanup(path, temp_dir, chunk_size=8192):
    """Yield *path* in chunks, then delete *temp_dir* (also on client abort)."""
    try:
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(chunk_size), b""):
                yield chunk
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


class Downloader(APIView):
    """Public endpoint that inspects (GET) or downloads (POST) media via yt-dlp."""

    permission_classes = [AllowAny]
    authentication_classes = []

    @extend_schema(
        tags=["downloader", "public"],
        summary="Get video info from URL",
        description=_VIDEO_INFO_DESCRIPTION,
        parameters=[
            inline_serializer(
                name="VideoInfoParams",
                fields={
                    "url": serializers.URLField(
                        help_text="Video URL from YouTube, TikTok, Vimeo, etc. Must be a valid URL from a supported platform."
                    ),
                },
            )
        ],
        responses={
            200: inline_serializer(
                name="VideoInfoResponse",
                fields={
                    "title": serializers.CharField(help_text="Video title"),
                    "duration": serializers.IntegerField(allow_null=True, help_text="Video duration in seconds (null if unavailable)"),
                    "thumbnail": serializers.URLField(allow_null=True, help_text="URL to video thumbnail image"),
                    "video_resolutions": serializers.ListField(
                        child=serializers.CharField(),
                        help_text="List of available video quality options (e.g., '1080p', '720p', '480p')"
                    ),
                    "audio_resolutions": serializers.ListField(
                        child=serializers.CharField(),
                        help_text="List of available audio format options"
                    ),
                },
            ),
            400: inline_serializer(
                name="ErrorResponse",
                fields={"error": serializers.CharField(help_text="Error message describing what went wrong")},
            ),
        },
    )
    def get(self, request):
        """Return title, duration, thumbnail and available qualities for a URL.

        The URL is read from the request body first, then from the query
        string. Responds with 400 when the URL is missing or when yt-dlp
        cannot extract information for it.
        """
        url = request.data.get("url") or request.query_params.get("url")
        if not url:
            return Response({"error": "URL is required"}, status=400)

        try:
            with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
                info = ydl.extract_info(url, download=False)
        except Exception:
            # Boundary handler: any extractor failure becomes a generic 400.
            return Response({"error": "Failed to retrieve video info"}, status=400)
        if not info:
            return Response({"error": "Failed to retrieve video info"}, status=400)

        formats = info.get("formats", []) or []

        # Video: unique integer heights of formats that carry a video stream.
        heights = {
            f.get("height")
            for f in formats
            if f.get("vcodec") != "none" and isinstance(f.get("height"), int)
        }
        video_resolutions = [f"{h}p" for h in sorted(heights, reverse=True)]

        # Audio: unique bitrates of audio-only formats; fall back to the total
        # bitrate (tbr) when the audio bitrate (abr) is not numeric.
        bitrates = set()
        for f in formats:
            if f.get("acodec") == "none" or f.get("vcodec") != "none":
                continue
            rate = f.get("abr")
            if not isinstance(rate, (int, float)):
                rate = f.get("tbr")
            if isinstance(rate, (int, float)) and int(rate) > 0:
                bitrates.add(int(rate))
        audio_resolutions = [f"{b}kbps" for b in sorted(bitrates, reverse=True)]

        return Response(
            {
                "title": info.get("title"),
                "duration": info.get("duration"),
                "thumbnail": info.get("thumbnail"),
                "video_resolutions": video_resolutions,
                "audio_resolutions": audio_resolutions,
            },
            status=200,
        )

    @extend_schema(
        tags=["downloader", "public"],
        summary="Download video from URL",
        description=_DOWNLOAD_DESCRIPTION,
        request=inline_serializer(
            name="DownloadRequest",
            fields={
                "url": serializers.URLField(help_text="Video URL to download from supported platforms"),
                "ext": serializers.CharField(
                    required=False,
                    default="mp4",
                    help_text=FORMAT_HELP,
                ),
                "video_quality": serializers.IntegerField(
                    required=False,
                    allow_null=True,
                    help_text="Optional: Target max video height in pixels (e.g. 1080, 720). If omitted, best quality is selected."
                ),
                "audio_quality": serializers.IntegerField(
                    required=False,
                    allow_null=True,
                    help_text="Optional: Target max audio bitrate in kbps (e.g. 320, 192, 128). If omitted, best quality is selected."
                ),
                "subtitles": serializers.CharField(
                    required=False,
                    allow_null=True,
                    allow_blank=True,
                    help_text="Language codes (e.g., 'en', 'cs', 'en,cs') or 'all' for all available subtitles"
                ),
                "embed_subtitles": serializers.BooleanField(
                    required=False,
                    default=False,
                    help_text="Embed subtitles into the video file (requires mkv or mp4 container)"
                ),
                "embed_thumbnail": serializers.BooleanField(
                    required=False,
                    default=False,
                    help_text="Embed thumbnail as cover art in the file"
                ),
                "extract_audio": serializers.BooleanField(
                    required=False,
                    default=False,
                    help_text="Extract audio only, ignoring video quality settings"
                ),
                "start_time": serializers.CharField(
                    required=False,
                    allow_null=True,
                    allow_blank=True,
                    help_text="Start time for trimming (format: HH:MM:SS or seconds as integer)"
                ),
                "end_time": serializers.CharField(
                    required=False,
                    allow_null=True,
                    allow_blank=True,
                    help_text="End time for trimming (format: HH:MM:SS or seconds as integer)"
                ),
                "playlist_items": serializers.CharField(
                    required=False,
                    allow_null=True,
                    allow_blank=True,
                    help_text="Playlist items to download (e.g., '1-5,8,10' or '1,2,3')"
                ),
                "cookies": serializers.CharField(
                    required=False,
                    allow_null=True,
                    allow_blank=True,
                    help_text="Browser cookies in Netscape format for age-restricted content. Export from browser extensions like 'Get cookies.txt'"
                ),
            },
        ),
        responses={
            200: OpenApiTypes.BINARY,
            400: inline_serializer(
                name="DownloadErrorResponse",
                fields={
                    "error": serializers.CharField(),
                },
            ),
        },
    )
    def post(self, request):
        """Download the media behind ``url`` and stream it back as an attachment.

        All request fields are validated before any temporary directory is
        created. The download runs in a per-request directory under
        ``settings.DOWNLOADER_TMP_DIR``; that directory is removed either by
        the streaming generator once the body has been sent, or here when
        anything fails before the response is handed over.

        Returns a ``StreamingHttpResponse`` on success and a JSON
        ``{"error": ...}`` response with status 400 otherwise.
        """
        data = request.data
        url = data.get("url")

        # Optional quality caps: only parsed when a non-empty value was sent.
        try:
            video_quality = _parse_optional_int(data.get("video_quality"))
        except (ValueError, TypeError):
            return Response({"error": "Invalid video_quality parameter, must be an integer!"}, status=400)
        try:
            audio_quality = _parse_optional_int(data.get("audio_quality"))
        except (ValueError, TypeError):
            return Response({"error": "Invalid audio_quality parameter, must be an integer!"}, status=400)

        # Advanced options. Flags are coerced explicitly because form-encoded
        # requests deliver them as text.
        subtitles = data.get("subtitles")
        embed_subtitles = _as_bool(data.get("embed_subtitles", False))
        embed_thumbnail = _as_bool(data.get("embed_thumbnail", False))
        extract_audio = _as_bool(data.get("extract_audio", False))
        playlist_items = data.get("playlist_items")
        cookies = data.get("cookies")

        if not url:
            return Response({"error": "URL is required"}, status=400)

        ext = _normalize_extension(data.get("ext", "mp4"))
        if ext is None:
            return Response({"error": "Extension must be a valid string"}, status=400)

        if subtitles and not isinstance(subtitles, str):
            return Response({"error": "subtitles must be a comma-separated string"}, status=400)
        if embed_subtitles and subtitles and ext not in ("mkv", "mp4"):
            return Response({"error": "Subtitle embedding requires mkv or mp4 format"}, status=400)

        # yt-dlp expects numeric seconds for section bounds, not "HH:MM:SS" text.
        try:
            start_seconds = _parse_timestamp(data.get("start_time"))
            end_seconds = _parse_timestamp(data.get("end_time"))
        except ValueError as exc:
            return Response({"error": str(exc)}, status=400)
        if end_seconds is not None and end_seconds <= (start_seconds or 0):
            return Response({"error": "end_time must be greater than start_time"}, status=400)

        # Extension of the file that will actually exist after post-processing.
        if extract_audio:
            audio_codec, final_ext = _AUDIO_TARGETS.get(ext, _DEFAULT_AUDIO_TARGET)
        else:
            audio_codec, final_ext = None, ext

        # Ensure base tmp dir exists, then create the per-request directory.
        os.makedirs(settings.DOWNLOADER_TMP_DIR, exist_ok=True)
        tmpdir = tempfile.mkdtemp(prefix="downloader_", dir=settings.DOWNLOADER_TMP_DIR)

        # Becomes True once the streaming generator owns tmpdir cleanup.
        handed_off = False
        try:
            # Post-processor order follows yt-dlp's own pipeline: convert or
            # remux first, then embed subtitles, then embed the thumbnail.
            postprocessors = []
            if extract_audio:
                postprocessors.append({"key": "FFmpegExtractAudio", "preferredcodec": audio_codec})
            else:
                # "preferedformat" (sic) is the option name yt-dlp uses.
                postprocessors.append({"key": "FFmpegVideoRemuxer", "preferedformat": ext})
            if embed_subtitles and subtitles:
                postprocessors.append({"key": "FFmpegEmbedSubtitle"})
            if embed_thumbnail:
                postprocessors.append({"key": "EmbedThumbnail"})

            ydl_options = {
                "format": _build_format_selector(video_quality, audio_quality, extract_audio),
                "outtmpl": os.path.join(tmpdir, "download.%(ext)s"),
                "quiet": True,
                "max_filesize": settings.DOWNLOADER_MAX_SIZE_BYTES,
                "socket_timeout": settings.DOWNLOADER_TIMEOUT,
                "postprocessors": postprocessors,
            }
            if not extract_audio:
                # Container used when separate video and audio streams are merged.
                ydl_options["merge_output_format"] = ext
            if embed_thumbnail:
                ydl_options["writethumbnail"] = True

            # Cookies for age-restricted content are handed over as a file.
            if cookies:
                cookie_file = os.path.join(tmpdir, "cookies.txt")
                try:
                    with open(cookie_file, "w", encoding="utf-8") as fh:
                        fh.write(cookies)
                except (OSError, TypeError, ValueError) as exc:
                    return Response({"error": f"Invalid cookies format: {str(exc)}"}, status=400)
                ydl_options["cookiefile"] = cookie_file

            # Subtitles
            if subtitles:
                ydl_options["writesubtitles"] = True
                if subtitles.lower() == "all":
                    ydl_options["writeautomaticsub"] = True
                    ydl_options["subtitleslangs"] = ["all"]
                else:
                    ydl_options["subtitleslangs"] = [
                        lang.strip() for lang in subtitles.split(",") if lang.strip()
                    ]

            # Download sections (trim). Keys are omitted rather than set to None.
            section = {}
            if start_seconds:
                section["start_time"] = start_seconds
            if end_seconds is not None:
                section["end_time"] = end_seconds
            if section:
                ydl_options["download_ranges"] = lambda info_dict, ydl: [section]

            # Playlist items
            if playlist_items:
                ydl_options["playlist_items"] = playlist_items

            with yt_dlp.YoutubeDL(ydl_options) as ydl:
                info = ydl.extract_info(url, download=True)
            if not info:
                return Response({"error": "Failed to retrieve video info"}, status=400)

            # The output template is fixed, so the final path is fully
            # determined by the extension the post-processors produce.
            file_path = os.path.join(tmpdir, f"download.{final_ext}")
            if not os.path.isfile(file_path):
                return Response(
                    {"error": "Download finished without producing a file; the media may exceed the size limit"},
                    status=400,
                )

            # Stats before streaming
            size = os.path.getsize(file_path)
            duration = int(info.get("duration") or 0)
            DownloaderRecord.objects.create(
                url=url,
                format=final_ext,
                length_of_media=duration,
                file_size=size,
            )

            # slugify() drops non-ASCII characters and may return "".
            safe_title = slugify(info.get("title") or "") or "video"
            filename = f"{safe_title}.{final_ext}"
            content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"

            response = StreamingHttpResponse(
                streaming_content=_stream_and_cleanup(file_path, tmpdir),
                content_type=content_type,
            )
            if size:
                response["Content-Length"] = str(size)
            response["Content-Disposition"] = f'attachment; filename="{filename}"'
            handed_off = True
            return response
        except Exception as e:
            # Boundary handler: report download/processing failures as 400.
            return Response({"error": str(e)}, status=400)
        finally:
            if not handed_off:
                shutil.rmtree(tmpdir, ignore_errors=True)
# ---------------- STATS FOR GRAPHS ----------------
from .serializers import DownloaderStatsSerializer
from django.db.models import Count, Avg, Sum
class DownloaderStats(APIView):
    """
    Return aggregated statistics computed from the DownloaderRecord table.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    @extend_schema(
        tags=["downloader", "public"],
        summary="Get aggregated downloader statistics",
        responses={200: DownloaderStatsSerializer},
    )
    def get(self, request):
        """Respond with download totals, averages and the most common format."""
        records = DownloaderRecord.objects

        # Aggregate the numeric fields with a single aggregate() call.
        stats = records.aggregate(
            total_downloads=Count("id"),
            avg_length_of_media=Avg("length_of_media"),
            avg_file_size=Avg("file_size"),
            total_length_of_media=Sum("length_of_media"),
            total_file_size=Sum("file_size"),
        )

        # Find the most frequent format: group by format, count rows per
        # group and take the group with the highest count (None if no rows).
        per_format = records.values("format").annotate(count=Count("id"))
        top_row = per_format.order_by("-count").first()
        if top_row:
            stats["most_common_format"] = top_row["format"]
        else:
            stats["most_common_format"] = None

        return Response(DownloaderStatsSerializer(stats).data)