0%

深入理解Netty

Netty常见使用场景

Netty执行流程分析与重要组件介绍

Netty的三个应用场景:

  • Netty作为RPC通信的框架/协议/库实现了实现了基于Socket的远程过程调用

  • Netty可以作为一个基于WebSocket长连接的服务器实现客户端与服务端长连接的通信

  • Netty可以充当HTTP服务器(但不遵循Servlet规范)

模拟HTTP服务器的第一个程序

编写第一个程序,客户端发起不带参数请求,服务端返回Hello World

TestServer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

TestServerInitializer类

1
2
3
4
5
6
7
8
9
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("httpServerCodec", new HttpServerCodec());
pipeline.addLast("testHttpServerHandler", new TestHttpServerHandler());
}
}

TestHttpServerHandler类

1
2
3
4
5
6
7
8
9
10
11
12
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { // 读入客户端发送的请求,并向客户端返回响应的方法
ByteBuf content = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

ctx.writeAndFlush(response);
}
}

bossGroup 线程组负责接受客户端的连接,但是不对连接做任何处理,之后将这个连接转给workerGroup, workerGroup完成对连接的后续处理,获取连接的参数,进行业务处理,将结果返回给客户端等…..

ServerBootstrap()是Netty用来帮助简化服务端启动的类, 关联处理器

TestServerInitializer会在channel 注册好之后自动创建并执行

TestHttpServerHandler读入客户端发送的请求,并向客户端返回响应的方法

Netty回调与Channel执行流程分析

改进代码进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { // 读入客户端发送的请求,并向客户端返回响应的方法
System.out.println(msg.getClass());
System.out.println(ctx.channel().remoteAddress());

if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
System.out.println("请求方法名: " + httpRequest.method().name());

URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println(uri.getPath());
return;
}

ByteBuf content = Unpooled.copiedBuffer("Hello World\n", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

ctx.writeAndFlush(response);
ctx.channel().close(); // 进行连接关闭
}
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active !");
super.channelActive(ctx);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel registered");
super.channelRegistered(ctx);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler added");
super.handlerAdded(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel inactive");
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel unregistered");
super.channelUnregistered(ctx);
}
}

Netty的Socket编程实现客户端连接与通信

客户端与服务端代码类似,建立连接进行通信

客户端程序

MyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyClient {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new MyClientInitializer() );
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();

} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

MyClientInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}

MyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MyClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client output: " + msg);
ctx.writeAndFlush("from client: " + LocalDateTime.now());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("来自于客户端的问候!");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

Netty实现聊天室程序

服务端

MyChatServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyChatServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyChatServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyChatServerInitializer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyChatServerHandler());


}
}
MyChatServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
if (channel != ch) {
ch.writeAndFlush(channel.remoteAddress() + "发送的消息: " + msg + "\n");
} else {
ch.writeAndFlush("【自己】: " + msg + "\n");
}
});
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "加入\n");
channelGroup.add(channel);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "离开\n");
//channelGroup.remove(channel); // 会自动调用
System.out.println("剩余的用户数量: " + channelGroup.size());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "上线\n");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "下线\n");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端

MyChatClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyChatClient {
public static void main(String[] args) throws InterruptedException, IOException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new MyChatClientInitializer());

Channel channel = bootstrap.connect("localhost", 8899).sync().channel();

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (; ;) {
channel.writeAndFlush(br.readLine() + "\r\n");
}
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
MyChatClientInitializer
1
2
3
4
5
6
7
8
9
10
11
public class MyChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyChatClientHandler());
}
}
MyChatClientHandler
1
2
3
4
5
6
7
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}

Netty读写检测机制与长连接因素

MyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sineagle.netty.fourthexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MyServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

MyServerInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.sineagle.netty.fourthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS));
pipeline.addLast(new MyServerHandler());
}
}

MyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.sineagle.netty.fourthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;

String eventType = null;

switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}

System.out.println(ctx.channel().remoteAddress() + "超时事件: " + eventType);
ctx.channel().close();
}
}
}

Netty对于WebSocket的支援

Netty实现服务器与客户端的长连接通信

服务端

MyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.sineagle.netty.fifthexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

public class MyServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new WebSocketChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

WebSocketChannelInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.sineagle.netty.fifthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new HttpServerCodec()); // 采用HTTP编解码器
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8192));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

pipeline.addLast(new TextWebSocketFrameHandler());

}
}

TextWebSocketFrameHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.sineagle.netty.fifthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息: " + msg.text());

ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now()));

}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded " + ctx.channel().id().asLongText());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved " + ctx.channel().id().asLongText());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生");
ctx.close();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WecSocket客户端</title>
</head>

<script type="text/javascript">
var socket;

if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8899/ws");
socket.onmessage = function (event) {
var ta = document.getElementById("responseText");
ta.value = ta.value + "\n" + event.data;
}
socket.onopen = function (event) {
var ta = document.getElementById("responseText");
ta.value = "连接开启!";
}
socket.onclose = function (event) {
var ta = document.getElementById("responseText");
ta.value = ta.value + "\n" + "连接关闭";
}
} else {
alert('浏览器不支持WebSocket!');
}

function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接未开启!");
}
}
</script>

<body>
<form onsubmit="return false">
<textarea name="message" style="width: 400px; height: 200px">
</textarea>

<input type="button" value="发送按钮" onclick="send(this.form.message.value)">

<h3>服务端输出: </h3>
<textarea id="responseText" style="width: 400px; height: 300px">
</textarea>

<input type="button" onclick="javascript: document.getElementById('responseText').value=''" value="清空内容">
</form>
</body>
</html>

RPC框架

RPC: 远程过程调用,很多RPC框架都是跨语言的

使用步骤:

  1. 定义一个接口说明文件:描述了对象(结构体)、对象成员、接口方法等一系列信息
  2. 通过RPC框架所提供的编译器,将接口说明文件编译成具体语言文件
  3. 在客户端与服务器端分别引入RPC编译器所生成的文件,即可像调用本地方法一样调用远程方法

Google Protobuf

Protobuf 集成 Netty

首先编写 Person.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
syntax = "proto2";

package com.sineagle.protobuf;

option optimize_for = SPEED;
option java_package = "com.sineagle.netty.sixthexample";
option java_outer_classname = "MyDataInfo";

message Person {
required string name = 1;
optional int32 age = 2;
optional string address = 3;
}

对该文件进行编译

1
protoc --java_out=../java Person.proto

编写服务端

TestServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sineagle.netty.sixthexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class TestServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

TestServerInitializer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.sineagle.netty.sixthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());

pipeline.addLast(new TestServerHandler());
}
}
TestServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.sineagle.netty.sixthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
System.out.println(msg.getName());
System.out.println(msg.getAge());
System.out.println(msg.getAddress());
}
}

编写客户端

TestClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sineagle.netty.sixthexample;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.IOException;

public class TestClient {

public static void main(String[] args) throws InterruptedException, IOException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());

ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();

} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

TestClientInitializer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.sineagle.netty.sixthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());

pipeline.addLast(new TestClientHandler());
}
}

TestClientHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.sineagle.netty.sixthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {

}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyDataInfo.Person person = MyDataInfo.Person.newBuilder().setName("张三")
.setAge(20).setAddress("北京").build();
ctx.channel().writeAndFlush(person);
}
}

Protobuf 多协议消息支援

编写 Person.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
syntax = "proto2";

package com.sineagle.protobuf;

option optimize_for = SPEED;
option java_package = "com.sineagle.netty.sixthexample";
option java_outer_classname = "MyDataInfo";

message MyMessage {
enum DataType {
PersonType = 1;
DogType = 2;
CatType = 3;
}
required DataType data_type = 1;

oneof dataBody {
Person person = 2;
Dog dog = 3;
Cat cat = 4;
}

}


message Person {
optional string name = 1;
optional int32 age = 2;
optional string address = 3;
}

message Dog {
optional string name = 1;
optional int32 age = 2;
}

message Cat {
optional string name = 1;
optional string city = 2;
}

客户端

TestClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestClient {

public static void main(String[] args) throws InterruptedException, IOException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());

ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();

} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
TestClientInitializer
1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());

pipeline.addLast(new TestClientHandler());
}
}
TestClientHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int randomInt = new Random().nextInt(3);

MyDataInfo.MyMessage myMessage = null;
if (0 == randomInt) {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.PersonType)
.setPerson(MyDataInfo.Person.newBuilder()
.setName("张三三").setAge(20).setAddress("北京").build()
).build();
} else if (1 == randomInt) {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.DogType)
.setDog(MyDataInfo.Dog.newBuilder()
.setName("修狗").setAge(10).build()
).build();
} else {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.CatType)
.setCat(MyDataInfo.Cat.newBuilder()
.setName("修猫").setCity("北京").build()
).build();
}

ctx.channel().writeAndFlush(myMessage);
}
}

服务端

TestServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
TestServerInitializer
1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());

pipeline.addLast(new TestServerHandler());
}
}
TestServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.PersonType) {
MyDataInfo.Person person = msg.getPerson();
System.out.println(person.getName());
System.out.println(person.getAge());
System.out.println(person.getAddress());
} else if (dataType == MyDataInfo.MyMessage.DataType.DogType) {
MyDataInfo.Dog dog = msg.getDog();
System.out.println(dog.getName());
System.out.println(dog.getAge());
} else {
MyDataInfo.Cat cat = msg.getCat();

System.out.println(cat.getName());
System.out.println(cat.getCity());
}

}
}

Thrift

基于Thrift实现客户端服务端通信

编写data.thrift 并进行编译

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
namespace java thrift.generated

typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

struct Person {
1: optional String username,
2: optional int age,
3: optional boolean married;
}

exception DataException {
1: optional String message,
2: optional String callStack,
3: optional String date
}

service PersonService {
Person getPersonByUsername(1: required String username) throws (1: DataException dataException),

void savePerson(1: required Person person) throws (1: DataException dataException)
}
1
thrift --gen java data.thrift

生成三个java文件如下图

实现PersonService接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class PersonServiceImpl implements PersonService.Iface {
@Override
public Person getPersonByUsername(String username) throws DataException, TException {
System.out.println("Got Client param: " + username);

Person person = new Person();
person.setUsername(username);
person.setAge(20);
person.setMarried(false);

return person;
}

@Override
public void savePerson(Person person) throws DataException, TException {
System.out.println("Got Client Param: ");
System.out.println(person.getUsername());
System.out.println(person.getAge());
System.out.println(person.isMarried());

}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.sineagle.thrift;

import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import thrift.generated.PersonService;

public class ThriftServer {
public static void main(String[] args) throws TTransportException {
TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4);
PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());

arg.protocolFactory(new TCompactProtocol.Factory());
arg.transportFactory(new TFramedTransport.Factory());
arg.processorFactory(new TProcessorFactory(processor));

TServer server = new THsHaServer(arg);

System.out.println("Thrift Server Started !");
server.serve();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.sineagle.thrift;

import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import thrift.generated.Person;
import thrift.generated.PersonService;

public class ThriftClient {
public static void main(String[] args) throws TTransportException {
TTransport transport = new TFramedTransport(new TSocket("localhost", 8899), 600);
TProtocol protocol = new TCompactProtocol(transport);
PersonService.Client client = new PersonService.Client(protocol);
try {
transport.open();

Person person = client.getPersonByUsername("张三");

System.out.println(person.getUsername());
System.out.println(person.getAge());
System.out.println(person.isMarried());
System.out.println("-----------");
Person person2 = new Person();
person2.setUsername("李四");
person2.setAge(30);
person2.setMarried(true);
client.savePerson(person2);

} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
} finally {
transport.close();
}
}
}

Thrift架构

GRPC

一个简单的例子

