在 spring boot 项目中集成 Netty 进行消息发送和接收

概述

本文通过一个简单的 HelloWorld demo 来介绍 Netty 的基本使用,其中包括了服务器端和客户端的启动代码,客户端向服务器端发送文本消息。

maven 依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>

关键点

  1. 服务器端在启动的时候开放一个端口:19080
  2. 客户端在启动的时候通过 ip 和 端口连上服务器端
  3. 客户端和服务器端都通过 Channel 对象向彼此发送数据
  4. 服务器和客户端都通过继承 ChannelInboundHandlerAdapter 类实现对消息的读取和回写等操作
  5. 服务器和客户端都通过 StringDecoder 和 StringEncoder 实现对消息的解码和转码操作
  6. 服务器和客户端启动的时候都会阻塞当前线程,因此需要在一个单独的线程中进行启动

消息发送的例子

  • 本例是一个 spring boot web 项目,项目占用了 8080 端口
  • 服务器端在启动的时候开放 19080 端口(注意不要和 web 端口冲突了
  • 客户端在启动的时候连上服务器端
  • 通过 web api 向客户端发送数据,客户端再通过 Channel 对象向服务器端发送数据
  • 服务器接收到客户端数据后也通过 Channel 对象向客户端发送数据

server 服务器端

  • 通过 @PostConstruct 注解的方法进行启动,具体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.ckjava.test.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.ForkJoinPool;

@Slf4j
@Component
public class HelloWorldServer {

private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup();

public void startServer(int port) {
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {

protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new HelloWorldServerHandler());
}

}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
sbs.bind(port).addListener(future -> {
log.info(String.format("服务器启动成功,并监听端口:%s ", port));
});

} catch (Exception e) {
log.error("启动 netty 服务器端出现异常", e);
}
}

// 服务器端启动,并绑定 19080 端口
@PostConstruct
public void init() {
ForkJoinPool.commonPool().submit(() -> startServer(19080));
}

@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
  • 服务器端 HelloWorldServerHandler 如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.ckjava.test.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloWorldServerHandler extends ChannelInboundHandlerAdapter {

// 服务器端读取到 客户端发送过来的数据,然后通过 Channel 回写数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info(String.format("服务器端读取到从客户端:%s 发送过来的数据:%s", ctx.channel().remoteAddress(), msg.toString()));
ctx.channel().writeAndFlush(String.format("server write:%s", msg));
}

// 捕获到异常的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

}

client 客户端

  • 通过 @PostConstruct 注解的方法进行启动,具体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.ckjava.test.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ForkJoinPool;

@Slf4j
@Component
public class HelloWorldClient {

static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

private final EventLoopGroup group = new NioEventLoopGroup();
private ChannelFuture mChannelFuture = null;
private final ThreadLocal<Channel> mChannel = new ThreadLocal<>();

public void startClient(String host, int port) {
// Configure the client.
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new StringDecoder());
p.addLast("encoder", new StringEncoder());
p.addLast(new HelloWorldClientHandler());
}
});

mChannelFuture = b.connect(host, port).addListener(future -> {
log.info(String.format("客户端启动成功,并监听端口:%s ", port));
});
} catch (Exception e) {
log.error("启动 netty 客户端出现异常", e);
}
}

/**
* 客户端通过 Channel 对象向服务器端发送数据
* @param data 文本数据
*/
public void send(String data) {
try {
if (mChannel.get() == null) {
mChannel.set(mChannelFuture.channel());
}
mChannel.get().writeAndFlush(data);
} catch (Exception e) {
log.error(this.getClass().getName().concat(".send has error"), e);
}
}

// 客户端启动,并连上服务器端
@PostConstruct
public void init() {
ForkJoinPool.commonPool().submit(() -> startClient("127.0.0.1", 19080));
}

@PreDestroy
public void destroy() {
group.shutdownGracefully();
}

}
  • 客户端 HelloWorldClientHandler 实现如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.ckjava.test.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {

// 客户端激活监听
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("客户端激活!");
}

// 客户端读取从服务器端发送过来的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info(String.format("客户端读取从服务器端发送过来的数据:%s", msg));
}

// 捕获异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

}

web api 数据发送入口

  • 这里只是通过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.ckjava.test.web;

import com.ckjava.test.client.HelloWorldClient;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* @author ckjava
* @date 2022/4/18 23:50
*/
@Api
@RequestMapping(produces = "application/json;charset=utf-8")
@RestController
public class HelloNettyCtrl {

@Resource
private HelloWorldClient mHelloWorldClient;

@GetMapping("/nettyClient")
public void nettyClient(@RequestParam String data) throws Exception {
mHelloWorldClient.send(data);
}
}

测试

  • 执行如下请求
1
curl -X GET "http://localhost:8080/nettyClient?data=%E4%BD%A0%E5%A5%BD%20ckjava" -H "accept: application/json;charset=utf-8"
  • 输出如下
1
2
22:36:00.178 [nioEventLoopGroup-4-1] INFO  c.c.t.server.HelloWorldServerHandler - 服务器端读取到从客户端:/127.0.0.1:9196 发送过来的数据:你好 ckjava
22:36:00.178 [nioEventLoopGroup-2-1] INFO c.c.t.client.HelloWorldClientHandler - 客户端读取从服务器端发送过来的数据:server write:你好 ckjava

打赏

  • 微信

  • 支付宝