一张图进阶RocketMQ——整体架构

前言

三继君看过几本书,多次阅读源代码整理的一个图片进阶图片链接。你只需要记住这张照片!如果您是第一次看到这个系列,Wall Crack 建议您打开链接。如果觉得不错,记得点赞关注哦。

视频会在B站同步更新(B站:三河君),欢迎看图进阶-整体架构(视频版),记得调整视频的清晰度~

本文为“进阶一图”系列的第一篇。今天的内容主要分为两部分:

整体架构

什么是消息队列?顾名思义,首先要有一个队列,用来存储消息。那么当有消息队列的时候,就得有人把它放进去,有人去取它。是不是感觉很眼熟jio,这就是连小学生都知道的经典“生产者消费者模式”吗?接下来,我们看看里面有什么?

别着急,我们先来回顾一下老朋友“生产者-消费者模式”。简单来说,这个模型由两种类型的线程和一个队列组成:

有了这个队列,生产者只需要专注于生产,不用关心消费者的消费行为,更不用说等待消费者线程执行完毕;消费者只关心消费,不管生产者如何生产,更不用说等待生产者生产了。

这是什么意思,生产者和消费者之间的解耦和异步。这是古老而强大的。在以后的工作和学习中,你会不断地遇到这些概念,但是你读的越多,就越不能理解它们。我们的很多生活都是异步的。比如最近新冠疫情死灰复燃,我点的外卖只能送到小区门口的外卖队列,只能到外卖队列去取外卖,然后狼吞虎咽.

如何实现“生产者-消费者模式”,你小学就学过吧。让我们看看这个模型有什么问题。最大的问题是,我们小学学的“生产者-消费者模式”是单机版,我们只能为自己骄傲。这相当于,我是一个外卖骑手,我点了一份外卖放在外卖队列中,然后从外卖队列中拿起,操作猛如虎!所以有一个进化版,我们把消费者、队列、生产者放在不同的服务器上,这就是传说中的分布式消息队列。

图片[1]-一张图进阶RocketMQ——整体架构-唐朝资源网

生产者产生的消息通过网络传递到队列存储,消费者通过网络从队列中获取消息。但是还有一个问题。新闻可能有很多种。如果全部放在一起,岂不是一团糟?我订购的所有外卖和快递都放在一起,太难找了。所以我们需要区分不同类型的消息,相同类型的消息称为Topic。同时,不可能只有一个骑手,也不会只有我一个人点外卖,所以有生产者群体和消费者群体。

但是还是有问题。社区太大了,一个队列无法容纳。我住小区南门,外卖还得去北门,真是蛋疼。因此,物业在南门、南门、西北门设置了外卖快递点。也就是我们有多个队列,形成一个队列集群。

然而,问题又来了(而且没完没了),一个社区外卖快递排这么多,骑手怎么知道送到哪里?去吧,我怎么知道在哪里得到它?它很容易导航。我们把导航信息称为路由信息。这个信息需要有一个管理的地方,它告诉生产者某个主题的消息可以发送到哪些队列,同时告诉消费者你需要的消息可以从哪些队列中取出。这些路由信息都设置了管理员,当然可以有很多个组成一个集群。

到此为止,您应该知道里面是什么。这包括生产者()、消费者()和队列本身()。就是代理的意思,负责队列的访问等操作,我们可以理解为队列本身。

消息收发部署示例

我们刚刚了解了整体架构,那么如何发送和接收消息呢?您需要先部署一个集合:

如果没有安装,可以按照菜鸟教程MacOS安装/安装进行安装。然后,通过 – 部署:

注意:如果你现在不明白,那不重要,按照步骤部署即可,不会妨碍我们了解相关内容。

图片[2]-一张图进阶RocketMQ——整体架构-唐朝资源网

部署完成后,我们可以看到相关的容器,包括,和(),这里我们可以使用部署来发送和接收消息。

既然已经部署好了,我们来看一个简单的发送和接收消息的例子,可以说是“Hello World”了。

消息发送

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("Topic1","Tag", "Key",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
      	// 通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

图片[3]-一张图进阶RocketMQ——整体架构-唐朝资源网

消息接收

public class Consumer {
	public static void main(String[] args) throws InterruptedException, MQClientException {
    	// 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
    	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        erbconsumerijun.subscribe("sancijun", "*");
    	// 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

图片[4]-一张图进阶RocketMQ——整体架构-唐朝资源网

@Override public ConsumeConcurrentlyStatus consumeMessage( List msgs,ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); } }

以上就是本文的全部内容。本文不堆砌太多无意义的概念,不讲削峰、解耦、异步通信。网上的内容很多,看和不看没有区别。

© 版权声明
THE END
喜欢就支持一下吧
点赞216赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容