All pastes #1523340 Raw Edit

torrents.py

public python v1 · immutable
#1523340 ·published 2009-08-09 22:21 UTC
rendered paste body
#!/usr/bin/python3.1 -O'''    Library for making torrent files'''from functools import reducefrom hashlib   import sha1from operator  import addfrom os        import stat, listdirfrom os.path   import isdir, join as pathjoin, basenamefrom time      import time# As recommended BEP 003PIECE_LENGTH = 2 ** 18#### File hashing helper functions and classes###class ChunkedReader(object):    ''' An object to read many bite-size pieces of files at once '''    def __init__(self, fnames, piece_size):        ''' Takes a list of file names to read from.  Opens the first for an        eventual call to read(len). '''        self.fnames = fnames        self._open_new()        self.piece_size = piece_size    def _open_new(self):        ''' Open the next file in the list for reading, allowing exceptions for        missing files to propogate '''        self.f = open(self.fnames[0], 'rb')        self.fnames = self.fnames[1:]    def __iter__(self):        ''' Creates an iterator for the files, calling read() until empty '''        class ChunkIterator(object):            def __init__(self, f):                self.f = f            def __next__(self):                data = self.f.read()                if not len(data):                    raise StopIteration()                return data        # Python 2 iteration        ChunkIterator.next = ChunkIterator.__next__        return ChunkIterator(self)    def read(self):        ''' Reads (length) bytes from the file list; will always return exactly        (length) bytes unless it is at the very end of the last file. '''        b = bytearray(self.f.read(self.piece_size))        try:            while len(b) < self.piece_size:                self._open_new()                b.extend(self.f.read(self.piece_size - len(b)))        finally:            return bytes(b)# When uploading a directory, BitTorrent requires a file list consisting of a# list of dictionaries with length=(length) and path=(list, of, dirnames).def build_filelist(target, path=()):    l = []    for i in listdir(pathjoin(target, *path)):        this = path + (i,)        # Try to recurse directories        try:            l += build_filelist(target, this)        # Not a directory        except:            l.append({                'length': stat(pathjoin(target, *this)).st_size,                'path':   this            })    return l#### BitTorrent encoding/decoding functions###def bencode(s, encoding="utf-8"):    ''' bencodes a set of Python objects.  Behavior only defined for list,    dict, tuple, str, int, bytes, bytearray, and dict. Returns a bytes    object that can be written directly into a .torrent file'''    actions = {        list : lambda l : b''.join((            b'l',            b''.join(map(bencode, l)),            b'e'        )),        dict : lambda d : b''.join((            b'd',            b''.join(map(bencode, reduce(add, d.items()))),            b'e'        )),        str  : lambda s : bytes('%s:%s' % (len(s), s), encoding=encoding),        bytearray: lambda s : b':'.join((            bytes(str(len(s)), encoding=encoding),            bytes(s)        )),        int  : lambda i : b''.join((            b'i',            bytes(str(i), encoding=encoding),            b'e'        ))    }    # Aliases for types that work roughly the same    actions[tuple] = actions[list]    actions[bytes] = actions[str]    return actions[type(s)](s)def _bdecode_int(data):    value, data = data[1:].split(b'e', 1)    value = int(value)    return (value, data)def _bdecode_list(data):    data = data[1:]    l = []    while not data.startswith(b'e'):        value, data = _bdecode(data)        l.append(value)    return l, data[1:]def _bdecode_dict(data):    l, data = _bdecode_list(data)    d = dict(zip(l[::2], l[1::2]))    return d, datadef _bdecode(data, encoding="utf-8"):    if data.startswith(b'l'):        return _bdecode_list(data)    elif data.startswith(b'd'):        return _bdecode_dict(data)    elif data.startswith(b'i'):        return _bdecode_int(data)    # String data    length, data = data.split(b':', 1)    length = int(length)    try:        return str(data[:length], encoding=encoding), data[length:]    except:        return data[:length], data[length:]def bdecode(data, encoding="utf-8", ignore_errors=False):    data_struct, data = _bdecode(data, encoding)    if len(data) and not ignore_errors:        raise ValueError("Junk data at the end of input")    else:        return data_structdef save(data, filename):    ''' Save the given data to the given filename.  Will bencode data if    required. '''    if not isinstance(data, bytes):        data = bencode(data)    open(filename, 'wb').write(data)def load(filename, encoding="utf-8"):    ''' Load the given filename, using the given codec to decode strings '''    return bdecode(open(filename, 'rb').read(), encoding)def make(target, announce, **kwargs):    ''' Create a torrent file from the given directory target (will recurse    subdirectories if necessary), using the given announce URL.    target: Directory or file to make a torrent metafile for    announce: bittorrent announce URL for this torrent    optional keyword args:        piece_length: size of one piece, per bittorrent protocol        private: whether to mark the torrent as private (default: yes)        filename: filename to save the torrent to (will not return a value, only         write the file)        encode: whether to bencode the torrent file before returning (will return         a dictionary if False)    By default, the torrent will be returned as a bencoded string. '''    metafile = {        'announce': announce,        'encoding' : 'UTF-8',        'created by': 'Steve\'s torrent maker ' + __version__,        'creation date': int(time()),        'info': {            'piece length': kwargs.get('piece_length', PIECE_LENGTH),            'private': kwargs.get('private', 1),            'name': basename(target),        }    }    if isdir(target):        files   = build_filelist(target)        metafile['info']['files'] = files        reader  = ChunkedReader([pathjoin(target, *i['path']) for i in files], PIECE_LENGTH)    else:        metafile['info']['length'] = stat(target).st_size        reader = ChunkedReader([target])    metafile['info']['pieces'] = bytearray().join(sha1(i).digest() for i in reader)    if 'filename' in kwargs:        save(metafile, kwargs['filename'])    elif kwargs.get('encode', 1):        return bencode(metafile)    else:        return metafile__version__ = '0.4'__author__ = 'Steve Howard'__all__ = ['make', 'bencode', 'bdecode', 'save', 'load', 'PIECE_LENGTH']