当前位置: 首页 > 知识库问答 >
问题:

使用ElasticSearch6.0.0接收器NoSuchMethoderRror BulkProcessor.Builder闪烁

姬昊焱
2023-03-14

我尝试用elasticsearch(版本为6.0.0)Sink构建一个flink流式单词计数演示。不幸的是出现了以下错误。这似乎是依赖关系。

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.builder(Ljava/util/function/BiConsumer;Lorg/elasticsearch/action/bulk/BulkProcessor$Listener;)Lorg/elasticsearch/action/bulk/BulkProcessor$Builder;
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at com.quvideo.xiaoying.flink.elasticsearch.WordCountSinkElasticsearch.main(WordCountSinkElasticsearch.java:68)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.builder(Ljava/util/function/BiConsumer;Lorg/elasticsearch/action/bulk/BulkProcessor$Listener;)Lorg/elasticsearch/action/bulk/BulkProcessor$Builder;
    at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createBulkProcessorBuilder(Elasticsearch6ApiCallBridge.java:92)
    at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createBulkProcessorBuilder(Elasticsearch6ApiCallBridge.java:45)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.buildBulkProcessor(ElasticsearchSinkBase.java:353)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:297)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1

我的elasticsearch集群是6.0.0,flink依赖项如下

<properties>
    <flink.version>1.6.0</flink.version>
    <elastic>6.0.0</elastic>
</properties>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
<dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>${elastic}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elastic}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elastic}</version>
    </dependency>
    public static ElasticsearchSink<WordWithCount> getEsSink(){
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"));
        ElasticsearchSink.Builder<WordWithCount> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<WordWithCount>() {
                    public IndexRequest createIndexRequest(WordWithCount element) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("word", element.word);
                        json.put("count", element.count);

                        return Requests.indexRequest()
                                .index("wordcount_idx")
                                .type("test_type")
                                .source(json);
                    }

                    @Override
                    public void process(WordWithCount element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
        esSinkBuilder.setBulkFlushMaxActions(1);
        return esSinkBuilder.build();
    }

有关详细信息,此错误在ElasticSearch6ApicallBridge.java方法中触发

@Override
     public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client::bulkAsync, listener);// error message "client is not a functional interface."
     }

谢谢你

共有1个答案

轩辕实
2023-03-14

我提交了https://issues.apache.org/jira/browse/flink-10173。作为一种变通方法,您可以简单地添加:

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>6.3.1</version>
</dependency>
 类似资料:
  • 我在这里读了这个答案,也在这里读了这个答案,我正在努力找出最适合我的情况。 我在中启动一个服务,在这里我发出一个HTTP请求,并得到一个作为响应。然后我广播这个,并在我的活动中接收它。 问题是,用户显然可以通过打开抽屉并选择一个选项导航到另一个活动,而我可能会错过广播。 很明显,我可以让我的所有活动扩展一个抽象类,这个抽象类扩展了这里提到的,但我不能100%肯定这是最好的解决方案。如果用户在我收到

  • 问题内容: 对于我来说,目前尚不清楚,在这种情况下,我想使用值接收器而不是始终使用指针接收器。 回顾一下文档: 该 文档 还说:“对于基本类型,切片和小型结构之类的类型,值接收器非常便宜,因此,除非该方法的语义要求使用指针,否则值接收器是高效且清晰的。” 首先, 它说“非常便宜”,但问题是它比指针接收器便宜。因此,我做了一个小的基准测试(基于要点的代码),向我展示了,即使对于只有一个字符串字段的结

  • 我试图在Flink中编写一个需要两个阶段的计算。 在第一阶段,我创建一个Graph并获取它的顶点id: 在第二阶段,我想使用这些ID为每个顶点运行SingleSourceShortestPath。 它在本地工作(在IntelliJ IDE和命令行中使用),但当我使用其WebUI在Flink上提交作业时,程序只是执行直到方法并且不运行程序的剩余部分(用于语句和)。 问题是什么? 这是我的代码:

  • 我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka

  • 我设置了一个Kafka JDBC接收器以将事件发送到PostgreSQL。我编写了这个简单的生产者,它将带有模式(avro)数据的JSON发送到一个主题,如下所示: producer.py(kafka-python) 价值架构: 连接器配置(无主机、密码等) 但我的连接器出现严重故障,有三个错误,我无法找出其中任何一个错误的原因: TL;博士;日志版本 完整日志 有人能帮我理解这些错误和潜在的原因

  • 问题内容: 有人可以解释和之间的确切区别吗? 在什么情况下我们必须使用每个Receiver类? 问题答案: 和之间只有一个区别。 当您收到内部广播方法时, 假设, BroadcastReceiver : 它 不保证 该 CPU将保持清醒 ,如果你启动一些长时间运行的进程。CPU可能会立即回到睡眠状态。 WakefulBroadcastReceiver : 这是 保证 该 CPU将保持清醒 ,直到你