[YouTube] Rework n-sig processing, realigning with yt-dlp
authordirkf <fieldhouse@gmx.net>
Mon, 15 Jan 2024 18:34:21 +0000 (18:34 +0000)
committerdirkf <fieldhouse@gmx.net>
Mon, 22 Jan 2024 11:10:34 +0000 (11:10 +0000)
* apply n-sig before chunked fragments, fixes #32692

youtube_dl/extractor/youtube.py

index 3bf483c1c8a7263274112f2ac858e08bbeab2da9..cd4b3ef60dbcc542e2fc110e2a3e3cd06cc54bd2 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import unicode_literals
 
+import collections
 import itertools
 import json
 import os.path
@@ -23,10 +24,10 @@ from ..compat import (
 )
 from ..jsinterp import JSInterpreter
 from ..utils import (
-    ExtractorError,
     clean_html,
     dict_get,
     error_to_compat_str,
+    ExtractorError,
     float_or_none,
     extract_attributes,
     get_element_by_attribute,
@@ -36,6 +37,7 @@ from ..utils import (
     LazyList,
     merge_dicts,
     mimetype2ext,
+    NO_DEFAULT,
     parse_codecs,
     parse_duration,
     parse_qs,
@@ -45,6 +47,7 @@ from ..utils import (
     str_or_none,
     str_to_int,
     traverse_obj,
+    try_call,
     try_get,
     txt_or_none,
     unescapeHTML,
@@ -1460,6 +1463,30 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         self._code_cache = {}
         self._player_cache = {}
 
+    # *ytcfgs, webpage=None
+    def _extract_player_url(self, *ytcfgs, **kw_webpage):
+        if ytcfgs and not isinstance(ytcfgs[0], dict):
+            webpage = kw_webpage.get('webpage') or ytcfgs[0]
+        if webpage:
+            player_url = self._search_regex(
+                r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
+                webpage or '', 'player URL', fatal=False)
+            if player_url:
+                ytcfgs = ytcfgs + ({'PLAYER_JS_URL': player_url},)
+        return traverse_obj(
+            ytcfgs, (Ellipsis, 'PLAYER_JS_URL'), (Ellipsis, 'WEB_PLAYER_CONTEXT_CONFIGS', Ellipsis, 'jsUrl'),
+            get_all=False, expected_type=lambda u: urljoin('https://www.youtube.com', u))
+
+    def _download_player_url(self, video_id, fatal=False):
+        res = self._download_webpage(
+            'https://www.youtube.com/iframe_api',
+            note='Downloading iframe API JS', video_id=video_id, fatal=fatal)
+        player_version = self._search_regex(
+            r'player\\?/([0-9a-fA-F]{8})\\?/', res or '', 'player version', fatal=fatal,
+            default=NO_DEFAULT if res else None)
+        if player_version:
+            return 'https://www.youtube.com/s/player/{0}/player_ias.vflset/en_US/base.js'.format(player_version)
+
     def _signature_cache_id(self, example_sig):
         """ Return a string representation of a signature """
         return '.'.join(compat_str(len(part)) for part in example_sig.split('.'))
@@ -1474,46 +1501,49 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             raise ExtractorError('Cannot identify player %r' % player_url)
         return id_m.group('id')
 
-    def _get_player_code(self, video_id, player_url, player_id=None):
+    def _load_player(self, video_id, player_url, fatal=True, player_id=None):
         if not player_id:
             player_id = self._extract_player_info(player_url)
-
         if player_id not in self._code_cache:
-            self._code_cache[player_id] = self._download_webpage(
-                player_url, video_id,
+            code = self._download_webpage(
+                player_url, video_id, fatal=fatal,
                 note='Downloading player ' + player_id,
                 errnote='Download of %s failed' % player_url)
-        return self._code_cache[player_id]
+            if code:
+                self._code_cache[player_id] = code
+        return self._code_cache[player_id] if fatal else self._code_cache.get(player_id)
 
     def _extract_signature_function(self, video_id, player_url, example_sig):
         player_id = self._extract_player_info(player_url)
 
         # Read from filesystem cache
-        func_id = 'js_%s_%s' % (
+        func_id = 'js_{0}_{1}'.format(
             player_id, self._signature_cache_id(example_sig))
         assert os.path.basename(func_id) == func_id
 
-        cache_spec = self._downloader.cache.load('youtube-sigfuncs', func_id)
-        if cache_spec is not None:
-            return lambda s: ''.join(s[i] for i in cache_spec)
+        self.write_debug('Extracting signature function {0}'.format(func_id))
+        cache_spec, code = self.cache.load('youtube-sigfuncs', func_id), None
 
-        code = self._get_player_code(video_id, player_url, player_id)
-        res = self._parse_sig_js(code)
+        if not cache_spec:
+            code = self._load_player(video_id, player_url, player_id)
+        if code:
+            res = self._parse_sig_js(code)
+            test_string = ''.join(map(compat_chr, range(len(example_sig))))
+            cache_spec = [ord(c) for c in res(test_string)]
+            self.cache.store('youtube-sigfuncs', func_id, cache_spec)
 
-        test_string = ''.join(map(compat_chr, range(len(example_sig))))
-        cache_res = res(test_string)
-        cache_spec = [ord(c) for c in cache_res]
-
-        self._downloader.cache.store('youtube-sigfuncs', func_id, cache_spec)
-        return res
+        return lambda s: ''.join(s[i] for i in cache_spec)
 
     def _print_sig_code(self, func, example_sig):
+        if not self.get_param('youtube_print_sig_code'):
+            return
+
         def gen_sig_code(idxs):
             def _genslice(start, end, step):
                 starts = '' if start == 0 else str(start)
                 ends = (':%d' % (end + step)) if end + step >= 0 else ':'
                 steps = '' if step == 1 else (':%d' % step)
-                return 's[%s%s%s]' % (starts, ends, steps)
+                return 's[{0}{1}{2}]'.format(starts, ends, steps)
 
             step = None
             # Quelch pyflakes warnings - start will be set when step is set
@@ -1564,143 +1594,137 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             jscode, 'Initial JS player signature function name', group='sig')
 
         jsi = JSInterpreter(jscode)
-
         initial_function = jsi.extract_function(funcname)
-
         return lambda s: initial_function([s])
 
-    def _decrypt_signature(self, s, video_id, player_url):
-        """Turn the encrypted s field into a working signature"""
+    def _cached(self, func, *cache_id):
+        def inner(*args, **kwargs):
+            if cache_id not in self._player_cache:
+                try:
+                    self._player_cache[cache_id] = func(*args, **kwargs)
+                except ExtractorError as e:
+                    self._player_cache[cache_id] = e
+                except Exception as e:
+                    self._player_cache[cache_id] = ExtractorError(traceback.format_exc(), cause=e)
 
-        if player_url is None:
-            raise ExtractorError('Cannot decrypt signature without player_url')
+            ret = self._player_cache[cache_id]
+            if isinstance(ret, Exception):
+                raise ret
+            return ret
+        return inner
 
-        try:
-            player_id = (player_url, self._signature_cache_id(s))
-            if player_id not in self._player_cache:
-                func = self._extract_signature_function(
-                    video_id, player_url, s
-                )
-                self._player_cache[player_id] = func
-            func = self._player_cache[player_id]
-            if self._downloader.params.get('youtube_print_sig_code'):
-                self._print_sig_code(func, s)
-            return func(s)
-        except Exception as e:
-            tb = traceback.format_exc()
-            raise ExtractorError(
-                'Signature extraction failed: ' + tb, cause=e)
-
-    def _extract_player_url(self, webpage):
-        player_url = self._search_regex(
-            r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
-            webpage or '', 'player URL', fatal=False)
-        if not player_url:
-            return
-        if player_url.startswith('//'):
-            player_url = 'https:' + player_url
-        elif not re.match(r'https?://', player_url):
-            player_url = compat_urllib_parse.urljoin(
-                'https://www.youtube.com', player_url)
-        return player_url
+    def _decrypt_signature(self, s, video_id, player_url):
+        """Turn the encrypted s field into a working signature"""
+        extract_sig = self._cached(
+            self._extract_signature_function, 'sig', player_url, self._signature_cache_id(s))
+        func = extract_sig(video_id, player_url, s)
+        self._print_sig_code(func, s)
+        return func(s)
 
     # from yt-dlp
     # See also:
     # 1. https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
     # 2. https://code.videolan.org/videolan/vlc/-/blob/4fb284e5af69aa9ac2100ccbdd3b88debec9987f/share/lua/playlist/youtube.lua#L116
     # 3. https://github.com/ytdl-org/youtube-dl/issues/30097#issuecomment-950157377
+    def _decrypt_nsig(self, n, video_id, player_url):
+        """Turn the encrypted n field into a working signature"""
+        if player_url is None:
+            raise ExtractorError('Cannot decrypt nsig without player_url')
+
+        try:
+            jsi, player_id, func_code = self._extract_n_function_code(video_id, player_url)
+        except ExtractorError as e:
+            raise ExtractorError('Unable to extract nsig jsi, player_id, func_codefunction code', cause=e)
+        if self.get_param('youtube_print_sig_code'):
+            self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(
+                player_id, func_code[1]))
+
+        try:
+            extract_nsig = self._cached(self._extract_n_function_from_code, 'nsig func', player_url)
+            ret = extract_nsig(jsi, func_code)(n)
+        except JSInterpreter.Exception as e:
+            self.report_warning(
+                '%s (%s %s)' % (
+                    self.__ie_msg(
+                        'Unable to decode n-parameter: download likely to be throttled'),
+                    error_to_compat_str(e),
+                    traceback.format_exc()))
+            return
+
+        self.write_debug('Decrypted nsig {0} => {1}'.format(n, ret))
+        return ret
+
     def _extract_n_function_name(self, jscode):
-        target = r'(?P<nfunc>[a-zA-Z_$][\w$]*)(?:\[(?P<idx>\d+)\])?'
-        nfunc_and_idx = self._search_regex(
-            r'\.get\("n"\)\)&&\(b=(%s)\([\w$]+\)' % (target, ),
-            jscode, 'Initial JS player n function name')
-        nfunc, idx = re.match(target, nfunc_and_idx).group('nfunc', 'idx')
+        func_name, idx = self._search_regex(
+            r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z_$][\w$]*)(?:\[(?P<idx>\d+)\])?\([\w$]+\)',
+            jscode, 'Initial JS player n function name', group=('nfunc', 'idx'))
         if not idx:
-            return nfunc
+            return func_name
 
-        VAR_RE_TMPL = r'var\s+%s\s*=\s*(?P<name>\[(?P<alias>%s)\])[;,]'
-        note = 'Initial JS player n function {0} (%s[%s])' % (nfunc, idx)
+        return self._parse_json(self._search_regex(
+            r'var {0}\s*=\s*(\[.+?\])\s*[,;]'.format(re.escape(func_name)), jscode,
+            'Initial JS player n function list ({0}.{1})'.format(func_name, idx)),
+            func_name, transform_source=js_to_json)[int(idx)]
 
-        def search_function_code(needle, group):
-            return self._search_regex(
-                VAR_RE_TMPL % (re.escape(nfunc), needle), jscode,
-                note.format(group), group=group)
+    def _extract_n_function_code(self, video_id, player_url):
+        player_id = self._extract_player_info(player_url)
+        func_code = self.cache.load('youtube-nsig', player_id)
+        jscode = func_code or self._load_player(video_id, player_url)
+        jsi = JSInterpreter(jscode)
 
-        if int_or_none(idx) == 0:
-            real_nfunc = search_function_code(r'[a-zA-Z_$][\w$]*', group='alias')
-            if real_nfunc:
-                return real_nfunc
-        return self._parse_json(
-            search_function_code('.+?', group='name'),
-            nfunc, transform_source=js_to_json)[int(idx)]
+        if func_code:
+            return jsi, player_id, func_code
 
-    def _extract_n_function(self, video_id, player_url):
-        player_id = self._extract_player_info(player_url)
-        func_code = self._downloader.cache.load('youtube-nsig', player_id)
+        func_name = self._extract_n_function_name(jscode)
 
+        # For redundancy
+        func_code = self._search_regex(
+            r'''(?xs)%s\s*=\s*function\s*\((?P<var>[\w$]+)\)\s*
+                     # NB: The end of the regex is intentionally kept strict
+                     {(?P<code>.+?}\s*return\ [\w$]+.join\(""\))};''' % func_name,
+            jscode, 'nsig function', group=('var', 'code'), default=None)
         if func_code:
-            jsi = JSInterpreter(func_code)
+            func_code = ([func_code[0]], func_code[1])
         else:
-            jscode = self._get_player_code(video_id, player_url, player_id)
-            funcname = self._extract_n_function_name(jscode)
-            jsi = JSInterpreter(jscode)
-            func_code = jsi.extract_function_code(funcname)
-            self._downloader.cache.store('youtube-nsig', player_id, func_code)
-
-        if self._downloader.params.get('youtube_print_sig_code'):
-            self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(player_id, func_code[1]))
-
-        return lambda s: jsi.extract_function_from_code(*func_code)([s])
-
-    def _n_descramble(self, n_param, player_url, video_id):
-        """Compute the response to YT's "n" parameter challenge,
-           or None
-
-        Args:
-        n_param     -- challenge string that is the value of the
-                       URL's "n" query parameter
-        player_url  -- URL of YT player JS
-        video_id
-        """
+            self.write_debug('Extracting nsig function with jsinterp')
+            func_code = jsi.extract_function_code(func_name)
 
