通过实际的案例详解 Netty 中 LengthFieldBasedFrameDecoder 的具体使用

概述

本文通过一个具体的案例来详细介绍 LengthFieldBasedFrameDecoder 的使用,在案例中通过对数据的封包和解包实现对数据的加密,压缩,解压,解密等操作。

关键点

  1. 服务器端在启动的时候开放一个端口:19080
  2. 客户端在启动的时候通过 ip 和 端口连上服务器端
  3. 客户端和服务器端都通过 Channel 对象向彼此发送数据
  4. 服务器和客户端都通过继承 ChannelInboundHandlerAdapter 类实现对 消息实体 的读取和回写等操作
  5. 服务器和客户端都通过 LengthFieldBasedFrameDecoder 和 CommonDataEncoder 实现对 消息实体 的解码和转码操作
  6. 服务器和客户端启动的时候都会阻塞当前线程,因此需要在一个单独的线程中进行启动
  7. 服务器和客户端都在 发送数据 前 进行加密和压缩
  8. 服务器和客户端都在 接收数据 后 进行解压和解密
  9. 服务器和客户端需要确保发送的数据的字节长度不能超过 LengthFieldBasedFrameDecoder 设置的最大帧长度

使用案例介绍

  1. 本例是一个 spring boot web 项目,项目占用了 8080 端口
  2. 服务器端在启动的时候开放 19080 端口(注意不要和 web 端口冲突了)
  3. 客户端在启动的时候连上服务器端
  4. 通过 web api 向客户端发送数据,客户端再通过 Channel 对象向服务器端发送数据
  5. 服务器接收到客户端数据后也通过 Channel 对象向客户端发送数据
  6. 客户端和服务器端通过 CommonData 对象进行消息的封装和传输,并通过 LengthFieldBasedFrameDecoder 实现对数据的解码。

消息实体 CommonData

  • CommonData 的定义如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class CommonData {
// byte 表示的数据范围是 -128到127 之间,因此可以直接通过将 -128到127的数字通过 (byte)[-128 - 127] 方式强制转换
// 类型 系统编号 1 表示A系统,2 表示B系统
private byte type;
// 信息标志 1 表示心跳包 2 表示超时包 3 业务信息包
private byte flag;
// 主题信息的字节长度,
private int length;
// 主题信息
private String body;
}

LengthFieldBasedFrameDecoder 构造方法介绍

1
2
3
4
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)

那么这几个重要的参数如下:

  • maxFrameLength:最大帧长度。也就是可以接收的数据的最大长度。如果超过,此次数据会被丢弃。
  • lengthFieldOffset:长度域偏移。就是说数据开始的几个字节可能不是表示数据长度,需要后移几个字节才是长度域。
  • lengthFieldLength:长度域字节数。用几个字节来表示数据长度。
  • lengthAdjustment:数据长度修正。因为长度域指定的长度可以使 header+body 的整个长度,也可以只是 body 的长度。如果表示 header+body 的整个长度,那么我们需要修正数据长度。
  • initialBytesToStrip:跳过的字节数。如果你需要接收 header+body 的所有数据,此值就是 0,如果你只想接收 body 数据,那么需要跳过 header 所占用的字节数。

LengthFieldBasedFrameDecoder 的构造参数

LengthFieldBasedFrameDecoder 是 Netty 中解决拆包粘包问题的一个重要的类,主要结构就是 header+body 结构。只需要传入正确的参数就可以发送和接收正确的数据。

  • 根据 CommonData 的定义,可以知道对应 LengthFieldBasedFrameDecoder 的构造参数如下
1
2
3
4
5
6
7
private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 2; // 第一个字节是 type, 第二个字节是 flag, 因此偏移是 2
private static final int LENGTH_FIELD_LENGTH = 4; // 存储数据字节长度的是 int 类型,含有四个字节。
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP, true));

对消息实体 CommonData 进行加密,压缩,解压,解密等操作

  1. 将 对象转成 加密,压缩 后的字节数组
  2. 将 字节数组 解压,解密 后转成 java 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 将 对象转成 加密,压缩后的字节数组
