基于Locust实现MQTT协议服务的压测脚本

最近在忙业务的间隙,穿插着做了些性能测试。

一、背景简介

业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。

目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。

本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。

捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):

  1. 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
  2. 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
  3. 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。

其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。

二、技术选型

这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。

所以就继续编码好了,仍然首选python,想到了locust库,后来看官方文档的时候,看到locust也针对mqtt协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust+paho.mqtt的组合来实现本次的负载脚本。

三、代码编写

1. 脚本代码

暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。

脚本目前做了这些事情:

  • 从db中查询有效可用的所有测试车辆信息数据
  • 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
  • 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
  • 脚本统计连接数、请求数、响应时间等信息写到报表中
  • 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv import datetime import queue import os import sys import time import ssl  from paho.mqtt import client as mqtt_client  # 根据不同系统进行路径适配 if os.name == "nt":     path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))     sys.path.insert(0, path)     from GB_test.utils.mysql_operating import DB elif os.name == "posix":     sys.path.append("/app/qa_test_app/")     from GB_test.utils.mysql_operating import DB  from locust import User, TaskSet, events, task, between, run_single_user   BROKER_ADDRESS = "broker服务地址" PORT = 1111 PASSWORD = "111111" PUBLISH_TIMEOUT = 10000  # 超时时间 TEST_TOPIC = "test_topic"  TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用来publish的测试数据,仅示意  BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 业务需要转换成 byte 类型后再发送  # 创建队列 client_queue = queue.Queue()  # 连接DB,读取车辆数据 db = DB("db_vmd") select_sql = "select xxxx"   client_list = db.fetch_all(select_sql) print("车辆数据查询完毕,数据量:{}".format(len(client_list))) for t in client_list:     # 把可用的车辆信息存到队列中去     client_queue.put(t)   def fire_success(**kwargs):     """请求成功时调用"""     events.request.fire(**kwargs)   def calculate_resp_time(t1, t2):     """计算响应时间"""     return int((t2 - t1) * 1000)   class MQTTMessage:     """已发送的消息实体类"""     def __init__(self, _type, qos, topic, payload, start_time, timeout):         self.type = _type,         self.qos = qos,         self.topic = topic         self.payload = payload         self.start_time = start_time         self.timeout = timeout   # 统计总共发送成功的消息数量 total_published = 0 disconnect_record_list = []  # 定义存放连接断开的记录的列表容器   class PublishTask(TaskSet):      @task     def task_publish(self):         self.client.loop_start()         topic = TEST_TOPIC         payload = BYTES_DATA         # 记录发送的开始时间         start_time = time.time()         mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)         published_mid = mqtt_msg_info.mid         # 将发送成功的消息内容,放入client实例的 published_message 字段         self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,                                                                    0,                                                                    topic,                                                                    payload,                                                                    start_time,                                                                    PUBLISH_TIMEOUT)         # 发送成功回调         self.client.on_publish = self.on_publish         # 断开连接回调         self.client.on_disconnect = self.on_disconnect      @staticmethod     def on_disconnect(client, userdata, rc):         """ broker连接断开,放入列表容器"""         disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]         disconnect_record_list.append(disconnected_info)         print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))      @staticmethod     def on_publish(client, userdata, mid):         if mid:             # 记录消息发送成功的时间             end_time = time.time()             # 从已发送的消息容器中,取出消息             message = client.published_message.pop(mid, None)             # 计算开始发送到发送成功的耗时             publish_resp_time = calculate_resp_time(message.start_time, end_time)             fire_success(                 request_type="p_success",                 name="client_id: " + str(client._client_id),                 response_time=publish_resp_time,                 response_length=len(message.payload),                 exception=None,                 context=None             )             global total_published             # 成功发送累加1             total_published += 1   class MQTTLocustUser(User):     tasks = [PublishTask]     wait_time = between(2, 2)      def __init__(self, *args, **kwargs):         super().__init__(*args, **kwargs)         # 从队列中获取客户端 username 和 client_id         current_client = client_queue.get()          self.client = mqtt_client.Client(current_client[1])         self.client.username_pw_set(current_client[0], PASSWORD)         # self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模拟client连接报错          # 定义一个容器,存放已发送的消息         self.client.published_message = {}      def on_start(self):         # 设置tls         context = ssl.SSLContext(ssl.PROTOCOL_TLS)         self.client.tls_set_context(context)          self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)         self.client.on_connect = self.on_connect      def on_stop(self):         print("publish 成功, 当前已成功发送数量:{}".format(total_published))         if len(disconnect_record_list) == 0:             print("无断开连接的client")         else:             # 把断开记录里的信息写入csv             with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:                 writer = csv.writer(csvfile)                 writer.writerow(['client_id', 'rc_status', 'disconnected_time'])                 for i in disconnect_record_list:                     writer.writerow(i)             print("断开连接的client信息已写入csv文件")      @staticmethod     def on_connect(client, userdata, flags, rc, props=None):         if rc == 0:             print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))             fire_success(                 request_type="c_success",                 name='count_connected',                 response_time=0,                 response_length=0,                 exception=None,                 context=None             )         else:             print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))             fire_success(                 request_type="c_fail",                 name="client_id: " + str(client._client_id),                 response_time=0,                 response_length=0,                 exception=None,                 context=None             )   if __name__ == '__main__':     run_single_user(MQTTLocustUser) 

2. 代码分析-locust库部分

并发请求能力还是使用的locust库的能力。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承UserTaskSet即可。

User

首先是先定义User类,这里就是用来生成我要用来测试的车辆。

基于Locust实现MQTT协议服务的压测脚本

类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的文档使用就行。

红色框里,是User类的2个重要熟悉属性:

  • tasks: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask类下面定义的内容。
  • wait_time: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。

绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。

蓝色框里有2个方法,也是locust提供的能力:

  • on_start:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。
    基于Locust实现MQTT协议服务的压测脚本

  • on_stop:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。
    基于Locust实现MQTT协议服务的压测脚本

TaskSet

定义好User类,就需要来定义TaskSet类,你得告诉产生出来的用户,要干点啥。

我这根据业务需要,就是让车辆不停的像broker发送数据即可。
基于Locust实现MQTT协议服务的压测脚本

红色部分,同样是paho.mqtt提供的能力,会启动新的线程去执行你定义的事情。

黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo类。

基于Locust实现MQTT协议服务的压测脚本

注意返回的2个属性:

  • mid: 返回这个消息发送的顺序
  • rc: 表示发送的响应状态,0 就是成功

绿色部分,还记得我在上面的User类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。

2. 代码分析-paho.mqtt库部分

上面的代码已经用到了不少paho.mqtt的能力,这里再进行整体梳理下。

  • client.Client():声明一个client
  • client.username_pw_set(): 设置客户端的用户名,密码
  • client.tls_set_context: 设置ssl模式
  • client.connect(): 连接代理
  • client.publish:向代理推送消息

还用到了一些回调函数:

  • on_connect:连接操作成功时回调
  • on_publish:发布成功时回调
  • on_disconnect:客户端与代理断开连接时回调

另外还用到了一个事件函数events.request

基于Locust实现MQTT协议服务的压测脚本

当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event

基于Locust实现MQTT协议服务的压测脚本

查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。

比如我在发送回调函数里调用了该方法。

基于Locust实现MQTT协议服务的压测脚本

所以最后在控制台显示的报告里就有我定义的内容了。

基于Locust实现MQTT协议服务的压测脚本

由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。

基于Locust实现MQTT协议服务的压测脚本

后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。

基于Locust实现MQTT协议服务的压测脚本

三、小结

后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。

现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。

另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。

发表评论

相关文章