import json
import logging
import math
import os
import shutil
import tempfile
from typing import Any, Dict, List, Optional, Tuple

from django.conf import settings
from django.db.models import Count
from django.http import StreamingHttpResponse, JsonResponse
from django.shortcuts import render
from django.utils.decorators import method_decorator
from django.utils.text import slugify
from django.views.decorators.csrf import csrf_exempt
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView

# docs + schema helpers
from rest_framework import serializers
from drf_spectacular.utils import (
    extend_schema,
    OpenApiExample,
    OpenApiParameter,
    OpenApiTypes,
    OpenApiResponse,
    inline_serializer,
)

from .models import DownloaderModel
from .serializers import DownloaderLogSerializer

# ---------------------- Inline serializers for documentation only ----------------------
# Using inline_serializer to avoid creating new files.
FormatOptionSchema = inline_serializer(
    name="FormatOption",
    fields={
        "format_id": serializers.CharField(allow_null=True),
        "ext": serializers.CharField(allow_null=True),
        "vcodec": serializers.CharField(allow_null=True),
        "acodec": serializers.CharField(allow_null=True),
        "fps": serializers.FloatField(allow_null=True),
        "tbr": serializers.FloatField(allow_null=True),
        "abr": serializers.FloatField(allow_null=True),
        "vbr": serializers.FloatField(allow_null=True),
        "asr": serializers.IntegerField(allow_null=True),
        "filesize": serializers.IntegerField(allow_null=True),
        "filesize_approx": serializers.IntegerField(allow_null=True),
        "estimated_size_bytes": serializers.IntegerField(allow_null=True),
        "size_ok": serializers.BooleanField(),
        "format_note": serializers.CharField(allow_null=True),
        "resolution": serializers.CharField(allow_null=True),
        "audio_only": serializers.BooleanField(),
    },
)

FormatsRequestSchema = inline_serializer(
    name="FormatsRequest",
    fields={"url": serializers.URLField()},
)

FormatsResponseSchema = inline_serializer(
    name="FormatsResponse",
    fields={
        "title": serializers.CharField(allow_null=True),
        "duration": serializers.FloatField(allow_null=True),
        "extractor": serializers.CharField(allow_null=True),
        "video_id": serializers.CharField(allow_null=True),
        "max_size_bytes": serializers.IntegerField(),
        "options": serializers.ListField(child=FormatOptionSchema),
    },
)

DownloadRequestSchema = inline_serializer(
    name="DownloadRequest",
    fields={
        "url": serializers.URLField(),
        "format_id": serializers.CharField(),
    },
)

ErrorResponseSchema = inline_serializer(
    name="ErrorResponse",
    fields={
        "detail": serializers.CharField(),
        "error": serializers.CharField(required=False),
        "estimated_size_bytes": serializers.IntegerField(required=False),
        "max_bytes": serializers.IntegerField(required=False),
    },
)
# ---------------------------------------------------------------------------------------

logger = logging.getLogger(__name__)

# Fallbacks used when the corresponding Django settings are not defined.
_DEFAULT_MAX_SIZE_BYTES = 200 * 1024 * 1024
_DEFAULT_TIMEOUT_SECONDS = 120
# Upper bound on the number of format options returned by the probe endpoint.
_MAX_OPTIONS = 50
# Read size used when streaming the downloaded file back to the client.
_STREAM_CHUNK_SIZE = 64 * 1024


def _estimate_size_bytes(fmt: Dict[str, Any], duration: Optional[float]) -> Optional[int]:
    """Estimate (or return exact) size in bytes for a yt-dlp format.

    Args:
        fmt: A single format dict as found in the ``formats`` list of the
            extracted info.
        duration: Media duration in seconds, or ``None`` when unknown.

    Returns:
        ``filesize`` if present, else ``filesize_approx``, else an estimate
        derived from the total bitrate and the duration. ``None`` when none of
        these can be computed.
    """
    # Prefer the sizes reported by yt-dlp over our own estimate.
    for key in ("filesize", "filesize_approx"):
        reported = fmt.get(key)
        if reported:
            return int(reported)

    # Estimate via total bitrate (tbr is in Kbps): kbit/s -> bytes/s -> bytes.
    tbr = fmt.get("tbr")
    if duration and tbr:
        try:
            return int((float(tbr) * 1000 / 8) * float(duration))
        except (TypeError, ValueError, OverflowError):
            return None
    return None