*
* @return bytes 数组
*/
public static ByteBuf toBytes(CommonData commonData) {
// 1 对 body 进行加密
byte[] bodyBytes = commonData.getBody().getBytes(StandardCharsets.UTF_8);
byte[] bodyEncryptBytes = AESUtils.encrypt(bodyBytes, "123456");
// 2 对 body 进行压缩
byte[] bodyEncryptGzipBytes = GZIPUtils.compress(bodyEncryptBytes);

ByteBuf byteBuf = Unpooled.directBuffer(1+1+4+bodyEncryptGzipBytes.length);
byteBuf.writeByte(commonData.getType());
byteBuf.writeByte(commonData.getFlag());
byteBuf.writeInt(bodyEncryptGzipBytes.length);
byteBuf.writeBytes(bodyEncryptGzipBytes);
int size = byteBuf.capacity();
return byteBuf;
}

/**
* 将字节数组 解压,解密后转成 java 对象
*
* @param byteBuf ByteBuf 对象
* @return CommonData
*/
public static CommonData fromBytes(ByteBuf byteBuf) {
byte type = byteBuf.readByte();
byte flag = byteBuf.readByte();
int length = byteBuf.readInt();
byte[] bodyBytes = new byte[length];
byteBuf.readBytes(bodyBytes);
// 1 对 body 解压
byte[] bodyUnCompressBytes = GZIPUtils.uncompress(bodyBytes);
// 2 对 body 解密
byte[] bodyDecryptBytes = AESUtils.decrypt(bodyUnCompressBytes, "123456");
return CommonData.builder().type(type).flag(flag).length(length).body(new String(bodyDecryptBytes, StandardCharsets.UTF_8)).build();
}

通用消息转码类 CommonDataEncoder

  1. 服务器端和客户端在 业务处理类 都是直接将 消息体 直接写入的 channel 中的
  2. 在 CommonDataEncoder 类中才会对 消息体 进行 加密和压缩处理
  3. 服务器端和客户端分别在 业务处理类 中对 消息体 进行解压和解密处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.ckjava.test.netty.encoder;

import com.ckjava.test.netty.vo.CommonData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
* @author ckjava
* @date 2022/4/23 21:26
*/
public class CommonDataEncoder extends MessageToByteEncoder<CommonData> {

/**
* 将数据转成字节
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, CommonData msg, ByteBuf out) throws Exception {
if (null == msg) {
throw new Exception("msg is null");
}
out.writeBytes(CommonData.toBytes(msg));
}
}

服务器端启动类

  • 通过 init 方法启动 Netty Server
  • 通过 childHandler 绑定 CommonDataEncoder,LengthFieldBasedFrameDecoder,CommonServerDataHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.ckjava.test.server;

import com.ckjava.test.netty.encoder.CommonDataEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.ForkJoinPool;

@Slf4j
@Component
public class CommonNettyServer {

private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 2; // 第一个字节是 type, 第二个字节是 flag, 因此偏移是 2
private static final int LENGTH_FIELD_LENGTH = 4; // 存储数据字节长度的是 int 类型,含有四个字节。
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;

private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup();

public void startServer(int port) {
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel ch) {
ch.pipeline().addLast(new CommonDataEncoder());
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP, true));
ch.pipeline().addLast(new CommonServerDataHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
sbs.bind(port).addListener(future -> {
log.info(String.format("服务器启动成功,并监听端口:%s ", port));
});

} catch (Exception e) {
log.error("启动 netty 服务器端出现异常", e);
}
}

// 服务器端启动,并绑定 19080 端口
@PostConstruct
public void init() {
ForkJoinPool.commonPool().submit(() -> startServer(19080));
}

@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

服务器端业务处理类

  1. 继承 SimpleChannelInboundHandler 类,用于接收客户端发送的消息体,并向客户端返回数据。
  2. 通过 CommonData.fromBytes(byteBuf) 将接收的数据进行解压,解密
  3. 向客户端返回数据由 CommonDataEncoder 进行加密和压缩处理
  4. 通过 handleMap 封装具体的业务逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.ckjava.test.server;

import com.ckjava.test.netty.vo.CommonData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/**
* @author ckjava
* @date 2022/4/23 21:31
*/
@Slf4j
public class CommonServerDataHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
CommonData commonData = CommonData.fromBytes(byteBuf);
log.info(String.format("服务器端从客户端读取的数据:%s", commonData.toString()));
String key = String.valueOf(commonData.getType()).concat(String.valueOf(commonData.getFlag()));
if (Objects.nonNull(handleMap.get(key))) {
ctx.channel().writeAndFlush(handleMap.get(key).apply(commonData));
} else {
CommonData r = handleMap.get(null).apply(commonData);
ctx.channel().writeAndFlush(r);
}
}

