上一节我们学习了如何实现基于 netty 客服端和服务端的启动。
【mq】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?
那么客户端如何调用服务端呢?
我们本节就来一起实现一下。
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(new MqConsumerHandler(invokeService)); } }) // 这个参数影响的是还没有被accept 取出的连接 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true);
这里我们通过指定分隔符解决 netty 粘包问题。
MqConsumerHandler 的实现如下,添加对应的业务处理逻辑。
package com.github.houbb.mq.consumer.handler; /** * @author binbin.hou * @since 1.0.0 */ public class MqConsumerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqConsumerHandler.class); /** * 调用管理类 * @since 1.0.0 */ private final IInvokeService invokeService; public MqConsumerHandler(IInvokeService invokeService) { this.invokeService = invokeService; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); RpcMessageDto rpcMessageDto = null; try { rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class); } catch (Exception exception) { log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes)); return; } if (rpcMessageDto.isRequest()) { MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx); if(commonResp == null) { log.debug("当前消息为 null,忽略处理。"); return; } writeResponse(rpcMessageDto, commonResp, ctx); } else { final String traceId = rpcMessageDto.getTraceId(); // 丢弃掉 traceId 为空的信息 if(StringUtil.isBlank(traceId)) { log.debug("[Server Response] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto)); return; } // 添加消息 invokeService.addResponse(traceId, rpcMessageDto); } } }
为了统一标准,我们的 rpc 消息体 RpcMessageDto
定义如下:
package com.github.houbb.mq.common.rpc; /** * @author binbin.hou * @since 1.0.0 */ public class RpcMessageDto implements Serializable { /** * 请求时间 */ private long requestTime; /** * 请求标识 */ private String traceId; /** * 方法类型 */ private String methodType; /** * 是否为请求消息 */ private boolean isRequest; private String respCode; private String respMsg; private String json; //getter&setter }
对于接收到的消息体 RpcMessageDto,分发逻辑如下:
/** * 消息的分发 * * @param rpcMessageDto 入参 * @param ctx 上下文 * @return 结果 */ private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) { final String methodType = rpcMessageDto.getMethodType(); final String json = rpcMessageDto.getJson(); String channelId = ChannelUtil.getChannelId(ctx); log.debug("channelId: {} 接收到 method: {} 内容:{}", channelId, methodType, json); // 消息发送 if(MethodType.P_SEND_MESSAGE.equals(methodType)) { // 日志输出 log.info("收到服务端消息: {}", json); // 如果是 broker,应该进行处理化等操作。 MqCommonResp resp = new MqCommonResp(); resp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return resp; } throw new UnsupportedOperationException("暂不支持的方法类型"); }
这里对于接收到的消息,只做一个简单的日志输出,后续将添加对应的业务逻辑处理。
收到请求以后,我们需要返回对应的响应。
基于 channel 的回写实现如下:
/** * 结果写回 * * @param req 请求 * @param resp 响应 * @param ctx 上下文 */ private void writeResponse(RpcMessageDto req, Object resp, ChannelHandlerContext ctx) { final String id = ctx.channel().id().asLongText(); RpcMessageDto rpcMessageDto = new RpcMessageDto(); // 响应类消息 rpcMessageDto.setRequest(false); rpcMessageDto.setTraceId(req.getTraceId()); rpcMessageDto.setMethodType(req.getMethodType()); rpcMessageDto.setRequestTime(System.currentTimeMillis()); String json = JSON.toJSONString(resp); rpcMessageDto.setJson(json); // 回写到 client 端 ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto); ctx.writeAndFlush(byteBuf); log.debug("[Server] channel {} response {}", id, JSON.toJSON(rpcMessageDto)); }
为了方便管理异步返回的请求结果,我们统一定义了 IInvokeService 类,用于管理请求与响应。
package com.github.houbb.mq.common.support.invoke; import com.github.houbb.mq.common.rpc.RpcMessageDto; /** * 调用服务接口 * @author binbin.hou * @since 1.0.0 */ public interface IInvokeService { /** * 添加请求信息 * @param seqId 序列号 * @param timeoutMills 超时时间 * @return this * @since 1.0.0 */ IInvokeService addRequest(final String seqId, final long timeoutMills); /** * 放入结果 * @param seqId 唯一标识 * @param rpcResponse 响应结果 * @return this * @since 1.0.0 */ IInvokeService addResponse(final String seqId, final RpcMessageDto rpcResponse); /** * 获取标志信息对应的结果 * @param seqId 序列号 * @return 结果 * @since 1.0.0 */ RpcMessageDto getResponse(final String seqId); }
实现本身也不难。
package com.github.houbb.mq.common.support.invoke.impl; /** * 调用服务接口 * @author binbin.hou * @since 1.0.0 */ public class InvokeService implements IInvokeService { private static final Log logger = LogFactory.getLog(InvokeService.class); /** * 请求序列号 map * (1)这里后期如果要添加超时检测,可以添加对应的超时时间。 * 可以把这里调整为 map * * key: seqId 唯一标识一个请求 * value: 存入该请求最长的有效时间。用于定时删除和超时判断。 * @since 0.0.2 */ private final ConcurrentHashMap<String, Long> requestMap; /** * 响应结果 * @since 1.0.0 */ private final ConcurrentHashMap<String, RpcMessageDto> responseMap; public InvokeService() { requestMap = new ConcurrentHashMap<>(); responseMap = new ConcurrentHashMap<>(); final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap); Executors.newScheduledThreadPool(1) .scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS); } @Override public IInvokeService addRequest(String seqId, long timeoutMills) { logger.debug("[Invoke] start add request for seqId: {}, timeoutMills: {}", seqId, timeoutMills); final long expireTime = System.currentTimeMillis()+timeoutMills; requestMap.putIfAbsent(seqId, expireTime); return this; } @Override public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) { // 1. 判断是否有效 Long expireTime = this.requestMap.get(seqId); // 如果为空,可能是这个结果已经超时了,被定时 job 移除之后,响应结果才过来。直接忽略 if(ObjectUtil.isNull(expireTime)) { return this; } //2. 判断是否超时 if(System.currentTimeMillis() > expireTime) { logger.debug("[Invoke] seqId:{} 信息已超时,直接返回超时结果。", seqId); rpcResponse = RpcMessageDto.timeout(); } // 这里放入之前,可以添加判断。 // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。 // 通知所有等待方 responseMap.putIfAbsent(seqId, rpcResponse); logger.debug("[Invoke] 获取结果信息,seqId: {}, rpcResponse: {}", seqId, JSON.toJSON(rpcResponse)); logger.debug("[Invoke] seqId:{} 信息已经放入,通知所有等待方", seqId); // 移除对应的 requestMap requestMap.remove(seqId); logger.debug("[Invoke] seqId:{} remove from request map", seqId); // 同步锁 synchronized (this) { this.notifyAll(); logger.debug("[Invoke] {} notifyAll()", seqId); } return this; } @Override public RpcMessageDto getResponse(String seqId) { try { RpcMessageDto rpcResponse = this.responseMap.get(seqId); if(ObjectUtil.isNotNull(rpcResponse)) { logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse); return rpcResponse; } // 进入等待 while (rpcResponse == null) { logger.debug("[Invoke] seq {} 对应结果为空,进入等待", seqId); // 同步等待锁 synchronized (this) { this.wait(); } logger.debug("[Invoke] {} wait has notified!", seqId); rpcResponse = this.responseMap.get(seqId); logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse); } return rpcResponse; } catch (InterruptedException e) { logger.error("获取响应异常", e); throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED); } } }
这里 getResponse 获取不到会进入等待,直到 addResponse 唤醒。
但是这也有一个问题,如果一个请求的响应丢失了怎么办?
总不能一直等待吧。
超时检测线程就可以帮我们处理一些超时未返回的结果。
package com.github.houbb.mq.common.support.invoke.impl; import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.mq.common.rpc.RpcMessageDto; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 超时检测线程 * @author binbin.hou * @since 0.0.2 */ public class TimeoutCheckThread implements Runnable { /** * 请求信息 * @since 0.0.2 */ private final ConcurrentHashMap<String, Long> requestMap; /** * 请求信息 * @since 0.0.2 */ private final ConcurrentHashMap<String, RpcMessageDto> responseMap; /** * 新建 * @param requestMap 请求 Map * @param responseMap 结果 map * @since 0.0.2 */ public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap, ConcurrentHashMap<String, RpcMessageDto> responseMap) { ArgUtil.notNull(requestMap, "requestMap"); this.requestMap = requestMap; this.responseMap = responseMap; } @Override public void run() { for(Map.Entry<String, Long> entry : requestMap.entrySet()) { long expireTime = entry.getValue(); long currentTime = System.currentTimeMillis(); if(currentTime > expireTime) { final String key = entry.getKey(); // 结果设置为超时,从请求 map 中移除 responseMap.putIfAbsent(key, RpcMessageDto.timeout()); requestMap.remove(key); } } } }
处理逻辑就是定时检测,如果超时了,就默认设置结果为超时,并且从请求集合中移除。
public class MqProducer extends Thread implements IMqProducer { private static final Log log = LogFactory.getLog(MqProducer.class); /** * 分组名称 */ private final String groupName; /** * 端口号 */ private final int port; /** * 中间人地址 */ private String brokerAddress = ""; /** * channel 信息 * @since 0.0.2 */ private ChannelFuture channelFuture; /** * 客户端处理 handler * @since 0.0.2 */ private ChannelHandler channelHandler; /** * 调用管理服务 * @since 0.0.2 */ private final IInvokeService invokeService = new InvokeService(); /** * 获取响应超时时间 * @since 0.0.2 */ private long respTimeoutMills = 5000; /** * 可用标识 * @since 0.0.2 */ private volatile boolean enableFlag = false; /** * 粘包处理分隔符 * @since 1.0.0 */ private String delimiter = DelimiterUtil.DELIMITER; //set 方法 @Override public synchronized void run() { // 启动服务端 log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}", groupName, port, brokerAddress); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // channel handler this.initChannelHandler(); // 省略,同以前 // 标识为可用 enableFlag = true; } catch (Exception e) { log.error("MQ 生产者启动遇到异常", e); throw new MqException(ProducerRespCode.RPC_INIT_FAILED); } } }
其中初始化 handler 的实现如下:
private void initChannelHandler() { final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(delimiter); final MqProducerHandler mqProducerHandler = new MqProducerHandler(); mqProducerHandler.setInvokeService(invokeService); // handler 实际上会被多次调用,如果不是 @Shareable,应该每次都重新创建。 ChannelHandler handler = new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(mqProducerHandler); } }; this.channelHandler = handler; }
和消费者处理逻辑类似。
这里最核心的就是添加响应结果:invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);
package com.github.houbb.mq.producer.handler; /** * @author binbin.hou * @since 1.0.0 */ public class MqProducerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqProducerHandler.class); /** * 调用管理类 */ private IInvokeService invokeService; public void setInvokeService(IInvokeService invokeService) { this.invokeService = invokeService; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf)msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String text = new String(bytes); log.debug("[Client] channelId {} 接收到消息 {}", ChannelUtil.getChannelId(ctx), text); RpcMessageDto rpcMessageDto = null; try { rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class); } catch (Exception exception) { log.error("RpcMessageDto json 格式转换异常 {}", JSON.parse(bytes)); return; } if(rpcMessageDto.isRequest()) { // 请求类 final String methodType = rpcMessageDto.getMethodType(); final String json = rpcMessageDto.getJson(); } else { // 丢弃掉 traceId 为空的信息 if(StringUtil.isBlank(rpcMessageDto.getTraceId())) { log.debug("[Client] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto)); return; } invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto); log.debug("[Client] response is :{}", JSON.toJSON(rpcMessageDto)); } } }
关心请求结果的:
public SendResult send(MqMessage mqMessage) { String messageId = IdHelper.uuid32(); mqMessage.setTraceId(messageId); mqMessage.setMethodType(MethodType.P_SEND_MESSAGE); MqCommonResp resp = callServer(mqMessage, MqCommonResp.class); if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { return SendResult.of(messageId, SendStatus.SUCCESS); } return SendResult.of(messageId, SendStatus.FAILED); }
不关心请求结果的发送:
public SendResult sendOneWay(MqMessage mqMessage) { String messageId = IdHelper.uuid32(); mqMessage.setTraceId(messageId); mqMessage.setMethodType(MethodType.P_SEND_MESSAGE); this.callServer(mqMessage, null); return SendResult.of(messageId, SendStatus.SUCCESS); }
其中 callServer 实现如下:
/** * 调用服务端 * @param commonReq 通用请求 * @param respClass 类 * @param <T> 泛型 * @param <R> 结果 * @return 结果 * @since 1.0.0 */ public <T extends MqCommonReq, R extends MqCommonResp> R callServer(T commonReq, Class<R> respClass) { final String traceId = commonReq.getTraceId(); final long requestTime = System.currentTimeMillis(); RpcMessageDto rpcMessageDto = new RpcMessageDto(); rpcMessageDto.setTraceId(traceId); rpcMessageDto.setRequestTime(requestTime); rpcMessageDto.setJson(JSON.toJSONString(commonReq)); rpcMessageDto.setMethodType(commonReq.getMethodType()); rpcMessageDto.setRequest(true); // 添加调用服务 invokeService.addRequest(traceId, respTimeoutMills); // 遍历 channel // 关闭当前线程,以获取对应的信息 // 使用序列化的方式 ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto); //负载均衡获取 channel Channel channel = channelFuture.channel(); channel.writeAndFlush(byteBuf); String channelId = ChannelUtil.getChannelId(channel); log.debug("[Client] channelId {} 发送消息 {}", channelId, JSON.toJSON(rpcMessageDto)); if (respClass == null) { log.debug("[Client] 当前消息为 one-way 消息,忽略响应"); return null; } else { //channelHandler 中获取对应的响应 RpcMessageDto messageDto = invokeService.getResponse(traceId); if (MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())) { throw new MqException(MqCommonRespCode.TIMEOUT); } String respJson = messageDto.getJson(); return JSON.parseObject(respJson, respClass); } }
MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start();
启动日志如下:
[DEBUG] [2022-04-21 19:55:26.346] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2022-04-21 19:55:26.369] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress: [INFO] [2022-04-21 19:55:27.845] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口
MqProducer mqProducer = new MqProducer(); mqProducer.start(); //等待启动完成 while (!mqProducer.isEnableFlag()) { System.out.println("等待初始化完成..."); DateUtil.sleep(100); } String message = "HELLO MQ!"; MqMessage mqMessage = new MqMessage(); mqMessage.setTopic("TOPIC"); mqMessage.setTags(Arrays.asList("TAGA", "TAGB")); mqMessage.setPayload(message.getBytes(StandardCharsets.UTF_8)); SendResult sendResult = mqProducer.send(mqMessage); System.out.println(JSON.toJSON(sendResult));
生产者日志:
[INFO] [2022-04-21 19:56:39.609] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527 ... [DEBUG] [2022-04-21 19:56:39.895] [main] [c.g.h.m.c.s.i.i.InvokeService.addRequest] - [Invoke] start add request for seqId: a70ea2c4325641d6a5b198323228dc24, timeoutMills: 5000 ... [DEBUG] [2022-04-21 19:56:40.282] [main] [c.g.h.m.c.s.i.i.InvokeService.getResponse] - [Invoke] seq a70ea2c4325641d6a5b198323228dc24 对应结果已经获取: com.github.houbb.mq.common.rpc.RpcMessageDto@a8f0b4 ... {"messageId":"a70ea2c4325641d6a5b198323228dc24","status":"SUCCESS"}
消费者日志:
[DEBUG] [2022-04-21 19:56:40.179] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - channelId: 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 接收到 method: P_SEND_MESSAGE 内容:{"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"} [INFO] [2022-04-21 19:56:40.180] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - 收到服务端消息: {"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"} [DEBUG] [2022-04-21 19:56:40.234] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.writeResponse] - [Server] channel 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 response {"requestTime":1650542200182,"traceId":"a70ea2c4325641d6a5b198323228dc24","request":false,"methodType":"P_SEND_MESSAGE","json":"{"respCode":"0000","respMessage":"成功"}"}
可以看到消费者成功的获取到了生产者的消息。
到这里,我们就实现了一个消息生产者调用消费者的实现。
但是你可能会问,这不就是 rpc 吗?
没有解耦。
是的,为了解决耦合问题,我们将在下一节引入 broker 消息的中间人。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。
The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq
rpc-从零开始实现 rpc https://github.com/houbb/rpc