定义Student.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
syntax = "proto3";

package com.sineagle.proto;

option java_package = "com.sineagle.proto";
option java_outer_classname = "StudentProto";
option java_multiple_files = true;

service StudentService {
rpc getRealNameByUsername(MyRequest) returns (MyResponse) {}

rpc getStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

rpc getStudentWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}

rpc biTalk(stream StreamRequest) returns (stream StreamResponse) {}
}

message MyRequest {
string username = 1;
}

message MyResponse {
string realname = 2;
}

message StudentRequest {
int32 age = 1;
}

message StudentResponse {
string name = 1;
int32 age = 2;
string city = 3;
}

message StudentResponseList {
repeated StudentResponse studentResponse = 1;
}

message StreamRequest {
string request_info = 1;
}

message StreamResponse {
string response_info = 1;
}

使用gradle generateProto命令生成相应的文件

编写GrpcServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.sineagle.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class GrpcServer {
private Server server;

private void start() throws IOException {
this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
System.out.println("server started!");

Runtime.getRuntime().addShutdownHook(new Thread(()-> {
System.out.println("关闭JVM");
GrpcServer.this.stop();
}));
}

private void stop() {
if (null != this.server) {
this.server.shutdown();
}
}

private void awaitTermination() throws InterruptedException {
if (null != this.server) {
//this.server.awaitTermination(3000, TimeUnit.MICROSECONDS);
this.server.awaitTermination();
}
}

public static void main(String[] args) throws IOException, InterruptedException {
GrpcServer server = new GrpcServer();
server.start();
server.awaitTermination();
}
}

编写GrpcClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.sineagle.grpc;

import com.sineagle.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.Iterator;

public class GrpcClient {

public static void main(String[] args) throws InterruptedException {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
.usePlaintext().build();
StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);
StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel);

//MyResponse myResponse = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("zhangsan").build());
//
//System.out.println(myResponse.getRealname());
//System.out.println(" =========================================== ");
//
//Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
//while (iter.hasNext()) {
// StudentResponse studentResponse = iter.next();
// System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity());
//}
//
//System.out.println(" =========================================== ");


//StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
// @Override
// public void onNext(StudentResponseList value) {
// value.getStudentResponseList().forEach(studentResponse -> {
// System.out.println(studentResponse.getName());
// System.out.println(studentResponse.getAge());
// System.out.println(studentResponse.getCity());
// System.out.println("******");
// });
// }
//
// @Override
// public void onError(Throwable t) {
// System.out.println(t.getMessage());
// }
//
// @Override
// public void onCompleted() {
// System.out.println("completed!");
// }
//};
//StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentWrapperByAges(studentResponseListStreamObserver);
//studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
//studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
//studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
//studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());
//
//studentRequestStreamObserver.onCompleted();

StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() {
@Override
public void onNext(StreamResponse value) {
System.out.println(value.getResponseInfo());
}

@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}

@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
});

for (int i = 0; i < 10; i ++) {
requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
Thread.sleep(1000);
}


try {
Thread.sleep(50000);
} catch (Exception e) {
e.printStackTrace();
}



}
}

Java NIO

装饰模式

  • 装饰模式又名包装(Wrapper)模式

  • 装饰模式以客户端透明的方式扩展对象的功能,是继承关系的一个替代方案

  • 装饰模式以对客户透明的方式动态的给一个对象附加上更多的责任。换言之,客户端并不会觉得对象在装饰前和装饰后有什么不同

  • 装饰模式可以在不创造更多子类的情况下,将对象的功能加以扩展

  • 装饰模式把客户端的调用委派到装饰类。装饰模式的关键在于这种扩展是完全透明的

  • 装饰模式是在不改变原类对象和使用继承的情况下,动态的加载一个对象的功能。它通过创建一个包装对象,也就是装饰来包裹真实的对象

装饰模式的角色:

  • 抽象构件角色(Component):给出一个抽象接口,以规范准备接受附加角色的对象。
  • 具体构建角色(Concrete Component):定义一个将要接受附加责任的类
  • 装饰角色(Decorator):持有一个构件(Component)对象的引用,并定义一个与抽象构件接口一致的接口
  • 具体装饰角色(Concrete Decorator):负责给构建对象贴上附加的责任

装饰模式的特点

  • 装饰对象和真实对象有相同的接口,这样客户端就可以以和真实对象相同的方式和装饰器对象交互
  • 装饰对象包含一个真实对象的引用(reference)
  • 装饰对象接受所有来自客户端的请求。它把这些请求转发给真实的对象
  • 装饰对象可以在转发这些请求以前和以后增加一些附加功能。在面向对象的设计中,通常是通过继承来实现对给定类的功能扩展

Java IO NIO 体系分析

java.io 中最为核心的概念是流(Stream),面向流的编程。Java中,一个流要么是输入流,要么是输出流,不能同时是输入流和输出流。

java.nio中拥有三个核心概念:

  • Selector(选择器)
  • Channel(通道)
  • Buffer(缓冲区)

java.nio中,我们是面向块(block)或是缓冲区(buffer)编程的。Buffer本身就是一块内存,底层实现上就是个数组。数据的读写都是通过Buffer实现的

一个简单生成10个随机数的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class NioTest1 {

public static void main(String[] args) {
IntBuffer buffer = IntBuffer.allocate(10);

for (int i = 0; i < buffer.capacity(); i ++) {
int randomNumber = new SecureRandom().nextInt(20);
buffer.put(randomNumber);
}

buffer.flip();
while (buffer.hasRemaining()) {
System.out.println(buffer.get());
}
}
}

虽然nio中的Channel类似与io中的Stream,但我们不能直接使用Channel中的数据,必须通过Buffer来进行读取或写入,同一个Buffer既可以支持读,也可以支持写,通过flip方法进行翻转。

除了数组之外,Buffer还提供了数据的结构化访问方式,并且可以追踪到系统的读写过程。

Java中7种原生数据类型都有各自对应的Buffer类型,如IntBuffer, LongBuffer,ByteBuffer及CharBuffer等,并没有BooleanBuffer类型

Channel指的是可以向其写入数据或读取数据的对象,它类似于java.io中的Stream

所有数据的读写都是通过Buffer来进行的,永远不会出现直接向Channel写入数据的情况,或是直接从Channel读取数据的情况

Stream不同的是,Channel是双向的,一个流只可能是InputStream或是OutputStream, Channel打开后则可以进行读取、写入或是读写

由于Channel是双向的,因此它能更好的反映出底层OS的真实情况,在Linux中,底层操作系统的通道就是双向的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class NioTest2 {

public static void main(String[] args) throws IOException {
FileInputStream fileInputStream = new FileInputStream("NioTest2.txt");
FileChannel fileChannel = fileInputStream.getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocate(512);
fileChannel.read(byteBuffer);

byteBuffer.flip();
while (byteBuffer.remaining() > 0) {
byte b = byteBuffer.get();
System.out.println("Character: " + (char)b);
}
fileInputStream.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class NioTest3 {

public static void main(String[] args) throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream("NioTest3.txt");

FileChannel fileChannel = fileOutputStream.getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocate(512);

byte[] messages = "hello world welcome, nihao.".getBytes();

for (int i = 0; i < messages.length; i ++) {
byteBuffer.put(messages[i]);
}
byteBuffer.flip();

fileChannel.write(byteBuffer);
fileOutputStream.close();
}
}

Buffer中各重要状态属性的含义与关系

capacity, limit, position

关于NIO Buffer中3个重要状态属性的含义:capacity, limit, position

源码中这样描述:

  • capacity :the number of elements it contains. The capacity of a buffer is never negative and never changes
  • limit : the index of the first element that should not be read or written, A buffer’s limit is never negative and is never greater than its capacity.
  • position: the index of the next element to be read or written, A buffer’s position is never negative and is never greater than its limit.

翻译过来就是:

  1. capacity:capacity是Buffer对象中存储数据的最大容量。创建Buffer对象时,capacity必须由开发者指定。一旦创建,capacity就不会再改变。
  2. position:position表示下一个可读或可写的位置。当Buffer对象创建时,position被设置为0。每次读或写操作都会使position的值发生变化。
  3. limit:limit表示Buffer对象中有效数据的范围。当Buffer对象创建时,limit被设置为capacity。limit通常在读写操作时被使用,用于确保不会读取或写入超出有效范围的数据。
  4. mark:mark用于保存当前position的值,以便稍后进行重置操作。

flip()

flip()函数是Java NIO中Buffer的一个重要方法,它的作用是将Buffer从写模式切换为读模式。

1
2
3
4
5
6
public Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

当Buffer处于写模式时,position表示下一个可写入数据的位置,limit表示缓冲区的容量,即可写入数据的最大长度。写入数据后,position的值会递增,直到等于limit。

当需要读取Buffer中的数据时,必须先将Buffer切换到读模式,这时候flip()方法就派上用场了。调用flip()方法后,position的值将被设置为0,而limit的值将被设置为之前position的值,这意味着Buffer从写模式切换为读模式,并准备读取已写入数据的一部分或全部。

以下是一个使用flip()函数的示例:

1
2
3
4
5
6
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put("Hello, World!".getBytes());
buf.flip(); //切换到读模式
while (buf.hasRemaining()) {
System.out.print((char) buf.get()); //读取数据
}

在上述示例中,首先将一个字符串写入Buffer中,然后调用flip()方法,将Buffer从写模式切换到读模式。接着,使用while循环和get()方法从Buffer中读取数据。

需要注意的是,flip()函数不会清空Buffer中的数据,而是将position设置为0,limit设置为之前position的值,因此在读取完Buffer中的数据后,仍然可以使用clear()方法清空Buffer中的数据,使其恢复到写模式。

mark属性与reset()

mark属性是Buffer对象的一个标记属性,它用于标记Buffer中某个特定位置的标记。

在Buffer对象中调用mark()方法会将当前position的值保存为mark的值。之后调用reset()方法会将position的值重置为mark的值。也就是说,mark()方法标记了当前position的位置,reset()方法将position的位置重置为mark的位置。

下面是一个使用mark()和reset()方法的示例:

1
2
3
4
5
6
7
8
ByteBuffer buf = ByteBuffer.allocate(48);
buf.put("Hello, World!".getBytes());
buf.mark(); //标记当前position的位置
buf.put(" This is Java NIO".getBytes());
buf.reset(); //将position的位置重置为mark的位置
while (buf.hasRemaining()) {
System.out.print((char) buf.get()); //读取数据
}

输出结果为:This is Java NIO

在上述示例中,首先将一个字符串写入Buffer中,然后使用mark()方法标记当前position的位置。接着,继续向Buffer中写入另一个字符串,然后使用reset()方法将position的位置重置为mark的位置。最后,使用while循环和get()方法从Buffer中读取数据。

需要注意的是,调用mark()方法会将mark的值设置为当前position的值,而调用reset()方法会将position的值重置为mark的值。因此,在调用reset()方法之前,必须先调用mark()方法进行标记,否则将会抛出InvalidMarkException异常。同时,每次调用mark()方法时,之前的mark值都将被覆盖。因此,只有在需要时才应该使用mark()和reset()方法。

0 <= mark <= position <= limit <= capacity

clear()

clear()函数是Java NIO中Buffer的一个重要方法,它的作用是将Buffer从任何状态下重置为初始状态,即清除Buffer中的所有数据并将position设置为0,limit设置为capacity。

在调用clear()方法之前,如果Buffer处于写模式,那么它的position属性指向的是下一个可写入数据的位置,limit属性则指向Buffer的容量。而在读模式下,position属性则指向下一个可读取数据的位置,limit属性则指向写入Buffer时的position值。

当Buffer已经被写满或者读满时,需要将Buffer重置为初始状态,以便再次写入或者读取数据。这时候就可以使用clear()方法。

以下是一个使用clear()函数的示例:

1
2
3
4
5
6
7
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put("Hello, World!".getBytes());
buf.flip(); //切换到读模式
while (buf.hasRemaining()) {
System.out.print((char) buf.get()); //读取数据
}
buf.clear(); //清空Buffer中的数据

在上述示例中,首先将一个字符串写入Buffer中,然后调用flip()方法将Buffer切换到读模式,并使用while循环和get()方法从Buffer中读取数据。接着,使用clear()方法清空Buffer中的数据,使其恢复到初始状态,以便再次写入或者读取数据。

需要注意的是,clear()方法并不会清空Buffer中的数据,而是将position属性设置为0,limit属性设置为capacity,使Buffer恢复到初始状态。因此,在调用clear()方法之前,需要先读取或者删除Buffer中的数据,以免将Buffer中的旧数据重复写入或读取。

rewind()

rewind()函数是Java NIO中Buffer的一个重要方法,它的作用是将Buffer中的position属性重置为0,以便重新读取Buffer中的数据。与clear()方法不同,rewind()方法并不会清空Buffer中的数据,而只是将position属性重置为0,以便重新读取Buffer中的数据。

在调用rewind()方法之前,如果Buffer处于写模式,那么它的position属性指向的是下一个可写入数据的位置,limit属性则指向Buffer的容量。而在读模式下,position属性则指向下一个可读取数据的位置,limit属性则指向写入Buffer时的position值。

当需要重新读取Buffer中的数据时,可以使用rewind()方法将position属性重置为0,然后再次读取Buffer中的数据。

以下是一个使用rewind()函数的示例:

1
2
3
4
5
6
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put("Hello, World!".getBytes());
buf.flip(); //切换到读模式
System.out.println((char) buf.get()); //读取第一个字节
buf.rewind(); //将position属性重置为0
System.out.println((char) buf.get()); //再次读取第一个字节

在上述示例中,首先将一个字符串写入Buffer中,然后调用flip()方法将Buffer切换到读模式,并使用get()方法从Buffer中读取第一个字节。接着,使用rewind()方法将position属性重置为0,以便重新读取Buffer中的数据。最后,再次使用get()方法从Buffer中读取第一个字节。

需要注意的是,使用rewind()方法将position属性重置为0后,将会从Buffer中读取先前已经读取的数据,因此需要谨慎使用。同时,在调用rewind()方法之前,需要确保Buffer中已经有数据可读取,否则将会抛出BufferUnderflowException异常。

文件通道用法详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NioTest4 {

public static void main(String[] args) throws IOException {
FileInputStream inputStream = new FileInputStream("input.txt");
FileOutputStream outputStream = new FileOutputStream("output.txt");

FileChannel inputChannel = inputStream.getChannel();
FileChannel outputChannel = outputStream.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear(); // 想想如果注释掉这一行,会发生什么
int read = inputChannel.read(buffer);

if (-1 == read) break;
buffer.flip();
outputChannel.write(buffer);
}
inputChannel.close();
outputChannel.close();

}
}

