博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Netty的RPC简易实现
阅读量:6037 次
发布时间:2019-06-20

本文共 7439 字,大约阅读时间需要 24 分钟。

代码地址如下:

http://www.demodashi.com/demo/13448.html

可以给你提供思路

也可以让你学到Netty相关的知识

当然,这只是一种实现方式

需求

看下图,其实这个项目就是为了做这样一件事。

有一个公共服务ServerA,它提供了一个名为getUserName的服务。

现在有多个类似ServerB的Web应用服务器。

当客户想通过ServerB要请求getUserName服务时,由于ServerB服务中因为没有UserService的实现类,导致不能正常提供服务的问题。

rpc原理

预期结果

可以看到,在Client项目中,UserService没有实现类,但是返回了正常的结果。

运行结果

项目结构

整个项目分为三个部分,Server端、Client端以及一个公共jar。

下图正是整个项目的目录结构图。

目录结构

公共部分

公共部分的存在是因为我将服务器端和客户端写在了一个项目中,为了不让代码重复警告,所以提出来的一个公共模块。主要是几个实体类和一些序列化工具类。

  • MsgPackDecoder

    • 该类是一个MsgPack编码器,主要作用将Object对象序列化成byte数组。
  • MsgPackEncoder

    • 该类是一个MsgPack解码器,主要作用将byte数组反序列化成Object对象。
  • ObjectCodec

    • 该类是继承了MessageToMessageCodec<ByteBuf, Object>。是自定义序列化类主要有两个方法,encode和decode

      @Override    protected void encode(ChannelHandlerContext ctx, Object msg, List out) {        // 调用工具类的序列化方法        byte[] data = ObjectSerializerUtils.serilizer(msg);        ByteBuf buf = Unpooled.buffer();        buf.writeBytes(data);        out.add(buf);    }    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) {        // 调用工具类的反序列化方法        byte[] bytes = new byte[msg.readableBytes()];        msg.readBytes(bytes);        Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);        out.add(deSerilizer);    }
  • ObjectSerializerUtils

    • 序列化工具类。序列化方法可以自己实现
  • MethodInvokeMeta

    • 重点类。这个类是一个实体类。用于在网络中传输的类。主要有5个字段分别记录了一个接口的类对象,调用接口的方法名,方法的参数列表(包含参数类型,和参数列表),方法的返回值类型。
    • 在客户端中,这个类将调用方所要调用的方法封装。
    • 在服务端中,这个类主要用于服务器反射调用方法。
    • 当然,也可以用String来记录这些元信息。
  • NullWritable

    • 这个类主要用于在网络中传输null。当返回值为null时,服务端会返回NullWritable对象。客户端接收到NullWritable时进行null处理。
  • User

    • 实体对象。测试用例

客户端

上面的目录结构图也有提到,客户端中只有UserService接口,所以客户端中如果不做处理是不能正常运行的。

客户端中核心类有以下7个,其中与Netty相关的核心类与服务端一样有3个

  • ClientChannelHandlerAdapter
  • CustomChannelInitializerClient
  • NettyClient
  • RpcProxyFactoryBean
  • NettyBeanScanner
  • PackageClassUtils
  • WrapMethodUtils

NettyClient

这个类是Netty客户端的启动类,这个类中与Netty服务端进行通信

CustomChannelInitializerClient

这个类是用于初始化管道事件的类。主要添加了TCP粘包问题解决方案和自定义编解码器工具

ClientChannelHandlerAdapter

这个类是Netty客户端的处理类,主要通过这个类将调用信息写给Netty服务端。

NettyBeanScanner

这个类实现了BeanFactoryPostProcessor接口。BeanFactoryPostProcessor是Spring初始化Bean时对外暴露的扩展点。

Spring IoC容器允许BeanFactoryPostProcessor在容器实例化任何bean之前读取bean的定义(配置元数据),并可以修改它。同时可以定义多个BeanFactoryPostProcessor,通过设置'order'属性来确定各个BeanFactoryPostProcessor执行顺序。

在postProcessBeanFactory方法中,调用PackageClassUtils.resolver方法,将UserService.class注册到SpringBean工厂。

/**     * 注册Bean到Spring的bean工厂     */    @Override    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {        this.beanFactory = (DefaultListableBeanFactory) beanFactory;        // 加载远程服务的接口        List
resolverClass = PackageClassUtils.resolver(basePackage); for (String clazz : resolverClass) { String simpleName; if (clazz.lastIndexOf('.') != -1) { simpleName = clazz.substring(clazz.lastIndexOf('.') + 1); } else { simpleName = clazz; } BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RpcProxyFactoryBean.class); gd.addPropertyValue("interfaceClass", clazz); gd.addPropertyReference("nettyClient", clientName); this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition()); } }

RpcProxyFactoryBean

重点类,这个类继承了AbstractFactoryBean

使用jdk动态代理的方式创建代理对象。

Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);

在invoke方法中,

调用WrapMethodUtils工具类中的方法,将代理对象方法封装成MethodInvokeMeta对象。

