sanitized_Request(
self._test_url('content-encoding'),
headers={'ytdl-encoding': encoding}))
- self.assertEqual(res.headers.get('Content-Encoding'), encoding)
+ # decoded encodings are removed: only check for valid decompressed data
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
@unittest.skipUnless(brotli, 'brotli support is not installed')
- @unittest.expectedFailure
def test_brotli(self):
+ # Runs the shared compression check with the 'br' content-coding;
+ # skipped when the optional brotli module is not available.
self.__test_compression('br')
- @unittest.expectedFailure
def test_deflate(self):
+ # Runs the shared compression check with the 'deflate' content-coding.
self.__test_compression('deflate')
- @unittest.expectedFailure
def test_gzip(self):
+ # Runs the shared compression check with the 'gzip' content-coding.
self.__test_compression('gzip')
- @unittest.expectedFailure # not yet implemented
def test_multiple_encodings(self):
+ # Each value below names two content-codings in one header value, with and
+ # without a space after the comma; every pair goes through the same shared
+ # compression check that the single-coding tests use.
# https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
- with FakeYDL() as ydl:
- for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
- res = ydl.urlopen(
- sanitized_Request(
- self._test_url('content-encoding'),
- headers={'ytdl-encoding': pair}))
- self.assertEqual(res.headers.get('Content-Encoding'), pair)
- self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
+ for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
+ self.__test_compression(pair)
def test_unsupported_encoding(self):
# it should return the raw content
import email.header
import errno
import functools
-import gzip
import inspect
import io
import itertools
compat_HTMLParseError,
compat_HTMLParser,
compat_basestring,
+ compat_brotli as brotli,
compat_casefold,
compat_chr,
compat_collections_abc,
compat_http_client,
compat_integer_types,
compat_kwargs,
+ compat_ncompress as ncompress,
compat_os_name,
compat_re_Match,
compat_re_Pattern,
req)
@staticmethod
- def deflate(data):
+ def deflate_gz(data):
+ # Decompress `data` with zlib: first let zlib auto-detect a zlib or gzip
+ # wrapper; if zlib rejects that, retry treating `data` as a raw stream.
+ # Falsy `data` (eg, b'') is returned unchanged without calling zlib.
+ # A zlib.error from the retry is not caught here.
try:
- return zlib.decompress(data, -zlib.MAX_WBITS)
+ # format:zlib,gzip + windowsize:32768
+ return data and zlib.decompress(data, 32 + zlib.MAX_WBITS)
except zlib.error:
- return zlib.decompress(data)
+ # raw deflate (no zlib/gzip header) + windowsize:32768 (RFC 9110: "non-conformant")
+ return zlib.decompress(data, -zlib.MAX_WBITS)
+
+ @staticmethod
+ def gzip(data):
+ # Decompress gzip `data` through gzip.GzipFile instead of zlib.
+ # If reading fails with IOError, retry with 1 to 1023 trailing bytes cut
+ # off and return the first result that reads cleanly; if no truncation
+ # works, the IOError from the first (untruncated) attempt is re-raised.
+
+ from gzip import GzipFile
+
+ def _gzip(data):
+ # read the whole decompressed payload from an in-memory buffer
+ with io.BytesIO(data) as data_buf:
+ gz = GzipFile(fileobj=data_buf, mode='rb')
+ return gz.read()
+
+ try:
+ return _gzip(data)
+ except IOError as original_ioerror:
+ # There may be junk at the end of the file
+ # See http://stackoverflow.com/q/4928560/35070 for details
+ for i in range(1, 1024):
+ try:
+ return _gzip(data[:-i])
+ except IOError:
+ continue
+ # the loop has no break and only leaves early via return, so this
+ # branch runs exactly when every truncated attempt has failed
+ else:
+ raise original_ioerror
+
+ @staticmethod
+ def brotli(data):
+ # Falsy `data` (eg, b'') is returned unchanged; otherwise it is passed to
+ # decompress() of the module imported at file level as `brotli`
+ # (compat_brotli). Inside this body the name refers to that module-level
+ # import, not to this method.
+ return data and brotli.decompress(data)
+
+ @staticmethod
+ def compress(data):
+ # Despite the name, this decompresses: the method is named after the
+ # 'compress' content-coding it is registered for. Falsy `data` is returned
+ # unchanged; otherwise it is passed to decompress() of the module imported
+ # at file level as `ncompress` (compat_ncompress).
+ return data and ncompress.decompress(data)
def http_request(self, req):
# According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
def http_response(self, req, resp):
old_resp = resp
- # gzip
- if resp.headers.get('Content-encoding', '') == 'gzip':
- content = resp.read()
- gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
- try:
- uncompressed = io.BytesIO(gz.read())
- except IOError as original_ioerror:
- # There may be junk at the end of the file
- # See http://stackoverflow.com/q/4928560/35070 for details
- for i in range(1, 1024):
- try:
- gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
- uncompressed = io.BytesIO(gz.read())
- except IOError:
- continue
- break
- else:
- raise original_ioerror
- resp = compat_urllib_request.addinfourl(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
- resp.msg = old_resp.msg
- del resp.headers['Content-encoding']
- # deflate
- if resp.headers.get('Content-encoding', '') == 'deflate':
- gz = io.BytesIO(self.deflate(resp.read()))
- resp = compat_urllib_request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
+
+ # Content-Encoding header lists the encodings in order that they were applied [1].
+ # To decompress, we simply do the reverse.
+ # [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
+ decoded_response = None
+ decoders = {
+ 'gzip': self.deflate_gz,
+ 'deflate': self.deflate_gz,
+ }
+ if brotli:
+ decoders['br'] = self.brotli
+ if ncompress:
+ decoders['compress'] = self.compress
+ if sys.platform.startswith('java'):
+ # Jython zlib implementation misses gzip
+ decoders['gzip'] = self.gzip
+
+ def encodings(hdrs):
+ # Generator over the individual codings named in `hdrs` (a list of header
+ # values): the values are joined with ',', split on ',', and walked in
+ # reverse, so the last-listed coding is yielded first.
+ # A header field that allows multiple values can have multiple instances [2].
+ # [2]: https://datatracker.ietf.org/doc/html/rfc9110#name-fields
+ for e in reversed(','.join(hdrs).split(',')):
+ # NOTE(review): emptiness is tested before strip(), so a whitespace-only
+ # element (eg, the tail of 'gzip, ') is yielded as '' -- confirm intended
+ if e:
+ yield e.strip()
+
+ encodings_left = []
+ try:
+ resp.headers.get_all
+ hdrs = resp.headers
+ except AttributeError:
+ # Py2 has no get_all() method: headers are rfc822.Message
+ from email.message import Message
+ hdrs = Message()
+ for k, v in resp.headers.items():
+ hdrs[k] = v
+
+ decoder, decoded_response = True, None
+ for encoding in encodings(hdrs.get_all('Content-Encoding', [])):
+ # "SHOULD consider" x-compress, x-gzip as compress, gzip
+ decoder = decoder and decoders.get(remove_start(encoding, 'x-'))
+ if not decoder:
+ encodings_left.insert(0, encoding)
+ continue
+ decoded_response = decoder(decoded_response or resp.read())
+ if decoded_response is not None:
+ resp = compat_urllib_request.addinfourl(
+ io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
- del resp.headers['Content-encoding']
+ del resp.headers['Content-Length']
+ resp.headers['Content-Length'] = '%d' % len(decoded_response)
+ del resp.headers['Content-Encoding']
+ if encodings_left:
+ resp.headers['Content-Encoding'] = ', '.join(encodings_left)
+
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/ytdl-org/youtube-dl/issues/6457).
if 300 <= resp.code < 400: