mqtt集群订阅如何只消费一个(一次)消息?
共享订阅
在实践中我们的业务系统集成是集群存在,对于mqtt消息,如果没有做特殊处理,那么集群中每个服务只要订阅了mqtt中的一个主题,那么每台服务器都会进行消费。不仅浪费资源,还限制了服务的处理上限。
因此,有不少朋友在问这类问题:走MQTT协议,两个订阅者都订阅同一个topic,向该topic发布一个消息,怎么只让其中的订阅者消费?
针对这个问题,笔者经过实践发现,可以通过broker共享订阅机制来解决此问题。
什么是共享订阅
共享订阅是在多个订阅者之间实现负载均衡的订阅方式:
[subscriber1] got msg1 msg1, msg2, msg3 / [publisher] ----------------> "$share/g/topic" -- [subscriber2] got msg2 \ [subscriber3] got msg3
上图中,共享 3 个 subscriber 用共享订阅的方式订阅了同一个主题 $share/g/topic
,其中topic
是它们订阅的真实主题名,而 $share/g/
是共享订阅前缀。EMQ X 支持两种格式的共享订阅前缀:
如何实现共享订阅
以emqx为例,可通过三方面来进行配置:
第一,配置emqx,开启共享订阅。
# etc/emqx.conf # 均衡策略 broker.shared_subscription_strategy = random # 适用于 QoS1 QoS2 消息,启用时在其中一个组离线时,将派发给另一个组 broker.shared_dispatch_ack_enabled = true
在emqx.conf配置文件中,通过shared_subscription_strategy指定共享订阅均衡策略。通过shared_dispatch_ack_enabled设置为true来开启共享订阅功能。
其中random表示在所有订阅者中随机选择,round_robin表示按照订阅顺序,sticky表示一直发往上次选取的订阅者,hash表示按照发布者 ClientID 的哈希值。
第二,消息发布。消息发布者与正常的消息发布一样,比如定义了这样一个topic路径。
iot/equipment/heart
第三,消息订阅
订阅的时候通过添加$queue前缀或$share前缀进行共享订阅。
$queue/iot/equipment/heart // 或 $share/iot/equipment/heart
一定要注意,真是的topic是iot/equipment/heart,因此发布的时候只能用这个路径。而订阅时添加上$queue/或$share/前缀来表示是基于不带群组的共享定义还是待群组的共享订阅。
如果发布的主题直接添加上两个前缀的一个,则消息是无法正常发送的。
关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台
除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接