test_http.py (6082B)
1 #!/usr/bin/env python 2 # coding: utf-8 3 from __future__ import unicode_literals 4 5 # Allow direct execution 6 import os 7 import sys 8 import unittest 9 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 10 11 from test.helper import http_server_port 12 from youtube_dl import YoutubeDL 13 from youtube_dl.compat import compat_http_server, compat_urllib_request 14 import ssl 15 import threading 16 17 TEST_DIR = os.path.dirname(os.path.abspath(__file__)) 18 19 20 class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): 21 def log_message(self, format, *args): 22 pass 23 24 def do_GET(self): 25 if self.path == '/video.html': 26 self.send_response(200) 27 self.send_header('Content-Type', 'text/html; charset=utf-8') 28 self.end_headers() 29 self.wfile.write(b'<html><video src="/vid.mp4" /></html>') 30 elif self.path == '/vid.mp4': 31 self.send_response(200) 32 self.send_header('Content-Type', 'video/mp4') 33 self.end_headers() 34 self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') 35 elif self.path == '/302': 36 if sys.version_info[0] == 3: 37 # XXX: Python 3 http server does not allow non-ASCII header values 38 self.send_response(404) 39 self.end_headers() 40 return 41 42 new_url = 'http://127.0.0.1:%d/中文.html' % http_server_port(self.server) 43 self.send_response(302) 44 self.send_header(b'Location', new_url.encode('utf-8')) 45 self.end_headers() 46 elif self.path == '/%E4%B8%AD%E6%96%87.html': 47 self.send_response(200) 48 self.send_header('Content-Type', 'text/html; charset=utf-8') 49 self.end_headers() 50 self.wfile.write(b'<html><video src="/vid.mp4" /></html>') 51 else: 52 assert False 53 54 55 class FakeLogger(object): 56 def debug(self, msg): 57 pass 58 59 def warning(self, msg): 60 pass 61 62 def error(self, msg): 63 pass 64 65 66 class TestHTTP(unittest.TestCase): 67 def setUp(self): 68 self.httpd = compat_http_server.HTTPServer( 69 ('127.0.0.1', 0), HTTPTestRequestHandler) 70 self.port = http_server_port(self.httpd) 71 self.server_thread = threading.Thread(target=self.httpd.serve_forever) 72 self.server_thread.daemon = True 73 self.server_thread.start() 74 75 def test_unicode_path_redirection(self): 76 # XXX: Python 3 http server does not allow non-ASCII header values 77 if sys.version_info[0] == 3: 78 return 79 80 ydl = YoutubeDL({'logger': FakeLogger()}) 81 r = ydl.extract_info('http://127.0.0.1:%d/302' % self.port) 82 self.assertEqual(r['entries'][0]['url'], 'http://127.0.0.1:%d/vid.mp4' % self.port) 83 84 85 class TestHTTPS(unittest.TestCase): 86 def setUp(self): 87 certfn = os.path.join(TEST_DIR, 'testcert.pem') 88 self.httpd = compat_http_server.HTTPServer( 89 ('127.0.0.1', 0), HTTPTestRequestHandler) 90 self.httpd.socket = ssl.wrap_socket( 91 self.httpd.socket, certfile=certfn, server_side=True) 92 self.port = http_server_port(self.httpd) 93 self.server_thread = threading.Thread(target=self.httpd.serve_forever) 94 self.server_thread.daemon = True 95 self.server_thread.start() 96 97 def test_nocheckcertificate(self): 98 if sys.version_info >= (2, 7, 9): # No certificate checking anyways 99 ydl = YoutubeDL({'logger': FakeLogger()}) 100 self.assertRaises( 101 Exception, 102 ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port) 103 104 ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}) 105 r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port) 106 self.assertEqual(r['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port) 107 108 109 def _build_proxy_handler(name): 110 class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): 111 proxy_name = name 112 113 def log_message(self, format, *args): 114 pass 115 116 def do_GET(self): 117 self.send_response(200) 118 self.send_header('Content-Type', 'text/plain; charset=utf-8') 119 self.end_headers() 120 self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8')) 121 return HTTPTestRequestHandler 122 123 124 class TestProxy(unittest.TestCase): 125 def setUp(self): 126 self.proxy = compat_http_server.HTTPServer( 127 ('127.0.0.1', 0), _build_proxy_handler('normal')) 128 self.port = http_server_port(self.proxy) 129 self.proxy_thread = threading.Thread(target=self.proxy.serve_forever) 130 self.proxy_thread.daemon = True 131 self.proxy_thread.start() 132 133 self.geo_proxy = compat_http_server.HTTPServer( 134 ('127.0.0.1', 0), _build_proxy_handler('geo')) 135 self.geo_port = http_server_port(self.geo_proxy) 136 self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever) 137 self.geo_proxy_thread.daemon = True 138 self.geo_proxy_thread.start() 139 140 def test_proxy(self): 141 geo_proxy = '127.0.0.1:{0}'.format(self.geo_port) 142 ydl = YoutubeDL({ 143 'proxy': '127.0.0.1:{0}'.format(self.port), 144 'geo_verification_proxy': geo_proxy, 145 }) 146 url = 'http://foo.com/bar' 147 response = ydl.urlopen(url).read().decode('utf-8') 148 self.assertEqual(response, 'normal: {0}'.format(url)) 149 150 req = compat_urllib_request.Request(url) 151 req.add_header('Ytdl-request-proxy', geo_proxy) 152 response = ydl.urlopen(req).read().decode('utf-8') 153 self.assertEqual(response, 'geo: {0}'.format(url)) 154 155 def test_proxy_with_idn(self): 156 ydl = YoutubeDL({ 157 'proxy': '127.0.0.1:{0}'.format(self.port), 158 }) 159 url = 'http://中文.tw/' 160 response = ydl.urlopen(url).read().decode('utf-8') 161 # b'xn--fiq228c' is '中文'.encode('idna') 162 self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') 163 164 165 if __name__ == '__main__': 166 unittest.main()