通过NIO读取文件涉及到的3个步骤:

  1. FileInputStream获取到FileChannel对象
  2. 创建Buffer
  3. 将数据从Channel读取到Buffer

绝对方法与相对方法的含义:

  • 相对方法:limit值和position值在操作时被考虑到
  • 绝对方法:完全忽略掉limit值和position

slice()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sineagle.nio;

import java.nio.ByteBuffer;

public class NioTest6 {

public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
for (int i = 0; i < buffer.capacity(); i ++) {
buffer.put((byte) i);
}
buffer.position(2);
buffer.limit(6);
ByteBuffer sliceBuffer = buffer.slice();

for (int i = 0; i < sliceBuffer.capacity(); i ++) {
byte b = sliceBuffer.get(i);
b *= 2;
sliceBuffer.put(i, b);
}

buffer.position(0);
buffer.limit(buffer.capacity());

while (buffer.hasRemaining()) {
System.out.println(buffer.get());
}
}
}

Slice Buffer与原有的Buffer共享相同的底层数组

只读Buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.sineagle.nio;

import java.nio.ByteBuffer;

public class NioTest7 {

public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.getClass());
for (int i = 0; i < buffer.capacity(); i ++) {
buffer.put((byte) i);
}
ByteBuffer readonlyBuffer = buffer.asReadOnlyBuffer();
System.out.println(readonlyBuffer.getClass());
readonlyBuffer.position(0);
//readonlyBuffer.put((byte) 2); // ReadOnlyBufferException
}
}

只读Buffer, 我们可以随时将一个普通Buffer调用asReadOnlyBuffer()方法返回一个只读Buffer, 但我们不能将一个只读Buffer转换为读写Buffer

NIO堆外内存与零拷贝

allocateDirect() 创建了DirectByteBuffer对象

  • DirectByteBuffer对象本身是在堆内生成的,它的字段address指向的是堆外内存

这段注释是Java NIO中ByteBuffer类中address字段的注释,下面是对其的详细解释:

  • Used by heap byte buffers or direct buffers with Unsafe access:address字段被堆缓冲区和直接缓冲区使用,以及使用Unsafe(一种Java内部API)进行访问。
  • For heap byte buffers this field will be the address relative to the array base address and offset into that array.:对于堆缓冲区,address字段表示相对于缓冲区底层数组的基地址和偏移量的地址。
  • The address might not align on a word boundary for slices, nor align at a long word (8 byte) boundary for byte[] allocations on 32-bit systems.:对于片段(slices),address字段可能不会在字(word)边界上对齐,在32位系统上,byte[]数组的分配可能不会在长字(long word)边界上对齐。
  • For direct buffers it is the start address of the memory region.:对于直接缓冲区,address字段表示内存区域的起始地址
  • The address might not align on a word boundary for slices, nor when created using JNI, see NewDirectByteBuffer(void*, long).:对于片段,address字段可能不会在字边界上对齐,使用JNI(Java Native Interface)创建的直接缓冲区也可能不会在字边界上对齐。
  • Should ideally be declared final:address字段应该声明为final。

这段注释主要是描述了address字段在ByteBuffer类中的含义和用法。在Java NIO中,ByteBuffer类是一个重要的缓冲区类,用于读取和写入数据。由于缓冲区底层是基于字节数组实现的,因此可以通过address字段来访问缓冲区底层的字节数组或者直接内存区域。该注释还提到了在某些情况下,address字段可能不会在字边界上对齐,需要注意这一点。

使用DirectByteBuffer对象可以提高效率,如果使用传统的HeapByteBuffer在JVM的堆空间操作IO时,会把相应的Buffer拷贝到OS内核的一段内存空间,然后进行IO操作,而如果使用DirectByteBuffer中的address指向堆外内存,可以直接在内核态进行IO操作,少了一次数据拷贝过程,提高效率,这就是所谓的零拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.sineagle.nio;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioTest8 {

public static void main(String[] args) throws IOException {
FileInputStream inputStream = new FileInputStream("input2.txt");
FileOutputStream outputStream = new FileOutputStream("output2.txt");

FileChannel inputChannel = inputStream.getChannel();
FileChannel outputChannel = outputStream.getChannel();

ByteBuffer buffer = ByteBuffer.allocateDirect(10); // 分配堆外缓冲区内存
while (true) {
buffer.clear();
int read = inputChannel.read(buffer);
System.out.println("read : " + read);

if (-1 == read) break;

buffer.flip();
outputChannel.write(buffer);
}

inputChannel.close();
outputChannel.close();
}
}

内存映射文件

MappedByteBuffer

MappedByteBuffer类是通过FileChannel.map方法创建的一种特殊类型的ByteBuffer,用于表示内存映射文件区域。MappedByteBuffer类提供了特定于内存映射文件区域的操作。

MappedByteBuffer和其表示的文件映射将一直有效,直到缓冲区本身被垃圾回收。但是需要注意的是,内存映射文件区域的内容随时可能会发生变化,例如可能被本程序或其他程序更改。这些变化是否发生以及何时发生是操作系统相关的,因此是未指定的。

MappedByteBuffer是Java NIO中的一种ByteBuffer的实现,它的主要功能是将一个文件的部分或全部内容映射到内存中,从而可以直接对该内存区域进行读写操作。因此,MappedByteBuffer具有以下几个主要功能:

  1. 内存映射文件:MappedByteBuffer可以将文件的部分或全部内容映射到内存中,从而实现高效的文件访问,避免了频繁的磁盘I/O操作。
  2. 直接访问映射区域:由于MappedByteBuffer将文件内容映射到内存中,因此可以直接对映射区域进行读写操作,而无需进行复制或移动数据。这样可以大大提高读写文件的效率。
  3. 特定于内存映射文件区域的操作:MappedByteBuffer提供了一些特定于内存映射文件区域的操作,例如force()方法可以强制将修改过的缓冲区内容写入到文件中,以确保数据的持久化。还有load()方法可以将尚未加载的部分或全部映射区域加载到内存中,等等。

需要注意的是,由于内存映射文件的内容随时可能发生变化,因此需要注意避免多个程序同时对同一个映射文件进行操作,以避免数据不一致等问题。此外,使用MappedByteBuffer需要注意内存占用问题,以避免内存泄漏等问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class NioTest9 {

public static void main(String[] args) throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("NioTest9.txt", "rw");
FileChannel fileChannel = randomAccessFile.getChannel();

MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte) 'a');
mappedByteBuffer.put(3, (byte) 'b');

randomAccessFile.close();
}
}

关于Buffer中的Scattering与Gathering

在Java NIO中,Scattering与Gathering是两个用于描述缓冲区(Buffers)的数据读取与写入的概念。

  • Scattering:从一个channel中读取数据到多个缓冲区中。例如,我们可以使用Scattering将一个输入流中的数据分散到多个缓冲区中。
  • Gathering:将多个缓冲区中的数据写入到一个channel中。例如,我们可以使用Gathering将多个缓冲区中的数据汇总写入到一个输出流中。

Scattering与Gathering在使用时需要注意以下几点:

  • Scattering或Gathering时,缓冲区的数量必须与数据的数量相匹配。如果缓冲区数量不足或过多,数据读取或写入时将会出现异常。
  • Scattering和Gathering可以在不同的channel之间进行传输。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.sineagle.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

/**
* 关于Buffer中的Scattering与Gathering
*/

public class NioTest11 {

public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(8899);
serverSocketChannel.socket().bind(address);

int messageLength = 2 + 3 + 4;

ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.allocate(2);
buffers[1] = ByteBuffer.allocate(3);
buffers[2] = ByteBuffer.allocate(4);

SocketChannel socketChannel = serverSocketChannel.accept();


while (true) {
int bytesRead = 0;

while (bytesRead < messageLength) {
long r = socketChannel.read(buffers);
bytesRead += r;
System.out.println("byteRead: " + bytesRead);

Arrays.asList(buffers).stream().map(buffer -> "position: " + buffer.position() + ", limit: " + buffer.limit()).forEach(System.out::println);
}
Arrays.asList(buffers).forEach(buffer -> {
buffer.flip();
});

long bytesWritten = 0;
while (bytesWritten < messageLength) {
long r = socketChannel.write(buffers);
bytesWritten += r;
}
Arrays.asList(buffers).forEach(buffer -> {
buffer.clear();
});
System.out.println("bytesRead: " + bytesRead + ", bytesWritten:" + bytesWritten + ", messageLength: " + messageLength);
}

}
}

Selector源码深入分析

使用ChatGPT翻译的文档:

Selector 类提供了一种多路复用的方式,用于管理多个可选择通道的状态。通过使用 Selector 类,应用程序可以通过单个线程来处理多个通道,这可以大大减少线程的数量,并提高系统的性能。

