ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

导读

  最近公司ES集群老出现连接关闭,进而导致查询/写入ES时报错,报错日志如下:

ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

[2m2022-10-23 14:13:10.088 - ERROR - [NONE][NONE][NONE][0][1584065372948234240] - [XNIO-1 task-3] - com.mall.search.service.EsRestService:235 : 添加文档失败:{}  java.io.IOException: Connection reset by peer     at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:964)     at org.elasticsearch.client.RestClient.performRequest(RestClient.java:233)     at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448)     at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418)     at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388)     at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)     at sun.reflect.GeneratedMethodAccessor117.invoke(Unknown Source)     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.lang.reflect.Method.invoke(Method.java:498)     at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)     at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)     at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)     at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)     at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)     at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)     at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)     at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)     at 
org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)     at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)     at javax.servlet.http.HttpServlet.service(HttpServlet.java:523)     at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)     at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)     at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)     at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:129)     at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)     at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)     at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)     at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)     at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)     at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)     at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)     at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)     at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:108)     at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)     at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)     at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)     at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)     at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)     at 
io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)     at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)     at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:84)     at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62)     at io.undertow.servlet.handlers.ServletChain$1.handleRequest(ServletChain.java:68)     at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36)     at io.undertow.servlet.handlers.RedirectDirHandler.handleRequest(RedirectDirHandler.java:68)     at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:132)     at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57)     at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)     at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46)     at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64)     at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:60)     at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:77)     at io.undertow.security.handlers.AbstractSecurityContextAssociationHandler.handleRequest(AbstractSecurityContextAssociationHandler.java:43)     at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)     at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)     at 
io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:269)     at io.undertow.servlet.handlers.ServletInitialHandler.access$100(ServletInitialHandler.java:78)     at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:133)     at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:130)     at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:48)     at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)     at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:249)     at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:78)     at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:99)     at io.undertow.server.Connectors.executeRootHandler(Connectors.java:376)     at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:830)     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)     at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Connection reset by peer     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)     at sun.nio.ch.IOUtil.read(IOUtil.java:197)     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)     at org.apache.http.impl.nio.conn.LoggingIOSession$LoggingByteChannel.read(LoggingIOSession.java:204)     at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)     at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136)     at 
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241)     at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)     at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)     at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)     at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)     at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)     at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)     at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)     at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)     ... 1 common frames omitted

优化&解决

  这个项目是助农app(抖音+淘宝的杂交版app)。app首页下拉获取最新推荐视频时,需要通过第三方极光拉取推荐视频(一次拉取20条),存入本地MySQL数据库后,再通过线程池异步写入ES,大概架构如下:

ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

  初步判定,可能是app随着用户体量上去,线程池异步逐条写入ES,造成ES大量的写请求,然后将架构改为如图

ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

  本以为问题就这样解决了,第二天去服务器拉日志,发现之前的问题依然存在。于是去百度、GitHub、ES官网查资料,发现很多人遇到相同问题,解决方案也各不相同,但对我们这套架构来说都不管用。这些方案的大意是:ES客户端如果没有设置KeepAlive,默认为-1,即永不过期;于是设置了ES客户端的KeepAlive时间,问题依然复现。最终做了如下修改,此后ES客户端连接关闭的问题再没有复现过。

  我们是通过Nginx反向代理,构建一个ES集群,架构如下

ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

问题修复

步骤一

  修改客户端所在机器的TCP keepalive时间(tcp_keepalive_time):系统默认为7200秒,修改为300秒(5分钟)

ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

修改为300

ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05

