【mq】从零开始实现 mq-01-生产者、消费者启动

什么是 MQ?

MQ(Queue)消息队列是基本数据结构中“先进先出”的数据结构。

指将要传输的数据(消息)放入队列,利用队列机制实现消息传递——生产者生成消息,将消息放入队列,然后消费者处理。

p>

消费者可以从指定的队列中拉取消息,也可以订阅对应的队列,MQ服务器可以向其推送消息。

MQ 是做什么的?

消息队列中间件是分布式系统中的重要组件。主要解决应用解耦、消息异步、流量切割等问题,实现高性能、高可用、可扩展和最终一致性架构。

解耦:一个业务需要多个模块实现,或者一条消息需要多个系统处理。只需要在主业务完成后发送一个MQ,其他模块消费MQ消息实现业务,减少模块数量。之间的耦合。

异步:主业务执行后,通过MQ异步执行从属业务,减少业务响应时间,提升用户体验。

调峰:在高并发情况下,异步处理业务,提供高峰期业务处理能力,避免系统瘫痪。

ps:以上内容摘自维基百科。

准备实现mq maven介绍


    io.netty
    netty-all
    4.1.42.Final


    com.alibaba
    fastjson
    1.2.76

模块划分

java 中的队列。作为mq的零基础学习项目,目前已经开源。

项目的模块如下:

模块说明

mq-

公共代码

mq-

注册中心

mq-

消息生产者

mq-

消息消费者

消息消费者接口定义

package com.github.houbb.mq.consumer.api;
/**
 * @author binbin.hou
 * @since 1.0.0
 */
public interface IMqConsumer {
    /**
     * 订阅
     * @param topicName topic 名称
     * @param tagRegex 标签正则
     */
    void subscribe(String topicName, String tagRegex);
    /**
     * 注册监听器
     * @param listener 监听器
     */
    void registerListener(final IMqConsumerListener listener);
}

作为消息监听类的接口,定义如下:

public interface IMqConsumerListener {
    /**
     * 消费
     * @param mqMessage 消息体
     * @param context 上下文
     * @return 结果
     */
    ConsumerStatus consumer(final MqMessage mqMessage,
                            final IMqConsumerListenerContext context);
}

表示消息消费的几种状态。

消息正文

启动消息体定义如下:

package com.github.houbb.mq.common.dto;
import java.util.Arrays;
import java.util.List;
/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqMessage {
    /**
     * 标题名称
     */
    private String topic;
    /**
     * 标签
     */
    private List tags;
    /**
     * 内容
     */
    private byte[] payload;
    /**
     * 业务标识
     */
    private String bizKey;
    /**
     * 负载分片标识
     */
    private String shardingKey;
    // getter&setter&toString
}

推动消费者战略实施

消费者发起的实现如下:

/**
 * 推送消费策略
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqConsumerPush extends Thread implements IMqConsumer  {
    // 省略...
    @Override
    public void run() {
        // 启动服务端
        log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",
                groupName, port, brokerAddress);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(workerGroup, bossGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new MqConsumerHandler());
                        }
                    })
                    // 这个参数影响的是还没有被accept 取出的连接
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的链接
            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("MQ 消费者启动完成,监听【" + port + "】端口");
            channelFuture.channel().closeFuture().syncUninterruptibly();
            log.info("MQ 消费者关闭完成");
        } catch (Exception e) {
            log.error("MQ 消费者启动异常", e);
            throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    // 省略...
}

ps:一开始我们把它当做服务端,后来引入只做服务端。

处理类

这个类是一个空的实现。

public class MqConsumerHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {
        //nothing
    }
}

测试代码

MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();

启动日志:

[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress: 
[INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口

消息生产者接口定义

最基本的消息发送接口。

package com.github.houbb.mq.producer.api;
import com.github.houbb.mq.common.dto.MqMessage;
import com.github.houbb.mq.producer.dto.SendResult;
/**
 * @author binbin.hou
 * @since 1.0.0
 */
public interface IMqProducer {
    /**
     * 同步发送消息
     * @param mqMessage 消息类型
     * @return 结果
     */
    SendResult send(final MqMessage mqMessage);
    /**
     * 单向发送消息
     * @param mqMessage 消息类型
     * @return 结果
     */
    SendResult sendOneWay(final MqMessage mqMessage);
}

生产者实现

的实现如下,基于netty。

package com.github.houbb.mq.producer.core;
/**
 * 默认 mq 生产者
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqProducer extends Thread implements IMqProducer {
    //省略...
    @Override
    public void run() {
        // 启动服务端
        log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",
                groupName, port, brokerAddress);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            ChannelFuture channelFuture = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer(){
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler(LogLevel.INFO))
                                    .addLast(new MqProducerHandler());
                        }
                    })
                    .connect("localhost", port)
                    .syncUninterruptibly();
            log.info("MQ 生产者启动客户端完成,监听端口:" + port);
            channelFuture.channel().closeFuture().syncUninterruptibly();
            log.info("MQ 生产者开始客户端已关闭");
        } catch (Exception e) {
            log.error("MQ 生产者启动遇到异常", e);
            throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    //省略...
}

处理类

默认的空实现什么都不做。

package com.github.houbb.mq.producer.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqProducerHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {
        //do nothing now
    }
}

启动代码

MqProducer mqProducer = new MqProducer();
mqProducer.start();

启动日志:

[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P_DEFAULT_GROUP_NAME, PORT: 9527, brokerAddress: 
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x5cb48145] REGISTERED
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE
[INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527

总结

基于netty,最基本的服务端启动和客户端启动都到这里了。

千里之行始于足下。

我们将在下一节中与您一起学习如何实现客户端和服务器之间的交互。

希望这篇文章对你有所帮助。如果喜欢,请点赞,收藏转发。

我是老马,期待下次再见。

开源地址

java中的队列。(java简单版mq实现):

延伸阅读

rpc-从头实现rpc:

[mq]实现mq-01-,从头开始

© 版权声明
THE END
喜欢就支持一下吧
点赞9 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片