Files
vontor-cz/backend/thirdparty/downloader/views.py

307 lines
13 KiB
Python

# ---------------------- Inline serializers for documentation only ----------------------
# Using inline_serializer to avoid creating new files.
import asyncio
import os
import shutil
import yt_dlp
import requests
import base64
from urllib.parse import urlparse
from .consumers import TOKEN_TTL
from django.core import signing
from rest_framework import serializers
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from drf_spectacular.utils import extend_schema, inline_serializer
from drf_spectacular.types import OpenApiTypes
from django.conf import settings
from django.http import StreamingHttpResponse
from django.utils.text import slugify
from .models import DownloaderRecord
# Help text describing the output container formats. The list below only names
# the common ones; per the text itself, any extension is checked by ffmpeg
# during conversion.
FORMAT_HELP = (
    "Container format for the output file. Common formats: "
    + ", ".join(
        (
            "mp4 (H.264 + AAC, most compatible)",
            "mkv (flexible, lossless container)",
            "webm (VP9/AV1 + Opus)",
            "flv (legacy)",
            "mov (Apple-friendly)",
            "avi (older)",
            "ogg",
            "m4a (audio only)",
            "mp3 (audio only)",
        )
    )
    + ". The extension will be validated by ffmpeg during conversion."
)
class Downloader(APIView):
    """Public endpoint for inspecting media URLs and serving finished downloads.

    ``GET`` returns metadata extracted by yt-dlp for a single video or for
    every entry of a playlist. ``POST`` streams the file referenced by a signed
    token and removes the token's temporary directory when streaming ends.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    @extend_schema(
        tags=["downloader", "public"],
        summary="Get video info from URL",
        description="""