// 捕获到异常的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

private static Map<String, Function<CommonData, CommonData>> handleMap = new HashMap<String, Function<CommonData, CommonData>>() {{
// 业务系统 1 的心跳
put("11", commonData -> {
String data = "心跳 ok";
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});

// 业务系统 1 的 超时处理逻辑
put("12", commonData -> {
String data = "超时处理 ok";
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});

// 业务系统 1 的业务处理逻辑
put("13", commonData -> {
String data = "业务处理 ok";
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});

// 业务系统 1 的心跳
put("21", commonData -> {
String data = "心跳 ok";
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});

// 业务系统 1 的 超时处理逻辑
put("22", commonData -> {
String data = "超时处理 ok";
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});

// 业务系统 1 的业务处理逻辑
put("23", commonData -> {
String data = "业务处理 ok";
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});

// 业务系统 1 的业务处理逻辑
put(null, commonData -> {
String data = String.format("服务:%s 不支持,result:%s", commonData.getType(), "no available");
commonData.setLength(data.getBytes(StandardCharsets.UTF_8).length);
commonData.setBody(data);
return commonData;
});
}};

}

客户端启动类

  1. 通过 init 方法启动客户端
  2. 通过 childHandler 绑定 CommonDataEncoder,LengthFieldBasedFrameDecoder,CommonServerDataHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.ckjava.test.client;

import com.ckjava.test.netty.encoder.CommonDataEncoder;
import com.ckjava.test.netty.vo.CommonData;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ForkJoinPool;

@Slf4j
@Component
public class CommonNettyClient {

private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 2; // 第一个字节是 type, 第二个字节是 flag, 因此偏移是 2
private static final int LENGTH_FIELD_LENGTH = 4; // 存储数据字节长度的是 int 类型,含有四个字节。
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;

private final EventLoopGroup group = new NioEventLoopGroup();
private ChannelFuture mChannelFuture = null;
private final ThreadLocal<Channel> mChannel = new ThreadLocal<>();

public void startClient(String host, int port) {
// Configure the client.
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new CommonDataEncoder());
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP, true));
ch.pipeline().addLast(new CommonClientDataHandler());
}
});

mChannelFuture = b.connect(host, port).addListener(future -> {
log.info(String.format("客户端启动成功,连接端口:%s ", port));
});
} catch (Exception e) {
log.error("启动 netty 客户端出现异常", e);
}
}

/**
* 客户端通过 Channel 对象向服务器端发送数据
* @param data 文本数据
*/
public void send(CommonData data) {
try {
if (mChannel.get() == null) {
mChannel.set(mChannelFuture.channel());
}
data.setLength(data.getBody().getBytes(StandardCharsets.UTF_8).length);
mChannel.get().writeAndFlush(data);
} catch (Exception e) {
log.error(this.getClass().getName().concat(".send has error"), e);
} finally {
mChannel.get().closeFuture();
}
}

// 客户端启动,并连上服务器端
@PostConstruct
public void init() {
ForkJoinPool.commonPool().submit(() -> startClient("127.0.0.1", 19080));
}

