一张图进阶RocketMQ-通信机制(视频版)(组图)

前言

三此君看了好几本书,看了好多遍源码整理的一张图进阶RocketMQ图片,关于RocketMQ你只须要记住这张图!感觉不错的话,记得点赞关注哦。

【重要】视频在B站同步更新,欢迎围观,轻轻松松涨坐姿。一张图进阶RocketMQ-通讯机制(视频版)

点击查看【bilibili】

本文是“一张图进阶RocketMQ”第4篇,对RocketMQ不了解的朋友可以先瞧瞧上面三期:

一张图进阶RocketMQ-整体构架一张图进阶RocketMQ-NameServer一张图进阶RocketMQ-消息发送

上一期分享了RocketMQ生产者启动流程及同步消息发送流程,我们晓得了在通讯层是基于Netty将消息传递给Broker进行储存的。假如对Netty完全不了解我们就很难真正理解RocketMQ,所以昨晚我们简单的聊一聊Netty基本流程,之后剖析RocketMQ的通讯机制,最后通过异步消息发送来串联RocketMQ通讯机制。

Netty介绍

Netty有好多概念,等介绍完概念你们都困了,我们就不过多介绍了,直接结合示例来瞧瞧Netty的基础流程,就能帮助我们更好的理解RocketMQ即可。

Netty服务端启动初始化两个线程组BossGroup&WorkerGroup,分别用于处理顾客端联接及网路读写。Netty顾客端启动初始化一个线程组,用于处理恳求及返回结果。顾客端connect到Netty服务端,创建用于传输数据的Channel。Netty服务端的BossGroup处理顾客端的联接恳求,之后把剩下的工作交给WorkerGroup。联接构建好了,顾客端就可以借助这个联接发送数据给Netty服务端。NettyWorkerGroup中的线程使用Pipeline(包含多个处理器Handler)对数据进行处理。Netty服务端的处理完恳求后,返回结果也经过Pipeline处理。Netty服务端通过Channel将数据返回给顾客端。顾客端通过Channel接收到数据,也经过Pipeline进行处理。Netty示例

我们先用Netty实现一个简单的服务端/顾客端通讯示例,我们是这样使用的,那RocketMQ基于Netty的通讯也应当是这样使用的,不过是在这个基础上封装了一层。主要关注以下几个点:服务端哪些时侯初始化的,服务端实现的Handler做了哪些事?顾客端哪些时侯初始化的,顾客端实现的Handler做了哪些事?

Netty服务端初始化:初始化的代码很关键,我们要从源码上理解RocketMQ的通讯机制,那肯定会见到类似的代码。按照前面的流程来看,首先是实例化bossGroup和workerGroup,之后初始化Channel,从代码可以看出我们是在Pipeline中添加了自己实现的Handler,这个Handler就是业务自己的逻辑了,那RocketMQ要处理数据应当也须要实现相应的Handler。

public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型    

图片[1]-一张图进阶RocketMQ-通信机制(视频版)(组图)-唐朝资源网

.channel(NioServerSocketChannel.class) //使用匿名内部类的形式初始化Channel对象 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道添加处理器 socketChannel.pipeline().addLast(new MyServerHandler()); } });//给workerGroup的EventLoop对应的管道设置处理器 //绑定端口号,启动服务端 ChannelFuture channelFuture = bootstrap.bind(6666).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

实现自定义的服务端处理器Handler:自定义的Handler须要实现Netty定义的HandlerAdapter,当有可读风波时都会调用这儿的channelRead()技巧。等下我们看RocketMQ通讯机制的时侯留心RocketMQ自定义了什么Handler,这种Handler有做了哪些事。

/**
 * 自定义的Handler需要继承Netty规定好的 HandlerAdapter 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
 **/

图片[2]-一张图进阶RocketMQ-通信机制(视频版)(组图)-唐朝资源网

public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送过来的消息 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); //发送消息给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,记得关注三此君,记得三连", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常,关闭通道 ctx.close(); } }

Netty顾客端初始化:Netty顾客端,在RocketMQ中对应了Producer/Consumer。在Producer启动中有一步是启动通讯模块服务,当然就是初始化Netty顾客端。顾客端也须要先实例化一个NioEventLoopGroup,之后将自定义的handler添加到Pipeline,还有很重要的一步是我们须要connect联接到Netty服务端。

public class MyClient {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap启动引导对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组

图片[3]-一张图进阶RocketMQ-通信机制(视频版)(组图)-唐朝资源网

bootstrap.group(eventExecutors) //设置客户端的Channel实现类型 .channel(NioSocketChannel.class) //使用匿名内部类初始化 Pipeline .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加客户端Channel的处理器 ch.pipeline().addLast(new MyClientHandler()); } }) //connect连接服务端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); //对Channel关闭进行监听 channelFuture.channel().closeFuture().sync(); } finally { //关闭线程组 eventExecutors.shutdownGracefully(); } } }

实现自定义的顾客端处理器Handler:顾客端处理器也承继自Netty定义的HandlerAdapter,当Channel显得可读的时侯(服务端数据返回)会调用我们自己实现的channelRead()。

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送消息~", CharsetUtil.UTF_8));
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到三此君的消息,我一定会三连的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

RocketMQ通讯流程

RocketMQ通讯模块基于Netty实现,总体代码量不多。主要是NettyRemotingServer和NettyRemotingClient,分别对应通讯的服务端和顾客端。依照上面的Netty示例,我们要理解RocketMQ怎样基于Netty通讯,只须要晓得4个地方:NettyRemotingServer怎么初始化,NettyRemotingClient初始化,怎样基于NettyRemotingClient发送消息,无论是顾客端还是服务端收到数据后都须要Handler来处理。

异步发送

不仅同步消息发送,RocketMQ还支持异步发送。我们只须要在后面是示例中稍作更改都会得到一个异步发送示例,最大的不同在于发送的时侯传入SendCallback接收异步返回结果反弹。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");

图片[4]-一张图进阶RocketMQ-通信机制(视频版)(组图)-唐朝资源网

// 启动Producer实例 producer.start(); // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8")); // SendCallback 接收异步返回结果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("三连呀!!!%-10d Exception %s %n", index, e); e.printStackTrace(); } }); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

同步发送个异步发送主要的过程都是一样的,不同点在于同步消息调用NettyChannel.writeAndFlush以后是waitResponse等待Broker返回,而异步消息是调用预先定义好的反弹函数。

异步消息和同步消息整体差不多,可以说在基于Netty实现异步消息比同步消息还要简单一下,我们这儿主要来看一些不同点:

总结

© 版权声明
THE END
喜欢就支持一下吧
点赞177赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容