youtube-dl

Another place where youtube-dl lives on
git clone git://git.oshgnacknak.de/youtube-dl.git
Log | Files | Refs | README | LICENSE

test_download.py (10162B)


      1 #!/usr/bin/env python
      2 
      3 from __future__ import unicode_literals
      4 
      5 # Allow direct execution
      6 import os
      7 import sys
      8 import unittest
      9 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
     10 
     11 from test.helper import (
     12     assertGreaterEqual,
     13     expect_warnings,
     14     get_params,
     15     gettestcases,
     16     expect_info_dict,
     17     try_rm,
     18     report_warning,
     19 )
     20 
     21 
     22 import hashlib
     23 import io
     24 import json
     25 import socket
     26 
     27 import youtube_dl.YoutubeDL
     28 from youtube_dl.compat import (
     29     compat_http_client,
     30     compat_urllib_error,
     31     compat_HTTPError,
     32 )
     33 from youtube_dl.utils import (
     34     DownloadError,
     35     ExtractorError,
     36     format_bytes,
     37     UnavailableVideoError,
     38 )
     39 from youtube_dl.extractor import get_info_extractor
     40 
     41 RETRIES = 3
     42 
     43 
     44 class YoutubeDL(youtube_dl.YoutubeDL):
     45     def __init__(self, *args, **kwargs):
     46         self.to_stderr = self.to_screen
     47         self.processed_info_dicts = []
     48         super(YoutubeDL, self).__init__(*args, **kwargs)
     49 
     50     def report_warning(self, message):
     51         # Don't accept warnings during tests
     52         raise ExtractorError(message)
     53 
     54     def process_info(self, info_dict):
     55         self.processed_info_dicts.append(info_dict)
     56         return super(YoutubeDL, self).process_info(info_dict)
     57 
     58 
     59 def _file_md5(fn):
     60     with open(fn, 'rb') as f:
     61         return hashlib.md5(f.read()).hexdigest()
     62 
     63 
# Test case definitions as returned by test.helper.gettestcases(); the loop
# at the bottom of this module attaches one TestDownload method per entry.
defs = gettestcases()
     65 
     66 
     67 class TestDownload(unittest.TestCase):
     68     # Parallel testing in nosetests. See
     69     # http://nose.readthedocs.org/en/latest/doc_tests/test_multiprocess/multiprocess.html
     70     _multiprocess_shared_ = True
     71 
     72     maxDiff = None
     73 
     74     def __str__(self):
     75         """Identify each test with the `add_ie` attribute, if available."""
     76 
     77         def strclass(cls):
     78             """From 2.7's unittest; 2.6 had _strclass so we can't import it."""
     79             return '%s.%s' % (cls.__module__, cls.__name__)
     80 
     81         add_ie = getattr(self, self._testMethodName).add_ie
     82         return '%s (%s)%s:' % (self._testMethodName,
     83                                strclass(self.__class__),
     84                                ' [%s]' % add_ie if add_ie else '')
     85 
     86     def setUp(self):
     87         self.defs = defs
     88 
     89 # Dynamically generate tests
     90 
     91 
