Hadoop源码学习笔记——Socket到RPC调用

Hadoop是一个分布式程序,分布在多台机器上运行,事必会涉及到网络编程。那这里如何让网络编程变得简单、透明的呢?
超人学院吴超老师为你讲解:
网络编程中,首先我们要学的就是Socket编程,这是网络编程中最底层的程序接口,分为服务器端和客户端,服务器负责监听某个端口,客户端负责连接服务器上的某个端口,一旦连接通过后,服务器和客户端就可以双向通讯了。

操作方法

  • 01

    1.   ServerSocket server = new ServerSocket(8111); 2.   Socket socket = server.accept(); 3. 4.   //由Socket对象得到输入流,并构造相应的BufferedReader对象 5.   BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream())); 6.   //由Socket对象得到输出流,并构造PrintWriter对象 7.   PrintWriter os = new PrintWriter(socket.getOutputStream()); 8. 9.   while(true){ 10.     String inline = is.readLine(); 11.     System.out.println(" 收到信息:" + inline); 12.     //服务器反回 13.     os.println("serverSend:" + inline); 14.     os.flush(); 15.     if (inline == "bye") 16.  break; 17.  } 18.  os.close(); 19.  is.close(); 20.  socket.close(); 21.  server.close(); 22.  System.out.println("服务器退出");

  • 02

    1.   Socket socket = new Socket("127.0.0.1",8111); 2. 3.   //由Socket对象得到输入流,并构造相应的BufferedReader对象 4.   BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream())); 5.   //由Socket对象得到输出流,并构造PrintWriter对象 6.   PrintWriter os = new PrintWriter(socket.getOutputStream()); 7.   BufferedReader sin=new BufferedReader(new InputStreamReader(System.in)); 8.   while(true){ 9.      System.out.println("请输入:"); 10.     String line = sin.readLine(); 11.     os.println(line); 12.     os.flush(); 13.     String inline = is.readLine(); 14.     System.out.println("服务器获取值:" + inline); 15.     if (line=="bye") 16.        break; 17.  } 18.  os.close(); 19.  is.close(); 20.  socket.close(); 21.  System.out.println("客户端退出");

  • 03

    这两段代码分别帖入两个类中,分开执行,先执行服务器端,再执行客户端,就可以互发消息了。 观察下代码,发现代码中下面4~20行逻辑是一至的,都是通过流来通讯,所以Socket中不同的是开始地方,服务器是通过server.accept()来获取Socket,而客户端是通过直接创建Socket对象的。 这段代码,其本运行是没问题的,但存在一个问题,就是当客户端接入时服务器端的accept函数才走下去,不然的话,会一直处于卡死等待状态。包括getInputStream函数,也会等待双方接通后,才往下走。除非等到客户端接入,或中断。当然有人会说,可以引入多线程啊,没错,是可以,但是想一下,是不是每个客户接入都得有一个线程? 否则少一个线程,就会有一堆的卡着。所以这种方式不适合在大最客户端接入的情况。 在JDK1.4引入了非阻塞的通信方式,这样使得服务器端只需要一个线程就能处理所有客户端socket的请求。 下面是几个需要用到的核心类: ·         ServerSocketChannel: ServerSocket 的替代类, 支持阻塞通信与非阻塞通信. ·         SocketChannel: Socket 的替代类, 支持阻塞通信与非阻塞通信. ·         Selector: 为ServerSocketChannel 监控接收客户端连接就绪事件, 为 SocketChannel 监控连接服务器就绪, 读就绪和写就绪事件. ·         SelectionKey: 代表 ServerSocketChannel 及 SocketChannel 向 Selector 注册事件的句柄. 当一个 SelectionKey 对象位于Selector 对象的 selected-keys 集合中时, 就表示与这个 SelectionKey 对象相关的事件发生了.在SelectionKey 类中有几个静态常量 · SelectionKey.OP_ACCEPT->客户端连接就绪事件 等于监听serversocket.accept()返回一个socket · SelectionKey.OP_CONNECT->准备连接服务器就绪跟上面类似,只不过是对于socket的相当于监听了socket.connect() · SelectionKey.OP_READ->读就绪事件, 表示输入流中已经有了可读数据, 可以执行读操作了 · SelectionKey.OP_WRITE->写就绪事件 所以服务器端代码就可以升一下级了,变成如下: 1.   public class SocketChannelTest implements Runnable { 2. 3.      @Override 4.      public void run() { 5.         while (true) { 6.            try { 7.               selector.select(); 8.               Set<SelectionKey> keys = selector.selectedKeys(); 9.               Iterator<SelectionKey> iter = keys.iterator(); 10.              SocketChannel sc; 11.              while (iter.hasNext()) { 12.                 SelectionKey key = iter.next(); 13.                 if (key.isAcceptable()) 14.                    ; // 新的连接 15.                 else if (key.isReadable()) 16.                    ;// 可读 17.                 iter.remove(); // 处理完事件的要从keys中删去 18.              } 19.           } catch (Exception e) { 20.              e.printStackTrace(); 21.           } 22.        } 23.     } 24.     static Selector selector; 25. 26.     public static void main(String[] args) throws IOException, 27.           InterruptedException { 28.        selector = Selector.open(); // 静态方法 实例化selector 29.        ServerSocketChannel serverChannel = ServerSocketChannel.open(); 30.        serverChannel.configureBlocking(false); // 设置为非阻塞方式,如果为true 那么就为传统的阻塞方式 31.        serverChannel.socket().bind(new InetSocketAddress(8001)); // 绑定IP 及 端口 32.        serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册 33.                                                     // OP_ACCEPT事件 34.        Thread thd = new Thread(new SocketChannelTest()); 35.        thd.start();// 开启线程 处理请求 36.        thd.join(); 37.     } 38.  }

  • 04

    好,这样通讯代码简化了。但继续想,我们通讯的目的是什么?客户端发一个指令,服务器执行一些内容,然后把结果返回给客户端。这不就像调用一下函数么,调用函数名、传入参数、返回值。 这个就称之为远程方法调用(RPC Remote Procedure Call Protocol),毫无疑问,这个RPC实现肯定是基于上面的这个Socket的。至于具体如何实现呢,我们看下面的分解。 在看实现之前,我们先看一下,这个RPC是如何用的,如何做到调用透明的: 我们在src下新建一个RPCTest的包,定义一个功能接口IRPCTestEntity.java: 1.   package RPCTest; 2.   import org.apache.hadoop.ipc.VersionedProtocol; 3.   public interface IRPCTestEntity  extends VersionedProtocol { 4.       int Calc(int x,int y); 5.   } 该接口中有一个Calc的函数。 定义一个实现类RPCTestEntity.java: 1.   package RPCTest; 2.   import java.io.IOException; 3.   public class RPCTestEntity implements IRPCTestEntity{ 4.      @Override 5.      public long getProtocolVersion(String protocol, long clientVersion)  throws IOException { 6.         return 0; 7.      } 8. 9.      public int Calc(int x,int y){ 10.        int z =0 ; 11.        z = x + y; 12.        return z; 13.     } 14. 15.  } 这个类中实现了Calc函数,执行内容为将x,y相加,将和返回。 我们再定义一个服务器类(RPCTestSvr.java),将该实现类注册成RPC服务: 1.   package RPCTest; 2.   import java.io.IOException; 3. 4.   public class RPCTestSvr { 5.      public static void main(String[] args) throws IOException, InterruptedException { 6.         RPCTestEntity obj = new RPCTestEntity(); 7.         Configuration conf = new Configuration(); 8.         Server server = RPC.getServer(obj, "", 9001, conf); 9.         server.start(); 10.        server.join(); 11.     } 12.  } 代码比较简单,定义了一个RPCTestEntity的实体,然后RPC创建一个Server,传入实体对象,然后这个服务就调用join卡住,用于不断接收请求。 创建完后,就可把这个"服务器"启动起来了。 再创建一个客户端(RPCTestClient.java): 1.   package RPCTest; 2. 3.   import java.io.IOException; 4.   import java.net.InetSocketAddress; 5. 6.   import org.apache.hadoop.conf.Configuration; 7.   import org.apache.hadoop.ipc.RPC; 8.   import org.apache.hadoop.ipc.VersionedProtocol; 9. 10.  public class RPCTestClient { 11.     public static void main(String[] args) throws IOException { 12.        InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9001); 13.        Configuration conf = new Configuration(); 14.        VersionedProtocol obj = RPC.getProxy(IRPCTestEntity.class, 0, addr, conf); 15.        IRPCTestEntity ent = (IRPCTestEntity)obj; 16.        int x = ent.Calc(5, 6); 17.        System.out.println(x); 18.     } 19.  } 这里,我们通过RPC.getProxy函数获了一个IRPCTestEntity的接口实例,然后就可以直接调用了。 运行后,发现这个值马上返回了过来,同时在"服务器"端也会收到一定的请求信息。说明两者之间通了。 仔细看,这个客户端中,整个过程就没有涉及到RPCTestEntity这个实现的实体,换句话说,客户端产生的是一个虚拟的实现类,然后调用起来了。 OK,示例程序跑起来了,也带给我们几个问题,1、这个客户端中的obj是什么对象?2、为什么我们调用obj对象中的函数(Calc)会跑到服务器上运行,如何实现的? 底层的通讯,我们是知道的,肯定用socket,用它能够传递各种数据。如何与函数关联呢? 我们进入getProxy函数, 我们看到这个getProxy函数中,返回了VersionedProtocol接口的对象,从字面意思,这个Proxy意为代理, 所以我们得到的obj就是一个代理类。同时也看出,要作为RPC处理对象,这个接口必实现VersionedProtocol(简单地看下里面,只有一个函数,返回版本号,是用于判断双方版本所用,只有版本匹配,才能调用)。 其创建可以看到,用到了: Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(addr, ticket, conf, factory)); 然后这个代理类,就自动实现了伟放的protocol这个接口类型。然后当我们调用代理类中的函数时,这个传入的Invoker类,就会收到通知,通知里包含了调用信息,我们进入Invoker中看一下: private static class Invoker implements InvocationHandler 这是一个写在RPC类中的内部类,且是私有的,意思就是只为这个RPC调用,其实现的规定接口InvocationHandler,那么就要实现规定的函数Invoke咯: 1.   public Object invoke(Object proxy, Method method, Object[] args) 2.        throws Throwable { 3.        final boolean logDebug = LOG.isDebugEnabled(); 4.        long startTime = 0; 5.        if (logDebug) { 6.          startTime = System.currentTimeMillis(); 7.        } 8. 9.        ObjectWritable value = (ObjectWritable) 10.         client.call(new Invocation(method, args), address, 11.                     method.getDeclaringClass(), ticket); 12.       if (logDebug) { 13.         long callTime = System.currentTimeMillis() - startTime; 14.         LOG.debug("Call: " + method.getName() + "" + callTime); 15.       } 16.       return value.get(); 17.     } 这个invoke函数,就是当我们调用代理类中的函数(obj.Calc)时,会收到的请求,看下参数,传入的有,Method(函数),args(参数),一应俱全,有了这些内容后,就可以调用底层的Socket,将这些信息打包起来(放入的Invocation类)中,一并发向服务器中。 同时,服务器端中,就比较容易了,在收到请求后,就可以解析出要调用的函数和参数,然后通过反射来调用在服务器一开始注册上的对象中的函数,再将返回值通过Socket传回客户端,再由这个invoke函数将值返回。 OK,这个几个点想通了,整个过程就容易理解了。总之: 服务器端——注册服务:RPC.getServer(obj, "", 9001, conf); 客户端——取得代理类:obj = RPC.getProxy() 通过这样的包装后,网络访问就非常透明了。

(0)

相关推荐

  • Linux系统下使用ctags查找源码的方法

    太多的文件导致我们无法去定位一个函数的实现或者一个宏定义的位置。显然对于源码的阅读和个人的学习都是非常不利的。多么希望有一个软件能够输入命令就能够精确的定位函数的位置。是的,现在有这么一个工具:cta ...

  • Linux下源码安装的经验详解

    在linux下安装软件,难免会碰到需要源码安装的,而就是这简简单单的./configure.make.sudo make install三步,却让不少人头疼不已,这里以安装X11为例具体介绍下我在安装 ...

  • 易语言编写盗号源码

    现在有人学习易语言想盗号,就是不知道易语言盗号源码是什么.怎么发到指定邮箱.今天我就来教大家怎么编写易语言盗号源码. 操作方法 01 打开易语言,新建一个Windows窗口程序. 02 然后呢,制作一 ...

  • 易语言——如何写计算器源码?

    关于易语言编写计算器,方法有很多,这里要编写的源码比较简单,适合初学者学习.效果如下: 一.绘制窗体. 01 第一步不用说,打开易语言,新建-- 02 然后分别用编辑框和按钮绘制软件框架. 03 再是 ...

  • Eclipse 如何查看jdk源码

    java很多东西都是开源的,jdk的源码也不例外,能查看到jdk的源码能方便我们学习.jdk源码也是官方的源码,源码信息安全可靠. 操作方法 01 首先,在安装jdk的时候要安装源码,或许你没注意到, ...

  • Linux系统下怎么用CheckInstall从源码创建一个RPM或DEB包

    问题:我想要从源码创建安装的软件包。有没有一种方式从源码来创建和安装软件包,而不是运行“make install”?这样的话,以后如果我想,我可以容易的卸载程序。 如果你已经从它的源码运行“make ...

  • centos5 源码安装Nginx + mysql + apache + php的方法

    主要依照的是张宴的一篇博文《http://blog.s135.com/post/366/》,安装的软件是也是从他提供的列表中下载,这里都是tar.gz格式,如果更希望是rpm格式的话,提供一个网址《h ...

  • 使用zend加密源码(zend guard安装和破解)

    我使用的是Wampserver,其中php的版本是5.3.10。 Zend Guard的安装及破解 点击下载Zend Guard5.5.0,下载完成后,请自行傻瓜化安装 这里需要注意以下几点: 1、本 ...

  • 教大家保护源码防破解教程

    当今社会当程序员不容易。这点是有目共睹的。再好的程序源码被别人一盗努力就全废了。 自我介绍不必说了,来论坛也不是几天的事了。进入正题 准备一个手机内存卡。准备一个读卡器。然后准备一个火炉。let's ...