[utils] Handle user:pass in URLs (#28801)
authorHubert Hirtz <hubert@hirtz.pm>
Mon, 4 Mar 2024 01:27:55 +0000 (01:27 +0000)
committerGitHub <noreply@github.com>
Mon, 4 Mar 2024 01:27:55 +0000 (01:27 +0000)
* Handle user:pass in URLs

Fixes "nonnumeric port" errors when youtube-dl is given URLs with
usernames and passwords such as:

    http://username:password@example.com/myvideo.mp4

Refs:
- https://en.wikipedia.org/wiki/Basic_access_authentication
- https://tools.ietf.org/html/rfc1738#section-3.1
- https://docs.python.org/3.8/library/urllib.parse.html#urllib.parse.urlsplit

Fixes #18276 (point 4)
Fixes #20258
Fixes #26211 (see comment)

* Align code with yt-dlp

---------

Co-authored-by: dirkf <fieldhouse@gmx.net>
test/test_utils.py
youtube_dl/utils.py

index 102420fcb882961f3faf384a89f4ec7d559747e4..90d64b5811e117595995e30418579b7b65f1e666 100644 (file)
@@ -81,6 +81,7 @@ from youtube_dl.utils import (
     sanitize_filename,
     sanitize_path,
     sanitize_url,
+    sanitized_Request,
     shell_quote,
     smuggle_url,
     str_or_none,
@@ -255,6 +256,18 @@ class TestUtil(unittest.TestCase):
         self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
         self.assertEqual(sanitize_url('foo bar'), 'foo bar')
 
+    def test_sanitized_Request(self):
+        self.assertFalse(sanitized_Request('http://foo.bar').has_header('Authorization'))
+        self.assertFalse(sanitized_Request('http://:foo.bar').has_header('Authorization'))
+        self.assertEqual(sanitized_Request('http://@foo.bar').get_header('Authorization'),
+                         'Basic Og==')
+        self.assertEqual(sanitized_Request('http://:pass@foo.bar').get_header('Authorization'),
+                         'Basic OnBhc3M=')
+        self.assertEqual(sanitized_Request('http://user:@foo.bar').get_header('Authorization'),
+                         'Basic dXNlcjo=')
+        self.assertEqual(sanitized_Request('http://user:pass@foo.bar').get_header('Authorization'),
+                         'Basic dXNlcjpwYXNz')
+
     def test_expand_path(self):
         def env(var):
             return '%{0}%'.format(var) if sys.platform == 'win32' else '${0}'.format(var)
index 61b94d84c44e54488bde08f30f53e6bf51be5b14..c249e71681dc2e33e3283ca6e79ae8fef91c8ebf 100644 (file)
@@ -2182,8 +2182,28 @@ def sanitize_url(url):
     return url
 
 
+def extract_basic_auth(url):
+    parts = compat_urllib_parse.urlsplit(url)
+    if parts.username is None:
+        return url, None
+    url = compat_urllib_parse.urlunsplit(parts._replace(netloc=(
+        parts.hostname if parts.port is None
+        else '%s:%d' % (parts.hostname, parts.port))))
+    auth_payload = base64.b64encode(
+        ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
+    return url, 'Basic {0}'.format(auth_payload.decode('ascii'))
+
+
 def sanitized_Request(url, *args, **kwargs):
-    return compat_urllib_request.Request(escape_url(sanitize_url(url)), *args, **kwargs)
+    url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
+    if auth_header is not None:
+        headers = args[1] if len(args) > 1 else kwargs.get('headers')
+        headers = headers or {}
+        headers['Authorization'] = auth_header
+        if len(args) <= 1 and kwargs.get('headers') is None:
+            kwargs['headers'] = headers
+            kwargs = compat_kwargs(kwargs)
+    return compat_urllib_request.Request(url, *args, **kwargs)
 
 
 def expand_path(s):