前言
三此君看了好几本书,看了好多遍源码整理的一张图进阶RocketMQ图片,关于RocketMQ你只须要记住这张图!感觉不错的话,记得点赞关注哦。
【重要】视频在B站同步更新,欢迎围观,轻轻松松涨坐姿。一张图进阶RocketMQ-通讯机制(视频版)
点击查看【bilibili】
本文是“一张图进阶RocketMQ”第4篇,对RocketMQ不了解的朋友可以先瞧瞧上面三期:
一张图进阶RocketMQ-整体构架一张图进阶RocketMQ-NameServer一张图进阶RocketMQ-消息发送
上一期分享了RocketMQ生产者启动流程及同步消息发送流程,我们晓得了在通讯层是基于Netty将消息传递给Broker进行储存的。假如对Netty完全不了解我们就很难真正理解RocketMQ,所以昨晚我们简单的聊一聊Netty基本流程,之后剖析RocketMQ的通讯机制,最后通过异步消息发送来串联RocketMQ通讯机制。
Netty介绍
Netty有好多概念,等介绍完概念你们都困了,我们就不过多介绍了,直接结合示例来瞧瞧Netty的基础流程,就能帮助我们更好的理解RocketMQ即可。
Netty服务端启动初始化两个线程组BossGroup&WorkerGroup,分别用于处理顾客端联接及网路读写。Netty顾客端启动初始化一个线程组,用于处理恳求及返回结果。顾客端connect到Netty服务端,创建用于传输数据的Channel。Netty服务端的BossGroup处理顾客端的联接恳求,之后把剩下的工作交给WorkerGroup。联接构建好了,顾客端就可以借助这个联接发送数据给Netty服务端。NettyWorkerGroup中的线程使用Pipeline(包含多个处理器Handler)对数据进行处理。Netty服务端的处理完恳求后,返回结果也经过Pipeline处理。Netty服务端通过Channel将数据返回给顾客端。顾客端通过Channel接收到数据,也经过Pipeline进行处理。Netty示例
我们先用Netty实现一个简单的服务端/顾客端通讯示例,我们是这样使用的,那RocketMQ基于Netty的通讯也应当是这样使用的,不过是在这个基础上封装了一层。主要关注以下几个点:服务端哪些时侯初始化的,服务端实现的Handler做了哪些事?顾客端哪些时侯初始化的,顾客端实现的Handler做了哪些事?
Netty服务端初始化:初始化的代码很关键,我们要从源码上理解RocketMQ的通讯机制,那肯定会见到类似的代码。按照前面的流程来看,首先是实例化bossGroup和workerGroup,之后初始化Channel,从代码可以看出我们是在Pipeline中添加了自己实现的Handler,这个Handler就是业务自己的逻辑了,那RocketMQ要处理数据应当也须要实现相应的Handler。
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//使用匿名内部类的形式初始化Channel对象
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道添加处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
实现自定义的服务端处理器Handler:自定义的Handler须要实现Netty定义的HandlerAdapter,当有可读风波时都会调用这儿的channelRead()技巧。等下我们看RocketMQ通讯机制的时侯留心RocketMQ自定义了什么Handler,这种Handler有做了哪些事。
/**
* 自定义的Handler需要继承Netty规定好的 HandlerAdapter 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,记得关注三此君,记得三连", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
ctx.close();
}
}
Netty顾客端初始化:Netty顾客端,在RocketMQ中对应了Producer/Consumer。在Producer启动中有一步是启动通讯模块服务,当然就是初始化Netty顾客端。顾客端也须要先实例化一个NioEventLoopGroup,之后将自定义的handler添加到Pipeline,还有很重要的一步是我们须要connect联接到Netty服务端。
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建bootstrap启动引导对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的Channel实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化 Pipeline
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端Channel的处理器
ch.pipeline().addLast(new MyClientHandler());
}
})
//connect连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对Channel关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}
实现自定义的顾客端处理器Handler:顾客端处理器也承继自Netty定义的HandlerAdapter,当Channel显得可读的时侯(服务端数据返回)会调用我们自己实现的channelRead()。
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送消息~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服务端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到三此君的消息,我一定会三连的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
}
}
RocketMQ通讯流程
RocketMQ通讯模块基于Netty实现,总体代码量不多。主要是NettyRemotingServer和NettyRemotingClient,分别对应通讯的服务端和顾客端。依照上面的Netty示例,我们要理解RocketMQ怎样基于Netty通讯,只须要晓得4个地方:NettyRemotingServer怎么初始化,NettyRemotingClient初始化,怎样基于NettyRemotingClient发送消息,无论是顾客端还是服务端收到数据后都须要Handler来处理。
异步发送
不仅同步消息发送,RocketMQ还支持异步发送。我们只须要在后面是示例中稍作更改都会得到一个异步发送示例,最大的不同在于发送的时侯传入SendCallback接收异步返回结果反弹。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8"));
// SendCallback 接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("三连呀!!!%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
同步发送个异步发送主要的过程都是一样的,不同点在于同步消息调用NettyChannel.writeAndFlush以后是waitResponse等待Broker返回,而异步消息是调用预先定义好的反弹函数。
异步消息和同步消息整体差不多,可以说在基于Netty实现异步消息比同步消息还要简单一下,我们这儿主要来看一些不同点:
总结
暂无评论内容