共享订阅

在实践中我们的业务系统集成是集群存在,对于mqtt消息,如果没有做特殊处理,那么集群中每个服务只要订阅了mqtt中的一个主题,那么每台服务器都会进行消费。不仅浪费资源,还限制了服务的处理上限。

因此,有不少朋友在问这类问题:走MQTT协议,两个订阅者都订阅同一个topic,向该topic发布一个消息,怎么只让其中的订阅者消费?

针对这个问题,笔者经过实践发现,可以通过broker共享订阅机制来解决此问题。

什么是共享订阅

共享订阅是在多个订阅者之间实现负载均衡的订阅方式:

                                                   [subscriber1] got msg1
             msg1, msg2, msg3                    /
[publisher]  ---------------->  "$share/g/topic"  -- [subscriber2] got msg2
                                                 \
                                                   [subscriber3] got msg3

上图中,共享 3 个 subscriber 用共享订阅的方式订阅了同一个主题 $share/g/topic,其中topic 是它们订阅的真实主题名,而 $share/g/ 是共享订阅前缀。EMQ X 支持两种格式的共享订阅前缀:

mqtt集群订阅如何只消费一个(一次)消息?插图

如何实现共享订阅

以emqx为例,可通过三方面来进行配置:

第一,配置emqx,开启共享订阅。

# etc/emqx.conf

# 均衡策略
broker.shared_subscription_strategy = random

# 适用于 QoS1 QoS2 消息,启用时在其中一个组离线时,将派发给另一个组
broker.shared_dispatch_ack_enabled = true

在emqx.conf配置文件中,通过shared_subscription_strategy指定共享订阅均衡策略。通过shared_dispatch_ack_enabled设置为true来开启共享订阅功能。

其中random表示在所有订阅者中随机选择,round_robin表示按照订阅顺序,sticky表示一直发往上次选取的订阅者,hash表示按照发布者 ClientID 的哈希值。

第二,消息发布。消息发布者与正常的消息发布一样,比如定义了这样一个topic路径。

iot/equipment/heart

第三,消息订阅

订阅的时候通过添加$queue前缀或$share前缀进行共享订阅。

$queue/iot/equipment/heart
// 或
$share/iot/equipment/heart

一定要注意,真是的topic是iot/equipment/heart,因此发布的时候只能用这个路径。而订阅时添加上$queue/或$share/前缀来表示是基于不带群组的共享定义还是待群组的共享订阅。

如果发布的主题直接添加上两个前缀的一个,则消息是无法正常发送的。



mqtt集群订阅如何只消费一个(一次)消息?插图1

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台

除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接

本文链接:http://choupangxia.com/2021/02/07/mqtt-2/