@PreDestroy
public void destroy() {
group.shutdownGracefully();
}

}

客户端业务处理类

  1. 通过 channelRead0 方法读取服务器端发送过来的数据
  2. 通过 channelActive 方法在启动的时候主动向服务器端发送消息
  3. 在接收消息的时候需要进行解压,解密操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.ckjava.test.client;

import com.ckjava.test.netty.vo.CommonData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
* @author ckjava
* @date 2022/4/23 21:33
*/
@Slf4j
public class CommonClientDataHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
log.info(String.format("客户端读取从服务器端发送过来的数据:%s", CommonData.fromBytes(byteBuf).toString()));
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
String data = "Hello,Netty";
CommonData commonData = CommonData.builder()
.type((byte) 1)
.flag((byte) 1)
.length(data.getBytes(StandardCharsets.UTF_8).length)
.body(data)
.build();
ctx.writeAndFlush(commonData);
}

// 捕获到异常的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

通过 api 向 Netty client 发送数据

这里为了测试方便,通过 api 向 Netty client 发送数据,Netty client 再将数据发送给 服务器端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.ckjava.test.web;

import com.ckjava.test.client.CommonNettyClient;
import com.ckjava.test.netty.vo.CommonData;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
* @author ckjava
* @date 2022/4/18 23:50
*/
@Api
@RequestMapping(produces = "application/json;charset=utf-8")
@RestController
public class CommonDataCtrl {

@Resource
private CommonNettyClient mHelloWorldClient;

@PostMapping("/nettyClient")
public void nettyClient(@RequestBody CommonData data) {
mHelloWorldClient.send(data);
}
}

测试

  1. 通过 api 发送数据如下
1
curl -X POST "http://localhost:8080/nettyClient" -H "accept: application/json;charset=utf-8" -H "Content-Type: application/json" -d "{ \"body\": \"ckjava\", \"flag\": 1, \"length\": 0, \"type\": 1}"
  1. 输出如下
1
2
3
4
5
6
7
8
9
10
16:48:09.471 [nioEventLoopGroup-3-1] INFO  c.c.test.server.CommonNettyServer - 服务器启动成功,并监听端口:19080
16:48:09.478 [nioEventLoopGroup-2-1] INFO c.c.test.client.CommonNettyClient - 客户端启动成功,连接端口:19080
16:48:09.928 [nioEventLoopGroup-4-1] INFO c.c.t.server.CommonServerDataHandler - 服务器端从客户端读取的数据:CommonData(type=1, flag=1, length=37, body=Hello,Netty)
16:48:09.929 [nioEventLoopGroup-2-1] INFO c.c.t.client.CommonClientDataHandler - 客户端读取从服务器端发送过来的数据:CommonData(type=1, flag=1, length=37, body=心跳 ok)
16:48:13.449 [http-nio-8080-exec-1] INFO o.a.tomcat.util.http.parser.Cookie - A cookie header was received [1650709813,1651396672,1651397189] that contained an invalid cookie. That cookie will be ignored.Note: further occurrences of this error will be logged at DEBUG level.
16:48:13.456 [http-nio-8080-exec-1] INFO o.a.c.c.C.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
16:48:13.456 [http-nio-8080-exec-1] INFO o.s.web.servlet.DispatcherServlet - Initializing Servlet 'dispatcherServlet'
16:48:13.465 [http-nio-8080-exec-1] INFO o.s.web.servlet.DispatcherServlet - Completed initialization in 9 ms
16:48:13.534 [nioEventLoopGroup-4-1] INFO c.c.t.server.CommonServerDataHandler - 服务器端从客户端读取的数据:CommonData(type=1, flag=1, length=37, body=ckjava)
16:48:13.536 [nioEventLoopGroup-2-1] INFO c.c.t.client.CommonClientDataHandler - 客户端读取从服务器端发送过来的数据:CommonData(type=1, flag=1, length=37, body=心跳 ok)

打赏

  • 微信

  • 支付宝