一个 Selector 实例可以使用系统默认的 SelectorProvider 创建,也可以使用自定义的 SelectorProvider 创建。创建一个 Selector 实例后,它会保持打开状态,直到调用其 close 方法关闭。

可选择通道在 Selector 中的注册由 SelectionKey 对象表示。Selector 维护了三个 SelectionKey 集合:

  1. key set:包含代表此 Selector 的当前通道注册的键的集合。可以通过 keys 方法返回该集合。
  2. selected-key set:这是这样一个键的集合,每个键的通道在先前的选择操作中被检测到至少准备好了键的兴趣集合中标识的一个操作。该集合可以通过 selectedKeys 方法返回。selected-key set 总是 key set 的一个子集。
  3. cancelled-key set:这是一组已经取消但其通道尚未被注销的键。该集合不可直接访问。cancelled-key set 总是 key set 的一个子集。

在新创建的 Selector 中,这三个集合都是空的。

当使用通道的 register 方法注册通道时,Selector 的 key set 集合会作为副作用将该键添加到集合中。在选择操作期间,取消的键将从 key set 集合中移除。key set 本身不能直接修改。

当一个键被取消时,它会被添加到其 Selector 的 cancelled-key set 集合中,无论是通过关闭其通道还是通过调用其 cancel 方法。取消键将导致其通道在下一个选择操作中被注销,在那时键将从 Selector 的所有键集合中移除。

通过选择操作,将键添加到 selected-key set 集合中。可以通过调用集合的 remove 方法或从集合中获取的迭代器的 remove 方法来直接从 selected-key set 集合中删除键。可以通过调用集合的 clear 方法来从 selected-key set 中删除所有键。不能直接向 selected-key set 集合中添加键。

总之,Selector 类提供了一种方便的方式来管理多个通道的状态,并且可以使用单个线程来处理多个通道,从而提高系统的性能。

选择(Selection)

选择操作会查询底层操作系统以更新每个已注册的通道的准备状态,以执行由其键的兴趣集所标识的任何操作。选择操作有两种形式:

select()、select(long)和selectNow()方法将准备好执行操作的通道的键添加到所选键集中,或更新已在所选键集中的键的就绪操作集。

select(Consumer)、select(Consumer,long)和selectNow(Consumer)方法对于每个准备执行操作的通道的键执行操作。这些方法不会添加到所选键集中。

向所选键集添加的选择操作

在每个选择操作期间,键可能会添加到选择器的所选键集并从中删除,并且可能会从其键和已取消键集中删除。选择由select()、select(long)和selectNow()方法执行,涉及三个步骤:

从已取消键集中删除每个键,并从其是成员的每个键集中删除它,并注销其通道。这一步骤使取消键集为空。

查询底层操作系统以更新每个剩余通道的准备状态,以执行由其键的兴趣集所标识的任何操作,即当选择操作开始时。对于准备执行至少一个此类操作的通道,将执行以下两个操作之一:

如果通道的键尚未在所选键集中,则将其添加到该集合中,并修改其就绪操作集,以精确地标识通道现在报告为已准备的那些操作。任何先前记录在就绪集中的准备信息都将被丢弃。

否则,通道的键已经在所选键集中,因此其就绪操作集将修改为标识通道报告为已准备的任何新操作。任何先前记录在就绪集中的准备信息都将被保留;换句话说,底层系统返回的就绪集将与键的当前就绪集按位分离。

如果在此步骤开始时键集中的所有键都具有空的兴趣集,则不会更新所选键集或任何键的就绪操作集。

如果在步骤(2)进行时添加了任何键到取消键集中,则它们将像步骤(1)中一样被处理。

选择操作是否阻塞以等待一个或多个通道变为准备状态,如果是,则阻塞的时间长度是三种选择方法之间唯一的基本区别。

对所选键执行操作的选择操作

在每个选择操作期间,键可能会从选择器的键、所选键和取消键集中删除。选择由select(Consumer)、select(Consumer,long)和selectNow(Consumer)方法执行,涉及三个步骤:

从已取消键集中删除每个键,并从其是成员的每个键集中删除它,并注销其通道。

NIO 网络编程

SelectionKey

SelectionKey是Java NIO中的一个关键类,它代表了一个SelectableChannel(可选择通道)和一个Selector(选择器)之间的注册关系。每当一个通道被注册到选择器中时,就会创建一个SelectionKey对象来表示它们之间的关系。

SelectionKey对象包含两个操作集,分别表示关联的通道支持的操作类型。其中,兴趣操作集(interest set)表示选择器将会测试哪些操作类型是否准备就绪,可以通过interestOps()方法修改;就绪操作集(ready set)表示通道已经准备就绪的操作类型,由选择器在选择操作过程中自动更新。

SelectionKey对象还支持关联一个任意类型的对象,这可以用于将应用程序特定的信息与选择键关联起来,例如代表高层协议状态的对象。

选择器在调用select()方法时,会阻塞线程直到至少有一个通道已准备好进行I/O操作。当一个或多个通道准备就绪时,select()方法返回一个选择键集合,其中包含已准备就绪的通道对应的SelectionKey对象。应用程序可以通过访问这些SelectionKey对象来获取通道的就绪操作集、兴趣操作集和关联的任意对象。这使得应用程序能够根据通道状态采取相应的操作,从而实现非阻塞的I/O操作。

每当一个通道注册到Selector时,就会创建一个SelectionKey,它代表了通道和选择器之间的注册关系。一个SelectionKey对象的有效期可以通过调用cancel方法、关闭通道或关闭选择器来结束。取消一个SelectionKey并不会立即从Selector中移除它,而是在下一次选择操作时被加入到选择器的取消键集中以进行移除。可以通过调用isValid方法来测试一个SelectionKey是否仍然有效。

SelectionKey包含两个整数值表示的操作集合,分别是interest set和ready set。每个操作集合的每一位表示一种可选择的操作类别,这些类别由通道所支持。interest set表示下一次选择方法被调用时将要测试的操作类别,初始化时为创建SelectionKey时指定的值,之后可以通过interestOps(int)方法来修改。ready set表示通道已准备就绪的操作类别,在创建SelectionKey时初始化为0,之后在选择操作中由Selector更新。但是需要注意的是,ready set并不能直接更新,因为它只是一个提示,不能保证操作类别可以在不阻塞线程的情况下执行。

为了满足应用程序的需要,可以将一个任意的对象与SelectionKey关联起来,例如表示高层协议状态的对象,用于处理准备就绪通知。可以通过attach方法来将对象附加到SelectionKey上,并在需要时通过attachment方法获取。

需要注意的是,SelectionKey可以安全地用于多个并发线程,选择操作始终使用操作集合的当前值。每个可选择通道的子类定义了一个validOps()方法,返回一个标识通道所支持操作的集合,尝试设置或测试不支持的操作将导致运行时异常。

总之,SelectionKey是Java NIO中的一个重要概念,用于表示一个通道和选择器之间的注册关系,同时还可以附加一些额外的信息。可以通过SelectionKey的属性和方法来处理NIO中的事件。

select

select()是Java NIO中Selector类的一个方法,其作用是阻塞直到至少有一个通道在之前注册的操作集合中准备好进行I/O操作,或者达到超时时间。

具体而言,select()方法的行为如下:

  1. 如果当前没有任何通道已准备好进行I/O操作,并且没有设置超时时间,那么select()方法会一直阻塞直到至少有一个通道准备好进行I/O操作。
  2. 如果当前没有任何通道已准备好进行I/O操作,但是设置了超时时间,则select()方法会阻塞直到超时时间到达,或者至少有一个通道准备好进行I/O操作。
  3. 如果至少有一个通道已准备好进行I/O操作,则select()方法会立即返回,select()方法返回一个整数值,表示有多少个通道已准备好进行I/O操作。

select()方法是Java NIO中实现非阻塞I/O的核心之一。它可以让应用程序在进行I/O操作时不必一直阻塞等待,而是可以利用这段时间进行其他操作。同时,它也提供了一种高效的方式来检测哪些通道已准备好进行I/O操作,从而避免了不必要的阻塞和轮询。

select()方法是一种阻塞式的操作,它会一直阻塞当前线程,直到至少有一个通道已准备好进行I/O操作。在阻塞期间,线程不会执行其他任务,直到select()方法返回。这也是所谓的“选择器模型”中的“选择器”部分。

但是,在调用select()方法之前,通常会使用Java NIO中的非阻塞I/O操作注册一个或多个通道到选择器中,这样这些通道就可以在后续的select()操作中被检测到是否准备好进行I/O操作。这种非阻塞的注册和检测机制可以大大提高程序的性能,避免了轮询和阻塞等操作。

因此,虽然select()方法是一种阻塞式的操作,但是整个选择器模型的目的是为了实现非阻塞的I/O操作,从而提高程序的性能和可伸缩性。

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class NioTest12 {

public static void main(String[] args) throws IOException {
int[] ports = new int[5];

ports[0] = 5000;
ports[1] = 5001;
ports[2] = 5002;
ports[3] = 5003;
ports[4] = 5004;

Selector selector = Selector.open();

for (int i = 0; i < ports.length; i ++) {
ServerSocketChannel serverSocketChannel= ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
InetSocketAddress address = new InetSocketAddress(ports[i]);
serverSocket.bind(address);

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("监听端口: " + ports[i]);
}

while (true) {
int numbers = selector.select();
System.out.println("numbers: " + numbers);
Set<SelectionKey> selectionKeys = selector.selectedKeys();

System.out.println("selectedKeys: " + selectionKeys);
Iterator<SelectionKey> iter = selectionKeys.iterator();

while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();

if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

iter.remove();

System.out.println("获得客户端连接: " + socketChannel);
} else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

int byteRead = 0;
while (true) {
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
byteBuffer.clear();

int read = socketChannel.read(byteBuffer);
if (read <= 0) break;

byteBuffer.flip();
socketChannel.write(byteBuffer);
byteRead += read;
}
System.out.println("读取:" + byteRead + ", 来自于: " + socketChannel);
iter.remove();
}
}

}


}
}

完整程序

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class NioServer {

private static Map<String, SocketChannel> clientMap = new HashMap<>();

public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();

selectionKeys.forEach(selectionKey -> {
final SocketChannel client;
try {
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);

String key = "[" + UUID.randomUUID().toString() + "]";
clientMap.put(key, client);


} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int count = client.read(readBuffer);

if (count > 0) {
readBuffer.flip();

Charset charset = Charset.forName("utf-8");
String receivedMessage = String.valueOf(charset.decode(readBuffer).array());

System.out.println(client + ": " + receivedMessage);

String senderKey = null;

for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
if (client == entry.getValue()) {
senderKey = entry.getKey();
break;
}
}

for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
SocketChannel value = entry.getValue();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put((senderKey + ": " + receivedMessage).getBytes());
writeBuffer.flip();
value.write(writeBuffer);
}

}
}
} catch (Exception e) {
e.printStackTrace();
}

selectionKeys.clear();
});


} catch (Exception e) {
e.printStackTrace();
}
}


}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class NioClient {

public static void main(String[] args) throws IOException {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);

Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8899));

