Netty好用的功能拓展

分类专栏:
Netty相关

文章标签:
Netty
原创

1.心跳检测

检测逻辑:

1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。

需求设计:

1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接

实现原理:

IdleStateHandler , 是netty提供的处理器

1)超过多长时间没有读 readerIdleTime
2) 超过多长时间没有写 writerIdleTime
3) 超过多长时间没有读和写 allIdleTime
底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered
其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类

代码举例:

public class HeartBeatServer {

  public static void main(String[] args) {

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(
            new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new StringDecoder());
                pipeline.addLast(new StringEncoder());
                // 使用心跳检测处理器
                // 对应三种状态,读空闲 写空闲 读写空闲
                // 三个参数,读空闲的超时时间 写空闲的超时时间 读写空闲的超时时间
                // 最后一个参数是时间的单位
                pipeline.addLast(new IdleStateHandler(5, 10, 20, TimeUnit.SECONDS));
                // IdleStateHandler发现有空闲时会触发IdleStateEvent事件
                // 会推送给下一个handler的指定方法userEventTriggered进行事件的处理
                ch.pipeline().addLast(new HeartBeatServerHandler());
              }
            });
    System.out.println("服务端初始化完成");

    try {
      ChannelFuture future = serverBootstrap.bind(2020).sync();
      future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

  private int times = 0;

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("clent data: " + msg);
    if ("I am alive".equals(msg)) {
      ctx.writeAndFlush(Unpooled.copiedBuffer("over", CharsetUtil.UTF_8));
    }
  }

  /**
   * 处理心跳检测事件的方法
   *
   * @param ctx
   * @param evt
   * @throws Exception
   */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    IdleStateEvent event = (IdleStateEvent) evt;
    String eventDesc = null;
    switch (event.state()) {
      case READER_IDLE:
        eventDesc = "读空闲";
        break;
      case WRITER_IDLE:
        eventDesc = "写空闲";
        break;
      case ALL_IDLE:
        eventDesc = "读写空闲";
        break;
    }
    System.out.println(ctx.channel().remoteAddress() + " 发生超时事件 -- " + eventDesc);
    times++;

    if (times > 3) {
      System.out.println("空闲次数超过3次  关闭连接");
      ctx.writeAndFlush("you are out");
      ctx.channel().close();
    }
    //        super.userEventTriggered(ctx, evt);
  }
}
public class HeartBeatClient {

  public static void main(String[] args) {

    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap
        .group(group)
        .channel(NioSocketChannel.class)
        .handler(
            new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new HeartBeatClientHandler());
              }
            });
    System.out.println("客户端初始化完成");

    try {
      ChannelFuture future = bootstrap.connect("127.0.0.1", 2020).sync();
      String data = "I am alive";

      while (future.channel().isActive()) {
        // 模拟空闲的状态  随机等待时间
        int num = new Random().nextInt(10);
        Thread.sleep(num * 1000);
        System.out.println("等待" + num + "秒后进行下次发送");
        future.channel().writeAndFlush(data);
      }
      future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      group.shutdownGracefully();
    }
  }

  static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
      System.out.println("server data: " + msg);

      if ("you are out".equals(msg)) {
        ctx.channel().close();
      }
    }
  }
}

实现效果:

在这里插入图片描述 在这里插入图片描述

2.TCP粘包拆包

TCP是基于流的
当客户端发送多个包,服务端只收到一个包,此时发生了粘包
当客户端发送多个包,服务端也接收到多个包,但是包是不完整,多了或少了数据,此时发生了拆包
UDP不会发生拆包和粘包,UDP是基于报文的,在UDP的首部使用16bit存储报文的长度

TCP发生粘包和拆包的本质原因:

要发送的数据先经过TCP的缓冲区,还限制了最大报文长度
1)如果要发送的数据 > TCP剩余的缓冲区大小,发生拆包
2)如果要发送的数据 > 最大报文长度,发生拆包
3)如果要发送的数据 << TCP剩余的缓冲区大小,发生粘包
4)接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包

