# ---------------------- Inline serializers for documentation only ----------------------
# Using inline_serializer to avoid creating new files.
"""HTTP views for the downloader app.

``Downloader`` exposes video/playlist metadata lookup (GET) and signed-token
file download (POST); ``DownloaderStats`` serves aggregated statistics.
"""

import asyncio
import base64
import logging
import os
import shutil
from urllib.parse import urlparse

import requests
import yt_dlp
from django.conf import settings
from django.core import signing
from django.http import StreamingHttpResponse
from django.utils.text import slugify
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, inline_serializer
from rest_framework import serializers
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView

from .consumers import TOKEN_TTL
from .models import DownloaderRecord

logger = logging.getLogger(__name__)

# Common container formats - user can provide any extension supported by ffmpeg
FORMAT_HELP = (
    "Container format for the output file. Common formats: "
    "mp4 (H.264 + AAC, most compatible), "
    "mkv (flexible, lossless container), "
    "webm (VP9/AV1 + Opus), "
    "flv (legacy), mov (Apple-friendly), "
    "avi (older), ogg, m4a (audio only), mp3 (audio only). "
    "The extension will be validated by ffmpeg during conversion."
)

# Headers sent when fetching thumbnails; a browser-like User-Agent is used
# for the image request.
_THUMBNAIL_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
}


def _build_ydl_options():
    """Return a fresh yt-dlp options dict for metadata-only extraction.

    A new dict is built on every call so that nothing a ``YoutubeDL``
    instance stores in its params can leak between requests.
    """
    return {
        "quiet": True,
        # NOTE(review): yt-dlp's Python API documents this option as
        # `nocheckcertificate`; this key looks like the CLI flag spelling and
        # may be silently ignored. Left unchanged on purpose - confirm whether
        # disabling TLS verification is really wanted before renaming it.
        "no_check_certificates": True,  # Bypass SSL verification in Docker
        "extract_flat": False,  # Extract full info for playlists too
        "ignoreerrors": False,  # Don't ignore errors to get accurate info
        "js_runtimes": {"node": {}},
        "remote_components": {"ejs:github"},
    }


def _fetch_thumbnail_data_url(thumbnail_url):
    """Download a thumbnail and return it as a base64 ``data:`` URL.

    Best effort: returns ``None`` when no URL is given, the request fails,
    or the response is not served with an ``image/*`` content type.
    """
    if not thumbnail_url:
        return None
    try:
        response = requests.get(thumbnail_url, headers=_THUMBNAIL_HEADERS, timeout=10)
        response.raise_for_status()
        content_type = response.headers.get('content-type', '')
        if not content_type.startswith('image/'):
            return None
        image_data = base64.b64encode(response.content).decode('utf-8')
    except Exception:
        # A missing thumbnail must never fail the whole info request.
        logger.debug("Thumbnail fetch failed for %s", thumbnail_url, exc_info=True)
        return None
    return f"data:{content_type};base64,{image_data}"


def _extract_video_info(video_data):
    """Build the API representation of one video from a yt-dlp info dict.

    Returns a dict with ``id``, ``title``, ``duration``, ``thumbnail``
    (base64 data URL or ``None``), ``video_resolutions`` (e.g. ``"1080p"``)
    and ``audio_resolutions`` (e.g. ``"128kbps"``), both sorted highest first.
    """
    formats = video_data.get("formats", []) or []

    # Video: collect unique heights and sort desc (highest quality first)
    heights = {
        int(f.get("height"))
        for f in formats
        if f.get("vcodec") != "none" and isinstance(f.get("height"), int)
    }
    video_resolutions = [f"{h}p" for h in sorted(heights, reverse=True)]

    # Audio: collect unique bitrates (abr kbps) of audio-only formats,
    # falling back to tbr when abr is missing.
    bitrates = set()
    for f in formats:
        if f.get("acodec") == "none" or f.get("vcodec") != "none":
            continue
        abr = f.get("abr")
        tbr = f.get("tbr")
        val = None
        if isinstance(abr, (int, float)):
            val = int(abr)
        elif isinstance(tbr, (int, float)):
            val = int(tbr)
        if val and val > 0:
            bitrates.add(val)
    audio_resolutions = [f"{b}kbps" for b in sorted(bitrates, reverse=True)]

    return {
        "id": video_data.get("id", ""),
        "title": video_data.get("title", ""),
        "duration": video_data.get("duration"),
        "thumbnail": _fetch_thumbnail_data_url(video_data.get("thumbnail")),
        "video_resolutions": video_resolutions,
        "audio_resolutions": audio_resolutions,
    }