while (true) {
selector.select();
Set<SelectionKey> keySet = selector.selectedKeys();

for (SelectionKey selectionKey : keySet) {
if (selectionKey.isConnectable()) {
SocketChannel client = (SocketChannel) selectionKey.channel();

if (client.isConnectionPending()) {
client.finishConnect();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put((LocalDateTime.now() + "连接成功").getBytes());
writeBuffer.flip();
client.write(writeBuffer);

ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
executorService.submit(() -> {
while (true) {
try {
writeBuffer.clear();
InputStreamReader input = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(input);

String sendMessage = br.readLine();
writeBuffer.put(sendMessage.getBytes());
writeBuffer.flip();
client.write(writeBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
client.register(selector, SelectionKey.OP_READ);
}
else if (selectionKey.isReadable()) {
SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int count = client.read(readBuffer);
if (count > 0) {
String receiveMessage = new String(readBuffer.array(), 0, count);
System.out.println(receiveMessage);
}
}
}

keySet.clear();
}

} catch (Exception e) {
e.printStackTrace();
}
}
}

BIO、NIO、AIO对比

BIO NIO AIO
IO模型 同步阻塞 同步非阻塞 异步非阻塞
编程难度 简单 复杂 复杂
可靠性
吞吐量

Netty源码剖析

零拷贝深入剖析及用户空间与内核空间切换方式

NIO零拷贝

sendfile() 是一个用于在两个文件描述符之间直接传输数据的系统调用函数。它通常用于在网络编程中,将文件内容直接从磁盘传输到网络套接字,以提高传输效率。

sendfile() 函数使用了零拷贝(zero-copy)技术,因此它可以避免在内核空间和用户空间之间复制数据。这意味着,数据可以在内核空间中直接传输,从而提高了传输效率。

需要注意的是,sendfile() 函数只能在 Linux 系统上使用,并且只能用于传输普通文件,不能用于传输目录或管道等其他类型的文件。此外,它还有一些限制,如传输的文件大小不能超过 2GB 等。

  • 首先将磁盘中的文件读取到kernel buffer
  • kernel buffer对应的文件描述符(kernel buffer在内存中的地址和长度)写到socket buffer中,避免全部拷贝
  • 然后通过协议引擎通过gather操作从两个地方得到信息,然后直接把kernel buffer完成真正的数据发送到对应的服务器端

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NewIOServer {

public static void main(String[] args) throws IOException {
InetSocketAddress address = new InetSocketAddress(8899);

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();

serverSocket.setReuseAddress(true);
serverSocket.bind(address);

ByteBuffer byteBuffer = ByteBuffer.allocate(4096);

while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(true);

int readCount = 0;
while (-1 != readCount) {
readCount = socketChannel.read(byteBuffer);
byteBuffer.rewind();
}

}


}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class NewIOClient {

public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8899));
socketChannel.configureBlocking(true);

String fileName = "C:\\Users\\bottlecodes\\Desktop\\mariadb.gz";

FileChannel fileChannel = new FileInputStream(fileName).getChannel();

long startTime = System.currentTimeMillis();

long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);

}
}

FileChannel.transferTo()

从当前通道的文件中将字节传输到给定的可写字节通道中。该方法尝试从给定位置开始读取最多count个字节,并将它们写入目标通道。该方法的调用可能会传输所有请求的字节,也可能不会传输所有请求的字节,这取决于通道的性质和状态。如果当前通道的文件在给定位置处包含少于count个字节,或者目标通道是非阻塞的且其输出缓冲区中有少于count个字节可用,则传输的字节数少于请求的字节数。

此方法不修改当前通道的位置。如果给定位置大于文件的当前大小,则不会传输任何字节。如果目标通道具有位置,则从该位置开始写入字节,然后通过写入的字节数递增该位置。

此方法可能比从该通道读取并写入目标通道的简单循环更有效。许多操作系统可以直接从文件系统缓存中将字节传输到目标通道,而无需实际复制它们

NioEventLoopGroup源码分析

bossGroup主要用于接受客户端的连接

workerGroup完成真正用户请求处理的业务逻辑

EventLoopGroup

EventLoopGroup是Netty框架中的一个概念,它是用于管理EventLoop的一种抽象,可以看做是一个线程池,用于处理IO事件。在Netty中,所有的IO操作都是由EventLoop来处理的,而EventLoopGroup则负责管理这些EventLoop。

具体来说,EventLoopGroup包含一个或多个EventLoop,每个EventLoop都是一个独立的线程,用于处理IO事件。在Netty中,一个Channel通道通常会绑定一个EventLoop,所有的IO事件都由该EventLoop来处理。而EventLoopGroup则是负责分配这些Channel通道所需的EventLoop,并在需要时进行扩容或缩容。

EventLoopGroup通常分为两种类型:NioEventLoopGroup和EpollEventLoopGroup。其中,NioEventLoopGroup基于Java的NIO API实现,而EpollEventLoopGroup则基于Linux的Epoll机制实现,两者的主要区别在于底层实现的不同。在使用Netty框架时,可以根据实际需求选择不同的EventLoopGroup类型。

总之,EventLoopGroup是Netty框架中的一个重要概念,它负责管理和分配EventLoop,从而实现高效的IO事件处理。

该接口继承自EventExecutorGroup接口,用于处理I/O事件。EventLoopGroup接口具有以下方法:

  1. next():返回下一个要使用的EventLoop。
  2. register(Channel channel):将一个Channel注册到EventLoop中,返回ChannelFuture对象,用于接收注册完成的通知。
  3. register(ChannelPromise promise):将一个ChannelPromise对象注册到EventLoop中,返回ChannelFuture对象,用于接收注册完成的通知。
  4. register(Channel channel, ChannelPromise promise):将一个Channel注册到EventLoop中,并使用一个ChannelPromise对象接收注册完成的通知。该方法已经被标记为过时,建议使用register(ChannelPromise promise)方法代替。

在Netty中,EventLoopGroup是一个重要的概念,用于管理EventLoop,实现高效的I/O事件处理。该接口定义了EventLoopGroup的基本操作,包括获取下一个可用的EventLoop、注册Channel等。

ChannelFuture

ChannelFuture是Netty网络编程框架中的一个重要概念。在Netty中,所有的I/O操作都是异步的,包括连接的建立、数据的读写等操作。为了方便用户处理这些异步操作,Netty提供了ChannelFuture这个类。

ChannelFuture表示一个异步操作的结果。当一个异步操作被触发后,会返回一个ChannelFuture对象,通过这个对象可以获取异步操作的结果或者注册一个监听器来在异步操作完成后得到通知。

例如,在Netty中向远程服务器发送数据时,调用write()方法会返回一个ChannelFuture对象,通过这个对象可以等待写操作完成或者注册一个监听器在写操作完成后得到通知。

总的来说,ChannelFuture提供了一种方便的方式来处理Netty中的异步操作,并且可以避免阻塞线程等待异步操作完成。

NioEventLoopGroup

Executor

在计算机编程中,Executor(执行者)是一种用于执行异步任务的接口或抽象类。它允许您将任务提交给执行者并在后台线程或进程中执行这些任务。这样,您可以在程序的其他部分继续执行其他任务,而不必等待异步任务完成。

在Java编程语言中,Executor框架是一组接口和类,用于管理线程池和任务调度。它提供了一个Executor接口,该接口定义了一个execute()方法,该方法接受一个Runnable对象并将其提交给线程池中的空闲线程进行执行。Executor框架还提供了一些预定义的线程池,例如FixedThreadPool、CachedThreadPool和ScheduledThreadPool,您可以使用这些线程池来执行任务。

Executor框架的一个重要优点是它提供了一种可以管理线程生命周期的方式。当您使用线程池时,线程可以在完成任务后被重复使用,而不是在每次需要执行任务时都创建一个新线程。这样,线程池可以减少线程创建和销毁的开销,并提高程序的性能和可伸缩性。

除了Java Executor框架之外,其他编程语言和平台也提供了类似的Executor接口和框架,例如Python中的concurrent.futures模块和.NET平台中的Task Parallel Library(TPL)。这些框架可以帮助您轻松地管理异步任务,提高程序的性能和可伸缩性。

Netty剖析

poll与epoll函数

NIO中实现多路复用的核心类是Selector, 当多路复用器Selector调用select方法时,将会查找发生事件的channel,问题是,该如何在注册到selector上的channel中找到哪些channel发生了事件,此时NIO不同版本有不同的做法

select(早期版本) poll epoll(1.5以后)
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 线性遍历数组中所有的channel,性能较差 线性遍历链表中所有的channel,性能较差 由操作系统将发生事件的channel存入到服务端的就绪事件列表中,selector直接从就绪事件列表中获得发生事件的channel,而不需要遍历所有的channel
最大连接 有上限 无上限 无上限

Reactor模型

传统的阻塞IO模型

在传统的BIO模型中,服务端的一个线程必须处理完一个客户端的所有请求后才能处理另外一个客户端的请求

基于事件响应式的基础Reactor模型

由单线程逐一处理来自多个客户端的不同事件,而不需要等待某一客户端的请求全部执行完才能处理另一个客户端的请求

多线程的Reactor模型

在基础Reactor模型中,Reactor在处理某一个客户端的读请求时,其它请求依然需要阻塞等待,如何提升这一块的并发性呢,就可以使用线程池来提升处理某一个具体业务的并发操作

多主多从的Reactor模型

如果只把业务处理模块交给多线程来解决,但是在实际场景中,有可能会面临成千上万客户端的连接,而这些客户端的连接请求只能等待Reactor完成业务处理后才能被处理。此时创建一个单独的Reactor专门负责处理客户端的连接请求,而且这样的Reactor也可以支持多线程,那么就能解决海量客户端连接的应用场景。

Netty的线程模型

基于Reactor模型的Netty线程模型

Netty核心组件

Bootstrap和ServerBootstrap

Bootstrap是 Netty 的启动程序,一个Netty应用通常是由一个Bootstrap开始。Bootstrap的主要作用是配置Netty程序,串联Netty各个组件

  • Bootstrap: 客户端的启动程序
  • ServerBootstrap: 服务端的启动程序

Future和ChannelFulture

Netty 中的所有操作都是异步的,即不能立即得知消息是否被正确处理。因此需要通过FutureChannelFulture来注册监听器,当操作执行成功或者失败来调用具体的监听器。

Future通过sync方法获得同步执行的效果。

ChannelFuture是Future的子类,提供了针对于Channel的异步监听操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ChannelFuture channelFuture = bootstrap.bind(9090);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听9090 success!");
} else {
System.out.println("监听9090 failed !");
}
}
});

System.out.println("=======================");

打印结果如图:

Channel

Java NIO的Channel是Java NIO中的核心概念之一,用于在非阻塞IO模式下进行数据的读取和写入。它提供了一种将数据从Buffer中读取到Channel中,或将数据从Channel中写入到Buffer中的方式。在Channel中,可以注册感兴趣的事件(如读事件和写事件),并通过Selector来监听这些事件的发生,从而实现非阻塞IO的操作。

Netty中的Channel则是在Java NIO的Channel的基础上进一步封装的,它提供了更为灵活的API和更高级别的抽象,以简化网络编程的实现。Netty的Channel支持多种协议,如TCP、UDP和HTTP等,并提供了一些高级别的特性,如SSL/TLS安全传输、多路复用、拆包粘包处理等。Netty的Channel还支持异步IO模式,能够更好地支持高并发场景。

此外,Netty的Channel还具有更加方便的事件处理机制,通过ChannelPipeline将Channel的读写事件与数据处理逻辑进行解耦,从而使得代码更加清晰易懂。此外,Netty的Channel还提供了更为灵活的IO线程池机制,能够更好地控制IO线程的数量和行为。

总之,Java NIO的Channel和Netty的Channel都是用于实现非阻塞IO操作的关键组件,Netty的Channel在Java NIO的基础上提供了更为高级别和灵活的API和抽象,能够更好地支持高并发和多协议的网络编程。

Channel是Netty网络通信的重要组件,用于执行网络IO操作。Channel具备以下能力:

  • 获得当前网络连接通道的状态
  • 网络连接的配置参数
  • 提供异步的网络IO操作,比如建立连接、绑定端口、读写操作等
  • 获得ChannelFuture实例,并注册监听器到ChannelFuture上,用于监听IO操作的成功、失败、取消时的事件回调。

