针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。

网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。

自主搭建的RocketMQ

通过自主搭建RocketMQ,然后通过SpringBoot进行集成实现,可以参考在公众号【程序新视界】中的文章《Spring Boot快速集成RocketMQ实战教程》,可关注公众号搜索,也可以关注公众号之后回复“1003”,完整的实战步骤。

这里我们只摘取其中消费者的部分代码:

@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC
        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED
        , selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListenerDemo implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);

    @Override
    public void onMessage(String message) {
        log.info("received registered message: {}", message);
    }
}

这是其中一个消费者,消费的topic为MqTopicConstant.DEMO_TOPIC,consumerGroup为REGISTERED的,tag便是selectorExpression指定的REGISTERED的tag。

针对同一的topic,另外一个tag的消费者的实现如下:

@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC
        , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY
        , selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)
public class MqModifyListenerDemo implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);

    @Override
    public void onMessage(String message) {
        log.info("received modify message: {}", message);
    }
}

我们可以看到topic是同一个,但consumerGroup和tag不同。这说明什么?这说明只要消费者的consumerGroup不同,那么topic相同的情况下,也可以通过tag进行区分的。

关于其他源码就不再这里贴出了,详情可关注公众号看对应文章。

基于云服务的RocketMQ

基于云服务的RocketMQ与自主搭建的基本一致,我们只要确保groupId(阿里云的叫法)不同,那么同一topic下的tag是可以进行区分处理的。

具体处理这里也只贴出部分代码:

@Configuration
public class ConsumerClient {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private EquipmentMessageListener equipmentMessageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // 配置文件
        Properties properties = mqConfigProperties.getMqProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId());
        // 将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        // 订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

        // --------业务板块开始--------
        Subscription subscription = new Subscription();
        // 设置需要消费的消息所属的topic
        subscription.setTopic(MqConfigProperties.getInnerTopic());
        // 设置需要消费的消息所属的tag
        subscription.setExpression(MqConfigProperties.getEquipmentMonitorTag());
        // 实现MessageListener接口,并且在consume方法中实现消费逻辑
        subscriptionTable.put(subscription, equipmentMessageListener);
        //订阅多个topic如上面设置
        // --------业务板块结束--------

        // 将订阅者消息放入consumerBean中,在Spring初始加载该bean时,监听MQ中的Topic和tag下的消息
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

在上面的代码中,重点是业务板块部分的代码,如果在订阅关系中重新将业务板块内的代码copy一份,然后修改对应的Expression值(也就是tag值),那么基本上是不会成功的。

往往发送大量消息,只能够收到一部分。其他的会被覆盖掉。当然,如果你想采用不同的topic来处理,只需将业务板块中的内容重新修改,添加到subscriptionTable中即可。

那么,如何解决标题中的问题呢?思路与第一种方案一样,阿里云这里只是创建了一个ConsumerBean,而上面的自主搭建时采用了多个Consumer。那么解决方案就是:初始化多个ConsumerBean,每个ConsumerBean中的配置不同的groupId和tag,同时注册不同的监听器。

如此一来,就可以监听一个topic下的不同tag了。

原理分析

两个一样的ConsumerGroup的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。

这种现象的原因是:消息的分配是Broker决定的,而不是Consumer端,Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。

这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?

那是因为:集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。

如果换成BROADCASTING,那后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。

如果还有其他相关问题,也可关注公众号“程序新视界”,相互沟通学习。



RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?插图

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台

除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接

本文链接:http://choupangxia.com/2020/11/26/rocketmq-topic-tag/