【MQ】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?


前景回顾

上一节我们学习了如何实现基于 netty 客服端和服务端的启动。

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?

那么客户端如何调用服务端呢?

我们本节就来一起实现一下。

【MQ】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?

消费者实现

启动类的调整

ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup, bossGroup)         .channel(NioServerSocketChannel.class)         .childHandler(new ChannelInitializer<Channel>() {             @Override             protected void initChannel(Channel ch) throws Exception {                 ch.pipeline()                         .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))                         .addLast(new MqConsumerHandler(invokeService));             }         })         // 这个参数影响的是还没有被accept 取出的连接         .option(ChannelOption.SO_BACKLOG, 128)         // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。         .childOption(ChannelOption.SO_KEEPALIVE, true); 

这里我们通过指定分隔符解决 netty 粘包问题。

解决 netty 粘包问题

MqConsumerHandler 处理类

MqConsumerHandler 的实现如下,添加对应的业务处理逻辑。

package com.github.houbb.mq.consumer.handler;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqConsumerHandler extends SimpleChannelInboundHandler {      private static final Log log = LogFactory.getLog(MqConsumerHandler.class);      /**      * 调用管理类      * @since 1.0.0      */     private final IInvokeService invokeService;      public MqConsumerHandler(IInvokeService invokeService) {         this.invokeService = invokeService;     }      @Override     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {         ByteBuf byteBuf = (ByteBuf) msg;         byte[] bytes = new byte[byteBuf.readableBytes()];         byteBuf.readBytes(bytes);          RpcMessageDto rpcMessageDto = null;         try {             rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);         } catch (Exception exception) {             log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes));             return;         }          if (rpcMessageDto.isRequest()) {             MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx);              if(commonResp == null) {                 log.debug("当前消息为 null,忽略处理。");                 return;             }              writeResponse(rpcMessageDto, commonResp, ctx);         } else {             final String traceId = rpcMessageDto.getTraceId();              // 丢弃掉 traceId 为空的信息             if(StringUtil.isBlank(traceId)) {                 log.debug("[Server Response] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));                 return;             }              // 添加消息             invokeService.addResponse(traceId, rpcMessageDto);         }     } } 

rpc 消息体定义

为了统一标准,我们的 rpc 消息体 RpcMessageDto 定义如下:

package com.github.houbb.mq.common.rpc;  /**  * @author binbin.hou  * @since 1.0.0  */ public class RpcMessageDto implements Serializable {      /**      * 请求时间      */     private long requestTime;      /**      * 请求标识      */     private String traceId;      /**      * 方法类型      */     private String methodType;      /**      * 是否为请求消息      */     private boolean isRequest;      private String respCode;      private String respMsg;      private String json;      //getter&setter  } 

消息分发

对于接收到的消息体 RpcMessageDto,分发逻辑如下:

/**  * 消息的分发  *  * @param rpcMessageDto 入参  * @param ctx 上下文  * @return 结果  */ private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {     final String methodType = rpcMessageDto.getMethodType();     final String json = rpcMessageDto.getJson();     String channelId = ChannelUtil.getChannelId(ctx);     log.debug("channelId: {} 接收到 method: {} 内容:{}", channelId,             methodType, json);      // 消息发送     if(MethodType.P_SEND_MESSAGE.equals(methodType)) {         // 日志输出         log.info("收到服务端消息: {}", json);         // 如果是 broker,应该进行处理化等操作。         MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }     throw new UnsupportedOperationException("暂不支持的方法类型"); } 

这里对于接收到的消息,只做一个简单的日志输出,后续将添加对应的业务逻辑处理。

结果回写

收到请求以后,我们需要返回对应的响应。

基于 channel 的回写实现如下:

/**  * 结果写回  *  * @param req  请求  * @param resp 响应  * @param ctx  上下文  */ private void writeResponse(RpcMessageDto req,                            Object resp,                            ChannelHandlerContext ctx) {     final String id = ctx.channel().id().asLongText();     RpcMessageDto rpcMessageDto = new RpcMessageDto();     // 响应类消息     rpcMessageDto.setRequest(false);     rpcMessageDto.setTraceId(req.getTraceId());     rpcMessageDto.setMethodType(req.getMethodType());     rpcMessageDto.setRequestTime(System.currentTimeMillis());     String json = JSON.toJSONString(resp);     rpcMessageDto.setJson(json);     // 回写到 client 端     ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);     ctx.writeAndFlush(byteBuf);     log.debug("[Server] channel {} response {}", id, JSON.toJSON(rpcMessageDto)); } 

调用管理类

为了方便管理异步返回的请求结果,我们统一定义了 IInvokeService 类,用于管理请求与响应。

接口

package com.github.houbb.mq.common.support.invoke;  import com.github.houbb.mq.common.rpc.RpcMessageDto;  /**  * 调用服务接口  * @author binbin.hou  * @since 1.0.0  */ public interface IInvokeService {      /**      * 添加请求信息      * @param seqId 序列号      * @param timeoutMills 超时时间      * @return this      * @since 1.0.0      */     IInvokeService addRequest(final String seqId,                               final long timeoutMills);      /**      * 放入结果      * @param seqId 唯一标识      * @param rpcResponse 响应结果      * @return this      * @since 1.0.0      */     IInvokeService addResponse(final String seqId, final RpcMessageDto rpcResponse);      /**      * 获取标志信息对应的结果      * @param seqId 序列号      * @return 结果      * @since 1.0.0      */     RpcMessageDto getResponse(final String seqId);  } 

实现

实现本身也不难。

package com.github.houbb.mq.common.support.invoke.impl;  /**  * 调用服务接口  * @author binbin.hou  * @since 1.0.0  */ public class InvokeService implements IInvokeService {      private static final Log logger = LogFactory.getLog(InvokeService.class);      /**      * 请求序列号 map      * (1)这里后期如果要添加超时检测,可以添加对应的超时时间。      * 可以把这里调整为 map      *      * key: seqId 唯一标识一个请求      * value: 存入该请求最长的有效时间。用于定时删除和超时判断。      * @since 0.0.2      */     private final ConcurrentHashMap<String, Long> requestMap;      /**      * 响应结果      * @since 1.0.0      */     private final ConcurrentHashMap<String, RpcMessageDto> responseMap;      public InvokeService() {         requestMap = new ConcurrentHashMap<>();         responseMap = new ConcurrentHashMap<>();          final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);         Executors.newScheduledThreadPool(1)                 .scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);     }      @Override     public IInvokeService addRequest(String seqId, long timeoutMills) {         logger.debug("[Invoke] start add request for seqId: {}, timeoutMills: {}", seqId,                 timeoutMills);          final long expireTime = System.currentTimeMillis()+timeoutMills;         requestMap.putIfAbsent(seqId, expireTime);          return this;     }      @Override     public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) {         // 1. 判断是否有效         Long expireTime = this.requestMap.get(seqId);         // 如果为空,可能是这个结果已经超时了,被定时 job 移除之后,响应结果才过来。直接忽略         if(ObjectUtil.isNull(expireTime)) {             return this;         }          //2. 判断是否超时         if(System.currentTimeMillis() > expireTime) {             logger.debug("[Invoke] seqId:{} 信息已超时,直接返回超时结果。", seqId);             rpcResponse = RpcMessageDto.timeout();         }          // 这里放入之前,可以添加判断。         // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。         // 通知所有等待方         responseMap.putIfAbsent(seqId, rpcResponse);         logger.debug("[Invoke] 获取结果信息,seqId: {}, rpcResponse: {}", seqId, JSON.toJSON(rpcResponse));         logger.debug("[Invoke] seqId:{} 信息已经放入,通知所有等待方", seqId);          // 移除对应的 requestMap         requestMap.remove(seqId);         logger.debug("[Invoke] seqId:{} remove from request map", seqId);          // 同步锁         synchronized (this) {             this.notifyAll();             logger.debug("[Invoke] {} notifyAll()", seqId);         }           return this;     }      @Override     public RpcMessageDto getResponse(String seqId) {         try {             RpcMessageDto rpcResponse = this.responseMap.get(seqId);             if(ObjectUtil.isNotNull(rpcResponse)) {                 logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse);                 return rpcResponse;             }              // 进入等待             while (rpcResponse == null) {                 logger.debug("[Invoke] seq {} 对应结果为空,进入等待", seqId);                  // 同步等待锁                 synchronized (this) {                     this.wait();                 }                  logger.debug("[Invoke] {} wait has notified!", seqId);                  rpcResponse = this.responseMap.get(seqId);                 logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse);             }              return rpcResponse;         } catch (InterruptedException e) {             logger.error("获取响应异常", e);             throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED);         }     }  } 

这里 getResponse 获取不到会进入等待,直到 addResponse 唤醒。

但是这也有一个问题,如果一个请求的响应丢失了怎么办?

总不能一直等待吧。

TimeoutCheckThread 超时检测线程

超时检测线程就可以帮我们处理一些超时未返回的结果。

package com.github.houbb.mq.common.support.invoke.impl;  import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.mq.common.rpc.RpcMessageDto;  import java.util.Map; import java.util.concurrent.ConcurrentHashMap;  /**  * 超时检测线程  * @author binbin.hou  * @since 0.0.2  */ public class TimeoutCheckThread implements Runnable {      /**      * 请求信息      * @since 0.0.2      */     private final ConcurrentHashMap<String, Long> requestMap;      /**      * 请求信息      * @since 0.0.2      */     private final ConcurrentHashMap<String, RpcMessageDto> responseMap;      /**      * 新建      * @param requestMap  请求 Map      * @param responseMap 结果 map      * @since 0.0.2      */     public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,                               ConcurrentHashMap<String, RpcMessageDto> responseMap) {         ArgUtil.notNull(requestMap, "requestMap");         this.requestMap = requestMap;         this.responseMap = responseMap;     }      @Override     public void run() {         for(Map.Entry<String, Long> entry : requestMap.entrySet()) {             long expireTime = entry.getValue();             long currentTime = System.currentTimeMillis();              if(currentTime > expireTime) {                 final String key = entry.getKey();                 // 结果设置为超时,从请求 map 中移除                 responseMap.putIfAbsent(key, RpcMessageDto.timeout());                 requestMap.remove(key);             }         }     }  } 

处理逻辑就是定时检测,如果超时了,就默认设置结果为超时,并且从请求集合中移除。

消息生产者实现

启动核心类

public class MqProducer extends Thread implements IMqProducer {      private static final Log log = LogFactory.getLog(MqProducer.class);      /**      * 分组名称      */     private final String groupName;      /**      * 端口号      */     private final int port;      /**      * 中间人地址      */     private String brokerAddress  = "";      /**      * channel 信息      * @since 0.0.2      */     private ChannelFuture channelFuture;      /**      * 客户端处理 handler      * @since 0.0.2      */     private ChannelHandler channelHandler;      /**      * 调用管理服务      * @since 0.0.2      */     private final IInvokeService invokeService = new InvokeService();      /**      * 获取响应超时时间      * @since 0.0.2      */     private long respTimeoutMills = 5000;      /**      * 可用标识      * @since 0.0.2      */     private volatile boolean enableFlag = false;      /**      * 粘包处理分隔符      * @since 1.0.0      */     private String delimiter = DelimiterUtil.DELIMITER;      //set 方法            @Override     public synchronized void run() {         // 启动服务端         log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",                 groupName, port, brokerAddress);          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {             // channel handler             this.initChannelHandler();              // 省略,同以前              // 标识为可用             enableFlag = true;         } catch (Exception e) {             log.error("MQ 生产者启动遇到异常", e);             throw new MqException(ProducerRespCode.RPC_INIT_FAILED);         }     }  } 

其中初始化 handler 的实现如下:

private void initChannelHandler() {     final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(delimiter);      final MqProducerHandler mqProducerHandler = new MqProducerHandler();     mqProducerHandler.setInvokeService(invokeService);      // handler 实际上会被多次调用,如果不是 @Shareable,应该每次都重新创建。     ChannelHandler handler = new ChannelInitializer<Channel>() {         @Override         protected void initChannel(Channel ch) throws Exception {             ch.pipeline()                     .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))                     .addLast(mqProducerHandler);         }     };     this.channelHandler = handler; } 

MqProducerHandler 生产者处理逻辑

和消费者处理逻辑类似。

这里最核心的就是添加响应结果:invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);

package com.github.houbb.mq.producer.handler;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqProducerHandler extends SimpleChannelInboundHandler {      private static final Log log = LogFactory.getLog(MqProducerHandler.class);      /**      * 调用管理类      */     private IInvokeService invokeService;      public void setInvokeService(IInvokeService invokeService) {         this.invokeService = invokeService;     }      @Override     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {         ByteBuf byteBuf = (ByteBuf)msg;         byte[] bytes = new byte[byteBuf.readableBytes()];         byteBuf.readBytes(bytes);          String text = new String(bytes);         log.debug("[Client] channelId {} 接收到消息 {}", ChannelUtil.getChannelId(ctx), text);          RpcMessageDto rpcMessageDto = null;         try {             rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);         } catch (Exception exception) {             log.error("RpcMessageDto json 格式转换异常 {}", JSON.parse(bytes));             return;         }          if(rpcMessageDto.isRequest()) {             // 请求类             final String methodType = rpcMessageDto.getMethodType();             final String json = rpcMessageDto.getJson();         } else {             // 丢弃掉 traceId 为空的信息             if(StringUtil.isBlank(rpcMessageDto.getTraceId())) {                 log.debug("[Client] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));                 return;             }              invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);             log.debug("[Client] response is :{}", JSON.toJSON(rpcMessageDto));         }     } } 

消息的发送

关心请求结果的:

public SendResult send(MqMessage mqMessage) {     String messageId = IdHelper.uuid32();     mqMessage.setTraceId(messageId);     mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);     MqCommonResp resp = callServer(mqMessage, MqCommonResp.class);     if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {         return SendResult.of(messageId, SendStatus.SUCCESS);     }     return SendResult.of(messageId, SendStatus.FAILED); } 

不关心请求结果的发送:

public SendResult sendOneWay(MqMessage mqMessage) {     String messageId = IdHelper.uuid32();     mqMessage.setTraceId(messageId);     mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);     this.callServer(mqMessage, null);     return SendResult.of(messageId, SendStatus.SUCCESS); } 

其中 callServer 实现如下:

/**  * 调用服务端  * @param commonReq 通用请求  * @param respClass 类  * @param <T> 泛型  * @param <R> 结果  * @return 结果  * @since 1.0.0  */ public <T extends MqCommonReq, R extends MqCommonResp> R callServer(T commonReq, Class<R> respClass) {     final String traceId = commonReq.getTraceId();     final long requestTime = System.currentTimeMillis();     RpcMessageDto rpcMessageDto = new RpcMessageDto();     rpcMessageDto.setTraceId(traceId);     rpcMessageDto.setRequestTime(requestTime);     rpcMessageDto.setJson(JSON.toJSONString(commonReq));     rpcMessageDto.setMethodType(commonReq.getMethodType());     rpcMessageDto.setRequest(true);     // 添加调用服务     invokeService.addRequest(traceId, respTimeoutMills);      // 遍历 channel     // 关闭当前线程,以获取对应的信息     // 使用序列化的方式     ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);     //负载均衡获取 channel     Channel channel = channelFuture.channel();     channel.writeAndFlush(byteBuf);     String channelId = ChannelUtil.getChannelId(channel);      log.debug("[Client] channelId {} 发送消息 {}", channelId, JSON.toJSON(rpcMessageDto));     if (respClass == null) {         log.debug("[Client] 当前消息为 one-way 消息,忽略响应");         return null;     } else {         //channelHandler 中获取对应的响应         RpcMessageDto messageDto = invokeService.getResponse(traceId);         if (MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())) {             throw new MqException(MqCommonRespCode.TIMEOUT);         }         String respJson = messageDto.getJson();         return JSON.parseObject(respJson, respClass);     } } 

测试代码

启动消费者

MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start(); 

启动日志如下:

[DEBUG] [2022-04-21 19:55:26.346] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2022-04-21 19:55:26.369] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress:  [INFO] [2022-04-21 19:55:27.845] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口 

启动生产者

MqProducer mqProducer = new MqProducer(); mqProducer.start();  //等待启动完成 while (!mqProducer.isEnableFlag()) {     System.out.println("等待初始化完成...");     DateUtil.sleep(100); }  String message = "HELLO MQ!"; MqMessage mqMessage = new MqMessage(); mqMessage.setTopic("TOPIC"); mqMessage.setTags(Arrays.asList("TAGA", "TAGB")); mqMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));  SendResult sendResult = mqProducer.send(mqMessage); System.out.println(JSON.toJSON(sendResult)); 

生产者日志:

[INFO] [2022-04-21 19:56:39.609] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527 ... [DEBUG] [2022-04-21 19:56:39.895] [main] [c.g.h.m.c.s.i.i.InvokeService.addRequest] - [Invoke] start add request for seqId: a70ea2c4325641d6a5b198323228dc24, timeoutMills: 5000 ... [DEBUG] [2022-04-21 19:56:40.282] [main] [c.g.h.m.c.s.i.i.InvokeService.getResponse] - [Invoke] seq a70ea2c4325641d6a5b198323228dc24 对应结果已经获取: com.github.houbb.mq.common.rpc.RpcMessageDto@a8f0b4 ... {"messageId":"a70ea2c4325641d6a5b198323228dc24","status":"SUCCESS"} 

消费者日志:

[DEBUG] [2022-04-21 19:56:40.179] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - channelId: 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 接收到 method: P_SEND_MESSAGE 内容:{"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}  [INFO] [2022-04-21 19:56:40.180] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - 收到服务端消息: {"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}  [DEBUG] [2022-04-21 19:56:40.234] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.writeResponse] - [Server] channel 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 response {"requestTime":1650542200182,"traceId":"a70ea2c4325641d6a5b198323228dc24","request":false,"methodType":"P_SEND_MESSAGE","json":"{"respCode":"0000","respMessage":"成功"}"} 

可以看到消费者成功的获取到了生产者的消息。

小结

到这里,我们就实现了一个消息生产者调用消费者的实现。

但是你可能会问,这不就是 rpc 吗?

没有解耦。

是的,为了解决耦合问题,我们将在下一节引入 broker 消息的中间人。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

发表评论

相关文章