Channel具体的实现类有以下几种:

  • NioSocketChannel : 异步的客户端TCP Socket连接通道
  • NioServerSocketChannel : 异步的服务端TCP Socket连接通道
  • NioDatagramChannel : 异步的UDP连接通道
  • NioSctpChannel : 异步的客户端Sctp连接通道
  • NioSctpServerChannel : 异步的服务端Sctp连接通道

Selector

Selector是Java NIO中的关键组件之一,用于实现非阻塞IO操作。它可以监听多个Channel上的事件,如读事件、写事件和连接事件等,并在事件发生时通知应用程序进行相应的处理。

Selector的工作流程如下:

  1. 创建Selector对象,并将其注册到一个或多个Channel中。
  2. Selector会不断地轮询注册的Channel,检查是否有事件发生。
  3. 当某个Channel上有感兴趣的事件发生时,该Channel会被Selector标记为“就绪状态”。
  4. Selector将就绪状态的Channel返回给应用程序,并通知应用程序进行相应的处理。
  5. 应用程序对就绪状态的Channel进行操作,如读取数据或写入数据等。
  6. 应用程序处理完毕后,需要手动将Channel重新注册到Selector中,以便继续监听该Channel上的事件。

Selector的优点在于可以使用较少的线程来处理多个Channel的IO操作,从而提高程序的并发处理能力。在高并发场景下,Selector可以帮助应用程序充分利用系统资源,提高IO操作的效率。

需要注意的是,Selector在不同操作系统上的实现可能存在差异,因此在使用Selector时需要注意遵守操作系统相关的规则和限制。此外,在使用Selector时也需要注意对线程安全和异常处理等问题进行充分的考虑。

通过Selector多路复用器可以实现IO的多路复用。Selector可以监听多个连接的Channel事件,同时可以不断查询注册的Channel是否处于就绪状态,实现一个线程可以高效的管理多个Channel

NioEventLoop

NioEventLoop是Netty中的一个重要组件,它负责处理IO事件,并在内部使用NIO的Selector来实现事件的异步处理和调度。NioEventLoop可以看作是一个执行IO操作的事件循环器,它为每个连接维护了一个EventLoop对象,并通过轮询的方式来检测IO事件的到来,然后异步地将事件分派给事件处理器进行处理。

NioEventLoop的主要作用包括:

  1. 处理IO事件:NioEventLoop通过调用底层的Selector来检测IO事件的到来,并将事件分派给对应的Channel进行处理。
  2. 管理任务队列:NioEventLoop内部维护了一个任务队列,可以异步地执行任务,包括处理IO事件、定时任务等。
  3. 实现多线程处理:NioEventLoop可以在多个线程之间共享,通过实现事件分派和任务执行的多线程处理,提高了系统的并发性能。

需要注意的是,NioEventLoop的实例通常与Channel绑定,每个Channel都有自己的EventLoop,以便于将事件分派到正确的EventLoop进行处理。同时,NioEventLoop也支持使用多个线程来处理IO事件,以提高系统的吞吐量和响应性能。

除了NioEventLoop之外,Netty中还有一些其他的组件和概念,如Channel、Pipeline、Handler等,它们一起构成了Netty框架的核心,为开发者提供了高效的网络编程解决方案。

NioEventLoop内部维护了一个线程和任务队列,支持异步提交执行任务。当线程启动时会调用NioEventLooprun 方法来执行 IO任务或 非IO任务

  • io任务: 如accept、connect、read、write事件等,由processSelectedKeys方法触发
  • 非io任务: 如register0、bind0等任务将会被添加到taskQueue任务队列中,由runAllTasks方法触发

NioEventLoopGroup

NioEventLoopGroup是Netty中一个线程池组,它管理着一组NioEventLoop,每个NioEventLoop都是一个事件循环器,负责处理IO事件和任务。NioEventLoopGroup在Netty应用程序中扮演着非常重要的角色,它能够帮助开发者高效地管理和调度线程池,以提高系统的性能和并发能力。

NioEventLoopGroup的主要作用包括:

  1. 创建和管理多个NioEventLoop:NioEventLoopGroup可以创建多个NioEventLoop,每个NioEventLoop都是一个事件循环器,负责处理IO事件和任务。通过NioEventLoopGroup,可以有效地管理和调度NioEventLoop,以提高系统的性能和并发能力。
  2. 线程池的配置:NioEventLoopGroup可以配置线程池的大小和属性,包括线程池的最大和最小线程数、线程的存活时间、线程池队列的大小等。
  3. EventLoop和Channel的绑定:NioEventLoopGroup可以将每个Channel与对应的EventLoop进行绑定,以便将事件分派给正确的EventLoop进行处理。
  4. 线程的安全和可控:NioEventLoopGroup可以有效地控制线程的安全和可控,避免出现线程安全问题和资源竞争问题。

需要注意的是,NioEventLoopGroup在创建时需要指定线程数,通常建议设置为机器CPU核心数的2倍,以提高系统的性能和并发能力。同时,在使用NioEventLoopGroup时,需要注意线程池的配置和性能优化,以充分利用线程池的资源和提高系统的吞吐量和响应性能

管理NioEventLoop的生命周期,可以理解为线程池,内部维护了一组线程。每个线程(即NioEventLoop)负责处理多个Channel上的事件。注意,一个Channel只能对应一个线程,NioEventLoop和Channel它们是一对多的关系

ByteBuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.sineagle.netty.start;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
* ByteBuf操作
*/
public class ByteBufDemo {

public static void main(String[] args) {
// 创建一个有10容量数组的ByteBuf
ByteBuf buf = Unpooled.buffer(10);
System.out.println("init buf : " + buf);
// 添加数据
for (int i = 0; i < 5; i ++) {
buf.writeByte(i);
}
System.out.println("after write: " + buf);
// 按照索引读数据
for (int i = 0; i < 3; i ++) {
System.out.println(buf.getByte(i));
}
System.out.println("after get: " + buf);
// 读取数据
for (int i = 0; i < 3; i ++) {
System.out.println(buf.readByte());
}
System.out.println("after read: " + buf);

System.out.println("read index: " + buf.readerIndex());
System.out.println("write index: " + buf.writerIndex());
System.out.println("capacity index: " + buf.capacity());

}
}

ChannelHandler

在Netty中,ChannelHandler是用于处理Channel事件的基本组件。它是一个接口,定义了一组处理Channel事件的方法,可以根据具体需求来实现这些方法。

在Netty中,每个Channel都有一个与之关联的ChannelPipeline,而ChannelPipeline又由一系列的ChannelHandler组成。当一个事件触发时,ChannelPipeline会按照处理器链的顺序依次调用各个ChannelHandler的对应方法来处理事件。

常用的ChannelHandler接口包括以下几个:

  1. ChannelInboundHandler:用于处理入站事件,例如Channel连接建立、数据读取、消息解码等。
  2. ChannelOutboundHandler:用于处理出站事件,例如数据编码、消息发送等。
  3. ChannelDuplexHandler:继承自ChannelInboundHandler和ChannelOutboundHandler,用于同时处理入站和出站事件

在实现ChannelHandler接口时,需要实现其定义的一些方法,例如对于ChannelInboundHandler接口,常用的方法包括:

  1. channelRegistered(ChannelHandlerContext ctx):当Channel注册到EventLoop时被调用。
  2. channelActive(ChannelHandlerContext ctx):当Channel连接建立时被调用。
  3. channelRead(ChannelHandlerContext ctx, Object msg):当从Channel读取数据时被调用。
  4. exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当ChannelPipeline中的某个Handler抛出异常时被调用。
  5. channelInactive(ChannelHandlerContext ctx):当Channel连接关闭时被调用。

通过实现ChannelHandler接口,我们可以自定义自己的事件处理逻辑,从而实现对Channel事件的灵活处理。同时,Netty还提供了一些现成的ChannelHandler实现类,例如ChannelInitializer、ChannelInboundHandlerAdapter等,方便我们进行事件处理。

ChannelHandler用于处理拦截IO事件,往往在ChannelHandler中可以加入业务逻辑处理。ChannelHandler执行完成后会将IO事件转发到ChannelPipline中的下一个处理程序

ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter是Netty中的一个ChannelInboundHandler的抽象实现类,它提供了一些默认的实现方法,使得我们可以在不需要实现所有方法的情况下定制自己的ChannelInboundHandler。

在Netty中,ChannelInboundHandler主要负责处理入站的事件,如连接建立、数据读取、消息解码等。ChannelInboundHandlerAdapter实现了ChannelInboundHandler接口,并提供了默认的实现方法,其中最常用的方法包括:

  1. channelRead(ChannelHandlerContext ctx, Object msg):当从Channel读取数据时被调用,可以在这个方法中对读取到的数据进行处理。
  2. exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当ChannelPipeline中的某个Handler抛出异常时被调用,可以在这个方法中对异常进行处理。
  3. channelActive(ChannelHandlerContext ctx):当Channel连接建立时被调用,可以在这个方法中执行连接建立后的逻辑。
  4. channelInactive(ChannelHandlerContext ctx):当Channel连接关闭时被调用,可以在这个方法中执行连接关闭后的逻辑。

使用ChannelInboundHandlerAdapter时,我们可以只重写需要的方法,而不需要实现所有的方法。同时,我们还可以通过调用super方法来获得ChannelInboundHandlerAdapter提供的默认实现。例如,如果我们只需要处理channelRead事件,可以在我们的Handler中重写channelRead方法,并调用super.channelRead(ctx, msg)来获得默认实现。

ChannelHandlerContext

保存Channel相关的上下文,并关联一个ChannelHandler对象

在Netty中,ChannelHandlerContext表示一个ChannelHandler在ChannelPipeline中的上下文信息。每个ChannelHandler都有一个与之关联的ChannelHandlerContext对象,用于表示处理器在处理事件时的上下文信息。ChannelHandlerContext对象包含了与处理器相关的一些信息,例如当前处理器的名称、所属的ChannelPipeline、与之关联的Channel等等。

ChannelHandlerContext主要提供了以下几个方法:

  1. fireChannelRegistered():将channelRegistered事件转发给下一个ChannelHandler。
  2. fireChannelUnregistered():将channelUnregistered事件转发给下一个ChannelHandler。
  3. fireChannelActive():将channelActive事件转发给下一个ChannelHandler。
  4. fireChannelInactive():将channelInactive事件转发给下一个ChannelHandler。
  5. fireChannelRead(Object msg):将channelRead事件转发给下一个ChannelHandler。
  6. fireChannelReadComplete():将channelReadComplete事件转发给下一个ChannelHandler。
  7. fireExceptionCaught(Throwable cause):将exceptionCaught事件转发给下一个ChannelHandler。
  8. channel():返回与之关联的Channel对象。
  9. pipeline():返回与之关联的ChannelPipeline对象。
  10. name():返回当前ChannelHandler的名称。

通过ChannelHandlerContext,我们可以获得与处理器相关的一些信息,并且可以通过调用fireXXX()方法将事件传递给下一个ChannelHandler,从而实现事件处理的传递。同时,我们还可以通过调用channel()和pipeline()方法获得与当前ChannelHandler相关联的Channel和ChannelPipeline对象,从而实现对它们的操作。

总之,ChannelHandlerContext是Netty中非常重要的一个概念,它提供了与处理器相关的一些信息,并且通过fireXXX()方法将事件传递给下一个ChannelHandler,实现事件处理的传递。

ChannelPipeline

在Netty中,ChannelPipeline是一个处理入站和出站事件的事件处理器链。每个Channel都有一个与之相关联的ChannelPipeline对象,用于处理该Channel的所有事件,包括读取数据、写入数据、异常处理等等。

ChannelPipeline是由一系列的ChannelHandler对象构成的,这些ChannelHandler对象按照添加的顺序形成了一个链式结构。当一个事件触发时,ChannelPipeline会按照处理器链的顺序依次调用各个ChannelHandler的对应方法来处理事件。

