一、引言
目前我们对于MQTT的使用策略是将其作为一种纯粹的消息协议来使用的,对于整个MQTT技术栈来说,协议只占其中大约三分之一的应用开发范围;就如同W3C Trace Context协议之于OpenTelemetry,MQTT本身也是有一套基于OASIS提供的标准开发、实施、部署的完整解决方案。
而我司对MQTT的使用则是只使用了其中的协议层,并且这个协议层本身的特性也只实现了最基础的部分,更为复杂的协议特性大多都是放在业务层进行实现而非通过框架开发或者架构设计进行通用性处理,以致于我们在长连接消息任务的处理上经常遇到由于基础能力依赖业务实现导致的代码功能职责混乱,从而产生很多难以排查难以预警的问题,出现一种bug如同“打地鼠”的感觉。
因此对于我司的长连接架构使用我们应当进行对于现状与MQTT标准实现等方案进行更全面的对比,从而明确我司后续的长连接业务与技术发展框架。
二、现状与问题
上图是我司现在基于OpenResty的网关大概架构,我们可以发现,我司的长连接服务是典型的C/S架构设计。因为在此架构的设计之初,考虑到的绝大多数情况都是由客户端主动发起消息,服务端作为消息接收方再给予回执,因此采用了相对传统的“聊天系统架构”。然而MQTT协议在设计之初本身主要并不是作为聊天系统协议构建的,其技术架构形态更适合作为多设备间的广播消息处理,是用于IoT设备的,因此才出现了协议设计与当时实际业务不匹配导致的技术错位问题。
因此我司长连接架构中的客户端与服务端的业务能力是不对等的,但是MQTT本身又是一个对等协议,例如在MQTT协议里并未设计服务端向客户端的反向通知能力,因为在MQTT的整体框架里消息服务器本身是一个第三方,性质更接近消息队列的概念,只要使用了队列中的消息,那就是消费者,就是Client,而服务器的概念在MQTT的框架里只是一个用来处理接入、路由、消息缓存的中间件,本身并不与业务挂钩。也就是说我们的业务服务(例如wchat这样的业务),在MQTT框架下看来他应该也是一个Client,但是我们的业务服务接受消息的方式并不是通过向MQTT消息队列进行标准形式的订阅,而是通过路由派发的方式进行处理,也因此造成了服务端之间的跨服务间消息通信与回调需要在这个技术框架下进行额外处理。
而对于MQTT协议本身,我们的使用也仅限于作为一种消息传递的基本载体,对于MQTT协议标准中已经设计好的部分并没有按照标准方式进行处理,例如KeepAlive、持久会话、QoS、保留消息、遗嘱消息等。同时MQTT整个技术架构其实是偏向于多Topic、小队列消息的形态,本身就是以广播消息的形态作为其基本设计,点对点消息只是其中的很小一部分使用形态,同时在引入MQTT协议时当时市面上大部分的MQTT Broker在处理单Topic大批量消息上并没有比其他技术方案更优秀,典型的如mosquitto这样的基于单线程处理消息队列的逻辑,导致其性能与kafka等MQ相距胜远,只是单纯的资源占用很小,适用于IoT设备中的嵌入式网关,在大规模并发消息与长队列消息中并不占优。
基于以上论述我们可以大概总结出我司长连接服务的几个现状与问题:
- MQTT协议与聊天场景本身领域差异导致技术错位的历史问题
- 服务端的发布订阅机制不是直接作用于MQTT协议本身,因此产生因协议限制导致的诸多突破框架的业务操作,例如在业务服务这一侧其实与MQTT协议完全无关,完全走的是kafka
- 对于MQTT协议本身特性使用不足,大量已经定义好的协议特性放在了业务方进行处理,造成了协议冗余与业务复杂度提升,早期在灵活性与稳定性中选择了灵活性,使得在后续的业务发展中产生了技术债
三、标准实现是什么样子的?
标准框架
对于标准的MQTT框架来说,整体架构非常简单,整个框架体系中只包含了四个角色:发布者、订阅者、代理与主题。
在这个体系架构下,由MQTT Broker Cluster承担了鉴权、连接管理、用户权限管理、消息路由、消息队列、主题管理等一系列功能。由于MQTT协议自身的轻量级、灵活性与通用性,其在即时聊天与游戏领域缺少很多技术框架上的细节设计。例如在原始的MQTT架构内,两个MQTT Client只要订阅同一个Topic就能收到来自这个Topic的任何消息,MQTT Client也可以向任意topic发送消息而没有阻拦,同时由于MQTT具有父Topic会基于子Topic自动生成的特性,Topic的生成权限也没有控制,因此对于我司来说原始的MQTT协议只是一个毛坯房,依旧需要大量的定制化开发才能满足业务需求。
但是MQTT标准框架下依旧有很多通用性能力值得我们引入:
- 基于订阅发布的对等前后端关系便于后端业务的横向拓展,而不用在基础的CS架构上进行缝补,前后端由于在MQTT标准框架下的拓扑位置保持一致,因此聊天架构的拓扑模型可以简化为Client-Broker的二元关系,问题排查也会更加容易
- 明确了MQTT Broker作为消息处理中间件与业务解耦,通用化了持久层、QoS、遗嘱消息、大规模广播消息性能、多端登录等能力,使得这些通用基础能力可以在一个与业务无关的领域进行集中处理
- 基于Topic机制进行基础业务逻辑的处理,而不是完全依赖Payload中的自定义协议,使得常用业务流程得以快速处理,例如用户只需要订阅自己的inbox(如
chat/user/${user_id}/inbox
)即可接收所有有关自己的私聊消息,后端通过订阅group/#这个顶级topic即可处理所有群聊业务,而不需要通过网关进行主动转发等,同时大量的命令类消息无需通过payload即可通过topic来实现,例如添加好友可以采用订阅req/user/${user_id}/add
的方式,查看用户状态可以使用user/${user_id}/state
这样的方式等
标准协议
控制报文
一个MQTT的完整控制报文结构大致如下:
HEADER |
---|
Fixed Header固定报头,所有控制报文都包含 |
Variable Header 可变报头,部分控制报文包含 |
Payload 有效载荷,部分控制报文包含 |
固定报头
第一部分是固定报头,采用固定2个字节的长度
比特位 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | MQTT控制报文的类型 | 用于指定控制报文类型的标志位 | ||||||
byte 2... | 剩余长度 |
可变报头
第二部分是可变报头,包含一些可以自定义的信息,可变报头长度由固定报头的第二个字节剩余长度决定,下表表示了哪些协议需要可变报头
名字 | 值 |
---|---|
CONNECT | 不需要 |
CONNACK | 不需要 |
PUBLISH | 需要(如果QoS>0) |
PUBACK | 需要 |
PUBREC | 需要 |
PUBREL | 需要 |
PUBCOMP | 需要 |
SUBSCRIBE | 需要 |
SUBACK | 需要 |
UNSUBSCRIBE | 需要 |
UNSUBACK | 需要 |
PINGREQ | 不需要 |
PINGRESP | 不需要 |
DISCONNECT | 不需要 |
AUTH | 不需要 |
MQTTV5的可变报头相比V311增加了属性部分,在V5协议中一个标准的可变报头结构如下:
说明 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
协议名 Protocol Name | |||||||||
byte 1 | 长度 Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | 长度 Length LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
byte 3 | ‘M’ | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |
byte 4 | ‘Q’ | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 |
byte 5 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
byte 6 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
协议版本 Protocol Version | |||||||||
byte 7 | Version (5) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 |
连接标志 Connect Flags | |||||||||
byte 8 | 用户名标志 User Name Flag (1) 密码标志 Password Flag (1) 遗嘱保留标志 Will Retain (0) 遗嘱服务质量 Will QoS (01) 遗嘱标志 Will Flag (1) 新开始 Clean Start(1) 保留 Reserved (0) |
1 | 1 | 0 | 0 | 1 | 1 | 1 | 0 |
保持连接 Keep Alive | |||||||||
byte 9 | 保持连接 Keep Alive MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 10 | 保持连接 Keep Alive LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
属性 Properties (V5新增) | |||||||||
byte 11 | 长度 Length (5) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 |
byte 12 | 会话过期间隔标识符 (17) | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 |
byte 13 | 会话过期间隔Session Expiry Interval (10) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 14 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 15 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 16 | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
有效载荷
第三部分是有效载荷,其中MQTTV5相比V311,在UNSUBACK下需要有效载荷
MQTT控制报文 | 有效载荷 |
---|---|
CONNECT | 需要 |
CONNACK | 不需要 |
PUBLISH | 可选 |
PUBACK | 不需要 |
PUBREC | 不需要 |
PUBREL | 不需要 |
PUBCOMP | 不需要 |
SUBSCRIBE | 需要 |
SUBACK | 需要 |
UNSUBSCRIBE | 需要 |
UNSUBACK | 需要(V311不需要) |
PINGREQ | 不需要 |
PINGRESP | 不需要 |
DISCONNECT | 不需要 |
AUTH | 不需要 |
MQTT 控制报文
因此MQTTV5的可用控制报文类型如下:
名字 | 值 | 报文流动方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 两个方向都允许 | 断开连接通知 |
AUTH | 15 | 两个方向都允许 | 认证信息交换 |
MQTT在V5下相比V311新增了一个Auth报文,该报文在V311下属于Reserved保留类型15,在V5下用于进行双向验证。具体使用方法可参照MQTT文档中3.15小节。
MQTTV5的控制报文在V311的基础上新增了属性段(Properties),该部分由属性长度与属性内容构成,属性内容建议使用一个字节表示,其内容如下:
标识符 | 属性名 | 数据类型 | 报文/遗嘱属性 | |
---|---|---|---|---|
Dec | Hex | |||
1 | 0x01 | 载荷格式说明 | 字节 | PUBLISH, Will Properties |
2 | 0x02 | 消息过期时间 | 四字节整数 | PUBLISH, Will Properties |
3 | 0x03 | 内容类型 | UTF-8编码字符串 | PUBLISH, Will Properties |
8 | 0x08 | 响应主题 | UTF-8编码字符串 | PUBLISH, Will Properties |
9 | 0x09 | 相关数据 | 二进制数据 | PUBLISH, Will Properties |
11 | 0x0B | 定义标识符 | 变长字节整数 | PUBLISH, SUBSCRIBE |
17 | 0x11 | 会话过期间隔 | 四字节整数 | CONNECT, CONNACK |
18 | 0x12 | 分配客户标识符 | UTF-8编码字符串 | CONNACK |
19 | 0x13 | 服务端保活时间 | 双字节整数 | CONNACK |
21 | 0x15 | 认证方法 | UTF-8编码字符串 | CONNECT, CONNACK, AUTH |
22 | 0x16 | 认证数据 | 二进制数据 | CONNECT, CONNACK, AUTH |
23 | 0x17 | 请求问题信息 | 字节 | CONNECT |
24 | 0x18 | 遗嘱延时间隔 | 四字节整数 | Will Properties |
25 | 0x19 | 请求响应信息 | 字节 | CONNECT |
26 | 0x1A | 请求信息 | UTF-8编码字符串 | CONNACK |
28 | 0x1C | 服务端参考 | UTF-8编码字符串 | CONNACK, DISCONNECT |
31 | 0x1F | 原因字符串 | UTF-8编码字符串 | CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH |
33 | 0x21 | 接收最大数量 | 双字节整数 | CONNECT, CONNACK |
34 | 0x22 | 主题别名最大长度 | 双字节整数 | CONNECT, CONNACK |
35 | 0x23 | 主题别名 | 双字节整数 | PUBLISH |
36 | 0x24 | 最大QoS | 字节 | CONNACK |
37 | 0x25 | 保留属性可用性 | 字节 | CONNACK |
38 | 0x26 | 用户属性 | UTF-8字符串对 | CONNECT, CONNACK, PUBLISH, Will Properties, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH |
39 | 0x27 | 最大报文长度 | 四字节整数 | CONNECT, CONNACK |
40 | 0x28 | 通配符订阅可用性 | 字节 | CONNACK |
41 | 0x29 | 订阅标识符可用性 | 字节 | CONNACK |
42 | 0x2A | 共享订阅可用性 | 字节 | CONNACK |
一个属性包含一段数据和一个定义了属性用途和数据类型的标识符。标识符被编码为变长字节整数。任何控制报文,如果包含了:对于该报文类型无效的标识符,或者错误类型的数据,都是无效报文。收到无效报文时,服务端或客户端使用包含原因码0x81(无效报文)CONNACK或DISCONNECT报文进行错误处理,标识符排序不分先后。
错误码处理机制
MQTTV5同时提供了一套标准的错误码处理机制,叫做原因码(Reason Code),在不同的报文类型下提供了不同的可用原因码用于排查故障原因,原因码放在可变报头内:
原因码 | 名称 | 报文 | |
---|---|---|---|
Dec | Hex | ||
0 | 0x00 | 成功 | CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH |
0 | 0x00 | 正常断开 | DISCONNECT |
0 | 0x00 | 授权的QoS 0 | SUBACK |
1 | 0x01 | 授权的QoS 1 | SUBACK |
2 | 0x02 | 授权的QoS 2 | SUBACK |
4 | 0x04 | 包含遗嘱的断开 | DISCONNECT |
16 | 0x10 | 无匹配订阅 | PUBACK, PUBREC |
17 | 0x11 | 订阅不存在 | UNSUBACK |
24 | 0x18 | 继续认证 | AUTH |
25 | 0x19 | 重新认证 | AUTH |
128 | 0x80 | 未指明的错误 | CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
129 | 0x81 | 无效报文 | CONNACK, DISCONNECT |
130 | 0x82 | 协议错误 | CONNACK, DISCONNECT |
131 | 0x83 | 实现错误 | CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
132 | 0x84 | 协议版本不支持 | CONNACK |
133 | 0x85 | 客户标识符无效 | CONNACK |
134 | 0x86 | 用户名密码错误 | CONNACK |
135 | 0x87 | 未授权 | CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
136 | 0x88 | 服务端不可用 | CONNACK |
137 | 0x89 | 服务端端正忙 | CONNACK, DISCONNECT |
138 | 0x8A | 禁止 | CONNACK, DISCONNECT |
139 | 0x8B | 服务端关闭中 | DISCONNECT |
140 | 0x8C | 无效的认证方法 | CONNACK, DISCONNECT |
141 | 0x8D | 保活超时 | DISCONNECT |
142 | 0x8E | 会话被接管 | DISCONNECT |
143 | 0x8F | 主题过滤器无效 | SUBACK, UNSUBACK, DISCONNECT |
144 | 0x90 | 主题名无效 | CONNACK, PUBACK, PUBREC, DISCONNECT |
145 | 0x91 | 报文标识符已被占用 | PUBACK, PUBREC, SUBACK, UNSUBACK |
146 | 0x92 | 报文标识符无效 | PUBREL, PUBCOMP |
147 | 0x93 | 接收超出最大数量 | DISCONNECT |
148 | 0x94 | 主题别名无效 | DISCONNECT |
149 | 0x95 | 报文过长 | CONNACK, DISCONNECT |
150 | 0x96 | 消息太过频繁 | DISCONNECT |
151 | 0x97 | 超出配额 | CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
152 | 0x98 | 管理行为 | DISCONNECT |
153 | 0x99 | 载荷格式无效 | CONNACK, PUBACK, PUBREC, DISCONNECT |
154 | 0x9A | 不支持保留 | CONNACK, DISCONNECT |
155 | 0x9B | 不支持的QoS等级 | CONNACK, DISCONNECT |
156 | 0x9C | (临时)使用其他服务端 | CONNACK, DISCONNECT |
157 | 0x9D | 服务端已(永久)移动 | CONNACK, DISCONNECT |
158 | 0x9E | 不支持共享订阅 | SUBACK, DISCONNECT |
159 | 0x9F | 超出连接速率限制 | CONNACK, DISCONNECT |
160 | 0xA0 | 最大连接时间 | DISCONNECT |
161 | 0xA1 | 不支持订阅标识符 | SUBACK, DISCONNECT |
162 | 0xA2 | 不支持通配符订阅 | SUBACK, DISCONNECT |
请求响应机制
最后在MQTTV5下,为了使得MQTT协议也支持类似HTTP请求一样的来回成对的消息传递机制,提供了一套请求响应机制,该机制交互过程如下:
- MQTT客户端(请求方)向主题发布请求消息。请求消息是具有响应主题的应用消息。
- 另一个MQTT客户端(响应方)订阅了与请求消息发布时使用的主题名相匹配的主题过滤器。结果,它收到请求消息。可能有多个响应方订阅了此主题名,也可能没有响应方。
- 响应方根据请求消息采取适当的操作,然后往请求消息中携带的响应主题属性中的主题名发布响应消息。
- 典型用法,请求方订阅了响应主题,从而接收到响应信息。但是,其他某些客户端可能会订阅响应主题,因此它们也将接收和处理响应消息。与请求消息一样,可能有多个客户端订阅了响应消息的发送主题,也可能没有。
对于 MQTT 的请求响应机制的具体行为可以参照这篇翻译文档MQTT 的请求响应机制,通过 MQTTV5 提供的新机制,为后续我们在长连接上时间更强大的业务能力提供的基础。
MQTT 协议总结
总的来说MQTTV5相比MQTTV311新增了如下内容,便于我们利用协议完成更复杂的业务需求,并将其标准化:
- 会话过期 把清理会话标志拆分成新开始标志(指示会话应该在不使用现有会话的情况下开始)和会话过期间隔标志(指示连接断开之后会话保留的时间)。会话过期间隔时间可以在断开时修改。把新开始标志设置为1且会话过期间隔标志设置为0,等同于在MQTT v3.1.1中把清理会话(CleanSession)设置为1。
- 消息过期 允许消息在发布时设置一个过期间隔。
- 所有确认报文原因码 更改所有响应报文以包含原因码,包括CONNACK,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBACK,UNSUBACK,DISCONNECT和AUTH,以使得调用方确定请求的函数是否成功。
- 所有确认报文原因字符串 更改大部分报文以包含原因码同时也允许一个可选的原因字符串。这是为问题定位而设计的,并且不应由接收端所解析。
- 服务端断开 允许服务端发送DISCONNECT报文,以指示连接被关闭的原因。
- 载荷格式和内容类型 允许在消息发布时指定载荷格式(二进制、文本)和MIME样式内容类型。这些信息被转发到消息的接收端。
- 请求/响应 规定MQTT请求/响应模式,提供响应主题和对比数据属性,以使得响应消息被路由回请求的发布者。此外,为客户端添加从服务端获取获取关于构造响应主题的配置信息的能力。
- 共享订阅 添加对共享订阅的支持,以允许多个订阅消费者进行负载均衡。
- 订阅标识符 允许在SUBSCRIBE报文中指定一个数字订阅标识符,并在消息分发时返回此标识符。这使得客户端收到分发的消息时确定此消息是由哪个或哪些订阅导致的。
- 主题别名 通过将主题名缩写为小整数来减小MQTT报文的开销大小。客户端和服务端分别指定它们允许的主题别名的数量。
- 流量控制 允许客户端和服务端分别指定未完成的可靠消息(QoS>0)的数量。发送端可以暂停发送此类消息以保持消息数量低于配额。这被用于限制可靠消息的速率和某一时刻的传输中(in-flight)消息数量。
- 用户属性 为大多数报文添加用户属性。PUBLISH报文的用户属性由客户端应用程序定义。PUBLISH报文和遗嘱报文的用户属性由服务端转发给应用消息的接收端。CONNECT,SUBSCRIBE和UNSUBSCRIBE报文的用户属性由服务端实现定义。CONNACK,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBACK,UNSUBACK和AUTH报文的用户属性由发送端定义,且对发送端具有唯一性。MQTT规范不定义用户属性的意义。
- 最大报文长度 允许客户端和服务端各自指定它们支持的最大报文长度。会话参与方发送更大的报文将造成错误。
- 可选的服务端功能可用性 提供定义一组服务端不允许的功能,并告知客户端的机制。可以使用这种方式指定的功能包括:最大QoS等级,保留可用,通配符订阅可用,订阅标识符可用和共享订阅可用。客户端使用服务端通知了(不可用)的功能将造成错误。 在早期版本的MQTT协议中,服务端没有实现的功能通过未授权告知客户端。当客户端使用其中一种(不可用的)功能时,此功能允许服务端告知客户端,并添加特定的原因码。
- 增强的认证 提供一种机制来启用包括互相认证在内的质询/响应风格的认证。这允许在客户端和服务端都支持的情况下使用SASL风格的认证,包括客户端在连接中重新认证的功能。
- 订阅选项 提供主要用于定义允许消息桥接应用的订阅选项。包括不要把消息发送给消息源客户端(非本地)的选项和订阅时处理保留消息的选项。
- 遗嘱延迟 提供指定遗嘱消息在连接中断后延时发送的能力。设计此特性是为了在会话的连接重建的情况下不发送遗嘱消息。此特性允许连接短暂中断而不通知其他客户端。
- 服务端保持连接 允许服务端指定其希望客户端使用的保持连接值。此特性允许服务端设置最大允许的保持连接值并被客户端使用。
- 分配客户标识符 服务端分配了客户标识符的情况下,向客户端返回此客户标识符。服务端分配客户标识符只能用于新开始标志为1的连接。
- 服务端参考 允许服务端使用CONNACK或DISCONNECT报文指定备用服务端。此特性被用于(服务端)重定向或做准备。
Topic的使用案例
以IM场景为例,在标准协议的机制下我们可以通过Topic就能完成大量的用户识别与信息筛选工作。
单聊
用户订阅自己相关的 topic, 进行接收消息
A 订阅 'user/111/#'
B 订阅 'user/222/#'
C 订阅 'user/333/#'
用户发布消息到别人的 topic ,如 A 发消息给 B
A 发消息给 B ----> 发布消息到 'user/222/inbox/111'
A 申请加 B 为好友 ----> 发布消息到 'user/222/application/111'
群聊
用户订阅自己加入的群相关的 topic, 进行接收消息
A 订阅topic ----> 'group/1/#'
B 订阅topic ----> 'group/1/#'
C 订阅topic ----> 'group/1/#'
发消息
A 发消息到 Group1 ----> 发布消息到 'group/1/inbox/111'
B 发消息到 Group1 ----> 发布消息到 'group/1/inbox/222'
C 发消息到 Group1 ----> 发布消息到 'group/1/inbox/333'
加群、邀请
A 申请加入群聊 Group1 ----> 发布消息到 'group/1/application/111'
B 邀请 C 加入 Group1 ----> 发布消息到 'group/1/invitation/333/from/222'
接收邀请(注意是接收不是接受)
A 订阅topic ----> 'group/+/invitation/111/from/+'
B 订阅topic ----> 'group/+/invitation/222/from/+'
B 订阅topic ----> 'group/+/invitation/333/from/+'
群主接收申请(注意是接收不是接受)
A 订阅topic ----> 'group/1/application/+'
多设备登录
A-iPhone 设备登录设置 session-id 为 device-a-iPhone
A-iPad 设备登录设置 session-id 为 device-a-iPad
A-Android 设备登录设置 session-id 为 device-a-Android
- 登录之前可以判断是否有相关 session,如果没有就创建 session 并进行一遍如上订阅操作,当然也可根据设备进行区别订阅。
传输层处理
MQTT的连接层可以建立在TCP/Websocket/QUIC之上,在绝大多数场景下,使用TCP可完成。但在复杂的移动互联网的应用场景下,TCP的在以下几个场景会出现连接故障:
- 网络切换导致经常性连接中断
- 断网后重新建立连接困难:断网后操作系统释放资源较慢,且应用层无法及时感知断开状态,重连时 Server/Client 开销较大
- 弱网环境下数据传输受限于拥塞、丢包侦测和重传机制
因此新一代的MQTT协议往往基于QUIC进行传输层协议通信,使用MQTT over QUIC有以下几点好处:
- 更高级的拥塞控制:有效降低数据丢包率,在测试中在网络波动的情况下仍能持续稳定传输数据
- 运维友好:减少大规模重连导致的开销(时间开销、客户端/服务器性能开销),减少不必要的应用层状态迁移而引发的系统过载(0 RTT)
- 更灵活的架构创新:比如 Direct server return (DSR,服务器直接返回模式),只有入口/请求流量经过 LB,出口和响应流量绕过 LB 直接回到客户端,减少 LB 的瓶颈
- 减少握手延迟 (1 RTT)
- 多路径支持,连接平滑迁移:从 4G 切换到 WIFI, 或者因为 NAT Rebinding 导致五元组发生变化,QUIC 依然可以在新的五元组上继续进行连接状态,尤其适用于网络经常性变化的移动设备
- 更敏捷的开发部署:协议栈的实现在 userspace,能够开发快速迭代
- 端到端加密:未加密的包头带有极少信息, 减少传输路径中中间节点的影响,带来更好的安全性和更可控的用户体验
但是由于QUIC协议基于UDP进行传输,在国内网络环境下,某些网络处理设备终端会对UDP协议包进行封杀,运营商也往往对UDP协议的消息包进行限速(UDP QoS),导致在特定网络场景下无法传递UDP协议包,因此即使使用MQTT over QUIC,也需要提供一定的降级机制,避免长连接建立受到特殊环境影响。
拓展机制
由于我司的长连接相关基架是基于初始的WMQ相关业务能力构建出来的,本身的系统架构建设就是以业务为导向,逐步迭代累积构成。因此在基础设施的弹性扩展上往往并不注重,这也导致我们在基础设施层的迭代上经常受限于业务迭代与不同业务对基架的污染。
想要构建良好的拓展机制的前提,是识别出业务领域与非业务领域之间的界限,首先要做的便是去除接入层与业务形态之间的强绑定关系:对于客户端与服务端之间的交互来说,原则上应当精简为一个连接即可处理所有业务,对于具体的业务处理与分发,通过Topic完成足以满足业务需求。即使在某些特殊业务场景下无法通过单一连接满足需求,也应当将连接地址本身与业务无关化,从而保证接入层入口的平滑过渡。
其次对于所有使用MQTT协议的业务而言,可以通过共有中间件解决的问题不允许再通过业务方进行二次实现,这些公共能力应当去除业务状态,通过升级基建进行迭代升级,例如通过MQTT Broker来完成对于QoS的处理;通过Topic分层实现业务领域的划分等等。
能力对比
以下是我们对我司长连接服务与标准MQTT框架做的一个简单对比:
我司长连接服务 | 标准MQTT框架 | |
---|---|---|
传输层协议 | TCP | TCP、QUIC |
拓扑结构 | Client-Gateway-Router-MQ-Server | Client-Broker-Client |
Topic机制 | 仅用于大范围业务区分 | 基于多层Topic实现 |
命令消息机制 | 基于特定Payload中自定义协议实现 | 基于Topic与Topic通用符处理+必要Payload信息实现(Payload可为空) |
主要消息类型 | 客户端:点对点消息</br>服务端:广播消息(遍历实现) | 广播消息 |
QoS实现 | 业务方通过业务ACK实现 | 在Broker内基于MQTT的QoS Flag与标准PubAck实现 |
遗嘱消息 | 未实现 | 有实现,可以在Client断开连接时由Broker自动发送一条消息到指定的Topic上 |
保留消息 | 需要业务自身处理 | 有实现,在新Client订阅时Broker可以自动向Client推送最后一条保留消息 |
连接状态 | 通过TCP会话判断、部分场景有PingPong,难以处理半连接状态 | 基于完整的MQTT KeepAlive机制实现,可以确保不会出现半连接状态 |
离线消息 | 基于业务方需求实现 | 在Broker内基于持久会话机制实现 |
消息持久化 | 基于业务方需求实现 | 在Broker上扩展持久化层实现 |
通过以上对比我们发现,将现有的Gateway-Router-MQ-Server的消息处理机制进行调整与重构,在未来长连接服务的持续稳定迭代上是具有一定优势的。接下来我们将阐述如何在现有的情况下逐步过渡到一个相对理想的长连接服务上。
四、如何改进
导言
为什么我们的技术演进路线高度不可控,经过一段时间的分析,我认为可以理解为缺少一种基于数理逻辑的技术迭代连续性的认知。我们的技术开发人员往往容易缺乏对于技术发展与业务逻辑迭代的整体认知,容易沉浸在代码的逻辑中,缺乏对其趋势的预判与识别。对于一个技术路径的开发过程只关注其中离散的点,而忽略了其连续性的整体走势。即使输出技术规划,也难以保证其规划与目的的一致性。
这种先验的问题识别能力体现在对于技术演进路径的数理逻辑建模上,我们可以这一切过程简化为一种二维坐标模型,通过分析其连续曲线来完成对整体趋势的把握。通过分析一种技术规划路径的曲线走势,来判断我们的开发过程与规划路径的相关度解释。
对于一个初级模型我们可以用最基础的指标作为维度进行考虑,如复杂度,扩展性,健壮性等,通过对于具有相关性的指标进行二维坐标轴的构建,我们可以得出一系列的技术演进特定指标的变化曲线。
更进一步,通过将变化曲线的速率定义为新的指标,我们可以通过运动的变化趋势进行描述,如将复杂度与可控度进行关联,通过求其导数构建新的指标精确度,再将这些具有关联性的速率指标进行组合,我们就可以得出一套从整体层面描述技术演进路线的二维坐标图。
通过对坐标图中的曲线走势进行拟合,并将具有显著特征的坐标点带入其中,就可以得出不同的函数模型,再讲这些函数模型通过分类学机制进行归类,最终可以大致得出一个网格状的技术路线的发展定位。
修正点
单点问题
标准框架下的MQTT Broker负责了过多功能,使其本身容易成为单点与黑盒,因此我们可以在标准框架上进一步解耦,提高灵活性与节点稳定性。
首先我们可以独立出Broker的Cluster管理能力,将其构建为独立的LB组件,将多个Broker挂载在下面,并实现Broker间的数据共享。
其次,常用的MQTT Broker大多用于轻量级消息传递,其内部MQ的设计无法替代Kafka等用于大规模数据的消息队列,也有HiveMQ、RabbitMQ等MQ产品通过兼容MQTT协议的方式直接进行支持,但这样使用则会丧失MQTT Broker本身的超高连接并发数与大规模广播消息的优势。因此我们可以进行结合使用,在特定领域如IM消息传递等场景下挂载Kafka等分布式消息系统进行深队列处理;而在队列深度和消息长度不高的地方如推送等直接进行Broker转发。
对于持久化问题,我们也可以通过构建基于MQTT的数据库连接器,或者将持久化层通过Kafka与Broker关联,形成可解藕的双向通信机制,从而避免Broker单点的功能堆积。
协议适用性问题
标准协议下的MQTT也不适合直接用于IM与游戏场景,因此我们也需要对其进行一定程度的扩充,因为其基础设计并直接针对用户,而是用于设备间数据通信,因此缺少协议数据内容可变性上的设计。
流程控制
针对直接服务于用户的场景,我们可以充分利用MQTT协议轻量化的优势,将协议消息视作不可变数据,以覆写的方式处理业务逻辑中的替换部分载荷的机制。通过对Broker注入逻辑处理插件的方式,使消息builder内嵌原始消息,并对改变内容进行记录,每一次改变都重新生成消息实例,并通过链路追踪记录变化的标记签名,从而形成完整的业务流程处理路径的识别。
MQTT消息在标准框架内并不方便进行消息的过滤审查等处理,为了保证Broker与业务解耦,因此可以通过后端进行异步审查并返回消息删除状态的方式来实现。
业务领域管理
充分利用Topic的多层级与通配符处理机制,并利用topic自动创建的机制实现命令消息的精简化。
共享订阅是 MQTTV5 提供的新特性,通过共享订阅可以实现多个订阅者之间的负载均衡,具体信息可以参考MQTT 共享订阅机制与MQTT 持久会话与 Clean Session。
基础设施外围化
基础设施的可替换性在新一代结构中尤为重要,在 2024 年的今天,基础设施不再是一成不变的“底层库”的概念,而是一直跟随时代进步的可替换组件,好的设计应当是业务可以在不同的基础设施间随意切换且业务不受影响的。
五、如何实施
根据目前我司的长连接协议使用现状,从保守到激进,我们可能的改造路径可以分为三种:
- Open-Gate 组件功能继续扩展,将业务层的 MQTT Broker 能力剥离出来放入网关层,网关层的这些功能通过 OpenResty 插件的形式进行功能扩展,将所有的端交互约束在网关上,形成统一内外部流量入口
- 基于标准框架,重组现有长连接技术架构。简化网关功能,将网关改造为仅作为连接阶段提供路由能力,由broker完成后续的所有功能,包括加强验证等控制报文的逻辑处理全部由broker实现,网关接入逻辑透明化
- 基于流量网关设计与全协议插件化技术为核心的技术架构。将网关作为入口,只做协议识别和流量控制,起到高防与lsb的作用,消息无条件转发至broker,由broker完成mqtt协议消息处理,任何client不直接访问broker,都通过网关进行对接
对于这三种路线,我们可以通过简易架构图来描述三种路线的形态: