基于netty 和 websocket 做一个即时通信 聊天的小应用练习
先来了解即时通信 , 一般会使用三种实现方式:
- Ajax 轮训
- Long pull
- websocket
有很多的例子,比如一些电脑上群组聊天室,手游中的聊天平台等等,都需要一个实时通信,如何实现双向通信
Ajax轮训,是制定每过几秒钟,去ajax异步请求同步服务器新的数据
Long pull也是采用循环的方式,是一种阻塞的模式,当发出请求,如果服务器不响应,他就会一直卡住不动,早期的通信方式
websocket最初由H5提起,是一种协议,http1.1是支持长链接,http1.0是不支持长链接的,websocket基于TCP协议之上,提供具有持久性的协议
对比http每发起一个,必然存在request和response,且是1:1对应的
websocket的优点使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据, 只需要一次链接即可保持长久链接传输数据,除非自己退出游戏了,重新上线
Web即时通信
基于之前的经验,先写一个server服务端
package com.yus.netty.server;import io.netty.bootstrap.BootstrapConfig;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * websocket 服务端 */public class WebSocketServer { public static void main(String[] args) throws InterruptedException { //采用主从线程组模型 //主线程组 EventLoopGroup primaryGroup = new NioEventLoopGroup(); //从线程组 EventLoopGroup subGroup = new NioEventLoopGroup(); try { //服务启动器 ServerBootstrap bootstrap = new ServerBootstrap(); //建立通道,管道以及助手处理类 入口 bootstrap.group(primaryGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebSocketChannelInit()); //绑定端口 ChannelFuture future = bootstrap.bind(8081).sync(); future.channel().closeFuture().sync(); } finally { //关闭 primaryGroup.shutdownGracefully(); subGroup.shutdownGracefully(); } }}
初始化器
package com.yus.netty.server;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.stream.ChunkedWriteHandler;/** * 通道初始化器 */public class WebSocketChannelInit extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { //获取管道 ChannelPipeline pipeline = ch.pipeline(); //=====开始============用于支持Http协议的处理类================= //通信 netty提供的编解码处理类 pipeline.addLast(new HttpServerCodec()); //对处理数量大的数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //聚合器,方便处理http消息 1024*64 为消息最大长度(byte)支持http协议的必要处理类 pipeline.addLast(new HttpObjectAggregator(1024*64)); //=====结束============用于支持Http协议的处理类================= // 支持websocket协议的处理类,建立链接时使用 // /ws指定客户端访问服务端的路由,可随便自定义,这边写ws是websocket简写 // 该处理类帮我们处理繁重的事情并run websocket服务端, // 并管理通信握手动作(包括close关闭,Ping请求,Pong响应)Ping+Pong=心跳,关于心跳后续再做说明 // 并以frame进行数据传输,不同的数据类型,frame也不同 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定义 处理类,主要用于读取客户端消息,然后对消息进行处理,最后可以返回给客户端 pipeline.addLast("myHandle", new MyHandle()); }}
自定义处理类
package com.yus.netty.server;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.util.concurrent.GlobalEventExecutor;/** * 自定义处理类 * 在写初识化器时有说明,关于websocket传输时,主要以frames方式传输 * 在Netty中frame会专门为websocket处理 文本 的对象 - TextWebSocketFrame * frame是消息的载体 */public class MyHandle extends SimpleChannelInboundHandler{ // 第四步 // ChannelGroup:记录和管理Channel,使用DefaultChannelGroup默认实现,GlobalEventExecutor全局初始化 private static ChannelGroup channelClient = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //第一步:从消息载体中获取客户端的消息 String content = msg.text(); System.out.println("消息:" + content); //第二步: //拿到消息文本,然后将消息发给所有客户端,这时不管有多少个客户端 //都可以将此客户端的消息给所有的客户端,每一个客户端会注册一个channel进来 //通过channel通道进行消息推送出去,这时候就用到了上次学习的Channel的方法周期, //生命周期 重写handlerAdded 和 handlerRemoved Channel channel = ctx.channel(); //第七步 //将数据 刷到所有的客户端 第一种方式 for (Channel channels : channelClient){ //注意 这边的载体是泛型TextWebSocketFrame ,不能直接String扔出去 //要将消息放入载体,再送出去 channels.writeAndFlush(new TextWebSocketFrame("短ID为" + channel.id().asShortText() + "用户 发送消息:" + content)); } //将数据 刷到所有的客户端 第二种 方式直接用ChannelGroup的writeAndFlush //channelClient.writeAndFlush(new TextWebSocketFrame("我又在哪里,我被送出去了吗?")); } /** * 第三步 * 客户端创建了,触发 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //获取客户端的channel双向通道 Channel channel = ctx.channel(); //第五步 //添加到ChannelGroup,方便全局管理 channelClient.add(channel); } /** * 第六步 * 客户端离开了,触发 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //当触发这个handlerRemoved时,其实ChannelGroup会自动移除对应客户端的通道channel //所以不需要我们去调remove的方法,测试发现是多余的 //channelClient.remove(ctx.channel()); //ctx.channel().id()中存在两个ID,一个长ID,一个短ID,如果学习过zookeeper的同学会熟悉一些 //服务少的时候,短ID冲突的可能性小,会用短ID进行选择,反之就是长ID System.out.println("Channel 长ID为 " + ctx.channel().id().asLongText() + "客户端离开了"); System.out.println("Channel 短ID为 " + ctx.channel().id().asShortText() + "客户端离开了"); }}
编写完后端,已经快1点钟了,睡觉了,明天继续写个测试前端页面
--------------------------------------------------