sanitize_filename,
sanitize_path,
sanitize_url,
+ sanitized_Request,
shell_quote,
smuggle_url,
str_or_none,
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
self.assertEqual(sanitize_url('foo bar'), 'foo bar')
+ def test_sanitized_Request(self):
+ self.assertFalse(sanitized_Request('http://foo.bar').has_header('Authorization'))
+ self.assertFalse(sanitized_Request('http://:foo.bar').has_header('Authorization'))
+ self.assertEqual(sanitized_Request('http://@foo.bar').get_header('Authorization'),
+ 'Basic Og==')
+ self.assertEqual(sanitized_Request('http://:pass@foo.bar').get_header('Authorization'),
+ 'Basic OnBhc3M=')
+ self.assertEqual(sanitized_Request('http://user:@foo.bar').get_header('Authorization'),
+ 'Basic dXNlcjo=')
+ self.assertEqual(sanitized_Request('http://user:pass@foo.bar').get_header('Authorization'),
+ 'Basic dXNlcjpwYXNz')
+
def test_expand_path(self):
def env(var):
return '%{0}%'.format(var) if sys.platform == 'win32' else '${0}'.format(var)
return url
+def extract_basic_auth(url):
+ parts = compat_urllib_parse.urlsplit(url)
+ if parts.username is None:
+ return url, None
+ url = compat_urllib_parse.urlunsplit(parts._replace(netloc=(
+ parts.hostname if parts.port is None
+ else '%s:%d' % (parts.hostname, parts.port))))
+ auth_payload = base64.b64encode(
+ ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
+ return url, 'Basic {0}'.format(auth_payload.decode('ascii'))
+
+
def sanitized_Request(url, *args, **kwargs):
- return compat_urllib_request.Request(escape_url(sanitize_url(url)), *args, **kwargs)
+ url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
+ if auth_header is not None:
+ headers = args[1] if len(args) > 1 else kwargs.get('headers')
+ headers = headers or {}
+ headers['Authorization'] = auth_header
+ if len(args) <= 1 and kwargs.get('headers') is None:
+ kwargs['headers'] = headers
+ kwargs = compat_kwargs(kwargs)
+ return compat_urllib_request.Request(url, *args, **kwargs)
def expand_path(s):