当前位置: 首页 > 知识库问答 >
问题:

使用弹性RestHighLevelClient批量请求时的ConnectionClosedException

薛烈
2023-03-14

我正在为弹性搜索中的RestHighLevelClient使用以下代码。

val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(conf.value.getString("elkUserName"), conf.value.getString("elkPassword")))
val builder = RestClient.builder(new HttpHost(conf.value.getString("elkIp"), Integer.valueOf(conf.value.getString("elkPort"))))
    .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
        //set timeout
        override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = requestConfigBuilder.setConnectTimeout(Integer.valueOf(conf.value.getString("elkWriteTimeOut"))).setSocketTimeout(Integer.valueOf(conf.value.getString("elkWriteTimeOut")))
    }).setHttpClientConfigCallback(new HttpClientConfigCallback() {
        override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
        }
    })
client = new RestHighLevelClient(builder)
val requestBuilder = RequestOptions.DEFAULT.toBuilder
requestBuilder.setHttpAsyncResponseConsumerFactory(
    new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(1024 * 1024 * 1024))


var request = new BulkRequest()
request.setRefreshPolicy("wait_for")
var sizeOfRequest = 1 L
newListOfMap.foreach {
    vals =>
        val newMap = vals.asJava
    request.add(new IndexRequest(indexName).source(newMap))

}
client.bulk(request, requestBuilder.build)

但我遇到以下异常

java.lang.NoSuchMethodError:
org.apache.http.ConnectionClosedException: method <init>()V not found
  at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
  at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
  at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
  at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
  at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
  at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
  at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
  at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
  at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
  at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
  at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
  at java.lang.Thread.run(Thread.java:748)
org.apache.http.ConnectionClosedException: Connection closed
unexpectedly at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:778)
  at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218)
  at org.elasticsearch.client.RestClient.performRequest(RestClient.java:205)
  at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1454)
  at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
  at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
  at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:492)
  at Utils.ELKUtil$.postDataToELK(ELKUtil.scala:59)

注意:上面的代码适用于较小大小的请求,但在发布较大大小的请求时会出现上述错误。请建议。

共有1个答案

彭星津
2023-03-14

如果您的项目同时使用httpcorehttpcore-nio依赖项,请确保它们的两个版本同时为<=4.4.10或>4.4.10

Harshit的建议是造成我这个问题的部分原因。仔细检查日志,可以明显看出HTTPASyncRequestExecutor正在调用一个名为ConnectionClosedException的默认构造函数,但是,目标中不存在此默认构造函数

HttpAsyncRequestExecutorHttpCore-NIO包的类。ConnectionClosedExceptionHttpCore包的类。此问题在V4.4.10之后开始出现,超过这一点,ConnectionClosedException类包含了一个默认构造函数,HttpAsyncRequestExecutor会调用该构造函数。对于版本<=4.4.10,默认构造函数不存在,但是,它是用HTTPASyncRequestExecutor中的一个参数调用的。因此,当两个库一起使用时,它们的版本应该高于或低于V4.4.10。

 类似资料:
  • 使用RestHighLevelRestClient使用ElasticSearch批量插入时出现异常。 ElasticsearchStatusException[无法解析响应体];嵌套:ResponseException[method[POST],host[http:x.com],URI[/_bulk?timeout=1m],状态行[http/1.1 413请求实体太大]{“消息”:“请求大小超过1

  • 我使用elastic beanstalk(Tomcat8环境)部署了一个webapp。我的一个REST API请求(在服务器上)需要大约2分钟才能回复到客户端。请注意,这是一个涉及多个第三方系统调用的批量请求,因此最多需要2分钟,这对业务来说是可以接受的(至少目前是这样)。然而,我总是在60秒后在客户端收到以下错误: 问题是60秒的超时值来自何处,即弹性豆茎中60秒的超时值从何处更改?如何在ela

  • 批量调用 TOP 接口 参数 名称 类型 是否可选 含义 options Object 选项 options.query Array 请求参数数组 options.query[].topOptions Object 请求参数 options.query[].topOptions.method String TOP 接口名称 options.success Function optional 调用成

  • web3.BatchRequest类用来创建并执行批请求。 调用: new web3.BatchRequest() new web3.eth.BatchRequest() new web3.shh.BatchRequest() new web3.bzz.BatchRequest() 参数: 无 返回值: 一个对象,具有如下方法: add(request): 将请求对象添加到批调用中 execut

  • 一种写法同时支持 Curl 和 Swoole use \Yurun\Util\YurunHttp\Co\Batch; use \Yurun\Util\HttpRequest; $result = Batch::run([ (new HttpRequest)->url('https://www.imiphp.com'), (new HttpRequest)->url('https:

  • 这是在收到以db为单位的帖子列表后发送带有每个帖子链接数据的POST请求的代码。 在使用每个链接请求帖子后,从回复中提取playerCount并将其更新到每个帖子。 我在这段代码中使用Resttemplate,但有一个问题需要花费太长时间。 所以我想把这个代码改为一次发送一个请求,并在所有请求完成后更新每个帖子。 我怎样才能把这个代码转换成我想要的? 我将使用此代码作为计划任务。