[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time
7200
[root@ybchen-1 ~]#
[root@ybchen-1 ~]#
[root@ybchen-1 ~]#
[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time
7200
[root@ybchen-1 ~]# echo 300 > /proc/sys/net/ipv4/tcp_keepalive_time
[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time
300
[root@ybchen-1 ~]#
[root@ybchen-1 ~]#
[root@ybchen-1 ~]#
[root@ybchen-1 ~]#

步骤二

  修改ES客户端的KeepAlive时间,并添加线程池大小等参数

=========ES配置类-开始=============  @Data @Component @ConfigurationProperties(prefix = "elastic.search") public class EsConfiguration {     //主机     private String host;     //端口     private int port;     //集群名称     private String clusterName;     //访问协议,如:http     private String schema;     //用户名     private String userName;     //密码     private String password; }  =========ES配置类-结束=============  ============ES客户端-开始=======================     @Autowired     EsConfiguration esConfiguration;      private static RestHighLevelClient restHighLevelClient;      public RestHighLevelClient getRestClient() {          if (null != restHighLevelClient) {             return restHighLevelClient;         }         List<HttpHost> httpHosts = new ArrayList<>();         //填充数据         httpHosts.add(new HttpHost(esConfiguration.getHost(), esConfiguration.getPort()));         //填充host节点         RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));          if (StringUtils.isNotBlank(esConfiguration.getUserName()) && StringUtils.isNotBlank(esConfiguration.getPassword())) {             //填充用户名密码             final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();             credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfiguration.getUserName(), esConfiguration.getPassword()));             //异步链接延时配置             builder.setRequestConfigCallback(requestConfigBuilder ->                     requestConfigBuilder                             .setConnectTimeout(5000) //5秒                             .setSocketTimeout(5000)                             .setConnectionRequestTimeout(5000)             );             //异步链接数配置             builder.setHttpClientConfigCallback(httpClientBuilder -> {                 //最大连接数100个                 httpClientBuilder.setMaxConnTotal(100);                 //最大路由连接数                 httpClientBuilder.setMaxConnPerRoute(100);                 
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);                 // 设置KeepAlive为5分钟的时间,不设置默认为-1,也就是持续连接,然而这会受到外界的影响比如Firewall,会将TCP连接单方面断开,从而会导致Connection reset by peer的报错                 // 参考github解决方案:https://github.com/TFdream/Elasticsearch-learning/issues/30                 httpClientBuilder.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(3))                         .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).setSoKeepAlive(true).build());                 return httpClientBuilder;             });         }          restHighLevelClient = new RestHighLevelClient(builder);         return restHighLevelClient;     }  ==============ES客户端-结束================  ========== ES客户端使用示例 =============== @Data @ApiModel("新增/更新ES通用数据结构") public class DocSearchVo implements Serializable {      @ApiModelProperty(value = "文档ID")     private String docId;      @ApiModelProperty(value = "文档JSON")     private String docStrJson;  }       /**      * 批量添加文档      *      * @param indexName      * @param docSearchVoList      * @return      */     public boolean batchIndexDoc(String indexName, List<DocSearchVo> docSearchVoList) {         RestHighLevelClient client = getRestClient();         BulkRequest request = new BulkRequest();         for (DocSearchVo docSearchVo : docSearchVoList) {             IndexRequest indexRequest = new IndexRequest(indexName).id(docSearchVo.getDocId()).source(docSearchVo.getDocStrJson(), XContentType.JSON);             request.add(indexRequest);         }         client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {             @Override             public void onResponse(BulkResponse bulkItemResponses) {                 log.debug("异步批量添加文档成功,indexName:{},docSearchVoList:{}", indexName, docSearchVoList);             }              @Override             public void onFailure(Exception e) {                 
log.error("异步批量添加文档失败,indexName:{},docSearchVoList:{},错误信息:{}", indexName, docSearchVoList, e);             }         });         return true;     }

  注:细心的童鞋已经发现,创建ES客户端的时候,不是线程安全的单例模式(这块别的同事写的,我只是负责修改这个bug,然后就没管这个线程安全问题,其实是来背锅的,呜呜呜~~~~~~)

步骤三

  添加ES客户端心跳检查,30秒一次

@Component @Slf4j public class EsSchedule {     @Autowired     EsRestService esRestService;      /**      * 30秒一次检查es状态      */     @Scheduled(fixedRate = 30 * 1000)     public void heartbeatToES() {         try {             RequestOptions requestOptions = RequestOptions.DEFAULT.toBuilder().build();             boolean result = esRestService.getRestClient().ping(requestOptions);             log.info("检查ES状态:{}", result);         } catch (Exception e) {             log.error("检查ES状态发生异常:{}", e);         }     } }

搞定~

发表评论

相关文章

当前内容话题