import collections
import errno
-import itertools
import io
+import itertools
import json
-import operator
import os.path
import re
import socket
import string
import struct
import traceback
+import xml.etree.ElementTree
import zlib
from .common import InfoExtractor, SearchInfoExtractor
if ose.errno != errno.EEXIST:
raise
write_json_file(cache_spec, cache_fn)
- except Exception as e:
+ except Exception:
tb = traceback.format_exc()
self._downloader.report_warning(
u'Writing cache to %r failed: %s' % (cache_fn, tb))
return u's[%s%s%s]' % (starts, ends, steps)
step = None
+ start = '(Never used)' # Quelch pyflakes warnings - start will be
+ # set as soon as step is set
for i, prev in zip(idxs[1:], idxs[:-1]):
if step is not None:
if i - prev == step:
def interpret_statement(stmt, local_vars, allow_recursion=20):
if allow_recursion < 0:
- raise ExctractorError(u'Recursion limit reached')
+ raise ExtractorError(u'Recursion limit reached')
if stmt.startswith(u'var '):
stmt = stmt[len(u'var '):]
v = - ((v ^ 0xffffffff) + 1)
return v
- def string(reader=None):
+ def read_string(reader=None):
if reader is None:
reader = code_reader
slen = u30(reader)
return res
# minor_version + major_version
- _ = read_bytes(2 + 2)
+ read_bytes(2 + 2)
# Constant pool
int_count = u30()
for _c in range(1, int_count):
- _ = s32()
+ s32()
uint_count = u30()
for _c in range(1, uint_count):
- _ = u32()
+ u32()
double_count = u30()
- _ = read_bytes((double_count-1) * 8)
+ read_bytes((double_count-1) * 8)
string_count = u30()
constant_strings = [u'']
for _c in range(1, string_count):
- s = string()
+ s = read_string()
constant_strings.append(s)
namespace_count = u30()
for _c in range(1, namespace_count):
- _ = read_bytes(1) # kind
- _ = u30() # name
+ read_bytes(1) # kind
+ u30() # name
ns_set_count = u30()
for _c in range(1, ns_set_count):
count = u30()
for _c2 in range(count):
- _ = u30()
+ u30()
multiname_count = u30()
MULTINAME_SIZES = {
0x07: 2, # QName
kind = u30()
assert kind in MULTINAME_SIZES, u'Invalid multiname kind %r' % kind
if kind == 0x07:
- namespace_idx = u30()
+ u30() # namespace_idx
name_idx = u30()
multinames.append(constant_strings[name_idx])
else:
multinames.append('[MULTINAME kind: %d]' % kind)
for _c2 in range(MULTINAME_SIZES[kind]):
- _ = u30()
+ u30()
# Methods
method_count = u30()
method_infos = []
for method_id in range(method_count):
param_count = u30()
- _ = u30() # return type
+ u30() # return type
for _ in range(param_count):
- _ = u30() # param type
- _ = u30() # name index (always 0 for youtube)
+ u30() # param type
+ u30() # name index (always 0 for youtube)
flags = read_byte()
if flags & 0x08 != 0:
# Options present
option_count = u30()
for c in range(option_count):
- _ = u30() # val
- _ = read_bytes(1) # kind
+ u30() # val
+ read_bytes(1) # kind
if flags & 0x80 != 0:
# Param names present
for _ in range(param_count):
- _ = u30() # param name
+ u30() # param name
mi = MethodInfo(flags & 0x01 != 0, flags & 0x04 != 0)
method_infos.append(mi)
# Metadata
metadata_count = u30()
for _c in range(metadata_count):
- _ = u30() # name
+ u30() # name
item_count = u30()
for _c2 in range(item_count):
- _ = u30() # key
- _ = u30() # value
+ u30() # key
+ u30() # value
def parse_traits_info():
trait_name_idx = u30()
attrs = kind_full >> 4
methods = {}
if kind in [0x00, 0x06]: # Slot or Const
- _ = u30() # Slot id
- type_name_idx = u30()
+ u30() # Slot id
+ u30() # type_name_idx
vindex = u30()
if vindex != 0:
- _ = read_byte() # vkind
+ read_byte() # vkind
elif kind in [0x01, 0x02, 0x03]: # Method / Getter / Setter
- _ = u30() # disp_id
+ u30() # disp_id
method_idx = u30()
methods[multinames[trait_name_idx]] = method_idx
elif kind == 0x04: # Class
- _ = u30() # slot_id
- _ = u30() # classi
+ u30() # slot_id
+ u30() # classi
elif kind == 0x05: # Function
- _ = u30() # slot_id
+ u30() # slot_id
function_idx = u30()
methods[function_idx] = multinames[trait_name_idx]
else:
if attrs & 0x4 != 0: # Metadata present
metadata_count = u30()
for _c3 in range(metadata_count):
- _ = u30()
+ u30() # metadata index
return methods
if name_idx == searched_idx:
# We found the class we're looking for!
searched_class_id = class_id
- _ = u30() # super_name idx
+ u30() # super_name idx
flags = read_byte()
if flags & 0x08 != 0: # Protected namespace is present
- protected_ns_idx = u30()
+ u30() # protected_ns_idx
intrf_count = u30()
for _c2 in range(intrf_count):
- _ = u30()
- _ = u30() # iinit
+ u30()
+ u30() # iinit
trait_count = u30()
for _c2 in range(trait_count):
- _ = parse_traits_info()
+ parse_traits_info()
if searched_class_id is None:
raise ExtractorError(u'Target class %r not found' %
method_names = {}
method_idxs = {}
for class_id in range(class_count):
- _ = u30() # cinit
+ u30() # cinit
trait_count = u30()
for _c2 in range(trait_count):
trait_methods = parse_traits_info()
# Scripts
script_count = u30()
for _c in range(script_count):
- _ = u30() # init
+ u30() # init
trait_count = u30()
for _c2 in range(trait_count):
- _ = parse_traits_info()
+ parse_traits_info()
# Method bodies
method_body_count = u30()
methods = {}
for _c in range(method_body_count):
method_idx = u30()
- max_stack = u30()
+ u30() # max_stack
local_count = u30()
- init_scope_depth = u30()
- max_scope_depth = u30()
+ u30() # init_scope_depth
+ u30() # max_scope_depth
code_length = u30()
code = read_bytes(code_length)
if method_idx in method_idxs:
methods[method_idxs[method_idx]] = m
exception_count = u30()
for _c2 in range(exception_count):
- _ = u30() # from
- _ = u30() # to
- _ = u30() # target
- _ = u30() # exc_type
- _ = u30() # var_name
+ u30() # from
+ u30() # to
+ u30() # target
+ u30() # exc_type
+ u30() # var_name
trait_count = u30()
for _c2 in range(trait_count):
- _ = parse_traits_info()
+ parse_traits_info()
assert p + code_reader.tell() == len(code_tag)
assert len(methods) == len(method_idxs)
assert isinstance(obj, list)
stack.append(obj[idx])
elif opcode == 128: # coerce
- _ = u30(coder)
+ u30(coder)
elif opcode == 133: # coerce_s
assert isinstance(stack[-1], (type(None), compat_str))
elif opcode == 164: # modulo
if self._downloader.params.get('youtube_print_sig_code'):
self._print_sig_code(func, len(s))
return func(s)
- except Exception as e:
+ except Exception:
tb = traceback.format_exc()
self._downloader.report_warning(
u'Automatic signature extraction failed: ' + tb)