-        sig_id = ('nsig_value', n_param)
-        if sig_id in self._player_cache:
-            return self._player_cache[sig_id]
+        self.cache.store('youtube-nsig', player_id, func_code)
+        return jsi, player_id, func_code
+
+    def _extract_n_function_from_code(self, jsi, func_code):
+        func = jsi.extract_function_from_code(*func_code)
+
+        def extract_nsig(s):
+            try:
+                ret = func([s])
+            except JSInterpreter.Exception:
+                raise
+            except Exception as e:
+                raise JSInterpreter.Exception(traceback.format_exc(), cause=e)
 
-        try:
-            player_id = ('nsig', player_url)
-            if player_id not in self._player_cache:
-                self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
-            func = self._player_cache[player_id]
-            ret = func(n_param)
             if ret.startswith('enhanced_except_'):
-                raise ExtractorError('Unhandled exception in decode')
-            self._player_cache[sig_id] = ret
-            if self._downloader.params.get('verbose', False):
-                self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
-            return self._player_cache[sig_id]
-        except Exception as e:
-            self._downloader.report_warning(
-                '[%s] %s (%s %s)' % (
-                    self.IE_NAME,
-                    'Unable to decode n-parameter: download likely to be throttled',
-                    error_to_compat_str(e),
-                    traceback.format_exc()))
+                raise JSInterpreter.Exception('Signature function returned an exception')
+            return ret
+
+        return extract_nsig
+
+    def _unthrottle_format_urls(self, video_id, player_url, *formats):
+
+        def decrypt_nsig(n):
+            return self._cached(self._decrypt_nsig, 'nsig', n, player_url)
 
