用Python教你如何“養”一只DHT爬蟲
廢話少說, 直接上菜.
我假設你了解:
1, DHT協議
2, 網絡字節序/主機字節序
3, bencode
4, UDP
5, 種子文件構造
不懂的趕緊去google, 要是缺一個, 我會一口鹽汽水噴死你的!
最重要的是, 你必須會編程!!!!!!! 必須會!!!!!!!!!!!
ok, DHT原理是什么我在這就不寫了, 畢竟會看我這文章的人都是已經知道了的.
本文貼的代碼均為Python, 使用其他編程語言的人可以看注釋. 為了簡單, 只會說大概思路和關鍵性代碼, 細節自行搞定.
本文講的是要實現一個爬蟲, 所以不會跟協議文檔那么嚴格. 只要保證你能正確請求,回應即可. 用軟件開發的一句話來說: 只要接口一致, 管你內部細節代碼是怎么寫的.
第一步, 構建自己的路由表, 這里涉及到大量Python代碼, 請深呼吸:
在構建自己的路由表之前, 得寫兩個輔助函數, 后面會用到:
- from hashlib import sha1
- from random import randint
- def node_id():
- """
- 把爬蟲"偽裝"成正常node, 一個正常的node有ip, port, node ID三個屬性, 因為是基于UDP協議,
- 所以向對方發送信息時, 即使沒"明確"說明自己的ip和port時, 對方自然會知道你的ip和port,
- 反之亦然. 那么我們自身node就只需要生成一個node ID就行, 協議里說到node ID用sha1算法生成,
- sha1算法生成的值是長度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT協議里說的那范圍: 0 至 2的160次方,
- 也就是總共能生成1461501637330902918203684832716283019655932542976個獨一無二的node.
- ok, 由于sha1總是生成20 byte的值, 所以哪怕你寫SHA1(20)或SHA1(19)或SHA1("I am a 2B")都可以,
- 只要保證大大降低與別人重復幾率就行. 注意, node ID非十六進制,
- 也就是說非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA這個樣子, 即非hash.hexdigest().
- """
- hash = sha1()
- s = ""
- for i in range(20):
- s += chr(randint(0, 255))
- hash.update(s)
- return hash.digest()
- def intify(nid):
- """這是一個小工具, 把一個node ID轉換為數字. 后面會頻繁用到."""
- assert len(nid) == 20
- return long(nid.encode('hex'), 16) #先轉換成16進制, 再變成數字
協議里說道, table里有bucket, bucket里有node, 每個bucket有K個node, 目前K=8, 即每個bucket有8個node. 由于table范圍是0到2的160次方, 那么一個table最多能有(2的160次方)/K那么多的bucket.
OK, 按照OOP編程思想來說, 那么肯定會有table, bucket, node這3個類, 無OOP的, 自己看著辦.
由于是基于Kademila而寫, 所以我習慣上把這三個類名變為KTable, KBucket, KNode:
- class KNode:
- def __init__(self, nid, ip, port):
- """
- nid就是node ID的簡寫, 就不取id這么模糊的變量名了. __init__方法相當于別的OOP語言中的構造方法,
- 在python嚴格來說不是構造方法, 它是初始化, 不過, 功能差不多就行.
- """
- self.nid = nid #node ID
- self.ip = ip
- self.port = port
- #以下兩個方法非Python程序員不需關心
- def __eq__(self, other):
- return self.nid == other.nid
- def __ne__(self, other):
- return self.nid != other.nid
- class KBucket:
- def __init__(self, min, max):
- """
- min和max就是該bucket負責的范圍, 比如該bucket的min:0, max:16的話,
- 那么存儲的node的intify(nid)值均為: 0到15, 那16就不負責, 這16將會是該bucket后面的bucket的min值.
- nodes屬性就是個列表, 存儲node. last_accessed代表最后訪問時間, 因為協議里說到,
- 當該bucket負責的node有請求, 回應操作; 刪除node; 添加node; 更新node; 等這些操作時,
- 那么就要更新該bucket, 所以設置個last_accessed屬性, 該屬性標志著這個bucket的"新鮮程度". 用linux話來說, touch一下.
- 這個用來便于后面說的定時刷新路由表.
- """
- self.min = min #最小node ID數字值
- self.max = max #最大node ID數字值
- self.nodes = [] #node列表
- self.last_accessed = time() #最后訪問時間
- def nid_in_range(self, nid):
- """判斷指定的node ID是否屬于該bucket的范圍里"""
- return self.min <= intify(nid) < self.max
- def append(self, node):
- """
- 添加node, 參數node是KNode實例.
- 如果新插入的node的nid屬性長度不等于20, 終止.
- 如果滿了, 拋出bucket已滿的錯誤, 終止. 通知上層代碼進行拆表.
- 如果未滿, 先看看新插入的node是否已存在, 如果存在, 就替換掉, 不存在, 就添加,
- 添加/替換時, 更新該bucket的"新鮮程度".
- """
- if len(node.nid) != 20: return
- if len(self.nodes) < 8:
- if node in self.nodes:
- self.nodes.remove(node)
- self.nodes.append(node)
- else:
- self.nodes.append(node)
- self.last_accessed = time()
- else:
- raise BucketFull
- class KTable:
- """
- 該類只實例化一次.
- """
- def __init__(self, nid):
- """
- 這里的nid就是通過node_id()函數生成的自身node ID. 協議里說道, 每個路由表至少有一個bucket,
- 還規定第一個bucket的min=0, max=2的160次方, 所以這里就給予了一個buckets屬性來存儲bucket, 這個是列表.
- """
- self.nid = nid #自身node ID
- self.buckets = [KBucket(0, 2 ** 160)] #存儲bucket的例表
- def append(self, node):
- """添加node, 參數node是KNode實例"""
- #如果待插入的node的ID與自身一樣, 那么就忽略, 終止接下來的操作.
- if node.nid == self.nid: return
- #定位出待插入的node應該屬于哪個bucket.
- index = self.bucket_index(node.nid)
- bucket = self.buckets[index]
- #協議里說道, 插入新節點時, 如果所歸屬的bucket是滿的, 又都是活躍節點,
- #那么先看看自身的node ID是否在該bucket的范圍里, 如果不在該范圍里, 那么就把
- #該node忽略掉, 程序終止; 如果在該范圍里, 就要把該bucket拆分成兩個bucket, 按范圍"公平平分"node.
- try:
- bucket.append(node)
- except BucketFull:
- if not bucket.nid_in_range(self.nid): return #這個步驟很重要, 不然遞歸循環很狂暴, 導致程序死翹翹.
- self.split_bucket(index)
- self.append(node)
- def bucket_index(self, nid):
- """
- 定位bucket的所在索引
- 傳一個node的ID, 從buckets屬性里循環, 定位該nid屬于哪個bucket, 找到后, 返回對應的bucket的索引;
- 沒有找到, 說明就是要創建新的bucket了, 那么索引值就是最大索引值+1.
- 注意: 為了簡單, 就采用循環方式. 這個恐怕不是最有效率的方式.
- """
- for index, bucket in enumerate(self.buckets):
- if bucket.nid_in_range(nid):
- return index
- return index
- def split_bucket(self, index):
- """
- 拆表
- index是待拆分的bucket(old bucket)的所在索引值.
- 假設這個old bucket的min:0, max:16. 拆分該old bucket的話, 分界點是8, 然后把old bucket的max改為8, min還是0.
- 創建一個新的bucket, new bucket的min=8, max=16.
- 然后根據的old bucket中的各個node的nid, 看看是屬于哪個bucket的范圍里, 就裝到對應的bucket里.
- 各回各家,各找各媽.
- new bucket的所在索引值就在old bucket后面, 即index+1, 把新的bucket插入到路由表里.
- """
- old = self.buckets[index]
- point = old.max - (old.max - old.min)/2
- new = KBucket(point, old.max)
- old.max = point
- self.buckets.insert(index + 1, new)
- for node in old:
- if new.nid_in_range(node.nid):
- new.append(node)
- for node in new:
- old.remove(node)
- def find_close_nodes(self, target):
- """
- 返回與目標node ID或infohash的最近K個node.
- 定位出與目標node ID或infohash所在的bucket, 如果該bucuck有K個節點, 返回.
- 如果不夠到K個節點的話, 把該bucket前面的bucket和該bucket后面的bucket加起來, 只返回前K個節點.
- 還是不到K個話, 再重復這個動作. 要注意不要超出最小和最大索引范圍.
- 總之, 不管你用什么算法, 想盡辦法找出最近的K個節點.
- """
- nodes = []
- if len(self.buckets) == 0: return nodes
- index = self.bucket_index(target)
- nodes = self.buckets[index].nodes
- min = index - 1
- max = index + 1
- while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
- if min >= 0:
- nodes.extend(self.buckets[min].nodes)
- if max < len(self.buckets):
- nodes.extend(self.buckets[max].nodes)
- min -= 1
- max += 1
- num = intify(target)
- nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid)))
- return nodes[:K] #K是個常量, K=8
ok, 路由表這個玩意兒比較有技術含量, 又難以描述, 所以會貼代碼演示. 接下來的DHT客戶端/服務端就不再貼那么多的代碼了, 畢竟處理網絡的代碼太多太復雜, 技術含量不高, 按照DHT協議描述的那樣操作就行了.
第二步, 實現DHT客戶端
#p#
實現一個DHT客戶端, 不用都要實現ping, find_node, get_peers, announce_peer操作了, 做一個爬蟲, 僅僅只需要實現find_node功能就能達到目的了. 因為我們只想不停地交朋友而已.
要想加入DHT網絡, 首先得認識第一個已知的node, 這個node最好是長期在線的, 又穩定的. 我這里就認識一個, dht.transmissionbt.com:6881, 由于UDP的原因, 只能接受ip, 所以請提前解析出ip地址.
然后使用DHT協議說的那個find_node操作, 現在解釋一下某些key的潛在意義吧
t: transaction ID的簡稱, 傳輸ID. 起什么作用呢? 由于UDP無3次握手這個機制, 所以任何人都可隨便發送信息給你, 根本就不需與你提前進行連接. 想想這么個情況, 你發送了一請求數據給某node, 然后收到DHT說的回復類型的數據, 即y=r, 但是, 你怎么知道回應的是你之前的哪個請求呢? 所以就要靠t了, 當你發送時, t=aa的話, 對方回應這個請求時, 回應消息的t絕對是aa, 這樣你就能區分了. 在發送之前, 要為該t值注冊一個處理函數, 當收到回應時, 調用該函數進行處理. 記得設置一個定時器, 時間一到, 立馬刪除該函數, 不然你內存飆升. 如果超時后才收到信息的話, 很遺憾, 沒了處理函數, 直接忽略. 我建議定時器設定到5秒. 當然, 隨便你. 一定要保證成功收到第一個node的find_node回復, 不然失敗, 就沒法繼續find_node了.即: 認識不到第一個朋友, 就別想認識第二個朋友.
id: 就是自身node ID了, 是node_id()函數生成的哈, 樣子絕不是DHT協議例子中的樣子, 這主要是為了方便演示而已
target: 這個也是自身的node ID, 作用是問某node離我最近的都有哪些node.
收到對方回復后, 把那key為nodes給解析出來, 按DHT協議說的, 每個node是以4字節ip+2字節port+20字節node ID組成, 那么nodes的字節數就會是26的倍數. "解碼"node和"編碼"node的Python代碼如下:
- from struct import unpack
- def num_to_dotted(n):
- d = 256 * 256 * 256
- q = []
- while d > 0:
- m, n = divmod(n, d)
- q.append(str(m))
- d /= 256
- return '.'.join(q)
- def decode_nodes(nodes):
- n = []
- nrnodes = len(nodes) / 26
- nodes = unpack("!" + "20sIH" * nrnodes, nodes)
- for i in range(nrnodes):
- nid, ip, port = nodes[i * 3], num_to_dotted(nodes[i * 3 + 1]), nodes[i * 3 + 2]
- n.append((nid, ip, port))
- return n
- decode_nodes函數的反作用函數如下:
- def dotted_to_num(ip):
- hexn = ''.join(["%02X" % long(i) for i in ip.split('.')])
- return long(hexn, 16)
- def encode_nodes(nodes):
- n = []
- for node in nodes:
- n.extend([node.nid, dotted_to_num(node.ip), node.port])
- return pack("!" + "20sIH" * len(nodes), *n)
解析出來后, 插入到路由表里, 然后使用find_node繼續向剛解析出來的node進行請求, target還是自身node ID, 以此循環. 這樣就能認識很多很多的node啦. 細節就不說了, 自己看著辦.
第三步, 實現DHT服務器端, 協議文檔說得很清楚了, 我只列出幾個需要注意的問題:
1, 服務器端得實現處理node發來的ping, find_node, get_peers, announce_peer請求.
2, 回應信息里的t的值是對方的t值, 不是自己隨便寫的.
3, 最好要實現那個token機制, 這樣就減少被搗亂的幾率, 此token就按協議那種方式就行, 每5分鐘變換一次, 10分鐘內的有效.
4, 一定要記得前面說的那句"當該bucket負責的node有請求, 回應操作; 刪除node; 添加node; 更新node; 等這些操作時, 那么就刷新該bucket".
5, 由于是做DHT爬蟲, 所以處理get_peers請求和find_node請求差不多一樣, 都是返回最近的node. 當然, 你閑得蛋疼的話, 可以來得標準點, 做能返回peers那種, 不過, 沒必要.
6, announce_peer請求里的port就是對方提供下載種子的端口, 監聽于TCP, 不是DHT連接的端口. 還有請求消息里的id就僅僅指的是對方的node ID, 我看博客園某人寫的文章說是對方的peer ID, 我表示很不解.
第四步, 定時刷新路由表
按協議所說, 過一段時間后, 路由表里的node不全都是活躍了, 所以要定時刷新路由表. 說下大概思路, 遍歷路由表中的bucket列表, 看看bucket的last_accessed是不是超過了比如說15分鐘, 如果超過了, 說明有可能不"新鮮"了, 那么從該bucket隨機選擇一個node, 向該node發送find_node操作, 接著就不管了. 筆者為了簡單, 就采用這么簡單的方式了, 沒有去確認可疑node是否還"活"著. 你可以嚴格按照協議文檔那么寫.
你可能會問的問題:
1, 怎么知道一個資源的下載速度?
答: 有句話這么說: 下載人數越多, 下載速度越快. 那么, 如果某一個資源的infohash出現的announce_peer次數越高, 那么就說明下載人數就越多, 人數越多就越快. 這個下載速度就沒法用絕對速度表示, 不過可以使用相對速度.
2, 怎么在眾多的資源中過濾出影視資源?
答: 得到種子, 如果有files字段話, 遍歷它進行正則匹配, 看看有木有后綴名是rmvb, avi, mp4等什么的, 如果有, 那它大部分情況就是影視了. 如果沒有files字段, 就對name字段進行正則匹配吧.
3, 為什么那些影視資源總是些"很黃很暴力"?
答: 這個就不是很清楚了, 我想, 使用BT的人大多數都是些擼管男吧.
4, infohash這個詞是根據什么而來的?
答: 種子文件中info字段的hash值
5, 如何認識更多的node?
答: 多開啟幾個node實例.
6, 什么情況下, 對方把我給加入到對方的路由表里?
答: 當你向對方find_node時. 也許你的ping, get_peers也能讓對方把你添加到路由表里. 同理, 當你接到ping, find_node, get_peers時, 就把對方給添加到路由表里, 至少收到find_node請求時, 就要把對方給添加到路由表里.
7, 如何長期保存node?
答: 數據庫, 文件都行, 我不建議長期保存node, 你只要一直在線, 使用內存存儲最好不過了, 速度快, 代碼簡單.
原文鏈接:http://www.cnblogs.com/52web/p/How-to-write-a-DHT-crawler.html