Netty实现打招呼

分类专栏:
Netty相关

文章标签:
Java自学
Netty
原创

Netty实现打招呼


NioEventLoopGroup是对应Reactor的事件循环组,组中包含很多个事件循环,每一个都有selector用于监听


BossGroup:负责接收客户端的连接
1)轮询是否有accept事件发生
2)处理事件,并且和客户端建立连接 将事件注册到WorkGroup中,使用selector进行后续监听
3)处理任务队列
WorkGroup:负责网络的读写
1)轮询查找是否有读写事件发生
2)找到NioSocketChannel
3)处理任务队列

具体实现

1)创建group服务端两个分别为boss和worker, 客户端一个
2)启动对象初始化ServerBootstrap和Bootstrap
3)服务端先后设置其中的线程组group,通道channel,处理器handler,客户端通道对应的处理器
自定义handler的设置,需要先有通道初始化器ChannelInitializer,实现其中的通道初始化方法,具体逻辑为获取通道中的管道,然后加入

其中handel的逻辑

a)继承ChannelInboundHandlerAdapter(netty提供的适配器)
b)重写其中的方法,ChannelActive,channelRead,channelReadComplete,分别对应于通道创建,读事件发生,读时间完成三个时间点
c)方法的参数有一个ChannelHandelContext,是处理器的上下文,除了获取通道和管道外,可以调用writeAndFlush()直接写入数据
4)绑定端口号(服务端),或者连接指定ip地址+端口号(客户端)
5)关闭group

###实现代码

创建NettyServer,NettyClient,NettyServerHandler,NettyClientHandler四个类。分别是服务端,客户端,以及各自自定义handel
/**
 * 1.EventLoopGroup是对应Reactor的事件循环组
 * 2.ServerBootstrap是配置参数的启动对象
 * 3.客户端通道需要使用childHandler设置,设置时需要创建ChannelInitializer并且声明其泛型
 * 4.实现初始化方法时拿到管道增加自定义的处理器
 * 5.异步启动并且关闭方式也设置为异步
 */
public class NettyServer {

    public static void main(String[] args) {

        //创建两个Reactor构建主从Reactor模型
        //用来管理channel监听事件,是无限循环的事件组(线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 服务端的引导程序/启动对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 设置相关的参数
        // 方法返回类型为对象自身,这提供了链式编程的使用方法
        serverBootstrap.group(bossGroup, workerGroup)
                //声明当前使用的通道类型
                //netty                        nio                    bio
                //NioServerSocketChannel  <-  ServerSocketChannel  <- ServerSocket
                //底层是通过反射进行调用的
                .channel(NioServerSocketChannel.class)
                //设置前面通道的处理器  使用netty提供的日志打印处理器
                .handler(new LoggingHandler(LogLevel.INFO))
                //定义客户端连接处理器的使用
                //此方法需要参数ChannelInitializer通道初始化器
                //初始化器要处理的是客户端通道,所以泛型设置为SocketChannel
                //此类为抽象类,需要实现其抽象方法initChannel
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //通过channel获取管道pipeline
                        //通道代表的是连接的角色,管道代表的是处理业务的逻辑管理
                        //管道相当于链表,将不同的处理器连接起来,管理的是处理器的顺序
                        //使用时常常使用尾插法addLast,将处理器增加至尾部
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });
        System.out.println("服务端初始化完成");

        // 启动并设置端口号,但需要使用sync异步启动
        try {
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            //将关闭通道的方式 也设置为异步的
            //阻塞finally中的代码执行
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}



/**
 * 自定义handler的方式之一
 * 继承ChannelInboundHandlerAdapter
 * 提供了在不同时期会触发的方法
 * channelActive,channelRead,channelReadComplete
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 通道被启用,刚刚建立连接要使用的方法
     * channelHandlerContext是通道处理器的上下文
     * 可以整合使用过程中所需的参数,比如通道channel,管道pipeline写数据等等
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //写数据时可以调用writeAndFlush直接写入字符串
        //底层还是封装了  获取通道 - 创建缓冲区 - 写入数据 - 缓冲区写入通道 等流程
        System.out.println("channelActive done");
        ctx.writeAndFlush("Welcome to Netty Server");
//        super.channelActive(ctx);
    }

    /**
     * 数据读取的方法
     * 当客户端发送消息时,读事件触发的方法
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //netty中的缓冲区叫做ByteBuf(对ByteBuffer的封装)
        ByteBuf buf = (ByteBuf) msg;
        //直接设定编码格式CharsetUtil.UTF_8,其中CharsetUtil是netty提供的编码格式工具类
        System.out.println("client msg : " + buf.toString(CharsetUtil.UTF_8));
        //可以定位到客户端的远程地址,通过通道的remoteAddress方法
        System.out.println("client is from " + ctx.channel().remoteAddress());
//        super.channelRead(ctx, msg);
    }

    /**
     * 数据读取完成触发的方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //数据的处理还是使用ByteBuf,Unpooled是提供在ByteBuf和string之间方便转换的工具类
        //Unpooled的常用方法,copiedBuffer直接处理String返回ByteBuf
        ctx.writeAndFlush(Unpooled.copiedBuffer("Fuck Life",CharsetUtil.UTF_8));
//        super.channelReadComplete(ctx);
    }
}



public class NettyClient {

    public static void main(String[] args) {

        // 客户端只需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();

        // 客户端的启动对象
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
        System.out.println("客户端初始化完成");
        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}



public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive done");
        ctx.writeAndFlush(Unpooled.copiedBuffer("Always Believe", CharsetUtil.UTF_8));
//        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead done");
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("server msg:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("server address: " + ctx.channel().remoteAddress());
//        super.channelRead(ctx, msg);
    }
}

实现效果

客户端

服务端

图片来自我的CSDN博客

  • 作者:潘震
  • 评论

    留言