-    def _unthrottle_format_urls(self, video_id, player_url, formats):
         for fmt in formats:
             parsed_fmt_url = compat_urllib_parse.urlparse(fmt['url'])
             n_param = compat_parse_qs(parsed_fmt_url.query).get('n')
             if not n_param:
                 continue
             n_param = n_param[-1]
-            n_response = self._n_descramble(n_param, player_url, video_id)
+            n_response = decrypt_nsig(n_param)(n_param, video_id, player_url)
             if n_response is None:
                 # give up if descrambling failed
                 break
-            for fmt_dct in traverse_obj(fmt, (None, (None, ('fragments', Ellipsis))), expected_type=dict):
-                fmt_dct['url'] = update_url(
-                    fmt_dct['url'], query_update={'n': [n_response]})
+            fmt['url'] = update_url_query(fmt['url'], {'n': n_response})
 
     # from yt-dlp, with tweaks
     def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False):
@@ -1708,16 +1732,16 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         Extract signatureTimestamp (sts)
         Required to tell API what sig/player version is in use.
         """
-        sts = int_or_none(ytcfg.get('STS')) if isinstance(ytcfg, dict) else None
+        sts = traverse_obj(ytcfg, 'STS', expected_type=int)
         if not sts:
             # Attempt to extract from player
             if player_url is None:
                 error_msg = 'Cannot extract signature timestamp without player_url.'
                 if fatal:
                     raise ExtractorError(error_msg)
-                self._downloader.report_warning(error_msg)
+                self.report_warning(error_msg)
                 return
-            code = self._get_player_code(video_id, player_url)
+            code = self._load_player(video_id, player_url, fatal=fatal)
             sts = int_or_none(self._search_regex(
                 r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code or '',
                 'JS player signature timestamp', group='sts', fatal=fatal))
@@ -1733,12 +1757,18 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         # cpn generation algorithm is reverse engineered from base.js.
         # In fact it works even with dummy cpn.
         CPN_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_'
-        cpn = ''.join((CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16)))
-
-        playback_url = update_url(
-            playback_url, query_update={
-                'ver': ['2'],
-                'cpn': [cpn],
+        cpn = ''.join(CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(0, 16))
+
+        # more consistent results setting it to right before the end
+        qs = parse_qs(playback_url)
+        video_length = '{0}'.format(float((qs.get('len') or ['1.5'])[0]) - 1)
+
+        playback_url = update_url_query(
+            playback_url, {
+                'ver': '2',
+                'cpn': cpn,
+                'cmt': video_length,
+                'el': 'detailpage',  # otherwise defaults to "shorts"
             })
 
         self._download_webpage(
@@ -1986,8 +2016,11 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             else:
                 self.to_screen('Downloading just video %s because of --no-playlist' % video_id)
 
+        if not player_url:
+            player_url = self._extract_player_url(webpage)
+
         formats = []
-        itags = []
+        itags = collections.defaultdict(set)
         itag_qualities = {}
         q = qualities(['tiny', 'small', 'medium', 'large', 'hd720', 'hd1080', 'hd1440', 'hd2160', 'hd2880', 'highres'])
         CHUNK_SIZE = 10 << 20
@@ -2003,58 +2036,92 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
                 })
             } for range_start in range(0, f['filesize'], CHUNK_SIZE))
 
+        lower = lambda s: s.lower()
+
         for fmt in streaming_formats:
-            if fmt.get('targetDurationSec') or fmt.get('drmFamilies'):
+            if fmt.get('targetDurationSec'):
                 continue
 
             itag = str_or_none(fmt.get('itag'))
-            quality = fmt.get('quality')
-            if itag and quality:
+            audio_track = traverse_obj(fmt, ('audioTrack', T(dict))) or {}
+
+            quality = traverse_obj(fmt, ((
+                # The 3gp format (17) in android client has a quality of "small",
+                # but is actually worse than other formats
+                T(lambda _: 'tiny' if itag == 17 else None),
+                ('quality', T(lambda q: q if q and q != 'tiny' else None)),
+                ('audioQuality', T(lower)),
+                'quality'), T(txt_or_none)), get_all=False)
+            if quality and itag:
                 itag_qualities[itag] = quality
             # FORMAT_STREAM_TYPE_OTF(otf=1) requires downloading the init fragment
             # (adding `&sq=0` to the URL) and parsing emsg box to determine the
-            # number of fragment that would subsequently requested with (`&sq=N`)
+            # number of fragments that would subsequently be requested with (`&sq=N`)
             if fmt.get('type') == 'FORMAT_STREAM_TYPE_OTF':
                 continue
 
             fmt_url = fmt.get('url')
             if not fmt_url:
                 sc = compat_parse_qs(fmt.get('signatureCipher'))
-                fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0]))
-                encrypted_sig = try_get(sc, lambda x: x['s'][0])
-                if not (sc and fmt_url and encrypted_sig):
+                fmt_url = traverse_obj(sc, ('url', -1, T(url_or_none)))
+                encrypted_sig = traverse_obj(sc, ('s', -1))
+                if not (fmt_url and encrypted_sig):
                     continue
+                player_url = player_url or self._extract_player_url(webpage)
                 if not player_url:
-                    player_url = self._extract_player_url(webpage)
-                if not player_url:
                     continue
-                signature = self._decrypt_signature(sc['s'][0], video_id, player_url)
-                sp = try_get(sc, lambda x: x['sp'][0]) or 'signature'
-                fmt_url += '&' + sp + '=' + signature
+                try:
+                    fmt_url = update_url_query(fmt_url, {
+                        traverse_obj(sc, ('sp', -1)) or 'signature':
+                            [self._decrypt_signature(encrypted_sig, video_id, player_url)],
+                    })
+                except ExtractorError as e:
+                    self.report_warning('Signature extraction failed: Some formats may be missing',
+                                        video_id=video_id, only_once=True)
+                    self.write_debug(error_to_compat_str(e), only_once=True)
+                    continue
 
-            if itag:
-                itags.append(itag)
-            tbr = float_or_none(
-                fmt.get('averageBitrate') or fmt.get('bitrate'), 1000)
+            language_preference = (
+                10 if audio_track.get('audioIsDefault')
+                else -10 if 'descriptive' in (traverse_obj(audio_track, ('displayName', T(lower))) or '')
+                else -1)
+            name = (
+                traverse_obj(fmt, ('qualityLabel', T(txt_or_none)))
+                or quality.replace('audio_quality_', ''))
             dct = {
-                'asr': int_or_none(fmt.get('audioSampleRate')),
-                'filesize': int_or_none(fmt.get('contentLength')),
-                'format_id': itag,
-                'format_note': fmt.get('qualityLabel') or quality,
-                'fps': int_or_none(fmt.get('fps')),
-                'height': int_or_none(fmt.get('height')),
-                'quality': q(quality),
-                'tbr': tbr,
+                'format_id': join_nonempty(itag, fmt.get('isDrc') and 'drc'),
                 'url': fmt_url,
-                'width': fmt.get('width'),
+                # Format 22 is likely to be damaged: see https://github.com/yt-dlp/yt-dlp/issues/3372
+                'source_preference': ((-5 if itag == '22' else -1)
+                                      + (100 if 'Premium' in name else 0)),
+                'quality': q(quality),
+                'language': join_nonempty(audio_track.get('id', '').split('.')[0],
+                                          'desc' if language_preference < -1 else '') or None,
+                'language_preference': language_preference,
+                # Strictly de-prioritize 3gp formats
+                'preference': -2 if itag == '17' else None,
             }
-            mimetype = fmt.get('mimeType')
-            if mimetype:
-                mobj = re.match(
-                    r'((?:[^/]+)/(?:[^;]+))(?:;\s*codecs="([^"]+)")?', mimetype)
-                if mobj:
-                    dct['ext'] = mimetype2ext(mobj.group(1))
-                    dct.update(parse_codecs(mobj.group(2)))
+            if itag:
+                itags[itag].add(('https', dct.get('language')))
+            self._unthrottle_format_urls(video_id, player_url, dct)
+            dct.update(traverse_obj(fmt, {
+                'asr': ('audioSampleRate', T(int_or_none)),
+                'filesize': ('contentLength', T(int_or_none)),
+                'format_note': ('qualityLabel', T(lambda x: x or quality)),
+                # for some formats, fps is wrongly returned as 1
+                'fps': ('fps', T(int_or_none), T(lambda f: f if f > 1 else None)),
+                'audio_channels': ('audioChannels', T(int_or_none)),
+                'height': ('height', T(int_or_none)),
+                'has_drm': ('drmFamilies', T(bool)),
+                'tbr': (('averageBitrate', 'bitrate'), T(lambda t: float_or_none(t, 1000))),
+                'width': ('width', T(int_or_none)),
+                '_duration_ms': ('approxDurationMs', T(int_or_none)),
+            }, get_all=False))
+            mime_mobj = re.match(
+                r'((?:[^/]+)/(?:[^;]+))(?:;\s*codecs="([^"]+)")?', fmt.get('mimeType') or '')
+            if mime_mobj:
+                dct['ext'] = mimetype2ext(mime_mobj.group(1))
+                dct.update(parse_codecs(mime_mobj.group(2)))
             single_stream = 'none' in (dct.get(c) for c in ('acodec', 'vcodec'))
             if single_stream and dct.get('ext'):
                 dct['container'] = dct['ext'] + '_dash'
@@ -2069,32 +2136,62 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
 
             formats.append(dct)
 
+        def process_manifest_format(f, proto, client_name, itag, all_formats=False):
+            key = (proto, f.get('language'))
+            if not all_formats and key in itags[itag]:
+                return False
+            itags[itag].add(key)
+
+            if itag:
+                f['format_id'] = (
+                    '{0}-{1}'.format(itag, proto)
+                    if all_formats or any(p != proto for p, _ in itags[itag])
+                    else itag)
+
+            if f.get('source_preference') is None:
+                f['source_preference'] = -1
+
+            if itag in ('616', '235'):
+                f['format_note'] = join_nonempty(f.get('format_note'), 'Premium', delim=' ')
+                f['source_preference'] += 100
+
+            f['quality'] = q(traverse_obj(f, (
+                'format_id', T(lambda s: itag_qualities[s.split('-')[0]])), default=-1))
+            if try_call(lambda: f['fps'] <= 1):
+                del f['fps']
+
+            if proto == 'hls' and f.get('has_drm'):
+                f['has_drm'] = 'maybe'
+                f['source_preference'] -= 5
+            return True
+
         hls_manifest_url = streaming_data.get('hlsManifestUrl')
         if hls_manifest_url:
             for f in self._extract_m3u8_formats(
                     hls_manifest_url, video_id, 'mp4', fatal=False):
-                itag = self._search_regex(
-                    r'/itag/(\d+)', f['url'], 'itag', default=None)
-                if itag:
-                    f['format_id'] = itag
-                formats.append(f)
+                if process_manifest_format(
+                        f, 'hls', None, self._search_regex(
+                            r'/itag/(\d+)', f['url'], 'itag', default=None)):
+                    formats.append(f)
 
         if self._downloader.params.get('youtube_include_dash_manifest', True):
             dash_manifest_url = streaming_data.get('dashManifestUrl')
             if dash_manifest_url:
                 for f in self._extract_mpd_formats(
                         dash_manifest_url, video_id, fatal=False):
-                    itag = f['format_id']
-                    if itag in itags:
-                        continue
-                    if itag in itag_qualities:
-                        f['quality'] = q(itag_qualities[itag])
-                    filesize = int_or_none(self._search_regex(
-                        r'/clen/(\d+)', f.get('fragment_base_url')
-                        or f['url'], 'file size', default=None))
-                    if filesize:
-                        f['filesize'] = filesize
-                    formats.append(f)
+                    if process_manifest_format(
+                            f, 'dash', None, f['format_id']):
+                        f['filesize'] = traverse_obj(f, (
+                            ('fragment_base_url', 'url'), T(lambda u: self._search_regex(
+                                r'/clen/(\d+)', u, 'file size', default=None)),
+                            T(int_or_none)), get_all=False)
+                        formats.append(f)
+
+        playable_formats = [f for f in formats if not f.get('has_drm')]
+        if formats and not playable_formats:
+            # If there are no formats that definitely don't have DRM, all have DRM
+            self.report_drm(video_id)
+        formats[:] = playable_formats
 
         if not formats:
             if streaming_data.get('licenseInfos'):
@@ -2166,6 +2263,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             video_details.get('lengthSeconds')
             or microformat.get('lengthSeconds')) \
             or parse_duration(search_meta('duration'))
+
+        for f in formats:
+            # Some formats may have much smaller duration than others (possibly damaged during encoding)
+            # but avoid false positives with small duration differences.
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/2823
+            if try_call(lambda x: float(x.pop('_duration_ms')) / duration < 500, args=(f,)):
+                self.report_warning(
+                    '{0}: Some possibly damaged formats will be deprioritized'.format(video_id), only_once=True)
+                # Strictly de-prioritize damaged formats
+                f['preference'] = -10
+
         is_live = video_details.get('isLive')
 
         owner_profile_url = self._yt_urljoin(self._extract_author_var(
@@ -2174,10 +2282,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         uploader = self._extract_author_var(
             webpage, 'name', videodetails=video_details, metadata=microformat)
 
-        if not player_url:
-            player_url = self._extract_player_url(webpage)
-        self._unthrottle_format_urls(video_id, player_url, formats)
-
         info = {
             'id': video_id,
             'title': self._live_title(video_title) if is_live else video_title,