Fetch detailed information about a video or playlist from supported platforms.
**Supported platforms:** YouTube, TikTok, Vimeo, Twitter, Instagram, Facebook, Reddit, and many more.
**Returns:**
For single videos:
- Video title, duration, and thumbnail
- Available video qualities/resolutions
- Available audio formats
For playlists:
- Array of videos with the same info structure as single videos
- Each video includes title, duration, thumbnail, and available qualities
**Usage:**
```
GET /api/downloader/download/?url=https://youtube.com/watch?v=VIDEO_ID
GET /api/downloader/download/?url=https://youtube.com/playlist?list=PLAYLIST_ID
```
""",
        parameters=[
            inline_serializer(
                name="VideoInfoParams",
                fields={
                    "url": serializers.URLField(
                        help_text="Video/Playlist URL from YouTube, TikTok, Vimeo, etc. Must be a valid URL from a supported platform."
                    ),
                },
            )
        ],
        responses={
            200: inline_serializer(
                name="VideoInfoResponse",
                fields={
                    "is_playlist": serializers.BooleanField(help_text="Whether the URL is a playlist"),
                    "playlist_title": serializers.CharField(allow_null=True, help_text="Playlist title (if applicable)"),
                    "playlist_count": serializers.IntegerField(allow_null=True, help_text="Number of videos in playlist (if applicable)"),
                    "videos": serializers.ListField(
                        child=inline_serializer(
                            name="VideoInfo",
                            fields={
                                "id": serializers.CharField(help_text="Video ID"),
                                "title": serializers.CharField(help_text="Video title"),
                                "duration": serializers.IntegerField(allow_null=True, help_text="Video duration in seconds (null if unavailable)"),
                                "thumbnail": serializers.CharField(allow_null=True, help_text="Base64 encoded thumbnail image as data URL (e.g., data:image/jpeg;base64,...)"),
                                "video_resolutions": serializers.ListField(
                                    child=serializers.CharField(),
                                    help_text="List of available video quality options (e.g., '1080p', '720p', '480p')"
                                ),
                                "audio_resolutions": serializers.ListField(
                                    child=serializers.CharField(),
                                    help_text="List of available audio format options"
                                ),
                            }
                        ),
                        help_text="Array of video information (single video for individual URLs, multiple for playlists)"
                    ),
                },
            ),
            400: inline_serializer(
                name="ErrorResponse",
                fields={"error": serializers.CharField(help_text="Error message describing what went wrong")},
            ),
        },
    )
    def get(self, request):
        """Return metadata for the video or playlist identified by ``url``.

        The URL is read from the request body first and from the query string
        second. Responds with HTTP 400 when the URL is missing, when yt-dlp
        raises during extraction, or when yt-dlp returns no data.
        """
        url = request.data.get("url") or request.query_params.get("url")
        if not url:
            return Response({"error": "URL is required"}, status=400)

        ydl_options = {
            "quiet": True,
            # NOTE(review): yt-dlp's Python API appears to name this option
            # "nocheckcertificate"; confirm this key is actually honoured.
            "no_check_certificates": True,  # Bypass SSL verification in Docker
            "extract_flat": False,  # Extract full info for playlists too
            "ignoreerrors": False,  # Don't ignore errors to get accurate info
            "js_runtimes": {"node": {}},
            "remote_components": {"ejs:github"},
        }

        try:
            with yt_dlp.YoutubeDL(ydl_options) as ydl:
                info = ydl.extract_info(url, download=False)
        except Exception as e:
            return Response({"error": f"Failed to retrieve video info: {str(e)}"}, status=400)

        # Guard against an empty result so the playlist check below cannot
        # fail with a TypeError on a None value.
        if not info:
            return Response({"error": "Failed to retrieve video info: no data returned"}, status=400)

        # A result is treated as a playlist when it carries a non-None "entries".
        if info.get("entries") is None:
            return Response({
                "is_playlist": False,
                "playlist_title": None,
                "playlist_count": None,
                "videos": [self._extract_video_info(info)],
            }, status=200)

        videos = []
        for entry in info["entries"]:
            if not entry:  # Skip None entries
                continue
            try:
                if not entry.get("formats"):
                    # The entry lacks format data: re-extract this specific video.
                    with yt_dlp.YoutubeDL(ydl_options) as ydl:
                        entry = ydl.extract_info(entry.get("url") or entry.get("webpage_url"), download=False)
                videos.append(self._extract_video_info(entry))
            except Exception:
                # Deliberate best-effort: skip videos that fail to extract
                # instead of failing the entire request.
                continue

        return Response({
            "is_playlist": True,
            "playlist_title": info.get("title"),
            "playlist_count": len(videos),
            "videos": videos,
        }, status=200)

    @staticmethod
    def _fetch_thumbnail_data_url(thumbnail_url):
        """Download a thumbnail and return it as a base64 ``data:`` URL.

        Returns ``None`` when no URL is given, when the download fails, or when
        the response's content type does not start with ``image/``.
        """
        if not thumbnail_url:
            return None
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "image/webp,image/apng,image/*,*/*;q=0.8",
        }
        try:
            response = requests.get(thumbnail_url, headers=headers, timeout=10)
            response.raise_for_status()
            content_type = response.headers.get("content-type", "")
            if not content_type.startswith("image/"):
                return None
            image_data = base64.b64encode(response.content).decode("utf-8")
            return f"data:{content_type};base64,{image_data}"
        except Exception:
            # Deliberate best-effort: a thumbnail failure must never fail the
            # request, so the video is simply returned without one.
            return None

    @staticmethod
    def _extract_video_info(video_data):
        """Build the response dict for one video from a yt-dlp info dict.

        Video resolutions are the unique integer heights of formats that have a
        video codec, highest first. Audio resolutions are the unique bitrates of
        audio-only formats, using ``abr`` and falling back to ``tbr``, highest
        first.
        """
        formats = video_data.get("formats", []) or []

        # Video: collect unique heights and sort desc (highest quality first)
        heights = {
            int(f.get("height"))
            for f in formats
            if f.get("vcodec") != "none" and isinstance(f.get("height"), int)
        }
        video_resolutions = [f"{h}p" for h in sorted(heights, reverse=True)]

        # Audio: collect unique bitrates (abr kbps), fallback to tbr when abr missing
        bitrates = set()
        for f in formats:
            if f.get("acodec") == "none" or f.get("vcodec") != "none":
                continue
            abr = f.get("abr")
            tbr = f.get("tbr")
            val = None
            if isinstance(abr, (int, float)):
                val = int(abr)
            elif isinstance(tbr, (int, float)):
                val = int(tbr)
            if val and val > 0:
                bitrates.add(val)
        audio_resolutions = [f"{b}kbps" for b in sorted(bitrates, reverse=True)]

        return {
            "id": video_data.get("id", ""),
            "title": video_data.get("title", ""),
            "duration": video_data.get("duration"),
            # Base64 data URL rather than the remote thumbnail URL.
            "thumbnail": Downloader._fetch_thumbnail_data_url(video_data.get("thumbnail")),
            "video_resolutions": video_resolutions,
            "audio_resolutions": audio_resolutions,
        }

    @extend_schema(
        tags=["downloader", "public"],
        summary="Download file via signed token",
        description="Serve a file using a signed token from WebSocket download. Token expires in 10 minutes.",
        parameters=[
            inline_serializer(
                name="DownloadTokenParams",
                fields={
                    "token": serializers.CharField(help_text="Signed token containing file info"),
                },
            )
        ],
        responses={
            200: OpenApiTypes.BINARY,
            400: inline_serializer(name="TokenError", fields={"error": serializers.CharField()}),
        },
    )
    def post(self, request):
        """Stream the file described by a signed token, then delete its temp dir.

        Responds with HTTP 400 when the token is missing, expired, or invalid,
        or when the referenced file no longer exists.
        """
        token = request.data.get("token")
        if not token:
            return Response({"error": "Token is required"}, status=400)

        try:
            data = signing.loads(token, salt="downloader-file-token", max_age=TOKEN_TTL)
        except signing.SignatureExpired:
            # Must be caught before BadSignature: SignatureExpired is a subclass
            # of it, so the reverse order makes this branch unreachable.
            return Response({"error": "Token expired"}, status=400)
        except signing.BadSignature:
            return Response({"error": "Invalid token"}, status=400)

        file_path = data["file_path"]
        tmpdir = data["tmpdir"]
        if not file_path or not os.path.exists(file_path):
            return Response({"error": "File no longer available"}, status=400)

        async def stream_and_cleanup(path: str, temp_dir: str, chunk_size: int = 8192):
            """Yield the file in chunks; always remove ``temp_dir`` afterwards."""
            try:
                with open(path, "rb") as f:
                    # Reads run in a worker thread via asyncio.to_thread.
                    while chunk := await asyncio.to_thread(f.read, chunk_size):
                        yield chunk
            finally:
                shutil.rmtree(temp_dir, ignore_errors=True)

        response = StreamingHttpResponse(
            streaming_content=stream_and_cleanup(file_path, tmpdir),
            content_type=data["content_type"] or "application/octet-stream",
        )
        if data["file_size"]:
            response["Content-Length"] = str(data["file_size"])
        # NOTE(review): the filename is interpolated unescaped; a name holding a
        # double quote would yield a malformed header. Confirm that the token
        # issuer sanitises it.
        response["Content-Disposition"] = f'attachment; filename="{data["filename"]}"'
        return response
# ---------------- STATS FOR GRAPHS ----------------
from .serializers import DownloaderStatsSerializer
class DownloaderStats(APIView):
    """
    Returns aggregated statistics from the DownloaderRecord table.
    """

    permission_classes = [AllowAny]
    authentication_classes = []

    @extend_schema(
        summary="Get aggregated downloader statistics",
        tags=["downloader", "public"],
        responses={200: DownloaderStatsSerializer},
    )
    def get(self, request):
        # The whole queryset is handed to the serializer as a single object;
        # the response body is whatever the serializer exposes as `.data`.
        all_records = DownloaderRecord.objects.all()
        stats = DownloaderStatsSerializer(all_records)
        return Response(stats.data)