【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载均衡 load balance
【mq】从零开始实现 mq-09-消费者拉取消息 pull message
【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack
【mq】从零开始实现 mq-11-消费者消息回执添加分组信息 pull message ack groupName
对于消息的发送,有时候可能需要一次发送多个,比如日志消息等。
批量操作可以提升性能。
本节老马就和大家一起添加一点批量特性。
/** * 同步发送消息-批量 * @param mqMessageList 消息类型 * @return 结果 * @since 0.1.3 */ SendBatchResult sendBatch(final List<MqMessage> mqMessageList); /** * 单向发送消息-批量 * @param mqMessageList 消息类型 * @return 结果 * @since 0.1.3 */ SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList);
一次支持发送多个消息。
生产者实现如下。
@Override public SendBatchResult sendBatch(List<MqMessage> mqMessageList) { final List<String> messageIdList = this.fillMessageList(mqMessageList); final MqMessageBatchReq batchReq = new MqMessageBatchReq(); batchReq.setMqMessageList(mqMessageList); String traceId = IdHelper.uuid32(); batchReq.setTraceId(traceId); batchReq.setMethodType(MethodType.P_SEND_MSG_BATCH); return Retryer.<SendBatchResult>newInstance() .maxAttempt(maxAttempt) .callable(new Callable<SendBatchResult>() { @Override public SendBatchResult call() throws Exception { return doSendBatch(messageIdList, batchReq, false); } }).retryCall(); } @Override public SendBatchResult sendOneWayBatch(List<MqMessage> mqMessageList) { List<String> messageIdList = this.fillMessageList(mqMessageList); MqMessageBatchReq batchReq = new MqMessageBatchReq(); batchReq.setMqMessageList(mqMessageList); String traceId = IdHelper.uuid32(); batchReq.setTraceId(traceId); batchReq.setMethodType(MethodType.P_SEND_MSG_ONE_WAY_BATCH); return doSendBatch(messageIdList, batchReq, true); } private SendBatchResult doSendBatch(List<String> messageIdList, MqMessageBatchReq batchReq, boolean oneWay) { log.info("[Producer] 批量发送消息 messageIdList: {}, batchReq: {}, oneWay: {}", messageIdList, JSON.toJSON(batchReq), oneWay); // 以第一个 sharding-key 为准。 // 后续的会被忽略 MqMessage mqMessage = batchReq.getMqMessageList().get(0); Channel channel = getChannel(mqMessage.getShardingKey()); //one-way if(oneWay) { log.warn("[Producer] ONE-WAY send, ignore result"); return SendBatchResult.of(messageIdList, SendStatus.SUCCESS); } MqCommonResp resp = callServer(channel, batchReq, MqCommonResp.class); if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { return SendBatchResult.of(messageIdList, SendStatus.SUCCESS); } throw new MqException(ProducerRespCode.MSG_SEND_FAILED); }
ps: 这里和单个发送有一个区别,那就是对于 channel 的选择。因为只能选择一个,所以不能兼顾每一个消息的 sharding-key。
// 生产者消息发送-批量 if(MethodType.P_SEND_MSG_BATCH.equals(methodType)) { return handleProducerSendMsgBatch(channelId, json); } // 生产者消息发送-ONE WAY-批量 if(MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) { handleProducerSendMsgBatch(channelId, json); return null; }
/** * 处理生产者发送的消息 * * @param channelId 通道标识 * @param json 消息体 * @since 0.1.3 */ private MqCommonResp handleProducerSendMsgBatch(String channelId, String json) { MqMessageBatchReq batchReq = JSON.parseObject(json, MqMessageBatchReq.class); final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId); List<MqMessagePersistPut> putList = buildPersistPutList(batchReq, serviceEntry); MqCommonResp commonResp = mqBrokerPersist.putBatch(putList); // 遍历异步推送 for(MqMessagePersistPut persistPut : putList) { this.asyncHandleMessage(persistPut); } return commonResp; }
这里对消息列表进行持久化保存。
演示的持久化策略如下:
@Override public MqCommonResp putBatch(List<MqMessagePersistPut> putList) { // 构建列表 for(MqMessagePersistPut put : putList) { this.doPut(put); } MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp; }
以前的实现方式是每一个消息消费完成之后,进行一次 ACK。
对于 pull 策略的消息消费,我们可以等当前批次结束,统一进行 ACK 回执。
实现调整如下:
for(MqTopicTagDto tagDto : subscribeList) { final String topicName = tagDto.getTopicName(); final String tagRegex = tagDto.getTagRegex(); MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size); if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { List<MqMessage> mqMessageList = resp.getList(); if(CollectionUtil.isNotEmpty(mqMessageList)) { List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size()); for(MqMessage mqMessage : mqMessageList) { IMqConsumerListenerContext context = new MqConsumerListenerContext(); final String messageId = mqMessage.getTraceId(); ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context); log.info("消息:{} 消费结果 {}", messageId, consumerStatus); // 状态同步更新 if(!ackBatchFlag) { MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus); log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp)); } else { // 批量 MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto(); statusDto.setMessageId(messageId); statusDto.setMessageStatus(consumerStatus.getCode()); statusDto.setConsumerGroupName(groupName); statusDtoList.add(statusDto); } } // 批量执行 if(ackBatchFlag) { MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList); log.info("消息:{} 状态批量回执结果 {}", statusDtoList, JSON.toJSON(ackResp)); statusDtoList = null; } } } else { log.error("拉取消息失败: {}", JSON.toJSON(resp)); } }
如果 ackBatchFlag = false,则处理逻辑和以前一样。
如果 ackBatchFlag = true,则首先把消息放到 list 中,结束后统一执行。
//消费者消费状态 ACK-批量 if(MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) { MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class); final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList(); return mqBrokerPersist.updateStatusBatch(statusDtoList); }
默认的持久化实现,更新如下:
@Override public MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) { for(MqConsumerUpdateStatusDto statusDto : statusDtoList) { this.doUpdateStatus(statusDto.getMessageId(), statusDto.getConsumerGroupName(), statusDto.getMessageStatus()); } MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp; }
遍历每一个元素,进行状态的更新。
异步和批量,是提升性能最常用的 2 种方式。
批量的实现相关来说是最简单,也是效果最显著的。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。
The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq
rpc-从零开始实现 rpc https://github.com/houbb/rpc