blog

Pythonは、DHTクローラを "上げる "方法を教える

この記事はクローラの実装についてですので、プロトコルのドキュメントほど厳密ではありません。 ただ、リクエストとレスポンスが正しくできることを確認してください。 ソフトウェア開発の用語では、インターフェ...

May 20, 2015 · 18 min. read
シェア

前置きはこのくらいにして、料理に移りましょう。

ご理解いただけたと思います

1、DHTプロトコル

2、ネットワークバイトオーダー/ホストバイトオーダー

3、ベンコード

4、UDP

5、シードファイル作成

わからない人はググってください!1つでも欠けてたら、塩水を口に含んで死にます!

最も重要なことは、プログラミングができることです! 知っていなければなりません!

この記事を読んでいる人なら誰でも知っていることですから。

この記事はクローラーの実装に関するものなので、プロトコルのドキュメントほど厳密ではありません。 ただ、リクエストとレスポンスが正しくできることを確認してください。 ソフトウェア開発で言うなら、インターフェイスが一貫していれば、内部コードをどう書こうが関係ありません。

最初のステップは、独自のルーティングテーブルを構築することです。これは多くのPythonコードを含みますので、深呼吸してください。

独自のルーティングテーブルを作成するには、後で使用する2つのヘルパー関数を書く必要があります。

from hashlib import sha1  
from random import randint  
def node_id():  
    """  
    把爬虫"伪装"成正常node, 一个正常的node有ip, port, node ID三个属性, 因为是基于UDP协议,   
    所以向对方发送信息时, 即使没"明确"说明自己的ip和port时, 对方自然会知道ip和port,   
    また、その逆も同様である。. 那么自身node就只需要生成一个node ID就行, 协议里说到node ID用sha1算法生成,   
    sha1算法生成的值是长度是20 byte, 也就是20 * 8 = 160 bit, それはちょうどDHTプロトコルが言うようなものだ。: 0 至 2的160次方,   
    つまり、合計で1461501637330902918203684832716283019655932542976ユニークなノードを生成することができる。.   
    ok, 由于sha1总是生成20 byte的值, 所以哪怕你写SHA1(20)或SHA1(19)或SHA1("I am a 2B")都可以,   
    ただ、大幅に他の人との重複の可能性を減らすようにしてください。. 注意, node ID非十六进制,   
    也就是说非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA这个样子, 即非hash.hexdigest().  
    """ 
    hash = sha1()  
    s = ""  
    for i in range(20):  
        s += chr(randint(0, 255))  
    hash.update(s)  
    return hash.digest()  
 
def intify(nid):  
    """这是一个小工具, 把一个node ID转换为数字. あなたは後でそれがたくさん必要になる。.""" 
    assert len(nid) == 20 
    return long(nid.encode('hex'), 16) #先转换成16进制, 再变成数字 

プロトコルは、テーブルにはバケット、バケットにはノードがあり、各バケットにはK個のノードがあり、現在はK=8、つまり各バケットには8個のノードがあります。 テーブルの範囲は0から2の160乗なので、テーブルには最大/K個のバケットを持つことができます。

OOPプログラミングの考え方に従えば、テーブル、バケット、ノードの3つのクラスが必要です。

Kademilaをベースにしているので、3つのクラス名をKTable、KBucket、KNodeに変えるのが習慣になっています。

class KNode:  
    def __init__(self, nid, ip, port):  
        """  
        nid就是node ID的简写, 就不取id这么模糊的变量名了. __init__メソッドは、他のOOP言語のコンストラクタのメソッドに相当する。,   
        在python严格来说不是构造方法, 它是初始化, 不过, 功能差不多就行.  
        """ 
        self.nid = nid #node ID  
        self.ip = ip  
        self.port = port  
 
    #ここでは、非Pythonプログラマが心配する必要はありません2つの方法がある。  
    def __eq__(self, other):  
        return self.nid == other.nid  
    def __ne__(self, other):  
        return self.nid != other.nid  
 
 