def generator(test_case, tname):
    """Build the test method for one test case definition.

    `test_case` is a dict describing the test.  Keys read here are 'name',
    'url', 'info_dict', 'params', 'add_ie', 'skip', 'expected_warnings',
    'playlist', 'playlist_mincount', 'playlist_count' and
    'playlist_duration_sum'; sub-cases may additionally carry 'md5' and
    'file_minsize'.  `tname` is the name the method is registered under; it
    also prefixes the output template and appears in the network-failure
    warning.

    Returns the `test_template` function, which closes over both arguments.
    """

    def test_template(self):
        # Instantiate the extractor under test and the extra extractors the
        # case lists under 'add_ie'.
        ie = youtube_dl.extractor.get_info_extractor(test_case['name'])()
        other_ies = [get_info_extractor(ie_key)() for ie_key in test_case.get('add_ie', [])]
        # Any key starting with 'playlist' makes this a playlist test
        is_playlist = any(k.startswith('playlist') for k in test_case)
        # Sub-cases to verify: the explicit 'playlist' entries if given,
        # nothing for playlists described only by count/duration keys, and
        # otherwise the test case itself.
        test_cases = test_case.get(
            'playlist', [] if is_playlist else [test_case])

        def print_skipping(reason):
            print('Skipping %s: %s' % (test_case['name'], reason))
        if not ie.working():
            print_skipping('IE marked as not _WORKING')
            return

        # Every sub-case needs 'id' and 'ext' so that its output file name
        # can be determined; this is checked before the 'skip' handling.
        for tc in test_cases:
            info_dict = tc.get('info_dict', {})
            if not (info_dict.get('id') and info_dict.get('ext')):
                raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?')

        if 'skip' in test_case:
            print_skipping(test_case['skip'])
            return
        for other_ie in other_ies:
            if not other_ie.working():
                print_skipping('test depends on %sIE, marked as not WORKING' % other_ie.ie_key())
                return

        # Parameters come from test.helper.get_params; the output template
        # is prefixed with the test name.
        params = get_params(test_case.get('params', {}))
        params['outtmpl'] = tname + '_' + params['outtmpl']
        if is_playlist and 'playlist' not in test_case:
            # Playlist cases without explicit entries default to flat
            # extraction and no download; values already present are kept.
            params.setdefault('extract_flat', 'in_playlist')
            params.setdefault('skip_download', True)

        ydl = YoutubeDL(params, auto_init=False)
        ydl.add_default_info_extractors()
        # File names for which a progress hook reported status 'finished'
        finished_hook_called = set()

        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.add_progress_hook(_hook)
        # presumably lets the listed warnings through instead of failing;
        # see test.helper.expect_warnings to confirm
        expect_warnings(ydl, test_case.get('expected_warnings', []))

        def get_tc_filename(tc):
            # Output file name ydl computes for the sub-case's info_dict
            return ydl.prepare_filename(tc.get('info_dict', {}))

        # Stays None if extraction never succeeds; checked in `finally`
        res_dict = None

        def try_rm_tcs_files(tcs=None):
            # Remove the media file, its '.part' file and the '.info.json'
            # file for each given sub-case (default: all of `test_cases`).
            if tcs is None:
                tcs = test_cases
            for tc in tcs:
                tc_filename = get_tc_filename(tc)
                try_rm(tc_filename)
                try_rm(tc_filename + '.part')
                try_rm(os.path.splitext(tc_filename)[0] + '.info.json')
        # Start from a clean state
        try_rm_tcs_files()
        try:
            # Attempt the extraction up to RETRIES times
            try_num = 1
            while True:
                try:
                    # We're not using .download here since that is just a shim
                    # for outside error handling, and returns the exit code
                    # instead of the result dict.
                    res_dict = ydl.extract_info(
                        test_case['url'],
                        force_generic_extractor=params.get('force_generic_extractor', False))
                except (DownloadError, ExtractorError) as err:
                    # Check if the exception is not a network related one
                    # NOTE(review): the membership test compares exact types;
                    # unless compat_HTTPError is itself one of the listed
                    # classes, the first clause already re-raises for it and
                    # the 503 clause cannot change the outcome - confirm the
                    # intended behaviour.
                    if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError, compat_http_client.BadStatusLine) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503):
                        raise

                    if try_num == RETRIES:
                        # Give up without failing; this is the module-level
                        # report_warning imported from test.helper.
                        report_warning('%s failed due to network errors, skipping...' % tname)
                        return

                    print('Retrying: {0} failed tries\n\n##########\n\n'.format(try_num))

                    try_num += 1
                else:
                    break

            if is_playlist:
                self.assertTrue(res_dict['_type'] in ['playlist', 'multi_video'])
                self.assertTrue('entries' in res_dict)
                expect_info_dict(self, res_dict, test_case.get('info_dict', {}))

            if 'playlist_mincount' in test_case:
                assertGreaterEqual(
                    self,
                    len(res_dict['entries']),
                    test_case['playlist_mincount'],
                    'Expected at least %d in playlist %s, but got only %d' % (
                        test_case['playlist_mincount'], test_case['url'],
                        len(res_dict['entries'])))
            if 'playlist_count' in test_case:
                self.assertEqual(
                    len(res_dict['entries']),
                    test_case['playlist_count'],
                    'Expected %d entries in playlist %s, but got %d.' % (
                        test_case['playlist_count'],
                        test_case['url'],
                        len(res_dict['entries']),
                    ))
            if 'playlist_duration_sum' in test_case:
                got_duration = sum(e['duration'] for e in res_dict['entries'])
                self.assertEqual(
                    test_case['playlist_duration_sum'], got_duration)

            # Generalize both playlists and single videos to unified format for
            # simplicity
            if 'entries' not in res_dict:
                res_dict['entries'] = [res_dict]

            # Sub-cases are matched to the extracted entries by position
            for tc_num, tc in enumerate(test_cases):
                tc_res_dict = res_dict['entries'][tc_num]
                # First, check test cases' data against extracted data alone
                expect_info_dict(self, tc_res_dict, tc.get('info_dict', {}))
                # Now, check downloaded file consistency
                tc_filename = get_tc_filename(tc)
                if not test_case.get('params', {}).get('skip_download', False):
                    self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                    self.assertTrue(tc_filename in finished_hook_called)
                    # 'file_minsize' defaults to 10000; an explicit None
                    # disables the size check.
                    expected_minsize = tc.get('file_minsize', 10000)
                    if expected_minsize is not None:
                        if params.get('test'):
                            # With the 'test' param set, require at least 10000
                            expected_minsize = max(expected_minsize, 10000)
                        got_fsize = os.path.getsize(tc_filename)
                        assertGreaterEqual(
                            self, got_fsize, expected_minsize,
                            'Expected %s to be at least %s, but it\'s only %s ' %
                            (tc_filename, format_bytes(expected_minsize),
                                format_bytes(got_fsize)))
                    if 'md5' in tc:
                        md5_for_file = _file_md5(tc_filename)
                        self.assertEqual(tc['md5'], md5_for_file)
                # Finally, check test cases' data again but this time against
                # extracted data from info JSON file written during processing
                info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
                self.assertTrue(
                    os.path.exists(info_json_fn),
                    'Missing info file %s' % info_json_fn)
                with io.open(info_json_fn, encoding='utf-8') as infof:
                    info_dict = json.load(infof)
                expect_info_dict(self, info_dict, tc.get('info_dict', {}))
        finally:
            # Clean up whether the checks passed, failed or were abandoned
            try_rm_tcs_files()
            if is_playlist and res_dict is not None and res_dict.get('entries'):
                # Remove all other files that may have been extracted if the
                # extractor returns full results even with extract_flat
                res_tcs = [{'info_dict': e} for e in res_dict['entries']]
                try_rm_tcs_files(res_tcs)

    return test_template
    247 
    248 
    249 # And add them to TestDownload
    250 for n, test_case in enumerate(defs):
    251     tname = 'test_' + str(test_case['name'])
    252     i = 1
    253     while hasattr(TestDownload, tname):
    254         tname = 'test_%s_%d' % (test_case['name'], i)
    255         i += 1
    256     test_method = generator(test_case, tname)
    257     test_method.__name__ = str(tname)
    258     ie_list = test_case.get('add_ie')
    259     test_method.add_ie = ie_list and ','.join(ie_list)
    260     setattr(TestDownload, test_method.__name__, test_method)
    261     del test_method
    262 
    263 
    264 if __name__ == '__main__':
    265     unittest.main()