导读
最新公司ES集群老出现连接关闭,进而导致查询|写入ES时报错,报错日志显示如下
[2m2022-10-23 14:13:10.088[0;39m - [31mERROR[0;39m - [35m[NONE][NONE][NONE][0][1584065372948234240][0;39m - [2m[XNIO-1 task-3][0;39m - [36mcom.mall.search.service.EsRestService:235[0;39m [2m:[0;39m 添加文档失败:{} java.io.IOException: Connection reset by peer at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:964) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:233) at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448) at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418) at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388) at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836) at sun.reflect.GeneratedMethodAccessor117.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) at javax.servlet.http.HttpServlet.service(HttpServlet.java:523) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74) at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:129) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61) at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131) at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61) at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131) at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:108) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61) at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61) at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131) at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:84) at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62) at io.undertow.servlet.handlers.ServletChain$1.handleRequest(ServletChain.java:68) at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36) at io.undertow.servlet.handlers.RedirectDirHandler.handleRequest(RedirectDirHandler.java:68) at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:132) at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57) at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43) at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46) at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64) at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:60) at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:77) at io.undertow.security.handlers.AbstractSecurityContextAssociationHandler.handleRequest(AbstractSecurityContextAssociationHandler.java:43) at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43) at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43) at io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:269) at io.undertow.servlet.handlers.ServletInitialHandler.access$100(ServletInitialHandler.java:78) at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:133) at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:130) at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:48) at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43) at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:249) at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:78) at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:99) at io.undertow.server.Connectors.executeRootHandler(Connectors.java:376) at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:830) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at org.apache.http.impl.nio.conn.LoggingIOSession$LoggingByteChannel.read(LoggingIOSession.java:204) at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231) at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136) at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ... 1 common frames omitted
优化&解决
因为这个项目是助农app(抖音+淘宝,杂交版app),app首页下拉获取最新推荐视频,需要通过第三方极光拉取推荐视频,存入本地数据库mysql后;一次拉取20条,然后通过线程池异步写入ES,大概架构如下
初步判定,可能是app随着用户体量上去,线程池异步逐条写入ES,造成ES大量的写请求,然后将架构改为如图
就这样以为问题就解决了呢,第二天去服务器里拉下日志,发现之前的问题还是存在,就上百度,github,ES官网,很多人遇到相同问题,各种解决方案,对我们这套架构来说,都不管用。大概意思是ES客户端如果没有设置KeepAlive的话,默认为-1就是永不过期,然后就设置ES客户端的KeepAlive时间,问题还是复现,这边最终做了如下修改,从此ES客户端连接关闭问题就没有复现过
我们是通过Nginx反向代理,构建一个ES集群,架构如下
问题修复
步骤一
修改客户端tcp keepalive,系统默认为7200,修改为300(单位秒,5分钟)
修改为300
[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 7200 [root@ybchen-1 ~]# [root@ybchen-1 ~]# [root@ybchen-1 ~]# [root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 7200 [root@ybchen-1 ~]# echo 300 > /proc/sys/net/ipv4/tcp_keepalive_time [root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 300 [root@ybchen-1 ~]# [root@ybchen-1 ~]# [root@ybchen-1 ~]# [root@ybchen-1 ~]#
步骤二
修改ES客户端的KeepAlive时间,并添加线程池大小等参数
=========ES配置类-开始============= @Data @Component @ConfigurationProperties(prefix = "elastic.search") public class EsConfiguration { //主机 private String host; //端口 private int port; //集群名称 private String clusterName; //访问协议,如:http private String schema; //用户名 private String userName; //密码 private String password; } =========ES配置类-结束============= ============ES客户端-开始======================= @Autowired EsConfiguration esConfiguration; private static RestHighLevelClient restHighLevelClient; public RestHighLevelClient getRestClient() { if (null != restHighLevelClient) { return restHighLevelClient; } List<HttpHost> httpHosts = new ArrayList<>(); //填充数据 httpHosts.add(new HttpHost(esConfiguration.getHost(), esConfiguration.getPort())); //填充host节点 RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); if (StringUtils.isNotBlank(esConfiguration.getUserName()) && StringUtils.isNotBlank(esConfiguration.getPassword())) { //填充用户名密码 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfiguration.getUserName(), esConfiguration.getPassword())); //异步链接延时配置 builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder .setConnectTimeout(5000) //5秒 .setSocketTimeout(5000) .setConnectionRequestTimeout(5000) ); //异步链接数配置 builder.setHttpClientConfigCallback(httpClientBuilder -> { //最大连接数100个 httpClientBuilder.setMaxConnTotal(100); //最大路由连接数 httpClientBuilder.setMaxConnPerRoute(100); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); // 设置KeepAlive为5分钟的时间,不设置默认为-1,也就是持续连接,然而这会受到外界的影响比如Firewall,会将TCP连接单方面断开,从而会导致Connection reset by peer的报错 // 参考github解决方案:https://github.com/TFdream/Elasticsearch-learning/issues/30 httpClientBuilder.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(3)) .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).setSoKeepAlive(true).build()); return httpClientBuilder; }); } restHighLevelClient = new RestHighLevelClient(builder); return restHighLevelClient; } ==============ES客户端-结束================ ========== ES客户端使用示例 =============== @Data @ApiModel("新增/更新ES通用数据结构") public class DocSearchVo implements Serializable { @ApiModelProperty(value = "文档ID") private String docId; @ApiModelProperty(value = "文档JSON") private String docStrJson; } /** * 批量添加文档 * * @param indexName * @param docSearchVoList * @return */ public boolean batchIndexDoc(String indexName, List<DocSearchVo> docSearchVoList) { RestHighLevelClient client = getRestClient(); BulkRequest request = new BulkRequest(); for (DocSearchVo docSearchVo : docSearchVoList) { IndexRequest indexRequest = new IndexRequest(indexName).id(docSearchVo.getDocId()).source(docSearchVo.getDocStrJson(), XContentType.JSON); request.add(indexRequest); } client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkItemResponses) { log.debug("异步批量添加文档成功,indexName:{},docSearchVoList:{}", indexName, docSearchVoList); } @Override public void onFailure(Exception e) { log.error("异步批量添加文档失败,indexName:{},docSearchVoList:{},错误信息:{}", indexName, docSearchVoList, e); } }); return true; }
注:细心的童鞋已经发现,创建ES客户端的时候,不是线程安全的单例模式(这块别的同事写的,我只是负责修改这个bug,然后就没管这个线程安全问题,其实是来背锅的,呜呜呜~~~~~~)
步骤三
添加ES客户端心跳检查,30秒一次
@Component @Slf4j public class EsSchedule { @Autowired EsRestService esRestService; /** * 30秒一次检查es状态 */ @Scheduled(fixedRate = 30 * 1000) public void heartbeatToES() { try { RequestOptions requestOptions = RequestOptions.DEFAULT.toBuilder().build(); boolean result = esRestService.getRestClient().ping(requestOptions); log.info("检查ES状态:{}", result); } catch (Exception e) { log.error("检查ES状态发生异常:{}", e); } } }
搞定~