class KBucket:  
    def __init__(self, min, max):  
        """  
        min和max就是该bucket负责的范围, 比如该bucket的min:0, max:16 ,   
        次に、保存されたノードのintify値はすべて次のとおりである。: 0到15, 那16就不负责, 这16将会是该bucket后面的bucket的min值.   
        nodes属性就是个リスト, 存储node. last_accessed代表最終アクセス時間, 因为协议里说到,   
        当该bucket负责的node有请求, 回应操作; ノードを削除する; ノードを追加する; ノードを更新する; これらの操作が行われる,   
        那么就要更新该bucket, 所以设置个last_accessed属性, 该属性标志着这个bucket的"新鲜程度". 用linux话来说, touch一下.  
        これは、後で見るように、一定の間隔でルーティングテーブルを更新するために使用することができる。.  
        """ 
        self.min = min #最小node ID数字值  
        self.max = max #最大node ID数字值  
        self.nodes = [] #node列表  
        self.last_accessed = time() #最后访问时间  
 
    def nid_in_range(self, nid):  
        """判断指定的node ID是否属于该bucket的范围里""" 
        return self.min <= intify(nid) < self.max  
 
    def append(self, node):  
        """  
        添加node, 参数node是KNode实例.  
 
        如果新插入的node的nid属性长度不等于20, 终止.  
        如果满了, 抛出bucket已满的错误, 终止. テーブルを分割する上位レベルのコードを通知する.  
        如果未满, 先看看新插入的node是否已存在, 如果存在, 就替换掉, 不存在, 就添加,  
        添加/替换时, 更新该bucket的"新鲜程度".  
        """ 
        if len(node.nid) != 20: return 
        if len(self.nodes) < 8:  
            if node in self.nodes:  
                self.nodes.remove(node)  
                self.nodes.append(node)  
            else:  
                self.nodes.append(node)  
            self.last_accessed = time()  
        else:  
            raise BucketFull  
 
 
class KTable:  
    """  
    このクラスは一度だけインスタンス化される.  
    """ 
    def __init__(self, nid):  
        """  
        ここでnidは、ノードのメモリに格納されているノードである。_id()函数生成的自身node ID. 协议里说道, 每个路由表至少有一个bucket,   
        また、最初のバケットの最小値を指定する=0, max=2的160次方, 所以这里就给予了一个buckets属性来存储bucket, 这个是列表.  
        """ 
        self.nid = nid #自身node ID  
        self.buckets = [KBucket(0, 2 ** 160)] #バケットを格納する例  
 
    def append(self, node):  
        """添加node, 参数node是KNode实例""" 
 
        #如果待插入的node的ID与自身一样, 那么就忽略, 终止接下来的操作.  
        if node.nid == self.nid: return   
 
        #挿入するノードが属するバケットを見つける。.  
        index = self.bucket_index(node.nid)  
        bucket = self.buckets[index]  
 
        #协议里说道, 插入新节点时, 如果所归属的bucket是满的, 又都是活跃节点,   
        #那么先看看自身的node ID是否在该bucket的范围里, 如果不在该范围里, 那么就把  
        #该node忽略掉, 程序终止; 如果在该范围里, 就要把该bucket拆分成两个bucket, 按范围"公平平分"node.  
        try:  
            bucket.append(node)  
        except BucketFull:  
            if not bucket.nid_in_range(self.nid): return #这个步骤很重要, 不然递归循环很狂暴, 导致程序死翘翘.  
            self.split_bucket(index)   
            self.append(node)  
 
    def bucket_index(self, nid):  
        """  
        バケットのインデックスを見つける  
 
        传一个node的ID, 从buckets属性里循环, 定位该nid属于哪个bucket, 找到后, 返回对应的bucket的索引;   
        没有找到, 说明就是要创建新的bucket了, 那么索引值就是最大索引值+1.  
         : 为了简单, 就采用循环方式. 私はこれが最も効率的な方法ではないと思う。.  
        """ 
        for index, bucket in enumerate(self.buckets):  
            if bucket.nid_in_range(nid):  
                return index  
        return index          
 
    def split_bucket(self, index):  
        """  
        テーブルを分割する  
 
        index是待拆分的bucket(old bucket)的所在索引值.   
        假设这个old bucket的min:0, max:16. 拆分该old bucket的话, 分界点是8, 然后把old bucket的max改为8, min还是0.   
        创建一个新的bucket, new bucket的min=8, max=16.  
        然后根据的old bucket中的各个node的nid, 看看是属于哪个bucket的范围里, 就装到对应的bucket里.   
        自分の家に戻って、自分の母親を見つける。.  
        new bucket的所在索引值就在old bucket后面, 即index+1, ルーティングテーブルに新しいバケットを挿入する。.  
        """ 
        old = self.buckets[index]  
        point = old.max - (old.max - old.min)/2 
        new = KBucket(point, old.max)  
        old.max = point  
        self.buckets.insert(index + 1, new)  
        for node in old:  
            if new.nid_in_range(node.nid):  
                new.append(node)  
        for node in new:  
            old.remove(node)          
 
    def find_close_nodes(self, target):  
        """  
        返回与目标node ID或infohash的最近K个node.  
 
        定位出与目标node ID或infohash所在的bucket, 如果该bucuck有K个节点, 返回.   
        如果不够到K个节点的话, 把该bucket前面的bucket和该bucket后面的bucket加起来, 只返回前K个节点.  
        还是不到K个话, 再重复这个动作. 最小および最大インデックスの範囲を超えないように注意する.  
        总之, 不管你用什么算法, 想尽办法找出最近的K个节点.  
        """ 
        nodes = []  
        if len(self.buckets) == 0: return nodes  
        index = self.bucket_index(target)  
        nodes = self.buckets[index].nodes  
        min = index - 1 
        max = index + 1 
        while len(nodes) < K and (min >= 0 or max < len(self.buckets)):  
            if min >= 0:  
                nodes.extend(self.buckets[min].nodes)  
            if max < len(self.buckets):  
                nodes.extend(self.buckets[max].nodes)  
            min -= 1 
            max += 1 
 
        num = intify(target)  
        nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid)))  
        return nodes[:K] #Kそれは一定である。, K=8 

