netty项目记录

如何编写并启动一个服务端?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void run() throws Exception{
//指定bossGroup,也就是负责处理连接请求的线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//指定workerGroup,也就是负责处理IO请求的线程
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个
try {
//创建启动类
ServerBootstrap bootstrap = new ServerBootstrap();
//链式编程为启动类加入线程,channel,handler
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
//为管道加入handler或者自定义的handler,netty扩展性的体现
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
//因为是基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,添加 ChunkedWriteHandler 处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
说明:
1. http 数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个报文段聚合
2. 这就是为什么,当浏览器发送大量数据时,就会发出多次http请求。
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
* 说明:
* 1.对应websocket ,它的数据是以帧(frame)形式传播
* 2. 可以看到WebSocketFrame ,下面有六个子类
* 3. 浏览器请求时,ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws 协议,保持长连接
* 5. 是通过一个状态码 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler(contentPath));
//自定义的handler,处理业务逻辑
pipeline.addLast(new WebSocketServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//监听关闭
channelFuture.channel().closeFuture().sync();
}finally {
//关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//自定义的handler
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//定义一个channel组,管理所有的channel
//GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
//收到消息后触发的方法
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器端收到消息");
//获取到当前的channel
Channel channel = ctx.channel();
//这时我们遍历channelGroup,根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
if(channel != ch){ //不是当前的channel,转发消息
//将信息刷回客户端
ch.writeAndFlush(new TextWebSocketFrame(msg.text()));
}
} );
//回复消息
//ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器事件" + LocalDateTime.now() + " " + msg.text()));
}
//当web客户端连接后,触发方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id表示唯一的值,LongText 是唯一的 ShortText 不是唯一
//System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
//System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
Channel channel = ctx.channel();
//将该客户加入聊天的信息推送给其他在线的客户端
//该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历
//channelGroup.writeAndFlush(new TextWebSocketFrame("[客户端]" + channel.remoteAddress() + "加入聊天"
// + sdf.format(new Date()) + "\n"));
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生" + cause.getMessage());
ctx.close();
}
}

netty项目记录
http://example.com/post/netty项目记录——react线程模型简述.html
作者
SamuelZhou
发布于
2022年10月22日
许可协议