解决办法:

1) 设置出消息的长度
2) 设置出消息的边界——分隔符

Netty提供的解码器,两类

1)基于长度的解码器,在包头部设置出数据的长度。(类似于UDP的头部处理)

LengthFieldBasedFrameDecoder 自定义长度的处理方式
FixedLengthFrameDecoder 固定长度的处理方式

2)基于分隔符的解码器

DelimiterBasedFrameDecoder 自定义分隔符的处理方式
LineBasedFrameDecoder 行尾("\n"或"\r\n")分隔符的处理方式

Demo逻辑

需求:客户端循环100次向服务端请求时间
1)第一种方式,传输的过程数据单位是字节流ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题
2)第二种方式,使用LineBasedFrameDecoder,配合StringDecoder使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的
public class TimeServer {

    public static void main(String[] args) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 基于分隔符的解码器
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            // 字符串解码器
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            // 启动
            ChannelFuture channelFuture = serverBootstrap.bind(6300).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    static class TimeServerHandler extends ChannelInboundHandlerAdapter {

        // 请求次数计数
        int count;

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//            ByteBuf buf = (ByteBuf) msg;
              // 声明数组接收其内容
//            byte[] req = new byte[buf.readableBytes()];
//            buf.readBytes(req);
            // 请求的长度-系统分隔符的长度=数据的长度 如:字节流 ABC/r/n = 5 - 2 = 3(真正的数据长度)
            // 将数组转为字符串后  截取
            // System.getProperty("line.separator") 代表系统所支持的分隔符
            //  windows和linux支持的不同
//            String data = new String(req, "UTF-8").substring(0,
//            req.length - System.getProperty("line.separator").length());

            String data = (String) msg;
            String timeStr = new Date().toString();
            String currentTime = "Query Data :" + data + "; current time is " + timeStr
                    + "; count is " + ++count;
            System.out.println(currentTime);
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
        }
    }
}
public class TimeClient {