在ChannelPipeline中,每个ChannelHandler都有一个与之关联的ChannelHandlerContext对象,用于表示处理器在处理事件时的上下文信息。ChannelHandlerContext对象包含了与处理器相关的一些信息,例如当前处理器的名称、所属的ChannelPipeline、与之关联的Channel等等。

当一个Channel被创建时,它会自动创建一个空的ChannelPipeline对象,并将该ChannelPipeline对象与该Channel相关联。我们可以通过调用ChannelPipeline的addLast()方法向ChannelPipeline中添加我们自定义的ChannelHandler对象,从而实现自定义的事件处理逻辑。

总之,ChannelPipeline是Netty中非常重要的一个概念,它为Netty提供了一种灵活的事件处理机制,可以方便地实现不同类型的事件处理逻辑。

ChannelPipeline是一个双向链表,其中保存着多个ChannelHandler。ChannelPipeline实现了一种高级形式的过滤模式,在IO操作时发生的入站和出站事件,会导致ChannelPipeline中的多个ChannelHandler被依次调用。

Netty编解码

Netty编解码相关的组件:Channel、ChannelHandler、ChannelPipe

入站和出站

ChannelHandler用来处理入站和出站数据。ChannelHandler的实现类ChannelInboundHandlerAdapter表示入站程序,ChannelHandler的实现类ChannelOutboundHandlerAdapter表示出站程序

一个Channel包含一个ChannelPipeline,而ChannelPipline维护着由多个ChannelHandlerContext组成的双向链表,而且每个ChannelHandlerContext内包含一个ChannelHandler

  • 入站:如果站在服务端角度,对应着ChannelPipeline中链表的尾部后面。如果时接受数据,则为入站事件,即数据会从head到tail的过程,会依次经历多个ChannelHandler
  • 出站:如果发生的是发送数据的事件,则数据从tail尾部发送到head的头部,这个过程会经历多个ChannelHandler
  • 入站和出站两种类型互不打扰

一个例子

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.sineagle.coder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* Netty服务端
*/
public class NettyServer {

public static void main(String[] args) throws InterruptedException {
// 创建一个只处理连接请求的线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
// 创建只处理客户端读写业务的线程组
EventLoopGroup workGroup = new NioEventLoopGroup(10);

try {
// 创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置参数
bootstrap.group(bossGroup, workGroup)
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 配置用于存放因没有空闲线程导致连接请求暂存到队列中的队列长度。
.option(ChannelOption.SO_BACKLOG, 1024)
// 创建通道初始化的对象并配置该对象, 向该对象添加处理器来实现具体的业务处理
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加解码器handler
pipeline.addLast(new StringDecoder());
// 添加编码器handler
pipeline.addLast(new StringEncoder());
// 添加处理器, 处理器里面是真正处理业务的
pipeline.addLast(new NettyServerHandler());

}
});
System.out.println("Netty服务端启动啦!");
// 同步阻塞启动服务端
ChannelFuture channelFuture = bootstrap.bind(9090).sync();
//ChannelFuture channelFuture = bootstrap.bind(9090);
//channelFuture.addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture channelFuture) throws Exception {
// if (channelFuture.isSuccess()) {
// System.out.println("监听9090 success!");
// } else {
// System.out.println("监听9090 failed !");
// }
// }
//});
System.out.println("=======================");
// 只要服务没关闭,该方法会一直阻塞
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 断开所有连接并清理内存
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}

}
}

NettyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.sineagle.coder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("从客户端读到的数据:" + msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("服务端发送的测试数据");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.sineagle.coder;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyClient {

public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("Netty客户端启动啦!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

NettyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.sineagle.coder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("客户端发送的测试数据!!");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("来自服务端 " + ctx.channel().remoteAddress() + " 的消息:" + msg);
}
}

对象编解码

入站和出站操作时,数据需要被编解码才能被正常处理。Netty提供了一些编解码器应对不同对象的编解码:

对象类型 编码器 解码器
String StringEncoder StringDecoder
Object ObjectEncoder ObjectDecoder

Student

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.sineagle.coder;

import java.io.Serializable;

public class Student implements Serializable {
private Long id;
private String name;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}

public Student(Long id, String name) {
this.id = id;
this.name = name;
}
}

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.sineagle.coder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* Netty服务端
*/
public class NettyServer {

public static void main(String[] args) throws InterruptedException {
// 创建一个只处理连接请求的线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
// 创建只处理客户端读写业务的线程组
EventLoopGroup workGroup = new NioEventLoopGroup(10);

try {
// 创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置参数
bootstrap.group(bossGroup, workGroup)
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 配置用于存放因没有空闲线程导致连接请求暂存到队列中的队列长度。
.option(ChannelOption.SO_BACKLOG, 1024)
// 创建通道初始化的对象并配置该对象, 向该对象添加处理器来实现具体的业务处理
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 添加解码器handler
//pipeline.addLast(new StringDecoder());
// 添加编码器handler
//pipeline.addLast(new StringEncoder());
// 添加处理器, 处理器里面是真正处理业务的
pipeline.addLast(new NettyServerHandler());

}
});
System.out.println("Netty服务端启动啦!");
// 同步阻塞启动服务端
ChannelFuture channelFuture = bootstrap.bind(9090).sync();
//ChannelFuture channelFuture = bootstrap.bind(9090);
//channelFuture.addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture channelFuture) throws Exception {
// if (channelFuture.isSuccess()) {
// System.out.println("监听9090 success!");
// } else {
// System.out.println("监听9090 failed !");
// }
// }
//});
System.out.println("=======================");
// 只要服务没关闭,该方法会一直阻塞
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 断开所有连接并清理内存
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}

}
}

NettyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.sineagle.coder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("从客户端读到的数据:" + (Student) msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("服务端发送的测试数据");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.sineagle.coder;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyClient {

public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//pipeline.addLast(new StringDecoder());
//pipeline.addLast(new StringEncoder());
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("Netty客户端启动啦!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

NettyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.sineagle.coder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//ctx.writeAndFlush("客户端发送的测试数据!!");
ctx.writeAndFlush(new Student(1001L, "小明"));
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("来自服务端 " + ctx.channel().remoteAddress() + " 的消息:" + msg);
}
}

使用Protobuf提高序列化性能

为了提升性能,可以使用Protobuf或者Protostuff对数据进行序列化和反序列化

  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!--protostuff-->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.11</version>
</dependency>

服务端handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sineagle.coder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//System.out.println("从客户端读到的数据:" + (Student) msg);
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
Student student = ProtostuffUtil.deserialize(bytes, Student.class);
System.out.println("从客户端读取到的数据: " + student);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("服务端发送的测试数据");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.sineagle.coder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//ctx.writeAndFlush("客户端发送的测试数据!!");
//ctx.writeAndFlush(new Student(1001L, "小明"));
// 使用protobuf编码对象
ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serialize(new Student(1001L, "小明")));
ctx.writeAndFlush(buf);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("来自服务端 " + ctx.channel().remoteAddress() + " 的消息:" + msg);
}
}

Netty实现群聊系统

ChatServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.sineagle.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ChatServer {

public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 获得pipeline
ChannelPipeline pipeline = channel.pipeline();
// 添加handler
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加业务处理handler
pipeline.addLast(new ChatServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
System.out.println("聊天室启动啦 !");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


}
}

ChatServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.sineagle.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* SimpleChannelInboundHandler和直接继承ChannelInboundHandlerAdapter的直接的区别:
* SimpleChannelInboundHandler可以在读完数据后清空掉msg占用的内存空间
*/
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

// 存放channel的容器,而且可以执行对每个channel进行操作的任务,任务由GlobalEventExecutor提供的单线程去执行
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**
* 有新的客户端(channel)连接了,将该客户端的上线消息广播给所有其它客户端
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 获得客户端的channel
Channel channel = ctx.channel();
String message = "客户端-" + channel.remoteAddress() + "于" + sdf.format(new Date()) + "上线了!\n";
System.out.println(message);
// 得到其它的客户端的channel,向其它客户端发送该客户端的上线消息
channelGroup.writeAndFlush(message);
// 加入到channelGroup中
channelGroup.add(channel);
}

/**
* 客户端下线则广播给其它客户端
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 生成一个下线的信息
String message = "客户端-" + channel.remoteAddress() + "下线了\n";
System.out.println(message);
// 广播给其它客户端
channelGroup.writeAndFlush(message);
//channelGroup.remove(channel);
}

/**
* 具体的读数据业务
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 获得当前发消息的客户端的channel
Channel channel = ctx.channel();
// 遍历所有的channel
channelGroup.forEach(ch -> {
if (channel != ch) {
// 发给其它客户端
ch.writeAndFlush("客户端-" + channel.remoteAddress() + "于" + sdf.format(new Date()) + "说:" + msg + "\n");
} else {
// 发给当前发消息的客户端(自己)
ch.writeAndFlush("我于: " + sdf.format(new Date()) + "说:" + msg + "\n");
}
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

ChatClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.sineagle.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class ClientClient {

public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
Channel channel = channelFuture.channel();
// 发送消息
System.out.println("欢迎进入SEG聊天室");
Scanner in = new Scanner(System.in);
while (in.hasNextLine()) {
String message = in.nextLine();
channel.writeAndFlush(message);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}

}
}

ChatClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
package com.sineagle.chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s.trim());
}
}

粘包和拆包

当在TCP协议中发送数据时,TCP会把数据分割成多个TCP数据段。这些数据段可以被看作是独立的数据包,但是在网络传输过程中,它们可能会被合并成一个更大的数据包,也可能会被拆分成更小的数据包。这就是粘包和拆包的基本原因。

具体来说,粘包指的是当发送方连续发送多个小的数据包时,底层的TCP传输协议可能会将这些数据包合并成一个更大的数据包。这种情况下,接收方无法区分这些小的数据包,因为它们被组合成了一个大的数据包。这会导致接收方在解析数据包时出现问题。

拆包则是指当发送方发送一个大的数据包时,底层的TCP传输协议可能会将这个大的数据包分割成多个小的数据包。这种情况下,接收方可能无法完整地接收到这些小的数据包,因为它们被拆分成了多个部分。这同样会导致接收方在解析数据包时出现问题。

解决这些问题的方法有很多种,其中一种常见的方法是在应用层上添加特殊的分隔符或长度信息。例如,可以在每个数据包的末尾添加一个特殊字符或字符串,作为分隔符,这样接收方就可以根据这个分隔符来判断每个数据包的边界。另外一种方法是在每个数据包的开头添加一个特殊的长度字段,这样接收方就可以根据长度信息来正确地解析每个数据包。此外,还可以使用固定长度的数据包来避免粘包和拆包的问题。

TCP协议特点

作为一个流式传输协议,数据在TCP中传输是没有边界的。也就是说,客户端发送多条数据,有可能会被认为是一条数据。或者,客户端发送一条数据,有可能会被拆分成多条数据。这是由于TCP协议并不了解上层业务数据的具体含义,在使用TCP协议传输数据时,是根据TCP缓冲区的实际情况进行数据包的划分。

什么是粘包和拆包

  • 粘包

    缓冲区数据量满了就会作为整体来发送,而这整体中包含了多条业务数据,拿这种情况就是粘包

  • 拆包

    在缓冲区数据量满了的时候,把一条数据分成两次缓冲区发送,这种情况就是拆包

分包解决方案

Netty是一个高性能、异步、事件驱动的网络编程框架,它提供了丰富的功能和工具来帮助我们解决TCP协议中的粘包和拆包问题。以下是Netty解决粘包和拆包问题的一些常见方法:

  1. 使用DelimiterBasedFrameDecoder:这是一个Netty提供的解码器,它基于特定的分隔符对接收到的数据包进行解码。我们可以在使用DelimiterBasedFrameDecoder时指定不同的分隔符,例如换行符、回车符、空格等等。当接收到的数据包包含指定的分隔符时,DelimiterBasedFrameDecoder会自动将数据包分隔开,然后传递给下一个处理器进行处理。

  2. 使用LengthFieldBasedFrameDecoder:这也是一个Netty提供的解码器,它基于长度信息对接收到的数据包进行解码。我们可以在使用LengthFieldBasedFrameDecoder时指定长度信息所在的位置、长度信息的字节长度以及数据包中实际数据的偏移量和长度。当接收到的数据包包含长度信息时,LengthFieldBasedFrameDecoder会根据长度信息自动将数据包分隔开,并将分隔后的数据包传递给下一个处理器进行处理。

  3. 自定义解码器:除了使用Netty提供的解码器外,我们也可以自定义解码器来解决粘包和拆包问题。在自定义解码器中,我们可以根据应用层协议中的格式,将接收到的数据包分解成多个小的数据包,然后将它们依次传递给下一个处理器进行处理。

  4. 使用ChannelHandler中的ByteBuffer:在Netty中,ChannelHandler中的ByteBuffer可以用来缓存接收到的数据,当缓存中的数据量达到一定的阈值时,我们可以将缓存中的数据分解成多个小的数据包,并将它们依次传递给下一个处理器进行处理。

总的来说,Netty提供了多种解决TCP协议中粘包和拆包问题的方法,开发人员可以根据具体的需求和情况选择合适的方法来解决这些问题。同时,Netty还提供了丰富的文档和示例代码,方便开发人员快速入门和掌握Netty的使用。

自定义分包解码器

整个流程可以通过以下部分:

  • 消息:客户端发送的消息内容
  • 消息协议:这一次消息包含两个部分,即消息长度和消息内容本身
  • 自定义消息解码器:消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
  • 自定义消息解码器:消息解码器根据消息协议的消息长度,来获得指定长度的消息长度

具体代码的实现如下:

MessageProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.sineagle.packet;

/**
* 自定义的消息协议
*/