OK]を、ルーティングテーブルは、このことは、より技術的であり、記述するのは難しいので、実証するためのコードを投稿します。 次のDHTクライアント/サーバーは、結局のところ、ネットワークのコードを扱うことがあまりにもあまりにも複雑で、技術的な内容が高くない、DHTプロトコルに従って、行の操作を説明するので、多くのコードを投稿しません。

ステップ2、DHTクライアントの実装

#p#

DHTクライアントを実装するには、ping、find_node、get_peers、announce_peerを実装する必要はなく、クローラーを行うには、find_node関数を実装するだけで目的を達成できます。 なぜなら、私はただ友達を作り続けたいだけだからです。

DHTネットワークに参加するには、最初の既知のノードを知る必要があります。 dht.transmissionbt.com:6881は、UDPのため、ipアドレスしか受け付けません。

次に、DHTプロトコルが行うというfind_nodeオペレーションを使用して、いくつかのキーの潜在的な意味を説明します。

t:トランザクションIDの略。 これは何をするものですか? UDPには3ウェイ・ハンドシェイクがないため、事前に接続しなくても、誰でも情報を送ることができます。 あるノードにリクエストを送信すると、DHTが要求しているタイプのリプライ、つまりy=rが返ってくる状況を考えてみてください。 t=aaで送信すれば、相手がリクエストに応答したとき、応答メッセージのtは間違いなくaaになります。 送信するときに、そのt値に対応するハンドラ関数を登録し、レスポンスを受信したら、その関数を呼び出して処理します。 タイマーを設定し、時間が来たらすぐに関数を削除することを忘れないでください。 タイムアウト後にメッセージを受け取った場合、残念ながらハンドラはありません。 タイマーを5秒に設定することをお勧めします。 もちろん、それはあなた次第です。 最初のノードからのfind_nodeリプライが成功したことを確認してください。つまり、最初の友達に会えなければ、2番目の友達にも会えないということです。

idは:独自のノードIDは、ハハによって生成されたnode_id()関数は、外観のDHTプロトコルの例ではありませんが、これは主にデモの便宜上のみです。

target:これも独自のノードIDで、私に最も近いノードを尋ねます。

DHTプロトコルによると、各ノードは4バイトのip + 2バイトのポート + 20バイトのノードIDの構成であり、ノードのバイト数は26の倍数になります。 デコード」ノードと「エンコード」ノードのPythonコードは以下の通りです。

from struct import unpack  
 
def num_to_dotted(n):  
    d = 256 * 256 * 256 
    q = []  
    while d > 0:  
        m, n = divmod(n, d)  
        q.append(str(m))  
        d /= 256 
    return '.'.join(q)  
 
def decode_nodes(nodes):  
    n = []  
    nrnodes = len(nodes) / 26 
    nodes = unpack("!" + "20sIH" * nrnodes, nodes)  
    for i in range(nrnodes):  
        nid, ip, port = nodes[i * 3], num_to_dotted(nodes[i * 3 + 1]), nodes[i * 3 + 2]  
        n.append((nid, ip, port))  
    return n  
 
decode_nodes関数の逆関数は次のとおりである。:  
def dotted_to_num(ip):  
    hexn = ''.join(["%02X" % long(i) for i in ip.split('.')])  
    return long(hexn, 16)  
 