def _format_option(fmt: Dict[str, Any], duration: Optional[float], max_bytes: int) -> Dict[str, Any]:
    """Project a yt-dlp format dict to a compact option object suitable for UI.

    Args:
        fmt: A single yt-dlp format dict.
        duration: Media duration in seconds, used for size estimation.
        max_bytes: Size limit; ``size_ok`` is true only when the estimated size
            is known and does not exceed it.

    Returns:
        A dict with the keys described by ``FormatOptionSchema``.
    """
    estimated = _estimate_size_bytes(fmt, duration)
    width = fmt.get("width")
    height = fmt.get("height")
    vcodec = fmt.get("vcodec")
    return {
        "format_id": fmt.get("format_id"),
        "ext": fmt.get("ext"),
        "vcodec": vcodec,
        "acodec": fmt.get("acodec"),
        "fps": fmt.get("fps"),
        "tbr": fmt.get("tbr"),
        "abr": fmt.get("abr"),
        "vbr": fmt.get("vbr"),
        "asr": fmt.get("asr"),
        "filesize": fmt.get("filesize"),
        "filesize_approx": fmt.get("filesize_approx"),
        "estimated_size_bytes": estimated,
        # Unknown sizes are reported as not OK.
        "size_ok": estimated is not None and estimated <= max_bytes,
        "format_note": fmt.get("format_note"),
        "resolution": f"{width}x{height}" if width and height else None,
        "audio_only": vcodec in (None, "none"),
    }


def _option_sort_key(option: Dict[str, Any]) -> Tuple[float, int]:
    """Sort key for format options: smallest size first, then largest resolution.

    Options with an unknown size sort last; an unparsable or missing
    resolution counts as zero pixels.
    """
    size = option["estimated_size_bytes"]
    if size is None:
        size = math.inf
    pixels = 0
    if option["resolution"]:
        try:
            width, height = option["resolution"].split("x")
            pixels = int(width) * int(height)
        except (TypeError, ValueError):
            pixels = 0
    return (size, -pixels)


def _client_meta(request) -> Tuple[Optional[Any], Optional[str], Optional[str]]:
    """Extract current user, client IP and User-Agent from the request.

    The IP is the first entry of ``X-Forwarded-For`` when that header is
    present, otherwise ``REMOTE_ADDR``.
    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    proxy overwrites it, so the logged IP is informational only.
    """
    forwarded = request.META.get("HTTP_X_FORWARDED_FOR")
    if forwarded:
        ip = forwarded.split(",")[0].strip()
    else:
        ip = request.META.get("REMOTE_ADDR")
    user_agent = request.META.get("HTTP_USER_AGENT")
    return getattr(request, "user", None), ip, user_agent


def _request_payload(request) -> Any:
    """Return the parsed request body, or an empty dict if it is not a mapping.

    A JSON array or scalar body has no ``get``; treating it as empty lets the
    views answer with their normal "missing field" 400 instead of crashing.
    """
    data = getattr(request, "data", None)
    return data if hasattr(data, "get") else {}


