当前位置: 首页 > 知识库问答 >
问题:

批量请求在 ElasticSinkConnector 中失败

华子航
2023-03-14

创建elasticsinkconnector时出现以下错误。

使用(" type.name"= '_doc '," input.data.format"= 'AVRO '," CONNECTOR . class " = ' io . confluent . connect . elasticsearch . elasticsearchsinkconnector '," tasks.max"= '1 '," transforms " = ' deadline '," topics"= 'es.contact.model '," transforms "创建源连接器< code > testdemosinkconnector 。dealerance . type " = ' io . confluent . connect . transforms . extract topic $ Value '," transforms。dealerance . field " = ' indexTopicName ',"转换。deadliner . skip . missing . or . null " = ' true '," connection . URL " = ' https://elastic searchdemo . es . us-central 1 . GCP . cloud . es . io:9243 '," connection.username"= 'elastic '," connecthtml" target="_blank">ion . password " = ' bugbxobg 3d v4 jp 4 z 3 w7 P4 thc '," key.ignore"= 'true '," value . converter " = ' io . confluent . connect . avro . avro converter '," value.converter.schemas.enable

错误是,

FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
                at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
                at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
        Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
                at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:397)
                at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:70)
                at org.elasticsearch.action.ActionListener$5.onFailure(ActionListener.java:258)
                at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:126)
                at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:174)
                ... 5 more
        Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://elasticsearchdemo.es.us-central1.gcp.cloud.es.io:9243, response=HTTP/1.1 200 OK}' after 6 attempt(s)
                at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)
                at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
                at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:425)
                at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:168)
                ... 5 more
        Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://elasticsearchdemo.es.us-central1.gcp.cloud.es.io:9243, response=HTTP/1.1 200 OK}
                at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1632)
                at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1583)
                at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1553)
                at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:533)
                at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:170)
                at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
                ... 8 more
        Caused by: java.lang.NullPointerException
                at java.base/java.util.Objects.requireNonNull(Objects.java:221)
                at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
                at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:54)
                at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:39)
                at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:107)
                at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:104)
                at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:159)
                at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:196)
                at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1892)
                at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1554)
                at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1630)
                13 more
        
   

请帮我解决这个错误。弹性接收器连接器版本:11.1.10弹性搜索版本:8.2.2

共有1个答案

秦滨海
2023-03-14

Elasticsearch版本8不被合流的Elasticsearch接收器连接器版本11.1.10支持,这很可能是它不能正确解析Elasticsearch响应的原因

从版本 11.0.0 开始,连接器使用 Elasticsearch 高级 REST 客户端(版本 7.0.1),这意味着仅支持 Elasticsearch 7.x。

https://docs.confluent.io/kafka-connect-elasticsearch/current/overview.html

 类似资料:
  • 批量调用 TOP 接口 参数 名称 类型 是否可选 含义 options Object 选项 options.query Array 请求参数数组 options.query[].topOptions Object 请求参数 options.query[].topOptions.method String TOP 接口名称 options.success Function optional 调用成

  • web3.BatchRequest类用来创建并执行批请求。 调用: new web3.BatchRequest() new web3.eth.BatchRequest() new web3.shh.BatchRequest() new web3.bzz.BatchRequest() 参数: 无 返回值: 一个对象,具有如下方法: add(request): 将请求对象添加到批调用中 execut

  • 一种写法同时支持 Curl 和 Swoole use \Yurun\Util\YurunHttp\Co\Batch; use \Yurun\Util\HttpRequest; $result = Batch::run([ (new HttpRequest)->url('https://www.imiphp.com'), (new HttpRequest)->url('https:

  • 问题内容: 我已经实现了当前的一组路由(例如): 他们工作得很漂亮。现在,假设我要为同一API实现“批处理终结点”。它看起来应该像这样: 身体应该像这样: 为此,我想知道如何调用播放框架路由器来传递这些请求?我打算使用与单元测试建议类似的方法: 通过进入的源代码,您会发现如下所示: 所以我的问题是:与复制上面的代码相比,用Play做到这一点的方式是否更简单(我不反对将Scala和Java混合使用)

  • 问题内容: 我最近升级到了Elasticsearch版本6.1.1,现在我无法从JSON文件批量索引文档。当我内联完成时,它可以正常工作。以下是文档的内容: 当我运行此命令时, 我收到此错误: 如果我以内联方式和在Elasticsearch 5.x中发送数据,效果很好。我尝试将换行符以及换行符添加到文件末尾。似乎不起作用。 问题答案: 在JSON文件的末尾添加 空 行并保存文件,然后尝试运行以下命

  • 问题内容: 我想知道如何以n为一组进行ajax调用。 这是我的用例: 我有一个显示使用情况数据的表。您可以钻取每一行,如果每行都有一个可以更深入钻取的公共属性,则可以选择一次钻取所有它们。对于每一行,都会进行ajax调用以获取要附加到表中的数据。 在某些情况下,最多可以同时钻取50行。可以想象,这给服务器带来了很大压力。我如何最好地以较小的批次发送这些呼叫,这些呼叫在等待这些批次之后才能启动? 我