然后通过NettyClient传输给NettyServer端,进行RPC调用,并将结果返回。

至此,客户端的核心类介绍完毕。

服务端

服务端主要的核心类有三个

  • ServerChannelHandlerAdapter
  • RequestDispatcher
  • NettyServer
NettyServer

这个类主要有两个方法,一个是启动Netty服务的方法,一个是关闭服务器的方法。核心代码如下:

/**     * 启动netty服务的方法     */    public void start() {        // 服务器监听端口号        int port = serverConfig.getPort();        serverBootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                // BACKLOG用于构造服务端套接字ServerSocket对象,                // 标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。                // 如果未设置或所设置的值小于1,Java将使用默认值50                .option(ChannelOption.SO_BACKLOG, 100)                .handler(new LoggingHandler(LogLevel.INFO));        try {            // 设置事件处理            serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 自定义长度解码器解决TCP黏包问题 // maxFrameLength 最大包字节大小,超出报异常 // lengthFieldOffset 长度字段的偏差 // lengthFieldLength 长度字段占的字节数 // lengthAdjustment 添加到长度字段的补偿值 // initialBytesToStrip 从解码帧中第一次去除的字节数 pipeline.addLast(new LengthFieldBasedFrameDecoder(65535 , 0, 2, 0, 2)); // LengthFieldPrepender编码器,它可以计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中 pipeline.addLast(new LengthFieldPrepender(2)); // 序列化工具 pipeline.addLast(new ObjectCodec()); pipeline.addLast(channelHandlerAdapter); } }); ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
ServerChannelHandlerAdapter

这个类主要重写了ChannelRead方法,在ChannelRead方法中调用了RequestDispatcher类中的dispatcher方法来处理消息。

RequestDispatcher

这个类的作用为,将ChannelRead方法中读到的数据(也可以说命令),通过反射来调用,执行对应方法,将执行后的结果写回通道,供客户端使用。

/**     * 分发命令     *     * @param channelHandlerContext channelHandlerContext     * @param invokeMeta            invokeMeta     */    public void dispatcher(final ChannelHandlerContext channelHandlerContext, final MethodInvokeMeta invokeMeta) {        ChannelFuture f = null;        try {            // 获取客户端准备调用的接口类            Class
interfaceClass = invokeMeta.getInterfaceClass(); // 获取准备调用的方法名称 String name = invokeMeta.getMethodName(); // 获取方法对应的参数列表 Object[] args = invokeMeta.getArgs(); // 获取参数类型 Class
[] parameterTypes = invokeMeta.getParameterTypes(); // 通过Spring获取对象 Object targetObject = app.getBean(interfaceClass); // 获取待调用方法 Method method = targetObject.getClass().getMethod(name, parameterTypes); // 执行方法 Object obj = method.invoke(targetObject, args); if (obj == null) { // 如果方法结果为空,将NULL结果写给客户端 f = channelHandlerContext.writeAndFlush(NullWritable.nullWritable()); } else { // 写给客户端结果 f = channelHandlerContext.writeAndFlush(obj); } // 释放通道,不是关闭连接 f.addListener(ChannelFutureListener.CLOSE); } catch (Exception e) { // 出现异常后的处理 f = channelHandlerContext.writeAndFlush(e.getMessage()); } finally { if (f != null) { f.addListener(ChannelFutureListener.CLOSE); } } }

使用方法

  1. 启动ServerApplication
  2. 启动ClientApplication
  3. 打开Chrome浏览器,输入:http://localhost:8080/client/get/name 或者 http://localhost:8080/client/get/info 即可看到结果。

如果想整合到现有项目中,请直接留言或者联系作者,此次并没有提供集成版本,但如果此篇文章已理解,那么自己可以手动的去集成到自己的项目中。基于Netty的RPC简易实现

代码地址如下:

http://www.demodashi.com/demo/13448.html

注:本文著作权归作者,由demo大师代发,拒绝转载,转载需要作者授权

你可能感兴趣的文章
gulp压缩合并js与css
查看>>
块级、内联、内联块级
查看>>
Predicate
查看>>
[面试题记录01]实现一个function sum达到一下目的
查看>>
这个季节的忧伤,点到为止
查看>>
mysql通过配置文件进行优化
查看>>
省级网站群建设关注点
查看>>
工作第四天之采集资源
查看>>
innobackupex 在增量的基础上增量备份
查看>>
Windows Server 2012 R2 DirectAccess功能测试(2)App1服务器安装及配置
查看>>
基于清单的启动器的实现
查看>>
外网用户通过citrix打印慢的解决方法
查看>>
STL容器的使用
查看>>
关于std::map
查看>>
JXL导出Excel文件兼容性问题
查看>>
VBoot1.0发布,Vue & SpringBoot 综合开发入门
查看>>
centos7 安装wps 后 演示无法启动
查看>>
git简单命令
查看>>
LAMP编译部署
查看>>
XenDesktop7.6安装部署入门教程
查看>>