def _record_event(
    request,
    *,
    info: Dict[str, Any],
    requested_format: Optional[Any],
    event_status: str,
    url: str,
    error_message: Optional[str] = None,
) -> None:
    """Best-effort audit log via ``DownloaderModel.from_ydl_info``.

    ``error_message`` is forwarded only when given, matching how the success
    and error cases call the model helper. A failure to record the event is
    logged and never propagates, so it cannot change the HTTP response.
    """
    user, ip, user_agent = _client_meta(request)
    extra = {} if error_message is None else {"error_message": error_message}
    try:
        DownloaderModel.from_ydl_info(
            info=info,
            requested_format=requested_format,
            status=event_status,
            url=url,
            user=user,
            ip_address=ip,
            user_agent=user_agent,
            **extra,
        )
    except Exception:
        logger.exception("Failed to record downloader event %r for %s", event_status, url)


def _probe_media(yt_dlp, url: str, timeout: int) -> Dict[str, Any]:
    """Extract media metadata with yt-dlp without downloading anything.

    Args:
        yt_dlp: The imported ``yt_dlp`` module.
        url: Media URL to analyze.
        timeout: Socket timeout in seconds.

    Returns:
        The info dict produced by ``YoutubeDL.extract_info``.

    Raises:
        ValueError: If yt-dlp returned no info. With ``ignoreerrors`` enabled
            extraction failures are reported by returning ``None`` rather than
            by raising, so that case is converted into an exception here.
        Exception: Whatever yt-dlp itself raises.
    """
    probe_opts = {
        "skip_download": True,
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        "ignoreerrors": True,
        "socket_timeout": timeout,
        "extract_flat": False,
        # Ask for the single video when a URL also references a playlist.
        "noplaylist": True,
    }
    with yt_dlp.YoutubeDL(probe_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    if not info:
        raise ValueError("No media information could be extracted")
    return info


def _downloaded_path(result: Optional[Dict[str, Any]]) -> Optional[str]:
    """Return the path of the downloaded file from a yt-dlp result, if any.

    Looks at the first ``requested_downloads`` entry (``filepath`` or
    ``__final_filename``) and falls back to the top-level ``_filename``.
    """
    if not result:
        return None
    downloads = result.get("requested_downloads") or []
    if downloads:
        first = downloads[0]
        path = first.get("filepath") or first.get("__final_filename")
        if path:
            return path
    return result.get("_filename")


def _stream_file_then_cleanup(path: str, cleanup_dir: str, chunk_size: int = _STREAM_CHUNK_SIZE):
    """Yield the file at ``path`` in chunks, then delete ``cleanup_dir``.

    The cleanup runs in ``finally`` so the temporary directory is removed when
    the file has been fully sent and also when the generator is closed early
    after it has started (for example because the client disconnected).
    """
    try:
        with open(path, "rb") as stream:
            for chunk in iter(lambda: stream.read(chunk_size), b""):
                yield chunk
    finally:
        shutil.rmtree(cleanup_dir, ignore_errors=True)


def _count_by(field: str, limit: Optional[int] = 10) -> List[Dict[str, Any]]:
    """Return ``[{field: value, "count": n}, ...]`` ordered by count descending.

    Args:
        field: ``DownloaderModel`` field to group by.
        limit: Maximum number of rows, or ``None`` for all groups.
    """
    rows = DownloaderModel.objects.values(field).annotate(count=Count("id")).order_by("-count")
    if limit is not None:
        rows = rows[:limit]
    return list(rows)


class DownloaderFormatsView(APIView):
    """Probe media URL and return available formats with estimated sizes and limit flags."""

    permission_classes = [AllowAny]

    @extend_schema(
        tags=["downloader"],
        operation_id="downloader_formats",
        summary="List available formats for a media URL",
        description="Uses yt-dlp to extract formats and estimates size. Applies max size policy.",
        request=FormatsRequestSchema,
        responses={
            200: FormatsResponseSchema,
            400: OpenApiResponse(response=ErrorResponseSchema),
            500: OpenApiResponse(response=ErrorResponseSchema),
        },
        examples=[
            OpenApiExample(
                "Formats request",
                value={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"},
                request_only=True,
            ),
            OpenApiExample(
                "Formats response (excerpt)",
                value={
                    "title": "Example Title",
                    "duration": 213.0,
                    "extractor": "youtube",
                    "video_id": "dQw4w9WgXcQ",
                    "max_size_bytes": 209715200,
                    "options": [
                        {
                            "format_id": "140",
                            "ext": "m4a",
                            "vcodec": "none",
                            "acodec": "mp4a.40.2",
                            "fps": None,
                            "tbr": 128.0,
                            "abr": 128.0,
                            "vbr": None,
                            "asr": 44100,
                            "filesize": None,
                            "filesize_approx": 3342334,
                            "estimated_size_bytes": 3400000,
                            "size_ok": True,
                            "format_note": "tiny",
                            "resolution": None,
                            "audio_only": True,
                        }
                    ],
                },
                response_only=True,
            ),
        ],
    )
    def post(self, request):
        """POST to probe a media URL and list available formats.

        Returns 400 when ``url`` is missing or the media cannot be analyzed,
        500 when yt-dlp is not installed, otherwise 200 with at most
        ``_MAX_OPTIONS`` options sorted by estimated size, then by resolution
        (largest first).
        """
        try:
            import yt_dlp
        except ImportError:
            return Response({"detail": "yt-dlp not installed. pip install yt-dlp"}, status=500)

        url = _request_payload(request).get("url")
        if not url or not isinstance(url, str):
            return Response({"detail": "Missing 'url'."}, status=400)

        max_bytes = getattr(settings, "DOWNLOADER_MAX_SIZE_BYTES", _DEFAULT_MAX_SIZE_BYTES)
        timeout = getattr(settings, "DOWNLOADER_TIMEOUT", _DEFAULT_TIMEOUT_SECONDS)

        try:
            info = _probe_media(yt_dlp, url, timeout)
        except Exception as exc:  # boundary: any extractor failure becomes a 400
            _record_event(
                request,
                info={"webpage_url": url},
                requested_format=None,
                event_status="probe_error",
                url=url,
                error_message=str(exc),
            )
            return Response({"detail": "Failed to extract formats", "error": str(exc)}, status=400)

        duration = info.get("duration")
        options = [_format_option(fmt, duration, max_bytes) for fmt in (info.get("formats") or [])]
        options_sorted = sorted(options, key=_option_sort_key)[:_MAX_OPTIONS]

        _record_event(
            request,
            info=info,
            requested_format=None,
            event_status="probe_ok",
            url=url,
        )

        return Response({
            "title": info.get("title"),
            "duration": duration,
            "extractor": info.get("extractor"),
            "video_id": info.get("id"),
            "max_size_bytes": max_bytes,
            "options": options_sorted,
        })


class DownloaderFileView(APIView):
    """Download selected format if under max size, then stream the file back."""

    permission_classes = [AllowAny]

    @extend_schema(
        tags=["downloader"],
        operation_id="downloader_download",
        summary="Download a selected format and stream file",
        description="Downloads with a strict max filesize guard and streams as application/octet-stream.",
        request=DownloadRequestSchema,
        responses={
            200: OpenApiResponse(response=OpenApiTypes.BINARY, media_type="application/octet-stream"),
            400: OpenApiResponse(response=ErrorResponseSchema),
            413: OpenApiResponse(response=ErrorResponseSchema),
            500: OpenApiResponse(response=ErrorResponseSchema),
        },
        examples=[
            OpenApiExample(
                "Download request",
                value={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "format_id": "140"},
                request_only=True,
            ),
        ],
    )
    def post(self, request):
        """POST to download a media URL in the selected format.

        Flow: probe the URL, locate ``format_id``, reject with 413 when the
        estimated size exceeds the limit, download into a per-request
        temporary directory, re-check the real size, then stream the file and
        delete the temporary directory afterwards.
        """
        try:
            import yt_dlp
        except ImportError:
            return Response({"detail": "yt-dlp not installed. pip install yt-dlp"}, status=500)

        payload = _request_payload(request)
        url = payload.get("url")
        fmt_id = payload.get("format_id")
        if not url or not isinstance(url, str) or not fmt_id:
            return Response({"detail": "Missing 'url' or 'format_id'."}, status=400)

        max_bytes = getattr(settings, "DOWNLOADER_MAX_SIZE_BYTES", _DEFAULT_MAX_SIZE_BYTES)
        timeout = getattr(settings, "DOWNLOADER_TIMEOUT", _DEFAULT_TIMEOUT_SECONDS)
        tmp_root = getattr(
            settings,
            "DOWNLOADER_TMP_DIR",
            os.path.join(settings.BASE_DIR, "tmp", "downloader"),
        )
        os.makedirs(tmp_root, exist_ok=True)

        # First, extract info to check/estimate size.
        try:
            info = _probe_media(yt_dlp, url, timeout)
        except Exception as exc:  # boundary: any extractor failure becomes a 400
            _record_event(
                request,
                info={"webpage_url": url},
                requested_format=fmt_id,
                event_status="precheck_error",
                url=url,
                error_message=str(exc),
            )
            return Response({"detail": "Failed to analyze media", "error": str(exc)}, status=400)

        duration = info.get("duration")
        wanted = str(fmt_id)
        selected = next(
            (fmt for fmt in (info.get("formats") or []) if str(fmt.get("format_id")) == wanted),
            None,
        )
        if not selected:
            return Response({"detail": f"format_id '{fmt_id}' not found"}, status=400)

        # Enforce size policy on the estimate; unknown sizes are let through
        # and caught by the guards below.
        est_size = _estimate_size_bytes(selected, duration)
        if est_size is not None and est_size > max_bytes:
            _record_event(
                request,
                info=selected,
                requested_format=fmt_id,
                event_status="blocked_by_size",
                url=url,
                error_message=f"Estimated size {est_size} > max {max_bytes}",
            )
            return Response(
                {"detail": "File too large for this server", "estimated_size_bytes": est_size, "max_bytes": max_bytes},
                status=413,
            )

        # One directory per request: concurrent downloads of the same media
        # cannot overwrite or delete each other's files, and partial files
        # are removed together with the directory.
        work_dir = tempfile.mkdtemp(prefix="dl-", dir=tmp_root)
        ydl_opts = {
            "format": wanted,
            "quiet": True,
            "no_warnings": True,
            "noprogress": True,
            "socket_timeout": timeout,
            "retries": 3,
            "noplaylist": True,
            "outtmpl": os.path.join(work_dir, "%(id)s.%(ext)s"),
            "max_filesize": max_bytes,  # cap applied by yt-dlp during download
            "concurrent_fragment_downloads": 1,
            "http_chunk_size": 1024 * 1024,  # 1MB chunks to reduce memory
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                result = ydl.extract_info(url, download=True)
            filepath = _downloaded_path(result)
        except Exception as exc:  # boundary: any download failure becomes a 400
            shutil.rmtree(work_dir, ignore_errors=True)
            _record_event(
                request,
                info=selected,
                requested_format=fmt_id,
                event_status="download_error",
                url=url,
                error_message=str(exc),
            )
            return Response({"detail": "Download failed", "error": str(exc)}, status=400)

        try:
            actual_size = os.path.getsize(filepath) if filepath else None
        except OSError:
            actual_size = None
        if actual_size is None:
            shutil.rmtree(work_dir, ignore_errors=True)
            return Response({"detail": "Downloaded file not found"}, status=500)

        # Backstop for the size policy: the pre-check may have had no estimate
        # and yt-dlp's own cap may not catch every transfer, so verify the
        # size that actually landed on disk.
        if actual_size > max_bytes:
            shutil.rmtree(work_dir, ignore_errors=True)
            _record_event(
                request,
                info=selected,
                requested_format=fmt_id,
                event_status="blocked_by_size",
                url=url,
                error_message=f"Downloaded size {actual_size} > max {max_bytes}",
            )
            return Response(
                {"detail": "File too large for this server", "estimated_size_bytes": actual_size, "max_bytes": max_bytes},
                status=413,
            )

        # Build a safe filename; slugify() can return "" for titles without
        # ASCII word characters, so fall back to a fixed stem.
        base_title = info.get("title") or "video"
        ext = os.path.splitext(filepath)[1].lstrip(".") or (selected.get("ext") or "bin")
        stem = slugify(base_title)[:80] or "video"
        safe_name = f"{stem}.{ext}"

        # Log success with the real file size.
        selected_info = dict(selected)
        selected_info["filesize"] = actual_size
        _record_event(
            request,
            info=selected_info,
            requested_format=fmt_id,
            event_status="success",
            url=url,
        )

        # Stream file and remove the temporary directory after sending.
        resp = StreamingHttpResponse(
            _stream_file_then_cleanup(filepath, work_dir),
            content_type="application/octet-stream",
        )
        resp["Content-Disposition"] = f'attachment; filename="{safe_name}"'
        resp["Content-Length"] = str(actual_size)
        resp["X-Content-Type-Options"] = "nosniff"
        return resp


# Simple stats view (aggregations for UI charts)
class DownloaderStatsView(APIView):
    """Aggregated counts over ``DownloaderModel`` rows for UI charts."""

    permission_classes = [AllowAny]

    @extend_schema(
        tags=["downloader"],
        operation_id="downloader_stats",
        summary="Aggregated downloader statistics",
        description="Returns top extensions, requested formats, codecs and audio/video split.",
        responses={
            200: inline_serializer(
                name="DownloaderStats",
                fields={
                    "top_ext": serializers.ListField(
                        child=inline_serializer(name="ExtCount", fields={
                            "ext": serializers.CharField(allow_null=True),
                            "count": serializers.IntegerField(),
                        })
                    ),
                    "top_requested_format": serializers.ListField(
                        child=inline_serializer(name="RequestedFormatCount", fields={
                            "requested_format": serializers.CharField(allow_null=True),
                            "count": serializers.IntegerField(),
                        })
                    ),
                    "top_vcodec": serializers.ListField(
                        child=inline_serializer(name="VCodecCount", fields={
                            "vcodec": serializers.CharField(allow_null=True),
                            "count": serializers.IntegerField(),
                        })
                    ),
                    "top_acodec": serializers.ListField(
                        child=inline_serializer(name="ACodecCount", fields={
                            "acodec": serializers.CharField(allow_null=True),
                            "count": serializers.IntegerField(),
                        })
                    ),
                    "audio_vs_video": serializers.ListField(
                        child=inline_serializer(name="AudioVsVideo", fields={
                            "is_audio_only": serializers.BooleanField(),
                            "count": serializers.IntegerField(),
                        })
                    ),
                },
            )
        },
    )
    def get(self, request):
        """GET to retrieve aggregated downloader statistics.

        Each list holds the ten most frequent values of one field, except
        ``audio_vs_video`` which returns every group.
        """
        return Response({
            "top_ext": _count_by("ext"),
            "top_requested_format": _count_by("requested_format"),
            "top_vcodec": _count_by("vcodec"),
            "top_acodec": _count_by("acodec"),
            "audio_vs_video": _count_by("is_audio_only", limit=None),
        })


# Minimal placeholder so existing URL doesn't break; prefer using automatic logs above.
class DownloaderLogView(APIView):
    """Deprecated endpoint that only points callers at the current API."""

    permission_classes = [AllowAny]

    @extend_schema(
        tags=["downloader"],
        operation_id="downloader_log_helper",
        summary="Deprecated helper",
        description="Use /api/downloader/formats/ then /api/downloader/download/.",
        responses={200: inline_serializer(name="LogHelper", fields={"detail": serializers.CharField()})},
    )
    def post(self, request):
        """POST to the deprecated log helper endpoint."""
        return Response({"detail": "Use /api/downloader/formats/ then /api/downloader/download/."}, status=200)