public class HeartBeatServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 使用心跳检测处理器
// 对应三种状态,读空闲 写空闲 读写空闲
// 三个参数,读空闲的超时时间 写空闲的超时时间 读写空闲的超时时间
// 最后一个参数是时间的单位
pipeline.addLast(new IdleStateHandler(5, 10, 20, TimeUnit.SECONDS));
// IdleStateHandler发现有空闲时会触发IdleStateEvent事件
// 会推送给下一个handler的指定方法userEventTriggered进行事件的处理
ch.pipeline().addLast(new HeartBeatServerHandler());
}
});
System.out.println("服务端初始化完成");
try {
ChannelFuture future = serverBootstrap.bind(2020).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
private int times = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("clent data: " + msg);
if ("I am alive".equals(msg)) {
ctx.writeAndFlush(Unpooled.copiedBuffer("over", CharsetUtil.UTF_8));
}
}
/**
* 处理心跳检测事件的方法
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventDesc = null;
switch (event.state()) {
case READER_IDLE:
eventDesc = "读空闲";
break;
case WRITER_IDLE:
eventDesc = "写空闲";
break;
case ALL_IDLE:
eventDesc = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + " 发生超时事件 -- " + eventDesc);
times++;
if (times > 3) {
System.out.println("空闲次数超过3次 关闭连接");
ctx.writeAndFlush("you are out");
ctx.channel().close();
}
// super.userEventTriggered(ctx, evt);
}
}
public class HeartBeatClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new HeartBeatClientHandler());
}
});
System.out.println("客户端初始化完成");
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 2020).sync();
String data = "I am alive";
while (future.channel().isActive()) {
// 模拟空闲的状态 随机等待时间
int num = new Random().nextInt(10);
Thread.sleep(num * 1000);
System.out.println("等待" + num + "秒后进行下次发送");
future.channel().writeAndFlush(data);
}
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server data: " + msg);
if ("you are out".equals(msg)) {
ctx.channel().close();
}
}
}
}
public class TimeServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 基于分隔符的解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 字符串解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
// 启动
ChannelFuture channelFuture = serverBootstrap.bind(6300).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class TimeServerHandler extends ChannelInboundHandlerAdapter {
// 请求次数计数
int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// 声明数组接收其内容
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// 请求的长度-系统分隔符的长度=数据的长度 如:字节流 ABC/r/n = 5 - 2 = 3(真正的数据长度)
// 将数组转为字符串后 截取
// System.getProperty("line.separator") 代表系统所支持的分隔符
// windows和linux支持的不同
// String data = new String(req, "UTF-8").substring(0,
// req.length - System.getProperty("line.separator").length());
String data = (String) msg;
String timeStr = new Date().toString();
String currentTime = "Query Data :" + data + "; current time is " + timeStr
+ "; count is " + ++count;
System.out.println(currentTime);
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
}
}
public class TimeClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 6300).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
static class TimeClientHandler extends ChannelInboundHandlerAdapter {
private int count;
private byte[] request;
public TimeClientHandler() {
request = ("query time" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active");
ByteBuf message = null;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(request.length);
message.writeBytes(request);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
// ByteBuf buf = (ByteBuf) msg;
// byte[] tmp = new byte[buf.readableBytes()];
// buf.readBytes(tmp);
// String data = new String(tmp, "UTF-8");
String data = (String) msg;
System.out.println("data is : " + data + "; count is " + ++count);
}
}
}
public class UserInfo implements Serializable {
private static final long serialVersionUID = 7607690012569645441L;
private int ID;
private String name;
public UserInfo(int ID, String name) {
this.ID = ID;
this.name = name;
}
public int getID() {
return ID;
}
public void setID(int ID) {
this.ID = ID;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
/**
* 根据buffer缓冲区模拟的序列化操作,获得字节数组的结果
* @return
*/
public byte[] codec() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 存储name数据
byte[] value = this.name.getBytes();
buffer.putInt(value.length);
buffer.put(value);
// 存储ID数据
buffer.putInt(this.ID);
// 写完成
buffer.flip();
value = null;
// 读取并存储到result中
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}
public static void main(String[] args) throws Exception {
UserInfo userInfo = new UserInfo(1, "勒布朗詹姆斯");
// 原生序列化
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(userInfo);
oos.flush();
oos.close();
byte[] arr = bos.toByteArray();
System.out.println("JDK序列化后 字节数组的长度:" + arr.length);
bos.close();
System.out.println("ByteBuffer转化为字节数组形式的长度:"
+ userInfo.codec().length);
}
}
public class UserInfo2 implements Serializable {
private static final long serialVersionUID = -253661239306911857L;
private int ID;
private String name;
public UserInfo2(int ID, String name) {
this.ID = ID;
this.name = name;
}
public int getID() {
return ID;
}
public void setID(int ID) {
this.ID = ID;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
/**
* 根据buffer缓冲区模拟的序列化操作获得字节数组的结果
* @param buffer
* @return
*/
public byte[] codec(ByteBuffer buffer) {
buffer.clear();
// 存储name数据
byte[] value = this.name.getBytes();
buffer.putInt(value.length);
buffer.put(value);
// 存储ID数据
buffer.putInt(this.ID);
// 写完成
buffer.flip();
value = null;
// 读取并存储到result中
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}
public static void main(String[] args) throws Exception {
UserInfo2 userInfo2 = new UserInfo2(1, "勒布朗詹姆斯");
ByteArrayOutputStream bos = null;
ObjectOutputStream oos = null;
long startTime = System.currentTimeMillis();
//循环1000000次
//原生序列化
for (int i = 0; i < 1000000; i++) {
bos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(bos);
oos.writeObject(userInfo2);
oos.flush();
oos.close();
byte[] arr = bos.toByteArray();
bos.close();
}
long endTime = System.currentTimeMillis();
System.out.println("JDK序列化 耗时:" + (endTime - startTime) + "ms");
ByteBuffer buffer = ByteBuffer.allocate(1024);
startTime = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
userInfo2.codec(buffer);
}
endTime = System.currentTimeMillis();
System.out.println("ByteBuffer转化 耗时:" + (endTime - startTime) + "ms");
}
}
<properties>
<os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.pz";
// 具体的类名
option java_outer_classname="PersonModel";
// 类结构
message Person{
int32 id = 1; // 此处的1代表顺序
string name = 2;
}
public class ProtobufTest {
public static void main(String[] args) throws Exception {
//建造者模式创建对象
PersonModel.Person.Builder builder = PersonModel.Person.newBuilder();
builder.setId(3);
builder.setName("勒布朗詹姆斯");
PersonModel.Person person = builder.build();
System.out.println(person);
System.out.println("=====person bytes:");
for(byte b : person.toByteArray()){
System.out.print(b);
}
System.out.println();
System.out.println("====================");
byte[] byteArr = person.toByteArray();
// 使用parseFrom方法 反向构造对象
PersonModel.Person person2 = PersonModel.Person.parseFrom(byteArr);
System.out.println(person2.getId());
System.out.println(person2.getName());
}
}
评论