async def _stream_and_cleanup(path: str, temp_dir: str, chunk_size: int = 8192):
    """Yield the file at ``path`` in chunks, then delete ``temp_dir``.

    Reads run in a worker thread via ``asyncio.to_thread``. The temporary
    directory is removed in ``finally`` so it is cleaned up whether the
    stream completes or is closed early.
    """
    try:
        with open(path, "rb") as f:
            while True:
                chunk = await asyncio.to_thread(f.read, chunk_size)
                if not chunk:
                    break
                yield chunk
    finally:
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)


def _quote_header_filename(filename):
    """Escape ``filename`` for use inside a quoted Content-Disposition value."""
    cleaned = str(filename).replace("\r", "").replace("\n", "")
    return cleaned.replace("\\", "\\\\").replace('"', '\\"')


class Downloader(APIView):
    """Public endpoint: GET returns video info, POST serves a prepared file."""

    permission_classes = [AllowAny]
    authentication_classes = []

    @extend_schema(
        tags=["downloader", "public"],
        summary="Get video info from URL",
        description="""
        Fetch detailed information about a video or playlist from supported platforms.

        **Supported platforms:** YouTube, TikTok, Vimeo, Twitter, Instagram, Facebook, Reddit, and many more.

        **Returns:**
        For single videos:
        - Video title, duration, and thumbnail
        - Available video qualities/resolutions
        - Available audio formats

        For playlists:
        - Array of videos with the same info structure as single videos
        - Each video includes title, duration, thumbnail, and available qualities

        **Usage:**
        ```
        GET /api/downloader/download/?url=https://youtube.com/watch?v=VIDEO_ID
        GET /api/downloader/download/?url=https://youtube.com/playlist?list=PLAYLIST_ID
        ```
        """,
        parameters=[
            inline_serializer(
                name="VideoInfoParams",
                fields={
                    "url": serializers.URLField(
                        help_text="Video/Playlist URL from YouTube, TikTok, Vimeo, etc. Must be a valid URL from a supported platform."
                    ),
                },
            )
        ],
        responses={
            200: inline_serializer(
                name="VideoInfoResponse",
                fields={
                    "is_playlist": serializers.BooleanField(help_text="Whether the URL is a playlist"),
                    "playlist_title": serializers.CharField(allow_null=True, help_text="Playlist title (if applicable)"),
                    "playlist_count": serializers.IntegerField(allow_null=True, help_text="Number of videos in playlist (if applicable)"),
                    "videos": serializers.ListField(
                        child=inline_serializer(
                            name="VideoInfo",
                            fields={
                                "id": serializers.CharField(help_text="Video ID"),
                                "title": serializers.CharField(help_text="Video title"),
                                "duration": serializers.IntegerField(allow_null=True, help_text="Video duration in seconds (null if unavailable)"),
                                "thumbnail": serializers.CharField(allow_null=True, help_text="Base64 encoded thumbnail image as data URL (e.g., data:image/jpeg;base64,...)"),
                                "video_resolutions": serializers.ListField(
                                    child=serializers.CharField(),
                                    help_text="List of available video quality options (e.g., '1080p', '720p', '480p')"
                                ),
                                "audio_resolutions": serializers.ListField(
                                    child=serializers.CharField(),
                                    help_text="List of available audio format options"
                                ),
                            }
                        ),
                        help_text="Array of video information (single video for individual URLs, multiple for playlists)"
                    ),
                },
            ),
            400: inline_serializer(
                name="ErrorResponse",
                fields={"error": serializers.CharField(help_text="Error message describing what went wrong")},
            ),
        },
    )
    def get(self, request):
        """Return metadata for the video or playlist at the given ``url``.

        The URL is read from the request body or, failing that, the query
        string. Responds with 400 when the URL is missing or yt-dlp cannot
        extract information; otherwise 200 with ``is_playlist``,
        ``playlist_title``, ``playlist_count`` and ``videos``.
        """
        url = request.data.get("url") or request.query_params.get("url")
        if not url:
            return Response({"error": "URL is required"}, status=400)

        ydl_options = _build_ydl_options()

        try:
            with yt_dlp.YoutubeDL(ydl_options) as ydl:
                info = ydl.extract_info(url, download=False)
        except Exception as e:
            return Response({"error": f"Failed to retrieve video info: {str(e)}"}, status=400)

        if not info:
            # Without this guard a None result would crash on the lookups below.
            return Response({"error": "Failed to retrieve video info: no data returned"}, status=400)

        # A playlist result carries a non-None "entries" collection.
        entries = info.get("entries")
        if entries is None:
            # Handle single video
            return Response({
                "is_playlist": False,
                "playlist_title": None,
                "playlist_count": None,
                "videos": [_extract_video_info(info)],
            }, status=200)

        # Handle playlist
        videos = []
        for entry in entries:
            if not entry:  # Skip None entries
                continue
            try:
                if not entry.get("formats"):
                    # Entry lacks format data: re-extract full info for this video.
                    with yt_dlp.YoutubeDL(ydl_options) as ydl:
                        entry = ydl.extract_info(
                            entry.get("url") or entry.get("webpage_url"),
                            download=False,
                        )
                videos.append(_extract_video_info(entry))
            except Exception:
                # Skip videos that fail to extract, but don't fail the entire request
                logger.warning("Skipping playlist entry that failed to extract", exc_info=True)
                continue

        return Response({
            "is_playlist": True,
            "playlist_title": info.get("title"),
            "playlist_count": len(videos),
            "videos": videos,
        }, status=200)

    @extend_schema(
        tags=["downloader", "public"],
        summary="Download file via signed token",
        description="Serve a file using a signed token from WebSocket download. Token expires in 10 minutes.",
        parameters=[
            inline_serializer(
                name="DownloadTokenParams",
                fields={
                    "token": serializers.CharField(help_text="Signed token containing file info"),
                },
            )
        ],
        responses={
            200: OpenApiTypes.BINARY,
            400: inline_serializer(name="TokenError", fields={"error": serializers.CharField()}),
        },
    )
    def post(self, request):
        """Stream the file described by a signed download token.

        The token is verified with salt ``"downloader-file-token"`` and a
        maximum age of ``TOKEN_TTL``. Responds with 400 when the token is
        missing, invalid, expired, or the file no longer exists; otherwise
        returns a streaming attachment and removes the token's temporary
        directory once streaming ends.
        """
        token = request.data.get("token")
        if not token:
            return Response({"error": "Token is required"}, status=400)
        if not isinstance(token, str):
            # signing.loads expects text; anything else cannot be a valid token.
            return Response({"error": "Invalid token"}, status=400)

        try:
            data = signing.loads(token, salt="downloader-file-token", max_age=TOKEN_TTL)
        except signing.SignatureExpired:
            # Must come first: SignatureExpired is a subclass of BadSignature.
            return Response({"error": "Token expired"}, status=400)
        except signing.BadSignature:
            return Response({"error": "Invalid token"}, status=400)

        file_path = data.get("file_path")
        tmpdir = data.get("tmpdir")

        if not file_path or not os.path.exists(file_path):
            return Response({"error": "File no longer available"}, status=400)

        response = StreamingHttpResponse(
            streaming_content=_stream_and_cleanup(file_path, tmpdir),
            content_type=data.get("content_type") or "application/octet-stream",
        )
        if data.get("file_size"):
            response["Content-Length"] = str(data["file_size"])

        # Fall back to the on-disk name if the token carries no filename.
        filename = data.get("filename") or os.path.basename(file_path)
        response["Content-Disposition"] = (
            f'attachment; filename="{_quote_header_filename(filename)}"'
        )
        return response


# ---------------- STATS FOR GRAPHS ----------------
from .serializers import DownloaderStatsSerializer  # noqa: E402


class DownloaderStats(APIView):
    """
    Return aggregated statistics from the DownloaderRecord table.
    """

    authentication_classes = []
    permission_classes = [AllowAny]

    @extend_schema(
        tags=["downloader", "public"],
        summary="Get aggregated downloader statistics",
        responses={200: DownloaderStatsSerializer},
    )
    def get(self, request):
        # The serializer receives the full DownloaderRecord queryset and
        # produces the response payload.
        records = DownloaderRecord.objects.all()
        return Response(DownloaderStatsSerializer(records).data)