Hadoop RPC遠程過程調用源碼解析及實例
什么是RPC?
1、RPC(Remote Procedure Call)遠程過程調用,它允許一臺計算機程序遠程調用另外一臺計算機的子程序,而不用去關心底層的網絡通信細節,對我們來說是透明的。經常用于分布式網絡通信中。
2、Hadoop的進程間交互都是通過RPC來進行的,比如Namenode與Datanode之間,Jobtracker與Tasktracker之間等。
RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程序之間攜帶信息數據。在OSI網絡通信模型中, RPC跨越了傳輸層和應用層。 RPC使得開發包括網絡分布式多程序在內的應用程序更加容易。
RPC采用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。
首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,然后等待應答信息,在服務器端,進程保持睡眠狀態直到調用信息的到達為止。當一個調用信息到達,服務器獲得進程參數,計算結果,發送答復信息給client,然后等待下一個調用信息,最后,客戶端調用進程接收答復信息,獲得進程結果,然后調用執行繼續進行。
RPC特點
1、透明性:遠程調用其他機器上的程序,對用戶來說就像是調用本地方法一樣。
2、高性能:RPC server能夠并發處理多個來自Client的請求(請求隊列)。3、可控性:jdk中已經提供了一個RPC框架–RMI,但是該RPC框架過于重量級并且可控之處比較少,所以Hadoop RPC實現了自定義的RPC框架。
Hadoop RPC通信
1、序列化層:Client與Server端 通信傳遞的信息采用了Hadoop里提供的序列化類或自定義Writable類型。
2、函數調用層:Hadoop RPC通過動態代理以及Java反射機制實現函數調用。
3、網絡傳輸層:Hadoop RPC采用了基于TCP/IP的socket機制。
4、服務器端框架層:RPC Server利用Java NIO以及采用了事件驅動的I/O模型,提高RPC Server的并發處理能力。
Hadoop的整個體系結構就是構建在RPC之上(org.apache.hadoop.ipc)。
Hadoop RPC設計技術
1、動態代理
2、反射3、序列化4、非阻塞的異步IO(NIO)
動態代理
1、動態代理可以提供對另一個對象的訪問,同時隱藏實際對象的具體事實,代理對象對客戶隱藏了實際對象。
2、動態代理可以對請求進行其他的一些處理,在不允許直接訪問某些類,或需要對訪問做一些特殊處理等,這時候可以考慮使用代理。3)目前Java開發包中提供了對動態代理的支持,但現在只支持對接口的實現。相關的類與接口:java.lang.reflect.Proxy--類 java.lang.reflect.InvocationHandler--接口
動態代理創建對象過程:
InvocationHandler handler = new InvocationHandlerImpl(...) Proxy.newInstance(...)
具體實現可參考如下:
根據上圖查看hadoop2.6.0源碼
Client
Server
RPC
幾個重要的協議
ClientProtocol是客戶端(FileSystem)與NameNode通信的接口。
DatanodeProtocol是DataNode與NameNode通信的接口NamenodeProtocol是SecondaryNameNode與NameNode通信的接口。DFSClient是直接調用NameNode接口的對象。用戶代碼是通過DistributedFileSystem調用DFSClient對象,才能與NameNode打交道。
模擬Hadoop RPC通信
- package MyRPC;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.VersionedProtocol;
- public interface MyRPCProtocal extends VersionedProtocol{
- public static long versionID = 23234l;//很重要很重要,搞了一下午才解決掉。
- public Text test(Text t);
- }
- package MyRPC;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.ProtocolSignature;
- import org.apache.hadoop.ipc.RPC;
- import org.apache.hadoop.ipc.RPC.Server;
- public class RPCServer implements MyRPCProtocal{
- Server server = null;
- public RPCServer() throws IOException, InterruptedException{
- //server = RPC.getServer(this,"localhost",8888,new Configuration());
- //相對于以前的版本有略微的改動
- RPC.Builder ins = new RPC.Builder(new Configuration());
- ins.setInstance(this);
- ins.setBindAddress("localhost");
- ins.setPort(9999);
- ins.setProtocol(MyRPCProtocal.class);
- //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class);
- server = ins.build();//獲得一個server實例
- server.start();
- server.join();
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- new RPCServer();
- }
- @Override
- public long getProtocolVersion(String protocol, long clientVersion)
- throws IOException {
- return MyRPCProtocal.versionID;
- }
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- return new ProtocolSignature();
- }
- @Override
- public Text test(Text t) {
- if(t.toString().equals("RPC")){
- return new Text("ok");
- }
- return new Text("false");
- }
- }
- package MyRPC;
- import java.net.InetSocketAddress;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.RPC;
- public class RPCClient {
- private MyRPCProtocal protocal;
- public RPCClient() throws Exception{
- InetSocketAddress address = new InetSocketAddress("localhost",9999);
- protocal = (MyRPCProtocal)RPC.waitForProxy
- (MyRPCProtocal.class,MyRPCProtocal.versionID, address, new Configuration());
- //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class);
- }
- public void call(String s){
- final Text string = protocal.test(new Text(s));
- System.out.println(string.toString());
- }
- public static void main(String[] args) throws Exception {
- RPCClient client = new RPCClient();
- client.call("RPC");
- }
- }