public class MessageProtocol {
// 消息的长度
private int length;
// 消息的内容
private byte[] content;

public int getLength() {
return length;
}

public void setLength(int length) {
this.length = length;
}

public byte[] getContent() {
return content;
}

public void setContent(byte[] content) {
this.content = content;
}
}

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.sineagle.packet;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(1);

try {
Bootstrap bootstrap = new Bootstrap();

// 设置相关的参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 添加处理器,分包编码器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new NettyMessageClientHandler());
}
});
System.out.println("客户端启动啦!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

}
}

NettyMessageClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.sineagle.packet;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.StandardCharsets;

public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

// 连接通道创建后要向服务端发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 200; i++) {
String msg = "千峰校验院";
// 创建协议对象
MessageProtocol messageProtocol = new MessageProtocol();
// 封装长度和内容
byte[] content = msg.getBytes(StandardCharsets.UTF_8);
messageProtocol.setLength(content.length);
messageProtocol.setContent(content);

// 发送消息协议对象,注意此时ctx只能发送ByteBuf数据,因此需要用编码器把它编码成ByteBuf数据
ctx.writeAndFlush(messageProtocol);
}
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) throws Exception {

}
}

MessageEncoder

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.sineagle.packet;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent());
}
}

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.sineagle.packet;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 添加解码器
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new NettyMessageServerHandler());
}
});
System.out.println("Netty服务端启动啦!");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

MessageDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.sineagle.packet;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
* 自定义解码器
*/
public class MessageDecoder extends ByteToMessageDecoder {
private int length = 0;

/**
* @param ctx
* @param in 客户端发送过来的MessageProtocol编码后的ByteBuf数据
* @param out out里的数据会被放行到下一个handler, 把解码出来的MessageProtocol放到out里面
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("ByteBuf: " + in);
// 获得前面4个字节的数据== 描述实际内容的长度
if (in.readableBytes() >= 4) {
// ByteBuf里面可能有MessageProtocol数据
if (length == 0) {
length = in.readInt();
}
if (in.readableBytes() < length) {
// 说明数据还没到齐,等待下一次调用decode
System.out.println("当前数据量不够,继续等待");
return;
}
// 可读数据量 >= length ==>意味着这一次MessageProtocol中的内容已经到齐了
// 创建一个指定length长度的字节数组
byte[] content = new byte[length];
// 把ByteBuf里的指定长度的数据读到content中
in.readBytes(content);
// 创建MessageProtocol对象并赋值
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLength(length);
messageProtocol.setContent(content);
// 传递给下一个handler
out.add(messageProtocol);
// 重置length
length = 0;
}
}
}

NettyMessageServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.sineagle.packet;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.StandardCharsets;

public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
System.out.println("=========== 服务器收到的数据 =============");
System.out.println("消息的长度: " + msg.getLength());
System.out.println("消息的内容: " + new String(msg.getContent(), StandardCharsets.UTF_8));
}
}

Netty心跳机制

在分布式系统中,心跳机制常常在注册中心组件中提及,比如ZookeeperEurekaNacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设条件时,服务端将释放客户端的连接资源。

NettyTCP的长连接中,客户端定期向服务端发送一种特殊的数据包,告知对方自己正常在线,以确保TCP连接的有效性。Netty实现心跳的关键是IdleStateHandler

IdleStateHandler类描述三种空闲的状态:

  • 读空闲:在指定的时间间隔内没有从Channel中读取数据,将会创建状态为READ_IDLEIdleStateEvent对象
  • 写空闲:在指定的时间间隔内没有数据写入到Channel中,将会创建状态为WRITE_IDLEIdleStateEvent对象
  • 读写空闲:在指定的时间间隔内Channel中没有发生读写操作,将会创建ALL_IDLEIdleStateEvent对象

创建IdleStateHandler对象时,IdleStateHandler会初始化定时任务的线程,用于在指定延迟时间后执行相关的处理。该线程将满足条件的超时事件IdleStateEvent对象传递给pipeline中的下一个handler

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.sineagle.heartbeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Random;

public class NettyClient {

public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(1);

try {
Bootstrap bootstrap = new Bootstrap();

// 设置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 添加编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加业务处理器
pipeline.addLast(new NettyHeartBeatClientHandler());
}
});
System.out.println("客户端启动啦!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
Channel channel = channelFuture.channel();
// 模拟向服务端发送心跳数据
String packet = "heartbeat packet";
Random random = new Random();
while(channel.isActive()) {
// 随机间隔等待
int num = random.nextInt(10);
Thread.sleep(20 * 1000);
channel.writeAndFlush(packet);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}

NettyHeartBeatClientHandler

1
2
3
4
5
6
7
8
9
10
11
package com.sineagle.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class NettyHeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("客户端收到的数据:" + msg);
}
}

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.sineagle.heartbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

public class NettyServer {

public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
/**
* IdleStateHandler处理器会在服务端发现有超过3s没有发生读时间的话会触发超时事件
* 创建出IdleStateEvent对象,将该对象交给下一个Handler
*/
pipeline.addLast(new IdleStateHandler(3, 0, 0));
// HeartbeatServerHandler必须重写userEventTriggered方法,用来做具体的超时业务处理
pipeline.addLast(new HeartbeartServerHandler());
}
});
System.out.println("Netty服务器启动了");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

HeartbeartServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.sineagle.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartbeartServerHandler extends SimpleChannelInboundHandler<String> {
// 读操作的超时次数
int readIdleTimes = 0;

@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
readIdleTimes = 0;
System.out.println("服务端收到的心跳:" + s);
ctx.writeAndFlush("========= 服务端已经收到心跳 ===========");
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
readIdleTimes ++;
System.out.println("读超时");
break;
case WRITER_IDLE:
System.out.println("写超时");
break;
case ALL_IDLE:
System.out.println("读写超时");
break;
}
// 如果超时次数 > 3, 则关闭客户端连接
if (readIdleTimes > 3) {
System.out.println("读超时超过三次,关闭连接");
ctx.writeAndFlush("超时关闭");
ctx.channel().close();
}
}
}

断线重连

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.sineagle.reconnect;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* 短线重连的客户端
*/
public class NettyClient {
private Bootstrap bootstrap;

public Bootstrap getBootstrap() {
return bootstrap;
}

public void setBootstrap(Bootstrap bootstrap) {
this.bootstrap = bootstrap;
}

public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();

NettyClient nettyClient = new NettyClient();
nettyClient.setBootstrap(bootstrap);

bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 添加业务处理器,解决连接成功后的服务器断开连接的重连动作
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new NettyReconnectClientHandler(nettyClient));
}
});
// 第一次连接
nettyClient.connect();
//阻塞
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}

public void connect() {
// 连接服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9091);
// 异步的知道连接的结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
boolean result = channelFuture.isSuccess();
if (!result) {
// 连接失败 => 在一定的时间间隔以后重新连接
channelFuture.channel().eventLoop().schedule(()->{
System.out.println("重新连接中....");
connect();
}, 2, TimeUnit.SECONDS);
} else {
System.out.println("连接成功!");
}
}
});
}
}

NettyReconnectClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.sineagle.reconnect;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class NettyReconnectClientHandler extends SimpleChannelInboundHandler<String> {
private NettyClient nettyClient;

public NettyReconnectClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("收到服务端的数据: " + msg);
}

/**
* 断线后会调用此方法,重新连接
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务器断线,重连中....");
nettyClient.connect();
}
}

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.sineagle.reconnect;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {

public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("服务器启动啦!");
ChannelFuture channelFuture = serverBootstrap.bind(9091).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

NettyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
package com.sineagle.reconnect;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("收到客户端的数据: " + msg);
}
}

Netty的零拷贝

堆内存和直接内存

JVM在运行时产生的数据大部分放在堆内存中,除此之外,还有一部分数据,比如元空间、栈空间等,这部分数据放在直接内存中。所谓的直接内存,是操作系统除去分配给JVM专有的内存,剩下的内存,也就是机器内存。

jdk为我们提供了DirectByteBuffer类来操作直接内存。通过该类将数据保存在直接内存中,而非堆内存中,这样的好处在于可以在Netty的环境中实现更加高效的数据传输。

DirectByteBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.sineagle.directbytebuffer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* 直接内存和堆内存
*/
public class DirectByteBufferDemo {

public static void main(String[] args) {
// 创建了一个堆内存中的ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("Hello Byte Buffer".getBytes());
// 把数据存在直接内存
ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(1024);
directByteBuffer.put("Hello direct Byte Buffer".getBytes());
System.out.println("处理完了");
}
}

零拷贝的实现逻辑

零拷⻉是提升读写性能的有效⼿段。⾸先我们来看下在IO环境下进⾏堆内存的数据

读写。

  • 普通的IO读写

在普通的IO中,⼀次数据的传输和反馈需要进⾏四次拷⻉和多次⽤户态和内核态的

切换。客户端发送数据给操作系统的Socket缓冲区,操作系统将缓冲区数据拷⻉到

直接内存,直接内存再将数据拷⻉到JVM内存中,供Java程序使⽤。当数据需要返

回时,Java程序将数据拷⻉到直接内存,再由操作系统将直接内存的数据拷⻉到

Socket缓冲区,然后发送给⽤户。Java程序的操作属于⽤户态,操作系统的操作属

于内核态,在这个过程中,需要经历这两个状态的多次切换。

  • 零拷贝的IO读写

零拷贝是将用户态的java程序操作数据的方式是直接操作内核态内存中的数据,而非自身JVM内存中的数据。这样可以减少用户态和内核态之间的两次拷贝,也大大减少了用户态和内核态的切换次数,提升了性能。

求大佬赏个饭