def encode_nodes(nodes):  
    n = []  
    for node in nodes:  
        n.extend([node.nid, dotted_to_num(node.ip), node.port])  
    return pack("!" + "20sIH" * len(nodes), *n) 

パースされると、ルーティングテーブルに挿入され、find_nodeを使って、新しくパースされたノードに、ターゲットはそのノードIDのまま、リクエストを出し続けます。 このようにして、たくさんのノードを知ることができます。 詳細は省きます。

3番目のステップ、DHTサーバーサイドの実装ですが、プロトコルのドキュメントは非常に明確です。

2、応答メッセージのtの値は、相手のtの値であり、あなたが書いたものではありません。

3、それは、有効性の10分以内に、変更するには、5分ごとに、行にそのようにプロトコルに従って、トークンを台無しにされる可能性を減らすように、トークンのメカニズムを実装することが最善です。

4、前の文を覚えておく必要があります "バケットを担当するノードの要求がある場合、操作に応答する;ノードの削除;ノードの追加;ノードの更新;など、これらの操作では、バケットを更新"。

5、DHTクローラなので、get_peersリクエストとfind_nodeリクエストを処理するのはほぼ同じで、一番近いノードに返されます。 もちろん、首の痛みに時間がない、標準に来ることができる、その種のピアを返すために行うことができますが、必要はありません。

6、ポートのanounce_peer要求は、TCPではなく、DHTの接続ポートをリッスン、ダウンロードシードポートを提供するために反対側です。 そして、idのリクエストメッセージは、単にノードIDの反対側を指し、私はピアIDの反対側である記事を書いたブログ公園を読んで、私は非常に混乱を表明しました。

ステップ4:一定間隔でルーティングテーブルを更新

プロトコルによると、一定時間後に、ルーティングテーブル内のノードがすべてアクティブではないので、我々は定期的にルーティングテーブルを更新する必要があります。 アイデアは、バケットリストのルーティングテーブルをトラバース、バケットlast_accessedは、例えば、15分以上でないことを確認し、それ以上の場合は、 "新鮮な "ではない可能性があり、その後、ランダムにバケットからノードを選択し、ノードにfind_node操作を送信し、関係ありません。 シンプルにするために、疑わしいノードがまだ "生きている "かどうかをチェックせずに、このシンプルなアプローチを採用しました。 プロトコルのドキュメントに忠実に従えばよいのです。

と聞かれるかもしれません。

1、リソースのダウンロード速度を知るには?

A: 「より多くの人がダウンロードするほど、ダウンロード速度は速くなる」ということわざがあります。 つまり、あるリソースのインフォハッシュにおけるannounce_peerの出現数が多ければ多いほど、より多くの人がそれをダウンロードしていることになり、より多くの人がダウンロードすればするほど、ダウンロード速度は速くなります。 ダウンロード速度は絶対値で表すことはできませんが、相対値で表すことができます。

2、多くのリソースの中から映画やテレビのリソースをフィルタリングする方法は?

答え:シードを取得し、filesフィールドがあれば、それを繰り返し、接尾辞にrmvb、avi、mp4などがあるかどうかを正規検索します。 filesフィールドがない場合は、nameフィールドを正規検索します。

3、なぜ映画やテレビの資料はいつも「非常にポルノ的で暴力的」なのですか?

4、インフォハッシュという言葉は何に基づいているのですか?

A: シードファイルのinfoフィールドのハッシュ値。

5、ノードについてもっと知るには?

A: より多くのノードインスタンスを開いてください。

6、どのような状況で、相手側は私を相手側のルーティングテーブルに追加しますか?

A: find_nodeで相手側を見つけたら。 多分、ping、get_peersでルーティングテーブルに追加できます。 同様に、ping、find_node、get_peersを受信したとき、相手をルーティングテーブルに追加します。少なくとも、find_nodeリクエストを受信したとき、相手をルーティングテーブルに追加します。

7、ノードを長期間保存する方法は?

A:データベースは、ファイルをすることができます、私は長期ストレージノードをお勧めしませんが、あなただけの常にオンライン、メモリストレージを使用して、最高の、高速、シンプルなコードです。

Read next

オープンデータ:2014年最もホットな競争分野

2014年はオープンデータにとって重要な年になるでしょう。オープンデータ活用の焦点は、オープンデータを活用したイノベーションと、全く新しい活用分野の発見にあります。オープンデータ活用の注目分野トップ5は?成功している企業とは?その背景にあるビジネスモデルと収益モデルとは?この記事で明らかにします。

May 20, 2015 · 5 min read