【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
心跳(heartbeat ),顾名思义就是心脏的跳动。
医学上一般通过心跳是否跳动,来判断一个人是否活着。
那么,分布式服务中如何判断一个服务是否还活着呢?
比如 mq 中,broker 需要把消息实时推送给在线的消费者。
那么如何判断一个消费者是否活着呢?
我们可以让消费者定时,比如每 5 秒钟给 broker 发送一个心跳包,考虑到网络延迟等,如果连续 1min 都没有收到心跳,我们则移除这个消费者,认为服务已经挂了。
上代码!
心跳可以是一个很简单的消息体。
@Override public void heartbeat() { final MqHeartBeatReq req = new MqHeartBeatReq(); final String traceId = IdHelper.uuid32(); req.setTraceId(traceId); req.setMethodType(MethodType.C_HEARTBEAT); req.setAddress(NetUtil.getLocalHost()); req.setPort(0); req.setTime(System.currentTimeMillis()); log.debug("[HEARTBEAT] 往服务端发送心跳包 {}", JSON.toJSON(req)); // 通知全部 for(RpcChannelFuture channelFuture : channelFutureList) { try { Channel channel = channelFuture.getChannelFuture().channel(); callServer(channel, req, null); } catch (Exception exception) { log.error("[HEARTBEAT] 往服务端处理异常", exception); } } }
消费者把心跳通知所有的 broker.
我们启动一个定时任务,5S 钟执行一次。
/** * 初始化心跳 * @since 0.0.6 */ private void initHeartbeat() { //5S 发一次心跳 scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { heartbeat(); } }, 5, 5, TimeUnit.SECONDS); }
心跳是在连接到 broker 之后就开始启动:
@Override public void initChannelFutureList(ConsumerBrokerConfig config) { //1. 配置初始化 //... //2. 初始化 this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, initChannelHandler(), check); //3. 初始化心跳 this.initHeartbeat(); }
消费者定时发送消息,生产者肯定是需要接受的。
为了简单,我们让心跳是 ONE-WAY 的。
// 消费者心跳 if(MethodType.C_HEARTBEAT.equals(methodType)) { MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class); registerConsumerService.heartbeat(req, channel); return null; }
每次收到消息,我们把请求的 channelId 记录下来,并保存最新的访问时间
@Override public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) { final String channelId = ChannelUtil.getChannelId(channel); log.info("[HEARTBEAT] 接收消费者心跳 {}, channelId: {}", JSON.toJSON(mqHeartBeatReq), channelId); ServiceEntry serviceEntry = new ServiceEntry(); serviceEntry.setAddress(mqHeartBeatReq.getAddress()); serviceEntry.setPort(mqHeartBeatReq.getPort()); BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel); entryChannel.setLastAccessTime(mqHeartBeatReq.getTime()); heartbeatMap.put(channelId, entryChannel); }
如果一些消费者长时间没有心跳,我们就认为服务已经挂了。
在 LocalBrokerConsumerService
服务启动的时候,同时启用一个定时清理任务。
public LocalBrokerConsumerService() { //120S 扫描一次 final long limitMills = 2 * 60 * 1000; scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) { String key = entry.getKey(); long lastAccessTime = entry.getValue().getLastAccessTime(); long currentTime = System.currentTimeMillis(); if(currentTime - lastAccessTime > limitMills) { removeByChannelId(key); } } } }, 2 * 60, 2 * 60, TimeUnit.SECONDS); }
这个任务 2min 执行一次,如果 2min 都没有心跳,这移除对应的消费者。
心跳,是网络传输中验证服务可用性非常简单,但是有效的方式。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。
The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq
rpc-从零开始实现 rpc https://github.com/houbb/rpc