    public static void main(String[] args) {

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            Channel channel = bootstrap.connect("127.0.0.1", 6300).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    static class TimeClientHandler extends ChannelInboundHandlerAdapter {

        private int count;
        private byte[] request;
        public TimeClientHandler() {
            request = ("query time" + System.getProperty("line.separator")).getBytes();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            System.out.println("channel active");
            ByteBuf message = null;
            for (int i = 0; i < 100; i++) {
                message = Unpooled.buffer(request.length);
                message.writeBytes(request);
                ctx.writeAndFlush(message);
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//            super.channelRead(ctx, msg);
//            ByteBuf buf = (ByteBuf) msg;
//            byte[] tmp = new byte[buf.readableBytes()];
//            buf.readBytes(tmp);
//            String data = new String(tmp, "UTF-8");
            String data = (String) msg;
            System.out.println("data is : " + data + "; count is " + ++count);
        }
    }
}

实现效果:

在这里插入图片描述 在这里插入图片描述

3.序列化框架——protobuf

原生序列化:
public class UserInfo implements Serializable {

    private static final long serialVersionUID = 7607690012569645441L;

    private int ID;
    private String name;

    public UserInfo(int ID, String name) {
        this.ID = ID;
        this.name = name;
    }

    public int getID() {
        return ID;
    }

    public void setID(int ID) {
        this.ID = ID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * 根据buffer缓冲区模拟的序列化操作,获得字节数组的结果
     * @return
     */
    public byte[] codec() {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 存储name数据
        byte[] value = this.name.getBytes();
        buffer.putInt(value.length);
        buffer.put(value);
        // 存储ID数据
        buffer.putInt(this.ID);
        // 写完成
        buffer.flip();
        value = null;
        // 读取并存储到result中
        byte[] result = new byte[buffer.remaining()];
        buffer.get(result);
        return result;
    }

    public static void main(String[] args) throws Exception {

        UserInfo userInfo = new UserInfo(1, "勒布朗詹姆斯");
        // 原生序列化
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(userInfo);
        oos.flush();
        oos.close();
        byte[] arr = bos.toByteArray();
        System.out.println("JDK序列化后 字节数组的长度:" + arr.length);
        bos.close();
        System.out.println("ByteBuffer转化为字节数组形式的长度:"
                + userInfo.codec().length);
    }
}
public class UserInfo2 implements Serializable {

    private static final long serialVersionUID = -253661239306911857L;

    private int ID;
    private String name;

    public UserInfo2(int ID, String name) {
        this.ID = ID;
        this.name = name;
    }

    public int getID() {
        return ID;
    }

    public void setID(int ID) {
        this.ID = ID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * 根据buffer缓冲区模拟的序列化操作获得字节数组的结果
     * @param buffer
     * @return
     */
    public byte[] codec(ByteBuffer buffer) {

        buffer.clear();
        // 存储name数据
        byte[] value = this.name.getBytes();
        buffer.putInt(value.length);
        buffer.put(value);
        // 存储ID数据
        buffer.putInt(this.ID);
        // 写完成
        buffer.flip();
        value = null;
        // 读取并存储到result中
        byte[] result = new byte[buffer.remaining()];
        buffer.get(result);
        return result;
    }

    public static void main(String[] args) throws Exception {

        UserInfo2 userInfo2 = new UserInfo2(1, "勒布朗詹姆斯");
        ByteArrayOutputStream bos = null;
        ObjectOutputStream oos = null;
        long startTime = System.currentTimeMillis();
        //循环1000000次
        //原生序列化
        for (int i = 0; i < 1000000; i++) {
            bos =  new ByteArrayOutputStream();
            oos = new ObjectOutputStream(bos);
            oos.writeObject(userInfo2);
            oos.flush();
            oos.close();
            byte[] arr = bos.toByteArray();
            bos.close();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("JDK序列化 耗时:" + (endTime - startTime) + "ms");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        startTime = System.currentTimeMillis();
        for (int i = 0; i < 1000000; i++) {
            userInfo2.codec(buffer);
        }
        endTime = System.currentTimeMillis();
        System.out.println("ByteBuffer转化 耗时:" + (endTime - startTime) + "ms");
    }
}

实现结果:

在这里插入图片描述

在这里插入图片描述

protobuf = protocol buffers

类似于xml的生成和解析,但效率更高,生成的是字节码,可读性稍差

实现:

1)安装idea插件,protobuf support
如果安装之后,创建*.proto文件没有使用插件,手动设置关联关系
settings -> file types -> 找到protobuf -> 增加正则表达式 *.proto
2)引入maven依赖和插件
<properties>
    <os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>

<build>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
3)在右侧maven project中可以找到相应的插件 (没有的话刷新)

在这里插入图片描述

4)在和java平级的目录下,创建proto文件夹,然后创建person.proto文件
5)person.proto
// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.pz";
// 具体的类名
option java_outer_classname="PersonModel";
// 类结构
message Person{
    int32 id = 1;    // 此处的1代表顺序
    string name = 2;
}
6) 使用插件进行编译,将编译生成的代码拷贝到需要的目录下
7)编写测例进行序列化和反序列化操作
public class ProtobufTest {

    public static void main(String[] args) throws Exception {

        //建造者模式创建对象
        PersonModel.Person.Builder builder = PersonModel.Person.newBuilder();
        builder.setId(3);
        builder.setName("勒布朗詹姆斯");
        PersonModel.Person person = builder.build();
        System.out.println(person);
        System.out.println("=====person bytes:");
        for(byte b : person.toByteArray()){
            System.out.print(b);
        }
        System.out.println();
        System.out.println("====================");
        byte[] byteArr = person.toByteArray();
        // 使用parseFrom方法  反向构造对象
        PersonModel.Person  person2 = PersonModel.Person.parseFrom(byteArr);
        System.out.println(person2.getId());
        System.out.println(person2.getName());
    }
}

在这里插入图片描述

图片来自本人CSDN

  • 作者:潘震
  • 评论

    留言